In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
import pandas as pd
import os
import shutil

# Interpreter paths read from the environment; each is None when its variable
# is not set. They are later passed to the Spark session builder.
PYSPARK_PYTHON = os.environ.get("PYSPARK_PYTHON")
PYSPARK_DRIVER_PYTHON = os.environ.get("PYSPARK_DRIVER_PYTHON")

In [2]:
import pyspark
from delta import configure_spark_with_delta_pip, DeltaTable
import json

# Load the extra Spark settings (a JSON object of key -> value) from disk.
with open('/usr/local/spark/conf/spark-defaults.json', 'r') as f:
    config = json.load(f)

# Start the session builder with the Delta Lake SQL extension and catalog.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Pin the Python interpreters only when the environment provides them, so an
# unset variable does not hand a None value to the Spark configuration.
if PYSPARK_PYTHON:
    builder = builder.config("spark.pyspark.python", PYSPARK_PYTHON)
if PYSPARK_DRIVER_PYTHON:
    builder = builder.config("spark.pyspark.driver.python", PYSPARK_DRIVER_PYTHON)

# Read the package list: one entry per line, blank lines and surrounding
# whitespace ignored.
with open('/usr/local/spark/conf/packages.txt', 'r') as packages_file:
    packages = [line.strip() for line in packages_file if line.strip()]

# Add the packages to the builder; skipped when the list is empty so that no
# empty "spark.jars.packages" value is written.
if packages:
    builder = builder.config("spark.jars.packages", ",".join(packages))

# Apply every setting from the JSON file; these are applied last and therefore
# take precedence over the values set above for the same key.
for key, value in config.items():
    builder = builder.config(key, value)

In [None]:

# Create (or reuse) the Spark session with the Delta Lake artifact attached.
# configure_spark_with_delta_pip sets "spark.jars.packages" itself, replacing
# any value placed on the builder earlier, so the entries read from
# packages.txt are forwarded through extra_packages to keep them.
spark = configure_spark_with_delta_pip(builder, extra_packages=packages).getOrCreate()
# Leave the session as the cell's last expression so its summary is displayed.
spark

In [4]:
# Read the path-configuration CSV with pandas, then convert it to a Spark
# DataFrame so it can be queried with Spark column expressions.
config_csv_path = '/home/jovyan/Notebooks/configpath.csv'
dfconfig = pd.read_csv(config_csv_path)
dfpath = spark.createDataFrame(dfconfig)

In [5]:
def _path_for_feed(feed_name):
    """Return the ``Path`` value of the first ``dfpath`` row whose ``DataFeedName`` equals ``feed_name``.

    Raises IndexError when no row matches.
    """
    matching_rows = dfpath.where(col("DataFeedName") == feed_name).select('Path').collect()
    return matching_rows[0][0]


# Target locations for the Delta table and for the CSV export.
trgt_path_processed = _path_for_feed("Calendar_Delta_Path")
trgt_path_csv = _path_for_feed("Calendar_CSV_Path")

In [5]:
# First and last day of the calendar date range, as yyyy-MM-dd strings.
start_date, end_date = "2000-01-01", "2050-12-31"

In [6]:
# One-row DataFrame carrying the two range bounds as string columns.
date_range_df = spark.createDataFrame(
    [(start_date, end_date)],
    ["start_date", "end_date"],
)

# Parse both bounds as dates, then build a single array column named "date"
# from the sequence between them.
range_start = to_date(date_range_df["start_date"]).alias("start_date")
range_end = to_date(date_range_df["end_date"]).alias("end_date")
date_sequence_df = date_range_df.select(sequence(range_start, range_end).alias("date"))

In [7]:
# Expand the array held in "date" into one row per element, keeping the
# column name "date".
explode_expr = "explode(date) as date"
df_date = date_sequence_df.selectExpr(explode_expr)

In [8]:
# Column expressions for the calendar attributes derived from "date".
date_sk = regexp_replace("date", "-", "")  # the date's text with every "-" removed
year_number = year("date")
month_name = date_format("date", "MMMM")
quarter_label = concat(year("date"), lit(" Q"), quarter("date"))  # year, " Q", quarter number

