In [0]:
# NOTE(review): unpinned. pip leaves an already-installed pandas as-is (no -U flag),
# so this only matters where pandas is missing — consider pinning a version for reproducibility.
%pip install pandas

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
#%pip install numpy==1.22
%pip install git+https://github.com/awslabs/python-deequ.git

Collecting git+https://github.com/awslabs/python-deequ.git
  Cloning https://github.com/awslabs/python-deequ.git to /tmp/pip-req-build-610duf6g
  Running command git clone --filter=blob:none --quiet https://github.com/awslabs/python-deequ.git /tmp/pip-req-build-610duf6g
  Resolved https://github.com/awslabs/python-deequ.git to commit ca8e9e1dba9303a960411df6d58b01ef6dd87c12
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Restart the Python process so the packages installed above become importable
# (as the pip output recommends). Python state defined before this cell is discarded.
%restart_python

In [0]:
import os

# pydeequ reads SPARK_VERSION from the environment when it is imported, so this must run
# before the `pydeequ` imports in the next cell.
# NOTE(review): "3.3" is hard-coded — confirm it matches the cluster's `spark.version`.
os.environ['SPARK_VERSION'] = "3.3"

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

In [0]:
import pandas as pd

In [0]:
from datetime import date

# Job parameter: arrival date of the daily booking/customer files, expected as YYYY-MM-DD.
dbutils.widgets.text("arrival_date", "YYYY-MM-DD")
date_of_arrival = dbutils.widgets.get("arrival_date")

# Fail fast on the placeholder default or any non-ISO value. The input file paths are built
# from this string, so a bad value would otherwise only surface later as a missing-file error.
# Round-tripping through isoformat() rejects the extra formats newer Pythons accept (e.g. 20240726).
try:
    _arrival_is_iso = date.fromisoformat(date_of_arrival).isoformat() == date_of_arrival
except ValueError:
    _arrival_is_iso = False
if not _arrival_is_iso:
    raise ValueError(
        f"Widget 'arrival_date' must be a date in YYYY-MM-DD format, got {date_of_arrival!r}"
    )

In [0]:
# Daily input files live in per-entity Unity Catalog volumes, suffixed by arrival date.
_volume_root = "/Volumes/workspace/booking_and_customers"
booking_data = f"{_volume_root}/booking_volume/bookings_{date_of_arrival}.csv"
customer_data = f"{_volume_root}/customers_volume/customers_{date_of_arrival}.csv"
print(booking_data, customer_data, sep="\n")

/Volumes/workspace/booking_and_customers/booking_volume/bookings_2024-07-26.csv
/Volumes/workspace/booking_and_customers/customers_volume/customers_2024-07-26.csv


In [0]:
def _load_csv(path):
    """Read one headered CSV file into a Spark DataFrame.

    Column types are inferred from the data; quoted fields may span multiple lines
    (e.g. hotel names or addresses containing commas).
    """
    reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferschema", "true")
        .option("quote", "\"")
        .option("multiline", "true")
    )
    return reader.load(path)


# Bookings: load, then show the inferred schema and the rows.
booking_df = _load_csv(booking_data)
booking_df.printSchema()
display(booking_df)

# Customers: same treatment.
customer_df = _load_csv(customer_data)
customer_df.printSchema()
display(customer_df)

root
 |-- booking_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- booking_date: date (nullable = true)
 |-- amount: integer (nullable = true)
 |-- booking_type: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: integer (nullable = true)
 |-- booking_status: string (nullable = true)
 |-- hotel_name: string (nullable = true)
 |-- flight_number: string (nullable = true)



booking_id,customer_id,booking_date,amount,booking_type,quantity,discount,booking_status,hotel_name,flight_number
1,1097,2024-01-14,272,Hotel,1,94,Pending,"Johnson, Leach and Kim",
2,1077,2023-12-04,907,Hotel,4,7,Completed,"Buckley, Wang and Shaffer",
3,1107,2024-01-17,812,Flight,1,93,Pending,,EA1978
4,1112,2024-05-17,313,Flight,2,83,Cancelled,,LL0838
5,1104,2023-08-25,688,Hotel,1,32,Completed,"Calhoun, Bauer and Jones",
6,1125,2024-04-06,271,Hotel,3,80,Pending,Lee-Erickson,
7,1128,2024-07-01,438,Hotel,3,0,Completed,"Maddox, Williams and Hess",
8,1137,2023-12-14,651,Hotel,2,92,Pending,Miller Ltd,
9,1047,2023-09-20,812,Hotel,3,40,Completed,"Turner, Adams and Fitzgerald",
10,1023,2023-09-26,127,Flight,5,75,Cancelled,,Nb3015


root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- email: string (nullable = true)
 |-- valid_from: date (nullable = true)
 |-- valid_to: date (nullable = true)



customer_id,customer_name,customer_address,phone_number,email,valid_from,valid_to
1026,Lori Odom,"66894 Pamela Ridge Apt. 701 Wilsonport, NV 55859",,santanachristopher@wilson-bailey.com,2024-01-30,9999-12-31
1030,David Odonnell,USNV Simmons FPO AE 08244,6864684148,kathleen41@hotmail.com,2024-08-30,9999-12-31
1035,Cheryl Weaver,"3716 Cunningham Station Apt. 567 Davidborough, TX 41021",893.223.0773x3326,smithcatherine@yahoo.com,2024-03-07,9999-12-31
1036,Rebecca Johnson,"674 Bishop Mission Suzannebury, NY 90306",018.713.0054x360,carol33@holt-higgins.info,2024-05-25,9999-12-31
1037,Lisa Hill,"6720 Brittany Streets Lake Sabrinaview, IN 22990",(646)830-3919x64651,cory15@hotmail.com,2024-11-01,9999-12-31
1038,Aaron Cooper,"37842 Haynes Isle Suite 421 South Marisa, PA 75690",249-334-3781x7626,crystal91@henderson-lane.net,2024-11-26,9999-12-31
1039,Betty Andrews,Unit 9441 Box 7301 DPO AA 92892,(055)647-0735,avilacody@yahoo.com,2024-04-10,9999-12-31
1047,Edward Stone,"31740 Martinez Trace Jonesview, NC 49949",884.266.5166x7808,zwhite@hotmail.com,2024-03-31,9999-12-31
1048,James Myers,"78527 Kelly Corner Powellbury, FL 03544",001-403-398-8094,vyoder@wiley-jones.com,2024-08-06,9999-12-31
1050,Scott Freeman,"528 John Hollow Theresabury, SC 37328",0970621868,emily78@gmail.com,2024-01-24,9999-12-31


In [0]:
#SPARK_VERSION=3.2  # stale note: the active value ("3.3") is set via os.environ before the pydeequ imports

In [0]:
from pyspark.sql.functions import col

def check_and_remove_nulls(df, column_name):
    """Report null values in ``column_name`` and drop the affected rows.

    Args:
        df: Spark DataFrame to inspect.
        column_name: name of the column that must not be null.

    Returns:
        ``df`` unchanged when the column has no nulls, otherwise a new DataFrame
        without the rows where ``column_name`` is null.
    """
    # count() is a Spark action: this scans the data once per checked column.
    null_count = df.where(col(column_name).isNull()).count()
    if null_count == 0:
        print(f"Column '{column_name}' has no null values.")
        return df
    print(f"Column '{column_name}' has {null_count} null values. Removing...")
    return df.dropna(subset=[column_name])

# Keys and amount are required for the upsert; drop rows missing any of them, in this order.
df_cleaned = booking_df
for required_column in ("booking_id", "customer_id", "amount"):
    df_cleaned = check_and_remove_nulls(df_cleaned, required_column)

display(df_cleaned)


Column 'booking_id' has no null values.
Column 'customer_id' has no null values.
Column 'amount' has no null values.


booking_id,customer_id,booking_date,amount,booking_type,quantity,discount,booking_status,hotel_name,flight_number
1,1097,2024-01-14,272,Hotel,1,94,Pending,"Johnson, Leach and Kim",
2,1077,2023-12-04,907,Hotel,4,7,Completed,"Buckley, Wang and Shaffer",
3,1107,2024-01-17,812,Flight,1,93,Pending,,EA1978
4,1112,2024-05-17,313,Flight,2,83,Cancelled,,LL0838
5,1104,2023-08-25,688,Hotel,1,32,Completed,"Calhoun, Bauer and Jones",
6,1125,2024-04-06,271,Hotel,3,80,Pending,Lee-Erickson,
7,1128,2024-07-01,438,Hotel,3,0,Completed,"Maddox, Williams and Hess",
8,1137,2023-12-14,651,Hotel,2,92,Pending,Miller Ltd,
9,1047,2023-09-20,812,Hotel,3,40,Completed,"Turner, Adams and Fitzgerald",
10,1023,2023-09-26,127,Flight,5,75,Cancelled,,Nb3015


In [0]:
# One-time bootstrap (kept for reference): creates booking_table from the first cleaned batch.
#df_cleaned.write.format("delta").mode("overwrite").saveAsTable("workspace.booking_and_customers.booking_table")

In [0]:
# Load the current contents of the target table. On a fresh workspace the table does not
# exist yet (its one-time creation write is commented out), so fall back to an empty frame
# with the incoming batch's schema; the overwrite write later in the notebook then creates it.
_booking_table = "workspace.booking_and_customers.booking_table"
if spark.catalog.tableExists(_booking_table):
    current_df = spark.read.table(_booking_table)
else:
    print(f"Table {_booking_table} does not exist yet; starting from an empty snapshot.")
    current_df = df_cleaned.limit(0)
display(current_df)

booking_id,customer_id,booking_date,amount,booking_type,quantity,discount,booking_status,hotel_name,flight_number
1,1007,2024-07-22,570,Flight,1,1,Completed,,my6617
2,1093,2023-10-20,296,Hotel,1,14,Cancelled,Mcdonald LLC,
3,1081,2023-09-18,691,Hotel,4,4,Cancelled,Foley and Sons,
4,1036,2023-10-17,361,Hotel,3,75,Completed,"Hines, Lutz and Jimenez",
5,1022,2024-01-29,253,Hotel,2,57,Pending,Booker LLC,
6,1062,2024-01-21,845,Hotel,4,77,Pending,Martinez-Macias,
7,1021,2024-05-20,255,Flight,1,78,Pending,,Xd9493
8,1037,2023-09-10,600,Hotel,3,35,Cancelled,Herrera Ltd,
9,1047,2023-12-19,241,Hotel,2,6,Pending,Daniels-Ross,
10,1099,2024-05-11,839,Flight,1,60,Pending,,Lg1381


In [0]:
# Materialise the existing table and the cleaned batch as pandas frames for the upsert.
# toPandas() collects every row to the driver, so this assumes both fit in driver memory.
current_pd, new_pd = current_df.toPandas(), df_cleaned.toPandas()


In [0]:
# Upsert in pandas: stack existing rows and the new batch, then keep only the last row per
# booking_id. A booking present in both therefore takes the new batch's values, unseen
# bookings are appended, and duplicate ids inside the batch collapse to their last occurrence.
# (A Delta MERGE with whenMatchedUpdateAll/whenNotMatchedInsertAll expresses the same upsert
# without collecting to the driver, but it requires booking_id to be unique within the batch.)
merged_pd = (
    pd.concat([current_pd, new_pd], ignore_index=True)
    .drop_duplicates(subset=["booking_id"], keep="last")
)
print(f"Rows after merge: {len(merged_pd)}")
display(merged_pd)

Rows after merge: 200


booking_id,customer_id,booking_date,amount,booking_type,quantity,discount,booking_status,hotel_name,flight_number
1,1097,2024-01-14,272,Hotel,1,94,Pending,"Johnson, Leach and Kim",
2,1077,2023-12-04,907,Hotel,4,7,Completed,"Buckley, Wang and Shaffer",
3,1107,2024-01-17,812,Flight,1,93,Pending,,EA1978
4,1112,2024-05-17,313,Flight,2,83,Cancelled,,LL0838
5,1104,2023-08-25,688,Hotel,1,32,Completed,"Calhoun, Bauer and Jones",
6,1125,2024-04-06,271,Hotel,3,80,Pending,Lee-Erickson,
7,1128,2024-07-01,438,Hotel,3,0,Completed,"Maddox, Williams and Hess",
8,1137,2023-12-14,651,Hotel,2,92,Pending,Miller Ltd,
9,1047,2023-09-20,812,Hotel,3,40,Completed,"Turner, Adams and Fitzgerald",
10,1023,2023-09-26,127,Flight,5,75,Cancelled,,Nb3015


In [0]:
# Convert the upserted pandas frame back to Spark using the target table's own schema.
# Without an explicit schema, Spark re-infers types from pandas. toPandas() widens nullable
# integer columns to float64 as soon as they contain a null, so re-inference would silently
# turn e.g. `discount` into a double column in the overwritten table.
target_schema = current_df.schema
merged_spark_df = spark.createDataFrame(
    # Select the schema's columns in schema order: the pandas -> Spark conversion pairs
    # columns with schema fields by position. Columns not in the table schema are dropped.
    merged_pd[target_schema.fieldNames()],
    schema=target_schema,
)
merged_spark_df.write.format("delta").mode("overwrite").saveAsTable("workspace.booking_and_customers.booking_table")
updated_df = spark.read.table("workspace.booking_and_customers.booking_table")
display(updated_df)

booking_id,customer_id,booking_date,amount,booking_type,quantity,discount,booking_status,hotel_name,flight_number
1,1097,2024-01-14,272,Hotel,1,94,Pending,"Johnson, Leach and Kim",
2,1077,2023-12-04,907,Hotel,4,7,Completed,"Buckley, Wang and Shaffer",
3,1107,2024-01-17,812,Flight,1,93,Pending,,EA1978
4,1112,2024-05-17,313,Flight,2,83,Cancelled,,LL0838
5,1104,2023-08-25,688,Hotel,1,32,Completed,"Calhoun, Bauer and Jones",
6,1125,2024-04-06,271,Hotel,3,80,Pending,Lee-Erickson,
7,1128,2024-07-01,438,Hotel,3,0,Completed,"Maddox, Williams and Hess",
8,1137,2023-12-14,651,Hotel,2,92,Pending,Miller Ltd,
9,1047,2023-09-20,812,Hotel,3,40,Completed,"Turner, Adams and Fitzgerald",
10,1023,2023-09-26,127,Flight,5,75,Cancelled,,Nb3015


In [0]:
# from pyspark.sql.functions import col

# def isNull(df, column_name):
#     df_filtered = df.filter(col(column_name).isNull()).count()
#     return df_filtered
# def remove_nulls(df, column_name):
#     return df.dropna(subset=[column_name])
# print(isNull(booking_df, "booking_id"))
# print(isNull(booking_df, "customer_id"))
# print(isNull(booking_df, "amount"))
# df_after_booking_id_null = remove_nulls(booking_df, "booking_id")
# df_after_customer_id_null = remove_nulls(df_after_booking_id_null, "customer_id")
# df_cleaned_after_amount_null = remove_nulls(df_after_customer_id_null, "amount")
# display(df_cleaned_after_amount_null)

In [0]:
# from pyspark.sql.functions import col

# def isNull(df, column_name):
#     return df.filter(col(column_name).isNull()).count()

# def remove_nulls_if_any(df, column_name):
#     if isNull(df, column_name) > 0:
#         print(f"Removing rows where {column_name} is null")
#         df = df.filter(col(column_name).isNotNull())
#     else:
#         print(f"No nulls found in {column_name}, no rows removed")
#     return df

# # List of columns to clean
# columns_to_check = ["booking_id", "customer_id", "amount"]

# df_cleaned = booking_df
# for column in columns_to_check:
#     df_cleaned = remove_nulls_if_any(df_cleaned, column)

# display(df_cleaned)


In [0]:
# merged_pd = current_pd.set_index('booking_id').combine_first(new_pd.set_index('booking_id')).reset_index() 
# Rejected: combine_first gives precedence to the calling frame (current_pd). New booking_ids are
# inserted, but existing bookings keep their old values instead of being updated from new_pd.

In [0]:
# from pyspark.sql.functions import lit, current_timestamp
# import pandas as pd

# # Step 1: Convert Spark DataFrames to Pandas
# current_pd = current_df.toPandas()
# new_pd = df_cleaned.toPandas()

# # Step 2: Add a last_updated timestamp column to new_pd (optional but useful)
# from datetime import datetime
# new_pd['last_updated'] = datetime.now()

# # Step 3: Perform full outer merge on booking_id with suffixes for overlapping columns
# merged_pd = pd.merge(
#     current_pd,
#     new_pd,
#     on='booking_id',
#     how='outer',
#     suffixes=('_old', '_new')
# )

# # Step 4: For each column except booking_id, update values from new_pd if they exist
# columns = [col for col in current_pd.columns if col != 'booking_id']

# for col in columns:
#     old_col = f"{col}_old"
#     new_col = f"{col}_new"
#     if old_col in merged_pd.columns and new_col in merged_pd.columns:
#         # Take new value if it exists, else old value
#         merged_pd[col] = merged_pd[new_col].combine_first(merged_pd[old_col])
#         # Drop the old and new intermediate columns
#         merged_pd.drop([old_col, new_col], axis=1, inplace=True)
#     else:
#         # Column might not be in both dfs, handle that case:
#         if old_col in merged_pd.columns:
#             merged_pd[col] = merged_pd[old_col]
#             merged_pd.drop(old_col, axis=1, inplace=True)
#         if new_col in merged_pd.columns:
#             merged_pd[col] = merged_pd[new_col]
#             merged_pd.drop(new_col, axis=1, inplace=True)

# # Step 5: If last_updated wasn't in current_pd, but you added it to new_pd, handle that
# if 'last_updated' not in current_pd.columns and 'last_updated' in merged_pd.columns:
#     pass  # last_updated is already correct from new_pd
# elif 'last_updated' in current_pd.columns and 'last_updated' not in merged_pd.columns:
#     merged_pd['last_updated'] = pd.NaT

# # Step 6: Convert back to Spark DataFrame
# merged_spark_df = spark.createDataFrame(merged_pd)

# # Step 7: Overwrite the Delta table with the merged data
# merged_spark_df.write.format("delta").mode("overwrite").saveAsTable("workspace.booking_and_customers.booking_table")

# # Step 8: Verify the result
# updated_df = spark.read.table("workspace.booking_and_customers.booking_table")
# display(updated_df)


In [0]:
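# NOTE: this attempt cannot work as written — a Spark DataFrame has no .merge method, and the
# merge condition must refer to aliases. The Delta upsert starts from the target table, e.g.
#   DeltaTable.forName(spark, "workspace.booking_and_customers.booking_table").alias("t") \
#       .merge(df_cleaned.alias("s"), "t.booking_id = s.booking_id") \
#       .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
# execute() returns None, so re-read the table afterwards to display the result.
# merged_df = current_df.merge(
#     df_cleaned,
#     "current_df.booking_id == df_cleaned.booking_id"
# ).whenMatchedUpdateAll().whenNotMatchedInsertAll()
# display(merged_df)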

In [0]:
# check_Incremental = Check(spark, CheckLevel.Error, "Booking Data Check") \
#     .hasSize(lambda x: x > 0).isUnique("booking_id", hint="Booking id is not Unique throughout").isComplete("customer_id").isComplete("amount").isNonNegative("amount").isNonNegative("quantity").isNonNegative("discount")