In [281]:
# Set up the environment and import all required packages.
# Order matters: SPARK_HOME is set before findspark.init() (called with no
# arguments), which is expected to locate the Spark install from it so that
# `pyspark` becomes importable on the following lines.
# NOTE(review): the Spark path is hard-coded and machine-specific.
import os
os.environ["SPARK_HOME"] = "/usr/share/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

In [279]:
# Define and configure the SparkSession shared by every cell below.
# getOrCreate() hands back an already-running session (e.g. when this cell is
# re-run) rather than starting a second one; 'local[*]' runs Spark in local
# mode using all available cores.
spark = (
    SparkSession.builder
    .appName('Retail_store_Pipeline')
    .master('local[*]')
    .getOrCreate()
)

In [282]:
try:
    # Specify the file schemas: column name, data type and nullability of each column.
    # NOTE: DataFrames read through spark.read...csv() still report every column as
    # nullable=true, so the False flags below only take effect where the schema is
    # applied via toDF() (product_id in products).
    # 1) aisles
    aisles_schema = StructType([StructField('aisle_id', IntegerType(), False),
                                StructField('aisle', StringType(), True)])
    # 2) departments
    department_schema = StructType([StructField('department_id', IntegerType(), False),
                                    StructField('department', StringType(), True)])
    # 3) orders
    orders_schema = StructType([StructField('order_id', IntegerType(), False),
                                StructField('user_id', IntegerType(), True),
                                StructField('eval_set', StringType(), True),
                                StructField('order_number', IntegerType(), True),
                                StructField('order_dow', IntegerType(), True),
                                StructField('order_hour_of_day', IntegerType(), True),
                                StructField('days_since_prior_order', IntegerType(), True)])
    # 4) prior_order -- also reused for train_order, which has the same columns.
    prior_order_schema = StructType([StructField('order_id', IntegerType(), True),
                                     StructField('product_id', IntegerType(), True),
                                     StructField('add_to_cart_order', IntegerType(), True),
                                     StructField('reordered', IntegerType(), True)])
    # 5) products
    # NOTE(review): aisle_id / department_id are StringType here but IntegerType in
    # aisles_schema / department_schema. They stay strings because remove_noise()
    # returns them as str, so joins on these keys compare a string with an int.
    products_schema = StructType([StructField('product_id', IntegerType(), False),
                                  StructField('product_name', StringType(), True),
                                  StructField('aisle_id', StringType(), True),
                                  StructField('department_id', StringType(), True)])
except Exception as exc:
    # Report and re-raise: swallowing the error would only resurface later as a
    # confusing NameError for a missing schema in a downstream cell.
    print('error occoured..!!', exc)
    raise
finally:
    print("All done...!!")

All done...!!


In [283]:
# Source and destination data-lake paths.
# Note: the data was copied to local disk and the local path is used here
# because of access issues with the INSOFE cluster from this IP.
# Set the RETAIL_DATA_DIR environment variable to read from another location.
# os.path.join(..., '') guarantees a trailing slash, which the reading cells
# rely on when they append file names with `+`.
dataset_path = os.path.join(
    os.environ.get('RETAIL_DATA_DIR', '/home/fai10105/Project/Data_sets/'), '')  # data source location
# Joining (instead of `dataset_path + "/output/"`) avoids a doubled slash.
output_path = os.path.join(dataset_path, 'output', '')  # output location


In [284]:
# A) Extract: read every source file except products as a DataFrame.
def read_csv_with_schema(file_name, schema):
    """Read ``dataset_path + file_name`` as a comma-delimited CSV with a header row.

    Args:
        file_name: file name relative to ``dataset_path``.
        schema: StructType applied instead of inferring column types.

    Returns:
        A Spark DataFrame over the file.
    """
    # "delimiter" is the option name Spark recognises; the earlier "delimeter"
    # spelling had no effect and only worked because "," is the default.
    return spark.read.schema(schema)\
                .option("delimiter", ",")\
                .option("header", "True")\
                .csv(dataset_path + file_name)

