## Libraries Used

In [38]:
import datetime
import os

# Main entry point for DataFrame and SQL functionality.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create the Spark session used by every cell below (getOrCreate reuses an
# already-running session in this kernel if there is one).
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

from pyspark.sql.functions import *


### Read the Main Table

In [39]:
# Location of the full product table (one CSV file with a header row).
MAIN_TABLE_PATH = "/home/bluepi/Downloads/Update/product_info/main table/main table.csv"

# Column names come from the header row; column types are inferred from the values.
# header is set once here (it was previously also passed to load()).
mainTable = (
    spark.read.format('csv')
    .options(header=True, inferSchema=True)
    .load(MAIN_TABLE_PATH)
)

# Main Table Schema
mainTable.printSchema()

root
 |-- p_id: integer (nullable = true)
 |-- p_name: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- date_timestamp: timestamp (nullable = true)



### New Main Table with an added `record_type` column

In [40]:
from pyspark.sql.functions import year, month, dayofyear, hour, minute, second

# Tag every existing main-table row with record_type "A" so its columns line up
# with the per-day files (which carry I/U/D record types) for the unions below.
# NOTE(review): "A" presumably marks rows already present in the main table — confirm.
mainTable_new = mainTable.withColumn('record_type', lit("A"))

# The date-part columns (Year, Month, DayOfYear, Hour, Minute, Second) were once
# derived here. That select was disabled, so only record_type is added.
mainTable_new.printSchema()

root
 |-- p_id: integer (nullable = true)
 |-- p_name: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- date_timestamp: timestamp (nullable = true)
 |-- record_type: string (nullable = false)



In [41]:
mainTable.show(n=20, truncate=True)  # preview the first 20 rows of the raw main table

+----+---------+-----+-------------------+
|p_id|   p_name|price|     date_timestamp|
+----+---------+-----+-------------------+
|   1|      Job| 1464|2020-05-02 08:06:42|
|   2|   Keylex|  208|2020-02-02 08:02:22|
|   3|   Duobam| 1684|2020-02-28 07:58:08|
|   4|Ronstring| 1961|2020-04-02 07:10:33|
|   5|  Bitwolf| 1338|2020-04-02 17:11:15|
|   6|  Andalax|   22|2020-02-18 00:42:19|
|   7|   Duobam| 1167|2020-12-02 19:45:42|
|   8|    Alpha| 1573|2020-02-26 07:04:01|
|   9|  Fix San| 1516|2020-02-23 06:06:54|
|  10|   Biodex| 1916|2020-04-02 18:36:24|
|  11|  Bitwolf|  422|2020-02-26 14:32:13|
|  12|  Bitchip| 1289|2020-08-02 12:35:48|
|  13|  Bitwolf|  794|2020-03-02 18:07:28|
|  14|   Bigtax| 1327|2020-01-02 09:08:05|
|  15|  Konklab| 1275|2020-02-14 03:03:11|
|  16|Gembucket|  188|2020-02-25 21:37:01|
|  17|  Fix San| 1008|2020-02-26 03:50:43|
|  18|   Lotlux|  565|2020-05-02 13:07:57|
|  19|   Tresom|  176|2020-02-19 20:48:07|
|  20|  Zontrax| 1491|2020-02-24 07:53:33|
+----+-----

### Read the Previous Day's Folder

In [56]:
# Root folder holding one sub-folder of change files per day, named dd-mm-YYYY.
address = "/home/bluepi/Downloads/Update/product_info/"

# How many days back to look for the change folder. The variable name implies 1,
# but this run used 10 to reach an existing folder — adjust as needed.
DAYS_BACK = 10
previous_day = (datetime.datetime.today() - datetime.timedelta(days=DAYS_BACK)).strftime('%d-%m-%Y')
print("Previous Date ---->"+previous_day)

# Address to the chosen day's folder (address already ends with "/").
new_address = address + previous_day
print("\nNew Address to read the folder ---->"+new_address)

# Read the CSV files of that folder; column names come from the header row.
per_day_data = (
    spark.read.format('csv')
    .options(header=True, inferSchema=True)
    .load(new_address)
)

Previous Date ---->29-03-2020

New Address to read the folder ---->/home/bluepi/Downloads/Update/product_info/29-03-2020


