In [0]:
# Create the volume that holds the raw CSV files. The name is fully qualified
# (catalog.schema.volume) so it always matches the /Volumes/workspace/default/csv_files
# path used below, whatever the session's current catalog and schema happen to be.
spark.sql("create volume if not exists workspace.default.csv_files")

# After the volume exists, upload the CSV files to it through the UI:
#   New -> Add or upload data -> Upload files to a volume, then select the
#   catalog / schema / volume (workspace / default / csv_files) and upload.
# Once the upload finishes, the files are visible inside the volume.

# List the volume's contents to confirm the upload worked.
dbutils.fs.ls("/Volumes/workspace/default/csv_files/")

In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

# Load every CSV file in the volume into one DataFrame; the first line of each
# file is treated as the header and column types are inferred from the data.
df_elec = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/workspace/default/csv_files/*.csv")
)

In [0]:
# Row counts before the merge.
# Done in Python instead of %sql so that a first run (when the bronze and/or
# silver table has not been created yet) reports that fact instead of failing
# with a table-not-found error and stopping "Run All". Each count is printed
# next to its table name so the two numbers cannot be confused.
for tbl in (
    "tryout1.bronze.stg_Electric_Vehicle_Population_Data",
    "tryout1.silver.Electric_Vehicle_Population_Data",
):
    if spark.catalog.tableExists(tbl):
        print(f"{tbl}: {spark.table(tbl).count()} rows")
    else:
        print(f"{tbl}: not created yet")

In [0]:
# Normalise the CSV headers into column names Delta accepts:
# spaces and hyphens become underscores, parentheses are removed.
header_fixes = str.maketrans({" ": "_", "-": "_", "(": None, ")": None})
clnd_col = [header.translate(header_fixes) for header in df_elec.columns]

# Apply the cleaned names and add the audit columns: Created_at is stamped at
# load time, Modified_at starts out NULL.
df_elec = (
    df_elec
    .toDF(*clnd_col)
    .withColumn("Created_at", current_timestamp())
    .withColumn("Modified_at", lit(None).cast("timestamp"))
)

# Append the batch to the bronze staging table.
(
    df_elec.write
    .format("delta")
    .mode("append")
    .saveAsTable("tryout1.bronze.stg_Electric_Vehicle_Population_Data")
)

In [0]:
# Source for the merge: the bronze staging table written by the previous cell.
read_tbl_src = spark.table("tryout1.bronze.stg_Electric_Vehicle_Population_Data")

In [0]:
# To test the merge, uncomment the two statements below: they build three synthetic incremental rows and union them onto the source.
# incr_data = spark.sql("""select concat(VIN_1_10, '1') as VIN_1_10, concat(County, 'tk') as County, concat(City, 'tk') as City, concat(State, 'tk') as State,Postal_Code, Model_Year, Make, Model, Electric_Vehicle_Type,Clean_Alternative_Fuel_Vehicle_CAFV_Eligibility, Electric_Range,Base_MSRP,Legislative_District,DOL_Vehicle_ID+1::int as DOL_Vehicle_ID,Vehicle_Location,Electric_Utility,2020_Census_Tract,now() as Created_at, null as Modified_at from tryout1.bronze.stg_Electric_Vehicle_Population_Data where DOL_Vehicle_ID in (347724772,272165288,203182584) limit 3""")

# read_tbl_src = read_tbl_src.union(incr_data)

In [0]:
# Upsert the bronze staging data into the silver table, keyed on DOL_Vehicle_ID.
#   * first run         -> create the silver table from the source
#   * subsequent runs   -> evolve the schema if the source gained columns, then
#                          MERGE: update rows whose data changed, insert new keys
from pyspark.sql import Window  # imported here because only this cell needs it

table_name = "tryout1.silver.Electric_Vehicle_Population_Data"
merge_key = "DOL_Vehicle_ID"
audit_cols = ["Created_at", "Modified_at"]

# The bronze table is appended to on every run, so it can hold several rows for
# the same key. MERGE needs at most one source row per target row (otherwise the
# update is ambiguous and new keys are inserted more than once), so keep only
# the most recently loaded row per key.
newest_first = Window.partitionBy(merge_key).orderBy(read_tbl_src["Created_at"].desc())
src_latest = (
    read_tbl_src
    .withColumn("_row_rank", row_number().over(newest_first))
    .filter("_row_rank = 1")
    .drop("_row_rank")
)

src_cols = set(src_latest.columns)
# Business columns compared/updated by the merge (key and audit columns excluded),
# kept in DataFrame order so the printed condition is stable between runs.
data_cols = [c for c in src_latest.columns if c != merge_key and c not in audit_cols]

# Column names are backtick-quoted inside SQL expressions because some of them
# (e.g. 2020_Census_Tract) start with a digit.
update_expr = {c: f"src.`{c}`" for c in data_cols}
update_expr["Modified_at"] = "current_timestamp()"  # Created_at is left as first loaded
insert_expr = {c: f"src.`{c}`" for c in src_latest.columns}

# Update only when at least one business column really changed. `<=>` is the
# NULL-safe equality operator: with plain `!=` any comparison involving NULL
# yields NULL, so a value changing to or from NULL would never be detected.
# Falls back to "false" (never update) if there is nothing to compare.
merge_condition = " OR ".join(f"NOT (src.`{c}` <=> tgt.`{c}`)" for c in data_cols) or "false"
print(merge_condition)

if not spark.catalog.tableExists(table_name):
    # First run: the silver table does not exist yet, so create it from the source.
    src_latest.write.format("delta").mode("append").saveAsTable(table_name)
else:
    # Compare against the columns of the actual target table.
    tgt_cols = set(spark.read.table(table_name).columns)
    if src_cols != tgt_cols:
        print(f"Column mismatch: source={src_cols}, target={tgt_cols}")
        if src_cols - tgt_cols:
            # Add the new source columns to the target by appending an EMPTY
            # DataFrame with mergeSchema: the schema evolves but no rows are
            # written, so the merge below does not see duplicated data.
            (
                src_latest.limit(0)
                .write
                .format("delta")
                .mode("append")
                .option("mergeSchema", "true")
                .saveAsTable(table_name)
            )

    target_data = DeltaTable.forName(spark, table_name)
    (
        target_data.alias("tgt")
        .merge(src_latest.alias("src"), f"src.{merge_key} = tgt.{merge_key}")
        .whenMatchedUpdate(condition=merge_condition, set=update_expr)
        .whenNotMatchedInsert(values=insert_expr)
        .execute()
    )

In [0]:
%sql
-- Row counts after the merge.
-- Each row carries its table name because UNION ALL does not guarantee the
-- order of its output, so two bare numbers could not be told apart.
select 'tryout1.bronze.stg_Electric_Vehicle_Population_Data' as table_name, count(1) as row_count
from tryout1.bronze.stg_Electric_Vehicle_Population_Data
union all
select 'tryout1.silver.Electric_Vehicle_Population_Data' as table_name, count(1) as row_count
from tryout1.silver.Electric_Vehicle_Population_Data;

In [0]:
%sql
-- Cleanup helpers, intentionally left commented out so that "Run All" never drops data.
-- Uncomment and run this cell manually to remove both tables and start from scratch.
-- drop table if exists tryout1.silver.Electric_Vehicle_Population_Data;
-- drop table if exists tryout1.bronze.stg_Electric_Vehicle_Population_Data;