<a href="https://colab.research.google.com/github/guber25/Market_Basket_Analysis/blob/main/Market_Basket_Analysis_AMD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Algorithms for Massive Data**
## Market Basket Analysis using [LinkedIn job skills](https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024) dataset

### Author: Guglielmo Berzano
#### email: guglielmo.berzano@studenti.unimi.it
#### Academic year: 2023-2024

Installing PySpark and importing the libraries.

In [1]:
!pip install pyspark
import pyspark

from itertools import combinations
import os

import timeit

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fd54d67b7c11b668bf9e03d0b3494a30f44300595c582bf9c9802270e9197c33
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
#logging in to Kaggle and downloading the dataset

#Kaggle credentials, read by the kaggle CLI from these environment variables
os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"

#dataset download
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

#dataset unzip
!unzip 1-3m-linkedin-jobs-and-skills-2024.zip -d data

Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
100% 1.88G/1.88G [00:19<00:00, 158MB/s]
100% 1.88G/1.88G [00:19<00:00, 102MB/s]
Archive:  1-3m-linkedin-jobs-and-skills-2024.zip
  inflating: data/job_skills.csv     
  inflating: data/job_summary.csv    
  inflating: data/linkedin_job_postings.csv  


In [3]:
#creating the Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Apriori").getOrCreate()

#importing the dataset into spark

raw_df = spark.read.csv("/content/data/job_skills.csv", header=True,sep=",")

raw_df.show()

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Optical Inspectio...|
|https://www.linke...|HVAC, troubleshoo...|
|https://www.linke...|Host/Server Assis...|
|https://www.linke...|Apartment mainten...|
|https://www.linke...|Fiber Optic Cable...|
|https://www.linke...|CT Technologist, ...|
|https://ca.linked...|SAP, DRMIS, Data ...|
|https://www.linke...|Debt and equity o...|
|https://ca.linked...|Biomedical Engine...|
|https://www.linke...|Laboratory Techni...|
|https://www.linke...|Program Managemen...|
|https://www.linke...|Hiring, Tr

In [4]:
#df size before dropping the rows with NA values
print(f"PySpark dataset length before NA dropping: {raw_df.count()}")

#dropping the rows with NA values
raw_df=raw_df.na.drop()

#df size after dropping them
print(f"PySpark dataset length after NA dropping: {raw_df.count()}")

PySpark dataset length before NA dropping: 1296381
PySpark dataset length after NA dropping: 1294374


In [6]:
#removing the job_link column

raw_df=raw_df.drop("job_link")

### Implementation of the $\mathrm{A}$-$\mathrm{Priori}$ $\mathrm{Algorithm}$

To implement the algorithm, I define the following function, which takes a PySpark DataFrame as input, optionally samples it, and then runs all the computations on the resulting RDD.

In this case, the dataset is sampled because otherwise it would be too large to analyse in a reasonable time. With `fraction = 0.01` (roughly $1\%$ of the rows), we go from a $\mathrm {Cardinality}$ of $1{,}294{,}374$ baskets for the full dataset to a $\mathrm {Cardinality}$ of $13{,}007$ baskets for the sample, and $2\%$ of $13{,}007$ is $\approx 260$. So, if we take a $support\_threshold$ $s=2\%$, an itemset (either a singleton, a pair, $\dotsc$) will be **frequent** if it appears in at least $260$ baskets.
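The threshold arithmetic can be checked with a few lines of plain Python. This is a minimal sketch that mirrors the `support = round(size*s_threshold/100)` line of the function below; `support_count` and `is_frequent` are hypothetical helper names introduced only for illustration, and `is_frequent` uses the "at least" (`>=`) comparison stated above.

```python
def support_count(n_baskets, s_percent):
    """Absolute support: s_percent% of the number of baskets, rounded to the nearest integer."""
    return round(n_baskets * s_percent / 100)


def is_frequent(count, support):
    """An itemset is frequent if it appears in at least `support` baskets."""
    return count >= support


support = support_count(13007, 2)         # sampled dataset (13,007 baskets)
full_support = support_count(1294374, 2)  # same 2% threshold on the full dataset

print(support)       # 260
print(full_support)  # 25887
print(is_frequent(264, support), is_frequent(259, support))  # True False
```

On the full dataset the same $2\%$ threshold would require 25,887 baskets, which is why the function takes $s$ as a percentage and converts it to an absolute count only after sampling.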

In [8]:
def apriori(df, s_threshold, last_frequent = 4, sampling = True, fraction = 0.01, seed = 1234):

  """
  df -> a PySpark DataFrame, assumed to contain no null/NA values and a single column of comma-separated skills.

  s_threshold -> the support threshold as a percentage, not as a fraction (e.g. s = 1% -> s_threshold = 1, not 0.01).

  last_frequent -> size of the largest itemsets to look for: 1 checks only singletons, 2 also checks pairs, and so on. If it is 0,
  the algorithm goes on until it finds fewer than two frequent itemsets of the current size (at most up to pentaplets). Default = 4.

  sampling -> whether or not the df should be sampled.

  fraction -> fraction of the df to keep when sampling (approximate, since every row is kept independently with this probability).

  seed -> the seed used for sampling. Relevant only if sampling == True.
  """
  names=["singletons", "pairs", "triplets", "quadruplets", "pentaplets"] # I assume there are no frequent itemsets larger than pentaplets

  finished = False
  counter = 1
  print("""A-Priori Algorithm implementation\n\n
Wait for a moment while the program transforms your dataset into an RDD and finds its size.\n""")

  if sampling:
    df = df.sample(False, fraction = fraction, seed = seed)

  rdd = df.rdd
  size = rdd.count()
  # absolute support: s_threshold% of the number of baskets, rounded to the nearest integer
  support = round(size*s_threshold/100)

  print(f"Your rdd is {size} rows long and is composed of {rdd.getNumPartitions()} partitions.\nAccording to the support, an itemset will be frequent if its count is at least {support}.\n")

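  # each Row holds a single field (job_skills): a comma-separated string of skills.
  # Lower-casing it (iterating over a Row yields its values, so no "Row(job_skills=" prefix has to be removed)
  rdd = rdd.map(lambda row: row[0].lower())

  # normalising a few variants: dropping the " skills" suffix and fixing "problemsolving"
  rdd = rdd.map(lambda skills: skills.replace(" skills", ""))

  rdd = rdd.map(lambda skills: skills.replace("problemsolving", "problem solving"))

  # splitting every basket into its component skills; each basket is a one-element list
  # holding the set of skills, which is the structure the loop below iterates over
  rdd = rdd.map(lambda skills: [set(skills.split(", "))])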

  while not finished:

    print(f"\nSTEP {counter}\nChecking frequent {names[counter-1]}\nWork in progress!!\n")

    start_Time = timeit.default_timer()

    if counter == 1: # singletons need no candidate filtering
      # the set comprehension counts each skill at most once per basket
      n_pass=rdd.flatMap(lambda basket: {(skill, 1) for items in basket for skill in items})\
                     .reduceByKey(lambda value_x, value_y: value_x + value_y)\
                     .filter(lambda key_value: key_value[1] >= support)


    elif counter == 2:
      # I called the element in the tuple "mult_items" = multiple items because here we are checking for pairs, triplets, ...
      # A pair is counted only if both of its elements are frequent singletons (monotonicity of support)

      n_pass = rdd.flatMap(lambda basket: {(mult_items,1) for items in basket for mult_items in combinations(sorted(items), counter) \
                                            if all(combination in frequent_items_keys for combination in mult_items)})\
                   .reduceByKey(lambda value_x, value_y: value_x + value_y)\
                   .filter(lambda key_value: key_value[1] >= support)


    else:
      # A k-itemset is counted only if all of its (k-1)-subsets were frequent in the previous pass

      n_pass = rdd.flatMap(lambda basket: {(mult_items,1) for items in basket for mult_items in combinations(sorted(items), counter) \
                             if all(combination in frequent_items_keys for combination in combinations(mult_items, counter-1))})\
                   .reduceByKey(lambda value_x, value_y: value_x + value_y)\
                   .filter(lambda key_value: key_value[1] >= support)


    # collect() already returns a list; sorting it by count, in descending order
    frequent_items = sorted(n_pass.collect(), key = lambda key_values: key_values[1], reverse = True)

    frequent_items_keys = {key_values[0] for key_values in frequent_items}

    end_Time = timeit.default_timer()
    print(f"""
Step number {counter} completed in {round(end_Time-start_Time, 2)} seconds.
With a support threshold of {s_threshold}%, the algorithm found:\n
  - Number of frequent {names[counter-1]}: {len(frequent_items)}\n
  - Most frequent: {[(key_value[0], key_value[1]) for key_value in frequent_items[:3]]}\n
  - Least frequent: {[(key_value[0], key_value[1]) for key_value in frequent_items[-3::][::-1]]}\n
    """)

    counter+=1

    # with fewer than two frequent k-itemsets no (k+1)-itemset can be frequent, so the algorithm stops
    if len(frequent_items) <= 1 or (counter > last_frequent and last_frequent != 0) or counter > len(names):
      finished=True
      print("Project by \033[1mGuglielmo Berzano\033[0m, UniMi")
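To check the candidate-generation logic without a Spark cluster, the same steps can be sketched on a handful of toy baskets in plain Python. This is a single-machine sketch under a few assumptions: baskets are Python sets of skill strings, `support` is an absolute count compared with `>=`, every itemset (singletons included) is stored as a sorted tuple, and the function names `normalise_basket` and `apriori_local`, as well as the toy baskets, are hypothetical. Unlike the Spark function above, before generating combinations it also drops from each basket the items that do not appear in any frequent itemset of the previous size, a common way to keep the number of candidates small.

```python
from itertools import combinations


def normalise_basket(raw_skills):
    """Clean one job_skills string the same way apriori() does and split it into a set."""
    text = raw_skills.lower().replace(" skills", "").replace("problemsolving", "problem solving")
    return set(text.split(", "))


def apriori_local(baskets, support, max_size=4):
    """Return {k: {itemset: count}} for every size k with at least one frequent itemset.

    baskets -> list of sets of strings
    support -> absolute count; an itemset is frequent if its count is >= support
    """
    # pass 1: count every item at most once per basket
    counts = {}
    for basket in baskets:
        for item in basket:
            counts[(item,)] = counts.get((item,), 0) + 1
    frequent = {itemset: c for itemset, c in counts.items() if c >= support}
    results = {1: frequent} if frequent else {}

    k = 2
    while frequent and k <= max_size:
        # items that appear in at least one frequent (k-1)-itemset
        useful_items = {item for itemset in frequent for item in itemset}
        counts = {}
        for basket in baskets:
            for candidate in combinations(sorted(basket & useful_items), k):
                # monotonicity: every (k-1)-subset of a frequent k-itemset is frequent
                if all(sub in frequent for sub in combinations(candidate, k - 1)):
                    counts[candidate] = counts.get(candidate, 0) + 1
        frequent = {itemset: c for itemset, c in counts.items() if c >= support}
        if frequent:
            results[k] = frequent
        k += 1
    return results


raw_baskets = [
    "Communication skills, Problemsolving, Teamwork",
    "Communication, Teamwork, Leadership",
    "Communication, Problem solving, Teamwork",
    "Customer service, Communication",
]
baskets = [normalise_basket(raw) for raw in raw_baskets]
result = apriori_local(baskets, support=2)

for k, itemsets in result.items():
    print(k, sorted(itemsets.items(), key=lambda kv: kv[1], reverse=True))
```

With these four toy baskets and `support=2`, the sketch finds three frequent singletons, three frequent pairs and one frequent triplet, and stops because no quadruplet can be built from three items.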

In [9]:
apriori(raw_df, s_threshold=2, last_frequent=4, sampling=True, fraction=0.01, seed=1234)

A-Priori Algorithm implementation


Wait for a moment while the program transforms your dataset into an RDD and that finds its size.

Your rdd is 13007 rows long and is composed by 6 partitions.
According to the support, an itemset will be frequent if its count is at least 260.


STEP 1
Checking frequent singletons
Work in progress!!


Step number 1 completed in 27.08 seconds.
With a support theshold of 2%, the algorithm found:

  - Number of frequent singletons: 77

  - Most frequent: [('communication', 5618), ('problem solving', 3167), ('customer service', 2965)]

  - Least frequent: [('education', 264), ('patient education', 269), ('marketing', 272)]

    

STEP 2
Checking frequent pairs
Work in progress!!


Step number 2 completed in 30.55 seconds.
With a support theshold of 2%, the algorithm found:

  - Number of frequent pairs: 91

  - Most frequent: [(('communication', 'problem solving'), 2597), (('communication', 'customer service'), 1996), (('communication', 'teamwork'), 1970)