# **Algorithms for Massive Data**
## Market Basket Analysis using [LinkedIn job skills](https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024) dataset

### Author: Adriano Meligrana
#### email: adriano.meligrana@studenti.unimi.it
#### Academic year: 2023-2024

Importing the libraries (PySpark is assumed to be already installed)

In [1]:
import os
import pyspark

In [2]:
#logging in to Kaggle and downloading the dataset

#Kaggle environment
os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"

#dataset download
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

#dataset unzip
!unzip 1-3m-linkedin-jobs-and-skills-2024.zip -d data

/bin/bash: line 1: kaggle: command not found
unzip:  cannot find or open 1-3m-linkedin-jobs-and-skills-2024.zip, 1-3m-linkedin-jobs-and-skills-2024.zip.zip or 1-3m-linkedin-jobs-and-skills-2024.zip.ZIP.


In [3]:
#creating the Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master('local[*]') \
    .config("spark.driver.memory", "15g") \
    .appName('apriori') \
    .getOrCreate()

#importing the dataset into spark
df = spark.read.csv("./job_skills.csv", header=True, sep=",")
df.show()



+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Optical Inspectio...|
|https://www.linke...|HVAC, troubleshoo...|
|https://www.linke...|Host/Server Assis...|
|https://www.linke...|Apartment mainten...|
|https://www.linke...|Fiber Optic Cable...|
|https://www.linke...|CT Technologist, ...|
|https://ca.linked...|SAP, DRMIS, Data ...|
|https://www.linke...|Debt and equity o...|
|https://ca.linked...|Biomedical Engine...|
|https://www.linke...|Laboratory Techni...|
|https://www.linke...|Program Managemen...|
|https://www.linke...|Hiring, Tr

In [4]:
from pyspark.sql import Row

def preprocessing(row):
    # normalize the comma-separated skills string and turn it into a basket of unique skills
    skills_str = row.job_skills.lower()
    # drop generic suffixes so that e.g. "communication skills" and "communication" coincide
    skills_str = skills_str.replace(" capacities", "")
    skills_str = skills_str.replace(" abilities", "")
    skills_str = skills_str.replace(" skills", "")
    # unify spelling variants of the same skill
    skills_str = skills_str.replace("decisionmaking", "decision making")
    skills_str = skills_str.replace("problemsolving", "problem solving")
    skills_str = skills_str.replace("teamwork", "team work")
    skills_str = skills_str.replace("problem-solving", "problem solving")
    # split into items and remove duplicates within the basket
    skills = list(set(skills_str.split(", ")))
    row_dict = row.asDict()
    row_dict['job_skills'] = skills
    return Row(**row_dict)

n = df.count()
print(f"Dataset length: {n}")
df = df.na.drop()
print(f"number of dropped NA values: {n - df.count()}")
df = df.drop("job_link")
df = df.rdd.map(lambda row: preprocessing(row)).toDF().cache()

                                                                                

Dataset length: 1296381


                                                                                

number of dropped NA values: 2007
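
To make the effect of `preprocessing` concrete, here is a minimal sketch that applies the same string replacements, in the same order, to a single made-up skills string without Spark. The helper name `normalize_skills` and the example string are assumptions introduced for illustration; the sketch returns a set, whereas the notebook converts it to a list.

```python
def normalize_skills(skills_str):
    """Pure-Python mirror of the string handling in `preprocessing` (illustrative)."""
    s = skills_str.lower()
    # drop generic suffixes
    for suffix in (" capacities", " abilities", " skills"):
        s = s.replace(suffix, "")
    # unify spelling variants, in the same order as the notebook
    for old, new in (
        ("decisionmaking", "decision making"),
        ("problemsolving", "problem solving"),
        ("teamwork", "team work"),
        ("problem-solving", "problem solving"),
    ):
        s = s.replace(old, new)
    # a set removes duplicate skills within the basket
    return set(s.split(", "))

# hypothetical input, not taken from the dataset
example = "Communication Skills, Teamwork, Problem-Solving, problemsolving, Leadership abilities"
normalized = normalize_skills(example)
print(sorted(normalized))
# ['communication', 'leadership', 'problem solving', 'team work']
```

The two spellings of "problem solving" collapse into a single item, which is why the baskets are deduplicated only after the replacements.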


### Implementation of the A-Priori algorithm

In [6]:
from operator import add 
from itertools import combinations
import timeit
import pandas as pd

"""
apriori(df, basket_col, support_threshold = 0.01, max_frequent = 6, sample_fraction = 0.1, seed = 42, cache = True)
  
  df                -> a PySpark DataFrame containing the baskets.
  basket_col        -> the name of the DataFrame column containing the baskets.
  support_threshold -> the support threshold as a fraction of the baskets, e.g. s = 1% -> support_threshold = 0.01.
  max_frequent      -> the largest itemset size to check, e.g. if max_frequent = 2,
                       only singletons and pairs will be checked.
  sample_fraction   -> the fraction of baskets sampled to find frequent itemsets (1 disables sampling).
  seed              -> the seed used when sampling.
  cache             -> whether intermediate RDDs should be cached while running the algorithm.
"""
def apriori(df, basket_col, support_threshold = 0.01, max_frequent = 6, sample_fraction = 0.1, seed = 42, cache = True):

  if sample_fraction != 1:
      df = df.sample(False, fraction = sample_fraction, seed = seed).cache()

  rdd = df.rdd.map(lambda row: row[basket_col])
  size = rdd.count()
  support = int(size*support_threshold) + 1
  uniques = {e: i for i, e in enumerate(set(item for basket in rdd.collect() for item in basket))}
  uniques_opposite = {i: e for (e, i) in uniques.items()}
  rdd = rdd.map(lambda basket: [uniques[item] for item in basket])
  if cache: rdd = rdd.cache()

  frequent_items_dfs = []
  for i in range(max_frequent):

    print(f"Checking frequent sets of {i+1} elements")

    t0 = timeit.default_timer()

    if i != 0:
        rdd = rdd.map(lambda basket: compute_frequents(basket, frequent_items, i))
        rdd = rdd.filter(lambda basket: len(basket) > i)
        if cache: rdd = rdd.cache()

    n_pass = rdd.flatMap(lambda basket: ((items, 1) for items in combinations(basket, i+1)))
    n_pass = n_pass.reduceByKey(add)
    n_pass = n_pass.filter(lambda key_value: key_value[1] >= support)

    frequent_items_values = n_pass.collect()
      
    frequent_items = set(key_values[0] for key_values in frequent_items_values)
    frequent_items_values = sorted(frequent_items_values, key = lambda key_values: key_values[1], reverse = True)
    frequent_items_values = [(*[uniques_opposite[k] for k in kv[0]], kv[1]) for kv in frequent_items_values]
    fdf = pd.DataFrame.from_records(frequent_items_values, columns=[*[f'item_{s+1}' for s in range(i+1)], "frequency"])
    frequent_items_dfs.append(fdf)

    t1 = timeit.default_timer()

    print(f"Step number {i+1} completed in {round(t1-t0, 2)} seconds.")
    print(f"Number of frequent sets of {i+1} elements: {len(frequent_items_values)}\n")
    display_n(fdf,5)
    
  return frequent_items_dfs

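def compute_frequents(basket, frequent_items, i):
    # keep only the items that appear in at least one frequent itemset of size i
    # contained in this basket: any other item cannot be part of a frequent
    # itemset of size i+1, so it can be dropped before the next pass
    new_basket = set()
    for items in combinations(basket, i):
        if items in frequent_items:
            new_basket.update(items)
    # sorting gives every itemset a canonical tuple representation
    return sorted(new_basket)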

def display_n(df,n):
    with pd.option_context('display.max_rows',n*2):
        display(df)
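
The passes performed by `apriori` can be sketched on a handful of in-memory baskets, which may help when reading the Spark version. This is only an illustrative pure-Python analogue under stated assumptions: the function name `apriori_local`, the `min_count` parameter (an absolute count instead of a fraction) and the toy baskets are all hypothetical and not part of the notebook.

```python
from collections import Counter
from itertools import combinations

def apriori_local(baskets, min_count, max_size):
    """Return {k: {itemset_tuple: count}} for the frequent itemsets of size 1..max_size."""
    # sorted baskets give every itemset a single canonical tuple
    baskets = [sorted(set(b)) for b in baskets]
    results = {}
    for k in range(1, max_size + 1):
        # count every k-subset of every remaining basket
        counts = Counter(c for b in baskets for c in combinations(b, k))
        frequent = {c: n for c, n in counts.items() if n >= min_count}
        if not frequent:
            break
        results[k] = frequent
        # prune: keep only items covered by a frequent k-itemset of the basket,
        # and drop baskets too small to contain a (k+1)-itemset
        pruned = []
        for b in baskets:
            kept = sorted({item for c in combinations(b, k) if c in frequent for item in c})
            if len(kept) > k:
                pruned.append(kept)
        baskets = pruned
    return results

# toy baskets, invented for illustration
toy_baskets = [
    ["communication", "problem solving", "team work"],
    ["communication", "problem solving"],
    ["communication", "team work", "sql"],
    ["communication", "problem solving", "team work", "sql"],
    ["python", "sql"],
]
toy_results = apriori_local(toy_baskets, min_count=2, max_size=3)
for k, itemsets in toy_results.items():
    print(k, len(itemsets))
```

With these baskets the sketch finds 4 frequent singletons, 5 frequent pairs and 2 frequent triples. The pruning step plays the role of `compute_frequents` followed by the `len(basket) > i` filter.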

In [7]:
%%prun
results = apriori(df, "job_skills", support_threshold=0.01, max_frequent=6, sample_fraction=0.1, seed=42, cache=True)

                                                                                

Checking frequent sets of 1 elements




Step number 1 completed in 3.41 seconds.
Number of frequent sets of 1 elements: 186



                                                                                

|  | item_1 | frequency |
|---|---|---|
| 0 | communication | 55536 |
| 1 | problem solving | 31655 |
| 2 | customer service | 28831 |
| 3 | team work | 25041 |
| 4 | leadership | 20546 |
| ... | ... | ... |
| 181 | oral communication | 1339 |
| 182 | report writing | 1329 |
| 183 | sap | 1326 |
| 184 | ethics | 1321 |


Checking frequent sets of 2 elements



Step number 2 completed in 2.02 seconds.
Number of frequent sets of 2 elements: 291



                                                                                

|  | item_1 | item_2 | frequency |
|---|---|---|---|
| 0 | communication | problem solving | 25617 |
| 1 | communication | team work | 19886 |
| 2 | communication | customer service | 19353 |
| 3 | communication | leadership | 15125 |
| 4 | team work | problem solving | 14124 |
| ... | ... | ... | ... |
| 286 | management | problem solving | 1317 |
| 287 | collaboration | patient care | 1317 |
| 288 | sales | high school diploma | 1314 |
| 289 | prioritization | problem solving | 1305 |


Checking frequent sets of 3 elements




Step number 3 completed in 1.6 seconds.
Number of frequent sets of 3 elements: 233



                                                                                

|  | item_1 | item_2 | item_3 | frequency |
|---|---|---|---|---|
| 0 | communication | team work | problem solving | 12718 |
| 1 | communication | customer service | problem solving | 10462 |
| 2 | communication | leadership | problem solving | 9270 |
| 3 | communication | team work | customer service | 8670 |
| 4 | communication | time management | problem solving | 8177 |
| ... | ... | ... | ... | ... |
| 228 | team work | attention to detail | adaptability | 1310 |
| 229 | inventory management | merchandising | customer service | 1307 |
| 230 | sales | time management | problem solving | 1304 |
| 231 | organization | time management | problem solving | 1299 |


Checking frequent sets of 4 elements




Step number 4 completed in 6.97 seconds.
Number of frequent sets of 4 elements: 83



                                                                                

|  | item_1 | item_2 | item_3 | item_4 | frequency |
|---|---|---|---|---|---|
| 0 | communication | team work | customer service | problem solving | 5869 |
| 1 | communication | leadership | team work | problem solving | 4996 |
| 2 | communication | time management | team work | problem solving | 4957 |
| 3 | communication | team work | attention to detail | problem solving | 4645 |
| 4 | communication | leadership | customer service | problem solving | 4057 |
| ... | ... | ... | ... | ... | ... |
| 78 | communication | multitasking | customer service | problem solving | 1314 |
| 79 | communication | conflict resolution | customer service | problem solving | 1307 |
| 80 | communication | conflict resolution | team work | problem solving | 1305 |
| 81 | communication | time management | team work | adaptability | 1303 |


Checking frequent sets of 5 elements



Step number 5 completed in 0.91 seconds.
Number of frequent sets of 5 elements: 12



                                                                                

|  | item_1 | item_2 | item_3 | item_4 | item_5 | frequency |
|---|---|---|---|---|---|---|
| 0 | communication | team work | customer service | attention to detail | problem solving | 2433 |
| 1 | communication | time management | team work | attention to detail | problem solving | 2414 |
| 2 | communication | time management | team work | customer service | problem solving | 2404 |
| 3 | communication | leadership | team work | customer service | problem solving | 2388 |
| 4 | communication | leadership | time management | team work | problem solving | 2178 |
| ... | ... | ... | ... | ... | ... | ... |
| 7 | communication | sales | team work | customer service | problem solving | 1622 |
| 8 | communication | leadership | team work | attention to detail | problem solving | 1600 |
| 9 | communication | time management | team work | customer service | attention to detail | 1487 |
| 10 | time management | team work | customer service | attention to detail | problem solving | 1413 |


Checking frequent sets of 6 elements
Step number 6 completed in 0.62 seconds.
Number of frequent sets of 6 elements: 1



|  | item_1 | item_2 | item_3 | item_4 | item_5 | item_6 | frequency |
|---|---|---|---|---|---|---|---|
| 0 | communication | time management | team work | customer service | attention to detail | problem solving | 1333 |
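
As a rough sanity check on the frequencies above, the absolute support used by `apriori` is `int(size * support_threshold) + 1`. The sketch below evaluates that formula under the assumption that the 10% sample holds about one tenth of the rows left after dropping NA values. The exact sample size is not printed in the notebook, and Spark's `sample` does not guarantee an exact fraction, so the figure is only an approximation. The helper name `absolute_support` is hypothetical.

```python
def absolute_support(size, support_threshold):
    # same formula as in `apriori`: minimum count an itemset needs to be frequent
    return int(size * support_threshold) + 1

rows_after_dropna = 1296381 - 2007          # dataset length minus dropped NA rows
approx_sample_size = rows_after_dropna // 10  # assumption: the sample is exactly 10%
approx_support = absolute_support(approx_sample_size, 0.01)
print(rows_after_dropna, approx_sample_size, approx_support)
# 1294374 129437 1295
```

Under this assumption an itemset needs roughly 1295 occurrences in the sample, which is consistent with the smallest frequencies shown in the tables (for example 1299 and 1303).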


 

         2861661 function calls (2860230 primitive calls) in 25.776 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1427   24.103    0.017   24.103    0.017 {method 'recv_into' of '_socket.socket' objects}
      832    0.472    0.001    0.477    0.001 {built-in method _pickle.loads}
        1    0.424    0.424   25.721   25.721 3580826173.py:18(apriori)
  2679245    0.197    0.000    0.197    0.000 3580826173.py:26(<genexpr>)
        1    0.136    0.136    0.136    0.136 3580826173.py:26(<dictcomp>)
       21    0.112    0.005    0.125    0.006 {function CloudPickler.dump at 0x7a92500776d0}
        1    0.092    0.092    0.092    0.092 3580826173.py:27(<dictcomp>)
        1    0.055    0.055   25.776   25.776 <string>:1(<module>)
        3    0.016    0.005    0.016    0.005 {method 'write' of '_io.BufferedRandom' objects}
      514    0.008    0.000    0.008    0.000 {method 'sendall' of '_socket.socket' objects}
    