In [2]:
# Install the Snowflake connector if not already installed
# (%pip installs into this kernel's environment, unlike !pip):
# %pip install snowflake-connector-python

import getpass
import os

import snowflake.connector
import pandas as pd
import numpy as np

# Connect to Snowflake. Credentials are read from environment variables instead
# of being hardcoded, so the password never ends up in the saved/shared notebook.
# NOTE(review): a password was previously committed in this cell -- rotate it.
# NOTE(review): ACCOUNTADMIN is far more privilege than the SELECT below needs;
# consider setting SNOWFLAKE_ROLE to a least-privilege role.
conn = snowflake.connector.connect(
    user=os.environ.get('SNOWFLAKE_USER', 'MUDIT'),
    # Fall back to an interactive, non-echoing prompt when the variable is unset.
    password=os.environ.get('SNOWFLAKE_PASSWORD') or getpass.getpass('Snowflake password: '),
    account=os.environ.get('SNOWFLAKE_ACCOUNT', 'BCEMHHI-LB94703'),
    warehouse=os.environ.get('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH'),
    database=os.environ.get('SNOWFLAKE_DATABASE', 'JOB_RECOMMENDATIONS'),
    schema=os.environ.get('SNOWFLAKE_SCHEMA', 'JOB_DATA'),
    role=os.environ.get('SNOWFLAKE_ROLE', 'ACCOUNTADMIN'),
)
print("Connected to Snowflake successfully.")


Connected to Snowflake successfully.


In [3]:
# Retrieve all job postings from the JOB_DESC table.
query = "SELECT * FROM JOB_DESC;"

# Use the cursor as a context manager so it is closed once the rows are in pandas.
with conn.cursor() as cur:
    cur.execute(query)
    # fetch_pandas_all() materialises the entire result set as one DataFrame,
    # so the kernel needs enough memory for the full table.
    jobs_df = cur.fetch_pandas_all()

print(f"Retrieved {len(jobs_df)} job postings.")
jobs_df.head(3)  # display the first few rows for verification


Retrieved 1615940 job postings.


Unnamed: 0,Job Id,Experience,Qualifications,Salary Range,location,Country,latitude,longitude,Work Type,Company Size,...,Contact,Job Title,Role,Job Portal,Job Description,Benefits,skills,Responsibilities,Company,Company Profile
0,2600342200917599,,,$61K-$106K,"Capitol Hill, Saipan",,,,,,...,,Purchasing Agent,Inventory Manager,,An Inventory Manager oversees inventory levels...,,Inventory control Demand forecasting Supply ch...,,Kyndryl Holdings,
1,1097571695278272,,,$57K-$86K,Banjul,,,,,,...,,Graphic Designer,Web Graphic Designer,,Web Graphic Designers create visually appealin...,,"Graphic design tools (e.g., Adobe Creative Sui...",,Ambuja Cements,
2,393705790719989,,,$60K-$103K,Tashkent,,,,,,...,,Physician Assistant,Surgical Physician Assistant,,"Assist surgeons in the operating room, perform...",,Surgical procedures and techniques Operating r...,,Whitehaven Coal,


In [6]:
import os
import re
import string
import time
import logging
import warnings
import pandas as pd
import numpy as np
import joblib
from datasketch import MinHash, MinHashLSH

try:
    import fitz  # PyMuPDF for faster PDF processing
except ImportError as exc:
    # Re-raise with installation guidance; `from exc` keeps the original cause
    # attached to the traceback.
    raise ImportError("The 'fitz' module (PyMuPDF) is not installed. Install it via 'pip install PyMuPDF'") from exc

# NOTE(review): this silences *every* warning for the rest of the kernel session
# (including pandas/numpy deprecation warnings); consider narrowing the filter.
warnings.filterwarnings('ignore')
# Log records are written to job_recommender_lsh.log, not to the notebook output.
# logging.basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', filename='job_recommender_lsh.log')
logger = logging.getLogger()

# --- Cache Status Verification ---

