In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("Spark Job")\
        .enableHiveSupport()\
        .getOrCreate()

In [2]:
from pyspark.sql import functions as F

In [4]:
test_df = spark.read.csv("./final_customer_2023-02-14.csv",header=True)

In [5]:

# Modify Date
test_df = test_df.withColumn("date", F.col("date").cast("date"))
test_df = test_df.withColumn("customer_joindate", F.col("customer_joindate").cast("date"))

# FactTable Preprocess
Fact_table = test_df.select(['order_id','customer_id','product_id','date','store_id','product_quantity','product_price'])
Fact_table = Fact_table.withColumn("amount",Fact_table['product_quantity']*Fact_table['product_price'])
Fact_table = Fact_table.withColumn("date_id",F.date_format(F.col("date"), "yyyyMMdd"))
Fact_table = Fact_table.select(['order_id','customer_id','product_id','date_id','store_id','product_quantity','amount'])
Fact_table = Fact_table.withColumnRenamed("product_quantity",'quantity')

# CustomerTable Preprocess

Customer_dim = test_df.select(['customer_id','customer_name','customer_email','customer_address','customer_joindate'])
Customer_dim = Customer_dim.withColumn("first_name", F.split(F.col('customer_name'), ' ').getItem(0))
Customer_dim = Customer_dim.withColumn("last_name", F.split(F.col('customer_name'), ' ').getItem(1))
Customer_dim = Customer_dim.withColumnRenamed("customer_email",'email_address')
Customer_dim = Customer_dim.withColumn("address", F.split(F.col('customer_address'), ',').getItem(0))
Customer_dim = Customer_dim.withColumn("city", F.split(F.col('customer_address'), ',').getItem(1))
Customer_dim = Customer_dim.withColumn("state", F.split(F.col('customer_address'), ',').getItem(2))
Customer_dim = Customer_dim.withColumn("zipcode", F.split(F.col('customer_address'), ',').getItem(3))
Customer_dim = Customer_dim.select(
    F.col('customer_id'),
    F.col('first_name'),
    F.col('last_name'),
    F.col('email_address'),
    F.col('address'),
    F.trim(F.col('city')).alias('city'),
   F.trim(F.col('state')).alias('state'),
    F.col('zipcode'),
    F.col('Customer_Joindate').alias('join_date')
).dropDuplicates(subset=['customer_id','first_name'])

# Product_dim Preprocess
Product_dim = test_df.select(
    ['product_id','product_name','product_category','manufacture_name','manufacture_address']
)

Product_dim = Product_dim.withColumn("address", F.split(F.col('manufacture_address'), ',').getItem(0))
Product_dim = Product_dim.withColumn("city", F.split(F.col('manufacture_address'), ',').getItem(1))
Product_dim = Product_dim.withColumn("state", F.split(F.col('manufacture_address'), ',').getItem(2))
Product_dim = Product_dim.withColumn("zipcode", F.split(F.col('manufacture_address'), ',').getItem(3))


Product_dim = Product_dim.select(
    F.col('product_id'),
    F.col('product_name'),
    F.col('product_category'),
    F.col('manufacture_name'),
    F.col('address'),
    F.trim(F.col('city')).alias('city'),
   F.trim(F.col('state')).alias('state'),
    F.col('zipcode')
).dropDuplicates(subset=['product_id','product_name'])

# StoreTable Preprocess
Store_dim = test_df.select(
    ['store_id','store_name','manager_name','store_address']
)

Store_dim = Store_dim.withColumn("address", F.split(F.col('store_address'), ',').getItem(0))
Store_dim = Store_dim.withColumn("city", F.split(F.col('store_address'), ',').getItem(1))
Store_dim = Store_dim.withColumn("state", F.split(F.col('store_address'), ',').getItem(2))
Store_dim = Store_dim.withColumn("zipcode", F.split(F.col('store_address'), ',').getItem(3))