try:
    aisles_df = read_csv_with_schema('aisles.csv', aisles_schema)
    department_df = read_csv_with_schema('departments.csv', department_schema)
    orders_df = read_csv_with_schema('orders.csv', orders_schema)
    prior_order_df = read_csv_with_schema('prior_order.csv', prior_order_schema)
    # train_order.csv has the same columns as prior_order.csv, so it reuses that schema.
    train_order_df = read_csv_with_schema('train_order.csv', prior_order_schema)
except Exception as exc:
    # Report and re-raise so a missing/misnamed file stops the run here instead
    # of surfacing later as a NameError.
    print('error occoured..!', exc)
    raise
finally:
    print('All done...!!')

All done...!!


In [285]:
# products.csv is read as an RDD first because some product names contain
# quotes and commas; after cleaning, the RDD is converted to a DataFrame.
# All other files are read directly as DataFrames.
def remove_noise(row):
    """Parse one raw products.csv line into [product_id, product_name, aisle_id, department_id].

    product_id is the text before the first comma, and aisle_id/department_id
    are the last two comma-separated fields. The product name is everything in
    between, so commas inside a quoted name can no longer shift the columns.
    Within the name, ", " is rewritten to " - " and every double quote is
    removed (the same cleaning rules as before).

    Args:
        row: one line of the file, without its trailing newline.

    Returns:
        [int product_id, str product_name, str aisle_id, str department_id]

    Raises:
        ValueError: the line has fewer than four comma-separated parts or a
            non-integer product_id. RDD transformations are lazy, so this is
            raised when a later action runs, not in this cell.
    """
    product_id, rest = row.split(",", 1)
    product_name, aisle_id, department_id = rest.rsplit(",", 2)
    product_name = product_name.replace(", ", " - ").replace('"', "")
    return [int(product_id), product_name, aisle_id, department_id]

try:
    products_rdd = spark.sparkContext\
                    .textFile(dataset_path+'products.csv')
    header = products_rdd.first()
    # Drop the header line and any blank lines before parsing each record.
    products_rdd_mo = products_rdd\
                    .filter(lambda line: line and line != header)\
                    .map(remove_noise)
    # Product DataFrame built from the cleaned RDD.
    products_df = products_rdd_mo.toDF(products_schema)
except Exception as exc:
    print('error occoured..!!', exc)
    raise
finally:
    print('all fine')

all fine


In [286]:
# Lookup of table name -> DataFrame, so the inspection helpers can loop over
# every table in a fixed order.
all_data_frames = dict(
    Aisles=aisles_df,
    Department=department_df,
    Products=products_df,
    Orders=orders_df,
    Prior_order=prior_order_df,
    Train_order=train_order_df,
)

In [288]:
# Function for displaying the column names of each DataFrame.
# Defining a function cannot meaningfully fail, so no try/bare-except wrapper
# is needed (it could only hide real errors).
def checking_column_names(all_data_frames):
    """Print the column names of every DataFrame in ``all_data_frames``.

    Args:
        all_data_frames: mapping of table name -> DataFrame (any object with a
            ``columns`` attribute works).
    """
    print('Column names in all the tables are as follows:\n')
    for table_name, df in all_data_frames.items():
        print(table_name, ':-')
        print(df.columns, "\n")

print('All Done..!!')

All Done..!!


In [289]:
#Displaying the columns names in each data frames    
checking_column_names(all_data_frames)

Column names in all the tables are as follows:

Aisles :-
['aisle_id', 'aisle'] 

Department :-
['department_id', 'department'] 

Products :-
['product_id', 'product_name', 'aisle_id', 'department_id'] 

Orders :-
['order_id', 'user_id', 'eval_set', 'order_number', 'order_dow', 'order_hour_of_day', 'days_since_prior_order'] 

Prior_order :-
['order_id', 'product_id', 'add_to_cart_order', 'reordered'] 

Train_order :-
['order_id', 'product_id', 'add_to_cart_order', 'reordered'] 



