<a href="https://colab.research.google.com/github/dariashcherbakovaaa/Algorithms-for-massive-data/blob/main/MBA_Shcherbakova.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Market Basket Analysis on LinkedIn job skills and job links

###### **Daria Shcherbakova** *(DSE student, 17487A)*

## Settings

In [73]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName('Algo_Aprori').getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark
import pyspark
type(spark)

sc = spark.sparkContext
spark

In [75]:
# os.environ['KAGGLE_USERNAME'] = "xxxxxx"
# os.environ['KAGGLE_KEY'] = "xxxxxx"

In [76]:
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024
!unzip 1-3m-linkedin-jobs-and-skills-2024.zip -d job_skills

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
1-3m-linkedin-jobs-and-skills-2024.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  1-3m-linkedin-jobs-and-skills-2024.zip
replace job_skills/job_skills.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: job_skills/job_skills.csv  y
y

replace job_skills/job_summary.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename:   inflating: job_skills/job_summary.csv  y


In [None]:
# Load the job-link -> skills table: the first CSV row is the header, column types are inferred.
data_path = '/content/job_skills/job_skills.csv'
df_skills = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_path)
)
df_skills.printSchema()
df_skills.show()

In [None]:
# Load the job postings table: the first CSV row is the header, column types are inferred.
data_path = '/content/job_skills/linkedin_job_postings.csv'
df_all = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_path)
)
df_all.printSchema()
df_all.show()

In [None]:
# os.remove('/content/1-3m-linkedin-jobs-and-skills-2024.zip')
# os.remove('/content/job_skills/job_skills.csv')
# os.remove('/content/job_skills/job_summary.csv')
# os.remove('/content/job_skills/linkedin_job_postings.csv')
# !rmdir /content/job_skills/

## Data pre-processing

#### Additional dataset used to select the relevant job postings

In [None]:
# Keep only the posting attributes used to pick the target jobs.
kept_columns = ['job_link', 'job_title', 'search_country', 'search_position', 'job_level']
df_all = df_all.select(*kept_columns)
df_all.show(10)

In [None]:
# Summary statistics of the selected posting columns (displayed as the cell's result).
df_all.describe()

In [None]:
# Restrict the postings to mid-senior roles searched in the United States.
is_mid_senior = df_all['job_level'] == 'Mid senior'
is_us_search = df_all['search_country'] == 'United States'
df_all = df_all.filter(is_mid_senior & is_us_search)
df_all.show(10)

In [None]:
# Number of postings per job title, most frequent titles first.
df_all.groupBy('job_title').count().orderBy('count', ascending=False).show()

In [None]:
import re

# Patterns for the target roles. The next cell applies them to the lower-cased job title
# through Column.rlike, which is why they are written in lower case. The trailing `\w*`
# lets a stem such as "anal" or "engin" match any word that starts with it.
pattern_danalyst = r'\bdata\sanal\w*'
pattern_banalyst = r'\bbusiness\sanal\w*'
# Disabled: a bare "anal..." pattern would match every analyst title, not only data/business ones.
#pattern_analyst = r'\banal\w*'
pattern_scientist = r'\bdata\sscientist\w*'
pattern_engineer = r'\bdata\sengin\w*'

In [None]:
from pyspark.sql.functions import col, lower, regexp_extract

# Keep the postings whose lower-cased title matches at least one of the role patterns.
title_lc = lower(col('job_title'))
is_target_role = (
    title_lc.rlike(pattern_danalyst)
    | title_lc.rlike(pattern_banalyst)
    | title_lc.rlike(pattern_scientist)
    | title_lc.rlike(pattern_engineer)
)
df_all_sampled = df_all.filter(is_target_role)

# (rows, columns) of the selection
n_rows, n_cols = df_all_sampled.count(), len(df_all_sampled.columns)
print((n_rows, n_cols))

In [None]:
# Shorter column names, given positionally in the order of the existing columns.
# NOTE(review): 'serach' looks like a typo for 'search'; left unchanged because it is a runtime column name.
new_names = ['link', 'job_title', 'country', 'serach', 'level']

