In [None]:
# Mount Google Drive; the working directory set in the next cell lives under it.
from google.colab import drive

drive.mount("/content/drive", force_remount=False)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os

# Every relative path used below (parquet inputs/outputs, map.pickle) resolves
# against this Drive folder.
WORK_DIR = "/content/drive/My Drive/Colab Notebooks/"
os.chdir(WORK_DIR)

In [None]:
import pandas as pd

In [None]:
# Only the job link and its skills list are used from this frame (the retagging
# step at the end re-reads the full file itself), so load just those columns
# instead of the whole jobs table.
df = pd.read_parquet("jobs_transfer_temp.parquet", columns=["job_link", "skills_array"])

In [None]:
# Keep only the columns needed for skill extraction.
skill_columns = ["job_link", "skills_array"]
df_skills = df.loc[:, skill_columns]

In [None]:
# Peek at the first rows of the job -> skills table.
df_skills.head(n=5)

Unnamed: 0,job_link,skills_array
0,https://ca.linkedin.com/jobs/view/team-lead-se...,"[Troubleshooting, Ticketing Systems, IT System..."
1,https://www.linkedin.com/jobs/view/i-e-designe...,"[Instrument & Control System Design, 2D AUTOCA..."
2,https://www.linkedin.com/jobs/view/client-rela...,"[Client Relations Management (CRM), Communicat..."
3,https://www.linkedin.com/jobs/view/private-dut...,"[LPN license, CPR certification, Inhome nursin..."
4,https://www.linkedin.com/jobs/view/procurement...,"[Procurement, Leadership, Stakeholder alignmen..."


In [None]:
import pandas as pd
import torch
from sentence_transformers import SentenceTransformer
from itertools import chain
import numpy as np
import hdbscan
from tqdm import tqdm
from collections import defaultdict
from itertools import chain
from sklearn.decomposition import PCA
import pyarrow as pa
import pyarrow.parquet as pq

import cupy as cp
from cuml.cluster import KMeans
from sklearn.cluster import MiniBatchKMeans
from scipy.spatial.distance import cdist

In [None]:
# Collect every distinct raw skill string across all jobs.
# Rows whose skills_array is null are skipped. The set is built straight from
# the flattened iterator, so we never materialize a list holding every skill
# occurrence (only the distinct values are kept in memory).
skills_lists = [s for s in df_skills["skills_array"] if s is not None]

unique_skills = set(chain.from_iterable(skills_lists))

print("Total number of unique skills:", len(unique_skills))

Total number of unique skills: 3298136


In [None]:
def normalize_skill(skill):
    """Return ``skill`` as a lowercase string with surrounding whitespace removed.

    Non-string values are converted with ``str`` first, so e.g. ``None``
    becomes ``"none"`` and ``42`` becomes ``"42"``.
    """
    text = str(skill)
    return text.lower().strip()

print("Normalizing skills...")
# Normalizing maps several raw variants ("Python", " python", "PYTHON") onto
# the same string, so de-duplicate *after* normalizing. dict.fromkeys keeps
# first-seen order, so each distinct normalized skill is embedded and
# clustered exactly once.
# NOTE: the order follows iteration over the `unique_skills` set. For str
# elements that order is not stable across interpreter sessions (string
# hashing is randomized by default).
normalized_skills = list(dict.fromkeys(
    normalize_skill(s) for s in tqdm(unique_skills, desc="Normalizing skills")
))
print("Unique normalized skills:", len(normalized_skills))

Normalizing skills...


Normalizing skills: 100%|██████████| 3298136/3298136 [00:02<00:00, 1209785.78it/s]


