In [1]:
# Install the Kaggle CLI, PySpark and findspark into the notebook environment.
!pip install kaggle pyspark
!pip install findspark
# Suppress every Python warning for the rest of the session.
import warnings
warnings.filterwarnings("ignore")

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a5df936e005edad703136626003f90c519012eaa89ee58afb6aead4d4aba7691
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
#connecting to Kaggle
import os

# The Kaggle CLI used in the download cell reads its credentials from these two
# environment variables. Each assignment must be its own statement: anything that
# follows a '#' on the same line is a comment and is never executed.
# Do not commit real credentials with the notebook.
os.environ['KAGGLE_USERNAME'] = 'xxxxxxxxx'  # Replace with your Kaggle username
os.environ['KAGGLE_KEY'] = 'xxxxxxxxx'  # Replace with your Kaggle key

In [3]:
#downloading the dataset
# The zip archive lands in the current working directory; the extraction cell
# opens it by its bare file name, so both cells must run from the same directory.
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
 99% 1.86G/1.88G [00:20<00:00, 41.5MB/s]
100% 1.88G/1.88G [00:21<00:00, 95.8MB/s]


In [4]:
# Unpack the downloaded archive into a folder named "dataset"
import zipfile
with zipfile.ZipFile('1-3m-linkedin-jobs-and-skills-2024.zip', mode='r') as archive:
    # extractall creates the target folder if it does not exist yet
    archive.extractall(path='dataset')

In [5]:
#setting path for dataset
# Path of the job-summary CSV inside the extracted "dataset" folder; it is read
# later both by Spark (spark.read...csv) and by pandas (pd.read_csv in chunks).
file_path = 'dataset/job_summary.csv'

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import re
import hashlib
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import monotonically_increasing_id


In [7]:
# Initialize SparkSession
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) a session on the "local" master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("JobDescriptionSimilarity")
    .getOrCreate()
)

In [8]:
# Display the SparkContext behind the session as a quick check that Spark started.
spark.sparkContext

In [9]:
from pyspark.sql.types import StructType, StructField, StringType

# Reading the dataset with an explicit two-column, all-string, nullable schema
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("job_link", "job_summary")
])

# multiLine lets quoted job summaries span several physical lines; the quote
# character doubles as the escape character inside quoted fields.
df = (
    spark.read
    .options(header="true", multiLine="true", escape="\"", quote="\"")
    .schema(schema)
    .csv(file_path)
)


In [10]:
# Printing the top 5 rows using .show() function
# truncate=False prints every cell in full, so the long job_summary text makes
# the rendered table extremely wide.
df.show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
#checking the Schema of the df
# Expect two nullable string columns, matching the explicit schema passed to the reader.
df.printSchema()

root
 |-- job_link: string (nullable = true)
 |-- job_summary: string (nullable = true)



In [12]:
#extracting number of rows from the df dataframe
# count() is a Spark action, so this cell actually reads the CSV.
df.count()

1297332

In [13]:
# describe() on its own only builds a new DataFrame (the cell output is just its
# column list); the statistics are printed by calling .show() on it.
df.describe()

DataFrame[summary: string, job_link: string, job_summary: string]

In [14]:
#performing a summary statistics calculation on the DataFrame df
# Both columns are strings, so mean/stddev come back NULL and min/max are the
# smallest/largest values in string ordering.
df.describe().show()

+-------+--------------------+--------------------+
|summary|            job_link|         job_summary|
+-------+--------------------+--------------------+
|  count|             1297332|             1297332|
|   mean|                NULL|                NULL|
| stddev|                NULL|                NULL|
|    min|https://ae.linked...|! CURRENTLY SEEKI...|
|    max|https://za.linked...|🪠 We invite full...|
+-------+--------------------+--------------------+



In [15]:
# pandas is used further down to read the CSV in chunks and to hold the similar-pair results.
!pip install pandas



In [16]:
# datasketch provides the MinHash and MinHashLSH classes imported in the next cell.
!pip install datasketch