Store_dim = Store_dim.select(
    F.col('store_id'),
    F.col('store_name'),
    F.col('manager_name'),
    F.col('address'),
    F.trim(F.col('city')).alias('city'),
   F.trim(F.col('state')).alias('state'),
    F.col('zipcode')
).dropDuplicates(subset=['store_id','store_name'])

In [6]:
Fact_table.show()

+--------+-----------+----------+-------+--------+--------+------+
|order_id|customer_id|product_id|date_id|store_id|quantity|amount|
+--------+-----------+----------+-------+--------+--------+------+
|       8|          8|     84879|   null|       4|       2|  60.0|
|       9|          9|     84879|   null|       4|       1|  30.0|
|      10|         10|    84406B|   null|       2|       8|7192.0|
|      11|         11|     84879|   null|       4|       1|  30.0|
|      12|         12|     84879|   null|       4|       2|  60.0|
|      13|         13|     84879|   null|       4|       3|  90.0|
|      13|         13|    84406B|   null|       2|       2|1798.0|
+--------+-----------+----------+-------+--------+--------+------+



In [12]:
fact_prev = spark.read.parquet('F:/DE/Final/Parquet/fact_table/part-00000-a210d996-c636-434c-ae06-cf8e6fb615e9-c000.snappy.parquet')

In [11]:
# identify new records comging from day 2 dataset

Fact_table_new_df = Fact_table.join(fact_prev,Fact_table.order_id ==fact_prev.order_id,'left' )\
     .filter(fact_prev.order_id.isNull())\
    .select(Fact_table.order_id,Fact_table.customer_id,Fact_table.product_id,Fact_table.date_id,Fact_table.store_id,Fact_table.quantity,Fact_table.amount)
Fact_table_new_df.show()

AttributeError: 'DataFrame' object has no attribute 'order_id'

In [12]:
# identify modified records comging from day 2 dataset

Fact_table_modified_df = Fact_table.join(fact_prev,Fact_table.Order_id ==fact_prev.Order_id,'inner' )\
    .select(Fact_table.Order_id,Fact_table.Customer_id,Fact_table.Product_id,Fact_table.Date_id,Fact_table.Store_id,Fact_table.Quantity,Fact_table.Amount)
Fact_table_modified_df.show()

+--------+-----------+----------+--------+--------+--------+------+
|Order_id|Customer_id|Product_id| Date_id|Store_id|Quantity|Amount|
+--------+-----------+----------+--------+--------+--------+------+
|       8|          8|     84879|20220213|       4|       2|  60.0|
|       9|          9|     84879|20220213|       4|       1|  30.0|
+--------+-----------+----------+--------+--------+--------+------+



In [28]:
#unchanged records
Fact_table_unchanged_df = Fact_table.join(fact_prev,Fact_table.Order_id ==fact_prev.Order_id,'right' )\
    .filter(Fact_table.Order_id.isNull())\
    .select(fact_prev.Order_id,fact_prev.Customer_id,fact_prev.Product_id,fact_prev.Date_id,fact_prev.Store_id,fact_prev.Quantity,fact_prev.Amount)
Fact_table_unchanged_df.show()

+--------+-----------+----------+--------+--------+--------+------+
|Order_id|Customer_id|Product_id| Date_id|Store_id|Quantity|Amount|
+--------+-----------+----------+--------+--------+--------+------+
|       1|          1|    85123A|20220210|       1|       3| 300.0|
|       1|          1|     71053|20220210|       1|       3| 450.0|
|       1|          1|    84406B|20220210|       1|       2|1798.0|
|       2|          2|    84029G|20220210|       2|       3|  60.0|
|       3|          3|    84029E|20220210|       2|       1|  30.0|
|       4|          4|     22752|20220210|       2|       1|  20.0|
|       5|          5|     21730|20220210|       2|       3| 120.0|
|       6|          6|     22633|20220210|       3|       4| 200.0|
|       6|          6|     84879|20220210|       3|       5| 150.0|
|       7|          7|     22633|20220210|       3|       3| 150.0|
+--------+-----------+----------+--------+--------+--------+------+



In [61]:
Fact_table_final_df = Fact_table_new_df.union(Fact_table_modified_df).union(Fact_table_unchanged_df).distinct()

