# EECS 4415 - Task 2
## Pattern Discovery in Spark

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 49 not upgraded.
Need to get 39.6 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 123622 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u422-b05-1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u422-b05-1~22.04) ...
Sel

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Now we authenticate a Google Drive client to download the files we will process in our Spark job.

**Make sure to follow the interactive instructions.**

In [4]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)



In [5]:
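# Download products.csv (file_id avoids shadowing Python's built-in id)
file_id = '1dhi1F78ssqR8gE6U-AgB80ZW7V_9snX4'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('products.csv')

# Download order_products__train.csv
file_id = '1KZBNEaIyMTcsRV817us6uLZgm-Mii8oU'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('order_products__train.csv')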

If you executed the cells above, you should see the two CSV files we need for this Colab under the "Files" tab in the left panel.

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

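import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrame, SparkSession
# Import only the functions used below; a wildcard import would shadow
# Python built-ins such as sum, min, max and round
from pyspark.sql.functions import col, collect_list, desc, size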

Let's initialize the Spark context.

In [7]:
# configure Spark to serve its web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# create the SparkContext, then a SparkSession that reuses it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

### Your task

If you ran the setup stage successfully, you are ready to work with the **3 Million Instacart Orders** dataset. To read more about it, see the [official Instacart blog post](https://tech.instacart.com/3-million-instacart-orders-open-sourced-d40d29ead6f2), a concise [schema description](https://gist.github.com/jeremystan/c3b39d947d9b88b3ccff3147dbcf6c6b) of the dataset, and the [download page](https://www.instacart.com/datasets/grocery-shopping-2017).

In this Colab, we will work with a training subset (~131K orders) to perform fast and scalable Frequent Pattern Mining.

In [8]:
products = spark.read.csv('products.csv', header=True, inferSchema=True)
orders = spark.read.csv('order_products__train.csv', header=True, inferSchema=True)

In [9]:
products.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)



In [10]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



Use the Spark DataFrame API to join 'products' and 'orders', so that you can see the product names in each transaction (not only their ids). Then, group the orders by 'order_id' to obtain one row per basket (i.e., the set of products purchased together by one customer). Display the top 20 rows.

In [11]:
''' 2-3 lines of code expected '''
# YOUR CODE HERE

# Join 'orders' with 'products' on 'product_id' to get 'product_name'
order_products = orders.join(products, on='product_id', how='inner')

# Group by 'order_id' and collect product names into a list to form baskets
baskets = order_products.groupBy('order_id').agg(collect_list('product_name').alias('items'))

# Display the top 20 baskets
baskets.show(20, truncate=False)

+--------+----------------------------------------------------------------------
|order_id|items
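For readers less familiar with the DataFrame API, the join-then-group step above can be mimicked in plain Python. This is a minimal sketch on made-up toy rows (the ids and the helper `build_baskets` are hypothetical, not taken from the dataset or from Spark): an inner join maps each `product_id` to its name, and grouping by `order_id` plays the role of `collect_list`.

```python
from collections import defaultdict

# Hypothetical toy data standing in for products.csv and order_products__train.csv
products = {1: "Banana", 2: "Organic Strawberries", 3: "Limes"}  # product_id -> product_name
order_rows = [(10, 1), (10, 2), (11, 1), (11, 3), (12, 2)]       # (order_id, product_id)

def build_baskets(order_rows, products):
    """Inner-join order rows to product names, then group the names by order_id."""
    baskets = defaultdict(list)
    for order_id, product_id in order_rows:
        if product_id in products:  # inner join: rows without a matching product are dropped
            baskets[order_id].append(products[product_id])  # plays the role of collect_list
    return dict(baskets)

baskets = build_baskets(order_rows, products)
print(baskets)
# {10: ['Banana', 'Organic Strawberries'], 11: ['Banana', 'Limes'], 12: ['Organic Strawberries']}
```

One difference worth keeping in mind: Spark's `collect_list` does not guarantee the order of elements within a basket, which does not matter for FP-Growth because it treats each basket as a set of items.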

In this Colab we will first explore [MLlib](https://spark.apache.org/mllib/), Apache Spark's scalable machine learning library. Specifically, you should use its implementation of the [FP-Growth](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html#fp-growth) algorithm to perform fast Frequent Pattern Mining in Spark.
Use the Python example in the documentation, and train a model with

```minSupport=0.01``` and ```minConfidence=0.5```
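The two thresholds refer to standard quantities: the support of an itemset is the fraction of baskets that contain it, and the confidence of a rule X → Y is support(X ∪ Y) / support(X). Spark's `associationRules` also reports lift, which is the confidence divided by the support of the consequent. The plain-Python sketch below computes all three on hypothetical toy baskets; the helper names are illustrative, not part of Spark.

```python
# Hypothetical toy baskets; each basket is treated as a set of items
baskets = [
    {"Banana", "Limes"},
    {"Banana", "Organic Strawberries"},
    {"Banana", "Limes", "Organic Strawberries"},
    {"Limes"},
]

def support(itemset, baskets):
    """Fraction of baskets that contain every item in `itemset`."""
    itemset = set(itemset)
    return sum(itemset <= b for b in baskets) / len(baskets)

def confidence(antecedent, consequent, baskets):
    """Estimated P(consequent | antecedent) = support(A ∪ C) / support(A)."""
    both = set(antecedent) | set(consequent)
    return support(both, baskets) / support(antecedent, baskets)

def lift(antecedent, consequent, baskets):
    """Confidence divided by the consequent's support; > 1 suggests positive association."""
    return confidence(antecedent, consequent, baskets) / support(consequent, baskets)

print(support({"Banana", "Limes"}, baskets))       # 0.5
print(confidence({"Limes"}, {"Banana"}, baskets))  # ≈ 0.667 (2/3)
```

With `minSupport=0.01`, an itemset must appear in at least 1% of baskets; with `minConfidence=0.5`, a rule is kept only if its consequent appears in at least half of the baskets that contain its antecedent.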

In [12]:
''' 3 lines of code expected '''
# YOUR CODE HERE

from pyspark.ml.fpm import FPGrowth

# Initialize FP-Growth model
fpGrowth = FPGrowth(itemsCol='items', minSupport=0.01, minConfidence=0.5)

# Train the model on the baskets DataFrame
model = fpGrowth.fit(baskets)

Compute and print how many frequent itemsets and association rules FP-Growth generated, and display the top frequent itemsets and association rules (top 20 rows of each).


In [13]:
''' 5 lines of code in total expected, but this can differ based on your style. For sub-parts of the question, creating separate code cells is recommended. '''
# YOUR CODE HERE

# Extract frequent itemsets and association rules
freq_itemsets = model.freqItemsets
association_rules = model.associationRules

# Compute counts
num_freq_itemsets = freq_itemsets.count()
num_association_rules = association_rules.count()

# Print counts
print(f"Number of frequent itemsets: {num_freq_itemsets}")
print(f"Number of association rules: {num_association_rules}")

# Display top 20 frequent itemsets
freq_itemsets.orderBy(desc('freq')).show(20, truncate=False)

# Display top 20 association rules
association_rules.orderBy(desc('confidence')).show(20, truncate=False)

Number of frequent itemsets: 120
Number of association rules: 0
+------------------------+-----+
|items                   |freq |
+------------------------+-----+
|[Banana]                |18726|
|[Bag of Organic Bananas]|15480|
|[Organic Strawberries]  |10894|
|[Organic Baby Spinach]  |9784 |
|[Large Lemon]           |8135 |
|[Organic Avocado]       |7409 |
|[Organic Hass Avocado]  |7293 |
|[Strawberries]          |6494 |
|[Limes]                 |6033 |
|[Organic Raspberries]   |5546 |
|[Organic Blueberries]   |4966 |
|[Organic Whole Milk]    |4908 |
|[Organic Cucumber]      |4613 |
|[Organic Zucchini]      |4589 |
|[Organic Yellow Onion]  |4290 |
|[Organic Garlic]        |4158 |
|[Seedless Red Grapes]   |4059 |
|[Asparagus]             |3868 |
|[Organic Grape Tomatoes]|3823 |
|[Organic Red Onion]     |3818 |
+------------------------+-----+
only showing top 20 rows

+----------+----------+----------+----+-------+
|antecedent|consequent|confidence|lift|support|
+----------+----------+----------+----+-------+
+----------+----------+----------+----+-------+
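Association rules are derived from the frequent itemsets, so when few multi-item itemsets pass `minSupport`, few or no rules can pass `minConfidence`. The sketch below (plain Python, hypothetical helper `generate_rules`) mirrors what we understand Spark's FP-Growth to do: for every frequent itemset of size two or more, it tries each single item as the consequent and keeps the rules whose confidence meets the threshold. It takes frequent-itemset counts as input, similar to the `items`/`freq` columns of `freqItemsets`; unlike Spark, it returns the consequent as a plain string rather than a one-element array.

```python
def generate_rules(freq_counts, n_baskets, min_confidence):
    """Derive rules antecedent -> single-item consequent from frequent itemset counts.

    freq_counts maps frozenset(itemset) -> number of baskets containing it.
    Every subset of a frequent itemset is also frequent, so antecedent counts exist.
    Returns tuples (antecedent, consequent, confidence, lift, support).
    """
    rules = []
    for itemset, count in freq_counts.items():
        if len(itemset) < 2:
            continue  # a rule needs a non-empty antecedent and a consequent
        for item in itemset:
            antecedent = itemset - {item}
            conf = count / freq_counts[antecedent]
            if conf >= min_confidence:
                lift = conf / (freq_counts[frozenset([item])] / n_baskets)
                rules.append((sorted(antecedent), item, conf, lift, count / n_baskets))
    return rules

# Hypothetical frequent-itemset counts over 4 toy baskets
freq_counts = {
    frozenset(["Banana"]): 3,
    frozenset(["Limes"]): 3,
    frozenset(["Banana", "Limes"]): 2,
}
for rule in generate_rules(freq_counts, n_baskets=4, min_confidence=0.5):
    print(rule)
```

Raising `min_confidence` to 0.7 in this toy example leaves no rules at all, the same kind of outcome as the empty rules table above.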

Now retrain the FP-Growth model, changing only
```minSupport=0.001```.
Compute and print how many frequent itemsets and association rules were generated. Show the top 20 rows for both.


In [14]:
''' 6 lines of code in total expected, but this can differ based on your style. For sub-parts of the question, creating separate code cells is recommended. '''
# YOUR CODE HERE

# Initialize FP-Growth model with new minSupport
fpGrowth2 = FPGrowth(itemsCol='items', minSupport=0.001, minConfidence=0.5)

# Train the model on the baskets DataFrame
model2 = fpGrowth2.fit(baskets)

# Extract frequent itemsets and association rules
freq_itemsets2 = model2.freqItemsets
association_rules2 = model2.associationRules

# Compute counts
num_freq_itemsets2 = freq_itemsets2.count()
num_association_rules2 = association_rules2.count()

# Print counts
print(f"Number of frequent itemsets with minSupport=0.001: {num_freq_itemsets2}")
print(f"Number of association rules with minSupport=0.001: {num_association_rules2}")

# Display top 20 frequent itemsets
freq_itemsets2.orderBy(desc('freq')).show(20, truncate=False)

# Display top 20 association rules
association_rules2.orderBy(desc('confidence')).show(20, truncate=False)

Number of frequent itemsets with minSupport=0.001: 4444
Number of association rules with minSupport=0.001: 11
+------------------------+-----+
|items                   |freq |
+------------------------+-----+
|[Banana]                |18726|
|[Bag of Organic Bananas]|15480|
|[Organic Strawberries]  |10894|
|[Organic Baby Spinach]  |9784 |
|[Large Lemon]           |8135 |
|[Organic Avocado]       |7409 |
|[Organic Hass Avocado]  |7293 |
|[Strawberries]          |6494 |
|[Limes]                 |6033 |
|[Organic Raspberries]   |5546 |
|[Organic Blueberries]   |4966 |
|[Organic Whole Milk]    |4908 |
|[Organic Cucumber]      |4613 |
|[Organic Zucchini]      |4589 |
|[Organic Yellow Onion]  |4290 |
|[Organic Garlic]        |4158 |
|[Seedless Red Grapes]   |4059 |
|[Asparagus]             |3868 |
|[Organic Grape Tomatoes]|3823 |
|[Organic Red Onion]     |3818 |
+------------------------+-----+
only showing top 20 rows

+-----------------------------------------------------------------+-----

We ask you to inspect the resulting DataFrames and report a few results. Sort the frequent items in descending order of frequency and display 50 rows.

In [15]:
# YOUR CODE HERE

# Filter itemsets of size 1 (single items)
single_itemsets = freq_itemsets2.filter(size('items') == 1)

# Sort by descending frequency and display top 50
single_itemsets.orderBy(desc('freq')).show(50, truncate=False)

+--------------------------------------+-----+
|items                                 |freq |
+--------------------------------------+-----+
|[Banana]                              |18726|
|[Bag of Organic Bananas]              |15480|
|[Organic Strawberries]                |10894|
|[Organic Baby Spinach]                |9784 |
|[Large Lemon]                         |8135 |
|[Organic Avocado]                     |7409 |
|[Organic Hass Avocado]                |7293 |
|[Strawberries]                        |6494 |
|[Limes]                               |6033 |
|[Organic Raspberries]                 |5546 |
|[Organic Blueberries]                 |4966 |
|[Organic Whole Milk]                  |4908 |
|[Organic Cucumber]                    |4613 |
|[Organic Zucchini]                    |4589 |
|[Organic Yellow Onion]                |4290 |
|[Organic Garlic]                      |4158 |
|[Seedless Red Grapes]                 |4059 |
|[Asparagus]                           |3868 |
|[Organic Gra

Also, sort the association rules in descending order of confidence and display 20 rows.

In [16]:
# YOUR CODE HERE

# Sort association rules by descending confidence and display top 20
association_rules2.orderBy(desc('confidence')).show(20, truncate=False)

+-----------------------------------------------------------------+------------------------+------------------+------------------+---------------------+
|antecedent                                                       |consequent              |confidence        |lift              |support              |
+-----------------------------------------------------------------+------------------------+------------------+------------------+---------------------+
|[Organic Raspberries, Organic Hass Avocado, Organic Strawberries]|[Bag of Organic Bananas]|0.5984251968503937|5.072272070642333 |0.0017376856770495927|
|[Organic Cucumber, Organic Hass Avocado, Organic Strawberries]   |[Bag of Organic Bananas]|0.546875          |4.635330870478036 |0.0010669999771357147|
|[Organic Kiwi, Organic Hass Avocado]                             |[Bag of Organic Bananas]|0.5459770114942529|4.627719489738336 |0.001448071397541327 |
|[Organic Navel Orange, Organic Raspberries]                      |[Bag of Organic

What do you observe when varying the support parameter? Answer in two sentences.

Lowering minSupport from 0.01 to 0.001 increased the number of frequent itemsets from 120 to 4,444 and the number of association rules from 0 to 11. A lower support threshold lets less common itemsets count as frequent, so many more patterns are found, including multi-item combinations from which rules with a confidence of at least 0.5 can be derived.
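This behaviour follows from support being anti-monotone: a superset never appears in more baskets than any of its subsets, so lowering `minSupport` can only add frequent itemsets, never remove them. The brute-force sketch below makes this concrete on hypothetical toy baskets; `frequent_itemsets` is an illustrative helper that naively enumerates itemsets level by level, not FP-Growth.

```python
from itertools import combinations

def frequent_itemsets(baskets, min_support):
    """Naively enumerate every itemset contained in >= min_support * len(baskets) baskets."""
    min_count = min_support * len(baskets)
    items = sorted(set().union(*baskets))
    result = {}
    for k in range(1, len(items) + 1):
        found_any = False
        for combo in combinations(items, k):
            count = sum(set(combo) <= b for b in baskets)
            if count >= min_count:
                result[frozenset(combo)] = count
                found_any = True
        if not found_any:  # anti-monotonicity: no frequent k-itemset means no frequent (k+1)-itemset
            break
    return result

# Hypothetical toy baskets
baskets = [
    {"Banana", "Limes"},
    {"Banana", "Organic Strawberries"},
    {"Banana", "Limes", "Organic Strawberries"},
    {"Limes"},
]
for s in (0.5, 0.25):
    print(s, len(frequent_itemsets(baskets, s)))
# 0.5 5
# 0.25 7
```

The five itemsets found at 0.5 are all still present at 0.25; the lower threshold only adds the pair and triple that occur in a single basket.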

Your next task is to implement the **SON (MapReduce) algorithm** to compute frequent itemsets by dividing the data into 10 chunks. Assume the *global* support to be:
```minSupport=0.001```.

Hint:
1. To simulate the SON (MapReduce) algorithm for frequent itemset mining in Apache Spark, even if the input file is located on a single node, you can divide the file into smaller chunks (partitions) and process each partition as though it were distributed across multiple nodes. Spark will treat these partitions as separate subsets of the data, effectively mimicking how the SON algorithm would work in a truly distributed environment.

2. The repartition function redistributes the data into the desired number of partitions (use 10 in this case). Example:

    ```data_partitioned = data.repartition(10)```

The .repartition(10) method repartitions the DataFrame into 10 partitions, but it does not change the underlying structure of the data. It remains a Spark DataFrame, just with a different partitioning scheme.

3. Generate candidates by applying FP-Growth to each partition without converting to an RDD. Obtain each partition's DataFrame by iterating through the partitions, e.g., with the filter() function or foreachPartition(). If you want to apply FP-Growth to each partition while keeping everything within the DataFrame API, you must avoid RDD-based operations such as .rdd.mapPartitions().

4. Lazy evaluation does not prevent distributed execution: transformations such as filter(), built up in the logical plan while iterating through partitions, still run as distributed jobs once an action triggers them. So even without actions like foreachPartition(), which forces distributed execution without logical-plan optimization, this workflow can benefit from distributed execution. Since we are using a single node to simulate SON MapReduce, the choice between filter() and foreachPartition() makes no practical difference here. We therefore recommend filter(), as it leads to a more straightforward implementation, but we accept solutions using either filter() or foreachPartition(). In general, however, if you want to explicitly process each partition in parallel across the cluster, use foreachPartition().
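Outside Spark, SON's first pass can be sketched in a few lines of plain Python: split the baskets into chunks, mine each chunk at the same *relative* support, and take the union of the local results as candidates. The round-robin split below is an assumption for illustration (Spark's `repartition` decides row placement itself), and `local_frequent` is a hypothetical brute-force miner standing in for FP-Growth on each chunk.

```python
from itertools import combinations

def local_frequent(chunk, min_support):
    """Brute-force stand-in for FP-Growth: itemsets in >= min_support * len(chunk) baskets."""
    min_count = min_support * len(chunk)
    items = sorted(set().union(*chunk))
    frequent = set()
    for k in range(1, len(items) + 1):
        level = {frozenset(c) for c in combinations(items, k)
                 if sum(set(c) <= b for b in chunk) >= min_count}
        if not level:  # no frequent k-itemsets, so no larger ones either
            break
        frequent |= level
    return frequent

def son_phase1(baskets, num_chunks, min_support):
    """Map: mine each chunk locally. Reduce: union of local results = candidate itemsets."""
    chunks = [baskets[i::num_chunks] for i in range(num_chunks)]  # round-robin split
    candidates = set()
    for chunk in chunks:
        if chunk:
            candidates |= local_frequent(chunk, min_support)
    return candidates

# Hypothetical toy baskets, split into 2 chunks
baskets = [
    {"Banana", "Limes"},
    {"Banana", "Organic Strawberries"},
    {"Banana", "Limes", "Organic Strawberries"},
    {"Limes"},
]
cands = son_phase1(baskets, num_chunks=2, min_support=0.5)
print(len(cands))  # 7
```

Storing itemsets as `frozenset`s makes the union independent of item order, so an itemset found in several chunks is kept once. In this toy run, two of the seven candidates ({Limes, Organic Strawberries} and the triple) occur in only one basket overall, so they are false positives at a global support of 0.5 that a second counting pass would have to remove.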



Perform map and reduce operations using Spark transformations and actions.

In Phase 1, find candidate itemsets.

In [22]:
# YOUR CODE HERE (map and reduce operations)

from functools import reduce
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, spark_partition_id

# Repartition the baskets DataFrame into 10 partitions, tag each row with its
# partition ID, and cache the result so that every filter below reuses the same
# partition assignment instead of recomputing the join, groupBy and shuffle
baskets_partitioned = (baskets.repartition(10)
                       .withColumn("partition_id", spark_partition_id())
                       .cache())

# Get the list of partition IDs
partition_ids = [row['partition_id'] for row in
                 baskets_partitioned.select('partition_id').distinct().collect()]

# Map: mine each partition locally. FPGrowth's minSupport is a fraction of the
# DataFrame it is fit on, so 0.001 here is the global support scaled to the partition size.
# Local names (local_fp, local_model) avoid overwriting model/freq_itemsets from earlier cells.
candidate_itemsets_list = []
for pid in partition_ids:
    partition_data = baskets_partitioned.filter(col('partition_id') == pid).select('items')
    local_fp = FPGrowth(itemsCol='items', minSupport=0.001, minConfidence=0.5)
    local_model = local_fp.fit(partition_data)
    candidate_itemsets_list.append(local_model.freqItemsets)

# Reduce: union the local frequent itemsets and remove duplicates
candidate_itemsets = reduce(DataFrame.union, candidate_itemsets_list).dropDuplicates(['items'])

# Compute and print the number of candidate itemsets after Phase 1
num_candidate_itemsets = candidate_itemsets.count()
print(f"Number of candidate itemsets after Phase 1: {num_candidate_itemsets}")

Number of candidate itemsets after Phase 1: 8421


Can Phase 1 cause false negatives? Provide an answer including justification in one sentence.

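No. Phase 1 cannot cause false negatives: every chunk is mined at the same relative support, so an itemset that is frequent in the whole dataset must be frequent in at least one chunk (if it fell below the threshold in every chunk, its chunk counts would sum to less than the global threshold), and it therefore always becomes a candidate.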
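Whether Phase 1 can miss a globally frequent itemset is settled by a short counting argument. Let the data be split into chunks $1,\dots,k$ with $n_i$ baskets each, so $N=\sum_i n_i$; let $c_i$ be the number of baskets in chunk $i$ that contain an itemset $I$, $c=\sum_i c_i$, and let $s$ be the relative support ($s=0.001$ here). If $I$ were infrequent in every chunk, i.e. $c_i < s\,n_i$ for all $i$, summing over the chunks would give

$$
c \;=\; \sum_{i=1}^{k} c_i \;<\; \sum_{i=1}^{k} s\,n_i \;=\; s\,N ,
$$

so $I$ would also be globally infrequent. Contrapositively, if $c \ge s\,N$ then $c_i \ge s\,n_i$ for at least one chunk $i$, and $I$ appears in that chunk's Phase 1 output. Because $c_i$ is an integer, $c_i \ge s\,n_i$ also implies $c_i \ge \lceil s\,n_i \rceil$, so rounding each local threshold up to a whole count does not break the argument. It does rely on every chunk being mined at the same relative threshold $s$, as in the Phase 1 cell, where FP-Growth's fractional `minSupport=0.001` is applied to each partition.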

In Phase 2, find true frequent itemsets.

In [23]:
# YOUR CODE HERE (map and reduce operations)

from pyspark.sql.functions import col, udf, explode
from pyspark.sql.types import ArrayType, StringType

# Define the global minimum support
minsupport = 0.001

# Total number of baskets
total_baskets = baskets.count()

# Calculate the minimum support count
min_support_count = minsupport * total_baskets

# Collect candidate itemsets into a list
candidate_itemsets_list = candidate_itemsets.select('items').collect()
candidate_itemsets_list = [row['items'] for row in candidate_itemsets_list]

# Broadcast the candidate itemsets
candidate_itemsets_broadcast = sc.broadcast(candidate_itemsets_list)

# Define a UDF to find candidate itemsets in each basket
def find_candidate_itemsets(basket_items):
    basket_set = set(basket_items)
    candidates = []
    for itemset in candidate_itemsets_broadcast.value:
        if set(itemset).issubset(basket_set):
            candidates.append(itemset)
    return candidates

find_candidates_udf = udf(find_candidate_itemsets, ArrayType(ArrayType(StringType())))

# Apply the UDF to the baskets DataFrame
baskets_with_candidates = baskets.withColumn('candidate_itemsets', find_candidates_udf(col('items')))

# Explode the candidate itemsets to get one itemset per row
exploded_candidates = baskets_with_candidates.select(explode('candidate_itemsets').alias('items'))

# Group by the itemsets and count occurrences
frequent_itemsets = exploded_candidates.groupBy('items').count().withColumnRenamed('count', 'freq')

# Filter itemsets that meet the global support threshold
frequent_itemsets = frequent_itemsets.filter(col('freq') >= min_support_count)

What is the benefit of Phase 2? Provide an answer in one sentence.

Phase 2 counts every candidate itemset over the entire dataset, eliminating the false positives produced by Phase 1 so that only itemsets meeting the global support threshold are reported.
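Phase 2 amounts to one exact counting pass over all baskets. A plain-Python sketch, assuming candidates are stored as `frozenset`s; the toy data and the name `son_phase2` are hypothetical. The inner loop plays the role of the map step (emit each candidate contained in a basket), and the summed `Counter` plus the threshold filter play the role of the reduce step.

```python
from collections import Counter

def son_phase2(baskets, candidates, min_support):
    """Count each candidate over ALL baskets and keep those meeting the global threshold."""
    min_count = min_support * len(baskets)
    counts = Counter()
    for basket in baskets:            # map: emit (candidate, 1) for each contained candidate
        for cand in candidates:
            if cand <= basket:
                counts[cand] += 1
    # reduce: keep the summed counts that reach the global support
    return {c: n for c, n in counts.items() if n >= min_count}

# Hypothetical toy baskets and candidates (one true frequent itemset, one false positive)
baskets = [
    {"Banana", "Limes"},
    {"Banana", "Organic Strawberries"},
    {"Banana", "Limes", "Organic Strawberries"},
    {"Limes"},
]
candidates = {frozenset(["Banana"]), frozenset(["Limes", "Organic Strawberries"])}
result = son_phase2(baskets, candidates, min_support=0.5)
print(result)  # {frozenset({'Banana'}): 3}
```

{Limes, Organic Strawberries} occurs in only one of the four baskets, below the global count of 2, so it is dropped as a false positive.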

Compute and print how many frequent itemsets were generated.

In [24]:
# YOUR CODE HERE

# Compute and print the number of frequent itemsets
num_frequent_itemsets = frequent_itemsets.count()
print(f"Number of frequent itemsets: {num_frequent_itemsets}")

Number of frequent itemsets: 4557
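The count above (4557) is larger than the 4444 itemsets that FP-Growth found on the full data at the same `minSupport=0.001`, even though SON should in principle return exactly the same set of itemsets. One plausible cause, which we have not verified against this run, is that FP-Growth orders the items inside each itemset according to their frequency in the data it was fit on, so two partitions can emit the same itemset as `[A, B]` and `[B, A]`. Both `dropDuplicates(['items'])` and `groupBy('items')` compare arrays element by element, so both orderings would survive. Sorting each array first (in Spark, for example, with `pyspark.sql.functions.array_sort` on each partition's `items` column before the union) is one way to canonicalize. The plain-Python sketch below illustrates the effect with made-up partition outputs:

```python
# Hypothetical itemsets emitted by two partitions; item order inside each list
# follows that partition's own frequency ranking, so it can differ.
partition_outputs = [
    [["Banana"], ["Organic Avocado", "Banana"]],
    [["Banana"], ["Banana", "Organic Avocado"]],
]

# Order-sensitive dedup, like comparing arrays element by element
naive = {tuple(items) for part in partition_outputs for items in part}

# Canonical dedup: sort the items inside each itemset first
canonical = {tuple(sorted(items)) for part in partition_outputs for items in part}

print(len(naive), len(canonical))  # 3 2
```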


Sort frequent items in descending order by frequency and display 50 rows.

In [25]:
# YOUR CODE HERE

# Sort frequent itemsets by descending frequency
frequent_itemsets_sorted = frequent_itemsets.orderBy(desc('freq'))

# Display the top 50 frequent itemsets
frequent_itemsets_sorted.show(50, truncate=False)

+----------------------------------------------+-----+
|items                                         |freq |
+----------------------------------------------+-----+
|[Banana]                                      |18726|
|[Bag of Organic Bananas]                      |15480|
|[Organic Strawberries]                        |10894|
|[Organic Baby Spinach]                        |9784 |
|[Large Lemon]                                 |8135 |
|[Organic Avocado]                             |7409 |
|[Organic Hass Avocado]                        |7293 |
|[Strawberries]                                |6494 |
|[Limes]                                       |6033 |
|[Organic Raspberries]                         |5546 |
|[Organic Blueberries]                         |4966 |
|[Organic Whole Milk]                          |4908 |
|[Organic Cucumber]                            |4613 |
|[Organic Zucchini]                            |4589 |
|[Organic Yellow Onion]                        |4290 |
|[Organic 

Write a paragraph of conclusions below summarizing your insights.

By implementing the SON algorithm in Spark, we mined frequent itemsets by processing partitions of the data independently and then combining the results. In Phase 1, each of the 10 partitions was mined with FP-Growth at the same relative support as the global threshold (0.001), producing candidate itemsets; since any itemset that is frequent in the whole dataset must be frequent in at least one partition, this phase cannot produce false negatives, only false positives. Phase 2 then counted every candidate over the entire dataset and kept only those meeting the global threshold, eliminating the false positives. This two-phase design turns frequent itemset mining into independent per-partition work followed by a single global counting pass, which maps naturally onto Spark’s partitioned DataFrames and transformations and lets the computation scale out when the partitions live on different nodes.

Once you obtained the desired results, **head over to eClass and submit your solution for this Colab**!