In [0]:
%run /Workspace/Users/mudashirahul.subhash@hcltech.com/utils/common_functions

#### CHECK THAT THE MOUNT POINT IS ACCESSIBLE

In [0]:
# Sanity check: list the mount root so a missing or unreadable mount fails here, before any reads.
mount_listing = dbutils.fs.ls("/mnt/bayerhackathon/")
mount_listing

### READ FILES FROM CONTAINER AND CREATE DATAFRAMES

In [0]:
def _read_mounted_csv(path, **options):
    """Load a headered CSV file as a DataFrame, inferring column types.

    Any extra reader options (for example ``sep``) are passed straight through
    to the DataFrameReader.
    """
    return spark.read.options(header=True, inferSchema=True, **options).csv(path)


customer_df = _read_mounted_csv("/mnt/bayerhackathon/customer.csv")
customer_behaviour_df = _read_mounted_csv("/mnt/bayerhackathon/customer_behaviour.csv")
orders_df = _read_mounted_csv("/mnt/bayerhackathon/order.csv")
order_line_df = _read_mounted_csv("/mnt/bayerhackathon/order_line.csv")
# The SCD2 extract is semicolon-delimited, unlike the other source files.
customer_scd2_df = _read_mounted_csv("/mnt/bayerhackathon/customer_SCD2_data.csv", sep=";")

In [0]:
# Print the inferred schema of every source frame, in load order.
for source_df in (customer_df, customer_behaviour_df, orders_df, order_line_df, customer_scd2_df):
    source_df.printSchema()


In [0]:
 
# Render a preview of each source frame (same order as the original cell).
for preview_df in (customer_df, customer_behaviour_df, order_line_df, orders_df, customer_scd2_df):
    preview_df.display()


In [0]:
# Mark the raw customer frame for caching; it is the input to null_checks in Task 1 below.
# cache() is lazy: the data is only materialised by the first action that scans it.
customer_df.cache()

#### TASK 1. REMOVE ROWS FROM THE CUSTOMER FILE THAT HAVE NULL PHONE NUMBERS

In [0]:
# Task 1: drop customers without a phone number, using null_checks from the %run'd common_functions notebook.
# NOTE(review): presumably null_checks removes rows where any listed column is null — confirm.
# Empty or whitespace-only phone strings would not be caught by a pure null check.
cleaned_customer_df = null_checks(customer_df,["phone"])

In [0]:
# Preview the customers that survived the phone-number filter.
display(cleaned_customer_df)

#### TASK 2. REMOVE ROWS FROM THE ORDER FILE FOR CUSTOMERS WITH NULL PHONE NUMBERS

In [0]:
# Task 2: keep only orders whose customer survived the Task 1 filter.
# The inner join also appends the customer's columns to every order row; the
# column projection in the silver-layer section selects from that combined frame.
cleaned_order_df = cleaned_customer_df.join(orders_df, on="customer_id", how="inner")
display(cleaned_order_df)

#### TASK 3. IN THE CUSTOMER_BEHAVIOUR FILE, REMOVE ROWS FOR CUSTOMERS WHOSE PHONE NUMBERS ARE BLANK

In [0]:
# Task 3: keep behaviour rows only for customers that survived the Task 1 filter.
# NOTE(review): the inner join also appends every customer column to the behaviour rows
# (and these are written to silver unprojected); if only the behaviour schema is wanted,
# a left_semi join from customer_behaviour_df would keep it unchanged.
cleaned_customer_behaviour_df = cleaned_customer_df.join(customer_behaviour_df, on="customer_id", how="inner")
display(cleaned_customer_behaviour_df)

#### TASK 4. IN THE ORDER DETAILS, REMOVE ORDER LINES WHOSE ORDERS WERE REMOVED

In [0]:
# Task 4: joining on order_id drops every line whose order was removed in Task 2.
cleaned_order_line_df = cleaned_order_df.join(order_line_df, on="order_id", how="inner")
display(cleaned_order_line_df)

