In [None]:
import functools

from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark.sql import window
from pyspark.sql.utils import AnalysisException

from delta.tables import *



In [None]:
%sql set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

key,value
spark.databricks.delta.properties.defaults.enableChangeDataFeed,True


In [None]:
# Source file name; its stem ("players") drives the table names below.
table = "players.csv"
table_stem = table.split(".")[0]

# Full-load CSV used to seed the bronze table on the first run.
full_load_path = f"s3://csgo-datalake/raw/cs/full-load/{table}/full_load.csv"
# Folder the Auto Loader stream reads incremental CSV files from.
inc_data = "s3://csgo-datalake/raw/cs/inc/"
# Streaming checkpoint; note that `table` still carries its ".csv" suffix here.
checkpoint_path = f"s3://csgo-datalake/bronze/{table}_checkpoint"

table_name = f"{table_stem}Table"
bronze_table = f"bronze.{table_name}"


In [None]:
# Columns forming the MERGE key used by upsert_delta.
# NOTE(review): rows appear to be one per player per match (each carries a
# match_id), so several target rows can share a (player_id, team) pair and a
# single 'U'/'D' change would hit all of them — confirm whether match_id
# belongs in the key.
id_field = ["player_id", "team"]
# Column used to keep only the most recent change per key within a micro-batch.
strongly_date = "date"

In [None]:
def check_table_exist(db_tbl_name):
    """Return whether Spark can resolve the table ``db_tbl_name``.

    Args:
        db_tbl_name: table name, optionally database-qualified
            (e.g. "bronze.playersTable").

    Returns:
        True if the table resolves; False if Spark raises ``AnalysisException``
        while resolving it.

    Other exceptions (including KeyboardInterrupt) are deliberately not caught.
    The caller overwrites the table when this returns False, so a transient or
    permission error must not be reported as "table missing".
    """
    try:
        # Building the lazy DataFrame is enough to resolve the name; no action runs.
        spark.read.table(db_tbl_name)
    except AnalysisException:
        return False
    return True

In [None]:
# Seed the bronze table from the full-load CSV only when it does not exist yet;
# an existing table (and the changes merged into it) is left untouched.
if check_table_exist(bronze_table):
    print("Table already exists")
else:
    print("Creating Table")
    df = spark.read.csv(full_load_path, header=True, inferSchema=True)
    (df.write
       .mode("overwrite")
       .saveAsTable(bronze_table))
    print(f"{bronze_table} criada")  # "criada" is Portuguese for "created"

In [None]:
# Schema of the incremental CSV files: the bronze table's columns plus the `Op`
# change-type column that upsert_delta's MERGE conditions read.
# The schema comes from the table itself rather than from `df`, which is only
# bound when the previous cell had to create the table (a re-run would fail).
table_schema = spark.read.table(bronze_table).schema
# Slicing a StructType returns a new StructType, so `add` below leaves
# table_schema unchanged.
stream_schema = table_schema[:]
# NOTE(review): with an explicit schema, Spark's CSV reader maps columns by
# position by default, so the incremental files presumably carry `Op` as their
# last column — confirm against the file producer.
stream_schema = stream_schema.add('Op', data_type=types.StringType(), nullable=False, metadata={})

## Checando se os dados de streaming fazem parte da tabela antes do stream

In [None]:
# Snapshot of the bronze table before the incremental stream is started.
bronze = (spark.read
               .format('delta')
               .table(bronze_table))
bronze.display()

In [None]:
# Rows with team == 'Streaming' appear to come only from the incremental feed,
# so before the stream runs this result is expected to be empty.
display(bronze.where(bronze['team'] == 'Streaming'))

date,player_name,team,opponent,country,player_id,match_id,event_id,event_name,best_of,map_1,map_2,map_3,kills,assists,deaths,hs,flash_assists,kast,kddiff,adr,fkdiff,rating,m1_kills,m1_assists,m1_deaths,m1_hs,m1_flash_assists,m1_kast,m1_kddiff,m1_adr,m1_fkdiff,m1_rating,m2_kills,m2_assists,m2_deaths,m2_hs,m2_flash_assists,m2_kast,m2_kddiff,m2_adr,m2_fkdiff,m2_rating,m3_kills,m3_assists,m3_deaths,m3_hs,m3_flash_assists,m3_kast,m3_kddiff,m3_adr,m3_fkdiff,m3_rating,kills_ct,deaths_ct,kddiff_ct,adr_ct,kast_ct,rating_ct,kills_t,deaths_t,kddiff_t,adr_t,kast_t,rating_t,m1_kills_ct,m1_deaths_ct,m1_kddiff_ct,m1_adr_ct,m1_kast_ct,m1_rating_ct,m1_kills_t,m1_deaths_t,m1_kddiff_t,m1_adr_t,m1_kast_t,m1_rating_t,m2_kills_ct,m2_deaths_ct,m2_kddiff_ct,m2_adr_ct,m2_kast_ct,m2_rating_ct,m2_kills_t,m2_deaths_t,m2_kddiff_t,m2_adr_t,m2_kast_t,m2_rating_t,m3_kills_ct,m3_deaths_ct,m3_kddiff_ct,m3_adr_ct,m3_kast_ct,m3_rating_ct,m3_kills_t,m3_deaths_t,m3_kddiff_t,m3_adr_t,m3_kast_t,m3_rating_t


