<a href="https://colab.research.google.com/github/chiaraanni/AMD/blob/main/SON.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **ALGORITHMS FOR MASSIVE DATA**

# **MARKET BASKET ANALYSIS**

**Libraries**

In [None]:
!pip install pyspark
# Let's initialize the Spark context and parallelize the data
import os
import pyspark as spark
#import pandas as pd
import itertools
from pyspark.sql import SparkSession
from itertools import combinations
import math

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=9d69751eb06967bff0f4281e76299ad228bbe243a491935eafe294eb85afe4a1
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


**Load the file from Kaggle's link**

In [None]:
os.environ['KAGGLE_USERNAME'] = "chiaraanni"
os.environ['KAGGLE_KEY'] = "b5ec3d7be8b44a0db1812ecc93cc48e2"
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024
!unzip /content/1-3m-linkedin-jobs-and-skills-2024.zip > /dev/null

Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
 99% 1.87G/1.88G [00:20<00:00, 99.1MB/s]
100% 1.88G/1.88G [00:20<00:00, 97.4MB/s]


In [None]:
# Read the CSV (first row is the header), discard rows containing nulls and
# keep only the column with the skills of each posting.
spark = SparkSession.builder.appName("MarketBasket").getOrCreate()

path = '/content/job_skills.csv'

raw_postings = spark.read.csv(path, header=True)
df = raw_postings.dropna().select('job_skills')
df.count()

1294374

**Sample the dataset**

In [None]:
# A fixed seed makes the 0.2% sample repeatable, so the counts printed in the
# rest of the notebook do not change from one run to the next.
SAMPLE_SEED = 42

df=df.sample(False, 0.002, seed=SAMPLE_SEED)
rdd2=df.rdd
# One basket per row: the comma-separated skills string becomes a list of skills.
splitted_rdd= rdd2.map(lambda row: row['job_skills'].split(', '))
rdd_dim=splitted_rdd.count()
rdd_dim

In [None]:
# Display the number of sampled baskets stored in rdd_dim
rdd_dim

2615

In [None]:
# Decide the number of partitions and the support thresholds

partitions_before = splitted_rdd.getNumPartitions()
print("Number of partitions before:", partitions_before)
partitioned_rdd=splitted_rdd.repartition(10)
num_partitions = partitioned_rdd.getNumPartitions()
print("Number of partitions after:", num_partitions)

# Global threshold: 2% of the sampled baskets.
support=math.trunc(rdd_dim*2/100)
# Local SON threshold: an itemset whose global count exceeds `support` must
# exceed support/k in at least one of the k chunks, so the per-partition
# threshold may not be larger than that. Dividing by the larger of the two
# partition counts keeps the bound valid whether the passes run on
# `splitted_rdd` or on `partitioned_rdd`.
supp_partition=math.trunc(support/max(partitions_before, num_partitions))
print("Support: ", support)
print("Support for each partition", supp_partition)

Number of partitions before: 6
Number of partitions after: 10
Support:  52
Support for each partition 10


# **SON Algorithm**

**Finding the frequent singletons**

In [None]:
# Frequent singletons in all the partitions

def first_pass_APriori_partitioned(partitions, supp):
    """Count the items of one partition and keep the locally frequent ones.

    Args:
        partitions: iterable of baskets, each basket an iterable of items.
        supp: local threshold; an item is kept only if its count is strictly
            greater than this value.

    Returns:
        List of ``(item, count)`` tuples ordered by decreasing count; items
        with the same count stay in the order in which they were first seen.
    """
    occurrences = {}
    for basket in partitions:
        for item in basket:
            if item in occurrences:
                occurrences[item] += 1
            else:
                occurrences[item] = 1

    # Keep only the items whose count exceeds the threshold.
    survivors = []
    for item, count in occurrences.items():
        if count > supp:
            survivors.append((item, count))

    # Most frequent items first.
    survivors.sort(key=lambda pair: pair[1], reverse=True)
    return survivors