df_all_sampled = df_all_sampled.toDF(*new_names)
# Preview the first five rows as a pandas table.
df_all_sampled.limit(5).toPandas()

#### The main dataset with links and skills

In [None]:
# Rename the two columns of the skills table positionally to 'link' and 'skill'.
new_names = ['link', 'skill']

df_skills = df_skills.toDF(*new_names)
df_skills.show(5)

In [None]:
# Summary statistics of the skills table before cleaning.
df_skills.describe()

In [None]:
# Discard rows that contain a null in any column, then collapse exact duplicate rows.
df_skills = df_skills.na.drop().distinct()

# Summary statistics after cleaning, for comparison with the previous cell.
df_skills.describe()

#### Data merging and sampling

In [None]:
# First look at the most frequent values of the `skill` column.
# NOTE: at this point `skill` is still the whole comma-separated skills string of a posting
# (it is only split on ', ' later), so this counts identical skill lists, not individual skills.
df_skills.groupBy('skill').count().orderBy('count', ascending=False).show()

In [None]:
# Attach the skills to the selected postings; only links present in both tables are kept.
data = df_all_sampled.join(df_skills, on='link', how='inner')
data.count()

In [None]:
# Preview the joined postings-with-skills table.
data.show()

In [None]:
# Only the posting link and its skills string are needed from here on.
data = data.select('link', 'skill')
data.show(5)

#### EDA

In [None]:
import pandas as pd

In [None]:
# Bring the joined (link, skill) table into a local pandas DataFrame for the exploratory analysis.
df_pd = data.toPandas()
df_pd.shape

In [None]:
# Split each skills string on ', ' and give every individual skill its own row.
skill_lists = df_pd['skill'].str.split(', ')
skills = skill_lists.explode()
# Single-column frame of individual skills (the original row index is kept).
df = skills.to_frame()
df.shape

In [None]:
# Occurrences of each individual skill, sorted from most to least frequent.
counting = df.value_counts().sort_values(ascending = 0)
counting

In [None]:
# Bar chart of the 15 most frequent skills, labelled so the figure can be read on its own.
ax = counting[:15].plot.bar()
ax.set(title='15 most frequent skills', xlabel='skill', ylabel='occurrences');

## MBA (Market Basket Analysis)

#### Data transformation into baskets and items

In [None]:
skills = data.select('skill')  # the links are not needed to build the baskets
# Work on the underlying RDD and unwrap every Row into its plain skills string.
rdd = skills.rdd.map(lambda row: row['skill'])
rdd.take(1)

Yes, the raw skills strings are messy, but they are used as-is in the steps below.

In [None]:
rdd.getNumPartitions() # number of partitions the RDD is split into

In [None]:
# One posting = one basket: its skills string is split on ', ' into the basket's items.
baskets = rdd.map(lambda line: line.split(', '))
baskets.take(1) # first basket, shown as a list of skill strings

### Baskets' EDA

In [None]:
# Basket-size statistics. The mean and the basket count are computed once and reused,
# because each call to .mean() / .count() launches a separate Spark job.
lenghts = baskets.map(lambda x: len(x))
avg_items = lenghts.mean()
n_baskets = baskets.count()

print(f" Max number of items in the basket is: {lenghts.max()}\n")
print(f"The average number of items in the basket is: {avg_items}\n")
print(f"The total number of baskets is: {n_baskets}\n")
print(f"The approximate total number of items is:{round(avg_items * n_baskets)}")

In [None]:
# Support threshold: 1% of the total number of baskets.
# The A-Priori passes keep an itemset only when its count is strictly greater than `s`.
s = round(baskets.count() * 0.01)
s

### HashTable

In [None]:
# Distinct skills across all baskets: flatMap flattens the per-basket lists into a single RDD.
# NOTE(review): the name `hash` shadows Python's built-in hash(); it is kept because later cells refer to it.
hash = baskets.flatMap(lambda line: line).distinct()
hash.take(5)

In [None]:
# Pair every distinct skill with an integer index and collect the skill -> index mapping as a dict on the driver.
hash_index = hash.zipWithIndex().collectAsMap()

In [None]:
# Number of distinct skills.
hash.count()