In [None]:
def upsert_delta(df, batchId, delta_table, id_field, strongly_date):
    """Apply one micro-batch of change rows to ``delta_table`` with a Delta MERGE.

    Used as the ``foreachBatch`` callback of the streaming write. Within the
    batch only the row with the greatest ``strongly_date`` per key is kept.
    The MERGE then applies these rules:

    * ``Op = 'D'`` on a matching key deletes the matching target row(s).
    * ``Op = 'U'`` on a matching key overwrites all columns of those row(s).
    * ``Op = 'I'`` on a non-matching key inserts a new row.

    Rows whose ``Op`` does not fit their match status (e.g. 'U' with no existing
    key) are ignored.

    Args:
        df: micro-batch DataFrame containing the ``id_field`` columns,
            ``strongly_date`` and an ``Op`` column.
        batchId: micro-batch id passed by Structured Streaming (unused).
        delta_table: target ``DeltaTable``.
        id_field: list of column names forming the merge key.
        strongly_date: column used to pick the latest change per key.

    Returns:
        None.
    """
    merge_condition = " and ".join(f"d.{key} = c.{key}" for key in id_field)

    # Collapse to one source row per key: a MERGE can fail when several source
    # rows match the same target row.
    # NOTE(review): rows with equal `strongly_date` tie, and which one wins is
    # then arbitrary — consider a finer-grained ordering column.
    latest_first = window.Window.partitionBy(*id_field).orderBy(F.desc(strongly_date))
    latest_changes = (
        df.withColumn("rn", F.row_number().over(latest_first))
          .filter("rn=1")
          .drop(F.col("rn"))
    )

    merge_builder = delta_table.alias("d").merge(latest_changes.alias("c"), merge_condition)
    merge_builder = merge_builder.whenMatchedDelete(condition="c.Op = 'D'")
    merge_builder = merge_builder.whenMatchedUpdateAll(condition="c.Op ='U'")
    merge_builder = merge_builder.whenNotMatchedInsertAll(condition="c.Op = 'I'")
    merge_builder.execute()

    return None



In [None]:
# Target table for the MERGE performed in every micro-batch.
delta_table = DeltaTable.forName(spark, bronze_table)

# Incrementally pick up new CSV files from `inc_data` with Auto Loader.
df_stream = (spark.readStream
                  .format('cloudFiles')
                  .option('cloudFiles.format', 'csv')
                  .option('header', 'true')
                  .schema(stream_schema)
                  .load(inc_data))

# Bind the merge arguments now. A lambda would look these notebook globals up on
# every micro-batch, so reassigning e.g. `delta_table` or `id_field` in a later
# cell would silently redirect the already-running stream.
batch_upsert = functools.partial(
    upsert_delta,
    delta_table=delta_table,
    id_field=id_field,
    strongly_date=strongly_date,
)

# foreachBatch defines the sink itself; a `.format(...)` set on the writer
# would be overridden by it, so none is specified.
stream = (df_stream.writeStream
                   .foreachBatch(batch_upsert)
                   .option('checkpointLocation', checkpoint_path)
                   .start())

In [None]:
# `start()` returns immediately, so wait until every file currently in
# `inc_data` has been merged; otherwise the read below races the first
# micro-batch and may show a stale table. If the query has failed, this call
# raises its exception instead of silently showing old data.
stream.processAllAvailable()

bronze = spark.read.format('delta').table(bronze_table)
display(bronze.filter(bronze.team=='Streaming'))

