In [None]:
!pip install --upgrade pip
!pip install sentence-transformers thefuzz

import pandas as pd
import numpy as np
import time
import json
import os
import re

from transformers import pipeline, AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
from concurrent.futures import ThreadPoolExecutor
from nltk.tokenize import sent_tokenize
from sklearn.decomposition import PCA
from abc import ABC, abstractmethod
from nltk.corpus import stopwords
from dataclasses import dataclass
from transformers import logging
import torch.nn.functional as F
from bs4 import BeautifulSoup
from thefuzz import fuzz
from queue import Queue
from nltk import ngrams
from torch import nn
import transformers
import threading
import torch
import spacy
import nltk

logging.set_verbosity_error()
nltk.download('stopwords')
nltk.download('punkt_tab')
stop_words = set(stopwords.words("english"))

kaggle_dir = '/root/.kaggle'
os.makedirs(kaggle_dir, exist_ok=True)

! kaggle datasets download -d promptcloud/indeed-job-posting-dataset
! kaggle datasets download -d thenoob69/esco-skills
! kaggle datasets download -d ravindrasinghrana/job-description-dataset
! mkdir datasets
! unzip indeed-job-posting-dataset.zip -d datasets
! unzip job-description-dataset.zip -d datasets
! unzip esco-skills.zip -d datasets
! rm indeed-job-posting-dataset.zip job-description-dataset.zip esco-skills.zip

# Chapter One: Comparison of the Performance of Four Large Language Models on a Small Subset of Manually Labeled Data for the Task of Skills and Benefits Extraction

In [None]:
JOB_DESCRIPTIONS_FP = 'datasets/job_descriptions.csv'
j_df = pd.read_csv(JOB_DESCRIPTIONS_FP)[['Benefits']].dropna().drop_duplicates()

JOBS_FP = 'datasets/home/sdf/marketing_sample_for_trulia_com-real_estate__20190901_20191031__30k_data.csv'
jd_df = pd.read_csv(JOBS_FP)[['Job Description']].dropna().drop_duplicates()

ESCO_SKILLS_FP = 'datasets/skills.csv'
es_df = pd.read_csv(ESCO_SKILLS_FP)[['label_cleaned']].dropna().drop_duplicates().rename(columns={'label_cleaned': 'Skills'})

In [None]:
# read labeled subset of data
job_labels_file_path = 'job_labels.json'
with open(job_labels_file_path, 'r') as file:
    job_labels = json.load(file)

In [None]:
def clean_sentence(sentence):
    words = sentence.lower().split()
    words_no_stop = [word for word in words if word not in stop_words]
    sentence_no_stop = " ".join(words_no_stop)
    return re.sub(r'[^\w\s]', '', sentence_no_stop)

def get_sentences(jd):
    soup = BeautifulSoup(jd, 'html.parser')
    st = sent_tokenize(soup.get_text())
    sentences = []

    for sent in st:
        sentences.extend([x for x in sent.split('\n') if x != ''])

    return [clean_sentence(s) for s in sentences]

In [None]:
class Items:
    def __init__(self, benefits_df, skills_df, embedding_model):
        self.benefits_df = benefits_df
        self.skills_df = skills_df
        self.embedding_model = embedding_model
        self.emb_column_name = embedding_model.get_conf().emb_label
        self.processed_skills_df = None
        self.processed_benefits_df = None

    def extract_items(self):
        def extract_skills():
            skills = self.skills_df['Skills'].apply(
                lambda x: re.sub(r'\([^)]*\)', '', x)
            ).str.strip()

            self.processed_skills_df = pd.DataFrame({
                'Original': skills,
                'Skill': skills.apply(clean_sentence)
            }).drop_duplicates('Skill').reset_index(drop=True)

        def extract_benefits():
            benefits_list = []
            for row in self.benefits_df['Benefits']:
                for benefit in re.sub(r'[\{\}\']', '', row).split(','):
                    original = ' '.join(benefit.split('(')[0].strip().lower().split())
                    benefits_list.append((original, original))

            # additional ChatGPT generated benefits
            with open('benefits.txt', 'r') as file:
                for line in file:
                    benefit = line.strip()
                    if benefit:
                        standardized = ' '.join(benefit.lower().split())
                        benefits_list.append((standardized, standardized))

            self.processed_benefits_df = pd.DataFrame(
                benefits_list,
                columns=['Original', 'Benefit']
            ).drop_duplicates('Benefit').reset_index(drop=True)

        extract_skills()
        extract_benefits()

    def generate_embeddings(self, batch_size: int = 32):
        def process_items_batch(df: pd.DataFrame, text_column: str):
            embeddings = []

            for i in range(0, len(df), batch_size):
                batch = df[text_column].iloc[i:i + batch_size].tolist()
                batch_embeddings = self.embedding_model.get_embeddings(batch)
                embeddings.extend(batch_embeddings.numpy())

            df[self.emb_column_name] = embeddings

        process_items_batch(self.processed_skills_df, 'Skill')
        process_items_batch(self.processed_benefits_df, 'Benefit')

    def get_skills_df(self):
        return self.processed_skills_df

    def get_benefits_df(self):
        return self.processed_benefits_df