In [57]:
# Inspect the per-day change records in chronological order (first 60 rows).
per_day_data.sort(per_day_data['date_timestamp'].asc()).show(60)

+----+-----------+-----+-------------------+-----------+
|p_id|     p_name|price|     date_timestamp|record_type|
+----+-----------+-----+-------------------+-----------+
|  32|     Latlux|  976|2020-03-28 03:17:10|          D|
| 170|    Fix San| 1375|2020-03-28 03:19:10|          U|
|  56|     Zathin|   65|2020-03-28 03:19:38|          U|
|  12|    Bitchip|  540|2020-03-28 03:23:45|          U|
|  83|    Fintone|  530|2020-03-28 03:26:51|          U|
|  86|   Alphazap|  913|2020-03-28 03:29:10|          U|
| 102|       Temp| 1264|2020-03-28 03:37:47|          I|
|  74|     Sonair|  988|2020-03-28 03:38:43|          U|
| 131|  Daltfresh|  493|2020-03-28 03:41:13|          I|
|  98|  Ronstring| 1678|2020-03-28 03:41:32|          D|
| 109|Ventosanzap|  453|2020-03-28 03:48:09|          I|
|  72|         It| 1489|2020-03-28 03:53:06|          U|
|  12|    Bitchip|  437|2020-03-28 03:53:24|          U|
| 117|Ventosanzap| 1498|2020-03-28 03:54:10|          I|
|  24|         It| 1352|2020-03

### Per-day table schema (date-part columns not added)

In [58]:
# The date-part columns (Year, Month, DayOfYear, Hour, Minute, Second) are not
# derived; the per-day table keeps the five columns it was read with.
per_day_data.printSchema()

root
 |-- p_id: integer (nullable = true)
 |-- p_name: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- date_timestamp: timestamp (nullable = true)
 |-- record_type: string (nullable = true)



In [59]:
per_day_data.orderBy('date_timestamp', ascending=True).show(n=60)  # same chronological view as above

+----+-----------+-----+-------------------+-----------+
|p_id|     p_name|price|     date_timestamp|record_type|
+----+-----------+-----+-------------------+-----------+
|  32|     Latlux|  976|2020-03-28 03:17:10|          D|
| 170|    Fix San| 1375|2020-03-28 03:19:10|          U|
|  56|     Zathin|   65|2020-03-28 03:19:38|          U|
|  12|    Bitchip|  540|2020-03-28 03:23:45|          U|
|  83|    Fintone|  530|2020-03-28 03:26:51|          U|
|  86|   Alphazap|  913|2020-03-28 03:29:10|          U|
| 102|       Temp| 1264|2020-03-28 03:37:47|          I|
|  74|     Sonair|  988|2020-03-28 03:38:43|          U|
| 131|  Daltfresh|  493|2020-03-28 03:41:13|          I|
|  98|  Ronstring| 1678|2020-03-28 03:41:32|          D|
| 109|Ventosanzap|  453|2020-03-28 03:48:09|          I|
|  72|         It| 1489|2020-03-28 03:53:06|          U|
|  12|    Bitchip|  437|2020-03-28 03:53:24|          U|
| 117|Ventosanzap| 1498|2020-03-28 03:54:10|          I|
|  24|         It| 1352|2020-03

### Directly append newly inserted products

In [60]:
# List the 'I' (insert) records of the per-day file, ordered by p_id.
print("\nTable of Products to be inserted in Main_Table from Per_Day_Table \n")
per_day_data.filter(col('record_type') == 'I').orderBy('p_id').show()


Table of Products to be insterted in Main_Table from Per_Day_Table 

+----+-----------+-----+-------------------+-----------+
|p_id|     p_name|price|     date_timestamp|record_type|
+----+-----------+-----+-------------------+-----------+
| 102|       Temp| 1264|2020-03-28 03:37:47|          I|
| 103|      Alpha| 1900|2020-03-28 15:28:22|          I|
| 109|Ventosanzap|  453|2020-03-28 03:48:09|          I|
| 117|Ventosanzap| 1498|2020-03-28 03:54:10|          I|
| 131|  Daltfresh|  493|2020-03-28 03:41:13|          I|
| 133| Y-Solowarm|  848|2020-03-28 21:42:46|          I|
| 134|   Bytecard|  326|2020-03-28 09:31:57|          I|
| 158|    Matsoft|  370|2020-03-28 15:49:11|          I|
| 160|    Sonsing| 1909|2020-03-28 21:28:07|          I|
| 170|    Fix San| 1296|2020-03-28 09:23:12|          I|
| 184|  Ronstring| 1310|2020-03-28 21:58:46|          I|
| 191|     Keylex|  851|2020-03-28 15:57:16|          I|
| 192| Stronghold|  410|2020-03-28 21:22:16|          I|
+----+-----------+

