In [0]:
# Location of the customer CSV in DBFS; change this if the file is stored elsewhere
file_path = "dbfs:/FileStore/cust/cust.csv"

# Load the CSV into a DataFrame: the first row supplies the column names
# and Spark infers each column's type from the data
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Print a preview of the loaded DataFrame
df.show()


+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+--------+---------+
|Customer_Name|Customer_Id|Open_Date|Last_Consulted_Date|Vaccination_Id|Dr_Name|State|Country|     DOB|Is_Active|
+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+--------+---------+
|         Alex|     123457| 20101012|           20121013|           MVD|   Paul|   SA|    USA|19870306|        A|
|         John|     123458| 20101012|           20121013|           MVD|   Paul|   TN|    IND|19870306|        A|
|       Mathew|     123459| 20101012|           20121013|           MVD|   Paul|  WAS|   PHIL|19870306|        A|
|         Matt|      12345| 20101012|           20121013|           MVD|   Paul|  BOS|    NYC|19870306|        A|
|        Jacob|       1256| 20101012|           20121013|           MVD|   Paul|  VIC|     AU|19870306|        A|
|        Emily|     123460| 20101013|           20121114|           MVD|   Paul|   CA|  

In [0]:
# Print each column's name, inferred type, and nullability for the loaded DataFrame
df.printSchema()


root
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Id: integer (nullable = true)
 |-- Open_Date: integer (nullable = true)
 |-- Last_Consulted_Date: integer (nullable = true)
 |-- Vaccination_Id: string (nullable = true)
 |-- Dr_Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- DOB: integer (nullable = true)
 |-- Is_Active: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, current_date, datediff, floor, months_between, to_date, year

# Step 1: Parse the integer-encoded yyyyMMdd columns (DOB, Last_Consulted_Date) into date columns.
#         The cast to string is needed because to_date() parses text against the given pattern.
# Step 2: Derive Age and Days_Since_Last_Consultation from the parsed dates.
df = (
    df
    .withColumn("DOB_Date", to_date(col("DOB").cast("string"), "yyyyMMdd"))
    .withColumn(
        "Last_Consulted_Date_Formatted",
        to_date(col("Last_Consulted_Date").cast("string"), "yyyyMMdd"),
    )
    # Age in completed years. Subtracting calendar years alone overstates the age by one
    # for anyone whose birthday has not yet occurred this year, so count whole months
    # elapsed since the date of birth and divide by 12 instead.
    .withColumn(
        "Age",
        floor(months_between(current_date(), col("DOB_Date")) / 12).cast("int"),
    )
    # Whole days elapsed between the last consultation and today
    .withColumn(
        "Days_Since_Last_Consultation",
        datediff(current_date(), col("Last_Consulted_Date_Formatted")),
    )
)

# Step 3: Show the updated data
df.show()


+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+--------+---------+----------+---+-----------------------------+----------------------------+
|Customer_Name|Customer_Id|Open_Date|Last_Consulted_Date|Vaccination_Id|Dr_Name|State|Country|     DOB|Is_Active|  DOB_Date|Age|Last_Consulted_Date_Formatted|Days_Since_Last_Consultation|
+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+--------+---------+----------+---+-----------------------------+----------------------------+
|         Alex|     123457| 20101012|           20121013|           MVD|   Paul|   SA|    USA|19870306|        A|1987-03-06| 37|                   2012-10-13|                        4383|
|         John|     123458| 20101012|           20121013|           MVD|   Paul|   TN|    IND|19870306|        A|1987-03-06| 37|                   2012-10-13|                        4383|
|       Mathew|     123459| 20101012|           20121013|   

In [0]:
# Destination directory in DBFS for the partitioned Parquet output
output_path = "dbfs:/FileStore/cust/partCust"

# Write the DataFrame as Parquet, partitioned by the Country column.
# mode("overwrite") replaces output left by a previous run; without it the default
# save mode raises an error when the path already exists, so the cell could not be re-run.
df.write.mode("overwrite").partitionBy("Country").parquet(output_path)
