# **Preparations on environment, dataset**

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2a55987dc81d3aaf2efb492a7b2ebff672287e60c96de99dc93562a25eb66f79
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from google.colab import files

from google.colab import userdata
import os
import zipfile

import re
import string
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr, array, monotonically_increasing_id, explode,md5, concat_ws,collect_list, collect_set
from pyspark.sql.functions import avg, max, min, sum
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, FloatType, IntegerType, ArrayType, LongType
import hashlib
from collections import defaultdict
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import sys
import random

import binascii
import random
from sympy import nextprime
from pyspark.sql.functions import udf, col
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import DenseVector, VectorUDT

In [3]:
# upload Kaggle API token
# in case uploading json file from local environment
'''
files.upload()

# Set Up Kaggle Environment
import os
import zipfile

# Create the .kaggle directory and move the kaggle.json file there
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Install Kaggle API
!pip install kaggle
'''

'\nfiles.upload()\n\n# Set Up Kaggle Environment\nimport os\nimport zipfile\n\n# Create the .kaggle directory and move the kaggle.json file there\n!mkdir -p ~/.kaggle\n!mv kaggle.json ~/.kaggle/\n!chmod 600 ~/.kaggle/kaggle.json\n\n# Install Kaggle API\n!pip install kaggle\n'

In [4]:
# link to kaggle direcly
# in case loading user data directly from colab secrets

os.environ["KAGGLE_KEY"] = userdata.get('KAGGLE_KEY')
os.environ["KAGGLE_USERNAME"] = userdata.get('KAGGLE_USERNAME')

# download and Unzip Dataset
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

with zipfile.ZipFile("1-3m-linkedin-jobs-and-skills-2024.zip", 'r') as zip_ref:
    zip_ref.extractall(".")

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
100% 1.87G/1.88G [00:32<00:00, 27.9MB/s]
100% 1.88G/1.88G [00:32<00:00, 61.1MB/s]


# **Spark session and preprocessing**

In [5]:
# start time
import time
start_time = time.time()

In [6]:
# initialize Spark Session
# the session is necessary to start one time only
from pyspark.sql import SparkSession

if 'spark' in locals():
    spark.stop()

# initialize a new Spark session with all necessary configurations
# extremely important when dealing with limited resource local machines
spark = SparkSession.builder \
    .appName("LinkedIn Jobs Analysis") \
    .master("local[4]") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.python.worker.memory", "10g") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .getOrCreate()

spark

In [7]:
# setting the number of shuffle partitions to 200
# spark.conf.set("spark.sql.shuffle.partitions", "200")
# optional tests

In [8]:
# ensure nltk resources are downloaded
import nltk
nltk.download('stopwords')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [9]:
# define the schema
schema = StructType([
    StructField("job_link", StringType(), True),
    StructField("job_summary", StringType(), True)
])

In [10]:
# load dataset with Spark
sampled_df = spark.read.csv(
    'job_summary.csv',
    header=True,
    schema=schema,
    quote='"',
    escape='"',
    multiLine=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    mode="PERMISSIVE",
    sep=","
).limit(200)

#sample(fraction=0.0001, seed=42)

In [11]:
# adding Index in schema
schema = StructType([
    StructField("Index", IntegerType(), True),
    StructField("job_link", StringType(), True),
    StructField("job_summary", StringType(), True)
])

In [12]:
# add an index column
sampled_df = sampled_df.withColumn("index", monotonically_increasing_id())

In [13]:
# display the first few lines of the df
print("spark dataFrame:")
sampled_df.show(5, vertical=True, truncate=False)

spark dataFrame:
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

https://spark.apache.org/docs/latest/sql-ref-datatypes.html

https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html

In [14]:
#sampled_df.describe().show()

In [15]:
# text processing function
def preprocess_text(text):
    if text is None:
        return ""
    # convert to lowercase and remove punctuation
    text = re.sub(r'[^\w\s]', '', text.lower())
    # tokenizing
    words = word_tokenize(text)
    # remove stopwords and stem
    stop_words = set(stopwords.words('english'))
    ps = PorterStemmer()
    return ' '.join(ps.stem(word) for word in words if word not in stop_words)

In [16]:
# UDF for text preprocessing
preprocess_udf = udf(preprocess_text, StringType())

PicklingError: args[0] from __newobj__ args has the wrong class
Run again it will pass

In [18]:
# text processing to the job_summary
sampled_df = sampled_df.withColumn('processed_text', preprocess_udf(col('job_summary')))

In [19]:
# print a sample of the processed text
print("\nafter text preprocessing (first 2 rows):")
sampled_df.show(2, vertical=True, truncate=False)


after text preprocessing (first 2 rows):
-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [20]:
# define shingling function
def get_shingles(text, k=10):
    text = str(text).lower()
    return list(set(text[i:i+k] for i in range(len(text) - k + 1)))

shingles_udf = udf(get_shingles, ArrayType(StringType()))

In [21]:
# apply shingling
sampled_df = sampled_df.withColumn("shingles", shingles_udf(col("processed_text")))

In [22]:
# sampled_df.select('shingles').show(5, truncate=False)
# optional

In [23]:
sampled_df.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
# hash functions
h_functions = 100
params = []
for _ in range(h_functions):
    a = random.randint(1, 10000)
    b = random.randint(1, 10000)
    max_ab = a if a > b else b
    c = nextprime(max_ab + 1)
    params.append({"a": a, "b": b, "c": c})

In [25]:
def multiple(x, a, b, c):
    return (a * x + b) % c