In [61]:
# Append the inserted ('I') records to the tagged main table. unionByName matches
# columns by name, not position, so a per-day CSV with a different column order
# cannot silently shift values into the wrong columns.
mainTable_I_inserted = mainTable_new.unionByName(
    per_day_data.filter("record_type == 'I' ")
)

In [62]:
# Counts are kept as strings because the summary cell concatenates them.
before_insert = str(mainTable_new.count())
after_insert = str(mainTable_I_inserted.count())
inserted_rows = mainTable_I_inserted.filter("record_type == 'I' ")
total_insert = str(inserted_rows.count())

print(f"Total no. of products before adding in Main_Table ----> {before_insert}")
print(f"Total no. of products after adding in Main_Table----> {after_insert}")
print(f'Total no. of products with record "Inserted I" in Main_Table----> {total_insert}')

Total no. of products before adding in Main_Table ----> 100
Total no. of products after adding in Main_Table----> 113
Total no. of products with record "Inserted I" in Main_Table----> 13


### Update the Products

In [64]:
# Keep only the 'U' (update) records of the per-day file.
from_per_day_data_U = per_day_data.filter("record_type == 'U' ")

print("\nTable of Products to be Updated in Main_Table taken from Per_Day_Table \n")
from_per_day_data_U.orderBy(from_per_day_data_U.p_id.asc()).show()


Table of Products to be Updated in Main_Table taken from Per_Day_Table 

+----+---------+-----+-------------------+-----------+
|p_id|   p_name|price|     date_timestamp|record_type|
+----+---------+-----+-------------------+-----------+
|   5|  Bitwolf| 1078|2020-03-28 15:53:20|          U|
|  12|  Bitchip|  540|2020-03-28 03:23:45|          U|
|  12|  Bitchip|  437|2020-03-28 03:53:24|          U|
|  24|       It| 1352|2020-03-28 04:00:33|          U|
|  47| Transcof|  715|2020-03-28 15:18:40|          U|
|  56|   Zathin| 1282|2020-03-28 09:50:55|          U|
|  56|   Zathin|   65|2020-03-28 03:19:38|          U|
|  72|       It| 1489|2020-03-28 03:53:06|          U|
|  74|   Sonair|  988|2020-03-28 03:38:43|          U|
|  83|  Fintone|  530|2020-03-28 03:26:51|          U|
|  86| Alphazap|  913|2020-03-28 03:29:10|          U|
|  95|Ronstring|  150|2020-03-28 21:42:50|          U|
|  97|Voyatouch|  364|2020-03-28 09:40:22|          U|
| 170|  Fix San| 1375|2020-03-28 03:19:10|    

In [65]:
# Bring the p_id of every 'U' record to the driver (one Row per update record).
from_per_day_data_U_list = from_per_day_data_U.select("p_id").collect()

# Show what collect() returns. This is guarded because indexing [0] raises
# IndexError on a day with no update records.
if from_per_day_data_U_list:
    print("Output of our collect operation----> ",from_per_day_data_U_list[0])
    print("get the value of p_id -----> ",from_per_day_data_U_list[0].p_id)

# Plain Python list of p_id values. It may repeat a p_id when a product was
# updated more than once that day (e.g. 12 and 56 in the recorded run).
p_id_list_U = [row.p_id for row in from_per_day_data_U_list]
print("\nList of p_id which we have to update taken from \"per_day_data\"")
print(p_id_list_U)

# Counts update records (repeats included), not distinct products.
total_update = str(len(p_id_list_U))
print("Total no. of products with record \"Updated\" ------>"+total_update)

Output of our collect operation---->  Row(p_id=72)
get the value of p_id ----->  72

List of p_id which we have to update taken from "per_day_data"
[72, 24, 12, 83, 74, 56, 86, 170, 12, 47, 5, 95, 97, 56]
Total no. of products with record "Updated" ------>14