Fact_table_final_df.show()

+--------+-----------+----------+--------+--------+--------+------+
|Order_id|Customer_id|Product_id| Date_id|Store_id|Quantity|Amount|
+--------+-----------+----------+--------+--------+--------+------+
|      13|         13|    84406B|20220213|       2|       2|1798.0|
|      12|         12|     84879|20220213|       4|       2|  60.0|
|      13|         13|     84879|20220213|       4|       3|  90.0|
|      10|         10|    84406B|20220213|       2|       8|7192.0|
|      11|         11|     84879|20220213|       4|       1|  30.0|
|       9|          9|     84879|20220213|       4|       1|  30.0|
|       8|          8|     84879|20220213|       4|       2|  60.0|
|       3|          3|    84029E|20220210|       2|       1|  30.0|
|       2|          2|    84029G|20220210|       2|       3|  60.0|
|       4|          4|     22752|20220210|       2|       1|  20.0|
|       1|          1|    85123A|20220210|       1|       3| 300.0|
|       1|          1|     71053|20220210|      

In [13]:
product_prev = spark.read.parquet('F:/DE/Final/Parquet/Product/part-00000-6b6c22be-449d-4bf8-a52d-36a00a1a3a0f-c000.snappy.parquet')

In [14]:
product_prev.show()

+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|product_id|        product_name|product_category|manufacture_name|             address|        city|state|zipcode|
+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|     22752|SET 7 BABUSHKA NE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|    84029E|RED WOOLLY HOTTIE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|     84879|ASSORTED COLOUR B...|           Cloth|              HM| 640 Hilliard Street|  Manchester|   CT|  06042|
|    85123A|WHITE HANGING HEA...|           Cloth|             AWS| 1405 Hickory Avenue| Panama City|   FL|  32401|
|     22633|HAND WARMER UNION...|           Cloth|          Unique|17372 Crestview Road|Fayetteville|   AR|  72701|
|     71053|           Apple Pen|            Tech|           Apple|463 P

In [15]:
Product_dim_new_df = Product_dim.join(product_prev,Product_dim.product_id ==product_prev.product_id,'left' )\
     .filter(product_prev.product_id.isNull())\
    .select(Product_dim.product_id,Product_dim.product_name,Product_dim.product_category,Product_dim.manufacture_name,Product_dim.address,Product_dim.city,Product_dim.state,Product_dim.zipcode)
# identify modified records comging from day 2 dataset
Product_dim_modified_df = Product_dim.join(product_prev,Product_dim.product_id ==product_prev.product_id,'inner' )\
    .select(Product_dim.product_id,Product_dim.product_name,Product_dim.product_category,Product_dim.manufacture_name,Product_dim.address,Product_dim.city,Product_dim.state,Product_dim.zipcode)
#unchanged records
Product_dim_unchanged_df = Product_dim.join(product_prev,Product_dim.product_id ==product_prev.product_id,'right' )\
    .filter(Product_dim.product_id.isNull())\
    .select(product_prev.product_id,product_prev.product_name,product_prev.product_category,product_prev.manufacture_name,product_prev.address,product_prev.city,product_prev.state,product_prev.zipcode)

In [16]:
Product_dim_new_df.show()

+----------+------------+----------------+----------------+-------+----+-----+-------+
|product_id|product_name|product_category|manufacture_name|address|city|state|zipcode|
+----------+------------+----------------+----------------+-------+----+-----+-------+
+----------+------------+----------------+----------------+-------+----+-----+-------+



In [17]:
# identify new records comging from day 2 dataset

Product_dim_new_df = Product_dim.join(product_prev,Product_dim.Product_id ==product_prev.Product_id,'left' )\
     .filter(product_prev.Product_id.isNull())\
    .select(Product_dim.Product_id,Product_dim.Product_name,Product_dim.Product_category,Product_dim.Manufacture_name,Product_dim.Address,Product_dim.City,Product_dim.State,Product_dim.Zipcode)
Product_dim_new_df.show()

