<a href="https://colab.research.google.com/github/NicoPigna/Market_Basket_Analysis_AMD/blob/main/Pignatelli_AMD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Spark Context Creation**


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [2]:
import pyspark
sc = spark.sparkContext

# **Importing the Data set from Kaggle**

In [3]:
os.environ['KAGGLE_USERNAME'] = "nicolpignatelli"
os.environ['KAGGLE_KEY'] = "a1dc46eccaf7756728ada0c3e38a937c"
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
 99% 1.86G/1.88G [00:17<00:00, 188MB/s]
100% 1.88G/1.88G [00:17<00:00, 112MB/s]


In [4]:
!unzip /content/1-3m-linkedin-jobs-and-skills-2024.zip -d data

Archive:  /content/1-3m-linkedin-jobs-and-skills-2024.zip
  inflating: data/job_skills.csv     
  inflating: data/job_summary.csv    
  inflating: data/linkedin_job_postings.csv  


In [5]:
!rm /content/data/linkedin_job_postings.csv
!rm /content/data/job_summary.csv
!rm /content/1-3m-linkedin-jobs-and-skills-2024.zip

In [6]:
df = spark.read.options(header=True).csv('/content/data/job_skills.csv')
df.show()

+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Optical Inspectio...|
|https://www.linke...|HVAC, troubleshoo...|
|https://www.linke...|Host/Server Assis...|
|https://www.linke...|Apartment mainten...|
|https://www.linke...|Fiber Optic Cable...|
|https://www.linke...|CT Technologist, ...|
|https://ca.linked...|SAP, DRMIS, Data ...|
|https://www.linke...|Debt and equity o...|
|https://ca.linked...|Biomedical Engine...|
|https://www.linke...|Laboratory Techni...|
|https://www.linke...|Program Managemen...|
|https://www.linke...|Hiring, Tr

# **EDA and Sampling**

In [7]:
df.printSchema()

root
 |-- job_link: string (nullable = true)
 |-- job_skills: string (nullable = true)



In [8]:
df.describe()

summary,job_link,job_skills
count,1296381,1294374
mean,,
stddev,,
min,https://ae.linked...,"""* Maintenance, *..."
max,https://za.linked...,"志愿服务, 沟通, 陪伴, 临终服..."


In [9]:
df = df.dropna()

In [10]:
df = df.sample(False,0.01,seed=14)
count = df.count()

# **Creating the baskets**

In [11]:
col = df.select('job_skills')
rdd = col.rdd

In [12]:
rdd.first()

Row(job_skills='Nursing, Physician Assistant, Oncology, Hematology, Advanced Practice Registered Nurse, Advanced Practice Provider, Prescribing, Documentation, Referrals, Consultation, Patient Advocacy, Continuing Education, Research, Quality Improvement, Public Speaking, Teaching, Preceptorship, BLS Certification, CPR Certification, Cardiology, Pulmonary Medicine, Radiation Oncology, Hematology Oncology, Medical Oncology, Patient Care, Physical Examination, Diagnosis, Treatment Planning, Medication Management, Laboratory Testing, Imaging, Patient Education, Communication, Teamwork, Problem Solving, Critical Thinking, Decision Making, Professionalism, Ethics, Cultural Competence')

In [13]:
type(rdd.first())

In [14]:
rdd.getNumPartitions()

6

In [15]:
rdd2 = rdd.map(lambda x: x['job_skills'])

In [16]:
rdd2.first()

'Nursing, Physician Assistant, Oncology, Hematology, Advanced Practice Registered Nurse, Advanced Practice Provider, Prescribing, Documentation, Referrals, Consultation, Patient Advocacy, Continuing Education, Research, Quality Improvement, Public Speaking, Teaching, Preceptorship, BLS Certification, CPR Certification, Cardiology, Pulmonary Medicine, Radiation Oncology, Hematology Oncology, Medical Oncology, Patient Care, Physical Examination, Diagnosis, Treatment Planning, Medication Management, Laboratory Testing, Imaging, Patient Education, Communication, Teamwork, Problem Solving, Critical Thinking, Decision Making, Professionalism, Ethics, Cultural Competence'

In [17]:
baskets = rdd2.map(lambda line: line.split(', '))

In [18]:
baskets.first()

