Author: Beatrice Cagnin

A.Y. 2023-2024

---

## Project 2: Market-basket analysis

The task is to implement a system that finds frequent itemsets (market-basket analysis) in the LinkedIn Jobs & Skills dataset.

The detector must consider:
- BASKETS = the strings contained in the job_skills column of the job_skills.csv file
- ITEMS = the individual skills contained in those strings
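
As a minimal sketch of these two definitions, the snippet below turns one `job_skills` string into a basket, that is, a set of items. The example string and the helper name `to_basket` are illustrative assumptions and do not come from the dataset or the notebook; dropping empty fragments is an extra safeguard added here.

```python
def to_basket(job_skills):
    """Split a comma-separated skills string into a set of trimmed skill names."""
    return {skill.strip() for skill in job_skills.split(",") if skill.strip()}


# Hypothetical row, written in the same format as the job_skills column
example_row = "Communication, Teamwork, Customer service, Teamwork"
basket = to_basket(example_row)

# A basket is a set, so the repeated skill appears only once
print(sorted(basket))
```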

---

In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import size, split, col
from itertools import combinations

Connect to Kaggle, then download and unzip the dataset

In [2]:
os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"

!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

!unzip 1-3m-linkedin-jobs-and-skills-2024.zip -d /content/data

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
100% 1.87G/1.88G [00:15<00:00, 206MB/s]
100% 1.88G/1.88G [00:15<00:00, 130MB/s]
Archive:  1-3m-linkedin-jobs-and-skills-2024.zip
  inflating: /content/data/job_skills.csv  
  inflating: /content/data/job_summary.csv  
  inflating: /content/data/linkedin_job_postings.csv  


Initialize PySpark session

In [3]:
spark = SparkSession.builder.appName("A_priori").getOrCreate()

---

## Dataset analysis

In [4]:
df = spark.read.csv("/content/data/job_skills.csv", header = True, sep = ",")
df.show(10)

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Optical Inspectio...|
|https://www.linke...|HVAC, troubleshoo...|
+--------------------+--------------------+
only showing top 10 rows



#### Check for Null values

In [5]:
df.printSchema()

root
 |-- job_link: string (nullable = true)
 |-- job_skills: string (nullable = true)



In [6]:
df_null_link = df.filter(df.job_link.isNull())
df_null_link.show()

df_null_skills = df.filter(df.job_skills.isNull())
df_null_skills.show()

+--------+----------+
|job_link|job_skills|
+--------+----------+
+--------+----------+

+--------------------+----------+
|            job_link|job_skills|
+--------------------+----------+
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://uk.linked...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://uk.linked...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://au.linked...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
|https://www.linke...|      NULL|
+--------------------+----------+
only showing top 20 rows



As can be seen, 'job_link' contains no null values, whereas 'job_skills' does.

In [7]:
df_cleaned = df.dropna()
df_cleaned.show()

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Optical Inspectio...|
|https://www.linke...|HVAC, troubleshoo...|
|https://www.linke...|Host/Server Assis...|
|https://www.linke...|Apartment mainten...|
|https://www.linke...|Fiber Optic Cable...|
|https://www.linke...|CT Technologist, ...|
|https://ca.linked...|SAP, DRMIS, Data ...|
|https://www.linke...|Debt and equity o...|
|https://ca.linked...|Biomedical Engine...|
|https://www.linke...|Laboratory Techni...|
|https://www.linke...|Program Managemen...|
|https://www.linke...|Hiring, Tr

The dataset no longer contains any null values.

#### Focus on column 'job_skills'

In [8]:
df_skills =  df_cleaned.select("job_skills")
df_skills.show(20, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|job_skills                                                                                                                                                                              

Number of skills for each row of the 'job_skills' column

In [9]:
df_skills_counts = df_skills.withColumn("skills_count", size(split(col("job_skills"), ", ")))
df_skills_counts.show(20)

+--------------------+------------+
|          job_skills|skills_count|
+--------------------+------------+
|Building Custodia...|          14|
|Customer service,...|          19|
|Applied Behavior ...|          20|
|Electrical Engine...|          35|
|Electrical Assemb...|          11|
|Access Control, V...|          19|
|Consultation, Sup...|          29|
|Veterinary Recept...|          22|
|Optical Inspectio...|          17|
|HVAC, troubleshoo...|           9|
|Host/Server Assis...|          29|
|Apartment mainten...|          19|
|Fiber Optic Cable...|          16|
|CT Technologist, ...|          11|
|SAP, DRMIS, Data ...|          19|
|Debt and equity o...|          27|
|Biomedical Engine...|          18|
|Laboratory Techni...|          28|
|Program Managemen...|          23|
|Hiring, Training,...|          33|
+--------------------+------------+
only showing top 20 rows



In [10]:
dataset = df_cleaned.withColumn("skills_count", size(split(col("job_skills"), ", ")))
dataset.show(20)

+--------------------+--------------------+------------+
|            job_link|          job_skills|skills_count|
+--------------------+--------------------+------------+
|https://www.linke...|Building Custodia...|          14|
|https://www.linke...|Customer service,...|          19|
|https://www.linke...|Applied Behavior ...|          20|
|https://www.linke...|Electrical Engine...|          35|
|https://www.linke...|Electrical Assemb...|          11|
|https://www.linke...|Access Control, V...|          19|
|https://www.linke...|Consultation, Sup...|          29|
|https://www.linke...|Veterinary Recept...|          22|
|https://www.linke...|Optical Inspectio...|          17|
|https://www.linke...|HVAC, troubleshoo...|           9|
|https://www.linke...|Host/Server Assis...|          29|
|https://www.linke...|Apartment mainten...|          19|
|https://www.linke...|Fiber Optic Cable...|          16|
|https://www.linke...|CT Technologist, ...|          11|
|https://ca.linked...|SAP, DRMI

---

Since working with the whole dataset takes a long time, a 1% sample of it is used.

In [11]:
row_count = df_cleaned.count()
print("Total number of rows:", row_count)

Total number of rows: 1294374


In [12]:
sample = df_skills.sample(False, 0.01, seed=123)

sample_count = sample.count()
print("Total number of sample rows:", sample_count)

Total number of sample rows: 12851
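
The sample holds 12851 rows rather than exactly 1% of 1294374 (about 12944). This is expected: Spark's `sample()` does not guarantee an exact fraction, because sampling without replacement keeps each row independently with the given probability. The sketch below mimics that behaviour in plain Python. It is not Spark's implementation, and `bernoulli_sample` is a hypothetical helper.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))  # stand-in for the cleaned rows
sample_rows = bernoulli_sample(rows, 0.01, seed=123)

# The size is close to 1000 but is not guaranteed to equal it
print(len(sample_rows))
```

Fixing the seed makes the sample reproducible, which is why the notebook passes `seed=123`.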


---

Load the sample into a Resilient Distributed Dataset (RDD), after transforming each row into a list.

In [13]:
rows_as_lists = [list(row) for row in sample.collect()]

rdd = spark.sparkContext.parallelize(rows_as_lists)
rdd.collect()

[['Service Mentality, Attention to Detail, Sense of Urgency, Initiative, Flexibility, Logic, Problemsolving, Customer service, Tact, Professionalism, HIPAA, PHI, Federal and state laws, Strong service mentality, Telephone etiquette, Personal etiquette, Warm, Positive, Energetic, Professional, Oral communication, Written communication, Tactful communication, Diplomatic communication, Personnel recruitment, Performance management, Performance assessment, Selfdevelopment, Proficient in computer applications, Word, Excel, Coordinating, Prioritizing, Multitasking, Opportunity identification, Plan of action development, Plan implementation, Plan evaluation, Conflict resolution, 401(k) Retirement Plan, Medical Plan, Vision Plan, Prescription Plan, Telehealth Plan, Dental Plan, Life and Disability Insurance, Paid Time Off, Extended Illness Days, Colleague Referral Bonus, Tuition Reimbursement, Commuter Benefits, Dependent Care Spending Account, Employee Discounts'],
 ['ECE units, Teaching expe

In [14]:
rdd.getNumPartitions()

2

Split each row into its individual skills, using map-style transformations.

To obtain the **baskets**, the map() function is used.

**map()** - returns an RDD with one element per input row (here, a set of skill strings)

In [15]:
baskets = rdd.map(lambda row: set(item.strip() for item in row[0].split(',')))
baskets.collect()

[{'401(k) Retirement Plan',
  'Attention to Detail',
  'Colleague Referral Bonus',
  'Commuter Benefits',
  'Conflict resolution',
  'Coordinating',
  'Customer service',
  'Dental Plan',
  'Dependent Care Spending Account',
  'Diplomatic communication',
  'Employee Discounts',
  'Energetic',
  'Excel',
  'Extended Illness Days',
  'Federal and state laws',
  'Flexibility',
  'HIPAA',
  'Initiative',
  'Life and Disability Insurance',
  'Logic',
  'Medical Plan',
  'Multitasking',
  'Opportunity identification',
  'Oral communication',
  'PHI',
  'Paid Time Off',
  'Performance assessment',
  'Performance management',
  'Personal etiquette',
  'Personnel recruitment',
  'Plan evaluation',
  'Plan implementation',
  'Plan of action development',
  'Positive',
  'Prescription Plan',
  'Prioritizing',
  'Problemsolving',
  'Professional',
  'Professionalism',
  'Proficient in computer applications',
  'Selfdevelopment',
  'Sense of Urgency',
  'Service Mentality',
  'Strong service mental

In [16]:
n_baskets = baskets.count()
print(f"Total number of baskets: {n_baskets}")

Total number of baskets: 12851


To obtain the **items**, the flatMap() function is used, followed by distinct().

**flatMap()** - returns a single flat RDD of strings, with the contents of all baskets concatenated

In [17]:
items = baskets.flatMap(lambda basket: basket).distinct()
items.take(20)

['Written communication',
 'Colleague Referral Bonus',
 'Tuition Reimbursement',
 'Opportunity identification',
 'Excel',
 'Professionalism',
 'Tact',
 '401(k) Retirement Plan',
 'Telephone etiquette',
 'Extended Illness Days',
 'Proficient in computer applications',
 'Dental Plan',
 'Dependent Care Spending Account',
 'Oral communication',
 'Conflict resolution',
 'Medical Plan',
 'Plan evaluation',
 'Problemsolving',
 'Strong service mentality',
 'Professional']

In [18]:
n_items = items.count()
print(f"Total number of unique items: {n_items}")

Total number of unique items: 99082
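
The difference between the map-style and flatMap-style steps can be illustrated without Spark. The following plain-Python sketch uses two toy rows (an assumption, not taken from the dataset) and list and set operations as stand-ins for the RDD transformations.

```python
from itertools import chain

rows = [["Communication, Teamwork"], ["Teamwork, Excel, Leadership"]]  # toy rows

# map-like step: one output element (a basket) per input row
baskets = [{s.strip() for s in row[0].split(",")} for row in rows]

# flatMap-like step: all baskets are flattened into one stream of items
all_items = list(chain.from_iterable(baskets))

# distinct-like step: duplicates across baskets are removed
unique_items = set(all_items)

print(len(baskets), len(all_items), len(unique_items))  # 2 5 4
```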


Assign an integer index to each item and collect the mapping into a dictionary

In [19]:
it = items.zipWithIndex()
it.take(5)

[('Written communication', 0),
 ('Colleague Referral Bonus', 1),
 ('Tuition Reimbursement', 2),
 ('Opportunity identification', 3),
 ('Excel', 4)]

In [20]:
it_index = it.collectAsMap()

In [21]:
type(it_index)

dict

Reverse dictionary to recover items from their indexes

In [22]:
reverse_it_index = {index: item for item, index in it_index.items()}

Function to transform baskets into sets of indexes (representing items)

In [23]:
def hashing(basket):
    return {it_index[skill] for skill in basket}

b_baskets = baskets.map(hashing)
b_baskets.first()

{0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 49703,
 49704,
 49705,
 49706,
 49707,
 49708,
 49709,
 49710,
 49711,
 49712,
 49713,
 49714,
 49715,
 49716,
 49717,
 49718,
 49719,
 49720,
 49721,
 49722,
 49723,
 49724,
 49725,
 49726}
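
Replacing skill strings with integers keeps the later passes lighter, since small integers are cheaper to hash, compare and move between workers than long strings. A minimal plain-Python sketch of the encode and decode round trip follows; the toy baskets and the names `item_to_index` and `index_to_item` are assumptions, and `enumerate` stands in for `zipWithIndex`.

```python
baskets = [{"Communication", "Teamwork"}, {"Teamwork", "Excel"}]  # toy baskets

# Assign an integer to every distinct item (sorted here only for reproducibility)
items = sorted(set().union(*baskets))
item_to_index = {item: index for index, item in enumerate(items)}
index_to_item = {index: item for item, index in item_to_index.items()}

# Encode baskets as sets of indexes, then decode them back to skill names
encoded = [{item_to_index[item] for item in basket} for basket in baskets]
decoded = [{index_to_item[index] for index in basket} for basket in encoded]

print(encoded)
```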

---

## A-Priori Algorithm

**Support threshold**: 2% of the number of baskets

In [24]:
support = int(0.02 * n_baskets)
support

257
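
The threshold is plain arithmetic, sketched here outside Spark. The basket count is copied from the output above; `is_frequent` is a hypothetical helper that assumes a strict `count > support` comparison.

```python
n_baskets = 12851     # number of baskets in the sample
min_fraction = 0.02   # relative support threshold (2%)

# 0.02 * 12851 = 257.02, truncated to 257 by int()
support = int(min_fraction * n_baskets)
print(support)


def is_frequent(count, support):
    """An itemset is frequent when its count strictly exceeds the threshold."""
    return count > support
```

With a strict comparison, an itemset that occurs in exactly 257 baskets would not be reported as frequent.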

### Frequent singletons (skills)

In [25]:
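# Note: this hard-coded value overrides the 2% threshold computed above (257);
# every result below was obtained with support = 275.
support = 275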

first_pass = b_baskets.flatMap(lambda basket:[(skill, 1) for skill in basket]) \
                      .reduceByKey(lambda x,y: x+y) \
                      .filter(lambda x: x[1] > support)

frequent_singletons = set(first_pass.map(lambda x:x[0]).collect())

print("Total number of singletons:", len(frequent_singletons))
print("5 random singleton:", first_pass.take(5))

Total number of singletons: 66
5 random singleton: [(4, 423), (49710, 1588), (49712, 533), (38, 789), (54, 1110)]


In [26]:
first_five_singletons = first_pass.takeOrdered(5, key=lambda x: -x[1])

first_five_singletons_ = [
    (reverse_it_index[singleton[0]], singleton[1])
    for singleton in first_five_singletons
]

print("The first 5 most frequent singletons are:")
for item, count in first_five_singletons_:
    print(f"{item}: {count}")

The first 5 most frequent singletons are:
Communication: 3605
Teamwork: 2220
Leadership: 1826
Customer service: 1588
Communication skills: 1118


### Frequent pairs of skills

In [27]:
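# Caution: e0 and e1 both range over the whole basket, so each unordered pair is
# emitted twice, as (e0, e1) and as (e1, e0), and every pair count is doubled.
second_pass = b_baskets.flatMap(lambda basket: [(tuple(sorted((e0, e1))), 1) for e0 in basket for e1 in basket if e1 != e0 and e0 in frequent_singletons and e1 in frequent_singletons]) \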
                       .reduceByKey(lambda x,y: x+y) \
                       .filter(lambda x: x[1] > support)

frequent_pairs = set(second_pass.map(lambda x:x[0]).collect())

print("Total number of frequent pairs:", len(frequent_pairs))
print("5 random frequent pairs:", second_pass.take(5))


Total number of frequent pairs: 134
5 random frequent pairs: [((23, 49751), 532), ((53, 49751), 2728), ((23, 53), 396), ((17, 49751), 1344), ((33, 49751), 658)]


In [28]:
first_five_pairs = second_pass.takeOrdered(5, key=lambda x: -x[1])


first_five_pairs_ = [
    ((reverse_it_index[pair[0]], reverse_it_index[pair[1]]), score)
    for pair, score in first_five_pairs
]

print("The first 5 most frequent pairs are:")
for item in first_five_pairs_:
    print(item)

The first 5 most frequent pairs are:
(('Teamwork', 'Communication'), 2728)
(('Communication', 'Leadership'), 2322)
(('Customer service', 'Communication'), 1552)
(('Problemsolving', 'Communication'), 1344)
(('Teamwork', 'Leadership'), 1316)
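
A pair cannot occur in more baskets than either of its items, yet the top pair above is reported with 2728 occurrences while 'Teamwork' alone has 2220. This is consistent with the nested loops of the second pass visiting each unordered pair twice, as (e0, e1) and as (e1, e0), which doubles every pair count. It would also explain why this pass reports 134 frequent pairs while the generalized implementation further below reports 43. The plain-Python sketch below contrasts the two enumerations on toy data; the baskets and the names `once` and `twice` are illustrative assumptions.

```python
from collections import Counter
from itertools import combinations

baskets = [{1, 2, 3}, {1, 2}, {2, 3}]  # toy baskets of item indexes
frequent_singletons = {1, 2, 3}        # assume every item passed the first pass

once = Counter()   # each unordered pair counted once per basket
twice = Counter()  # nested loops visit both (a, b) and (b, a)

for basket in baskets:
    kept = sorted(item for item in basket if item in frequent_singletons)
    once.update(combinations(kept, 2))
    twice.update(tuple(sorted((a, b))) for a in kept for b in kept if a != b)

print(once[(1, 2)], twice[(1, 2)])  # 2 4
```

Enumerating pairs with `combinations` over the sorted, pre-filtered basket yields each candidate exactly once, so the counts can be compared with the support threshold directly.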


### Frequent triples of skills

In [29]:
third_pass = b_baskets.flatMap(lambda basket:[(tuple(sorted((e0, e1, e2))), 1) for e0 in basket for e1 in basket for e2 in basket if \
                                         e1 != e0 and e0 != e2 and (e0,e1) in frequent_pairs and (e1,e2) in frequent_pairs and (e0,e2) in frequent_pairs]) \
                      .reduceByKey(lambda x,y: x+y) \
                      .filter(lambda x: x[1] > support)

frequent_triples = set(third_pass.map(lambda x: x[0]).collect())

print("Total number of frequent triples:", len(frequent_triples))
print("5 random frequent triples:", third_pass.take(5))

Total number of frequent triples: 5
5 random frequent triples: [((17, 53, 49751), 402), ((53, 49751, 49829), 510), ((53, 49751, 49935), 295), ((17, 49710, 49751), 321), ((53, 49710, 49751), 380)]


In [30]:
first_five_triples = third_pass.takeOrdered(5, key=lambda x: -x[1])

first_five_triples_ = [
    ((reverse_it_index[triple[0]], reverse_it_index[triple[1]], reverse_it_index[triple[2]]), score)
    for triple, score in first_five_triples
]

print("The first 5 most frequent triples are:")
for item in first_five_triples_:
    print(item)

The first 5 most frequent triples are:
(('Teamwork', 'Communication', 'Leadership'), 510)
(('Problemsolving', 'Teamwork', 'Communication'), 402)
(('Teamwork', 'Customer service', 'Communication'), 380)
(('Problemsolving', 'Customer service', 'Communication'), 321)
(('Teamwork', 'Communication', 'Problem Solving'), 295)


---

**Generalized A-Priori algorithm**, in order to find frequent itemsets of all sizes

In [31]:
def a_priori(b_baskets, support, it_index):
  """Print, for each itemset size, the number of frequent itemsets and the most frequent one.

  b_baskets: RDD of baskets, each a set of integer item indexes
  support:   support threshold (an itemset is frequent if its count exceeds it)
  it_index:  dictionary mapping each item (skill) to its integer index
  """

  # Inverse lookup (index -> skill), built once instead of scanning the dictionary per item
  index_to_item = {index: item for item, index in it_index.items()}

  print("Frequent singletons")

  first_pass = b_baskets.flatMap(lambda basket: [(skill, 1) for skill in basket]) \
                        .reduceByKey(lambda x,y: x+y) \
                        .filter(lambda x: x[1]>support)

  first_pass_count = first_pass.count()

  print("Number of frequent singletons: ",first_pass_count)

  max_s = first_pass.max(lambda x: x[1])
  max_s_ = index_to_item[max_s[0]]
  print("Most frequent singleton:",max_s_)

  # Singletons are stored as 1-tuples so that they match the output of combinations(elem, 1)
  frequents = set(first_pass.map(lambda x: (x[0],)).collect())

  print()


  k = 2

  while True:

    print("Itemsets of size:",k)

    # Count only the candidates whose (k-1)-subsets are all frequent (monotonicity)
    pass_ = b_baskets.flatMap(lambda basket:[(elem,1) for elem in combinations(sorted(basket), k) if
                                                      all(item in frequents for item in combinations(elem,k-1))]) \
                     .reduceByKey(lambda x,y: x+y) \
                     .filter(lambda x: x[1]>support)

    pass_count = pass_.count()

    if pass_count != 0:
      print("Number of frequent itemsets of size",k,":",pass_count)

      max_p = pass_.max(lambda x: x[1])
      max_itemset = [index_to_item[skill] for skill in max_p[0]]
      print("Most frequent itemset of size",k,"is:",max_itemset)

      # The frequent k-itemsets become the reference set for the next pass
      frequents = set(pass_.map(lambda x: x[0]).collect())

      print()

      k+=1


    else:
      print("There are no frequent itemsets of size",k)
      print("Given the monotonicity of itemsets, there are no more frequent itemsets.")
      break

In [32]:
a_priori(b_baskets, support, it_index)

Frequent singletons
Number of frequent singletons:  66
Most frequent singleton: Communication

Itemsets of size: 2
Number of frequent itemsets of size 2 : 43
Most frequent itemset of size 2 is: ['Teamwork', 'Communication']

Itemsets of size: 3
Number of frequent itemsets of size 3 : 5
Most frequent itemset of size 3 is: ['Teamwork', 'Communication', 'Leadership']

Itemsets of size: 4
There are no frequent itemsets of size 4
Given the monotonicity of itemsets, there are no more frequent itemsets.
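
The level-wise logic can also be followed on a small example without Spark. The sketch below is a plain-Python analogue under stated assumptions: `a_priori_local` and the toy baskets are hypothetical, everything fits in memory, and the comparison with the threshold is strict, as in the passes above.

```python
from collections import Counter
from itertools import combinations


def a_priori_local(baskets, support):
    """Return {k: {itemset: count}} for itemsets whose count exceeds `support`."""
    counts = Counter(item for basket in baskets for item in basket)
    frequent = {(item,): c for item, c in counts.items() if c > support}
    result, k = {}, 1
    while frequent:
        result[k] = frequent
        k += 1
        previous = set(frequent)
        counts = Counter(
            candidate
            for basket in baskets
            for candidate in combinations(sorted(basket), k)
            # monotonicity: keep a candidate only if all its (k-1)-subsets are frequent
            if all(sub in previous for sub in combinations(candidate, k - 1))
        )
        frequent = {itemset: c for itemset, c in counts.items() if c > support}
    return result


toy_baskets = [{"a", "b", "c"}, {"a", "b"}, {"a", "c"}, {"b", "c"}, {"a", "b", "c"}]
frequent_itemsets = a_priori_local(toy_baskets, support=2)
print(frequent_itemsets)
```

In this toy run every item and every pair occurs more than twice, while the triple occurs exactly twice, so the loop stops after size 2.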


In [33]:
spark.stop()