In [None]:
def hashing(basket):
    """Translate a basket of skill strings into the set of their integer ids.

    Each skill is looked up in the module-level ``hash_index`` mapping.
    Repeated skills collapse into a single id because the result is a set.

    Args:
        basket: iterable of skill strings; every skill must be a key of ``hash_index``.

    Returns:
        set of the integer ids of the skills in the basket.

    Raises:
        KeyError: if a skill is not present in ``hash_index``.
    """
    skill_ids = set()
    for skill in basket:
        skill_ids.add(hash_index[skill])
    return skill_ids

In [None]:
# Encode every basket as a set of integer skill ids.
# The RDD is cached because every A-Priori pass scans it again; without caching
# each pass would recompute the whole lineage from the source data.
hashed_baskets = baskets.map(hashing).cache()
print(hashed_baskets.take(1))

### A-Priori

##### The first pass (count occurrences of each item)

In [None]:
# First pass: count in how many baskets each item occurs and keep the items
# whose count is strictly above the support threshold `s`.
item_ones = hashed_baskets.flatMap(lambda basket: basket).map(lambda item: (item, 1))
item_counts = item_ones.reduceByKey(lambda left, right: left + right)
first_pass = item_counts.filter(lambda item_count: item_count[1] > s)

print("remaining singleton", first_pass.count())
print("5 random singleton", first_pass.take(5))

In [None]:
# to count all pair composed of frequent singletons
# Second pass: count every pair made of two frequent singletons.
from itertools import combinations

frequent_singletons = set(first_pass.map(lambda x:x[0]).collect())

# Infrequent items are dropped from each basket BEFORE the pairs are generated.
# This yields exactly the pairs the old "generate all pairs, then filter both members"
# approach kept, but avoids materialising the quadratic number of pairs per basket.
# sorted() gives every pair a canonical (smaller id, larger id) form so that counts merge.
second_pass = hashed_baskets \
                 .flatMap(lambda basket: [
                     (pair, 1)
                     for pair in combinations(
                         sorted(item for item in basket if item in frequent_singletons), 2)
                 ]) \
                 .reduceByKey(lambda x,y: x+y) \
                 .filter(lambda x:x[1]>s)

print(second_pass.count())

In [None]:
# Level-wise A-Priori: a k-itemset is counted only if every one of its (k-1)-subsets
# was frequent at the previous level; it is kept when its count is strictly above `s`.

# Level 1 is exactly the first pass, so it is reused instead of being recomputed.
frdd = first_pass
singletons = first_pass.collect()

# Itemsets are represented as sorted tuples of item ids, singletons as 1-tuples.
frequent = {(item,) for item, _ in singletons}

print(f"remaining: {len(frequent)}, frdd {singletons[:5]}")

# Every level's result ({itemset: count}) is kept here; `frequent` itself is overwritten
# on each round and is empty once the loop has finished.
frequent_itemsets = {1: {(item,): count for item, count in singletons}}

k = 2
while frequent:  # stop as soon as a level produces no frequent itemset
    # Only items that occur in some frequent (k-1)-itemset can be part of a frequent
    # k-itemset, so baskets are pruned to those items before candidates are generated.
    alive_items = {item for itemset in frequent for item in itemset}

    # k, alive_items and frequent are bound as default arguments so that the lazily
    # evaluated lambdas keep the values of THIS iteration.
    frdd = hashed_baskets \
              .flatMap(lambda basket, k=k, alive=alive_items: [
                  (candidate, 1)
                  for candidate in combinations(sorted(i for i in basket if i in alive), k)
              ]) \
              .filter(lambda x, previous=frequent:
                      all(subset in previous for subset in combinations(x[0], len(x[0]) - 1))) \
              .reduceByKey(lambda x,y:x+y) \
              .filter(lambda x:x[1] > s)

    # A single collect per level replaces the former count() + collect() + take(5) jobs.
    level_counts = frdd.collect()
    frequent = {itemset for itemset, _ in level_counts}
    if frequent:
        frequent_itemsets[k] = dict(level_counts)

    print(k, len(frequent), level_counts[:5])
    k += 1