+----------+------------+----------------+----------------+-------+----+-----+-------+
|Product_id|Product_name|Product_category|Manufacture_name|Address|City|State|Zipcode|
+----------+------------+----------------+----------------+-------+----+-----+-------+
+----------+------------+----------------+----------------+-------+----+-----+-------+



In [18]:
# identify modified records comging from day 2 dataset
Product_dim_modified_df = Product_dim.join(product_prev,Product_dim.Product_id ==product_prev.Product_id,'inner' )\
    .select(Product_dim.Product_id,Product_dim.Product_name,Product_dim.Product_category,Product_dim.Manufacture_name,Product_dim.Address,Product_dim.City,Product_dim.State,Product_dim.Zipcode)
Product_dim_modified_df.show(10)

+----------+--------------------+----------------+----------------+--------------------+----------+-----+-------+
|Product_id|        Product_name|Product_category|Manufacture_name|             Address|      City|State|Zipcode|
+----------+--------------------+----------------+----------------+--------------------+----------+-----+-------+
|    84406B|              Iphone|            Tech|           Apple|463 Pecan Tree Drive|Montgomery|   AL|  36109|
|     84879|ASSORTED COLOUR B...|           Cloth|              HM| 640 Hilliard Street|Manchester|   CT|  06042|
+----------+--------------------+----------------+----------------+--------------------+----------+-----+-------+



In [19]:
product_prev.show()

+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|Product_id|        Product_name|Product_category|Manufacture_name|             Address|        City|State|Zipcode|
+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|     22752|SET 7 BABUSHKA NE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|    84029E|RED WOOLLY HOTTIE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|     84879|ASSORTED COLOUR B...|           Cloth|              HM| 640 Hilliard Street|  Manchester|   CT|  06042|
|    85123A|WHITE HANGING HEA...|           Cloth|             AWS| 1405 Hickory Avenue| Panama City|   FL|  32401|
|     22633|HAND WARMER UNION...|           Cloth|          Unique|17372 Crestview Road|Fayetteville|   AR|  72701|
|     71053|           Apple Pen|            Tech|           Apple|463 P

In [31]:
#unchanged records


Product_dim_unchanged_df = Product_dim.join(product_prev,Product_dim.Product_id ==product_prev.Product_id,'right' )\
    .filter(Product_dim.Product_id.isNull())\
    .select(product_prev.Product_id,product_prev.Product_name,product_prev.Product_category,product_prev.Manufacture_name,product_prev.Address,product_prev.City,product_prev.State,product_prev.Zipcode)
Product_dim_unchanged_df.show(10)

+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|Product_id|        Product_name|Product_category|Manufacture_name|             Address|        City|State|Zipcode|
+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|     22752|SET 7 BABUSHKA NE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|    84029E|RED WOOLLY HOTTIE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|    85123A|WHITE HANGING HEA...|           Cloth|             AWS| 1405 Hickory Avenue| Panama City|   FL|  32401|
|     22633|HAND WARMER UNION...|           Cloth|          Unique|17372 Crestview Road|Fayetteville|   AR|  72701|
|     71053|           Apple Pen|            Tech|           Apple|463 Pecan Tree Drive|  Montgomery|   AL|  36109|
|     21730|GLASS STAR FROSTE...|         Kitchen|    Best Kitchen|  386

In [56]:
Product_dim_final_df = Product_dim_new_df.union(Product_dim_modified_df).union(Product_dim_unchanged_df).distinct()

Product_dim_final_df.show()

+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|Product_id|        Product_name|Product_category|Manufacture_name|             Address|        City|State|Zipcode|
+----------+--------------------+----------------+----------------+--------------------+------------+-----+-------+
|    84406B|              Iphone|            Tech|           Apple|463 Pecan Tree Drive|  Montgomery|   AL|  36109|
|     84879|ASSORTED COLOUR B...|           Cloth|              HM| 640 Hilliard Street|  Manchester|   CT|  06042|
|     22633|HAND WARMER UNION...|           Cloth|          Unique|17372 Crestview Road|Fayetteville|   AR|  72701|
|     22752|SET 7 BABUSHKA NE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|     21730|GLASS STAR FROSTE...|         Kitchen|    Best Kitchen|  3862 Almond Avenue|     Fremont|   CA|  94538|
|     71053|           Apple Pen|            Tech|           Apple|463 P