def _local_frequent_singletons(partition):
    # Candidate singletons of one partition, using the per-partition threshold.
    return first_pass_APriori_partitioned(partition, supp_partition)

first_pass_part = splitted_rdd.mapPartitions(_local_frequent_singletons)
first_pass_part.take(15)

[('Communication', 129),
 ('Teamwork', 77),
 ('Leadership', 76),
 ('Customer service', 51),
 ('Customer Service', 47),
 ('Communication skills', 45),
 ('Problem Solving', 44),
 ('Communication Skills', 37),
 ('Nursing', 35),
 ('Problemsolving', 34),
 ('Training', 34),
 ('Project Management', 34),
 ('Collaboration', 33),
 ('Microsoft Office Suite', 32),
 ('Time management', 31)]

In [None]:
# Union of the items that passed the local threshold in at least one partition.
frequent_singletons = {entry[0] for entry in first_pass_part.collect()}
print(f"We obtained {len(frequent_singletons)} items")

We obtained 125 items


In [None]:
# Frequent singletons in the entire dataset considered

# Header line printed before the counts over the whole sampled dataset
print("How many are frequent in all the set?")
def first_pass_APriori(rdd_, freq_set, supp, n_toshow):
  """Count the candidate singletons over the whole RDD and keep the frequent ones.

  Args:
    rdd_: RDD of baskets (each basket an iterable of items).
    freq_set: candidate items; every other item is ignored.
    supp: threshold; an item is kept only if its count is strictly greater.
    n_toshow: how many of the most frequent items are printed.

  Returns:
    RDD of ``(item, count)`` pairs sorted by decreasing count.
  """
  def emit_candidates(basket):
    # One (item, 1) record per occurrence of a candidate item.
    return [(item, 1) for item in basket if item in freq_set]

  occurrences = rdd_.flatMap(emit_candidates)
  totals = occurrences.reduceByKey(lambda left, right: left + right)
  above_threshold = totals.filter(lambda pair: pair[1] > supp)
  first_pass = above_threshold.sortBy(lambda pair: pair[1], ascending=False)

  print("Remaining singleton", first_pass.count())
  print(f"First {n_toshow}  singleton", first_pass.take(n_toshow))
  return first_pass

# Global count of the candidate singletons; the 15 most frequent are printed.
first_pass = first_pass_APriori(
    rdd_=splitted_rdd,
    freq_set=frequent_singletons,
    supp=support,
    n_toshow=15,
)

How many are frequent in all the set?
Remaining singleton 70
First 15  singleton [('Communication', 742), ('Teamwork', 423), ('Leadership', 381), ('Customer service', 311), ('Customer Service', 227), ('Communication skills', 223), ('Problem Solving', 201), ('Nursing', 197), ('Sales', 188), ('Collaboration', 180), ('Problemsolving', 175), ('Training', 151), ('Communication Skills', 148), ('Project Management', 147), ('Time Management', 141)]


**Finding the frequent couples**

In [None]:
# Frequent pairs in all the partitions

def second_pass_APriori_partitioned(partitions, supp, frequent_set):
    """Count the pairs of frequent items in one partition and keep the locally frequent ones.

    Args:
        partitions: iterable of baskets, each basket an iterable of items.
        supp: local threshold; a pair is kept only if its count is strictly
            greater than this value.
        frequent_set: items allowed to appear in a pair (the candidate
            singletons); any container supporting ``in``.

    Returns:
        List of ``((item_a, item_b), count)`` tuples, each pair in sorted
        order, ordered by decreasing count (ties keep first-seen order).
    """
    pair_counts = {}
    for basket in partitions:
        # Discard non-frequent items before pairing: the pairs produced are
        # exactly those of the full basket whose two items are both frequent,
        # but only m*(m-1)/2 of them are generated instead of n*(n-1)/2.
        candidates = sorted(item for item in basket if item in frequent_set)
        for pair in combinations(candidates, 2):
            pair_counts[pair] = pair_counts.get(pair, 0) + 1

    # Keep only the pairs whose count exceeds the threshold.
    frequent_pairs = [(pair, count) for pair, count in pair_counts.items() if count > supp]
    # Most frequent pairs first.
    return sorted(frequent_pairs, key=lambda x: x[1], reverse=True)

