In [None]:
from pyspark import SparkContext
from itertools import combinations


## Spark Pipeline for Frequent Itemset Mining and Association Rule Generation

This notebook implements a Spark pipeline that mines frequent itemsets with an Apriori-style algorithm and then generates association rules from them. The steps are:

1.  **Data Loading and Spark Context Initialization**: Setting up the Spark environment, downloading the transaction file from Google Drive, and loading it into an RDD with one set of item IDs per basket (one basket per line).
2.  **Frequent Itemset Generation (L1, L2, L3)**: Identifying frequent individual items (L1), frequent item pairs (L2), and frequent item triples (L3), i.e. itemsets that occur in at least `SUPPORT` baskets (an absolute support threshold, set to 100 here).
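3.  **Association Rule Generation**: Computing the confidence of 2-item rules ($X \to Y$) and 3-item rules ($\{X, Y\} \to Z$) from the frequent itemsets, where $\mathrm{conf}(A \to B) = \mathrm{supp}(A \cup B) / \mathrm{supp}(A)$.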

In [None]:
!pip install -U -q PyDrive
!pip uninstall -y PyDrive
!pip install -U PyDrive2
!apt install openjdk-8-jdk-headless -qq
import os

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/987.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m983.0/987.4 kB[0m [31m35.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m987.4/987.4 kB[0m [31m20.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for PyDrive (setup.py) ... [?25l[?25hdone
Found existing installation: PyDrive 1.3.1
Uninstalling PyDrive-1.3.1:
  Successfully uninstalled PyDrive-1.3.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly i

In [None]:

# Print the version of the `java` on PATH; PySpark needs a JVM to start Spark.
!java -version

openjdk version "17.0.17" 2025-10-21
OpenJDK Runtime Environment (build 17.0.17+10-Ubuntu-122.04)
OpenJDK 64-Bit Server VM (build 17.0.17+10-Ubuntu-122.04, mixed mode, sharing)


In [None]:
from google.colab import auth
from oauth2client.client import GoogleCredentials
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive

# Sign the Colab user in, then hand the application-default credentials to
# PyDrive2 so the `drive` client below can read files from Google Drive.
auth.authenticate_user()

gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
# Google Drive ID of the transaction data. The file is saved to the working
# directory as browsing.txt, which the loading cell reads (one basket per line).
file_id = '1IW4w8IPu7v_D-2kL3pWJ5NOCw-xbvKRf'

downloaded = drive.CreateFile(dict(id=file_id))
downloaded.GetContentFile('browsing.txt')

In [None]:
# Reuse the running SparkContext if one exists; otherwise start a new one.
sc = SparkContext.getOrCreate()

# Absolute support threshold: an itemset is frequent when it occurs in at
# least this many baskets.
SUPPORT = 100


In [None]:
# One record per line of browsing.txt: the set of distinct item IDs in that
# basket. A bare str.split() already ignores leading/trailing whitespace.
# Cached because the L1, L2 and L3 stages all rescan it.
transactions = (
    sc.textFile("browsing.txt")
      .map(lambda line: set(line.split()))
      .cache()
)

transactions.take(3)



[{'ELE17451', 'ELE89019', 'FRO11987', 'GRO99222', 'SNA90258'},
 {'ELE17451',
  'ELE26917',
  'ELE52966',
  'ELE91550',
  'FRO12685',
  'FRO84225',
  'FRO90334',
  'GRO12298',
  'GRO99222',
  'SNA11465',
  'SNA30755',
  'SNA80192'},
 {'DAI22896', 'ELE17451', 'FRO86643', 'GRO73461', 'SNA99873'}]

In [None]:

# L1: (item, basket_count) for every item that appears in at least SUPPORT
# baskets. Baskets are sets, so an item counts at most once per basket.
L1 = (
    transactions
    .flatMap(lambda basket: basket)
    .map(lambda item: (item, 1))
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda item_count: item_count[1] >= SUPPORT)
    .cache()
)

L1.take(4)





[('SNA90258', 550), ('FRO11987', 104), ('SNA11465', 142), ('SNA80192', 258)]

In [None]:
# Bring just the frequent item IDs back to the driver as a Python set.
L1_items = set(L1.keys().collect())
print("Number of frequent items (L1):", len(L1_items))

Number of frequent items (L1): 647


In [None]:
# Ship the frequent-item set to the executors once as a read-only broadcast
# variable, for membership tests inside the pair/triple counting closures.
L1_broadcast = sc.broadcast(value=L1_items)

In [None]:
# Apriori step for pairs: a pair can only be frequent if both of its items are
# frequent, so each basket is first reduced to its L1 items. Sorting makes
# every pair come out as (smaller, larger), giving one canonical key per pair.
L2 = (
    transactions
    .map(lambda basket: sorted(item for item in basket if item in L1_broadcast.value))
    .flatMap(lambda frequent_items: (((a, b), 1) for a, b in combinations(frequent_items, 2)))
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda pair_count: pair_count[1] >= SUPPORT)
    .cache()
)

print("Number of frequent pairs (L2):", L2.count())


Number of frequent pairs (L2): 1334


In [None]:


# Driver-side pair counts, keyed by sorted (item_a, item_b) tuples, plus a
# broadcast copy so executor-side closures can test pair frequency.
pair_supports = dict(L2.collect())
pair_supports_broadcast = sc.broadcast(pair_supports)

# Preview a few entries instead of echoing every frequent pair into the notebook.
dict(list(pair_supports.items())[:5])

{('ELE17451', 'GRO99222'): 148,
 ('ELE17451', 'SNA30755'): 111,
 ('DAI22896', 'ELE17451'): 193,
 ('ELE17451', 'SNA99873'): 270,
 ('DAI22177', 'ELE17451'): 203,
 ('ELE17451', 'ELE59935'): 181,
 ('DAI22177', 'ELE66810'): 105,
 ('DAI46755', 'FRO81176'): 148,
 ('ELE17451', 'ELE66810'): 154,
 ('ELE17451', 'GRO94758'): 227,
 ('ELE17451', 'SNA55952'): 123,
 ('ELE26917', 'GRO73461'): 255,
 ('GRO36567', 'GRO73461'): 117,
 ('DAI48891', 'GRO36567'): 128,
 ('FRO78087', 'GRO73461'): 192,
 ('ELE17451', 'FRO92261'): 127,
 ('ELE11111', 'ELE17451'): 121,
 ('DAI95741', 'ELE17451'): 102,
 ('DAI22896', 'GRO30386'): 102,
 ('ELE17451', 'GRO30386'): 468,
 ('FRO16142', 'GRO73461'): 197,
 ('FRO24098', 'GRO73461'): 112,
 ('DAI35347', 'ELE26917'): 111,
 ('DAI22896', 'FRO31317'): 167,
 ('DAI22896', 'SNA72163'): 227,
 ('DAI55911', 'ELE26917'): 113,
 ('DAI55911', 'GRO73461'): 116,
 ('ELE17451', 'FRO31317'): 359,
 ('ELE17451', 'SNA59903'): 351,
 ('ELE17451', 'SNA72163'): 272,
 ('SNA59903', 'SNA72163'): 310,
 ('DAI22

In [None]:
def _candidate_triples(frequent_items):
    """Yield ((a, b, c), 1) for each 3-subset of a basket whose sub-pairs are all frequent.

    By the Apriori property a triple can only reach SUPPORT if every pair inside
    it does, so skipping the other 3-subsets leaves L3 unchanged while shuffling
    far fewer records. `frequent_items` must be sorted so the generated pairs
    match the sorted-tuple keys of `pair_supports_broadcast`.
    """
    frequent_pairs = pair_supports_broadcast.value
    for a, b, c in combinations(frequent_items, 3):
        if (a, b) in frequent_pairs and (a, c) in frequent_pairs and (b, c) in frequent_pairs:
            yield (a, b, c), 1


L3 = (
    transactions
    .map(lambda basket: sorted([item for item in basket if item in L1_broadcast.value]))
    .flatMap(_candidate_triples)
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda x: x[1] >= SUPPORT)
    # Cached: L3 is consumed twice (count here, collect in the rules cell).
    .cache()
)

print("Number of frequent triples (L3):", L3.count())

Number of frequent triples (L3): 233


In [None]:



# Driver-side support lookups: item -> count and sorted (item_a, item_b) -> count.
item_supports = dict(L1.collect())
pair_supports = dict(L2.collect())

# Each frequent pair {x, y} yields two rules, x -> y and y -> x, with
# confidence(A -> B) = supp(A ∪ B) / supp(A).
pair_rules = []
for (x, y), supp_xy in pair_supports.items():
    pair_rules.append((x, y, supp_xy / item_supports[x]))
    pair_rules.append((y, x, supp_xy / item_supports[y]))

# Highest confidence first; ties broken by LHS and then RHS so rules sharing an
# LHS are also ranked deterministically, independent of collect() order.
pair_rules_sorted = sorted(pair_rules, key=lambda r: (-r[2], r[0], r[1]))

print("\nTop 5 rules for 2(d):")
for lhs, rhs, confidence in pair_rules_sorted[:5]:
    print(f"{lhs} -> {rhs} : {confidence:.4f}")



Top 5 rules for 2(d):
DAI93865 -> FRO40251 : 1.0000
GRO85051 -> FRO40251 : 0.9992
GRO38636 -> FRO40251 : 0.9907
ELE12951 -> FRO40251 : 0.9906
DAI88079 -> FRO40251 : 0.9867


In [None]:


# Support of every frequent triple, keyed by its sorted (x, y, z) tuple.
triple_supports = dict(L3.collect())

# Each frequent triple yields three rules {a, b} -> c, one per choice of the
# right-hand item; confidence = supp(triple) / supp(left-hand pair).
triple_rules = []
for triple, supp_xyz in triple_supports.items():
    for lhs_pair in combinations(triple, 2):
        (rhs_item,) = [item for item in triple if item not in lhs_pair]
        triple_rules.append((lhs_pair, rhs_item, supp_xyz / pair_supports[lhs_pair]))

# Highest confidence first; ties broken by the two LHS items, then the RHS.
triple_rules_sorted = sorted(
    triple_rules,
    key=lambda rule: (-rule[2], *rule[0], rule[1]),
)

print("\nTop 5 rules for 2(e):")
for (lhs_a, lhs_b), rhs_item, confidence in triple_rules_sorted[:5]:
    print(f"{lhs_a},{lhs_b} -> {rhs_item} : {confidence:.4f}")



Top 5 rules for 2(e):
DAI23334,ELE92920 -> DAI62779 : 1.0000
DAI31081,GRO85051 -> FRO40251 : 1.0000
DAI55911,GRO85051 -> FRO40251 : 1.0000
DAI62779,DAI88079 -> FRO40251 : 1.0000
DAI75645,GRO85051 -> FRO40251 : 1.0000