In [21]:
customer_prev = spark.read.parquet('F:/DE/Final/Parquet/Customer_dim/part-00000-718f8014-b421-4256-877c-683b49dfeb02-c000.snappy.parquet')

In [23]:
Customer_dim.show()

+-----------+----------+---------+--------------------+--------------------+-----------+-----+-------+----------+
|Customer_id|First_Name|Last_Name|       Email_Address|             Address|       City|State|Zipcode| join_date|
+-----------+----------+---------+--------------------+--------------------+-----------+-----+-------+----------+
|         10|      John|    Smith|     fake5@decabg.eu|     1234 Elm Street|    Anytown|   CA|  12345|2019-06-07|
|         11|     Sarah|  Johnson|     fake2@decabg.eu|     5678 Oak Avenue|Springfield|   IL|  67890|2019-06-08|
|         12|   William|    Brown|     fake3@decabg.eu|     9012 Maple Road| Smallville|   NY|  34567|2019-06-09|
|         13|    Ashley| Williams|     fake4@decabg.eu|      4321 Pine Lane|   Big City|   TX|  98765|2019-06-10|
|          8|      Lori|    Green|    ponure@decabg.eu|7365 Cherry Hill ...|   Kingston|   NY|  12401|2011-09-12|
|          9|    Sharon| Davidson|roraba@brand-app.biz|8551 St Margarets...|    Seymour|

In [24]:
# identify new records comging from day 2 dataset

customer_dim_new_df = Customer_dim.join(customer_prev,Customer_dim.Customer_id ==customer_prev.Customer_id,'left' )\
     .filter(customer_prev.Customer_id.isNull())\
    .select(Customer_dim.Customer_id,Customer_dim.First_Name,Customer_dim.Last_Name,Customer_dim.Email_Address,Customer_dim.Address,Customer_dim.City,Customer_dim.State,Customer_dim.Zipcode,Customer_dim.join_date)
customer_dim_new_df.show()

+-----------+----------+---------+---------------+---------------+-----------+-----+-------+----------+
|Customer_id|First_Name|Last_Name|  Email_Address|        Address|       City|State|Zipcode| join_date|
+-----------+----------+---------+---------------+---------------+-----------+-----+-------+----------+
|         10|      John|    Smith|fake5@decabg.eu|1234 Elm Street|    Anytown|   CA|  12345|2019-06-07|
|         11|     Sarah|  Johnson|fake2@decabg.eu|5678 Oak Avenue|Springfield|   IL|  67890|2019-06-08|
|         12|   William|    Brown|fake3@decabg.eu|9012 Maple Road| Smallville|   NY|  34567|2019-06-09|
|         13|    Ashley| Williams|fake4@decabg.eu| 4321 Pine Lane|   Big City|   TX|  98765|2019-06-10|
+-----------+----------+---------+---------------+---------------+-----------+-----+-------+----------+



In [25]:
# modified

customer_dim_modified_df = Customer_dim.join(customer_prev,Customer_dim.Customer_id ==customer_prev.Customer_id,'inner' )\
    .select(Customer_dim.Customer_id,Customer_dim.First_Name,Customer_dim.Last_Name,Customer_dim.Email_Address,Customer_dim.Address,Customer_dim.City,Customer_dim.State,Customer_dim.Zipcode,Customer_dim.join_date)
customer_dim_modified_df.show()

+-----------+----------+---------+--------------------+--------------------+--------+-----+-------+----------+
|Customer_id|First_Name|Last_Name|       Email_Address|             Address|    City|State|Zipcode| join_date|
+-----------+----------+---------+--------------------+--------------------+--------+-----+-------+----------+
|          8|      Lori|    Green|    ponure@decabg.eu|7365 Cherry Hill ...|Kingston|   NY|  12401|2011-09-12|
|          9|    Sharon| Davidson|roraba@brand-app.biz|8551 St Margarets...| Seymour|   IN|  47274|2015-06-26|
+-----------+----------+---------+--------------------+--------------------+--------+-----+-------+----------+