def _local_frequent_pairs(partition):
    # Candidate pairs of one partition, built only from the candidate singletons.
    return second_pass_APriori_partitioned(partition, supp_partition, frequent_singletons)

second_pass_part = splitted_rdd.mapPartitions(_local_frequent_pairs)
print(second_pass_part.take(15))

[(('Communication', 'Teamwork'), 47), (('Communication', 'Leadership'), 41), (('Communication', 'Problemsolving'), 24), (('Communication', 'Problem Solving'), 22), (('Communication', 'Customer Service'), 19), (('Collaboration', 'Communication'), 18), (('Leadership', 'Teamwork'), 18), (('Communication', 'Time management'), 17), (('Communication', 'Customer service'), 17), (('Communication', 'Sales'), 16), (('Communication', 'Training'), 15), (('Problemsolving', 'Teamwork'), 14), (('Customer Service', 'Teamwork'), 14), (('Communication skills', 'Customer service'), 14), (('Communication Skills', 'Problem Solving'), 14)]


In [None]:
# Union of the pairs that passed the local threshold in at least one partition.
frequent_couples = {entry[0] for entry in second_pass_part.collect()}
print(f"We obtained {len(frequent_couples)} couple")

We obtained 111 couple


In [None]:
# Frequent pairs in the entire dataset considered

# Header line printed before the pair counts over the whole sampled dataset
print("How many are frequent in all the set?")
def second_pass_APriori(rdd_, frequent_couple, supp, n_toshow):
  """Count the candidate pairs over the whole RDD and keep the frequent ones.

  Args:
    rdd_: RDD of baskets (each basket an iterable of items).
    frequent_couple: candidate pairs, as sorted 2-tuples; every other pair
      is ignored.
    supp: threshold; a pair is kept only if its count is strictly greater.
    n_toshow: how many of the most frequent pairs are printed.

  Returns:
    RDD of ``((item_a, item_b), count)`` records sorted by decreasing count.
  """
  # The candidates come from the `frequent_couple` argument; the body used to
  # read the notebook-level `frequent_couples` variable, ignoring the argument.
  second_pass = rdd_.flatMap(lambda basket:[(e,1) for e in combinations(sorted(basket),2) if e in frequent_couple]) \
                  .reduceByKey(lambda x,y: x+y) \
                  .filter(lambda x:x[1]>supp)\
                  .sortBy(lambda x: x[1], ascending=False)

  print("Remaining couples", second_pass.count())
  print(f"First {n_toshow}  couples", second_pass.take(n_toshow))
  return second_pass

# Global count of the candidate pairs; the 7 most frequent are printed.
second_pass = second_pass_APriori(
    rdd_=splitted_rdd,
    frequent_couple=frequent_couples,
    supp=support,
    n_toshow=7,
)

How many are frequent in all the set?
Remaining couples 50
First 7  couples [(('Communication', 'Teamwork'), 275), (('Communication', 'Leadership'), 249), (('Communication', 'Customer service'), 150), (('Communication', 'Problem Solving'), 128), (('Communication', 'Customer Service'), 123), (('Communication', 'Problemsolving'), 123), (('Communication', 'Sales'), 123)]


**Finding the generalized frequent itemsets**

In [None]:
# Frequent itemsets in all the partitions

