In [0]:
# Import the PySpark functions, types, and window utilities used by this script
from pyspark.sql.functions import (
    col,
    count,
    current_date,
    datediff,
    desc,
    floor,
    lpad,
    months_between,
    row_number,
    to_date,
    when,
    year,
)
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Window used for de-duplication: one partition per customer name, with rows
# sorted so the most recent Last_Consulted_Date comes first. The column is a
# string here, so the ordering is lexical; that matches chronological order
# only while every value is an 8-character yyyyMMdd string.
windowSpec = (
    Window
    .partitionBy(col("Customer_Name"))
    .orderBy(col("Last_Consulted_Date").desc())
)

# Explicit schema for the pipe-delimited vaccination file. Field order must
# match the file's column order. Every field is nullable, and everything except
# Customer_Id is read as a string; the first two fields ("None" and "Type") are
# only declared so the columns line up and are dropped after loading.
Schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("None", StringType()),
        ("Type", StringType()),
        ("Customer_Name", StringType()),
        ("Customer_Id", IntegerType()),
        ("Open_Date", StringType()),
        ("Last_Consulted_Date", StringType()),
        ("Vaccination_Id", StringType()),
        ("Dr_Name", StringType()),
        ("State", StringType()),
        ("Country", StringType()),
        ("DOB", StringType()),
        ("Is_Active", StringType()),
    )
])

# Load the pipe-delimited CSV using the explicit Schema. header=True makes
# Spark skip the file's header line rather than read it as a data row.
df = (
    spark.read
    .schema(Schema)
    .option("header", True)
    .option("sep", "|")
    .csv("/FileStore/tables/Vaccination_Details-5.csv")
)
display(df)

# Drop the two leading columns ("None" and "Type" in Schema); nothing later in
# this script uses them.
df = df.drop(*df.columns[:2])

# Add Age and Days columns, then keep only each customer's most recent
# consultation (the first row of windowSpec's newest-first ordering).
#
# * DOB: the sample data has 7-digit values such as 6031987 (presumably
#   6 March 1987 with the day's leading zero missing). The fixed-width
#   "ddMMyyyy" pattern cannot parse those, so the value is left-padded with
#   "0" to 8 characters first; 8-digit values are unaffected.
# * Age: the number of completed years since DOB (months elapsed / 12, rounded
#   down). A plain year subtraction would count someone a year older before
#   their birthday this year. Cast to int to keep the column's integer type.
# * Days: the number of days since Last_Consulted_Date when that is more than
#   30, otherwise the label "Less Than 30 Days" (note: exactly 30 days also gets
#   the label). A missing or unparseable date yields null instead of the label.
#   Because the branches mix a number and a string, Spark stores Days as a
#   string column.
_dob = to_date(lpad(col("DOB"), 8, "0"), "ddMMyyyy")
_days_since_consult = datediff(
    current_date(), to_date(col("Last_Consulted_Date"), "yyyyMMdd")
)

df = (
    df.withColumn("Age", floor(months_between(current_date(), _dob) / 12).cast("int"))
    .withColumn(
        "Days",
        when(_days_since_consult > 30, _days_since_consult)
        .when(_days_since_consult.isNotNull(), "Less Than 30 Days"),
    )
    # Number each customer's rows newest-first and keep only row 1.
    .withColumn("row_number", row_number().over(windowSpec))
    .filter(col("row_number") == 1)
    .drop("row_number")
)
display(df)

# Write each country's rows into its own table named TABLE_<Country>.
# An existing table is appended to; a missing table is created with mode
# "overwrite". The mergeSchema option is passed on every write, presumably so
# new columns can extend an existing table's schema (Delta tables?) — confirm.
#
# Rows with a null Country are skipped. Filtering with `col("Country") == None`
# never matches a row, so the original loop only produced an empty
# "TABLE_None" table for them.
# NOTE(review): a Country value containing spaces or other characters that are
# not valid in an unquoted identifier will make saveAsTable fail; TODO confirm
# such values cannot occur.
# NOTE(review): because existing tables are appended to, re-running this cell
# inserts the same rows again.
grouped_df = (
    df.select("Country")
    .where(col("Country").isNotNull())
    .distinct()
    .collect()
)
for country in grouped_df:
    table_name = f"TABLE_{country[0]}"
    df1 = df.filter(col("Country") == country[0])
    write_mode = "append" if spark.catalog.tableExists(table_name) else "overwrite"
    df1.write.mode(write_mode).option("mergeSchema", True).saveAsTable(table_name)
    
    







None,Type,Customer_Name,Customer_Id,Open_Date,Last_Consulted_Date,Vaccination_Id,Dr_Name,State,Country,DOB,Is_Active
,D,Alex,123457,20101012,20121013,MVD,Paul,SA,USA,6031987,A
,D,John,123458,20101012,20121013,MVD,Paul,TN,IND,6031987,A
,D,Mathew,123459,20101012,20121013,MVD,Paul,WAS,PHIL,6031987,A
,D,Matt,12345,20101012,20121013,MVD,Paul,BOS,NYC,6031987,A
,D,Jacob,1256,20101012,20121013,MVD,Paul,VIC,AU,6031987,A
,D,Smith,123458,20101012,20121013,MVD,Paul,SA,USA,6031987,A
,D,jack,123459,20111013,20121013,MVD,Paul,TN,IND,6031997,A
,D,Hardin,123460,20131018,20241003,MVD,Paul,WAS,PHIL,6031999,A
,D,Tessa,12346,20161025,20121013,MVD,Paul,BOS,NYC,6031990,A
,D,Zaya,1257,20171030,20121013,MVD,Paul,VIC,AU,6031994,A


Customer_Name,Customer_Id,Open_Date,Last_Consulted_Date,Vaccination_Id,Dr_Name,State,Country,DOB,Is_Active,Age,Days
Alex,123457,20101012,20121013,MVD,Paul,SA,USA,6031987,A,37,4393
Hardin,123460,20131018,20241003,MVD,Paul,WAS,PHIL,6031999,A,25,Less Than 30 Days
Jacob,1256,20101012,20211013,MVD,Paul,SYD,AUS,6031987,A,37,1106
John,123458,20101012,20121013,MVD,Paul,TN,IND,6031987,A,37,4393
Mathew,123459,20101012,20121013,MVD,Paul,WAS,PHIL,6031987,A,37,4393
Matt,12345,20101012,20121013,MVD,Paul,BOS,NYC,6031987,A,37,4393
Smith,123458,20101012,20121013,MVD,Paul,SA,USA,6031987,A,37,4393
Tessa,12346,20161025,20241010,MVD,Paul,BOS,IND,6031990,A,34,Less Than 30 Days
Zaya,1257,20171030,20121013,MVD,Paul,VIC,AU,6031994,A,30,4393
jack,123459,20111013,20121013,MVD,Paul,TN,IND,6031997,A,27,4393