In [290]:
# Function for displaying the column data types of each DataFrame.
# No try/bare-except around the definition: it could only hide real errors.
def checking_data_types(all_data_frames):
    """Print the schema (column names and data types) of every DataFrame.

    Args:
        all_data_frames: mapping of table name -> DataFrame (any object with a
            ``printSchema()`` method works).
    """
    print('datatypes of Column names in all the tables are as follows:\n')
    for table_name, df in all_data_frames.items():
        print(table_name, ':-')
        df.printSchema()
        print()

print('All Done..!!')

All Done..!!


In [291]:
#Displaying data types using the function created
checking_data_types(all_data_frames)   

datatypes of Column names in all the tables are as follows:

Aisles :-
root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)


Department :-
root
 |-- department_id: integer (nullable = true)
 |-- department: string (nullable = true)


Products :-
root
 |-- product_id: integer (nullable = false)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)


Orders :-
root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: integer (nullable = true)


Prior_order :-
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)


Train_order :-


In [292]:
# Function for counting null values in every column of each DataFrame.
# No try/bare-except around the definition: it could only hide real errors.
def check_for_null(all_data_frames):
    """Print, for every DataFrame, how many null values each column contains.

    All null counts of a table are computed in one aggregation (one Spark job
    per table) instead of a separate filter().count() job per column.

    Args:
        all_data_frames: mapping of table name -> Spark DataFrame.
    """
    # Local import: only this helper needs the SQL functions module.
    from pyspark.sql import functions as F

    print('Count of Null values in each Column in all the tables is as follows:\n')
    for table_name, df in all_data_frames.items():
        print(table_name, ':-')
        if df.columns:
            # when() without otherwise() yields null unless the column is null,
            # and count() skips nulls, so each expression counts the null rows.
            null_counts = df.agg(*[
                F.count(F.when(df[col].isNull(), 1)).alias(col)
                for col in df.columns
            ]).first().asDict()
        else:
            null_counts = {}
        print(null_counts, '\n')

print("All done...!!")

All done...!!


In [293]:
#Displaying Null values usimg the function created
check_for_null(all_data_frames) 

Count of Null values in each Column in all the tables is as follows:

Aisles :-
{'aisle_id': 0, 'aisle': 0} 

Department :-
{'department_id': 0, 'department': 0} 

Products :-
{'product_id': 0, 'product_name': 0, 'aisle_id': 0, 'department_id': 0} 

Orders :-
{'order_id': 0, 'user_id': 0, 'eval_set': 0, 'order_number': 0, 'order_dow': 0, 'order_hour_of_day': 0, 'days_since_prior_order': 82683} 

Prior_order :-
{'order_id': 0, 'product_id': 0, 'add_to_cart_order': 0, 'reordered': 0} 

Train_order :-
{'order_id': 0, 'product_id': 0, 'add_to_cart_order': 0, 'reordered': 0} 



In [294]:
# B) Transformation: data processing part.
# Register each DataFrame as a temporary view so the aggregations below can be
# written in Spark SQL.
try:
    for view_name, view_df in (('aisles', aisles_df),
                               ('department', department_df),
                               ('orders', orders_df),
                               ('prior_order', prior_order_df),
                               ('products', products_df),
                               ('train_order', train_order_df)):
        view_df.createOrReplaceTempView(view_name)
except Exception as exc:
    # Report and re-raise: the SQL cells below cannot work without these views.
    print("Error occoured", exc)
    raise
finally:
    print("All done...!!")

All done...!!


In [295]:
spark.sql("Select * from aisles").show(2)

+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
+--------+--------------------+
only showing top 2 rows



In [296]:
spark.sql("Select * from department").show(2)

+-------------+----------+
|department_id|department|
+-------------+----------+
|            1|    frozen|
|            2|     other|
+-------------+----------+
only showing top 2 rows



In [297]:
spark.sql("Select * from orders").show(2)

+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 1363380|     50|   prior|           1|        3|                9|                  null|
| 3131103|     50|   prior|           2|        6|               12|                  null|
+--------+-------+--------+------------+---------+-----------------+----------------------+
only showing top 2 rows



