In [1]:
# Import
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import col
from pyspark.sql.functions import collect_list, collect_set # collect_set return set of values (no duplicate)
from pyspark.sql import functions as F
from itertools import combinations # not efficient for big data
from pyspark.sql.functions import col, udf, explode, size
from pyspark.sql.types import ArrayType, StringType
import findspark
findspark.init()

In [2]:
# Create SparkSession
spark = SparkSession.builder\
    .master("local[*]") \
    .appName("Map Reduce with Pyspark") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()

25/01/07 07:53:36 WARN Utils: Your hostname, helium resolves to a loopback address: 127.0.1.1; using 10.0.106.13 instead (on interface enp6s0)
25/01/07 07:53:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/07 07:53:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/07 07:53:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
data_path = "dataset_sales/joined_all_data.csv"
df = spark.read.csv(data_path, header=True)


In [6]:
df.show(12)

+--------+----------+-----------------+---------+--------------------+--------+-------------+--------------------+----------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|product_id|add_to_cart_order|reordered|        product_name|aisle_id|department_id|               aisle|department|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+----------+-----------------+---------+--------------------+--------+-------------+--------------------+----------+-------+--------+------------+---------+-----------------+----------------------+
|       2|     33120|                1|        1|  Organic Egg Whites|      86|           16|                eggs|dairy eggs| 202279|   prior|           3|        5|                9|                   8.0|
|       2|     28985|                2|        1|Michigan Organic ...|      83|            4|    fresh vegetables|   produce| 202279|   prior|           3|        5|       

In [8]:
# Filter rows where the 'department' column equals 'produce'
produce_department = df.filter(df.department == 'produce')
produce_department.cache()


DataFrame[order_id: string, product_id: string, add_to_cart_order: string, reordered: string, product_name: string, aisle_id: string, department_id: string, aisle: string, department: string, user_id: string, eval_set: string, order_number: string, order_dow: string, order_hour_of_day: string, days_since_prior_order: string]

In [9]:
produce_department.count()

                                                                                

9479291

In [10]:
produce_department.show(10)

+--------+----------+-----------------+---------+--------------------+--------+-------------+--------------------+----------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|product_id|add_to_cart_order|reordered|        product_name|aisle_id|department_id|               aisle|department|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+----------+-----------------+---------+--------------------+--------+-------------+--------------------+----------+-------+--------+------------+---------+-----------------+----------------------+
|       2|     28985|                2|        1|Michigan Organic ...|      83|            4|    fresh vegetables|   produce| 202279|   prior|           3|        5|                9|                   8.0|
|       2|     17794|                6|        1|             Carrots|      83|            4|    fresh vegetables|   produce| 202279|   prior|           3|        5|       

## Grouping orders

In [34]:
produce_ordered_products = produce_department.select(produce_department['product_name'], produce_department['order_id'])
produce_ordered_products.cache()
produce_ordered_products.show(3)



+--------------------+--------+
|        product_name|order_id|
+--------------------+--------+
|Michigan Organic ...|       2|
|             Carrots|       2|
|Classic Blend Col...|       2|
+--------------------+--------+
only showing top 3 rows



                                                                                

In [33]:
# Group by 'order_id' and collect product names into a set 
produce_grouped_order_id = produce_ordered_products.groupBy("order_id").agg(F.collect_set("product_name").alias("products"))
produce_grouped_order_id.show(13)




