# Notebook Objective & Process Overview

This notebook walks you through an end-to-end association-rule mining pipeline that applies Spark’s FP-Growth algorithm to retail product-review data, treating each review as a purchase. Its primary goals are to:

- **Identify popular products** by review volume  
- **Discover frequent itemsets** and high-confidence cross-sell opportunities  
- **Visualize and interpret** the results with tables, heatmaps, Sankey diagrams, and network graphs  
- **Empower you** to tweak key parameters (e.g. top‐N products, support, confidence, number of rules) and immediately see how your choices affect the insights generated  

---

## Step-by-Step Process

1. **Spark Setup & Data Load**  
   - Initialize a Spark session with adequate executor memory  
   - Read in the master transaction DataFrame from Parquet and cache it  

2. **Define User-Tunable Parameters**  
   - `TOP_N_PRODUCTS`: how many of the most reviewed products to include  
   - `MIN_SUPPORT`: minimum relative support for frequent‐itemset mining  
   - `MIN_CONFIDENCE`: minimum confidence for association‐rule generation  
   - `INITIAL_TOP_N`: how many top itemsets/rules to display  

3. **Filter Data to Focus Scope**  
   - Group by product title to select the top-N reviewed items  
   - Identify “active” users (those with at least 2 reviews among these top products)  

4. **Construct User Baskets**  
   - Aggregate each active user’s purchases into an array of item titles  
   - Cache the resulting `baskets` DataFrame and count total baskets  

5. **Run FP-Growth Model**  
   - Fit Spark’s `FPGrowth` on the baskets with your `MIN_SUPPORT` and `MIN_CONFIDENCE`  
   - Generate `freqItemsets` and `associationRules` DataFrames  

6. **Extract & Clean Results**  
   - Pull results into Pandas for easier manipulation  
   - Compute absolute and relative support, flatten arrays into human-readable strings, and sort  

7. **Explore & Visualize**  
   - View the top frequent itemsets and association rules in tables  
   - Create lift heatmaps, Sankey diagrams, and network graphs to highlight strong bundles  

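8. **Interactive Parameter Tuning**  
   - Adjust `TOP_N_PRODUCTS`, `MIN_SUPPORT`, `MIN_CONFIDENCE`, or `INITIAL_TOP_N` in the parameters cell (Step 2)  
   - Rerun the downstream cells to see how your changes shift the patterns and insights; the dashboard sliders only filter the rules already mined, so thresholds below the fitted `MIN_SUPPORT` or `MIN_CONFIDENCE` require refitting the model  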

---

By following this process, you can reproduce the frequent itemsets and rules that a standard Apriori analysis would find (FP-Growth mines the same patterns without Apriori’s candidate generation) and also experiment interactively: testing different thresholds, zooming in on niche segments, and uncovering actionable cross-sell strategies in your own data.


## 1. **Spark Setup & Data Load**  

In [None]:
# ─── Spark & DataFrame Setup ─────────────────────────────────────────────────
from pyspark.ml.fpm import FPGrowth                               # Frequent pattern mining
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql import functions as F                            # Alias for built-in SQL funcs
from pyspark.sql.functions import (
    array_distinct, collect_list, collect_set, col, count, desc,
    explode, from_unixtime, lit, size, to_date, year
)
from pyspark.sql.types import (
    ArrayType, DoubleType, FloatType, IntegerType, MapType, StringType,
    StructField, StructType, TimestampType
)

# ─── Core Python & Data Wrangling ─────────────────────────────────────────────
import itertools                                                   # Combinatorics
import json                                                        # JSON serialization
import re                                                          # Regular expressions
import pandas as pd                                                # Tabular data

# ─── Machine Learning & Graph Analysis ───────────────────────────────────────
import networkx as nx                                              # Graph algorithms

# ─── Visualization ───────────────────────────────────────────────────────────
import matplotlib.pyplot as plt                                     # Static plots
from matplotlib.ticker import FuncFormatter                         # Axis formatting
import seaborn as sns                                              # Statistical plotting
import plotly.express as px                                        # Quick interactive plots
import plotly.graph_objects as go                                  # Detailed interactive figures
from bokeh.plotting import figure, output_notebook, show            # Interactive, web-style plots
from bokeh.models import ColumnDataSource, HoverTool, LinearColorMapper, ColorBar
from bokeh.palettes import Category10, Viridis256

# ─── Interactive Widgets & Display ──────────────────────────────────────────
import ipywidgets as widgets                                       # Jupyter widgets
from IPython.display import display, HTML, Markdown, clear_output    # Rich display

# ─── NLP Utilities ─────────────────────────────────────────────────────────
import nltk                                                        # Core NLP toolkit
from nltk.corpus import stopwords                                  # Stopword lists
from nltk.stem import WordNetLemmatizer                            # Lemmatization
from wordcloud import WordCloud                                    # Text frequency clouds

# ─── Miscellaneous ─────────────────────────────────────────────────────────
import tqdm                                                        # Progress bars

# ─── Jupyter & Setup ────────────────────────────────────────────────────────
# Inline Matplotlib plots (a line magic cannot carry a trailing comment)
%matplotlib inline
output_notebook()                                                  # Load BokehJS in notebook

# ─── NLTK Data Downloads ───────────────────────────────────────────────────
nltk.download('stopwords')
nltk.download('wordnet')


In [None]:
# Path to the master Parquet file
path = "gs://ba843-group1-project/master_data.parquet"

In [None]:
# Create the Spark session
spark = SparkSession.builder \
    .appName("Top1000BasketAnalysis") \
    .config("spark.executor.memory", "6g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.ui.showConsoleProgress", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 17:54:46 INFO SparkEnv: Registering MapOutputTracker
25/04/28 17:54:46 INFO SparkEnv: Registering BlockManagerMaster
25/04/28 17:54:46 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/04/28 17:54:46 INFO SparkEnv: Registering OutputCommitCoordinator


In [None]:
master_df = spark.read.parquet(path)

                                                                                

In [None]:
# Print the schema
master_df.printSchema()

root
 |-- parent_asin: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- main_category: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: double (nullable = true)
 |-- features: string (nullable = true)
 |-- description: string (nullable = true)
 |-- price: double (nullable = true)
 |-- videos: string (nullable = true)
 |-- store: string (nullabl

In [None]:
master_df.cache()

DataFrame[parent_asin: string, asin: string, helpful_vote: bigint, images: array<struct<attachment_type:string,large_image_url:string,medium_image_url:string,small_image_url:string>>, rating: double, text: string, timestamp: bigint, title: string, user_id: string, verified_purchase: boolean, main_category: string, average_rating: double, rating_number: double, features: string, description: string, price: double, videos: string, store: string, categories: string, bought_together: string, Product_title: string]

----

## 2. **Define User-Tunable Parameters**  

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 1) USER‐TUNABLE PARAMETERS
# ─────────────────────────────────────────────────────────────────────────────
TOP_N_PRODUCTS   = 10000   # keep only the top‐N reviewed products
MIN_SUPPORT      = 1e-4    # relative support threshold for FP-Growth
MIN_CONFIDENCE   = 0.01    # confidence threshold for association rules
INITIAL_TOP_N    = 20      # how many itemsets/rules to show
# ─────────────────────────────────────────────────────────────────────────────
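
To make the two thresholds concrete, here is a minimal pure-Python sketch on five made-up baskets (the product names are hypothetical). It computes the quantities the FP-Growth outputs are read against later: support(X) is the fraction of baskets containing X, confidence(A → C) = support(A ∪ C) / support(A), and lift(A → C) = confidence(A → C) / support(C).

```python
from fractions import Fraction

# Hypothetical toy baskets (one set of product titles per user)
toy_baskets = [
    {"trunks", "rashguard"},
    {"trunks", "rashguard", "socks"},
    {"trunks", "socks"},
    {"jeans", "socks"},
    {"jeans"},
]

def support(itemset, baskets):
    """Fraction of baskets that contain every item in `itemset`."""
    hits = sum(1 for b in baskets if set(itemset) <= b)
    return Fraction(hits, len(baskets))

def confidence(antecedent, consequent, baskets):
    """P(consequent | antecedent) = support(A ∪ C) / support(A)."""
    return support(set(antecedent) | set(consequent), baskets) / support(antecedent, baskets)

def lift(antecedent, consequent, baskets):
    """Confidence divided by the consequent's baseline support."""
    return confidence(antecedent, consequent, baskets) / support(consequent, baskets)

# Example rule: trunks -> rashguard
rule_support = support({"trunks", "rashguard"}, toy_baskets)   # 2/5
rule_conf = confidence({"trunks"}, {"rashguard"}, toy_baskets)  # (2/5) / (3/5) = 2/3
rule_lift = lift({"trunks"}, {"rashguard"}, toy_baskets)        # (2/3) / (2/5) = 5/3
print(float(rule_support), round(float(rule_conf), 3), round(float(rule_lift), 3))
```

In these terms, `MIN_SUPPORT = 1e-4` keeps itemsets found in at least 0.01% of baskets (about 1 in 10,000), and `MIN_CONFIDENCE = 0.01` keeps rules whose consequent appears in at least 1% of the baskets that contain the antecedent.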

## 3. **Filter Data to Focus Scope**

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 2) Restrict to top‐N products & active users
# ─────────────────────────────────────────────────────────────────────────────
top_prods = (
    master_df
      .groupBy("Product_title")
      .count()
      .orderBy(F.desc("count"))
      .limit(TOP_N_PRODUCTS)
      .select("Product_title")
)

In [None]:
df_small = master_df.join(top_prods, on="Product_title", how="inner")

active_users = (
    df_small
      .groupBy("user_id")
      .count()
      .filter("count >= 2")
      .select("user_id")
)

df_active = df_small.join(active_users, on="user_id", how="inner")
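
The two Spark aggregations above can be mirrored in plain Python to check the logic on a small sample. This is a sketch under assumptions: the review rows are hypothetical `(user_id, product_title)` tuples, and, as in the Spark code, a user counts as active when they have at least two review rows among the top-N products (so two reviews of the same product also qualify).

```python
from collections import Counter

# Hypothetical review rows: (user_id, product_title)
toy_reviews = [
    ("u1", "jeans"), ("u1", "socks"),
    ("u2", "jeans"),
    ("u3", "socks"), ("u3", "socks"),
    ("u4", "hoodie"), ("u4", "jeans"),
]

def top_n_products(reviews, n):
    """Titles of the n most-reviewed products (like groupBy/count/orderBy/limit)."""
    counts = Counter(title for _, title in reviews)
    return {title for title, _ in counts.most_common(n)}

def active_users(reviews, top_products, min_reviews=2):
    """Users with at least `min_reviews` review rows among the top products."""
    per_user = Counter(user for user, title in reviews if title in top_products)
    return {user for user, cnt in per_user.items() if cnt >= min_reviews}

toy_top = top_n_products(toy_reviews, 2)
print(sorted(toy_top))                              # ['jeans', 'socks']
print(sorted(active_users(toy_reviews, toy_top)))   # ['u1', 'u3']
```

User `u3` qualifies with two reviews of the same product; counting distinct titles instead would be the stricter reading of “bought at least 2 products”.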

## 4. **Construct User Baskets**

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 3) Build & cache baskets
# ─────────────────────────────────────────────────────────────────────────────
baskets = (
    df_active
      .groupBy("user_id")
      .agg(F.collect_set("Product_title").alias("items"))
)
baskets.cache()
total_baskets = baskets.count()
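
`collect_set` keeps each product title once per user, which matters because Spark’s FP-Growth requires the items within a basket to be unique. A pure-Python sketch of the same grouping, on hypothetical rows, also shows a side effect of the active-user rule: a user whose two reviews are for the same product ends up with a one-item basket, which counts toward single-item support but can never produce a rule.

```python
from collections import defaultdict

# Hypothetical filtered review rows: (user_id, product_title)
toy_rows = [
    ("u1", "jeans"), ("u1", "socks"),
    ("u3", "socks"), ("u3", "socks"),
]

def build_baskets(rows):
    """Group titles per user as a set, mirroring groupBy("user_id").agg(collect_set(...))."""
    baskets = defaultdict(set)
    for user, title in rows:
        baskets[user].add(title)
    return dict(baskets)

toy_baskets_by_user = build_baskets(toy_rows)
toy_total = len(toy_baskets_by_user)
print(toy_total)  # 2
print({u: sorted(items) for u, items in toy_baskets_by_user.items()})
```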

                                                                                

## 5–6. **Run FP-Growth Model, Extract & Clean Results**

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 4) Run FP‐Growth
# ─────────────────────────────────────────────────────────────────────────────
model = FPGrowth(
    itemsCol="items",
    minSupport=MIN_SUPPORT,
    minConfidence=MIN_CONFIDENCE
).fit(baskets)

# ─────────────────────────────────────────────────────────────────────────────
# 5) Pull into Pandas & clean up
# ─────────────────────────────────────────────────────────────────────────────
# 5a) Frequent itemsets
freq_df = (
    model
      .freqItemsets
      .toPandas()
      .rename(columns={"freq":"count"})
)
freq_df["support"] = freq_df["count"] / total_baskets
freq_df["itemset_str"] = freq_df["items"].apply(lambda x: " || ".join(x))
freq_df = freq_df.sort_values("support", ascending=False).reset_index(drop=True)

# 5b) Association rules
rules = model.associationRules.toPandas()

# keep both list‐forms and string‐forms
rules["antecedent_list"] = rules["antecedent"]
rules["consequent_list"] = rules["consequent"]
rules["antecedent_str"]  = rules["antecedent_list"].apply(lambda x: " || ".join(x))
rules["consequent_str"]  = rules["consequent_list"].apply(lambda x: " || ".join(x))
rules = rules.sort_values("confidence", ascending=False).reset_index(drop=True)

25/04/28 18:06:58 WARN FPGrowth: Input data is not cached.                      
25/04/28 18:07:10 WARN DAGScheduler: Broadcasting large task binary with size 1000.2 KiB
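
FP-Growth avoids enumerating candidate itemsets, but on a toy dataset its output can be cross-checked by brute force. The sketch below is *not* FP-Growth: it enumerates itemsets level by level, keeps those whose relative support meets a threshold, and derives rules with a single-item consequent, which is the shape of rule Spark’s `associationRules` returns. The baskets and thresholds are hypothetical; for scale, at `MIN_SUPPORT = 1e-4` an itemset must appear in about 1 of every 10,000 baskets.

```python
from itertools import combinations

# Hypothetical toy baskets
toy_baskets_fs = [
    frozenset({"trunks", "rashguard"}),
    frozenset({"trunks", "rashguard", "socks"}),
    frozenset({"trunks", "socks"}),
    frozenset({"jeans", "socks"}),
    frozenset({"jeans"}),
]

def frequent_itemsets(baskets, min_support):
    """Brute-force enumeration: {itemset: absolute count} for itemsets meeting min_support."""
    n = len(baskets)
    items = sorted(set().union(*baskets))
    result = {}
    for size in range(1, len(items) + 1):
        found = False
        for combo in combinations(items, size):
            cand = frozenset(combo)
            count = sum(1 for b in baskets if cand <= b)
            if count / n >= min_support:
                result[cand] = count
                found = True
        if not found:  # no frequent k-itemset means no frequent (k+1)-itemset either
            break
    return result

def single_consequent_rules(freq, n, min_confidence):
    """Rules A -> c from each frequent itemset, with confidence and lift."""
    rules = []
    for itemset, count in freq.items():
        if len(itemset) < 2:
            continue
        for c in itemset:
            antecedent = itemset - {c}          # frequent by anti-monotonicity
            conf = count / freq[antecedent]
            if conf >= min_confidence:
                lift = conf / (freq[frozenset({c})] / n)
                rules.append((sorted(antecedent), c, conf, lift))
    return rules

toy_freq = frequent_itemsets(toy_baskets_fs, min_support=0.4)
toy_rules = single_consequent_rules(toy_freq, len(toy_baskets_fs), min_confidence=0.5)
for antecedent, consequent, conf, lift in sorted(toy_rules):
    print(antecedent, "->", consequent, f"conf={conf:.2f}", f"lift={lift:.2f}")
```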
                                                                                

## 7. **Explore & Visualize**

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 6) Plotting helper functions
# ─────────────────────────────────────────────────────────────────────────────
def embed(fig):
    fig.update_layout(
        width=1000, height=600,
        margin=dict(l=50, r=50, t=50, b=50)
    )
    html = fig.to_html(include_plotlyjs="cdn")
    display(HTML(html))

def show_table(freq_df, top_n):
    df = freq_df.head(top_n)[["itemset_str","support","count"]]
    display(df.rename(columns={"itemset_str":"Itemset"})
               .style.format({"support":"{:.4f}", "count":"{:,}"}))

def plot_network(rules_df, top_n):
    top = rules_df.head(top_n)
    G = nx.DiGraph()
    for _, row in top.iterrows():
        for a in row.antecedent_list:
            for c in row.consequent_list:
                G.add_edge(a, c, weight=row.lift)
    pos = nx.spring_layout(G, k=0.5, seed=42)
    # edges
    edge_x, edge_y = [], []
    for u,v in G.edges():
        x0,y0 = pos[u]; x1,y1 = pos[v]
        edge_x += [x0,x1,None]; edge_y += [y0,y1,None]
    edge_trace = go.Scatter(
        x=edge_x, y=edge_y, mode="lines",
        line=dict(width=1, color="gray"), hoverinfo="none"
    )
    # nodes
    node_x, node_y, node_text = [], [], []
    for n in G.nodes():
        x,y = pos[n]
        node_x.append(x); node_y.append(y); node_text.append(n)
    node_trace = go.Scatter(
        x=node_x, y=node_y, mode="markers+text",
        marker=dict(size=12, color="skyblue"),
        text=node_text, textposition="top center",
        hoverinfo="text"
    )
    fig = go.Figure([edge_trace, node_trace])
    fig.update_layout(title="Network of Top Rules", showlegend=False)
    embed(fig)

def plot_sankey(rules_df, top_n):
    top = rules_df.head(top_n)
    # build label list
    labels = []
    for lst in top.antecedent_list:
        labels += lst
    for lst in top.consequent_list:
        labels += lst
    labels = list(dict.fromkeys(labels))
    idx = {l:i for i,l in enumerate(labels)}

    sources, targets, values = [], [], []
    for _,row in top.iterrows():
        for a in row.antecedent_list:
            for c in row.consequent_list:
                sources.append(idx[a])
                targets.append(idx[c])
                values.append(row.confidence)

    sankey = go.Sankey(
        node=dict(label=labels, pad=15, thickness=15),
        link=dict(source=sources, target=targets, value=values)
    )
    fig = go.Figure(sankey)
    fig.update_layout(title="Sankey Diagram of Top Rules")
    embed(fig)

def plot_heatmap(rules_df, min_lift):
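    # pivot the lifts into a matrix
    dfh = (
        rules_df
        .query("lift >= @min_lift")
        .pivot(index="antecedent_str", columns="consequent_str", values="lift")
        .fillna(0)
    )

    # px.imshow cannot draw an empty matrix, so return a message instead
    if dfh.empty:
        return HTML(f"<p>No rules with lift ≥ {min_lift:.2f} at the current thresholds.</p>")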

    # build the heatmap
    fig = px.imshow(
        dfh.values,
        x=dfh.columns,
        y=dfh.index,
        labels=dict(color="Lift"),    # only colorbar is labeled
        title=f"Lift Heatmap (lift ≥ {min_lift:.2f})"
    )

    # remove all axis visuals
    fig.update_xaxes(showgrid=False, zeroline=False, showticklabels=False)
    fig.update_yaxes(showgrid=False, zeroline=False, showticklabels=False)

    # tighten up the layout
    fig.update_layout(
        width=800, height=600,
        margin=dict(l=20, r=20, t=50, b=20)
    )

    # render inline via CDN
    return HTML(fig.to_html(include_plotlyjs="cdn"))


def show_story(rules_df, top_n):
    top = rules_df.head(top_n)
    lines = [
        f"• **{r.antecedent_str}** → **{r.consequent_str}**  ⎮ conf {r.confidence:.2f}, lift {r.lift:.2f}"
        for _,r in top.iterrows()
    ]
    display(Markdown("## Story: Top Association Rules\n" + "\n".join(lines)))
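
Both `plot_network` and `plot_sankey` expand each rule into one link per (antecedent item, consequent item) pair. When several rules share a pair, `G.add_edge` in `plot_network` overwrites the weight so only the last rule’s lift survives, while `plot_sankey` draws a separate link per rule. A pure-Python sketch of one alternative, assuming you want the strongest lift per pair, collapses the pairs first and then builds Sankey-style index lists; the rule tuples are hypothetical.

```python
# Hypothetical rules: (antecedent items, consequent items, confidence, lift)
toy_rule_rows = [
    (["trunks"], ["rashguard"], 0.06, 77.0),
    (["trunks", "socks"], ["rashguard"], 0.10, 80.0),
    (["sweatshirt"], ["hoodie"], 0.05, 7.8),
]

def max_lift_edges(rules):
    """Collapse rules into {(antecedent_item, consequent_item): max lift}."""
    edges = {}
    for antecedents, consequents, _conf, lift in rules:
        for a in antecedents:
            for c in consequents:
                edges[(a, c)] = max(lift, edges.get((a, c), float("-inf")))
    return edges

def sankey_links(edges):
    """Turn {(source, target): weight} into (labels, sources, targets, values) lists."""
    labels = list(dict.fromkeys(node for pair in edges for node in pair))
    index = {label: i for i, label in enumerate(labels)}
    sources = [index[a] for a, _ in edges]
    targets = [index[c] for _, c in edges]
    values = list(edges.values())
    return labels, sources, targets, values

toy_edges = max_lift_edges(toy_rule_rows)
print(toy_edges)
print(sankey_links(toy_edges))
```

The four lists can be passed to `go.Sankey(node=dict(label=labels), link=dict(source=sources, target=targets, value=values))`, and the same `toy_edges` dict could feed `G.add_edge(a, c, weight=w)` without any overwrite.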

## 8. **Interactive Parameter Tuning**

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 7) Build the interactive dashboard
# ─────────────────────────────────────────────────────────────────────────────
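# Sliders start at the fitted thresholds: the dashboard only filters rules that were
# already mined, so values below MIN_SUPPORT / MIN_CONFIDENCE cannot add anything.
support_slider    = widgets.FloatSlider(
    value=MIN_SUPPORT, min=MIN_SUPPORT, max=1e-2, step=1e-5,
    description="Min Support", readout_format=".5f"
)
confidence_slider = widgets.FloatSlider(
    value=MIN_CONFIDENCE, min=MIN_CONFIDENCE, max=1.0, step=0.01,
    description="Min Confidence"
)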
lift_slider       = widgets.FloatSlider(
    value=1.0, min=0.0, max=5.0, step=0.1,
    description="Min Lift"
)
topn_slider       = widgets.IntSlider(
    value=INITIAL_TOP_N, min=5, max=100, step=5,
    description="Top N"
)

out1 = widgets.Output()  # Table
out2 = widgets.Output()  # Network
out3 = widgets.Output()  # Sankey
out4 = widgets.Output()  # Heatmap
out5 = widgets.Output()  # Story

tabs = widgets.Tab([out1,out2,out3,out4,out5])
for i,title in enumerate(["Table","Network","Sankey","Heatmap","Story"]):
    tabs.set_title(i, title)

def update_dashboard(_):
    for o in (out1,out2,out3,out4,out5):
        o.clear_output(wait=True)

    ms = support_slider.value
    mc = confidence_slider.value
    ml = lift_slider.value
    tn = topn_slider.value

    # filtered Pandas slices
    ff = freq_df.query("support >= @ms").nlargest(tn, "support")
    rr = rules.query("confidence >= @mc & lift >= @ml").nlargest(tn, "confidence")

    with out1:
        show_table(ff, tn)
    with out2:
        plot_network(rr, tn)
    with out3:
        plot_sankey(rr, tn)
    with out4:
        display(plot_heatmap(rr, ml))
    with out5:
        show_story(rr, tn)

for w in (support_slider, confidence_slider, lift_slider, topn_slider):
    w.observe(update_dashboard, names="value")

dashboard = widgets.VBox([
    widgets.HBox([support_slider, confidence_slider, lift_slider, topn_slider]),
    tabs
])
display(dashboard)
update_dashboard(None)
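
`update_dashboard` never refits the model: it filters the `freq_df` and `rules` tables already in memory and keeps the top N. The pure-Python sketch below, on hypothetical rule tuples, mirrors that filter-then-rank step and makes the consequence explicit: a confidence threshold below the fitted one cannot surface rules that FP-Growth never produced. `FITTED_MIN_CONFIDENCE` is a hypothetical stand-in for the `MIN_CONFIDENCE` used at fit time.

```python
# Hypothetical mined rules: (rule label, confidence, lift)
FITTED_MIN_CONFIDENCE = 0.01  # assumed to equal MIN_CONFIDENCE at fit time
mined_rules = [
    ("trunks -> rashguard", 0.058, 77.0),
    ("sweatshirt -> hoodie", 0.051, 7.8),
    ("sweatshirt -> tee", 0.155, 3.2),
    ("jeans -> socks", 0.012, 0.9),
]

def filter_rules(rules, min_conf, min_lift, top_n):
    """Keep rules meeting both thresholds, then take the top_n by confidence."""
    kept = [r for r in rules if r[1] >= min_conf and r[2] >= min_lift]
    return sorted(kept, key=lambda r: r[1], reverse=True)[:top_n]

print(filter_rules(mined_rules, min_conf=0.05, min_lift=1.0, top_n=2))
```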


In [None]:
import os
os.listdir('/Images/Apriori/')

['Network_Top_Rules.png',
 'Story.png',
 'Lift_Heatmap.png',
 'Table.png',
 'Sankey.png']

# FP-Growth Association-Rule Visualizations (Sample Output)

## Story: Top Association Rules
![Story: Top Association Rules](https://raw.githubusercontent.com/billburr958/images-temp/main/Apriori/Story.png)

---

## Table: Top 20 Frequent Items
![Table: Top 20 Frequent Items](https://raw.githubusercontent.com/billburr958/images-temp/main/Apriori/Table.png)

---

## Lift Heatmap (lift ≥ 1.00)
![Lift Heatmap (lift ≥ 1.00)](https://raw.githubusercontent.com/billburr958/images-temp/main/Apriori/Lift_Heatmap.png)

---

## Sankey Diagram of Top Rules
![Sankey Diagram of Top Rules](https://raw.githubusercontent.com/billburr958/images-temp/main/Apriori/Sankey.png)

---

## Network of Top Rules
![Network of Top Rules](https://raw.githubusercontent.com/billburr958/images-temp/main/Apriori/Network_Top_Rules.png)


----

# Insights and recommendations


## Top 10 Products by Review Volume  
*(Proxy for popularity & demand)*

1. **Funny Civil Engineers TShirt** – “I’m A Crazy Civil Engineering T-Shirt”  
2. **Hail Satan Unicorn Cat Rainbow Pullover Hoodie**  
3. **Crocs Unisex-Adult Classic Clog**  
4. **Andy Griffith Show “Floyd’s Barber Shop” Premium T-Shirt**  
5. **SATINA High-Waisted Capri & Full-Length Women’s Leggings**  
6. **Dickies Men’s Dri-Tech Moisture Control Crew Socks (Multipack)**  
7. **Levi’s Men’s 505 Regular Fit Jeans**  
8. **Levi’s Men’s 501 Original Fit Jeans**  
9. **Women’s Mesh Slip-On Air Cushion Walking Shoes**  
10. *(10th product in top-10 list)*  

**Insight:** The top 10 is dominated by apparel and footwear (graphic tees and hoodies, leggings, socks, jeans, clogs, walking shoes), suggesting that clothing drives the bulk of review engagement.

---

## Most Frequently Purchased Individual Items  
*(Frequent itemset support counts)*

| Item                                                                                      | Purchases |
| ----------------------------------------------------------------------------------------- | ---------: |
| Amazon Essentials Women’s Classic-Fit Polar Fleece Jacket (Plus Sizes)                    |      3,992 |
| Bali Women’s One Smooth-U Underwire T-Shirt Bra (Convertible Straps)                      |      2,915 |
| Carhartt Men’s Loose-Fit Heavyweight Short-Sleeve Pocket Henley T-Shirt                   |      2,420 |
| Clarks Women’s Suede Knitted-Collar Clog Slippers (Faux-Fur Lining)                        |      1,286 |
| BOBS from Skechers Women’s Highlights Flexpadrille Wedge                                  |        893 |

**Insight:** Each of these appears in more than 800 baskets, highlighting outerwear, intimate apparel, tops, and footwear as core volume drivers.

---

## Key Cross-Sell & Bundling Opportunities  
*(Top association rules by confidence & lift)*

| Antecedent                                                  | Consequent                                       | Confidence | Lift  |
| ----------------------------------------------------------- | ------------------------------------------------ | ---------: | ----: |
| Kanu Surf Men’s Barracuda Swim Trunks                       | CB Rashguard UPF 50+ Swim Shirt                  |      5.8 % | 77×   |
| Grumpy Old Man “Killing It” Sweatshirt                      | Hail Satan Unicorn Cat Rainbow Hoodie            |      5.1 % | 7.8×  |
| Grumpy Old Man “Killing It” Sweatshirt                      | Funny Civil Engineers TShirt                     |     15.5 % | 3.2×  |

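**Insights:**  
- **Swimwear Set**: Trunks → rashguard has an exceptionally high lift (77×): baskets with the swim trunks are about 77 times more likely than the average basket to include the rashguard, even though only 5.8% of trunk buyers add it.  
- **Graphic-Apparel Bundles**: 5.1% of sweatshirt buyers also add the unicorn-cat hoodie (lift 7.8×) and 15.5% add the civil-engineers tee (lift 3.2×), both well above baseline, which points to a cohesive “graphic apparel” segment.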
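
The gap between the 5.8% confidence and the 77× lift follows from the definition of lift. Rearranging it gives the rashguard’s baseline support implied by the table (approximate, since both inputs are rounded):

$$
\mathrm{lift}(A \Rightarrow C) = \frac{\mathrm{conf}(A \Rightarrow C)}{\mathrm{supp}(C)}
\quad\Longrightarrow\quad
\mathrm{supp}(C) = \frac{\mathrm{conf}(A \Rightarrow C)}{\mathrm{lift}(A \Rightarrow C)} \approx \frac{0.058}{77} \approx 7.5 \times 10^{-4}
$$

So the rashguard sits in roughly 0.075% of all baskets but in 5.8% of the baskets that contain the trunks.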

---

## Strategic Recommendations

### 1. Bundle & Promote
- Launch **“Swim Essentials”** sets pairing trunks + rashguard  
- Offer **graphic-apparel combos**: sweatshirt + hoodie or sweatshirt + tee  

### 2. Inventory & Merchandising
- Prioritize stock for the **top-5 high-support items** (fleece jacket, bra, henley tee, clog slippers, wedges)  
- Feature these in **homepage carousels** and **email newsletters**  

### 3. Personalized Cross-Sell
- On each key-item page, dynamically recommend its top complement  
- Use **lift-weighted ranking** so the rashguard appears highest for swim trunks  

### 4. Marketing & Promotions
- Run targeted campaigns around **“Graphic Apparel Week”** and **“Active-Wear Essentials”**  
- Leverage **user-generated review images** for social proof  

----

#### Saving our derived datasets for further analysis if needed.

In [None]:
from pyspark.sql.functions import concat_ws

# ─── 0) Ensure model outputs exist ────────────────────────────────────────────
# (If needed, reassign:)
freq_itemsets_df = model.freqItemsets
rules_df         = model.associationRules

In [None]:
# ─── 1) Flatten the array columns ─────────────────────────────────────────────
flat_freq = freq_itemsets_df.select(
    # product titles can contain commas, so join with " || " as in the pandas cleanup
    concat_ws(" || ", "items").alias("items"),
    "freq"
)

flat_rules = rules_df.select(
    concat_ws(" || ", "antecedent").alias("antecedent"),
    concat_ws(" || ", "consequent").alias("consequent"),
    "confidence",
    "lift",
    "support"
)
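
Product titles can themselves contain commas, so an itemset joined with a plain comma cannot be split back into items unambiguously, even though the CSV writer quotes the field. A small pure-Python sketch with the standard `csv` module, using a hypothetical itemset, shows the problem and a clean round-trip with `" || "`, assuming no title contains that separator.

```python
import csv
import io

# Hypothetical itemset whose first title contains a comma
toy_items = ["Capri Leggings, High-Waisted", "Crew Socks"]

def write_and_read(items, sep):
    """Join items with `sep`, write one CSV row, read it back, and split on `sep`."""
    buf = io.StringIO()
    csv.writer(buf).writerow([sep.join(items), 42])
    buf.seek(0)
    joined, _freq = next(csv.reader(buf))
    return joined.split(sep)

print(write_and_read(toy_items, ","))     # three pieces: the first title is split apart
print(write_and_read(toy_items, " || "))  # the original two titles
```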

In [None]:
# ─── 2) Write top_prods (already flat) ───────────────────────────────────────
top_prods \
  .coalesce(1) \
  .write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("gs://ba843-group1-project/Analysis Files/top_prods_by_count")

                                                                                

In [None]:
# ─── 3) Write flattened frequent itemsets ────────────────────────────────────
flat_freq \
  .coalesce(1) \
  .write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("gs://ba843-group1-project/Analysis Files/frequent_itemsets")

25/04/28 18:12:36 WARN DAGScheduler: Broadcasting large task binary with size 1052.5 KiB
                                                                                

In [None]:
# ─── 4) Write flattened association rules ────────────────────────────────────
flat_rules \
  .coalesce(1) \
  .write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("gs://ba843-group1-project/Analysis Files/association_rules")

25/04/28 18:16:07 WARN DAGScheduler: Broadcasting large task binary with size 1193.3 KiB
                                                                                