In [66]:
# Current main-table rows (including just-inserted 'I' rows) whose p_id has at
# least one update record in the per-day file.
print("\nTable of Products to be Updated in Main_Table taken from Main_Table \n")
from_mainTable_U = mainTable_I_inserted.where(col('p_id').isin(p_id_list_U))
from_mainTable_U.show()


Table of Products to be Updated in Main_Table taken from Main_Table 

+----+---------+-----+-------------------+-----------+
|p_id|   p_name|price|     date_timestamp|record_type|
+----+---------+-----+-------------------+-----------+
|   5|  Bitwolf| 1338|2020-04-02 17:11:15|          A|
|  12|  Bitchip| 1289|2020-08-02 12:35:48|          A|
|  24|       It|  352|2020-02-14 07:22:21|          A|
|  47| Transcof| 1987|2020-02-17 20:30:59|          A|
|  56|   Zathin|  892|2020-11-02 23:25:21|          A|
|  72|       It| 1426|2020-09-02 03:43:06|          A|
|  74|   Sonair|  271|2020-02-25 13:47:52|          A|
|  83|  Fintone|  828|2020-02-26 16:03:20|          A|
|  86| Alphazap|  701|2020-02-02 04:04:37|          A|
|  95|Ronstring|  672|2020-02-19 01:59:56|          A|
|  97|Voyatouch|  544|2020-02-14 14:52:43|          A|
| 170|  Fix San| 1296|2020-03-28 09:23:12|          I|
+----+---------+-----+-------------------+-----------+



In [67]:
# Stack the current main-table rows and the per-day update records for the same
# products. Columns are matched by name rather than by position.
mT_and_pDD_union = from_mainTable_U.unionByName(from_per_day_data_U)
mT_and_pDD_union.orderBy(mT_and_pDD_union.p_id).show()

+----+--------+-----+-------------------+-----------+
|p_id|  p_name|price|     date_timestamp|record_type|
+----+--------+-----+-------------------+-----------+
|   5| Bitwolf| 1338|2020-04-02 17:11:15|          A|
|   5| Bitwolf| 1078|2020-03-28 15:53:20|          U|
|  12| Bitchip| 1289|2020-08-02 12:35:48|          A|
|  12| Bitchip|  437|2020-03-28 03:53:24|          U|
|  12| Bitchip|  540|2020-03-28 03:23:45|          U|
|  24|      It| 1352|2020-03-28 04:00:33|          U|
|  24|      It|  352|2020-02-14 07:22:21|          A|
|  47|Transcof|  715|2020-03-28 15:18:40|          U|
|  47|Transcof| 1987|2020-02-17 20:30:59|          A|
|  56|  Zathin| 1282|2020-03-28 09:50:55|          U|
|  56|  Zathin|  892|2020-11-02 23:25:21|          A|
|  56|  Zathin|   65|2020-03-28 03:19:38|          U|
|  72|      It| 1426|2020-09-02 03:43:06|          A|
|  72|      It| 1489|2020-03-28 03:53:06|          U|
|  74|  Sonair|  988|2020-03-28 03:38:43|          U|
|  74|  Sonair|  271|2020-02

In [68]:
# For each product, take the latest timestamp across its main-table row and its
# update records. Renamed columns keep the join in the next cell unambiguous.
# NOTE(review): in the recorded run, several main-table timestamps are later than
# the 2020-03-28 update records (e.g. p_id 12 at 2020-08-02), so those updates are
# discarded. Many main-table dates end in "-02", which suggests day/month were
# swapped when the CSV was read — confirm the date format before relying on
# "latest timestamp wins".
for_join_mT_and_pDD = (
    mT_and_pDD_union
    .groupBy("p_id")
    .agg(F.max("date_timestamp").alias("date_timestamp_1"))
    .withColumnRenamed("p_id", "p_id_1")
)
for_join_mT_and_pDD.show()

+------+-------------------+
|p_id_1|   date_timestamp_1|
+------+-------------------+
|    12|2020-08-02 12:35:48|
|    47|2020-03-28 15:18:40|
|    86|2020-03-28 03:29:10|
|     5|2020-04-02 17:11:15|
|    72|2020-09-02 03:43:06|
|    97|2020-03-28 09:40:22|
|    24|2020-03-28 04:00:33|
|    95|2020-03-28 21:42:50|
|    56|2020-11-02 23:25:21|
|   170|2020-03-28 09:23:12|
|    83|2020-03-28 03:26:51|
|    74|2020-03-28 03:38:43|
+------+-------------------+