Collecting datasketch
  Downloading datasketch-1.6.4-py3-none-any.whl (88 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/88.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━[0m [32m81.9/88.3 kB[0m [31m2.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.3/88.3 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: datasketch
Successfully installed datasketch-1.6.4


In [17]:
import pandas as pd
from datasketch import MinHash, MinHashLSH


# Preprocessing function to tokenize and clean text
def preprocess_text(text):
    """Lower-case ``text``, split it on whitespace and trim edge punctuation.

    Only the characters ``, . ! ?`` are stripped, and only from the start and
    end of each token. No stop-word removal is performed.

    Args:
        text: The raw job summary. Anything that is not a ``str`` (``None``
            from a Spark NULL, ``NaN`` from a pandas missing value) is treated
            as empty text.

    Returns:
        list[str]: The cleaned tokens in their original order. Tokens that
        consist solely of the stripped punctuation are dropped, so the list
        never contains empty strings.
    """
    if not isinstance(text, str):
        return []
    trimmed = (token.strip(",.!?") for token in text.lower().split())
    # Drop tokens such as "..." or "!!" that become empty once stripped; they
    # would otherwise produce shingles containing blank "words".
    return [token for token in trimmed if token]

In [18]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer

# Wrap the pure-Python tokenizer as a Spark UDF that returns array<string>
preprocess_text_udf = udf(preprocess_text, returnType=ArrayType(StringType()))

# Add the tokenised job summary as the "preprocessed_text" column
df = df.withColumn("preprocessed_text", preprocess_text_udf(df["job_summary"]))


In [19]:
#visualizing the preprocessed_text in the df
# show() with no arguments prints the first 20 rows and truncates long cells.
df.show()

+--------------------+--------------------+--------------------+
|            job_link|         job_summary|   preprocessed_text|
+--------------------+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|[rock, n, roll, s...|
|https://www.linke...|Schedule\n: PRN i...|[schedule, :, prn...|
|https://www.linke...|Description\nIntr...|[description, int...|
|https://uk.linked...|Commercial accoun...|[commercial, acco...|
|https://www.linke...|Address:\nUSA-CT-...|[address:, usa-ct...|
|https://www.linke...|Description\nOur\...|[description, our...|
|https://www.linke...|Company Descripti...|[company, descrip...|
|https://uk.linked...|An exciting oppor...|[an, exciting, op...|
|https://www.linke...|Job Details:\nJob...|[job, details:, j...|
|https://www.linke...|Our\nRestaurant T...|[our, restaurant,...|
|https://www.linke...|Our General Manag...|[our, general, ma...|
|https://www.linke...|Earning potential...|[earning, potenti...|
|https://www.linke...|Dol

In [20]:
# Function for Generating shingles from text
def generate_shingles(text, k=3):
    """Return the set of ``k``-word shingles of ``text``.

    The text is tokenised with ``preprocess_text`` and every run of ``k``
    consecutive tokens is joined with single spaces into one shingle.

    Args:
        text: Raw text to shingle.
        k: Number of tokens per shingle; must be at least 1.

    Returns:
        set[str]: The distinct shingles. Text with no tokens yields an empty
        set. Text with fewer than ``k`` tokens yields one shingle holding all
        of its tokens, so short texts do not all collapse to the same empty
        set and then look identical to one another downstream.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")
    tokens = preprocess_text(text)
    if not tokens:
        return set()
    if len(tokens) < k:
        # Too short for a full window: keep the whole text as a single shingle.
        return {" ".join(tokens)}
    return {" ".join(tokens[start:start + k]) for start in range(len(tokens) - k + 1)}

In [21]:
# Define a UDF for generating shingles
def _shingles_from_tokens(text):
    """Re-join a token array into one string and return its shingles as a list."""
    joined = " ".join(text)
    return list(generate_shingles(joined))

generate_shingles_udf = udf(_shingles_from_tokens, ArrayType(StringType()))

# Apply the UDF to generate shingles
df = df.withColumn("shingles", generate_shingles_udf("preprocessed_text"))

In [22]:
#visualizing the shingles in the df
# show() with no arguments prints the first 20 rows and truncates long cells.
df.show()

+--------------------+--------------------+--------------------+--------------------+
|            job_link|         job_summary|   preprocessed_text|            shingles|
+--------------------+--------------------+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|[rock, n, roll, s...|[manager as our, ...|
|https://www.linke...|Schedule\n: PRN i...|[schedule, :, prn...|[with or without,...|
|https://www.linke...|Description\nIntr...|[description, int...|[of our team, gen...|
|https://uk.linked...|Commercial accoun...|[commercial, acco...|[deal with client...|
|https://www.linke...|Address:\nUSA-CT-...|[address:, usa-ct...|[which is sensiti...|
|https://www.linke...|Description\nOur\...|[description, our...|[trademark of the...|
|https://www.linke...|Company Descripti...|[company, descrip...|[waiting for you,...|
|https://uk.linked...|An exciting oppor...|[an, exciting, op...|[work our benefit...|
|https://www.linke...|Job Details:\nJob...|[job, detai

In [23]:
# Functions for Generating MinHash signatures
def generate_minhash_signature(shingles, num_perm=128, pd=1):
    """Build a MinHash over ``shingles``.

    Args:
        shingles: Iterable of shingle strings; each is UTF-8 encoded and fed
            to the MinHash.
        num_perm: Number of permutations passed to ``MinHash``.
        pd: Output selector. ``1`` (default) returns the ``MinHash`` object
            itself; any other value returns its hash values as a plain list.
            Note that inside this function the name shadows the module-level
            pandas alias ``pd``.

    Returns:
        The ``MinHash`` object, or a list of its hash values.
    """
    signature = MinHash(num_perm=num_perm)
    for piece in shingles:
        signature.update(piece.encode('utf8'))
    return signature if pd == 1 else signature.hashvalues.tolist()




In [24]:
# Define a UDF for generating MinHash signatures
# datasketch's default MinHash yields unsigned hash values as large as 2**32 - 1
# (that maximum is also the value left in place for an empty shingle list).
# Spark's IntegerType is a signed 32-bit integer and cannot represent the upper
# half of that range, so the signature is declared as an array of LongType.
from pyspark.sql.types import LongType

generate_minhash_signature_udf = udf(
    lambda shingles: generate_minhash_signature(shingles, pd=0),
    ArrayType(LongType()),
)

# Apply the UDF to generate MinHash signatures
df = df.withColumn("minhash_signature", generate_minhash_signature_udf("shingles"))



In [25]:
#visualizing the minhash_signatures in the df
# show() with no arguments prints the first 20 rows and truncates long cells.
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|            job_link|         job_summary|   preprocessed_text|            shingles|   minhash_signature|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|[rock, n, roll, s...|[manager as our, ...|[11828880, 127397...|
|https://www.linke...|Schedule\n: PRN i...|[schedule, :, prn...|[with or without,...|[24590172, 338962...|
|https://www.linke...|Description\nIntr...|[description, int...|[of our team, gen...|[2339457, 405833,...|
|https://uk.linked...|Commercial accoun...|[commercial, acco...|[deal with client...|[12111891, 770449...|
|https://www.linke...|Address:\nUSA-CT-...|[address:, usa-ct...|[which is sensiti...|[16115718, 241234...|
|https://www.linke...|Description\nOur\...|[description, our...|[trademark of the...|[5596544, 1689762...|
|https://www.linke...|Company Descrip

In [26]:
# Defining function to Create MinHash LSH index
def create_lsh_index(data, num_perm=128, threshold=0.5):
    """Index every row of ``data`` in a new MinHash LSH structure.

    Args:
        data: pandas DataFrame with a ``job_summary`` column.
        num_perm: Number of permutations for both the LSH index and each
            row's MinHash.
        threshold: Similarity threshold handed to ``MinHashLSH``.

    Returns:
        tuple: ``(lsh, minhashes)`` where ``minhashes`` maps each row's index
        label to its MinHash; the same labels are the keys stored in ``lsh``.
    """
    index_structure = MinHashLSH(threshold=threshold, num_perm=num_perm)
    signatures = {}
    for label, record in data.iterrows():
        signature = generate_minhash_signature(
            generate_shingles(record['job_summary']), num_perm
        )
        signatures[label] = signature
        index_structure.insert(label, signature)
    return index_structure, signatures

In [27]:
# Function to Find similar job descriptions using LSH
def find_similar_jobs(data, lsh, minhashes, num_perm=128, threshold=0.5):
    """Return ordered pairs of rows whose estimated Jaccard similarity exceeds ``threshold``.

    For every row of ``data`` the LSH index is queried for candidates, and each
    candidate is kept only if the MinHash Jaccard estimate is strictly greater
    than ``threshold``.

    Args:
        data: pandas DataFrame with a ``job_summary`` column.
        lsh: LSH index exposing ``query(minhash)``; its keys must also be keys
            of ``minhashes``.
        minhashes: Dict mapping row index labels to MinHash objects.
        num_perm: Permutation count, used only when a row's MinHash has to be
            computed because it is missing from ``minhashes``.
        threshold: Minimum (exclusive) Jaccard estimate for a pair to be
            reported. Defaults to 0.5, the value that was previously fixed.

    Returns:
        list[tuple]: ``(row_label, candidate_label)`` pairs in discovery order.
        A similar pair whose two rows are both in ``data`` appears in both
        orientations.
    """
    similar_pairs = []
    for index, row in data.iterrows():
        # Reuse the signature computed at indexing time; re-shingling and
        # re-hashing the same text would produce the same MinHash.
        minhash = minhashes.get(index)
        if minhash is None:
            shingles = generate_shingles(row['job_summary'])
            minhash = generate_minhash_signature(shingles, num_perm)
        for candidate in lsh.query(minhash):
            if candidate == index:
                continue
            if minhash.jaccard(minhashes[candidate]) > threshold:
                similar_pairs.append((index, candidate))
    return similar_pairs



In [29]:
chunk_size = 10000
# Number of CSV chunks to process. 1 keeps the run to the first `chunk_size`
# rows; set it to None to scan the whole file (slow, and memory grows with
# every chunk because all signatures and summaries are retained).
max_chunks = 1
lsh = None
minhashes = {}
similar_pairs = []        # pairs in discovery order
seen_pairs = set()        # same pairs, for constant-time duplicate checks
job_summaries = {}        # row label -> job_summary for every row indexed so far

# Collect result rows as dictionaries and build the DataFrame once at the end.
data_to_append = []

for chunk_number, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size), start=1):
    # Row labels keep counting across chunks, and LSH candidates may come from
    # an earlier chunk, so summaries are looked up by label in this dict rather
    # than by position inside the current chunk.
    job_summaries.update(chunk['job_summary'].to_dict())

    if lsh is None:
        lsh, minhashes = create_lsh_index(chunk)
    else:
        for index, row in chunk.iterrows():
            shingles = generate_shingles(row['job_summary'])
            minhash = generate_minhash_signature(shingles)
            minhashes[index] = minhash
            lsh.insert(index, minhash)

    for pair in find_similar_jobs(chunk, lsh, minhashes):
        if pair in seen_pairs:
            continue
        seen_pairs.add(pair)
        similar_pairs.append(pair)
        idx1, idx2 = pair
        data_to_append.append({
            'RowNumber1': idx1 + 1,   # 1-based row numbers for display
            'JobSummary1': job_summaries[idx1],
            'RowNumber2': idx2 + 1,
            'JobSummary2': job_summaries[idx2],
        })

    if max_chunks is not None and chunk_number >= max_chunks:
        break

# Build the result frame in one step; the explicit column list keeps the
# expected columns even when no similar pair was found.
df_similar_pairs = pd.DataFrame(
    data_to_append,
    columns=['RowNumber1', 'JobSummary1', 'RowNumber2', 'JobSummary2'],
)

print(df_similar_pairs)


      RowNumber1                                        JobSummary1  \
0              5  Address:\nUSA-CT-Newington-44 Fenn Road\nStore...   
1              5  Address:\nUSA-CT-Newington-44 Fenn Road\nStore...   
2              5  Address:\nUSA-CT-Newington-44 Fenn Road\nStore...   
3              5  Address:\nUSA-CT-Newington-44 Fenn Road\nStore...   
4              6  Description\nOur\nRestaurant Team/Shift Leader...   
...          ...                                                ...   
86331       9997  LICENSED MARRIAGE AND FAMILY THERAPIST NEEDED ...   
86332       9998  Mental Health Counselor positions FT/PT -\nRes...   
86333       9998  Mental Health Counselor positions FT/PT -\nRes...   
86334       9998  Mental Health Counselor positions FT/PT -\nRes...   
86335      10000  Job Description\nShare Share Share\nSecondary ...   

      RowNumber2                                        JobSummary2  
0           6342  Address:\nUSA-RI-Johnston-11 Commerce Way\nSto...  
1      

In [30]:
# Preview the first 10 similar pairs found above.
df_similar_pairs.head(10)

Unnamed: 0,RowNumber1,JobSummary1,RowNumber2,JobSummary2
0,5,Address:\nUSA-CT-Newington-44 Fenn Road\nStore...,6342,Address:\nUSA-RI-Johnston-11 Commerce Way\nSto...
1,5,Address:\nUSA-CT-Newington-44 Fenn Road\nStore...,6096,Address:\nUSA-RI-Cranston-204 Garfield Ave\nSt...
2,5,Address:\nUSA-CT-Newington-44 Fenn Road\nStore...,2097,Address:\nUSA-RI-Providence-165 Pitnam Street\...
3,5,Address:\nUSA-CT-Newington-44 Fenn Road\nStore...,1818,Address:\nUSA-RI-Narragansett-91 Point Judith ...
4,6,Description\nOur\nRestaurant Team/Shift Leader...,519,Description\nOur\nRestaurant Team/Shift Leader...
5,6,Description\nOur\nRestaurant Team/Shift Leader...,776,Description\nOur\nRestaurant Team/Shift Leader...
6,6,Description\nOur\nRestaurant Team/Shift Leader...,775,Our\nRestaurant Team/Shift Leaders\nhave a dua...
7,6,Description\nOur\nRestaurant Team/Shift Leader...,10,Our\nRestaurant Team/Shift Leaders\nhave a dua...
8,6,Description\nOur\nRestaurant Team/Shift Leader...,537,Description\nOur\nRestaurant Team/Shift Leader...
9,6,Description\nOur\nRestaurant Team/Shift Leader...,159,Description\nOur\nRestaurant Team/Shift Leader...