In [26]:
# minhash function
def minhash(shingles, num_hashes=100):
    if not shingles:
        return [2**31-1] * num_hashes
    min_hashes = [2**31-1] * num_hashes
    for shingle in shingles:
        hash_val = binascii.crc32(shingle.encode('utf-8')) & 0xffffffff
        for i in range(num_hashes):
            a, b, c = params[i]['a'], params[i]['b'], params[i]['c']
            hash_value = multiple(hash_val, a, b, c)
            if hash_value < min_hashes[i]:
                min_hashes[i] = hash_value
    return min_hashes

In [27]:
minhash_udf = udf(lambda x: minhash(x, h_functions), ArrayType(IntegerType()))

In [28]:
# minHash signatures
sampled_df = sampled_df.withColumn("minhash", minhash_udf(col("shingles")))

In [29]:
#sampled_df.select('minhash').show(5, truncate=False)
#optional

In [30]:
sampled_df.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [31]:
# number of bands and rows per band
b_bands = 10  # number of bands
r_rows = h_functions // b_bands  # rows per band

In [32]:
# threshold
threshold = (1 / b_bands) ** (1 / r_rows)
print(f"threshold: {threshold}")

threshold: 0.7943282347242815


In [33]:
# hash each band
def hash_band(band):
    band_str = ''.join(map(str, band))
    return binascii.crc32(band_str.encode('utf-8'))

In [34]:
def split_bands(minhash, b_bands, r_rows):
    bands = []
    for b in range(b_bands):
        start = b * r_rows
        end = start + r_rows
        band = minhash[start:end]
        bands.append(hash_band(band))
    return bands

split_bands_udf = udf(lambda x: split_bands(x, b_bands, r_rows), ArrayType(StringType()))

In [35]:
# apply LSH
sampled_df = sampled_df.withColumn("bands", split_bands_udf(col("minhash")))
sampled_df = sampled_df.withColumn("band", explode(col("bands")))

In [38]:
sampled_df.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [36]:
# group by band and collect indices in each band
band_buckets_df = sampled_df.groupBy("band").agg(collect_set("index").alias("indices"))

In [37]:
band_buckets_df

DataFrame[band: string, indices: array<bigint>]

In [38]:
band_buckets_df.show(5)


+----------+-------+
|      band|indices|
+----------+-------+
|1007718899|   [82]|
|1008358916|   [60]|
|1009732799|   [87]|
|1013546456|   [19]|
|1014271681|   [34]|
+----------+-------+
only showing top 5 rows



In [60]:
# UDF to find candidate pairs
def find_candidate_pairs(indices):
    if not indices:
        return []
    candidates = set()
    if len(indices) > 1:
        for i in range(len(indices)):
            for j in range(i + 1, len(indices)):
                pair = tuple(sorted((indices[i], indices[j])))
                candidates.add(pair)
    return list(candidates)

find_candidate_pairs_udf = udf(find_candidate_pairs, schema)


In [61]:
schema = ArrayType(StructType([
    StructField("index_A", IntegerType(), True),
    StructField("index_B", IntegerType(), True)
]))

In [62]:
find_candidate_pairs_udf = udf(find_candidate_pairs, schema)

In [63]:
candidates_df = band_buckets_df.withColumn("candidate_pairs", find_candidate_pairs_udf(col("indices")))


In [66]:
candidates_df.show(5, vertical=True, truncate=False)

-RECORD 0---------------------
 band            | 1007718899 
 indices         | [82]       
 candidate_pairs | []         
-RECORD 1---------------------
 band            | 1008358916 
 indices         | [60]       
 candidate_pairs | []         
-RECORD 2---------------------
 band            | 1009732799 
 indices         | [87]       
 candidate_pairs | []         
-RECORD 3---------------------
 band            | 1013546456 
 indices         | [19]       
 candidate_pairs | []         
-RECORD 4---------------------
 band            | 1014271681 
 indices         | [34]       
 candidate_pairs | []         
only showing top 5 rows



In [67]:
# explode the candidate pairs to make them separate rows
candidates_df = candidates_df.withColumn("candidate_pair", explode("candidate_pairs")).select(
    col("candidate_pair.index_A").alias("index_A"),
    col("candidate_pair.index_B").alias("index_B")
)

In [68]:
candidates_df.show(5,vertical=True,truncate=False)

-RECORD 0------
 index_A | 16  
 index_B | 175 
-RECORD 1------
 index_A | 48  
 index_B | 52  
-RECORD 2------
 index_A | 29  
 index_B | 68  
-RECORD 3------
 index_A | 49  
 index_B | 59  
-RECORD 4------
 index_A | 59  
 index_B | 141 
only showing top 5 rows



In [79]:
candidates_df = candidates_df.dropDuplicates(['index_A', 'index_B'])

In [80]:
# alias df before the join
df1 = sampled_df.alias("df1")
df2 = sampled_df.alias("df2")

# join df
joined_df = candidates_df \
    .join(df1, col("df1.index") == col("index_A")) \
    .join(df2, col("df2.index") == col("index_B"))


In [81]:
joined_df = joined_df.select(
    col("df1.index").alias("index_A"),
    col("df1.job_summary").alias("job_summary_A"),
    col("df2.index").alias("index_B"),
    col("df2.job_summary").alias("job_summary_B"),
).distinct()


In [82]:
joined_df.show(4, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

13 min so far

In [None]:
joined_df.count()

In [84]:
unique_df = joined_df.dropDuplicates(['index_A', 'index_B'])

In [85]:
unique_df.show(5,vertical=True,truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

23 min so far

In [None]:
unique_df.count()


In [87]:
# end time and total runtime
end_time = time.time()
total_time = end_time - start_time

In [78]:
# time to a readable format
hours, rem = divmod(total_time, 3600)
minutes, seconds = divmod(rem, 60)
print("Total runtime: {:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds))

Total runtime: 01:46:20.90