+--------+--------------------+
|order_id|            products|
+--------+--------------------+
| 1000240|[Organic Baby Aru...|
| 1000280|[Organic Hass Avo...|
| 1000665|[Organic Cilantro...|
| 1000795|[Organic Baby Aru...|
| 1000839|[Organic Tomato C...|
| 1000888|[Organic Spaghett...|
| 1001866|[Organic Baby Spi...|
| 1002011|[Organic Baby Spi...|
|  100227|[Organic Fuji App...|
| 1002442|[Organic Cilantro...|
|  100263|            [Banana]|
| 1002883|[Organic Romaine ...|
| 1002887|      [Carrot Chips]|
+--------+--------------------+
only showing top 13 rows



                                                                                

## Select orders that has more from 3 products

In [38]:
# Select only orders contains from 3 products
produce_orders_from_3_items = produce_grouped_order_id.filter(F.size("products") > 2)
produce_orders_from_3_items.cache()

print("Count of orders where number of products is more than 2 products:", produce_orders_from_3_items.count())



Count of orders where item_count is smaller than 3 products: 1408241


                                                                                

In [40]:
# Add a new column 'item_count' that contains the length of the list in the 'products' column
produce_orders_from_3_items = produce_orders_from_3_items.withColumn(
    "item_count", F.size("products")
)

produce_orders_from_3_items.show(5)

+--------+--------------------+--------------+----------+
|order_id|            products|has_duplicates|item_count|
+--------+--------------------+--------------+----------+
| 1000240|[Organic Baby Aru...|         false|         4|
| 1000665|[Organic Cilantro...|         false|        12|
| 1000795|[Organic Baby Aru...|         false|         4|
| 1000888|[Organic Spaghett...|         false|         5|
| 1001866|[Organic Baby Spi...|         false|         3|
+--------+--------------------+--------------+----------+
only showing top 5 rows



In [41]:
produce_orders_from_3_items.count()

1408241

## Mapping

In [42]:
# Step 1: Define a UDF to generate triplets
#The generate_triplets function creates all possible combinations of 3 products from the products column.
def generate_triplets(products):
    return list(combinations(products, 3))

triplets_udf = udf(generate_triplets, ArrayType(ArrayType(StringType())))

In [43]:
# Step 2: Apply the UDF to create a column with triplets
#
grouped_order_id_with_triplets = produce_orders_from_3_items.withColumn("triplets", triplets_udf(col("products")))

In [44]:
# Step 3: Explode triplets into separate rows
# explode(col("triplets")) flattens the array of triplets into individual rows, where each row represents one triplet.
triplets_exploded = grouped_order_id_with_triplets.select(explode(col("triplets")).alias("triplet"))

In [46]:
triplets_exploded.select('triplet').show()

+--------------------+
|             triplet|
+--------------------+
|[Organic Baby Aru...|
|[Organic Baby Aru...|
|[Organic Baby Aru...|
|[Organic Baby Spi...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
|[Organic Cilantro...|
+--------------------+
only showing top 20 rows



                                                                                

### Reduce

In [47]:
# Step 4: Count occurrences of each triplet
# groups identical triplets and counts their occurrences across all orders.
triplet_counts = triplets_exploded.groupBy("triplet").count()


In [48]:
# Step 5: Sort by count in descending order
top_triplets = triplet_counts.orderBy(col("count").desc())


In [49]:
top_triplets.show()



+--------------------+-----+
|             triplet|count|
+--------------------+-----+
|[Organic Strawber...|12312|
|[Organic Baby Spi...| 9644|
|[Organic Raspberr...| 9525|
|[Organic Baby Spi...| 9393|
|[Organic Raspberr...| 9274|
|[Organic Baby Spi...| 8658|
|[Organic Baby Spi...| 8201|
|[Organic Baby Spi...| 8137|
|[Organic Raspberr...| 7891|
|[Large Lemon, Org...| 7044|
|[Large Lemon, Lim...| 6182|
|[Organic Baby Spi...| 6166|
|[Organic Strawber...| 6146|
|[Large Lemon, Lim...| 6109|
|[Organic Baby Spi...| 6077|
|[Limes, Organic A...| 5941|
|[Organic Baby Spi...| 5838|
|[Organic Baby Spi...| 5829|
|[Cucumber Kirby, ...| 5711|
|[Organic Raspberr...| 5706|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [50]:
top_triplets.count()

                                                                                

10203741

In [28]:
# top_3_triplets.write.csv("output/top_3_triplets.csv", header=True)


In [29]:
# from pyspark.sql.functions import concat_ws

# # Convert the array column "triplet" into a string column
# flattened_triplets = top_3_triplets.withColumn("triplet", concat_ws(",", "triplet"))

# # Save the result to a CSV file
# flattened_triplets.write.csv("output/top_3_triplets.csv", header=True)


In [30]:
# Split the array into separate columns
# split_triplets = top_3_triplets.selectExpr(
#     "triplet[0] as product1", 
#     "triplet[1] as product2", 
#     "triplet[2] as product3", 
#     "count"
# )

# Save to CSV
# split_triplets.write.csv("output/top_3_triplets.csv", header=True)