def check_cache_status(cache_dir="./lsh_cache"):
    """Print a status report of the LSH cache files and say whether all exist.

    For each expected artifact, prints its size in MB and last-modified time,
    or "Not found" when it is missing.

    Args:
        cache_dir: directory that should hold the cache files.

    Returns:
        True only when the directory exists and all three cache files are present;
        False otherwise (including when the directory itself is missing).
    """
    if not os.path.exists(cache_dir):
        print(f"Cache directory {cache_dir} does not exist!")
        return False

    expected = (
        ("LSH Index", "lsh_index.pkl"),
        ("MinHashes", "minhashes.pkl"),
        ("Jobs DataFrame", "processed_jobs_df.pkl"),
    )

    print("\nCache Status:")
    missing = 0
    for label, filename in expected:
        path = os.path.join(cache_dir, filename)
        if not os.path.exists(path):
            print(f"  ✗ {label}: Not found")
            missing += 1
            continue
        size_mb = os.path.getsize(path) / (1024 * 1024)
        modified = time.strftime('%Y-%m-%d %H:%M:%S',
                                 time.localtime(os.path.getmtime(path)))
        print(f"  ✓ {label}: {size_mb:.2f} MB (Last modified: {modified})")
    print()
    return missing == 0

# --- Utility Functions ---

def extract_resume_text(pdf_path):
    """Extract the plain text of every page of a PDF resume.

    Page texts are joined with single spaces. Any failure (missing file,
    unreadable PDF, a page that cannot be extracted, ...) is logged and
    printed, and "" is returned so callers can treat an empty string as
    "extraction failed".

    Args:
        pdf_path: path to the PDF file.

    Returns:
        The concatenated page text, or "" on error.
    """
    try:
        # The context manager closes the document even when a page fails to
        # extract, so no file handle is leaked on the error path.
        with fitz.open(pdf_path) as doc:
            return " ".join(page.get_text() for page in doc)
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        print(f"Error extracting text from {pdf_path}: {e}")
        return ""

def clean_text(text):
    """Normalise free text before shingling.

    Lower-cases the text, turns newlines into spaces, deletes ASCII punctuation
    (``string.punctuation``; other Unicode punctuation such as curly quotes is
    kept), collapses whitespace runs to one space and trims both ends.

    Args:
        text: value to clean; anything that is not a ``str`` yields "".

    Returns:
        The normalised string.
    """
    if isinstance(text, str):
        drop_punctuation = str.maketrans('', '', string.punctuation)
        normalised = text.lower().replace('\n', ' ').translate(drop_punctuation)
        return re.sub(r'\s+', ' ', normalised).strip()
    return ""

def weight_text(text, weight):
    """Repeat a field's text to emphasise it in the combined job text.

    The full text is repeated ``int(weight)`` times. The fractional part of
    ``weight`` appends that fraction of the text's leading words, rounded down.
    All pieces are separated by single spaces, so the last word of one copy
    does not fuse with the first word of the next (e.g. "ScientistData").

    NOTE(review): get_shingles() returns a *set*, so repeated copies add almost
    no new shingles; under set-based MinHash/Jaccard this repetition barely
    changes a field's influence. Also, because the fractional word count is
    rounded down, a one-word value with weight 0.8 contributes nothing.

    Args:
        text: field value; anything that is not a ``str`` yields "".
        weight: non-negative repetition factor, e.g. 2.5.

    Returns:
        The space-separated weighted text (possibly "").
    """
    if not isinstance(text, str):
        return ""
    whole = int(weight)
    pieces = [text] * whole
    frac = weight - whole
    if frac:
        words = text.split()
        partial = " ".join(words[:int(len(words) * frac)])
        if partial:  # skip an empty fraction instead of adding a stray space
            pieces.append(partial)
    return " ".join(pieces)

def get_shingles(text, k=5):
    """Return the set of overlapping length-``k`` character substrings of ``text``.

    Text shorter than ``k`` produces an empty set; text of exactly length ``k``
    produces a single shingle.
    """
    if len(text) < k:
        return set()
    last_start = len(text) - k
    return {text[start:start + k] for start in range(last_start + 1)}

# --- MinHash LSH Embedding with Caching ---

# Relative emphasis of each job field in the combined text that gets shingled.
_FIELD_WEIGHTS = {
    'Job Title': 3.0, 'Role': 2.5, 'skills': 2.0, 'Job Description': 1.0, 'Company': 0.8
}

# Columns added by _add_weighted_text. They exist in the cached frame but not in
# a freshly loaded jobs_df, so they are ignored when validating the cache.
_DERIVED_COLUMNS = ('Weighted_Text', 'Weighted_Text_Clean')