In [27]:
# original
customer_dim_unchanged_df = Customer_dim.join(customer_prev,Customer_dim.Customer_id ==customer_prev.Customer_id,'right' )\
    .filter(Customer_dim.Customer_id.isNull())\
    .select(customer_prev.Customer_id,customer_prev.First_Name,customer_prev.Last_Name,customer_prev.Email_Address,customer_prev.Address,customer_prev.City,customer_prev.State,customer_prev.Zipcode,customer_prev.join_date)
customer_dim_unchanged_df.show()

+-----------+----------+---------+--------------------+--------------------+-------------+-----+-------+----------+
|Customer_id|First_Name|Last_Name|       Email_Address|             Address|         City|State|Zipcode| join_date|
+-----------+----------+---------+--------------------+--------------------+-------------+-----+-------+----------+
|          5|     Jason|    Weber|    lyhevi@lyft.live|7075 Princess Street|       Linden|   NJ|  07036|2020-01-23|
|          7|  Brittany|  Goodwin|gibumuba@lyricspa...|   9057 Vermont Road| Cockeysville|   MD|  21030|2020-01-26|
|          1|    Hayley|   Austin| zekyrabe@finews.biz|   44 E. West Street|      Ashland|   OH|  44805|2019-06-07|
|          3|     Carol|Jefferson|  sizunyxy@decabg.eu|MI 48195 8779 Win...|Fuquay Varina|   NC|  27526|2014-06-30|
|          4|  Benjamin|    White|quburuha@ema-sofi...|    611 Penn Street |  Long Branch|   NJ|  07740|2010-06-14|
|          6|    Jordan|    Kelly| balukajo@finews.biz|      7459 Gulf L

In [53]:
Customer_dim_final_df = customer_dim_new_df.union(customer_dim_modified_df).union(customer_dim_unchanged_df).distinct()#.sort(F.col('Customer_id').asc())
Customer_dim_final_df.show(20)

+-----------+----------+---------+--------------------+--------------------+-------------+-----+-------+----------+
|Customer_id|First_Name|Last_Name|       Email_Address|             Address|         City|State|Zipcode| join_date|
+-----------+----------+---------+--------------------+--------------------+-------------+-----+-------+----------+
|         12|   William|    Brown|     fake3@decabg.eu|     9012 Maple Road|   Smallville|   NY|  34567|2019-06-09|
|         11|     Sarah|  Johnson|     fake2@decabg.eu|     5678 Oak Avenue|  Springfield|   IL|  67890|2019-06-08|
|         13|    Ashley| Williams|     fake4@decabg.eu|      4321 Pine Lane|     Big City|   TX|  98765|2019-06-10|
|         10|      John|    Smith|     fake5@decabg.eu|     1234 Elm Street|      Anytown|   CA|  12345|2019-06-07|
|          9|    Sharon| Davidson|roraba@brand-app.biz|8551 St Margarets...|      Seymour|   IN|  47274|2015-06-26|
|          8|      Lori|    Green|    ponure@decabg.eu|7365 Cherry Hill 

In [34]:
store_prev = spark.read.parquet('F:/DE/Final/Parquet/Store/part-00000-74c3155c-640d-4d4d-b88b-7811edc0ac9b-c000.snappy.parquet')

In [35]:
store_prev.show()

+--------+------------------+------------+------------------+-----------+-----+-------+
|Store_id|        Store_name|Manager_name|           Address|       City|State|Zipcode|
+--------+------------------+------------+------------------+-----------+-----+-------+
|       2|       BestMai -NY|        Dogg|      625 Broadway|   New York|   NY|  10012|
|       1|      BestMai - NJ|       Afaka|       125 18th St|Jersey City|   NJ|  07310|
|       3|BestMai - Flushing|        Chen|13107 40th Rd C300|     Queens|   NY|  11354|
+--------+------------------+------------+------------------+-----------+-----+-------+



