<a href="https://colab.research.google.com/github/Ram-Vidhu/Job_searcher_resume_enhancer_with_crew_ai/blob/dev/Notebook/GenAI_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Spark Setup

In [None]:
# Check that Java is installed -- Spark runs on the JVM, so this must succeed before PySpark is used
!java -version

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [None]:
!pip install pyspark



In [None]:
!pip install chromadb

Collecting chromadb
  Downloading chromadb-1.1.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.23.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.9 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.37.0-py3-none-any.whl.metadata (2.4 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading PyPika-0.48.9.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?

In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType, TimestampType
import chromadb
from sentence_transformers import SentenceTransformer
from pyspark.sql import functions as F

In [None]:
# Start the Spark session for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("Job_recommendation")
    .getOrCreate()
)

## Data loading, joining and sampling

In [None]:
# Mount Google Drive so the dataset CSVs under /content/drive/MyDrive can be read
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load the job-skills table; the first line of the file is the header row
df1 = spark.read.csv('/content/drive/MyDrive/1.3M_Linkedin/job_skills.csv', header=True)

In [None]:
# Load the job-summary table.
# The descriptions are free text: the sample query result further down shows
# job_summary holding only a heading-like first line ("Who We Are"), which is what
# the default line-by-line CSV parser produces when a quoted field contains newlines.
# multiLine lets a quoted field span several lines; escape='"' makes a doubled
# quote inside a field read as a literal quote instead of ending the field.
# NOTE(review): confirm against the raw file that fields are quoted this way.
df2 = (
    spark.read.format('csv')
    .option("header", True)
    .option("multiLine", True)
    .option("quote", '"')
    .option("escape", '"')
    .load('/content/drive/MyDrive/1.3M_Linkedin/job_summary.csv')
)

In [None]:
# Keep only the postings that appear in both the skills and the summary table
df3 = df1.join(df2, 'job_link', 'inner')

In [None]:
# Load the main postings table; the first line of the file is the header row
df4 = spark.read.csv('/content/drive/MyDrive/1.3M_Linkedin/linkedin_job_postings.csv', header=True)

In [None]:
# Attach the posting details to the skills+summary rows, matching on job_link
df5 = df3.join(df4, 'job_link', 'inner')

In [None]:
# Draw a seeded random sample: tag each row with a random number, keep the
# 100,000 rows with the smallest values, then discard the helper column so it
# does not show up in the null counts or get stored with every Chroma record
df5 = (
    df5.withColumn("rand", F.rand(seed=42))
    .orderBy("rand")
    .limit(100000)
    .drop("rand")
)


## EDA

In [None]:
from pyspark.sql import functions as f

# One output column per input column: each row contributes 1 when the value
# is null and 0 otherwise, and the sum of those flags is the null count
null_counts = df5.select(
    *(
        f.sum(f.col(name).isNull().cast("int")).alias(name)
        for name in df5.columns
    )
)

null_counts.show()


In [None]:
# Discard every row that has a null in any column
df5 = df5.dropna()

In [None]:
# Bring the distinct job_level values to the driver as a plain Python list
values = [record[0] for record in df5.select("job_level").distinct().collect()]

In [None]:
# Show the distinct job_level values collected in the previous cell
print(values)

In [None]:
# Number of postings per job level
df5.groupBy("job_level").count().show()

In [None]:
# Number of postings per searched position
df5.groupBy("search_position").count().show()

In [None]:
# Total number of rows left after the null rows were dropped
df5.select(F.col("job_level")).count()

In [None]:
# Number of postings per searched country
df5.groupBy("search_country").count().show()

In [None]:

# categorical_cols = ["job_type", "job_level", "search_position", "search_country", "search_city"]

# # Loop through and show unique values
# for col in categorical_cols:
#     print(f"--- {col} ---")
#     values = df5.select(col).distinct().rdd.flatMap(lambda x: x).collect()
#     print(values, "\n")

## Pre-Processing

In [None]:
# Remove four columns that should not be carried into the later steps
df5 = df5.drop(
    *['last_processed_time', 'got_summary', 'got_ner', 'is_being_worked']
)