date,player_name,team,opponent,country,player_id,match_id,event_id,event_name,best_of,map_1,map_2,map_3,kills,assists,deaths,hs,flash_assists,kast,kddiff,adr,fkdiff,rating,m1_kills,m1_assists,m1_deaths,m1_hs,m1_flash_assists,m1_kast,m1_kddiff,m1_adr,m1_fkdiff,m1_rating,m2_kills,m2_assists,m2_deaths,m2_hs,m2_flash_assists,m2_kast,m2_kddiff,m2_adr,m2_fkdiff,m2_rating,m3_kills,m3_assists,m3_deaths,m3_hs,m3_flash_assists,m3_kast,m3_kddiff,m3_adr,m3_fkdiff,m3_rating,kills_ct,deaths_ct,kddiff_ct,adr_ct,kast_ct,rating_ct,kills_t,deaths_t,kddiff_t,adr_t,kast_t,rating_t,m1_kills_ct,m1_deaths_ct,m1_kddiff_ct,m1_adr_ct,m1_kast_ct,m1_rating_ct,m1_kills_t,m1_deaths_t,m1_kddiff_t,m1_adr_t,m1_kast_t,m1_rating_t,m2_kills_ct,m2_deaths_ct,m2_kddiff_ct,m2_adr_ct,m2_kast_ct,m2_rating_ct,m2_kills_t,m2_deaths_t,m2_kddiff_t,m2_adr_t,m2_kast_t,m2_rating_t,m3_kills_ct,m3_deaths_ct,m3_kddiff_ct,m3_adr_ct,m3_kast_ct,m3_rating_ct,m3_kills_t,m3_deaths_t,m3_kddiff_t,m3_adr_t,m3_kast_t,m3_rating_t
2022-06-18T00:00:00.000+0000,NiKo,Streaming,E-frag.net,Bosnia and Herzegovina,3741,2300764,1958,Acer Predator Masters powered by Intel Season 2 Finals,3,Cache,Cobblestone,,52,11,31,32,,,21,,1,1.53,25,4,9,15,,,16,,1,2.04,27.0,7.0,22.0,17.0,,,5.0,,0.0,1.15,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2022-06-18T00:00:00.000+0000,EliGE,Streaming,CLG,United States,8738,2315939,3060,ECS Season 4 North America,1,Mirage,,,19,6,19,10,1.0,84.6,0,89.9,2,1.29,19,6,19,10,1.0,84.6,0,89.9,2,1.29,,,,,,,,,,,,,,,,,,,,,11.0,12.0,-1.0,75.1,93.3,1.1,8.0,7.0,1.0,110.0,72.7,1.55,11.0,12.0,-1.0,75.1,93.3,1.1,8.0,7.0,1.0,110.0,72.7,1.55,,,,,,,,,,,,,,,,,,,,,,,,
2022-06-18T00:00:00.000+0000,alter,Streaming,ex-Atmosphere,United States,14111,2323825,3688,ESEA MDL Season 28 North America,1,Dust2,,,26,2,21,19,,67.9,5,102.5,0,1.24,26,2,21,19,,67.9,5,102.5,0,1.24,,,,,,,,,,,,,,,,,,,,,13.0,10.0,3.0,109.5,73.3,1.24,13.0,11.0,2.0,94.5,61.5,1.25,13.0,10.0,3.0,109.5,73.3,1.24,13.0,11.0,2.0,94.5,61.5,1.25,,,,,,,,,,,,,,,,,,,,,,,,
2022-06-18T00:00:00.000+0000,ToM223,Streaming,AGO,Poland,15300,2318047,3354,Legend Series #4,3,Overpass,Mirage,Train,64,18,60,32,2.0,73.9,4,76.5,-1,1.09,22,8,16,12,1.0,83.3,6,74.7,1,1.28,34.0,9.0,28.0,15.0,1.0,77.5,6.0,85.9,-3.0,1.18,8.0,1.0,16.0,5.0,0.0,50.0,-8.0,58.8,1.0,0.6,34.0,26.0,8.0,86.3,76.9,1.18,30.0,34.0,-4.0,68.8,71.4,1.01,15.0,7.0,8.0,86.3,86.7,1.41,7.0,9.0,-2.0,63.1,80.0,1.15,19.0,16.0,3.0,91.0,76.2,1.18,15.0,12.0,3.0,80.2,78.9,1.18,0.0,3.0,-3.0,52.3,33.3,0.22,8.0,13.0,-5.0,60.1,53.3,0.67
2022-06-18T00:00:00.000+0000,refrezh,Streaming,FATE,Denmark,10610,2338077,5031,ESEA Advanced Season 32 Europe,3,Inferno,Dust2,,43,10,27,14,4.0,76.8,16,84.2,3,1.32,14,7,9,7,4.0,90.0,5,81.0,0,1.31,29.0,7.0,18.0,7.0,4.0,69.4,11.0,85.9,3.0,1.34,,,,,,,,,,,27.0,7.0,20.0,120.1,91.3,2.0,16.0,20.0,-4.0,59.1,66.7,0.84,5.0,2.0,3.0,88.6,100.0,1.6,9.0,7.0,2.0,78.5,86.7,1.21,22.0,5.0,17.0,128.8,88.9,2.11,7.0,13.0,-6.0,43.0,50.0,0.56,,,,,,,,,,,,
2022-06-18T00:00:00.000+0000,ad3m,Streaming,fightclub,Sweden,13657,2325578,3918,MSI MGA 2018 Europe Closed Qualifier,1,Cache,,,19,4,15,5,2.0,68.2,4,81.4,2,1.15,19,4,15,5,2.0,68.2,4,81.4,2,1.15,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [None]:
%sql SELECT * FROM table_changes('bronze.playerstable', 0) where player_name == 'try'