In [None]:
# Load the sentence-embedding model once, on the GPU when one is available.
use_gpu = torch.cuda.is_available()
device = "cuda" if use_gpu else "cpu"
model = SentenceTransformer(model_name_or_path="all-MiniLM-L6-v2", device=device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:


# Configuration for the embedding + PCA pipeline.
# `device` and `model` come from the previous cell; loading the same
# SentenceTransformer again here only duplicated that work.
reduced_dim = 32      # PCA output dimensionality stored per skill
batch_size = 2048     # skills passed to model.encode per loop iteration
num_skills = len(normalized_skills)
sample_size = 50000   # number of skills embedded to fit the PCA projection

In [None]:
# Fit the PCA projection on a seeded random subset of skills instead of
# embedding every skill just to fit it.
pca_rng = np.random.default_rng(42)
n_pca_samples = min(sample_size, len(normalized_skills))
# Sample positions without replacement so the fit does not depend on how
# `normalized_skills` happens to be ordered.
pca_sample_idx = np.sort(pca_rng.choice(len(normalized_skills), size=n_pca_samples, replace=False))
pca_sample_skills = [normalized_skills[i] for i in pca_sample_idx]

sample_embeddings = []
# Slices are taken from the sample list itself, so exactly n_pca_samples rows
# are embedded (the final batch is simply shorter).
for i in tqdm(range(0, n_pca_samples, batch_size), desc="Sampling embeddings for PCA"):
    batch = pca_sample_skills[i:i + batch_size]
    with torch.no_grad():
        emb = model.encode(batch, convert_to_tensor=True, device=device, show_progress_bar=False)
    sample_embeddings.append(emb.cpu())
sample_embeddings = torch.cat(sample_embeddings).numpy()

# Seed PCA as well: with svd_solver="auto" it may choose the randomized solver.
pca = PCA(n_components=reduced_dim, random_state=42)
pca.fit(sample_embeddings)
print(f"PCA fit complete. Explained variance ratio: {np.sum(pca.explained_variance_ratio_):.2f}")

Sampling embeddings for PCA: 100%|██████████| 25/25 [00:11<00:00,  2.21it/s]


PCA fit complete. Explained variance ratio: 0.41


In [None]:
# Embed every normalized skill chunk by chunk and project each chunk straight
# to `reduced_dim` dimensions with the fitted PCA, so the full-width embedding
# matrix for all skills never has to exist at once.
reduced_embeddings = []
chunk_starts = range(0, len(normalized_skills), batch_size)
for start in tqdm(chunk_starts, desc="Embedding + PCA batches"):
    chunk = normalized_skills[start:start + batch_size]
    with torch.no_grad():
        chunk_emb = model.encode(chunk, convert_to_tensor=True, device=device, show_progress_bar=False)
    reduced_embeddings.append(pca.transform(chunk_emb.cpu().numpy()))

Embedding + PCA batches: 100%|██████████| 1611/1611 [13:22<00:00,  2.01it/s]


In [None]:
# Stack the per-chunk PCA outputs into one matrix aligned row-for-row with
# `normalized_skills`, then stream it to Parquet in row groups.
flat_embeddings = np.vstack(reduced_embeddings)
assert flat_embeddings.shape[0] == len(normalized_skills), "Embeddings and skills length mismatch!"

output_path = "skills_embeddings_32d.parquet"
# Rows per Parquet row group. This uses its own name so the embedding
# `batch_size` set earlier is not clobbered if earlier cells are re-run.
write_batch_size = 50000
writer = None

print(f"Saving {len(normalized_skills)} embeddings + normalized skills to Parquet in batches...")

try:
    for start_idx in tqdm(range(0, len(normalized_skills), write_batch_size), desc="Writing embeddings"):
        end_idx = min(start_idx + write_batch_size, len(normalized_skills))

        batch_df = pd.DataFrame({
            "skill_normalized": normalized_skills[start_idx:end_idx],
            "embedding": flat_embeddings[start_idx:end_idx].tolist(),
        })
        # The per-batch RangeIndex carries no information; don't store it.
        table = pa.Table.from_pandas(batch_df, preserve_index=False)

        # The schema is taken from the first batch.
        if writer is None:
            writer = pq.ParquetWriter(output_path, table.schema)
        writer.write_table(table)

        del batch_df, table
finally:
    # Close even if a batch fails, so the file handle is released.
    if writer is not None:
        writer.close()

print(f"Done! Embeddings + normalized skills saved to Parquet at {output_path}")

Saving 3298136 embeddings + normalized skills to Parquet in batches...


Writing embeddings: 100%|██████████| 66/66 [00:22<00:00,  2.91it/s]

Done! Embeddings + normalized skills saved to Parquet at skills_embeddings_32d.parquet





In [None]:
parquet_path = "skills_embeddings_32d.parquet"

# Sanity-check the saved file by reading only its first row group rather than
# all rows. A dedicated name is used so `df` (the jobs DataFrame loaded at the
# top of the notebook) is not replaced.
embeddings_preview = pq.ParquetFile(parquet_path).read_row_group(0).to_pandas()

embeddings_preview.head()

               skill_normalized  \
0         web and mobile trends   
1                           mqb   
2        highquality recruiting   
3  integrated hardware/software   
4                  pos strategy   

                                           embedding  
0  [0.15919700264930725, 0.07573260366916656, 0.1...  
1  [0.031084492802619934, -0.11332739889621735, 0...  
2  [-0.20707492530345917, 0.26148080825805664, 0....  
3  [0.23487156629562378, 0.06972840428352356, -0....  
4  [0.19313479959964752, 0.016096383333206177, 0....  


In [None]:

# Load all saved skills and their reduced embeddings, row group by row group,
# and upload the embedding matrix to the GPU as float32.
# (The cluster count is set in the clustering cell below; no other config is
# needed here.)
parquet_path = "skills_embeddings_32d.parquet"

parquet_file = pq.ParquetFile(parquet_path)
num_row_groups = parquet_file.num_row_groups
all_skills = []
all_emb_batches = []

print(f"Loading embeddings batch-wise from {parquet_path}...")
for rg_idx in tqdm(range(num_row_groups), desc="Reading row groups", unit="rg"):
    batch_df = parquet_file.read_row_group(rg_idx, columns=["skill_normalized", "embedding"]).to_pandas()
    # Cast on the host to float32 (the dtype used on the GPU) so the stacked
    # host array is half the size of a float64 one.
    all_emb_batches.append(np.vstack(batch_df["embedding"].values).astype(np.float32, copy=False))
    all_skills.extend(batch_df["skill_normalized"].tolist())

print("All batches loaded.")

emb_gpu = cp.asarray(np.vstack(all_emb_batches))
del all_emb_batches  # host copies are no longer needed once data is on the GPU
print(f"Embeddings shape: {emb_gpu.shape}")

Loading embeddings batch-wise from skills_embeddings_32d.parquet...


Reading row groups: 100%|██████████| 66/66 [00:14<00:00,  4.43rg/s]


All batches loaded.
Embeddings shape: (3298136, 32)


In [None]:
# Cluster all skill embeddings on the GPU with cuML KMeans, then save one row
# per skill with its cluster id. (Despite the file name, this stores raw
# cluster assignments; canonical skills are chosen in a later cell.)
num_clusters = 2000
print(f"Clustering {emb_gpu.shape[0]} skills into {num_clusters} clusters on GPU...")
kmeans = KMeans(n_clusters=num_clusters, max_iter=300)
labels = kmeans.fit_predict(emb_gpu)
print("Clustering complete.")

# Copy the labels back to host memory and pair each skill with its cluster.
output_path = "skills_canonical_clusters.parquet"
df_clusters = pd.DataFrame({"skill_normalized": all_skills, "cluster": cp.asnumpy(labels)})
df_clusters.to_parquet(output_path, index=False)
print(f"Canonical clusters saved to: {output_path}")

Clustering 3298136 skills into 2000 clusters on GPU...
Clustering complete.
Canonical clusters saved to: skills_canonical_clusters.parquet


In [None]:
parquet_path = "skills_canonical_clusters.parquet"
df_clusters = pd.read_parquet(parquet_path)

# Sanity-check the GPU clustering output: schema, a few rows, the number of
# distinct clusters, and the 20 largest clusters by member count.
cluster_sizes = df_clusters["cluster"].value_counts()

print("Columns in the parquet:", list(df_clusters.columns))
print("\nFirst 5 rows:")
print(df_clusters.head())

print("\nNumber of unique clusters:", df_clusters["cluster"].nunique())
print("\nCluster sizes:")
print(cluster_sizes.head(20))

Columns in the parquet: ['skill_normalized', 'cluster']

First 5 rows:
               skill_normalized  cluster
0         web and mobile trends     1455
1                           mqb      331
2        highquality recruiting     1250
3  integrated hardware/software      831
4                  pos strategy      819

Number of unique clusters: 2000

Cluster sizes:
cluster
926     5997
1       5756
1559    5682
902     5642
1474    5551
415     5353
1148    5320
1033    5298
313     5194
1330    5041
197     4933
659     4862
1615    4759
194     4726
1878    4713
580     4671
1048    4642
331     4531
1433    4385
77      4361
Name: count, dtype: int64


In [None]:
# Pick 5 distinct clusters at random (fixed seed) and show up to 20 of their
# member skills each.
sample_clusters = (
    df_clusters["cluster"]
    .drop_duplicates()
    .sample(5, random_state=100)
    .tolist()
)

print("Sample clusters and their skills:\n")
for c in sample_clusters:
    members = df_clusters.loc[df_clusters["cluster"] == c, "skill_normalized"].tolist()
    print(f"Cluster {c} ({len(members)} skills):")
    print(members[:20])
    print("...")

Sample clusters and their skills:

Cluster 1952 (2821 skills):
['peo pgo cet pmp certifications', 'cpvc and pex certification', 'cphon certification', 'cprss certification', 'cpi or nappi certification', 'pmi pmp certification', 'cqi (cphq) certification', 'rmf certification', 'cpsm cscp cpp certifications', 'treasury professional certification', 'quality control management course certification', 'arizona: bls certification cma rma or ccma certification', 'scm certification', 'cpc cpch cca rhia rhit ccs ccsp certified', 'pmac / cpp certification', 'cca or cdt (csi)', 'cprbls certification (caha/aapnrp)', 'cpr/bls/acls/nrp certification', 'ccure9000 enterprise certification', 'rn efm certification']
...
Cluster 1338 (1412 skills):
['selfmedication administration', 'pain management focus', 'chronic disease management', 'difficult airway management', 'cardiac rhythm management procedures', 'highpressure situations management', 'medical and behavioral concerns management', 'illness managem

In [None]:


# Re-cluster the saved embeddings on the CPU with scikit-learn's
# MiniBatchKMeans (seeded). These labels drive the canonical-skill mapping below.
file_path = "skills_embeddings_32d.parquet"
print("Reading skills embeddings parquet...")
parquet_file = pq.ParquetFile(file_path)
df_embeddings = parquet_file.read().to_pandas()

print("Sample of embeddings:")
print(df_embeddings.head())

# Turn each stored embedding into a numpy array, then stack all rows into one
# 2-D matrix for scikit-learn.
df_embeddings["embedding"] = df_embeddings["embedding"].map(np.array)

n_clusters = 2000
print(f"Clustering {len(df_embeddings)} skills into {n_clusters} clusters...")
emb_array = np.vstack(df_embeddings["embedding"].to_numpy())

kmeans = MiniBatchKMeans(n_clusters=n_clusters, batch_size=8192, random_state=42)
labels = kmeans.fit_predict(emb_array)
df_embeddings["cluster"] = labels

Reading skills embeddings parquet...
Sample of embeddings:
               skill_normalized  \
0         web and mobile trends   
1                           mqb   
2        highquality recruiting   
3  integrated hardware/software   
4                  pos strategy   

                                           embedding  
0  [0.15919700264930725, 0.07573260366916656, 0.1...  
1  [0.031084492802619934, -0.11332739889621735, 0...  
2  [-0.20707492530345917, 0.26148080825805664, 0....  
3  [0.23487156629562378, 0.06972840428352356, -0....  
4  [0.19313479959964752, 0.016096383333206177, 0....  
Clustering 3298136 skills into 2000 clusters...


In [None]:
# For every cluster, choose as its canonical skill the member whose embedding
# is closest (Euclidean) to the mean embedding of the cluster's members.
unique_clusters = np.unique(labels)
canonical_clusters = []

# Row positions of each cluster's members, computed in a single grouping pass.
# Re-filtering the full frame with a boolean mask per cluster scans every row
# once per cluster (O(n * k)).
positions_by_cluster = df_embeddings.groupby("cluster", sort=True).indices
skill_values = df_embeddings["skill_normalized"].to_numpy()

print("Computing canonical skills from centroids...")
for c in tqdm(unique_clusters, desc="Processing clusters"):
    member_pos = positions_by_cluster[c]   # ascending row positions, same order as a mask filter
    cluster_emb = emb_array[member_pos]    # emb_array rows align with df_embeddings rows

    centroid = cluster_emb.mean(axis=0, keepdims=True)
    distances = cdist(cluster_emb, centroid, metric="euclidean").ravel()
    cluster_skills = skill_values[member_pos].tolist()

    canonical_clusters.append({
        "canonical_skill": cluster_skills[distances.argmin()],
        "clustered_skills": cluster_skills,
    })

Computing canonical skills from centroids...


Processing clusters: 100%|██████████| 2000/2000 [00:22<00:00, 88.68it/s] 


In [None]:


# Persist one row per cluster: its canonical skill and the full member list.
# Note: this rebinds `df_clusters` to the per-cluster table (it previously held
# per-skill GPU cluster assignments).
output_path = "skill_clusters_canonical.parquet"
df_clusters = pd.DataFrame(canonical_clusters)

table = pa.Table.from_pandas(df_clusters)
pq.write_table(table, where=output_path)

print("Saved clusters to parquet:", output_path)
print("Sample clusters:")
print(df_clusters.head())

Saved clusters to parquet: skill_clusters_canonical.parquet
Sample clusters:
                                   canonical_skill  \
0                                property closings   
1                       google calendar management   
2  licensed registered nurse – mn board of nursing   
3              technical review and interpretation   
4                       verifying business changes   

                                    clustered_skills  
0  [highvalue property insurance, out of state pr...  
1  [experience creating and managing editorial ca...  
2  [nova scotia college of nursing (nscn) registr...  
3  [engineering feedback, technical tests, techno...  
4  [client matter aging reports, dsc vendor contr...  


In [None]:
clusters = pq.read_table("skill_clusters_canonical.parquet").to_pandas()

# Invert the per-cluster table into a flat lookup:
# normalized skill -> canonical skill of the cluster it belongs to.
skill_to_canonical = {
    member: canon
    for canon, members in zip(clusters["canonical_skill"], clusters["clustered_skills"])
    for member in members
}

print("Total mappings:", len(skill_to_canonical))


# Source jobs file and destination for the retagged copy.
input_file = "jobs_transfer_temp.parquet"
output_file = "jobs_transfer_retagged.parquet"

pf = pq.ParquetFile(input_file)
writer = None  # ParquetWriter, created lazily from the first retagged batch's schema

def retag_list(skill_list, mapping=None):
    """Map each raw skill in ``skill_list`` to its canonical skill.

    The lookup keys are *normalized* skills (``str(s).lower().strip()``, the
    same transform ``normalize_skill`` applies before embedding/clustering).
    Each raw skill is therefore normalized before the lookup; otherwise
    mixed-case or padded skills (e.g. "Troubleshooting") would never match.
    Skills with no mapping, and None entries, are kept verbatim.

    Args:
        skill_list: list or other iterable (e.g. numpy array) of raw skill
            strings, or None.
        mapping: dict of normalized skill -> canonical skill. Defaults to
            the global ``skill_to_canonical``.

    Returns:
        A new list with one entry per input skill ([] when skill_list is None).
    """
    if skill_list is None:
        return []
    if mapping is None:
        mapping = skill_to_canonical

    retagged = []
    for s in skill_list:
        if s is None:
            # Don't look up str(None) == "none"; keep the null as-is.
            retagged.append(s)
            continue
        retagged.append(mapping.get(str(s).lower().strip(), s))
    return retagged

Total mappings: 2770325


In [None]:
# Stream every row group of the jobs file through `retag_list`, adding a
# `skills_canonical` column, and append it to `output_file`.
# `writer` is reset here so re-running just this cell starts a fresh file
# instead of writing to the writer closed by the previous run.
writer = None
try:
    for rg in tqdm(range(pf.num_row_groups), desc="Retagging row groups"):
        batch = pf.read_row_group(rg).to_pandas()

        batch["skills_canonical"] = batch["skills_array"].apply(retag_list)

        table = pa.Table.from_pandas(batch)

        # The output schema is taken from the first retagged row group.
        if writer is None:
            writer = pq.ParquetWriter(output_file, table.schema)

        writer.write_table(table)
finally:
    # Close even on failure so the file handle is released; the guard also
    # covers an input with zero row groups (writer never created).
    if writer is not None:
        writer.close()
print("Done")

Retagging row groups: 100%|██████████| 4/4 [00:51<00:00, 12.79s/it]

Done





In [None]:
import pickle

# Persist the normalized-skill -> canonical-skill lookup for reuse elsewhere.
# (Only unpickle this file from a trusted location: pickle.load can execute code.)
with open("map.pickle", "wb") as fh:
    pickle.dump(skill_to_canonical, fh)
print("Data pickled successfully.")

Data pickled successfully.