In [None]:
@dataclass
class Configuration:
    backbone_name: str
    emb_label: str
    tokenizer: AutoTokenizer
    device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def __post_init__(self):
        self.backbone = AutoModel.from_pretrained(self.backbone_name).to(self.device)

In [None]:
class EmbeddingModel(ABC):
    def __init__(self, backbone_name: str, emb_label: str):
        self.conf = Configuration(
            backbone_name=backbone_name,
            emb_label=emb_label,
            tokenizer=AutoTokenizer.from_pretrained(backbone_name)
        )

    @abstractmethod
    def get_embeddings(self, texts: list) -> torch.Tensor:
        pass

    def get_conf(self) -> Configuration:
        return self.conf


class JobBertModel(EmbeddingModel):
    def __init__(self):
        super().__init__(backbone_name='jjzha/jobbert-base-cased', emb_label='jobbert')

    def get_embeddings(self, texts: list) -> torch.Tensor:
        inputs = self.conf.tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(self.conf.device)
        with torch.no_grad():
            embeddings = self.conf.backbone(**inputs).last_hidden_state[:, 0, :]
        return embeddings.cpu()


class MiniLMModel(EmbeddingModel):
    def __init__(self):
        self.conf = Configuration(
            backbone_name='sentence-transformers/all-MiniLM-L6-v2',
            emb_label='minilm',
            tokenizer=None
        )
        self.model = SentenceTransformer(self.conf.backbone_name).to(self.conf.device)


    def get_embeddings(self, texts: list) -> torch.Tensor:
        embeddings = self.model.encode(texts, show_progress_bar=False, convert_to_tensor=True, device=self.conf.device)
        return embeddings.cpu()


class DistilBertModel(EmbeddingModel):
    def __init__(self):
        super().__init__(backbone_name='distilbert-base-cased', emb_label='distilbert')

    def get_embeddings(self, texts: list) -> torch.Tensor:
        inputs = self.conf.tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(self.conf.device)
        with torch.no_grad():
            embeddings = self.conf.backbone(**inputs).last_hidden_state[:, 0, :]
        return embeddings.cpu()


class RobertaModel(EmbeddingModel):
    def __init__(self):
        super().__init__(backbone_name='roberta-base', emb_label='roberta')

    def get_embeddings(self, texts: list) -> torch.Tensor:
        inputs = self.conf.tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(self.conf.device)
        with torch.no_grad():
            embeddings = self.conf.backbone(**inputs).last_hidden_state[:, 0, :]
        return embeddings.cpu()

In [None]:
def compute_similarity(vecs, emb_type, df_items):
    input_vectors = torch.stack(vecs).T
    esco_vectors = torch.tensor(np.stack(df_items[emb_type].values), dtype=torch.float32)

    norm_esco_vectors = torch.nn.functional.normalize(esco_vectors, p=2, dim=1)
    norm_input_vectors = torch.nn.functional.normalize(input_vectors, p=2, dim=0)

    cos_similarities = torch.matmul(norm_esco_vectors, norm_input_vectors)
    sims, idxs = torch.max(cos_similarities, dim=0)

    return idxs.numpy(), sims.numpy()

In [None]:
def get_unique_matches(matches):
    unique_matches = {}

    for ngram, skill_name, score in matches:
        if (skill_name not in unique_matches or
            score > unique_matches[skill_name][2]):
            unique_matches[skill_name] = (ngram, skill_name, score)

    return list(unique_matches.values())

