In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession \
    .builder \
    .appName("how to read csv file") \
    .getOrCreate()

In [5]:
spark.version

'3.0.1'

In [6]:
import os

In [7]:
os.getcwd()

'C:\\Users\\aruverma'

In [11]:
# 1. Load data into Spark Data frames
aisles=spark.read.option("inferSchema","true").option("header","true").csv("aisles.csv").toDF("aisle_id","aisle")

In [12]:
# aisles.show(2)

+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
+--------+--------------------+
only showing top 2 rows



In [13]:
departments=spark.read.option("inferSchema","true").option("header","true").csv("departments.csv").toDF("department_id","department")

In [14]:
departments.show(2)

+-------------+----------+
|department_id|department|
+-------------+----------+
|            1|    frozen|
|            2|     other|
+-------------+----------+
only showing top 2 rows



In [15]:
products=spark.read.option("inferSchema","true").option("header","true").csv("products.csv").toDF("product_id","product_name","aisle_id","department_id")

In [16]:
products.show(2)

+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
+----------+--------------------+--------+-------------+
only showing top 2 rows



In [17]:
orders=spark.read.option("inferSchema","true").option("header","true").csv("orders.csv").toDF("order_id","user_id","evala_set","order_number","order_dow","order_hour_of_day","days_since_prior_order")

In [18]:
orders.show(2)

+--------+-------+---------+------------+---------+-----------------+----------------------+
|order_id|user_id|evala_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+---------+------------+---------+-----------------+----------------------+
| 2539329|      1|    prior|           1|        2|                8|                  null|
| 2398795|      1|    prior|           2|        3|                7|                  15.0|
+--------+-------+---------+------------+---------+-----------------+----------------------+
only showing top 2 rows



In [21]:
orders_train=spark.read.option("inferSchema","true").option("header","true").csv("order_products__train.csv").toDF("order_id","product_id","add_to_cart_order","reordered")

In [22]:
orders_train.show(2)

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       1|     49302|                1|        1|
|       1|     11109|                2|        1|
+--------+----------+-----------------+---------+
only showing top 2 rows



In [23]:
# 2. Merge all data frames based on common key and create a single Data Frames
df = products.join(aisles, aisles.aisle_id == products.aisle_id,"left").drop(aisles.aisle_id).join(departments, departments.department_id == products.department_id,"left").drop(departments.department_id).join(orders_train, orders_train.product_id == products.product_id,"right").drop(orders_train.product_id).join(orders, orders.order_id == orders_train.order_id, "left").drop(orders.order_id)

In [25]:
# 3. Check for any missing Data (if any such as null)
filtered = df.filter(" is not null and ".join(df.columns) + " is not null")
filtered.count()

1384617

In [26]:
# 4. List the most Ordered products (Top 10)
from pyspark.sql.functions import col
filtered.groupBy("product_name").count().sort(col("count").desc()).show(10,False)

+----------------------+-----+
|product_name          |count|
+----------------------+-----+
|Banana                |18726|
|Bag of Organic Bananas|15480|
|Organic Strawberries  |10894|
|Organic Baby Spinach  |9784 |
|Large Lemon           |8135 |
|Organic Avocado       |7409 |
|Organic Hass Avocado  |7293 |
|Strawberries          |6494 |
|Limes                 |6033 |
|Organic Raspberries   |5546 |
+----------------------+-----+
only showing top 10 rows



In [38]:
# 5. Check whether people usually reorder the same previous ordered products 
from pyspark.sql.functions import *
filtered.groupBy("product_name").agg(avg("reordered"),count("reordered")).sort(col("count(reordered)").desc()).show(10,False)