['Nursing',
 'Physician Assistant',
 'Oncology',
 'Hematology',
 'Advanced Practice Registered Nurse',
 'Advanced Practice Provider',
 'Prescribing',
 'Documentation',
 'Referrals',
 'Consultation',
 'Patient Advocacy',
 'Continuing Education',
 'Research',
 'Quality Improvement',
 'Public Speaking',
 'Teaching',
 'Preceptorship',
 'BLS Certification',
 'CPR Certification',
 'Cardiology',
 'Pulmonary Medicine',
 'Radiation Oncology',
 'Hematology Oncology',
 'Medical Oncology',
 'Patient Care',
 'Physical Examination',
 'Diagnosis',
 'Treatment Planning',
 'Medication Management',
 'Laboratory Testing',
 'Imaging',
 'Patient Education',
 'Communication',
 'Teamwork',
 'Problem Solving',
 'Critical Thinking',
 'Decision Making',
 'Professionalism',
 'Ethics',
 'Cultural Competence']

In [19]:
lens = baskets.map(lambda x: len(x))

In [20]:
max_len = lens.max()
max_len

195

In [21]:
mean_len = lens.mean()
mean_len

20.859828928103553

In [22]:
tot_skills = mean_len * count
tot_skills

270697.9999999998

In [23]:
min_len = lens.min()
min_len

1

In [24]:
s = round((baskets.count())/50)
s

260

# **The hash table**

In [25]:
hash = baskets.flatMap(lambda line: line)
hash.first()

'Nursing'

In [26]:
check_tot_skills = hash.count()
check_tot_skills

270698

In [27]:
hash = hash.distinct()
skills = hash.count()
skills

100980

In [28]:
hash = hash.zipWithIndex()
hash.take(5)

[('Hematology', 0),
 ('Referrals', 1),
 ('Consultation', 2),
 ('Research', 3),
 ('Teaching', 4)]

In [29]:
hash_index = hash.collectAsMap()

In [30]:
#transforming baskets into sets

def hashing(basket):
    return {hash_index[skill] for skill in basket}

baskets = baskets.map(hashing)

In [31]:
baskets.first()

{0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 16839,
 16840,
 16841,
 16842,
 16843,
 33721,
 33722,
 33723,
 33724,
 33725,
 33726,
 33727,
 50548,
 50549,
 50550,
 50551,
 50552,
 50553,
 67250,
 67251,
 67252,
 67253,
 67254,
 84192,
 84193,
 84194,
 84195,
 84196,
 84197}

# **A-PRIORI ALGORITHM**

In [32]:
from itertools import combinations

In [33]:
def a_priori(baskets_collection,support,hash_table):

  print("Frequent singletons")

  first_pass = baskets_collection.flatMap(lambda basket: [(skill,1) for skill in basket]) \
                                .reduceByKey(lambda x,y: x+y) \
                                .filter(lambda x: x[1]>support)

  first_pass_count = first_pass.count()

  if first_pass_count == 0:
    print("Lower the support")
    return

  print("Number of frequent singletons:",first_pass_count)

  max_s = first_pass.max(lambda x: x[1])
  max_s_ = (list(hash_table.keys())[list(hash_table.values()).index(max_s[0])])
  print("Most frequent singleton:",max_s_)

  freqs = set(first_pass.map(lambda x: (x[0],)).collect())

  print()

  k = 2

  while True:

    print("Itemesets of size:",k)

    pass_ = baskets_collection.flatMap(lambda basket:[(elem,1) for elem in combinations(sorted(basket),k) if
                                                      all(item in freqs for item in combinations(elem,k-1))]) \
                              .reduceByKey(lambda x,y: x+y) \
                              .filter(lambda x: x[1]>support)

    pass_count = pass_.count()

    if pass_count != 0:
      print("Number of frequent itemsets of size",k,"are:",pass_count)

      max_p = pass_.max(lambda x: x[1])
      max_itemset = []
      for skill in max_p[0]:
        max_itemset.append(list(hash_table.keys())[list(hash_table.values()).index(skill)])
      print("Most frequent itemset of size",k,"is composed by:",max_itemset)

      freqs = set(pass_.map(lambda x: x[0]).collect())

      print()

      k+=1

    else:
      print("There are no frequent itemsets of size",k)
      print("Given the monotonicity of itemsets, there are no more frequent itemsets.")
      break

In [34]:
a_priori(baskets,s,hash_index)

Frequent singletons
Number of frequent singletons: 71
Most frequent singleton: Communication

Itemesets of size: 2
Number of frequent itemsets of size 2 are: 48
Most frequent itemset of size 2 is composed by: ['Teamwork', 'Communication']

Itemesets of size: 3
Number of frequent itemsets of size 3 are: 7
Most frequent itemset of size 3 is composed by: ['Teamwork', 'Communication', 'Leadership']

Itemesets of size: 4
There are no frequent itemsets of size 4
Given the monotonicity of itemsets, there are no more frequent itemsets.
