<a href="https://colab.research.google.com/github/arina19-2000/unimi/blob/main/Algorithms%20for%20massive%20data%20/AMD%20Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Market Basket Analysis Project**

In [1]:
# Environment setup (Colab): install a headless Java 8 JDK and the Spark 3.1.1
# (Hadoop 3.2) binary distribution, then start a local SparkSession.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# NOTE(review): findspark is not version-pinned; pin it for reproducible runs.
!pip install -q findspark

import os
# Tell findspark/pyspark where the JDK and the Spark directory unpacked above
# live (the Spark path assumes the working directory is /content).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
# local[*]: run Spark in-process with one worker thread per logical core.
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Render DataFrames as tables when they are the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [2]:
import pyspark
# RDD-level entry point of the active SparkSession `spark`.
sc = spark.sparkContext

In [3]:
os.environ['KAGGLE_USERNAME'] = "xxx"
os.environ['KAGGLE_KEY'] = "xxx"
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024 -f job_skills.csv

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading job_skills.csv.zip to /content
 98% 192M/197M [00:02<00:00, 97.5MB/s]
100% 197M/197M [00:02<00:00, 76.4MB/s]


In [4]:
!unzip /content/job_skills.csv.zip -d data

Archive:  /content/job_skills.csv.zip
  inflating: data/job_skills.csv     


In [5]:
# Load the raw postings; the first CSV line holds the column names
# (job_link, job_skills).
# NOTE(review): Spark's CSV reader uses "\" as its quote-escape character by
# default. If the file escapes quotes RFC-4180 style (""), consider adding
# escape='"' - verify against the raw file before changing.
df = spark.read.csv('data/job_skills.csv', header=True)
df.show()

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Optical Inspectio...|
|https://www.linke...|HVAC, troubleshoo...|
|https://www.linke...|Host/Server Assis...|
|https://www.linke...|Apartment mainten...|
|https://www.linke...|Fiber Optic Cable...|
|https://www.linke...|CT Technologist, ...|
|https://ca.linked...|SAP, DRMIS, Data ...|
|https://www.linke...|Debt and equity o...|
|https://ca.linked...|Biomedical Engine...|
|https://www.linke...|Laboratory Techni...|
|https://www.linke...|Program Managemen...|
|https://www.linke...|Hiring, Tr

In [6]:
# Discard postings with a missing link or skill list (how='any' by default),
# then summarise what is left.
df = df.na.drop()
df.describe()

summary,job_link,job_skills
count,1294374,1294374
mean,,
stddev,,
min,https://ae.linked...,"""* Maintenance, *..."
max,https://za.linked...,"志愿服务, 沟通, 陪伴, 临终服..."


## Data exploration

In [7]:
from pyspark.sql.functions import split, explode, col, lower, trim, when, count

# Raw skill frequencies: split every comma-separated list as-is (no trimming,
# no case folding) and count how often each resulting token occurs.
skills_exploded = (
    df.select(explode(split(col("job_skills"), ",")).alias("job_skills"))
      .groupBy("job_skills")
      .agg(count("*").alias("count"))
)

skills = skills_exploded.orderBy(col("count").desc())

row_count = skills.count()
print(row_count)
skills.show()

3387715
+--------------------+------+
|          job_skills| count|
+--------------------+------+
|       Communication|356365|
|            Teamwork|223000|
|          Leadership|162921|
| Communication sk...|104700|
|    Customer service|104112|
|     Problem Solving|101727|
|    Customer Service| 93458|
|      Problemsolving| 92206|
|       Collaboration| 86620|
|            Training| 82865|
| Communication Sk...| 77267|
| Attention to detail| 75147|
|     Time management| 72193|
|     Time Management| 69638|
| Microsoft Office...| 69557|
|  Project Management| 67227|
|               Sales| 65508|
|          Scheduling| 63264|
|    Customer service| 62046|
|             Nursing| 60250|
+--------------------+------+
only showing top 20 rows



