<span style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">An Exception was encountered at '<a href="#papermill-error-cell">In [12]</a>'.</span>

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
import pandas as pd
import os
import shutil

PYSPARK_PYTHON = os.getenv("PYSPARK_PYTHON") 
PYSPARK_DRIVER_PYTHON = os.getenv("PYSPARK_DRIVER_PYTHON")

In [2]:
import pyspark
from delta import configure_spark_with_delta_pip, DeltaTable
import json

# Load the configuration JSON file
with open('/usr/local/spark/conf/spark-defaults.json', 'r') as f:
    config = json.load(f)

# Initialize the Spark session builder
builder = pyspark.sql.SparkSession.builder.appName("MyApp1").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").config("spark.pyspark.python", PYSPARK_PYTHON)\
    .config("spark.pyspark.driver.python", PYSPARK_DRIVER_PYTHON)

# Read the packages from the text file
packages = []
with open('/usr/local/spark/conf/packages.txt', 'r') as file:
    # Read each line and strip newlines or extra spaces
    packages = [line.strip() for line in file if line.strip()]

# # Add packages to the Spark session configuration
builder.config("spark.jars.packages", ",".join(packages))

# Apply the configurations from the JSON file to the Spark session
for key, value in config.items():
    builder.config(key, value)

In [3]:
# Spark will automatically use the master specified in spark-defaults.conf
spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark

In [4]:
dfconfig=pd.read_csv('/home/jovyan/Notebooks/configpath.csv')
dfpath=spark.createDataFrame(dfconfig)

In [5]:
trgt_path_processed = dfpath.filter(col("DataFeedName") == "Fact_Delta_Path").select('Path').collect()[0][0]
trgt_path_csv = dfpath.filter(col("DataFeedName") == "Fact_CSV_Path").select('Path').collect()[0][0]
source_path = dfpath.filter(col("DataFeedName") == "FactFile").select('Path').collect()[0][0]
sheet_name =dfpath.filter(col("DataFeedName") == "sheet_name").select('Path').collect()[0][0]
calendar_path =dfpath.filter(col("DataFeedName") == "Calendar_Delta_Path").select('Path').collect()[0][0]


In [6]:
pandas_df = pd.read_excel(source_path, sheet_name = sheet_name)
df=spark.createDataFrame(pandas_df)
df= df.withColumn("Date", date_format(df["Date"], "yyyy-MM-dd").cast("date"))

In [7]:
if DeltaTable.isDeltaTable(spark, trgt_path_processed):
    dffact = spark.sql(f"SELECT MAX(datesk) as max_date FROM delta.`{trgt_path_processed}`")
    # Collect the result
    max_date = dffact.withColumn("max_date",
                                    concat_ws("-",col("max_date").substr(1,4),col("max_date").substr(5,2),col("max_date").substr(7,2))
                                    .cast("date")).collect()[0]["max_date"]
    query=f"SELECT * FROM vw_src WHERE Date >= '{max_date}'"
    
else:
    query="SELECT * FROM vw_src"
print(query)

SELECT * FROM vw_src WHERE Date >= '2024-12-20'


In [8]:
df.createOrReplaceTempView("vw_src")
df_src = spark.sql(query)
df_src.show()
print(df_src.count())


+-----+----------+-------------+-----------+--------+------------+
|Index|      Date|Spending Item|Wallet used|Category|Spend Amount|
+-----+----------+-------------+-----------+--------+------------+
|   55|2024-12-20|       Swiggy|  ICICI Pay|Personal|         199|
+-----+----------+-------------+-----------+--------+------------+



1


In [9]:
df_temp =(
df_src
    .withColumnRenamed("Spending Item", "SpendingItem")
    .withColumnRenamed("Spend Amount", "SpendAmount")
    .withColumnRenamed("Wallet used", "WalletUsed")
)

In [10]:

df_output =(
    df_temp
    .withColumn("WalletSK", xxhash64("WalletUsed"))
    .withColumn("categorysk", xxhash64("category").cast("long"))
    .withColumn("DateSK", regexp_replace("date", "-", "").cast("string"))
    .withColumn("PKSK", xxhash64(concat("category", "WalletUsed", "Index", "Date")).cast("string"))
    .withColumn("UpdateTimeStamp", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
    .withColumn("RowSK", xxhash64(concat_ws("|", *[col(c) for c in df_temp.columns])))
    .drop("Index", "Date", "category","WalletUsed")
)

In [11]:
df_output.createOrReplaceTempView("vw_source")

duplicate_counts = spark.sql("""
    SELECT COUNT(PKSK) as count
    FROM vw_source
    GROUP BY PKSK
    HAVING COUNT(PKSK) > 1
""")
x = [row['count'] for row in duplicate_counts.collect()]

# Fail the code if there are duplicates
if len(x) > 0:
    raise ValueError(f"Duplicate values found :\n{duplicate_counts}")
else:
# # Proceed with the rest of the code if no duplicates
    print("No duplicates found. Continuing execution...")


No duplicates found. Continuing execution...


<span id="papermill-error-cell" style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">Execution using papermill encountered an exception here and stopped:</span>

In [12]:
if DeltaTable.isDeltaTable(spark, trgt_path_processed):
    column_name = df_output.columns
    set_clause = ", ".join([f"target.{i} = source.{i}" for i in column_name])
    insert_clause=",".join(column_name)
    insert_values=",".join([f"source.{i}" for i in column_name])
    query = f"""MERGE INTO delta.`{trgt_path_processed}` AS target 
            USING vw_source AS source 
            ON target.PKSK = source.PKSK 
            AND target.RowSK <> source.RowSK 
            WHEN MATCHED THEN UPDATE SET {set_clause}
            WHEN NOT MATCHED THEN INSERT ({insert_clause}) VALUES ({insert_values})"""
    spark.sql(query)        
    query = f"""
            DELETE FROM delta.`{trgt_path_processed}` AS target
            WHERE target.PKSK NOT IN (SELECT PKSK FROM vw_source)
            AND target.DateSK IN (
                SELECT DateSK FROM vw_source
            )
            """
    spark.sql(query)
else :
    query=f"CREATE TABLE delta.`{trgt_path_processed}` USING DELTA AS SELECT * FROM vw_source"
    spark.sql(query)

print(query)

AnalysisException: [DELTA_UNSUPPORTED_SUBQUERY] Subqueries are not supported in the DELETE (condition = ((NOT (target.PKSK IN (listquery()))) AND (target.DateSK IN (listquery())))).

In [None]:
# Save the DataFrame to a CSV file
spark.read.format("delta").load(trgt_path_processed).coalesce(1).write.format("csv").option("header","true").mode("overwrite").save(trgt_path_csv)

In [None]:
spark.read.format("csv").option("header","true").load(trgt_path_csv).count()

In [None]:
spark.sql(f"""SELECT UpdateTimeStamp FROM delta.`{trgt_path_processed}` 
          WHERE UpdateTimeStamp = (SELECT MAX(UpdateTimeStamp) FROM vw_source)""").count()

In [None]:
trgt_copy_path = trgt_path_csv + "processed.csv"
files=os.listdir(trgt_path_csv)
selected_files = [file for file in files if file.startswith('part-00') and file.endswith('.csv')]
file=trgt_path_csv + selected_files[0]
print(selected_files)
shutil.copy(file, trgt_copy_path)

In [None]:
delete_log = [file for file in files if "processed.csv" != file ]
for file in delete_log :
    os.remove(trgt_path_csv + file)
    print(f"removed {trgt_path_csv + file}")