In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.source_table")  # Create source schema if it doesn't exist
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.target_table")

DataFrame[]

In [0]:
sales = spark.sql("SELECT * FROM samples.accuweather.forecast_daily_calendar_imperial")
sales.write.mode("overwrite").saveAsTable("workspace.source_table.sales")

In [0]:
#Load Data From Source
source = spark.read.table('workspace.source_table.sales')
source.display()

In [0]:
from pyspark.sql import functions as F

# Load Data From Source and concatenate all columns into 'ConCatValue'
source = source.withColumn('ConCatValue', F.concat_ws('', *source.columns))
display(source)


In [0]:
# Add IndCurrent, CreatedDate, and ModifiedDate columns
source = source.withColumn("IndCurrent", F.lit(1)) \
    .withColumn("CreatedDate", F.current_timestamp()) \
    .withColumn("ModifiedDate", F.current_timestamp())
source.display()

In [0]:
from pyspark.sql.window import Window

window_spec = Window.orderBy(F.monotonically_increasing_id())
source = source.withColumn("storage_id", F.row_number().over(window_spec))

first_cols = ["storage_id"]
other_cols = [col for col in source.columns if col not in first_cols]
source = source.select(first_cols + other_cols)

display(source)

In [0]:
# Generate SHA-256 hash of concatenated column values and drop 'ConCatValue'
source = source.withColumn("RowHash", F.sha2(F.col("ConCatValue"), 256)).drop('ConCatValue')
display(source)


In [0]:
#writing to the target schema  
source.write.mode("append").saveAsTable("workspace.target_table.sales")


In [0]:
# Display data from the target_table schema
target_df = spark.sql("SELECT * FROM workspace.target_table.sales")
display(target_df)

In [0]:
SourceTable='workspace.source_table.sales'
TargetTable='workspace.target_table.sales'

In [0]:
SourceDf=spark.read.table(SourceTable)  # Read source table into DataFrame
TargetDf=spark.read.table(TargetTable)  # Read target table into DataFrame


In [0]:
SourceDf.display()

In [0]:
from pyspark.sql.functions import col

# Filter the DataFrame to show only rows where 'degree_days_freezing' is '0'
# Display the filtered DataFrame for inspection
SourceDf.filter(col("degree_days_freezing") == "0").display()

In [0]:
from pyspark.sql.functions import col, when

# Update the 'city_name' column in SourceDf:
# For rows where 'degree_days_freezing' equals '0', set the 'city_name' value to 'Kolkata'.
# For all other rows, retain the original 'city_name' value.
SourceDf = SourceDf.withColumn(
    "city_name",
    when(col("degree_days_freezing") == "0", "Kolkata").otherwise(col("city_name"))
)

# Display rows where 'degree_days_freezing' is '0' to verify the 'city_name' column update.
SourceDf.filter(col("degree_days_freezing") == "0").display()

# After this update, the 'city_name' value for all rows with 'degree_days_freezing' 0 will be 'Kolkata'.
     

In [0]:
# Create a hash key by concatenating all columns into a single string column 'RowHash'
from pyspark.sql import functions as F

# Concatenate all columns in 'source' DataFrame into 'RowHash'
SourceDf = SourceDf.withColumn('RowHash', F.concat_ws('', *SourceDf.columns))

In [0]:
# Add three new columns to SourceDf:
# 1. 'IndCurrent': Set to 1 for all rows, indicating the current/active record.
# 2. 'CreatedDate': Set to the current timestamp, representing when the record was created.
# 3. 'ModifiedDate': Set to the current timestamp, representing when the record was last modified.
SourceDf = SourceDf.withColumn("IndCurrent", F.lit(1)) \
    .withColumn("CreatedDate", F.current_timestamp()) \
    .withColumn("ModifiedDate", F.current_timestamp())

In [0]:
# Add three new columns to SourceDf:
# 1. 'IndCurrent': Set to 1 for all rows, indicating the current/active record.
# 2. 'CreatedDate': Set to the current timestamp, representing when the record was created.
# 3. 'ModifiedDate': Set to the current timestamp, representing when the record was last modified.
SourceDf = SourceDf.withColumn("IndCurrent", F.lit(1)) \
    .withColumn("CreatedDate", F.current_timestamp()) \
    .withColumn("ModifiedDate", F.current_timestamp())

In [0]:
SourceDf.filter(col("degree_days_freezing") == "0").display()

In [0]:
# Before applying the SCD Type 1 merge, let's inspect the data in the target table for a specific degree_days_freezing
display(spark.sql("select * from workspace.target_table.sales where degree_days_freezing='0'"))

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, col

# Static configuration
table_name = "workspace.target_table.sales"
key_column = "franchiseID"
timestamp_column = "ModifiedDate"
hash_column = "RowHash"
created_column = "CreatedDate"

# Reference Delta table
target_table = DeltaTable.forName(spark, table_name)

# Aliases
src = SourceDf.alias("src")
tgt = target_table.alias("tgt")

# Columns to update (exclude key, timestamp, and created date)
columns_to_update = [
    col_name for col_name in SourceDf.columns 
    if col_name not in [key_column, timestamp_column, created_column]
]

# Construct SET dictionary for update
set_dict = {col_name: col(f"src.{col_name}") for col_name in columns_to_update}
set_dict[timestamp_column] = current_timestamp()  # Add ModifiedDate explicitly

# Perform SCD Type 1 MERGE
tgt.merge(
    src,
    f"tgt.{key_column} = src.{key_column}"
).whenMatchedUpdate(
    condition=col(f"src.{hash_column}") != col(f"tgt.{hash_column}"),
    set=set_dict
).whenNotMatchedInsertAll().execute()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7602603880600825>, line 35[0m
[1;32m     26[0m set_dict[timestamp_column] [38;5;241m=[39m current_timestamp()  [38;5;66;03m# Add ModifiedDate explicitly[39;00m
[1;32m     28[0m [38;5;66;03m# Perform SCD Type 1 MERGE[39;00m
[1;32m     29[0m tgt[38;5;241m.[39mmerge(
[1;32m     30[0m     src,
[1;32m     31[0m     [38;5;124mf[39m[38;5;124m"[39m[38;5;124mtgt.[39m[38;5;132;01m{[39;00mkey_column[38;5;132;01m}[39;00m[38;5;124m = src.[39m[38;5;132;01m{[39;00mkey_column[38;5;132;01m}[39;00m[38;5;124m"[39m
[1;32m     32[0m )[38;5;241m.[39mwhenMatchedUpdate(
[1;32m     33[0m     condition[38;5;241m=[39mcol([38;5;124mf[39m[38;5;124m"[39m[38;5;124msrc.[39m[38;5;132;01m{[39;00mhash_column[38;5;132;01m}[39;00m[38;5;124m"[39m) [38;5;241m!=[39m col([38;