In [8]:
# Same token count as the raw version, but lower-cased first, to measure how many
# distinct skill strings differ only by capitalisation.
# (Parenthesised chain instead of "\" continuations: the previous version ended
# with a dangling "\" that only parsed because a blank line followed it.)
skills_lower = (
    df.withColumn("job_skills_lower", lower(col("job_skills")))
      .withColumn("skills_list", split(col("job_skills_lower"), ","))
      .withColumn("skill", explode(col("skills_list")))
      .groupBy("skill")
      .agg(count("*").alias("count"))
)

skills = skills_lower.orderBy(col("count").desc())

row_count = skills.count()
print(row_count)
skills.show()

2849192
+--------------------+------+
|               skill| count|
+--------------------+------+
|       communication|358097|
|            teamwork|224315|
|    customer service|198670|
| communication sk...|182837|
|          leadership|163630|
|     problem solving|148428|
|     time management|142492|
| attention to detail|133549|
|      problemsolving|128969|
|  project management|104712|
| interpersonal sk...| 99391|
|        patient care| 96770|
|       collaboration| 86932|
|            training| 83324|
|    customer service| 79363|
|       data analysis| 79056|
| organizational s...| 75079|
| microsoft office...| 71593|
| inventory manage...| 70145|
|               sales| 65766|
+--------------------+------+
only showing top 20 rows



Lower-casing reduces the number of distinct skill strings from 3,387,715 to 2,849,192, so the same case normalisation is now applied to every string column of the DataFrame.

In [9]:
from pyspark.sql.types import StringType

# Lower-case every string column of the DataFrame.
for col_name in df.columns:
    if isinstance(df.schema[col_name].dataType, StringType):
        df = df.withColumn(col_name, lower(col(col_name)))

# Turn the comma-separated skill string into an array, dropping the whitespace
# after each comma. The pattern is a raw string: "\s" in a plain literal is an
# invalid escape sequence (DeprecationWarning / SyntaxWarning on newer Pythons).
# The type guard keeps the cell idempotent: once job_skills is an array,
# re-running the cell must not try to split it again.
if isinstance(df.schema["job_skills"].dataType, StringType):
    df = df.withColumn("job_skills", split(col("job_skills"), r",\s*"))


In [10]:
# Preview: job_skills is now an array of lower-cased skill strings.
df.show()

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|[building custodi...|
|https://www.linke...|[customer service...|
|https://www.linke...|[applied behavior...|
|https://www.linke...|[electrical engin...|
|https://www.linke...|[electrical assem...|
|https://www.linke...|[access control, ...|
|https://www.linke...|[consultation, su...|
|https://www.linke...|[veterinary recep...|
|https://www.linke...|[optical inspecti...|
|https://www.linke...|[hvac, troublesho...|
|https://www.linke...|[host/server assi...|
|https://www.linke...|[apartment mainte...|
|https://www.linke...|[fiber optic cabl...|
|https://www.linke...|[ct technologist,...|
|https://ca.linked...|[sap, drmis, data...|
|https://www.linke...|[debt and equity ...|
|https://ca.linked...|[biomedical engin...|
|https://www.linke...|[laboratory techn...|
|https://www.linke...|[program manageme...|
|https://www.linke...|[hiring, t

## **Sampling**

In [11]:
import pyspark.sql.functions as F

# Reproducible random sample of the postings (without replacement) to keep the
# A-Priori passes below cheap. Spark treats `fraction` as an expected proportion,
# so the sample holds roughly, not exactly, 1% of the rows.
SAMPLE_FRACTION = 0.01
SAMPLE_SEED = 42
split_ = df.sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)