In [None]:
# Discard every row that still has a null in any remaining column
df5 = df5.dropna()

In [None]:
# Keep only postings from the four target countries at the two target job levels
df5 = df5.where(
    F.col("search_country").isin(["United States", "Canada", "Australia", "United Kingdom"])
    & F.col("job_level").isin(["Associate", "Mid senior"])
)

## Storing and querying the vector database (Chroma)

In [None]:
# Open the on-disk Chroma store (the "chroma_db" folder is created if missing)
client = chromadb.PersistentClient(path="chroma_db")

# Fetch the "jobs" collection, creating it on first use;
# the metadata entry requests cosine distance for the HNSW index
collection = client.get_or_create_collection("jobs", metadata={"hnsw:space": "cosine"})

In [None]:
# Sentence-embedding model used for both the job texts and the resume queries
model = SentenceTransformer(model_name_or_path="all-MiniLM-L6-v2")

In [None]:
# df5 = df5.withColumn(
#     "job_text",
#     F.concat_ws(
#         " ",   # separator
#         F.coalesce(F.col("job_title"), F.lit("")),
#         F.coalesce(F.col("job_summary"), F.lit("")),
#         F.coalesce(F.col("job_skills"), F.lit("")),
#         F.coalesce(F.col("job_level"), F.lit(""))
#     )
# )

In [None]:
# Function to build weighted embedding text
def build_embedding_text(row):
    """Build the text that gets embedded for one job posting.

    Every field is prefixed with a label and repeated so that the fields that
    matter most make up a larger share of the text: title and skills three
    times, summary twice, level and location once.

    Missing values (``None``) contribute an empty string and non-string values
    are converted with ``str()``, so a single incomplete row cannot abort a
    whole embedding batch with a ``TypeError``.

    Args:
        row: Record supporting ``row[key]`` lookup (for example a dict) with
            the keys ``job_title``, ``job_skills``, ``job_summary``,
            ``job_level`` and ``job_location``.

    Returns:
        str: The weighted text with leading/trailing whitespace removed.
    """
    def _field(key):
        # None would otherwise either raise on concatenation or be embedded
        # as the literal word "None"
        value = row[key]
        return "" if value is None else str(value)

    # (label, source key, number of repetitions)
    weighted_parts = (
        ("job title: ", "job_title", 3),
        ("skills: ", "job_skills", 3),
        ("summary: ", "job_summary", 2),
        ("level: ", "job_level", 1),
        ("location: ", "job_location", 1),
    )

    text = "".join(
        (label + _field(key) + " ") * weight
        for label, key, weight in weighted_parts
    )
    return text.strip()

In [None]:
def clean_metadata(row_dict):
    """Return a sanitised copy of ``row_dict`` for use as record metadata.

    The ``job_text`` key is left out, ``None`` becomes an empty string, values
    that are already bool/int/float/str pass through unchanged, and any other
    value is replaced by its ``str()`` form. The input dict is not modified.
    """
    primitive_types = (bool, int, float, str)

    def _normalise(value):
        if value is None:
            return ""
        return value if isinstance(value, primitive_types) else str(value)

    return {
        key: _normalise(value)
        for key, value in row_dict.items()
        if key != "job_text"
    }

In [None]:
batch_size = 500


def _index_batch(batch, start_index):
    """Embed one batch of job rows and add it to the Chroma collection.

    Args:
        batch: Non-empty list of row dicts (as produced by ``Row.asDict()``).
        start_index: Number of rows already indexed before this batch. It is
            used to derive IDs when the rows carry no ``id`` field, so that
            IDs keep counting up across batches instead of restarting at 0
            (restarting would make every batch reuse the IDs of the first).
    """
    # Weighted text per row, then one embedding per text
    texts = [build_embedding_text(r) for r in batch]
    embeddings = model.encode(texts)

    # Prefer the row's own id; otherwise fall back to the global row position
    if "id" in batch[0]:
        ids = [str(r["id"]) for r in batch]
    else:
        ids = [str(start_index + offset) for offset in range(len(batch))]

    metadata = [clean_metadata(r) for r in batch]

    collection.add(
        ids=ids,
        embeddings=embeddings.tolist(),
        documents=texts,
        metadatas=metadata
    )