In [298]:
spark.sql("Select * from prior_order").show(2)

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|      12|     30597|                1|        1|
|      12|     15221|                2|        1|
+--------+----------+-----------------+---------+
only showing top 2 rows



In [299]:
spark.sql("Select * from train_order").show(2)

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|    1077|     13176|                1|        1|
|    1077|     39922|                2|        1|
+--------+----------+-----------------+---------+
only showing top 2 rows



In [300]:
spark.sql("Select * from products").show(2)

+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
+----------+--------------------+--------+-------------+
only showing top 2 rows



In [302]:
# Combine products with train_order and prior_order first (two joins stacked
# with UNION ALL), so the next step only has to join against one table.
# The train_order alias is `tr` rather than `to`: TO is an SQL keyword, which
# Spark can refuse as an identifier under its ANSI/reserved-keyword settings.
aggregated_table_part_1 = spark.sql('''SELECT p.product_id, product_name, aisle_id, department_id, order_id, add_to_cart_order,
                                      reordered FROM products p INNER JOIN train_order tr ON tr.product_id=p.product_id
                                      UNION ALL
                                      SELECT p.product_id, product_name, aisle_id, department_id, order_id, add_to_cart_order,
                                      reordered FROM products p INNER JOIN prior_order po ON po.product_id=p.product_id''')

# Register the combined result as a view for the next join step.
aggregated_table_part_1.createOrReplaceTempView("Combined_table")

# Query the view under the same name it was registered with.
spark.sql("Select * from Combined_table").show(2)

+----------+--------------------+--------+-------------+--------+-----------------+---------+
|product_id|        product_name|aisle_id|department_id|order_id|add_to_cart_order|reordered|
+----------+--------------------+--------+-------------+--------+-----------------+---------+
|         1|Chocolate Sandwic...|      61|           19| 1290664|                3|        0|
|         2|    All-Seasons Salt|     104|           13| 1455635|               26|        0|
+----------+--------------------+--------+-------------+--------+-----------------+---------+
only showing top 2 rows



In [307]:
# Join the combined product/order rows with orders, aisles and departments to
# build the single flat table described by the data model.
# products_schema declares aisle_id / department_id as strings, while the
# aisles and department views use integers, so the join keys are cast
# explicitly instead of relying on Spark's implicit type coercion.
fully_combined_table = spark.sql('''SELECT product_id, product_name, t.aisle_id,aisle, d.department_id, department, 
                                    o.order_id, user_id,add_to_cart_order, reordered,eval_set, order_number, order_dow, 
                                    order_hour_of_day, days_since_prior_order
                                    FROM Combined_table t 
                                    INNER JOIN orders o ON o.order_id=t.order_id 
                                    INNER JOIN aisles a ON a.aisle_id=CAST(t.aisle_id AS INT)
                                    INNER JOIN department d ON d.department_id=CAST(t.department_id AS INT)''')

fully_combined_table.select("product_id","order_id", "user_id","add_to_cart_order").show(5)

+----------+--------+-------+-----------------+
|product_id|order_id|user_id|add_to_cart_order|
+----------+--------+-------+-----------------+
|         1| 1290664|  79605|                3|
|         2| 1455635|  27086|               26|
|         3| 2188727|  83549|                1|
|        10| 2180721| 146967|                4|
|        10| 1965683| 205353|               13|
+----------+--------+-------+-----------------+
only showing top 5 rows



In [310]:
                
# C) Load: write the transformed data back to the destination (data lake).
# coalesce(1) merges the partitions without a full shuffle, so the result
# lands in a single CSV part file that is easy to use for visualisation.
# The trade-off is that the final stage runs as a single task, so avoid it
# for large outputs.
# mode("overwrite") keeps the cell re-runnable; the default save mode raises
# an error when output_path already exists.
fully_combined_table.coalesce(1)\
    .write\
    .mode("overwrite")\
    .option("header", True)\
    .csv(output_path)

print("done")

done