In [12]:
# Preview of the sampled postings.
split_.show()

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|[employee trainin...|
|https://www.linke...|[aviation, englis...|
|https://www.linke...|[electronic techn...|
|https://ca.linked...|[warranty service...|
|https://uk.linked...|[flight instructo...|
|https://www.linke...|[project manageme...|
|https://www.linke...|[basketball coach...|
|https://www.linke...|[litigation, juri...|
|https://www.linke...|[accounting, audi...|
|https://www.linke...|[nursing, patient...|
|https://uk.linked...|[commercial leade...|
|https://www.linke...|[ob/gyn, epic emr...|
|https://www.linke...|[guest service, c...|
|https://www.linke...|[oracle, accounti...|
|https://www.linke...|[accounting, audi...|
|https://www.linke...|[scoping, buyout,...|
|https://www.linke...|[mobile technicia...|
|https://www.linke...|[travel planning ...|
|https://ca.linked...|[computer enginee...|
|https://www.linke...|[tax accou

In [13]:
# describe() summarises numeric and string columns only, so the job_skills
# array column does not appear in the summary.
split_.describe()

summary,job_link
count,13057
mean,
stddev,
min,https://au.linked...
max,https://www.linke...


### Forming baskets

In [14]:
# Switch to the RDD API: each element is a Row holding one job_skills array.
rdd = split_.select('job_skills').rdd
rdd.take(5)

[Row(job_skills=['employee training', 'food safety', 'inventory management', 'team management', 'kitchen supervision', 'food quality control', 'food safety', 'cost control', 'menu management', 'hiring', 'staff scheduling', 'sales management', 'profit optimization', 'customer service', 'guest experience', 'service area organization', 'teamwork', 'hsd or ged', 'passion for helping and serving others', 'leadership experience', 'motivation', 'development of others', 'commitment to excellence', 'commitment to safety', 'strong customer service', 'support focus', 'effective communication', 'anticipating customer needs', 'problem solving', 'decisionmaking']),
 Row(job_skills=['aviation', 'english', 'faa commercial pilot license', 'ifr', 'sel', 'high performance', 'complex', 'second class medical', 'fcc restricted radiotelephone operator permit', 'current passport', "valid driver's license", '401k', 'medical', 'dental', 'vision', 'shortterm disability', 'longterm disability', 'life insurance', 

In [15]:
# One basket per posting: the list of skills it asks for.
# Cached because several actions below scan this RDD (count, size statistics,
# vocabulary building); without persistence Spark recomputes the whole lineage,
# i.e. re-reads and re-samples the CSV, for every action.
baskets = rdd.map(lambda row: row['job_skills']).cache()
baskets.take(3)

[['employee training',
  'food safety',
  'inventory management',
  'team management',
  'kitchen supervision',
  'food quality control',
  'food safety',
  'cost control',
  'menu management',
  'hiring',
  'staff scheduling',
  'sales management',
  'profit optimization',
  'customer service',
  'guest experience',
  'service area organization',
  'teamwork',
  'hsd or ged',
  'passion for helping and serving others',
  'leadership experience',
  'motivation',
  'development of others',
  'commitment to excellence',
  'commitment to safety',
  'strong customer service',
  'support focus',
  'effective communication',
  'anticipating customer needs',
  'problem solving',
  'decisionmaking'],
 ['aviation',
  'english',
  'faa commercial pilot license',
  'ifr',
  'sel',
  'high performance',
  'complex',
  'second class medical',
  'fcc restricted radiotelephone operator permit',
  'current passport',
  "valid driver's license",
  '401k',
  'medical',
  'dental',
  'vision',
  'shortte

In [16]:
# Number of baskets (sampled postings).
baskets.count()

13057

In [17]:
# Basket-size statistics in a single pass: RDD.stats() returns a StatCounter,
# so max and mean no longer each launch a separate Spark job.
length = baskets.map(len)
length_stats = length.stats()
# StatCounter keeps max as a float; basket sizes are integers.
print(int(length_stats.max()))
print(length_stats.mean())

172
20.7857854024661


In [18]:
# Minimum support count: an itemset is frequent if it occurs in (about) this
# fraction of the sampled baskets. Lower SUPPORT_FRACTION (e.g. 0.01) to find
# more itemsets.
SUPPORT_FRACTION = 0.02
support = round(baskets.count() * SUPPORT_FRACTION)
support

261

### Hash function: mapping each skill to an integer id



In [19]:
# Flatten all baskets into one RDD of skill strings (one element per occurrence).
# NOTE(review): the name `hash` shadows Python's built-in hash(); consider
# renaming it together with its uses in the following cells.
hash = baskets.flatMap(lambda skills: skills)
hash.take(5)

['employee training',
 'food safety',
 'inventory management',
 'team management',
 'kitchen supervision']

In [20]:
# Total number of skill occurrences across all baskets (repeats included).
hash.count()

271400

In [21]:
# Keep each skill once; cnt is the vocabulary size (number of distinct skills).
hash = hash.distinct()
cnt = hash.count()
cnt

90309

In [22]:
# Give every distinct skill a consecutive integer id (0 .. cnt-1).
# Rebuilt from `baskets` instead of zipping the current `hash` in place, so that
# re-running this cell is idempotent (zipping twice would nest the pairs).
hash = baskets.flatMap(lambda line: line).distinct().zipWithIndex()
hash.take(5)

[('employee training', 0),
 ('food quality control', 1),
 ('leadership experience', 2),
 ('strong customer service', 3),
 ('decisionmaking', 4)]

In [23]:
# Bring the skill -> id mapping to the driver as a dict (one entry per distinct skill).
hash_index = hash.collectAsMap()


In [24]:
# Represent each basket by the set of integer ids of its skills; using a set also
# drops skills that a posting lists more than once.
# The skill -> id dict is broadcast once to the executors instead of being
# pickled into the closure of every task.
hash_index_bc = sc.broadcast(hash_index)

def hashing(basket):
    """Return the set of integer ids of the skills in ``basket``."""
    skill_ids = hash_index_bc.value
    return {skill_ids[skill] for skill in basket}

# Cached: apriori_alg scans this RDD at least once per itemset size.
baskets2 = baskets.map(hashing).cache()

In [25]:
# First encoded baskets: sets of integer skill ids.
baskets2.take(3)

[{0,
  1,
  2,
  3,
  4,
  15078,
  15079,
  15080,
  15081,
  30261,
  30262,
  30263,
  30264,
  30265,
  30266,
  45103,
  45104,
  45105,
  45106,
  45107,
  45108,
  45109,
  60115,
  60116,
  60117,
  60118,
  75119,
  75120,
  75121},
 {5,
  6,
  7,
  8,
  15082,
  15083,
  30267,
  30268,
  30269,
  30270,
  45110,
  45111,
  45112,
  45113,
  45114,
  45115,
  45116,
  60119,
  60120,
  60121,
  75122,
  75123,
  75124,
  75125,
  75126,
  75127,
  75128,
  75129},
 {9,
  15084,
  15085,
  15086,
  15087,
  15088,
  15089,
  15090,
  30271,
  30272,
  30273,
  30274,
  45117,
  45118,
  45119,
  45120,
  45121,
  45122,
  45123,
  60122,
  60123,
  60124,
  60125,
  60126,
  60127,
  75130,
  75131,
  75132,
  75133,
  75134}]

## A-Priori Algorithm Implementation

In [26]:
from itertools import combinations

In [27]:
def _frequent_k_itemsets(baskets, prev_frequent, k, support_threshold):
    """Count the candidate k-itemsets in ``baskets`` and return the frequent ones.

    Args:
        baskets: RDD of baskets, each an iterable of sortable item ids.
        prev_frequent: set of frequent (k-1)-itemsets, stored as sorted tuples.
        k: size of the itemsets to count (k >= 2).
        support_threshold: minimum number of baskets an itemset must occur in.

    Returns:
        List of ``(itemset, count)`` pairs, ``itemset`` being a sorted tuple.
    """
    # Monotonicity: an item outside every frequent (k-1)-itemset cannot belong to
    # a frequent k-itemset. Dropping such items before enumerating combinations
    # yields exactly the same candidates, without generating C(len(basket), k)
    # tuples for long baskets.
    candidate_items = {item for itemset in prev_frequent for item in itemset}

    def emit_candidates(basket):
        kept = sorted(item for item in basket if item in candidate_items)
        return [
            (combo, 1)
            for combo in combinations(kept, k)
            # A-Priori pruning: every (k-1)-subset must itself be frequent.
            if all(sub in prev_frequent for sub in combinations(combo, k - 1))
        ]

    return (
        baskets.flatMap(emit_candidates)
        .reduceByKey(lambda x, y: x + y)
        .filter(lambda pair: pair[1] >= support_threshold)
        .collect()
    )


def apriori_alg(baskets, support_threshold, item_map, return_itemsets=False):
    """Run A-Priori on ``baskets`` and print a summary for each itemset size.

    An itemset is frequent when it occurs in at least ``support_threshold``
    baskets. Sizes k = 1, 2, ... are processed in turn; a k-itemset is counted
    only if all its (k-1)-subsets are frequent, and the search stops at the
    first size with no frequent itemsets.

    Args:
        baskets: RDD whose elements are collections of item ids (here: sets of ints).
        support_threshold: minimum support count (number of baskets).
        item_map: dict item name -> item id; only used to print readable names.
        return_itemsets: if True, also return the frequent itemsets found.

    Returns:
        None by default. With ``return_itemsets=True``, a dict mapping each size k
        to ``{itemset_tuple: support_count}`` (an empty dict when not even a
        single item is frequent).
    """
    # Invert the name -> id map once; a list(...).index(...) lookup costs
    # O(len(item_map)) per printed item. setdefault keeps the first name seen
    # for an id, as list.index would.
    id_to_item = {}
    for name, item_id in item_map.items():
        id_to_item.setdefault(item_id, name)

    print("Frequent singletons")

    # The RDD is not persisted here, so every Spark action would recompute the
    # whole pipeline: collect the (small) frequent result once and derive the
    # count, the maximum and the membership set from it locally.
    singletons = (
        baskets.flatMap(lambda basket: [(item, 1) for item in basket])
        .reduceByKey(lambda x, y: x + y)
        .filter(lambda pair: pair[1] >= support_threshold)
        .collect()
    )

    if not singletons:
        print("Threshold too high.")
        return {} if return_itemsets else None

    print("Number of frequent singletons:", len(singletons))

    top_item, _ = max(singletons, key=lambda pair: pair[1])
    print("Most frequent one:", id_to_item[top_item])

    # Itemsets are kept as sorted tuples so they compare equal to the tuples
    # produced by itertools.combinations over sorted baskets.
    found = {1: {(item,): cnt for item, cnt in singletons}}
    frequent_items = set(found[1])

    print()

    k = 2
    while True:
        print("Sets of:", k)

        frequent_k = _frequent_k_itemsets(baskets, frequent_items, k, support_threshold)

        if not frequent_k:
            print("No frequent sets of k", k)
            print("Stop here.")
            break

        print("Frequent sets of", k, ":", len(frequent_k))

        top_set, _ = max(frequent_k, key=lambda pair: pair[1])
        print("Most frequent set of", k, ":", [id_to_item[item] for item in top_set])

        found[k] = dict(frequent_k)
        frequent_items = set(found[k])

        print()
        k += 1

    return found if return_itemsets else None

In [28]:
# Mine frequent skill sets from the encoded baskets at the support computed above.
apriori_alg(baskets=baskets2, support_threshold=support, item_map=hash_index)


Frequent singletons
Number of frequent singletons: 81
Most frequent one: communication

Sets of: 2
Frequent sets of 2 : 69
Most frequent set of 2 : ['customer service', 'communication']

Sets of: 3
Frequent sets of 3 : 22
Most frequent set of 3 : ['teamwork', 'customer service', 'communication']

Sets of: 4
No frequent sets of k 4
Stop here.