date,player_name,team,opponent,country,player_id,match_id,event_id,event_name,best_of,map_1,map_2,map_3,kills,assists,deaths,hs,flash_assists,kast,kddiff,adr,fkdiff,rating,m1_kills,m1_assists,m1_deaths,m1_hs,m1_flash_assists,m1_kast,m1_kddiff,m1_adr,m1_fkdiff,m1_rating,m2_kills,m2_assists,m2_deaths,m2_hs,m2_flash_assists,m2_kast,m2_kddiff,m2_adr,m2_fkdiff,m2_rating,m3_kills,m3_assists,m3_deaths,m3_hs,m3_flash_assists,m3_kast,m3_kddiff,m3_adr,m3_fkdiff,m3_rating,kills_ct,deaths_ct,kddiff_ct,adr_ct,kast_ct,rating_ct,kills_t,deaths_t,kddiff_t,adr_t,kast_t,rating_t,m1_kills_ct,m1_deaths_ct,m1_kddiff_ct,m1_adr_ct,m1_kast_ct,m1_rating_ct,m1_kills_t,m1_deaths_t,m1_kddiff_t,m1_adr_t,m1_kast_t,m1_rating_t,m2_kills_ct,m2_deaths_ct,m2_kddiff_ct,m2_adr_ct,m2_kast_ct,m2_rating_ct,m2_kills_t,m2_deaths_t,m2_kddiff_t,m2_adr_t,m2_kast_t,m2_rating_t,m3_kills_ct,m3_deaths_ct,m3_kddiff_ct,m3_adr_ct,m3_kast_ct,m3_rating_ct,m3_kills_t,m3_deaths_t,m3_kddiff_t,m3_adr_t,m3_kast_t,m3_rating_t,_change_type,_commit_version,_commit_timestamp
2019-12-21T00:00:00.000+0000,try,Evolve,NOORG,Argentina,19869,2338684,5075,WESG 2019 LATAM South,3,Dust2,Overpass,Mirage,46,18,49,18,3.0,75.4,-3,79.6,-5,1.04,16,7,15,7,1.0,81.8,1,79.5,0,1.14,11.0,5.0,18.0,6.0,1.0,68.4,-7.0,62.1,-3.0,0.77,19.0,6.0,16.0,5.0,1.0,75.0,3.0,93.7,-2.0,1.18,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,update_preimage,1,2022-06-22T02:31:33.000+0000
2022-06-18T00:00:00.000+0000,try,Evolve,Complexity,United States,19869,2323722,3721,ZOTAC Cup Masters 2018 North America Closed Qualifier,3,Inferno,Mirage,Nuke,44,6,49,19,1.0,63.5,-5,79.9,10,1.04,23,4,14,8,1.0,82.6,9,94.2,5,1.59,10.0,1.0,18.0,5.0,0.0,61.9,-8.0,60.1,2.0,0.73,11.0,1.0,17.0,6.0,0.0,42.1,-6.0,84.3,3.0,0.8,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,update_postimage,1,2022-06-22T02:31:33.000+0000
2020-02-27T00:00:00.000+0000,try,9z,Duck Nutz,Argentina,19869,2339811,5233,Aorus League 2020 #1 Southern Cone,1,Inferno,,,20,9,33,9,3.0,75.6,-13,61.2,-3,0.86,20,9,33,9,3.0,75.6,-13,61.2,-3,0.86,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,insert,0,2022-06-22T01:52:20.000+0000
2019-12-21T00:00:00.000+0000,try,Evolve,NOORG,Argentina,19869,2338684,5075,WESG 2019 LATAM South,3,Dust2,Overpass,Mirage,46,18,49,18,3.0,75.4,-3,79.6,-5,1.04,16,7,15,7,1.0,81.8,1,79.5,0,1.14,11.0,5.0,18.0,6.0,1.0,68.4,-7.0,62.1,-3.0,0.77,19.0,6.0,16.0,5.0,1.0,75.0,3.0,93.7,-2.0,1.18,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,insert,0,2022-06-22T01:52:20.000+0000
