# Set up enviornment, access Kaggle and download data

### Enviornment setup

In [29]:
!pip install pyspark
import pyspark
import os
import timeit
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MarketBasket").getOrCreate()
sc = spark.sparkContext



### Download and import data

In [3]:
# Access Kaggle
os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"

# Data download
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024
!unzip 1-3m-linkedin-jobs-and-skills-2024 -d data
# remove unnecessary data
!rm /content/data/linkedin_job_postings.csv
!rm /content/data/job_summary.csv
!rm /content/1-3m-linkedin-jobs-and-skills-2024.zip

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
 99% 1.86G/1.88G [00:14<00:00, 157MB/s]
100% 1.88G/1.88G [00:14<00:00, 135MB/s]
Archive:  1-3m-linkedin-jobs-and-skills-2024.zip
  inflating: data/job_skills.csv     
  inflating: data/job_summary.csv    
  inflating: data/linkedin_job_postings.csv  


In [5]:
# import data
df = spark.read.option('header','true').csv('/content/data/job_skills.csv')

### Exploratory Data Analysis (EDA)

In [6]:
# check the data
print(df.head())
print(df.printSchema())
print(df.show())

Row(job_link='https://www.linkedin.com/jobs/view/housekeeper-i-pt-at-jacksonville-state-university-3802280436', job_skills='Building Custodial Services, Cleaning, Janitorial Services, Materials Handling, Housekeeping, Sanitation, Waste Management, Floor Maintenance, Equipment Maintenance, Safety Protocols, Communication Skills, Attention to Detail, Physical Strength, Experience in Housekeeping')
root
 |-- job_link: string (nullable = true)
 |-- job_skills: string (nullable = true)

None
+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
|https://www.linke...|Access Control, V...|
|https://www.linke...|Consultation, Sup...|
|https://www.linke...|Veterinary Recept...|
|https://www.linke...|Op

In [7]:
df.describe()

DataFrame[summary: string, job_link: string, job_skills: string]

In [8]:
df = df.dropna() # 2007 NAs

In [9]:
df = df.drop("job_link") # we do not need this column

In [10]:
# Associate the data to a Resilient Distributed Dataset (RDD)
df_rdd = df.rdd

In [11]:
df_rdd.first()

Row(job_skills='Building Custodial Services, Cleaning, Janitorial Services, Materials Handling, Housekeeping, Sanitation, Waste Management, Floor Maintenance, Equipment Maintenance, Safety Protocols, Communication Skills, Attention to Detail, Physical Strength, Experience in Housekeeping')

In [13]:
df_rdd.getNumPartitions() # default is six partitions

6

In [14]:
df_rdd.first() # got to isolate the skills

Row(job_skills='Building Custodial Services, Cleaning, Janitorial Services, Materials Handling, Housekeeping, Sanitation, Waste Management, Floor Maintenance, Equipment Maintenance, Safety Protocols, Communication Skills, Attention to Detail, Physical Strength, Experience in Housekeeping')

In [43]:
skills = df_rdd.map(lambda x: x['job_skills'])
skills.first()

'Building Custodial Services, Cleaning, Janitorial Services, Materials Handling, Housekeeping, Sanitation, Waste Management, Floor Maintenance, Equipment Maintenance, Safety Protocols, Communication Skills, Attention to Detail, Physical Strength, Experience in Housekeeping'

### Basket creation

In [44]:
skills = skills.map(lambda word: word.lower())
skills = skills.map(lambda line: line.split(', '))
skills.first()

['building custodial services',
 'cleaning',
 'janitorial services',
 'materials handling',
 'housekeeping',
 'sanitation',
 'waste management',
 'floor maintenance',
 'equipment maintenance',
 'safety protocols',
 'communication skills',
 'attention to detail',
 'physical strength',
 'experience in housekeeping']

In [45]:
lens = skills.map(lambda x: len(x))
print(f"Min number of skills required is {lens.min()}") # these take forever (1)
print(f"Max number of skills required is {lens.max()}") # (463)

Min number of skills required is 1
Max number of skills required is 463


In [32]:
plt.hist(lens.collect(), bins=100, edgecolor='black') # .collect is plottable
plt.xlabel("Number of skills ")
plt.ylabel('Frequency')
plt.title('Number of skills for each job')
plt.show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

### Hashing
I transform the skills names in integers by hashing

In [46]:
ht = skills.flatMap(lambda line: line)
ht.first()

'building custodial services'

In [47]:
tot_skills = ht.count()
tot_skills

26908850

In [48]:
ht = ht.zipWithIndex()
ht.take(5)

[('building custodial services', 0),
 ('cleaning', 1),
 ('janitorial services', 2),
 ('materials handling', 3),
 ('housekeeping', 4)]

In [49]:
ht_index = ht.collectAsMap() # this does not work

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 74) (63b3a47196f8 executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space


In [None]:
def hashing(basket):
    return {ht_index[skill] for skill in skills}

baskets = skills.map(hashing)