In [0]:
# NOTE(review): this Python-level constant is a float (3.2) and is not read by any of the
# visible cells; the value pydeequ actually receives is the SPARK_VERSION environment
# variable, which the imports cell sets to "3.3". Confirm which Spark version is intended
# and keep a single source of truth.
SPARK_VERSION = 3.2

In [0]:
%pip install git+https://github.com/awslabs/python-deequ.git

In [0]:
import os
# pydeequ selects its Deequ build from the SPARK_VERSION environment variable, so the
# variable is set here, before the pydeequ imports below.
# NOTE(review): "3.3" differs from the SPARK_VERSION = 3.2 constant defined in the first
# cell — confirm which Spark version the cluster actually runs.
os.environ["SPARK_VERSION"] = "3.3"
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

In [0]:
# Expose the load date as a notebook text widget (default "2024-07-06") and read its
# current value; date_str is substituted into the input CSV file names below.
dbutils.widgets.text("arrival_date", "2024-07-06")
date_str = dbutils.widgets.get("arrival_date")

In [0]:
# Resolve the bookings and customers CSV locations for the selected arrival date.
booking_data = f"/Volumes/workspace/default/scd/Bookings/bookings_{date_str}.csv"
customer_data = f"/Volumes/workspace/default/scd/Customers/customers_{date_str}.csv"

# Echo both paths (one per line) so the cell output shows which files will be read.
print(booking_data, customer_data, sep="\n")

In [0]:
# Load the raw bookings CSV (header row, Spark-inferred column types) and render it.
df1 = spark.read.options(header=True, inferSchema=True).csv(booking_data)
display(df1)

# Same for the raw customers CSV.
df2 = spark.read.options(header=True, inferSchema=True).csv(customer_data)
display(df2)

In [0]:
# Re-read the bookings CSV with an explicit quote character and multi-line record
# support, then print the schema Spark infers for it.
booking_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote="\"", multiline="true")
    .load(booking_data)
)
booking_df.printSchema()

# Same reader settings for the customers CSV.
customer_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote="\"", multiline="true")
    .load(customer_data)
)
customer_df.printSchema()

In [0]:
# from pyspark.sql.functions import col
# def isComplete(df,colname):
#     null_count = df.filter(col(colname).isNull()).count()
#     if null_count > 0:
#         df_clean = df.filter(col(colname).isNotNull())
#         return df_clean.display()
#     else:
#         return df.display()
    
# isComplete(df1,"customer_id")

In [0]:
from pyspark.sql.functions import col
def isComplete(df, colname):
    """Return only the rows of ``df`` whose ``colname`` value is not NULL.

    Args:
        df: DataFrame to filter.
        colname: Name of the column that must be populated.

    Returns:
        The DataFrame produced by ``df.filter`` with the not-null predicate;
        ``df`` itself is not modified.
    """
    has_value = col(colname).isNotNull()
    return df.filter(has_value)

In [0]:
def isNonNegative(df, colname):
    """Return only the rows of ``df`` whose ``colname`` value is zero or greater.

    The previous predicate was ``> 0``, which also discarded zero-valued rows even
    though zero is non-negative; the comparison is now ``>= 0``.

    Args:
        df: DataFrame to filter.
        colname: Name of the numeric column to check.

    Returns:
        The DataFrame produced by ``df.filter``; ``df`` itself is not modified.
        Rows where the comparison is not true are dropped (in Spark this includes
        rows whose value is NULL).
    """
    # Resolve the column through the frame itself so this helper does not depend on
    # a module-level ``col`` import being in scope.
    return df.filter(df[colname] >= 0)
# Preview of the filter on the raw bookings "amount" column. The result is not bound
# to a name, so it only appears as this cell's output.
isNonNegative(df=df1, colname="amount")
    

In [0]:
# Keep only bookings whose key fields are all populated: rows that are NULL in
# booking_id, then customer_id, then amount are dropped in that order.
df_cleaned = isComplete(
    isComplete(isComplete(df1, "booking_id"), "customer_id"),
    "amount",
)
display(df_cleaned)


In [0]:
# Save the cleaned bookings as the Delta table default.cleaned_bookings,
# overwriting whatever the table held before.
df_cleaned.write.saveAsTable(
    "default.cleaned_bookings",
    format="delta",
    mode="overwrite",
)

In [0]:
# Load default.cleaned_bookings back from the catalog as a DataFrame.
current_df = spark.read.table("default.cleaned_bookings")

In [0]:
# Re-declare and re-read the same "arrival_date" widget that the first load used.
# NOTE(review): unless the widget value is changed by hand between the two loads,
# date_str is unchanged here, so the "next" bookings file is the one already loaded and
# the merge further down upserts identical rows. Confirm whether a separate
# widget/parameter for the next batch is intended.
dbutils.widgets.text("arrival_date", "2024-07-06")
date_str = dbutils.widgets.get("arrival_date")

In [0]:
# Path of the bookings CSV for the arrival date that was just re-read from the widget.
booking_next = f"/Volumes/workspace/default/scd/Bookings/bookings_{date_str}.csv"
print(booking_next)

In [0]:
# Read the next bookings CSV (header row, inferred types). This rebinds df1, so the
# frame loaded earlier is no longer reachable under that name.
df1 = spark.read.options(header=True, inferSchema=True).csv(booking_next)
display(df1)

In [0]:
# Apply the same completeness filters to the newly loaded bookings: drop rows that
# are NULL in booking_id, then customer_id, then amount.
df_cleaned = isComplete(
    isComplete(
        isComplete(df1, "booking_id"),
        "customer_id",
    ),
    "amount",
)

In [0]:
# Save the cleaned batch as the Delta table default.new_bookings, replacing any
# existing contents.
(
    df_cleaned.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.new_bookings")
)

In [0]:
# Load default.new_bookings from the catalog; this frame is the merge source below.
new_df = spark.read.table("default.new_bookings")


In [0]:
# Upsert the new batch into default.cleaned_bookings keyed on booking_id:
# matching rows have every column updated, unmatched source rows are inserted.
delta_current = DeltaTable.forName(spark, "default.cleaned_bookings")
(
    delta_current.alias("target")
    .merge(new_df.alias("source"), "target.booking_id = source.booking_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


In [0]:
# Re-read the merged table and render it to inspect the result of the upsert.
updated_df = spark.read.table("default.cleaned_bookings")
display(updated_df)