In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

 # File location and type
# Input: the glob selects every .csv file directly under /FileStore/tables.
file_location = "/FileStore/tables/*.csv"
file_type = "csv"

# Reader settings. They stay as strings because they are handed to the reader
# unchanged as option values.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read all matching files into a single DataFrame. The options below are
# CSV-specific; for other file types, these will be ignored.
df_ele = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# display(df_ele)


In [0]:
# Clean up the CSV headers: spaces and hyphens become underscores, and
# parentheses are removed.
_header_cleanup = str.maketrans({" ": "_", "-": "_", "(": None, ")": None})
new_col = [header.translate(_header_cleanup) for header in df_ele.columns]

# Apply the cleaned names positionally to the loaded DataFrame.
df_ele = df_ele.toDF(*new_col)

# Append the renamed rows to the Delta table.
# NOTE(review): append mode adds this batch on every run, so re-running against
# the same input files would presumably store the same rows again - confirm
# that this is intended.
(
    df_ele.write
    .format("delta")
    .mode("append")
    .saveAsTable("databricks_demo.ElectricVehicle_Info")
)

In [0]:

# Row filters for the slice of interest. Each LIKE pattern has only a leading
# wildcard, so it matches values that end with the given text.
in_snohomish_county = col("County").like("%Snohomish")
in_everett = col("City").like("%Everett")
in_postal_98204 = col("Postal_Code") == 98204

# De-duplicate the stored rows, then keep only those matching all three filters.
transfrm_tbl = (
    spark.read.table("databricks_demo.ElectricVehicle_Info")
    .distinct()
    .filter(in_snohomish_county & in_everett & in_postal_98204)
)

In [0]:
# Upsert the filtered rows (alias "s") into the postal-code table (alias "t").
cols = transfrm_tbl.columns


def _quoted(name):
    """Return `name` as a backtick-delimited Spark SQL identifier.

    Embedded backticks are doubled, which is how Spark SQL escapes them
    inside a delimited identifier.
    """
    return "`" + name.replace("`", "``") + "`"


# Each target column is assigned from the source column of the same name.
# The source reference is backtick-quoted so the expression still parses when
# a column name is not a plain identifier. The loop variable is deliberately
# not called `col`, to avoid shadowing the star-imported col() helper.
update_expr = {name: f"s.{_quoted(name)}" for name in cols}
insert_expr = dict(update_expr)
print(update_expr)
print(insert_expr)

# Columns that must all be equal for a source row to match a target row.
# NOTE(review): `=` never matches NULL, so a row with a NULL in any key column
# would be inserted again on every run - confirm the keys are non-null. Also,
# the source is de-duplicated on whole rows only; presumably each key
# combination appears once - verify, since MERGE rejects several source rows
# matching one target row.
merge_keys = ["DOL_Vehicle_ID", "VIN_1_10", "Model_Year", "County", "City", "Postal_Code"]
merge_condition = " AND ".join(
    f"t.{_quoted(key)} = s.{_quoted(key)}" for key in merge_keys
)

target = DeltaTable.forName(spark, "databricks_demo.ElectricVehicle_Info_P98204")

# Matched rows are overwritten column by column; unmatched rows are inserted.
(
    target.alias("t")
    .merge(transfrm_tbl.alias("s"), merge_condition)
    .whenMatchedUpdate(set=update_expr)
    .whenNotMatchedInsert(values=insert_expr)
    .execute()
)