# Append the attributes in a fixed order: DateSK, Year, Month, Quarter.
df_output = (
    df_date
    .withColumn("DateSK", date_sk)
    .withColumn("Year", year_number)
    .withColumn("Month", month_name)
    .withColumn("Quarter", quarter_label)
)

In [None]:
# Count the generated calendar rows and print the total.
calendar_row_count = df_output.count()
print(calendar_row_count)

In [10]:
# Register the calendar DataFrame as a temporary view so that it can be
# referenced by name from Spark SQL statements.
source_view_name = "vw_source"
df_output.createOrReplaceTempView(source_view_name)

In [None]:
# Upsert the generated calendar into the Delta table at trgt_path_processed,
# or create that table from the view when it does not exist yet.
df_output.createOrReplaceTempView("vw_source")
if DeltaTable.isDeltaTable(spark, trgt_path_processed):
    column_name = df_output.columns
    # Column names are backtick-quoted so that names such as `date` cannot be
    # misread as SQL keywords.
    set_clause = ", ".join(f"target.`{c}` = source.`{c}`" for c in column_name)
    insert_clause = ", ".join(f"`{c}`" for c in column_name)
    insert_values = ", ".join(f"source.`{c}`" for c in column_name)
    # Rows are matched on the business key only. A change-detection predicate
    # must not live in the ON clause: there, unchanged rows would count as
    # "not matched" and be inserted again as duplicates. It is applied as a
    # WHEN MATCHED condition instead, and only when the source actually
    # carries a RowSK column (df_output as built in this notebook does not).
    matched_condition = " AND target.RowSK <> source.RowSK" if "RowSK" in column_name else ""
    query = f"""MERGE INTO delta.`{trgt_path_processed}` AS target
            USING vw_source AS source
            ON target.DateSK = source.DateSK
            WHEN MATCHED{matched_condition} THEN UPDATE SET {set_clause}
            WHEN NOT MATCHED THEN INSERT ({insert_clause}) VALUES ({insert_values})"""
else:
    # First run: materialize the view as a new Delta table at the target path.
    query = f"CREATE TABLE delta.`{trgt_path_processed}` USING DELTA AS SELECT * FROM vw_source"
# Show the statement before running it to make the executed SQL visible.
print(query)
spark.sql(query)

In [12]:
# Read the Delta table back and export it as CSV with a header row,
# overwriting whatever is already at the CSV target path. coalesce(1) reduces
# the data to a single partition before writing.
calendar_df = spark.read.format("delta").load(trgt_path_processed)
csv_writer = (
    calendar_df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("overwrite")
)
csv_writer.save(trgt_path_csv)

In [None]:
# Copy the CSV part file from the export directory to a stable file name,
# processed.csv, inside the same directory.
# os.path.join is used so the paths are correct whether or not trgt_path_csv
# ends with a path separator.
trgt_copy_path = os.path.join(trgt_path_csv, "processed.csv")
# Snapshot of the directory taken before the copy; the cleanup cell reuses it.
files = os.listdir(trgt_path_csv)
selected_files = [name for name in files if name.startswith('part-00') and name.endswith('.csv')]
print(selected_files)
if not selected_files:
    # Fail with an explicit message instead of an IndexError on the lookup below.
    raise FileNotFoundError(f"No part-00*.csv file found in {trgt_path_csv}")
file = os.path.join(trgt_path_csv, selected_files[0])
shutil.copy(file, trgt_copy_path)

In [None]:
# Delete every entry from the earlier directory listing except processed.csv,
# leaving only the stable CSV copy in the export directory.
delete_log = [file for file in files if "processed.csv" != file]
for file in delete_log:
    # os.path.join keeps the path valid whether or not trgt_path_csv ends with
    # a path separator.
    file_path = os.path.join(trgt_path_csv, file)
    os.remove(file_path)
    print(f"removed {file_path}")

In [None]:
# spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
# # Create DeltaTable object
# delta_table = DeltaTable.forPath(spark, trgt_path_processed)

# # Vacuum the table
# delta_table.vacuum(0)  # Retain 0 hours to clean up all unreferenced files

In [6]:
# Stop the Spark session at the end of the notebook.
spark.stop()