+----------------------+------------------+----------------+
|product_name          |avg(reordered)    |count(reordered)|
+----------------------+------------------+----------------+
|Banana                |0.8841717398269785|18726           |
|Bag of Organic Bananas|0.8631782945736434|15480           |
|Organic Strawberries  |0.7897007527079126|10894           |
|Organic Baby Spinach  |0.8232829108748978|9784            |
|Large Lemon           |0.7280885064535956|8135            |
|Organic Avocado       |0.8403293291942232|7409            |
|Organic Hass Avocado  |0.8284656519950637|7293            |
|Strawberries          |0.7369879889128427|6494            |
|Limes                 |0.701806729653572 |6033            |
|Organic Raspberries   |0.7715470609448251|5546            |
+----------------------+------------------+----------------+
only showing top 10 rows



In [39]:
# 6. List most reordered products
filtered.groupBy("product_name").agg(avg("reordered"),count("reordered")).show(10,False)

+-----------------------------------------------+-------------------+----------------+
|product_name                                   |avg(reordered)     |count(reordered)|
+-----------------------------------------------+-------------------+----------------+
|Pesto Cultured Cashew Cheese                   |0.5                |6               |
|Organic Lentil Beans                           |0.5804195804195804 |143             |
|Organic Cinnamon Crunch Cereal                 |0.5026041666666666 |384             |
|Real Aged Cheddar Macaroni & Cheese            |0.6230769230769231 |130             |
|Pizza Sauce                                    |0.5340909090909091 |176             |
|XL Emerald White Seedless Grapes               |0.6220472440944882 |635             |
|Pods Spring Meadow Scent Laundry Detergent Pacs|0.45454545454545453|33              |
|Simply Stock Organic Vegetable Unsalted Stock  |0.2807017543859649 |57              |
|Organic Raw Multigreen Kobmbucha          

In [32]:
# 7. Most important department and aisle (by number of products)
df_prods = products.join(aisles, aisles.aisle_id == products.aisle_id,"left").drop(aisles.aisle_id).join(departments, departments.department_id == products.department_id,"left").drop(departments.department_id)
df_prods.groupBy("aisle").count().sort(col("count").desc()).show(10,False)
df_prods.groupBy("department").count().sort(col("count").desc()).show(10,False)

+--------------------+-----+
|aisle               |count|
+--------------------+-----+
|missing             |1258 |
|candy chocolate     |1246 |
|ice cream ice       |1091 |
|vitamins supplements|1038 |
|yogurt              |1026 |
|chips pretzels      |989  |
|tea                 |894  |
|packaged cheese     |891  |
|frozen meals        |880  |
|cookies cakes       |874  |
+--------------------+-----+
only showing top 10 rows

+---------------+-----+
|department     |count|
+---------------+-----+
|personal care  |6563 |
|snacks         |6264 |
|pantry         |5371 |
|beverages      |4365 |
|frozen         |4007 |
|dairy eggs     |3449 |
|household      |3084 |
|canned goods   |2092 |
|dry goods pasta|1858 |
|produce        |1684 |
+---------------+-----+
only showing top 10 rows



In [33]:
# 8. Get the top 10 departments
filtered.groupBy("department").count().sort(col("count").desc()).show(10,False)

+---------------+------+
|department     |count |
+---------------+------+
|produce        |409087|
|dairy eggs     |217051|
|snacks         |118862|
|beverages      |114046|
|frozen         |100426|
|pantry         |81242 |
|bakery         |48394 |
|canned goods   |46799 |
|deli           |44291 |
|dry goods pasta|38713 |
+---------------+------+
only showing top 10 rows



In [34]:
# 9. List top 10 products ordered in the morning (6 AM to 11 AM)
filtered.filter("order_hour_of_day >= 6 and order_hour_of_day<=11").groupBy("product_name").count().sort(col("count").desc()).show(10,False)

+----------------------+-----+
|product_name          |count|
+----------------------+-----+
|Banana                |6074 |
|Bag of Organic Bananas|4929 |
|Organic Strawberries  |3484 |
|Organic Baby Spinach  |3028 |
|Large Lemon           |2574 |
|Organic Avocado       |2292 |
|Organic Hass Avocado  |2233 |
|Strawberries          |2173 |
|Limes                 |1815 |
|Organic Raspberries   |1803 |
+----------------------+-----+
only showing top 10 rows