#### TASK 5. RECOMPUTE THE TOTAL PURCHASE VALUE IN THE ORDER HEADER FROM ITS ORDER LINES

In [0]:
import pyspark.sql.functions as F

# Task 5: recompute each order header's total purchase value from its order lines.
# The module-qualified F.round / F.sum are used deliberately: a bare
# `from pyspark.sql.functions import round, sum` would shadow Python's builtin
# round() and sum() for every later cell in this notebook session.
line_totals_df = (
    order_line_df
    .groupBy("order_id")
    .agg(
        F.round(F.sum(F.col("price") * F.col("quantity")), 2)
        .alias("updated_total_purchase_value")
    )
)

# Restrict the headers to orders kept by the Task 2 cleaning, so this silver output
# agrees with cleaned_order_df. The left_semi join filters rows without adding any
# customer columns. The inner join then attaches the recomputed total and drops
# orders that have no order lines.
updated_purchase_value_df = (
    orders_df
    .join(cleaned_order_df.select("order_id"), on="order_id", how="left_semi")
    .join(line_totals_df, on="order_id", how="inner")
)
# The result is displayed by the next cell.

In [0]:
# Preview the order headers with their recomputed totals.
display(updated_purchase_value_df)

### WRITE ALL CLEANED DATAFRAMES TO THE SILVER LAYER

In [0]:
from datetime import datetime

# Run date (local time of the notebook process, formatted YYYY-MM-DD), used as the
# folder suffix in the silver output paths.
# NOTE(review): this name would shadow pyspark.sql.functions.current_date if that was
# star-imported (e.g. by common_functions) — confirm nothing later needs the Spark function.
current_date = f"{datetime.now():%Y-%m-%d}"

In [0]:
# Inspect the combined customer+order schema before projecting it for silver
# (useful for spotting duplicate column names coming from both sides of the join).
cleaned_order_df.printSchema()

In [0]:
# Show the column names of the combined customer+order frame.
list(cleaned_order_df.columns)

In [0]:
# Project the customer+order join down to the columns persisted in silver.
# `state` must be qualified (presumably both the customer and the order frame carry it).
# It is resolved through cleaned_customer_df, the frame this join was built from,
# rather than the raw customer_df, which only resolves while null_checks preserves lineage.
cleaned_order_df = cleaned_order_df.select(
    "customer_id", "first_name", "last_name", "email", "gender",
    "Address", "city", cleaned_customer_df["state"], "country", "zipcode",
    "phone", "Created_date", "loyalty",
    "order_id", "order_date", "order_channel",
)

In [0]:
# Inspect the order-line join schema (customer + order + line columns) before projecting it for silver.
cleaned_order_line_df.printSchema()

In [0]:
# Project the order-line join down to the columns persisted in silver.
# `state` is qualified and resolved through cleaned_customer_df, the customer frame this
# join actually descends from, rather than the raw customer_df, which only resolves
# while null_checks preserves lineage.
cleaned_order_line_df = cleaned_order_line_df.select(
    "customer_id", "order_id", "product", "quantity", "price",
    "order_currency", "order_date", "order_channel", "order_country",
    cleaned_customer_df["state"], "country",
)

In [0]:
# Write each frame as Delta under <silver root>/<frame name>/<run date>/,
# replacing whatever was written to the same folder earlier.
# Insertion order of the dict preserves the original write order.
silver_outputs = {
    "cleaned_customer_df": cleaned_customer_df,
    "cleaned_order_df": cleaned_order_df,
    "cleaned_order_line_df": cleaned_order_line_df,
    "cleaned_customer_behaviour_df": cleaned_customer_behaviour_df,
    "updated_purchase_value_df": updated_purchase_value_df,
}
for table_name, frame in silver_outputs.items():
    (
        frame.write
        .format("delta")
        .mode("overwrite")
        .save(f"/mnt/bayerhackathon/RAHUL/silver/{table_name}/{current_date}/")
    )