def is_match(el1, el2, threshold=0.8):
    return fuzz.ratio(el1.lower(), el2.lower()) >= threshold * 100

def calculate_metrics(matched, found, real):
    tp = matched
    fp = found - matched
    fn = real - matched

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return {
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

def find_max_similarity(df_items, ngram_embs, emb_label, res, pred_labels, ngram_list, threshold=0.85):
    idxs, sims = compute_similarity(ngram_embs, emb_label, df_items)
    max_idx = np.argmax(sims)

    if sims[max_idx] > threshold and idxs[max_idx] < len(df_items):
        pred_labels.append(idxs[max_idx])
        res.append((ngram_list[max_idx],
                   df_items.iloc[idxs[max_idx]]['Original'],
                   sims[max_idx]))

def find_matched(unique_matches, expected):
    matched = []
    for _, name, score in unique_matches:
        for expected_el in expected:
            if is_match(name, expected_el):
                matched.append((name, expected_el, score))

    metrics = calculate_metrics(len(matched), len(unique_matches), len(expected))
    return metrics['precision'], metrics['recall'], metrics['f1']

In [None]:
threshold = .85

for model in [MiniLMModel(), JobBertModel(), RobertaModel(), DistilBertModel()]:
    processor = Items(
        benefits_df=j_df,
        skills_df=es_df,
        embedding_model=model
    )

    processor.extract_items()
    processor.generate_embeddings(batch_size=32)

    skills_emb_df = processor.get_skills_df()
    benefits_emb_df = processor.get_benefits_df()

    metrics = {
        'skills': {'acc': [], 'rec': [], 'f1': []},
        'benefits': {'acc': [], 'rec': [], 'f1': []}
    }

    for job in job_labels:
        idx, skills, benefits = job.values()
        res_skills = []
        pred_labels_skills = []
        res_benefits = []
        pred_labels_benefits = []
        sentences = get_sentences(jd_df['Job Description'][int(idx)])
        emb_label = model.get_conf().emb_label

        for sent in sentences:
            ngram_embs = []
            sent = clean_sentence(sent)
            sent_emb = model.get_embeddings(sent).squeeze()

            words = sent.split()
            ngram_max_len = 8 if len(words) > 8 else len(words)
            ngram_list = [gram for i in range(1, ngram_max_len+1) for gram in list(ngrams(words, i))]

            for ngram in ngram_list:
                text = ' '.join(ngram)
                ngram_embs.append((model.get_embeddings([text])[0])*0.8 + sent_emb*0.2)

            if len(ngram_embs) == 0:
                continue

            find_max_similarity(benefits_emb_df, ngram_embs, emb_label,
                                res_benefits, pred_labels_benefits, ngram_list)
            find_max_similarity(skills_emb_df, ngram_embs, emb_label,
                                res_skills, pred_labels_skills, ngram_list)

        unique_matches_skills = get_unique_matches(res_skills)
        unique_matches_benefits = get_unique_matches(res_benefits)

        for type_, data in [('skills', (unique_matches_skills, skills)),
                       ('benefits', (unique_matches_benefits, benefits))]:
            acc, rec, f1 = find_matched(*data)
            metrics[type_]['acc'].append(acc)
            metrics[type_]['rec'].append(rec)
            metrics[type_]['f1'].append(f1)

        break

    print("###############################################")
    print(f"Model: {emb_label}")
    means = {k: {m: np.mean(v[m]) for m in v} for k, v in metrics.items()}
    for category, values in means.items():
        print(f"\n{category.capitalize()} means:")
        print('\n'.join(f"{k}: {v:.2f}" for k, v in values.items()))

###############################################
Model: minilm

Skills means:
acc: 0.57
rec: 0.40
f1: 0.47

Benefits means:
acc: 1.00
rec: 0.57
f1: 0.73
###############################################
Model: jobbert

Skills means:
acc: 0.12
rec: 0.30
f1: 0.17

Benefits means:
acc: 0.67
rec: 0.57
f1: 0.62
###############################################
Model: roberta

Skills means:
acc: 0.08
rec: 0.20
f1: 0.11

Benefits means:
acc: 0.44
rec: 0.57
f1: 0.50
###############################################
Model: distilbert

Skills means:
acc: 0.11
rec: 0.40
f1: 0.17

Benefits means:
acc: 0.22
rec: 0.57
f1: 0.32


# Chapter Two: Optimization of Skills and Benefits Extraction for the Best-Performing Model

In [None]:
from concurrent.futures import ThreadPoolExecutor
from torch.nn.utils.rnn import pad_sequence
from collections import defaultdict
from typing import Dict, Set
import functools

In [None]:
@dataclass
class ExtractionResult:
    skills: Set[str]
    benefits: Set[str]
    confidence_scores: Dict[str, float]

class PerfomanceMeasure(ABC):
    def __init__(self, j_df, es_df, model):
        self.model = model
        self.prepare_embeddings(j_df, es_df)

    def prepare_embeddings(self, j_df, es_df):
        processor = Items(
            benefits_df=j_df,
            skills_df=es_df,
            embedding_model=self.model
        )

        processor.extract_items()
        processor.generate_embeddings(batch_size=32)

        self.skills_emb_df = processor.get_skills_df()
        self.benefits_emb_df = processor.get_benefits_df()
        self.emb_label = self.model.get_conf().emb_label

    def print_time(self, start, end, len_job_samples):
        time_diff = end - start
        if time_diff < 60:
            print(f"{len_job_samples} job descriptions were processed for {time_diff:.2f} seconds")
        else:
            print(f"{len_job_samples} job descriptions were processed for {time_diff/60:.2f} minutes")

    @abstractmethod
    def run(self, job_sample):
        pass

### Non-optimized version from the first part

In [None]:
class SequencePerfomance(PerfomanceMeasure):
    def __init__(self, j_df, es_df, model):
        super().__init__(j_df, es_df, model)

    def run(self, job_samples):
        start = time.time()
        for job_sample in job_samples:
            res_skills = []
            res_benefits = []
            pred_labels_skills = []
            pred_labels_benefits = []
            sentences = get_sentences(job_sample)

            for sent in sentences:
                ngram_embs = []
                sent = clean_sentence(sent)
                sent_emb = self.model.get_embeddings(sent).squeeze()

                words = sent.split()
                ngram_max_len = 8 if len(words) > 8 else len(words)
                ngram_list = [gram for i in range(1, ngram_max_len+1) for gram in list(ngrams(words, i))]

                for ngram in ngram_list:
                    text = ' '.join(ngram)
                    ngram_embs.append((self.model.get_embeddings([text])[0])*0.8 + sent_emb*0.2)

                if len(ngram_embs) == 0:
                    continue

                find_max_similarity(self.benefits_emb_df, ngram_embs, self.emb_label,
                                    res_benefits, pred_labels_benefits, ngram_list)
                find_max_similarity(self.skills_emb_df, ngram_embs, self.emb_label,
                                    res_skills, pred_labels_skills, ngram_list)

            unique_matches_skills = get_unique_matches(res_skills)
            unique_matches_benefits = get_unique_matches(res_benefits)

        end = time.time()
        self.print_time(start, end, len(job_samples))

In [None]:
job_samples = jd_df['Job Description'].sample(n=15, random_state=42)
p = SequencePerfomance(j_df, es_df, MiniLMModel())
p.run(job_samples)

15 job descriptions were processed for 4.23 minutes


### Optimized version with the following modifications:

  * Batch Processing
  * Adding LRU cache and pre-computed reference embeddings
  * Parallel Processing
  * Vectorized Similarity Computation



In [None]:
class OptimizedSequencePerformance(PerfomanceMeasure):
    def __init__(self, j_df, es_df, model, batch_size = 32):
        super().__init__(j_df, es_df, model)
        self.batch_size = batch_size
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.skills_embeddings = self._prepare_reference_embeddings(self.skills_emb_df, self.emb_label)
        self.benefits_embeddings = self._prepare_reference_embeddings(self.benefits_emb_df, self.emb_label)

        self.sentence_cache = {}

    def _prepare_reference_embeddings(self, df, emb_label):
        """Pre-compute and normalize reference embeddings."""
        embeddings = torch.stack([
            torch.from_numpy(x) if isinstance(x, np.ndarray) else x
            for x in df[emb_label]
        ]).to(self.device)
        return F.normalize(embeddings, p=2, dim=1)

    @functools.lru_cache(maxsize=512)
    def _get_cached_embedding(self, text):
        """Cache embeddings for frequently used text."""
        embedding = self.model.get_embeddings(text).squeeze()
        return embedding.to(self.device)

    def _process_batch(self, sentences):
        return [self._get_cached_embedding(sent) for sent in sentences]

    def _generate_ngrams_efficient(self, words, max_len = 8):
        ngram_max_len = max_len if len(words) > max_len else len(words)
        return list(ngrams(words, n) for n in range(1, ngram_max_len+1))

    def _compute_similarities_batch(self, ngram_embeddings, reference_embeddings, threshold = 0.85):
        """Match sequence version normalization"""
        with torch.no_grad():
            input_vectors = ngram_embeddings.T
            norm_input = F.normalize(input_vectors, p=2, dim=0)
            norm_refs = F.normalize(reference_embeddings, p=2, dim=1)

            similarities = torch.matmul(norm_refs, norm_input)
            max_similarities, max_indices = torch.max(similarities, dim=0)

            mask = max_similarities >= threshold
            return max_similarities[mask], max_indices[mask]

    def _generate_ngrams_efficient(self, words, max_len = 8):
        """Generate n-grams more efficiently using sliding window."""
        ngrams_dict = {}
        words_len = len(words)
        max_len = min(max_len, words_len)

        for n in range(1, max_len + 1):
            for i in range(words_len - n + 1):
                ngram = tuple(words[i:i + n])
                text = ' '.join(ngram)
                if text not in ngrams_dict:
                    ngrams_dict[text] = list(ngram)

        return [(k, v) for k, v in ngrams_dict.items()]

    def run(self, job_samples):
        results = []
        start = time.time()

        for i in range(0, len(job_samples), self.batch_size):
            batch = job_samples[i:i + self.batch_size]
            batch_results = self._process_job_batch(batch)
            # results.extend(batch_results)

        end = time.time()
        self.print_time(start, end, len(job_samples))
        # return results

    def _process_job_batch(self, job_batch):
        batch_results = []

        for job in job_batch:
            sentences = get_sentences(job)
            cleaned_sentences = [clean_sentence(sent) for sent in sentences]

            # Process sentences in batches
            sentence_embeddings = self._process_batch(cleaned_sentences)

            job_skills = defaultdict(float)
            job_benefits = defaultdict(float)

            for sent_emb, sent in zip(sentence_embeddings, cleaned_sentences):
                words = sent.split()
                if not words:
                    continue

                ngrams = self._generate_ngrams_efficient(words)

                if not ngrams:
                    continue

                ngram_texts = [text for text, _ in ngrams]
                try:
                    ngram_embeddings = torch.stack([
                        self._get_cached_embedding(text) * 0.8 + sent_emb * 0.2
                        for text in ngram_texts
                    ]).to(self.device)
                    ngram_embeddings = F.normalize(ngram_embeddings, p=2, dim=1)

                    # Process skills and benefits
                    skills_sims, skills_idx = self._compute_similarities_batch(
                        ngram_embeddings, self.skills_embeddings)
                    benefits_sims, benefits_idx = self._compute_similarities_batch(
                        ngram_embeddings, self.benefits_embeddings)

                    # Update results using correct column names
                    for sim, idx in zip(skills_sims, skills_idx):
                        skill = self.skills_emb_df.iloc[idx.item()]['Skill']
                        job_skills[skill] = max(job_skills[skill], sim.item())

                    for sim, idx in zip(benefits_sims, benefits_idx):
                        benefit = self.benefits_emb_df.iloc[idx.item()]['Benefit']
                        job_benefits[benefit] = max(job_benefits[benefit], sim.item())

                except RuntimeError as e:
                    print(f"Error processing ngrams for sentence: {sent[:100]}...")
                    continue

            result = ExtractionResult(
                skills=set(job_skills.keys()),
                benefits=set(job_benefits.keys()),
                confidence_scores={**job_skills, **job_benefits}
            )
            batch_results.append(result)

        return batch_results

In [None]:
job_samples = jd_df['Job Description'].sample(n=15, random_state=42)
p = OptimizedSequencePerformance(j_df, es_df, MiniLMModel())
p.run(job_samples)

15 job descriptions were processed for 3.45 minutes
