####Python Notebook for SCD_06 Implementation in PySpark

#####Import Libraries

In [0]:
from pyspark.sql.functions import when,lit,array_contains,col
from pyspark.sql import SparkSession, SQLContext
import datetime
import pandas as pd

#####Spark Session

In [0]:
# Get the active Spark session (or start one) registered under the app name 'SCD_06'.
spark = (
    SparkSession.builder
    .appName('SCD_06')
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

#####Reading Stores Initial Data Files into DataFrame

In [0]:
# Load the initial stores extract; the first line is the header and column
# types are inferred from the data.
stores_df = spark.read.csv(
    '/FileStore/tables/stores_1.csv',
    header=True,
    inferSchema=True,
)
display(stores_df)

store_id,store_name,store_city,store_country
4971040129,Oyonder,Tečovice,Czech Republic
1557122377,Edgewire,Mlowo,Tanzania
2540967361,Kazu,Ziyuan,China
4753146499,Aimbo,Troyits’ke,Ukraine
947844392,Oyondu,Telč,Czech Republic
6970783693,Devcast,Zwolle,Netherlands
6745610604,Vidoo,Mercedes,Argentina
8996196010,Kimia,Chonghe,China
1891728016,Skimia,Gorē,Ethiopia
9857778712,Thoughtmix,Urug,Indonesia


In [0]:
# Print the inferred column names and types to confirm the CSV was parsed as expected.
stores_df.printSchema()

root
 |-- store_id: long (nullable = true)
 |-- store_name: string (nullable = true)
 |-- store_city: string (nullable = true)
 |-- store_country: string (nullable = true)



##### Renaming Columns

In [0]:
# The source city/country columns become the "current" attributes of the dimension.
stores_df = (
    stores_df
    .withColumnRenamed("store_city", "current_city")
    .withColumnRenamed("store_country", "current_country")
)
display(stores_df)

store_id,store_name,current_city,current_country
4971040129,Oyonder,Tečovice,Czech Republic
1557122377,Edgewire,Mlowo,Tanzania
2540967361,Kazu,Ziyuan,China
4753146499,Aimbo,Troyits’ke,Ukraine
947844392,Oyondu,Telč,Czech Republic
6970783693,Devcast,Zwolle,Netherlands
6745610604,Vidoo,Mercedes,Argentina
8996196010,Kimia,Chonghe,China
1891728016,Skimia,Gorē,Ethiopia
9857778712,Thoughtmix,Urug,Indonesia


##### Implementing SCD_06 Required Columns in DataFrame

In [0]:
# Add the SCD_06 bookkeeping columns for the initial load:
#   previous_*   - starts out as a copy of the matching current_* value
#   start_date   - fixed load date of the initial file, stored as a 'YYYY/MM/DD' string
#   end_date     - far-future sentinel marking the row as still open
#   current_flag - 'Y' for every row of the initial load
stores_df = (
    stores_df
    .withColumn("previous_city", col("current_city"))
    .withColumn("previous_country", col("current_country"))
    .withColumn("start_date", lit(datetime.datetime(2022, 4, 10).strftime('%Y/%m/%d')))
    .withColumn("end_date", lit(datetime.datetime(9999, 12, 31).strftime('%Y/%m/%d')))
    .withColumn("current_flag", lit('Y'))
)

# Put each current/previous pair side by side, followed by the validity columns.
stores_df = stores_df.select(
    "store_id",
    "store_name",
    "current_city",
    "previous_city",
    "current_country",
    "previous_country",
    "start_date",
    "end_date",
    "current_flag",
)

display(stores_df)

store_id,store_name,current_city,previous_city,current_country,previous_country,start_date,end_date,current_flag
4971040129,Oyonder,Tečovice,Tečovice,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
1557122377,Edgewire,Mlowo,Mlowo,Tanzania,Tanzania,2022/04/10,9999/12/31,Y
2540967361,Kazu,Ziyuan,Ziyuan,China,China,2022/04/10,9999/12/31,Y
4753146499,Aimbo,Troyits’ke,Troyits’ke,Ukraine,Ukraine,2022/04/10,9999/12/31,Y
947844392,Oyondu,Telč,Telč,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
6970783693,Devcast,Zwolle,Zwolle,Netherlands,Netherlands,2022/04/10,9999/12/31,Y
6745610604,Vidoo,Mercedes,Mercedes,Argentina,Argentina,2022/04/10,9999/12/31,Y
8996196010,Kimia,Chonghe,Chonghe,China,China,2022/04/10,9999/12/31,Y
1891728016,Skimia,Gorē,Gorē,Ethiopia,Ethiopia,2022/04/10,9999/12/31,Y
9857778712,Thoughtmix,Urug,Urug,Indonesia,Indonesia,2022/04/10,9999/12/31,Y


##### Saving DataFrame to a Hive Table

In [0]:
# Expose the prepared DataFrame to Spark SQL under a temporary view name.
stores_df.createOrReplaceTempView('stores_tmp')

# SQL entry point used for the DDL statements below.
sql_ctx = SQLContext(spark)

# Remove the catalog entry first so the table can be recreated from scratch.
sql_ctx.sql("DROP TABLE IF EXISTS Stores_scd")

# Clear leftover files of THIS table only. The previous version removed
# "dbfs:/user/hive/warehouse/" recursively, which wipes the data of every
# managed table stored under the warehouse root, not just Stores_scd.
# NOTE(review): assumes Stores_scd lives in the default database at the default
# warehouse location (directory name is the lower-cased table name) - confirm.
dbutils.fs.rm("dbfs:/user/hive/warehouse/stores_scd", True)

# Recreate the table from the temp view; left as the last expression so the
# notebook shows the statement result.
sql_ctx.sql("CREATE TABLE Stores_scd AS SELECT * from stores_tmp")

Out[93]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Read the persisted Stores_scd table back into a DataFrame; this is the
# dimension history that the incremental load below is applied to.

scd_df = spark.read.table("Stores_scd")
display(scd_df)

store_id,store_name,current_city,previous_city,current_country,previous_country,start_date,end_date,current_flag
4971040129,Oyonder,Tečovice,Tečovice,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
1557122377,Edgewire,Mlowo,Mlowo,Tanzania,Tanzania,2022/04/10,9999/12/31,Y
2540967361,Kazu,Ziyuan,Ziyuan,China,China,2022/04/10,9999/12/31,Y
4753146499,Aimbo,Troyits’ke,Troyits’ke,Ukraine,Ukraine,2022/04/10,9999/12/31,Y
947844392,Oyondu,Telč,Telč,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
6970783693,Devcast,Zwolle,Zwolle,Netherlands,Netherlands,2022/04/10,9999/12/31,Y
6745610604,Vidoo,Mercedes,Mercedes,Argentina,Argentina,2022/04/10,9999/12/31,Y
8996196010,Kimia,Chonghe,Chonghe,China,China,2022/04/10,9999/12/31,Y
1891728016,Skimia,Gorē,Gorē,Ethiopia,Ethiopia,2022/04/10,9999/12/31,Y
9857778712,Thoughtmix,Urug,Urug,Indonesia,Indonesia,2022/04/10,9999/12/31,Y


##### Reading Incremental Stores Data into DataFrame

In [0]:
# Load the second (incremental) stores extract with the same reader settings
# as the initial file: header row present, column types inferred.
incremented_df = spark.read.csv(
    '/FileStore/tables/stores_2.csv',
    header=True,
    inferSchema=True,
)
display(incremented_df)

store_id,store_name,store_city,store_country
4791826558,Browseblab,Chenjiaba,China
777509504,Browsecat,Jinta,China
8061670267,Flipstorm,Krinichnaya,Ukraine
883999021,Thoughtbeat,Suruhan,Indonesia
1354111893,Viva,Podstepki,Russia
1939559960,Vitz,Tianhu,China
7958754078,Voonte,Cabeço,Portugal
5434248015,Ainyx,Mihara,Japan
4382993563,Riffpedia,Melbourne,United States
7935100454,Skalith,Singaraja,Indonesia


#####Implementing SCD_06 Calculations.

In [0]:
# Apply the incremental file to the SCD_06 history with set-based joins.
#
# The previous implementation pulled both DataFrames to the driver and, for each
# incoming row, stacked four withColumn() calls plus a union() onto scd_df, so
# the query plan grew with the size of the increment. It also:
#   * rewrote end_date / current_flag on versions that were already closed, and
#   * closed the open version and inserted a new one even when the incoming
#     record carried no change.
# Both are fixed here; everything else follows the original rules.

load_date = datetime.datetime(2022, 4, 12).strftime('%Y/%m/%d')
open_end_date = datetime.datetime(9999, 12, 31).strftime('%Y/%m/%d')

# Incoming records under prefixed names so they never clash with the dimension
# columns after a join.
# NOTE(review): assumes one record per (store_id, store_name) in the increment;
# duplicates are collapsed to an arbitrary one to keep the joins from fanning out.
updates = (
    incremented_df
    .dropDuplicates(["store_id", "store_name"])
    .select(
        col("store_id").alias("upd_store_id"),
        col("store_name").alias("upd_store_name"),
        col("store_city").alias("upd_city"),
        col("store_country").alias("upd_country"),
    )
)

# A dimension row belongs to an incoming record when id AND name agree (same
# match rule as before).
# NOTE(review): a store whose name changed is therefore inserted as a second
# current row for the same store_id - confirm this is intended.
same_store = (
    (col("store_id") == col("upd_store_id"))
    & (col("store_name") == col("upd_store_name"))
)

# Discard incoming records whose open version already holds the same city and
# country: nothing changed, so no new version is needed. Null-safe comparison
# so that null == null counts as "unchanged".
open_versions = scd_df.filter(col("current_flag") == "Y")
effective_updates = updates.join(
    open_versions,
    same_store
    & col("current_city").eqNullSafe(col("upd_city"))
    & col("current_country").eqNullSafe(col("upd_country")),
    "left_anti",
)

# Attach the surviving updates to the history rows of the same store.
history = scd_df.join(effective_updates, same_store, "left")
has_update = col("upd_store_id").isNotNull()
# Only the still-open version is closed; already expired versions keep their dates.
closes_now = has_update & (col("current_flag") == "Y")

scd_updated = history.select(
    "store_id",
    "store_name",
    # current_* is overwritten on every version of the store
    when(has_update, col("upd_city")).otherwise(col("current_city")).alias("current_city"),
    "previous_city",
    when(has_update, col("upd_country")).otherwise(col("current_country")).alias("current_country"),
    "previous_country",
    "start_date",
    when(closes_now, lit(load_date)).otherwise(col("end_date")).alias("end_date"),
    when(closes_now, lit("N")).otherwise(col("current_flag")).alias("current_flag"),
)

# One new open version per effective update (brand-new stores and changed stores).
new_versions = effective_updates.select(
    col("upd_store_id").alias("store_id"),
    col("upd_store_name").alias("store_name"),
    col("upd_city").alias("current_city"),
    col("upd_city").alias("previous_city"),
    col("upd_country").alias("current_country"),
    col("upd_country").alias("previous_country"),
    lit(load_date).alias("start_date"),
    lit(open_end_date).alias("end_date"),
    lit("Y").alias("current_flag"),
)

# Column order of both sides matches, so a positional union is safe.
scd_df = scd_updated.union(new_versions)


#####Sort and save DataFrame

In [0]:
# Order by store, then by flag ascending, so within a store an 'N' (expired)
# row is listed before the 'Y' (current) row.
scd_df = scd_df.sort(
    col("store_id").asc(),
    col("current_flag").asc(),
)
display(scd_df)

store_id,store_name,current_city,previous_city,current_country,previous_country,start_date,end_date,current_flag
32147600,Blogtag,Horton,Horton,United Kingdom,United Kingdom,2022/04/10,9999/12/31,Y
146069846,Ailane,Martin,Martin,Croatia,Croatia,2022/04/10,9999/12/31,Y
388657162,Quire,Žďár,Žďár,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
610192906,Photobug,Göteborg,Göteborg,Sweden,Sweden,2022/04/10,9999/12/31,Y
738251763,Zoomdog,Prizren,Prizren,Kosovo,Kosovo,2022/04/12,9999/12/31,Y
741185687,Layo,Cuihuangkou,Cuihuangkou,China,China,2022/04/10,9999/12/31,Y
777509504,Browsecat,Jinta,Jinta,China,China,2022/04/12,9999/12/31,Y
790503050,Oyonder,Xiadian,Xiadian,China,China,2022/04/12,9999/12/31,Y
883999021,Thoughtbeat,Suruhan,Suruhan,Indonesia,Indonesia,2022/04/12,9999/12/31,Y
947844392,Oyondu,Telč,Telč,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y


##### Save Results to SCD Table

In [0]:
# Replace the contents of Stores_scd with the merged history.
# NOTE(review): scd_df was itself read from Stores_scd; overwriting a table that
# is part of its own lineage depends on the table format - confirm it is supported here.
(
    scd_df.write
    .mode("overwrite")
    .saveAsTable("Stores_scd")
)
# Read the table back to show what was actually persisted.
display(spark.read.table("Stores_scd"))

store_id,store_name,current_city,previous_city,current_country,previous_country,start_date,end_date,current_flag
32147600,Blogtag,Horton,Horton,United Kingdom,United Kingdom,2022/04/10,9999/12/31,Y
146069846,Ailane,Martin,Martin,Croatia,Croatia,2022/04/10,9999/12/31,Y
388657162,Quire,Žďár,Žďár,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
610192906,Photobug,Göteborg,Göteborg,Sweden,Sweden,2022/04/10,9999/12/31,Y
738251763,Zoomdog,Prizren,Prizren,Kosovo,Kosovo,2022/04/12,9999/12/31,Y
741185687,Layo,Cuihuangkou,Cuihuangkou,China,China,2022/04/10,9999/12/31,Y
777509504,Browsecat,Jinta,Jinta,China,China,2022/04/12,9999/12/31,Y
790503050,Oyonder,Xiadian,Xiadian,China,China,2022/04/12,9999/12/31,Y
883999021,Thoughtbeat,Suruhan,Suruhan,Indonesia,Indonesia,2022/04/12,9999/12/31,Y
947844392,Oyondu,Telč,Telč,Czech Republic,Czech Republic,2022/04/10,9999/12/31,Y