In [69]:
# Keep, for each product, the row(s) whose timestamp equals that product's latest
# timestamp, i.e. the winning version of each updated product.
joined = mT_and_pDD_union.join(
    for_join_mT_and_pDD,
    (mT_and_pDD_union.p_id == for_join_mT_and_pDD.p_id_1)
    & (mT_and_pDD_union.date_timestamp == for_join_mT_and_pDD.date_timestamp_1),
    'inner',
)

# Two versions of one product can share the latest timestamp. Keep exactly one,
# so the later union back into the main table cannot duplicate a p_id.
# Which of the tied rows survives is not specified.
joined = (
    joined
    .select(['p_id', 'p_name', 'price', 'date_timestamp', 'record_type'])
    .dropDuplicates(['p_id'])
)
joined.count()

12

In [79]:
mT_and_pDD_union.where(col('p_id') == 74).show()  # spot-check the competing versions of product 74

+----+------+-----+-------------------+-----------+
|p_id|p_name|price|     date_timestamp|record_type|
+----+------+-----+-------------------+-----------+
|  74|Sonair|  271|2020-02-25 13:47:52|          A|
|  74|Sonair|  988|2020-03-28 03:38:43|          U|
+----+------+-----+-------------------+-----------+



In [72]:
# First remove every p_id that has update records from the main table. The
# winning version of each is added back in the next cell.
mainTable_U_updated = mainTable_I_inserted.filter(~col('p_id').isin(p_id_list_U))
# Row count of the remaining table. No sort is applied before count(), since
# ordering cannot change the number of rows.
mainTable_U_updated.count()

101

In [80]:
# Then add the winning version of each updated product back to the main table,
# matching columns by name rather than by position.
mainTable_U_updated_new = mainTable_U_updated.unionByName(joined)

# Count once. The summary cell needs the value as a string; the bare name on the
# last line displays it as this cell's output.
updated_count = mainTable_U_updated_new.count()
after_update = str(updated_count)
updated_count

113

### Drop deleted products

In [None]:
# p_id of every 'D' (delete) record in the per-day file. Only p_id is collected
# to the driver, matching the update cell above.
to_be_deleted = per_day_data.filter("record_type == 'D' ").select("p_id").collect()
p_id_list_D = [row.p_id for row in to_be_deleted]

print("\nList of p_id which we have to delete taken from \"per_day_data\"")
print(p_id_list_D)
total_deleted = str(len(p_id_list_D))
# This reports the delete count; it was previously mislabelled "Updated".
print("\nTotal no. of products with record \"Deleted\" ------>"+total_deleted)

In [None]:
# Drop every product flagged 'D' in the per-day file from the updated table.
is_deleted = col('p_id').isin(p_id_list_D)
mainTable_D_deleted = mainTable_U_updated_new.where(~is_deleted)

after_delete = str(mainTable_D_deleted.count())

In [None]:
# Row counts after each stage of the merge (all values are pre-formatted strings).
print("\nSummary of the process\n")
print("Total no. of products before insert----->"+before_insert)
print("Total no. of products after insert------>"+after_insert)
print("Total no. of inserted  ----------------->"+total_insert)
print("Total no. of products after update ----->"+after_update)
print("Total no. of products to be deleted----->"+total_deleted)
print("Total no. of products after deleting---->"+after_delete)

In [81]:
path = "/home/bluepi/Downloads/Update/Updated Product/Latest Product/"

# Spark writes one part-file per partition into the main_table directory.
# mode('overwrite') replaces a previous run's output directly. The former bare
# `except:` retry also swallowed unrelated failures, including a NameError on
# the missing `os` import.
# NOTE(review): no header row is written, while every read above uses
# header=True — add .option('header', True) if this output is read back that way.
(
    mainTable_D_deleted.write
    .mode('overwrite')
    .format('csv')
    .save(os.path.join(path, 'main_table'))
)

# To get a single output file instead, coalesce to one partition. This gives up
# write parallelism:
# mainTable_D_deleted.coalesce(1).write.csv(os.path.join(path,'main_table'))