In [38]:
Store_dim.show()

+--------+------------------+------------+------------------+--------+-----+-------+
|Store_id|        Store_name|Manager_name|           Address|    City|State|Zipcode|
+--------+------------------+------------+------------------+--------+-----+-------+
|       2|       BestMai -NY|        Dogg|      625 Broadway|New York|   NY|  10012|
|       4|BestMai - Flushing|        Wang|13107 40th Rd C300|  Queens|   NY|  88888|
+--------+------------------+------------+------------------+--------+-----+-------+



In [39]:
# identify new records comging from day 2 dataset

Store_dim_new_df = Store_dim.join(store_prev,Store_dim.Store_id ==store_prev.Store_id,'left' )\
     .filter(store_prev.Store_id.isNull())\
    .select(Store_dim.Store_id,Store_dim.Store_name,Store_dim.Manager_name,Store_dim.Address,Store_dim.City,Store_dim.State,Store_dim.Zipcode)
Store_dim_new_df.show()

+--------+------------------+------------+------------------+------+-----+-------+
|Store_id|        Store_name|Manager_name|           Address|  City|State|Zipcode|
+--------+------------------+------------+------------------+------+-----+-------+
|       4|BestMai - Flushing|        Wang|13107 40th Rd C300|Queens|   NY|  88888|
+--------+------------------+------------+------------------+------+-----+-------+



In [65]:
Store_dim_modified_df = Store_dim.join(store_prev,Store_dim.Store_id ==store_prev.Store_id,'inner' )\
    .select(Store_dim.Store_id,Store_dim.Store_name,Store_dim.Manager_name,Store_dim.Address,Store_dim.City,Store_dim.State,Store_dim.Zipcode)
Store_dim_modified_df.show()

+--------+-----------+------------+------------+--------+-----+-------+
|Store_id| Store_name|Manager_name|     Address|    City|State|Zipcode|
+--------+-----------+------------+------------+--------+-----+-------+
|       2|BestMai -NY|        Dogg|625 Broadway|New York|   NY|  10012|
+--------+-----------+------------+------------+--------+-----+-------+



In [66]:
Store_dim_unchanged_df = Store_dim.join(store_prev,Store_dim.Store_id ==store_prev.Store_id,'right' )\
     .filter(Store_dim.Store_id.isNull())\
    .select(store_prev.Store_id,store_prev.Store_name,store_prev.Manager_name,store_prev.Address,store_prev.City,store_prev.State,store_prev.Zipcode)
Store_dim_unchanged_df.show()

+--------+------------------+------------+------------------+-----------+-----+-------+
|Store_id|        Store_name|Manager_name|           Address|       City|State|Zipcode|
+--------+------------------+------------+------------------+-----------+-----+-------+
|       1|      BestMai - NJ|       Afaka|       125 18th St|Jersey City|   NJ|  07310|
|       3|BestMai - Flushing|        Chen|13107 40th Rd C300|     Queens|   NY|  11354|
+--------+------------------+------------+------------------+-----------+-----+-------+



In [67]:
Store_dim_final_df = Store_dim_new_df.union(Store_dim_modified_df).union(Store_dim_unchanged_df)#.distinct()
Store_dim_final_df.show()

+--------+------------------+------------+------------------+-----------+-----+-------+
|Store_id|        Store_name|Manager_name|           Address|       City|State|Zipcode|
+--------+------------------+------------+------------------+-----------+-----+-------+
|       4|BestMai - Flushing|        Wang|13107 40th Rd C300|     Queens|   NY|  88888|
|       2|       BestMai -NY|        Dogg|      625 Broadway|   New York|   NY|  10012|
|       1|      BestMai - NJ|       Afaka|       125 18th St|Jersey City|   NJ|  07310|
|       3|BestMai - Flushing|        Chen|13107 40th Rd C300|     Queens|   NY|  11354|
+--------+------------------+------------+------------------+-----------+-----+-------+