# Stream rows to the driver instead of collecting the whole DataFrame at once
rows_iter = df5.toLocalIterator()

batch = []
rows_indexed = 0  # running count of rows already written to Chroma
for row in rows_iter:
    batch.append(row.asDict())

    if len(batch) >= batch_size:
        _index_batch(batch, rows_indexed)
        rows_indexed += len(batch)
        batch = []

# Handle leftover rows that did not fill a complete batch
if batch:
    _index_batch(batch, rows_indexed)
    rows_indexed += len(batch)
    batch = []


In [None]:
def search_jobs_chroma(resume_text, top_k=5, filters=None):
    """Return the jobs in the Chroma collection that best match a resume text.

    Args:
        resume_text: Free-text description of the candidate (skills, role, level).
        top_k: Maximum number of matches to return.
        filters: Optional Chroma ``where`` filter on the stored metadata,
            e.g. ``{"job_location": "Berlin", "job_type": "Full-time"}``.

    Returns:
        pandas.DataFrame: One row per match, with a ``similarity_score`` column
        followed by the metadata stored for that job. Higher scores mean closer
        matches. The frame is empty when nothing matches.
    """
    embedding = model.encode([resume_text])[0]

    results = collection.query(
        query_embeddings=[embedding.tolist()],
        n_results=top_k,
        where=filters
    )

    # Chroma reports *distances* (smaller = closer). The collection in this
    # notebook is created with the cosine space, where distance is
    # 1 - cosine similarity, so convert back to report an actual similarity.
    # NOTE(review): if the collection was created elsewhere with another
    # space, this conversion does not apply -- confirm.
    jobs = [
        {"similarity_score": 1.0 - distance, **job_metadata}
        for distance, job_metadata in zip(results["distances"][0], results["metadatas"][0])
    ]
    return pd.DataFrame(jobs)

In [None]:
# Try the search with a sample data-scientist profile
results = search_jobs_chroma(
    resume_text="data scientist [python, sql, machine learning, pyspark, Azure] mid level",
    top_k=5,
)

In [None]:
# Display the matches returned for the sample query
results

Unnamed: 0,similarity_score,first_seen,search_position,job_summary,job_title,job_skills,search_country,company,job_type,job_link,job_location,rand,job_level,search_city
0,0.469473,2024-01-14,Recruiter,Who We Are,Identity Development Manager,"SailPoint IIQ, Active Directory, Identity gove...",United States,WSP in the U.S.,Onsite,https://www.linkedin.com/jobs/view/identity-de...,"Savannah, GA",0.000347,Mid senior,Savannah


In [None]:
# Zip the persisted ChromaDB folder so it can be copied or downloaded as a single file
# NOTE(review): assumes the notebook's working directory is /content, because the
# client was opened at the relative path "chroma_db" -- confirm
!zip -r /content/chroma_db.zip /content/chroma_db

updating: content/chroma_db/ (stored 0%)
updating: content/chroma_db/chroma.sqlite3 (deflated 64%)
updating: content/chroma_db/9bafcaea-95de-4f11-a110-c6190d0f0bf1/ (stored 0%)
updating: content/chroma_db/9bafcaea-95de-4f11-a110-c6190d0f0bf1/data_level0.bin (deflated 12%)
updating: content/chroma_db/9bafcaea-95de-4f11-a110-c6190d0f0bf1/link_lists.bin (deflated 87%)
updating: content/chroma_db/9bafcaea-95de-4f11-a110-c6190d0f0bf1/index_metadata.pickle (deflated 64%)
updating: content/chroma_db/9bafcaea-95de-4f11-a110-c6190d0f0bf1/length.bin (deflated 82%)
updating: content/chroma_db/9bafcaea-95de-4f11-a110-c6190d0f0bf1/header.bin (deflated 59%)


In [None]:
# Copy the archive into the mounted Google Drive so it persists after the
# session ends (adjust the destination path if needed)
!cp /content/chroma_db.zip /content/drive/MyDrive/chroma_db.zip

In [None]:
from google.colab import files

# Trigger a browser download of the zipped Chroma store
files.download('/content/chroma_db.zip')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>