def _source_columns(df):
    """Return df's column names in order, excluding the derived text columns."""
    return [col for col in df.columns if col not in _DERIVED_COLUMNS]


def _cached_num_perm(minhashes):
    """Return the permutation count of the cached MinHashes, or None if there are none."""
    for m in minhashes.values():
        return len(m.hashvalues)
    return None


def _add_weighted_text(jobs_df):
    """Return a copy of jobs_df with 'Weighted_Text' and 'Weighted_Text_Clean' added."""
    weighted = pd.Series("", index=jobs_df.index, dtype=object)
    for field, weight in _FIELD_WEIGHTS.items():
        if field in jobs_df.columns:
            weighted = weighted + jobs_df[field].fillna('').apply(
                lambda value, w=weight: weight_text(str(value), w) + " "
            )
    # assign() returns a new frame, so the caller's DataFrame is never mutated.
    return jobs_df.assign(
        Weighted_Text=weighted,
        Weighted_Text_Clean=weighted.apply(clean_text),
    )


def build_or_load_lsh_index(jobs_df, cache_dir="./lsh_cache", num_perm=128, force_rebuild=False):
    """Build, or load from ``cache_dir``, a MinHash LSH index over the job postings.

    Each job's weighted text (see _FIELD_WEIGHTS) is cleaned, split into
    character shingles and hashed into a MinHash. Every MinHash is inserted into
    a MinHashLSH (threshold 0.3) under the key ``str(index label)``.

    The cached artifacts (lsh_index.pkl, minhashes.pkl, processed_jobs_df.pkl)
    are reused only when all of the following hold:
    - all three files exist and ``force_rebuild`` is False;
    - the cached MinHashes use the requested ``num_perm``;
    - the cached frame has the same source columns (derived text columns
      ignored) and the same row count as ``jobs_df``.

    NOTE(review): changed row *contents* with the same shape are not detected.

    The caller's ``jobs_df`` is never modified.

    Args:
        jobs_df: raw job postings.
        cache_dir: directory for the cache files (created if missing).
        num_perm: number of MinHash permutations.
        force_rebuild: ignore any existing cache and rebuild.

    Returns:
        (lsh, minhashes, processed_df). ``minhashes`` maps str(index label) to
        its MinHash. ``processed_df`` is jobs_df plus the derived text columns
        (the cached frame on a cache hit).
    """
    os.makedirs(cache_dir, exist_ok=True)
    lsh_path = os.path.join(cache_dir, "lsh_index.pkl")
    minhash_path = os.path.join(cache_dir, "minhashes.pkl")
    df_path = os.path.join(cache_dir, "processed_jobs_df.pkl")

    cache_exists = all(os.path.exists(path) for path in [lsh_path, minhash_path, df_path])

    if cache_exists and not force_rebuild:
        print(f"CACHE HIT: Loading cached LSH index from {os.path.abspath(cache_dir)}")
        logger.info("Loading cached LSH index and MinHashes...")
        start_time = time.time()
        try:
            # NOTE: joblib.load unpickles; only load cache files this code wrote.
            lsh = joblib.load(lsh_path)
            minhashes = joblib.load(minhash_path)
            cached_df = joblib.load(df_path)
            print(f"Cache loaded in {time.time() - start_time:.2f} seconds")

            cached_perm = _cached_num_perm(minhashes)
            if cached_perm not in (None, num_perm):
                # A resume MinHash with a different num_perm cannot be compared
                # against these signatures, so the cache is unusable.
                print(f"WARNING: cached index uses num_perm={cached_perm}, "
                      f"requested {num_perm}. Rebuilding cache...")
            elif (_source_columns(cached_df) == _source_columns(jobs_df)
                    and len(cached_df) == len(jobs_df)):
                print("DataFrame structure matches cache. Using cached data.")
                return lsh, minhashes, cached_df
            else:
                print("WARNING: DataFrame structure has changed. Rebuilding cache...")
        except Exception as e:
            print(f"Error loading cache: {e}. Rebuilding...")
    elif force_rebuild:
        print("Force rebuilding LSH index as requested.")
    else:
        print(f"CACHE MISS: Building new LSH index (files not found in {os.path.abspath(cache_dir)})")

    logger.info("Building new LSH index...")
    print("Building LSH index from scratch. This may take some time...")
    start_time = time.time()

    processed_df = _add_weighted_text(jobs_df)

    lsh = MinHashLSH(threshold=0.3, num_perm=num_perm)
    minhashes = {}

    total_rows = len(processed_df)
    # Report roughly every 1% on large inputs (but at least every 100 rows)
    # instead of printing ~16k progress lines for a 1.6M-row table.
    report_every = max(100, total_rows // 100)
    rows = zip(processed_df.index, processed_df['Weighted_Text_Clean'])
    for i, (idx, text) in enumerate(rows):
        if i % report_every == 0:
            print(f"Processing {i}/{total_rows} jobs ({i/total_rows*100:.1f}%)...")
        m = MinHash(num_perm=num_perm)
        for shingle in get_shingles(text):
            m.update(shingle.encode('utf8'))
        key = str(idx)
        lsh.insert(key, m)
        minhashes[key] = m

    print(f"LSH index built in {time.time() - start_time:.2f} seconds")

    print("Saving LSH index to cache...")
    try:
        # NOTE(review): a failure part-way through can leave a mix of old and
        # new cache files on disk.
        joblib.dump(lsh, lsh_path, compress=3)
        joblib.dump(minhashes, minhash_path, compress=3)
        joblib.dump(processed_df, df_path, compress=3)
        print(f"Cache saved to {os.path.abspath(cache_dir)}")
    except Exception as e:
        print(f"Warning: Failed to save cache: {e}")

    return lsh, minhashes, processed_df

# --- Similarity Matching ---

def match_resume_lsh(resume_text, lsh, minhashes, jobs_df, num_perm=128, top_n=10):
    """Rank indexed jobs by estimated Jaccard similarity to a resume.

    The resume is cleaned with clean_text, shingled with get_shingles and hashed
    into a MinHash with ``num_perm`` permutations. Candidates come from
    ``lsh.query`` and are scored with ``MinHash.jaccard`` against ``minhashes``.

    Args:
        resume_text: raw resume text.
        lsh: MinHashLSH index whose keys are str(index label) of jobs_df.
        minhashes: mapping from those keys to the jobs' MinHashes.
        jobs_df: job postings the index was built from.
        num_perm: permutation count; must match the indexed MinHashes.
        top_n: maximum number of rows to return.

    Returns:
        Up to ``top_n`` rows copied from jobs_df, with an added 'similarity'
        column, ordered by descending score. Equal scores are ordered by index
        label, so results are reproducible across kernel restarts. If the resume
        yields no shingles, or the LSH query returns no candidates, returns a
        one-row DataFrame with a 'Message' column instead.
    """
    print("Processing resume text...")

    resume_clean = clean_text(resume_text)
    shingles = get_shingles(resume_clean)

    if not shingles:
        print("Warning: No shingles generated from resume. Text might be too short or empty.")
        return pd.DataFrame({'Message': ['No valid content found in resume']})

    resume_hash = MinHash(num_perm=num_perm)
    for shingle in shingles:
        resume_hash.update(shingle.encode('utf8'))

    print("Finding matches in LSH index...")
    result_ids = lsh.query(resume_hash)

    if not result_ids:
        print("No matches found in LSH. Try lowering the threshold.")
        return pd.DataFrame({'Message': ['No matches found']})

    print(f"Found {len(result_ids)} potential matches. Calculating similarities...")

    # Keys were stored as str(index label). Converting back with int() assumes
    # jobs_df has an integer index (as with a default RangeIndex) -- confirm
    # for other inputs.
    scored = [(int(key), resume_hash.jaccard(minhashes[key])) for key in result_ids]

    # The candidate order from lsh.query may vary between interpreter processes,
    # because str hashing is randomised. MinHash scores are quantised, so ties
    # are common; break them on the index label to get a stable top-N.
    scored.sort(key=lambda pair: (-pair[1], pair[0]))
    top = scored[:top_n]

    result_df = jobs_df.loc[[label for label, _ in top]].copy()
    result_df['similarity'] = [score for _, score in top]

    return result_df

# --- Main Recommender ---

def run_job_recommender_lsh(resume_path, jobs_df, cache_dir="./lsh_cache", top_n=10, force_rebuild=False, num_perm=128):
    """Run the full pipeline: cache report, resume extraction, index, matching.

    Args:
        resume_path: path to the resume PDF.
        jobs_df: job postings to index and match against.
        cache_dir: directory holding the LSH cache files.
        top_n: number of matches to return.
        force_rebuild: rebuild the LSH index even if a cache exists.
        num_perm: number of MinHash permutations.

    Returns:
        The DataFrame produced by match_resume_lsh. If no text could be
        extracted from the resume, a one-row DataFrame with a 'Message' column.
    """
    print("\n--- Job Recommender LSH Started ---")
    started_at = time.time()

    # Called only for its printed report; the boolean result is not needed here.
    print("Checking cache status...")
    check_cache_status(cache_dir)

    print(f"Extracting text from resume: {resume_path}")
    resume_text = extract_resume_text(resume_path)
    if not resume_text:
        logger.warning("Resume extraction failed.")
        return pd.DataFrame({'Message': ['Resume extraction failed']})

    print(f"Building or loading LSH index (Force rebuild: {force_rebuild})")
    index, signatures, prepared_jobs = build_or_load_lsh_index(
        jobs_df, cache_dir=cache_dir, num_perm=num_perm, force_rebuild=force_rebuild
    )

    print("Finding matches for resume...")
    matches = match_resume_lsh(
        resume_text, index, signatures, prepared_jobs, top_n=top_n, num_perm=num_perm
    )

    elapsed = time.time() - started_at
    print(f"\n--- Job Recommender completed in {elapsed:.2f} seconds ---")
    return matches

# --- Example Execution ---
if __name__ == "__main__":
    # In the notebook, jobs_df comes from the Snowflake cell above. Fall back to
    # a tiny sample frame so this cell can also run on its own (e.g.
    # jobs_df = pd.read_csv('jobs_data.csv') for real data).
    if 'jobs_df' not in globals():
        print("Creating sample jobs dataframe for demonstration")
        jobs_df = pd.DataFrame({
            'Job Title': ['Data Scientist', 'Software Engineer', 'Product Manager'],
            'Company': ['Tech Co', 'Software Inc', 'Product Corp'],
            'Role': ['Analytics', 'Development', 'Management'],
            'skills': ['Python, SQL, ML', 'Java, C++, Python', 'Agile, Leadership'],
            'Job Description': ['Analyze data...', 'Build software...', 'Manage products...'],
            'location': ['New York', 'San Francisco', 'Chicago']
        })

    resume_path = 'sampleresume.pdf'  # Replace with your resume path

    # Set to True to ignore the cache and rebuild the LSH index.
    force_rebuild = False

    top_matches = run_job_recommender_lsh(
        resume_path=resume_path,
        jobs_df=jobs_df,
        top_n=5,
        force_rebuild=force_rebuild,
        cache_dir="./lsh_cache",
        # 64 is faster to build, but the cached index was built with the value
        # in effect at the time; changing this invalidates that cache.
        num_perm=128,
    )

    if 'Message' in top_matches.columns:
        print("\nResult:", top_matches['Message'].values[0])
    else:
        print("\nTop job matches:")
        wanted = ('Job Title', 'Company', 'Role', 'location', 'similarity')
        display_cols = [col for col in wanted if col in top_matches.columns]
        # to_string() prints every column on one line instead of wrapping with '\'.
        print(top_matches[display_cols].to_string())



--- Job Recommender LSH Started ---
Checking cache status...

Cache Status:
  ✓ LSH Index: 703.77 MB (Last modified: 2025-04-23 20:35:23)
  ✓ MinHashes: 802.49 MB (Last modified: 2025-04-23 20:38:16)
  ✓ Jobs DataFrame: 413.39 MB (Last modified: 2025-04-23 20:38:36)

Extracting text from resume: sampleresume.pdf
Building or loading LSH index (Force rebuild: False)
CACHE HIT: Loading cached LSH index from /Users/likhithgunjal/Documents/newest project/lsh_cache
Cache loaded in 246.37 seconds
DataFrame structure matches cache. Using cached data.
Finding matches for resume...
Processing resume text...
Finding matches in LSH index...
Found 3547 potential matches. Calculating similarities...

--- Job Recommender completed in 246.46 seconds ---

Top job matches:
                    Job Title                   Company                  Role  \
399355   Social Media Manager  Community Health Systems  Social Media Analyst   
837255   Social Media Manager  Community Health Systems  Social Media A