def generalized_pass_APriori_partitioned(partitions, supp, frequent_set, pass_):
    """Count the candidate itemsets of size `pass_` in one partition.

    An itemset is counted only when every one of its subsets of size
    ``pass_ - 1`` belongs to `frequent_set`.

    Args:
        partitions: iterable of baskets, each basket an iterable of items.
        supp: local threshold; an itemset is kept only if its count is
            strictly greater than this value.
        frequent_set: collection of sorted tuples of size ``pass_ - 1``.
        pass_: size of the itemsets to generate.

    Returns:
        List of ``(itemset, count)`` tuples (unsorted), each itemset a sorted
        tuple of `pass_` items.
    """
    # Every item of an accepted itemset belongs to at least one of its
    # (pass_-1)-subsets, hence to some tuple of `frequent_set`. Dropping all
    # other items before enumerating combinations yields the same itemsets
    # while generating far fewer candidates. With pass_ < 2 the subsets are
    # empty tuples, so no item can be discarded this way.
    if pass_ >= 2:
        useful_items = {item for itemset in frequent_set for item in itemset}
    else:
        useful_items = None

    basket_counts = {}
    for basket in partitions:
        if useful_items is None:
            items = sorted(basket)
        else:
            items = sorted(item for item in basket if item in useful_items)
        # `items` is sorted, so every combination is already a sorted tuple.
        for tuple_ in combinations(items, pass_):
            if all(subset in frequent_set for subset in combinations(tuple_, pass_ - 1)):
                basket_counts[tuple_] = basket_counts.get(tuple_, 0) + 1

    # Keep only the itemsets whose count exceeds the threshold.
    return [(itemset, count) for itemset, count in basket_counts.items() if count > supp]

In [None]:
# Frequent itemsets in the entire dataset considered

# Level-wise search for itemsets of size 3, 4, ...: each iteration first
# collects the candidates from the partitions, then counts them on the whole
# RDD. The loop ends when no itemset exceeds `support`.
count=1
# The candidate pairs found above are the pruning set for the first iteration.
frequent_set=frequent_couples
pass_=3

while count>0:

  print(f"This is the {pass_} pass")
  # Candidates of size pass_: itemsets exceeding supp_partition in a partition.
  generalized_pass_part = splitted_rdd.mapPartitions(lambda partition: generalized_pass_APriori_partitioned(partition, supp_partition, frequent_set, pass_))
  print("The frequent set is: ")
  print(generalized_pass_part.take(15))
  # From here on frequent_set holds the candidates of the current size; it is
  # used by the global count below and as pruning set in the next iteration.
  # NOTE(review): these are the per-partition candidates, not the itemsets
  # confirmed by the global count — confirm this is the intended pruning set.
  frequent_set = set(generalized_pass_part.map(lambda x:x[0]).collect())

  print(f"We obtained {len(frequent_set)} itemsets")

  print("How many are frequent in all the set?")
  # Count every candidate over all the baskets and keep those above `support`.
  frdd = splitted_rdd.flatMap(lambda basket: [(e,1) for e in combinations(sorted(basket),pass_) if e in frequent_set])\
              .reduceByKey(lambda x,y:x+y) \
              .filter(lambda x:x[1] > support)

  # Number of itemsets that passed the global threshold; 0 stops the loop.
  count=frdd.count()
  print("Remaining itemsets: ", count)
  print(f"First 15  items", frdd.take(15))
  # Move on to itemsets with one more item.
  pass_ +=1


This is the 3 pass
The frequent set is: 
[(('Communication', 'Problemsolving', 'Teamwork'), 12), (('Communication', 'Customer Service', 'Teamwork'), 12), (('Communication', 'Problem Solving', 'Teamwork'), 13), (('Communication', 'Leadership', 'Teamwork'), 14), (('Communication', 'Customer service', 'Teamwork'), 15), (('Communication', 'Customer service', 'Time management'), 12), (('Communication', 'Flexibility', 'Teamwork'), 12), (('Customer service', 'Teamwork', 'Time management'), 11), (('Attention to detail', 'Communication', 'Customer service'), 13), (('Communication', 'Teamwork', 'Time Management'), 12), (('Attention to detail', 'Communication', 'Problemsolving'), 11), (('Attention to detail', 'Customer service', 'Problemsolving'), 12), (('Communication', 'Customer service', 'Problemsolving'), 18), (('Communication', 'Customer service', 'Inventory management'), 11), (('Communication', 'Leadership', 'Problem Solving'), 23)]
We obtained 44 itemsets
How many are frequent in all the s