In [2]:
# Importing necessary libraries/modules for the task
import requests
import pprint
import pandas as pd
import imaplib
import email

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


# Scraping Jobs and Job Descriptions


In [3]:
import importlib
from scrapers.mycareersfuture import MyCareersFuture # Importing the MyCareersFuture class from the scrapers.mycareersfuture module



In [4]:
client = MyCareersFuture() # Creating an instance of the MyCareersFuture scraper class

In [5]:
data = client.collect(sleep_time=2)  # Calling the 'collect' method of the MyCareersFuture scraper instance to initiate the scraping process

Total pages 63


100%|██████████| 62/62 [02:32<00:00,  2.45s/it]

Extracted 6225 jobs





In [None]:

data

In [7]:
from tqdm import tqdm  # Importing tqdm for progress tracking
import time  # Importing the time module for time-related operations

# Attach the full posting to every listing in `data` under the 'job_data' key.
# After every REQUESTS_PER_PAUSE requests the loop sleeps for PAUSE_SECONDS so the
# server is not overwhelmed with back-to-back calls.
REQUESTS_PER_PAUSE = 30
PAUSE_SECONDS = 10

for count, job in enumerate(tqdm(data), start=1):
    # Named `job_uuid` rather than `id`: a module-level `id` would shadow the
    # built-in `id()` for every later cell in the same kernel session.
    job_uuid = job['uuid']
    job['job_data'] = client.get_job_data(job_uuid)  # Store the detail response on the listing itself

    if count % REQUESTS_PER_PAUSE == 0:
        time.sleep(PAUSE_SECONDS)


100%|██████████| 6225/6225 [41:09<00:00,  2.52it/s]  


In [None]:
client.get_job_data('aec44687225e1c1f74b918d617490ae3')

In [10]:
# Using pandas to normalize the 'job_data' field of the DataFrame and converting it to JSON format
pd.json_normalize(pd.DataFrame(data)['job_data'],max_level=0).to_json("mycareersfuture_with_descriptions.json",orient="records")

In [12]:
# Reload the saved postings from disk. Note: this rebinds `data` from the scraped
# collection built above to a DataFrame, so the earlier scraping cells cannot be
# re-run against it without collecting again.
data = pd.read_json("mycareersfuture_with_descriptions.json",orient="records")

# Cleaning Scraped Data


In [14]:

def load_and_clean_data(file_path):
    """
    Load the scraped job postings from a JSON file and flatten their nested fields.

    Parameters:
    file_path (str): The file path to the JSON file containing the scraped data
        (an array of records, read with ``orient='records'``).

    Returns:
    pandas.DataFrame: One row per posting, where
        - ``salary`` (if present) is replaced by ``salary_min``, ``salary_max``
          and ``salary_type``;
        - ``postedCompany`` (if present) is replaced by one ``company_<field>``
          column per company field, aligned to the posting it belongs to;
        - ``skills``, ``categories``, ``employmentTypes`` and ``positionLevels``
          (where present) are reduced to plain lists of values.
    """
    def pluck(items, keys):
        """Return, for each dict in `items`, the value of the first key in `keys` it has ('' if none)."""
        if not isinstance(items, list):
            return []
        values = []
        for item in items:
            found = ''
            if isinstance(item, dict):
                for key in keys:
                    if key in item:
                        found = item[key]
                        break
            values.append(found)
        return values

    # Read the JSON file into a DataFrame
    df = pd.read_json(file_path, orient='records')

    # Extract salary information into separate minimum / maximum / type columns
    if 'salary' in df.columns:
        salary_min, salary_max, salary_type = [], [], []
        for salary_info in df['salary']:
            # A missing salary can arrive as None or NaN; only a dict carries data.
            if not isinstance(salary_info, dict):
                salary_info = {}
            type_info = salary_info.get('type')
            salary_min.append(salary_info.get('minimum'))
            salary_max.append(salary_info.get('maximum'))
            salary_type.append(type_info.get('salaryType') if isinstance(type_info, dict) else None)
        # dtype=object keeps None for postings without salary data, as before.
        df['salary_min'] = pd.Series(salary_min, index=df.index, dtype=object)
        df['salary_max'] = pd.Series(salary_max, index=df.index, dtype=object)
        df['salary_type'] = pd.Series(salary_type, index=df.index, dtype=object)
        df = df.drop(columns=['salary'])

    # Normalize and expand company information into separate columns
    if 'postedCompany' in df.columns:
        has_company = df['postedCompany'].map(lambda value: isinstance(value, dict)).astype(bool)
        company_info = pd.json_normalize(df.loc[has_company, 'postedCompany'].tolist())
        # Re-attach the original row labels: without this, the normalized frame is
        # numbered 0..k-1 and the concat below would shift company data onto the
        # wrong postings whenever some posting has no company.
        company_info.index = df.index[has_company]
        company_info.columns = ['company_' + col for col in company_info.columns]
        df = pd.concat([df.drop(columns=['postedCompany']), company_info], axis=1)

    # Normalize lists of skills, categories, employment types, and position levels.
    # The item key is the column name minus its trailing "s". That rule produces
    # "categorie" for categories, so "category" is tried as well.
    # NOTE(review): "position" is a guess at the positionLevels item key — confirm
    # against the scraped payload.
    fallback_keys = {'categories': 'category', 'positionLevels': 'position'}
    for column in ('skills', 'categories', 'employmentTypes', 'positionLevels'):
        if column not in df.columns:
            continue
        keys = [column[:-1]]
        if column in fallback_keys:
            keys.append(fallback_keys[column])
        df[column] = df[column].apply(lambda items, keys=tuple(keys): pluck(items, keys))

    return df

# Cleaning Job Descriptions in Cleaned Data

In [26]:
from bs4 import BeautifulSoup

def clean_job_descriptions(df_uncleaned, df_clean):
    """
    Extract bullet-point job descriptions and requirements from the raw HTML and
    attach them to the cleaned DataFrame as a 'Job Description' column.

    For each raw posting, the first <strong> heading whose text contains a
    description keyword (resp. requirement keyword) is located, and the <li> items
    of the next <ul> after it are collected. Postings are matched between the two
    frames by ``metadata['jobPostId']``.

    Parameters:
    df_uncleaned (pandas.DataFrame): DataFrame containing uncleaned job data; must
        have 'description' (HTML string) and 'metadata' (dict) columns.
    df_clean (pandas.DataFrame): DataFrame containing cleaned job data; must have a
        'metadata' column. It is modified in place.

    Returns:
    pandas.DataFrame: ``df_clean`` itself, with a 'Job Description' column holding
        the formatted text, or None where no bullet list was found (including
        rows whose description or metadata is missing).
    """
    # Define keywords for job descriptions and job requirements
    jd_keywords = ['things you need to do', 'job description', 'job duties', 'responsibilities', 'duties', 'key responsibilities', 'job details']
    jr_keywords = ['things you need to have', 'qualification', 'requirements', 'job details', 'qualifications']

    def keyword_in_text(tag, keywords):
        """Case-insensitive check that any keyword occurs in the candidate text (None-safe)."""
        if tag and tag.string:
            text = tag.string
            return any(keyword in text.lower() for keyword in keywords)
        return False

    def section_items(soup, keywords):
        """Return the stripped <li> texts of the first <ul> following a matching <strong> heading."""
        heading = soup.find('strong', string=lambda t: keyword_in_text(t, keywords))
        if heading is None:
            return []
        ul_tag = heading.find_next('ul')
        if ul_tag is None:
            return []
        return [li.text.strip() for li in ul_tag.find_all('li')]

    def job_post_id(row):
        """Return the row's jobPostId, or None when the metadata is missing or malformed."""
        metadata = row['metadata']
        return metadata.get('jobPostId') if isinstance(metadata, dict) else None

    sections_by_post_id = {}

    # Iterate over each row in the uncleaned DataFrame
    for _, row in df_uncleaned.iterrows():
        description = row['description']
        post_id = job_post_id(row)
        # A missing description (None/NaN) cannot be parsed, and a posting without
        # an id cannot be matched later, so both are skipped rather than crashing.
        if not isinstance(description, str) or post_id is None:
            continue

        soup = BeautifulSoup(description, 'html.parser')
        job_description_items = section_items(soup, jd_keywords)
        job_requirements_items = section_items(soup, jr_keywords)

        if job_description_items or job_requirements_items:
            sections_by_post_id[post_id] = {
                'job_descriptions': job_description_items,
                'job_requirements': job_requirements_items
            }

    # Update df_clean with the structured job descriptions and requirements
    formatted_descriptions = []
    for _, row in df_clean.iterrows():
        job_info = sections_by_post_id.get(job_post_id(row))
        if job_info:
            formatted_descriptions.append(
                "Job Description:\n" + '\n'.join(job_info['job_descriptions'])
                + "\n\nJob Requirements:\n" + '\n'.join(job_info['job_requirements'])
            )
        else:
            formatted_descriptions.append(None)

    df_clean['Job Description'] = formatted_descriptions
    return df_clean


In [65]:
def main():
    """
    Run the cleaning pipeline end to end.

    Loads the scraped postings, flattens them, attaches the structured job
    descriptions, keeps only postings for which a description was extracted, and
    writes the result to a CSV file.
    """
    file_path = 'mycareersfuture_with_descriptions.json'
    # Single source of truth for the output name, so the message below cannot
    # drift away from the file that is actually written.
    output_path = 'cleaned_job_data1.csv'

    df_clean = load_and_clean_data(file_path)  # Load and initially clean your data
    df_uncleaned = pd.read_json(file_path, orient='records')
    df_final = clean_job_descriptions(df_uncleaned, df_clean)  # Apply job descriptions cleaning
    subset_df_clean = df_final[df_final['Job Description'].notna()]

    # Save the cleaned DataFrame to a new CSV file
    subset_df_clean.to_csv(output_path, index=False)
    print(f"Cleaned data saved to {output_path}")

if __name__ == "__main__":
    main()

Cleaned data saved to cleaned_job_data.csv


In [28]:
job_data = pd.read_csv('cleaned_job_data1.csv')  # Reading the cleaned job data from a CSV file into a DataFrame
job_data

Unnamed: 0,uuid,sourceCode,title,description,minimumYearsExperience,shiftPattern,schemes,skills,otherRequirements,ssocCode,...,company_lastSyncDate,company_ssicCode2020,company_badges,company_logoFileName,company_logoUploadPath,company__links.self.href,company__links.jobs.href,company__links.addresses.href,company__links.schemes.href,Job Description
0,8cebe95d75634f77e04130d74bbeee98,Employer Portal,Enrolled Nurse,<h2><strong>Duties and Responsibilities</stron...,1,,[],"['Nursing Homes', 'BCLS', 'Work Well Independe...",,32200,...,2024-03-06T01:21:20.000Z,87010,[],107b61cad03e8efab8b6c56f4c3242cc/lentor-reside...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nProvide quality nursing care...
1,589ec3cb56c1979d9d1c158490e66870,Employer Portal,Sales Development Representative,<p><u><strong>Job Responsibilities</strong></u...,0,,[],"['CRM', 'Lead Generation', 'Microsoft Excel', ...",,24333,...,2024-03-06T01:28:12.000Z,70201,[],60b9f613063dd0513340e97dcc9cc9f8/stone-forest-...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nIdentify new business opport...
2,0000f0be7e4c9d857f4fcec17b3fc6b0,Employer Portal,Assistant Coach [ Special Education / Siglap /...,<p><strong>Assistant Coach (Special Education)...,0,,[],"['Leadership', 'Classroom', 'Teaching', 'Class...",,23622,...,2024-03-06T00:13:15.000Z,78104,[],d162667fbe1365aa541016a407de7ae4/supreme-hr-ad...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nAssist and support Coaches i...
3,874e52b4f633cb30687a5985b272b1ed,Employer Portal,Production Operator #65770,<p><strong>Job Description</strong></p>\n<ul>\...,0,,[],"['Factory', 'Soldering', 'Housekeeping', 'Work...",,31164,...,2024-03-06T03:05:25.000Z,78104,[],737b2bbd5f7b6aa4a3e393db45fe9479/ANRADUS PTE. ...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nIndustry/ Organization Type:...
4,b3260c8354d8cfa26675db2eea497cc5,Employer Portal,Retail Assistant #65750,<p><strong>Job Description</strong></p>\n<ul>\...,0,,[],"['Store Operations', 'Customer Service Oriente...",,52202,...,2024-03-06T03:05:25.000Z,78104,[],737b2bbd5f7b6aa4a3e393db45fe9479/ANRADUS PTE. ...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nIndustry/ Organization Type:...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2843,2eef1f9ecf2286b8d0a25143054584c3,Employer Portal,Retail Sales Assistant (Fashion Accessories Wh...,<p>We are a fashion jewellery brand but also r...,1,,[],"['Sales', 'Visual Merchandising', 'Housekeepin...",,52202,...,2024-03-06T07:46:53.000Z,47109,[],bb87069f2d459ae2633c3d84141d66a1/yesseny-tradi...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nProactively provide professi...
2844,2c25fc6c2986af6f9cd2408a72884b6e,Employer Portal,English Language Teacher [L1/L2],<ul>\n <li><strong>Aljunied</strong></li>\n ...,2,,[],"['Coaching', 'Childcare', 'Academic English', ...",,36100,...,2024-03-06T01:05:53.000Z,78104,[],b2bf56fc25ba3a759e198a68aa0d4ce3/allied-search...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nMaintain and upkeep a conduc...
2845,d2d7dd45575fe206f767f79e4b1591e1,Employer Portal,Superstar Front Desk (Location: Ang Mo Kio),<p>We at De Pacific Dental Group are in search...,1,,[],"['Dental Sales', 'Search', 'Microsoft Works', ...",,42243,...,2024-02-26T02:41:29.000Z,86204,[],,,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\n\n\nJob Requirements:\nFull ...
2846,3973d6ab277c9cc3b9257c63c3720173,Employer Portal,Individual Support Specialist (Special Needs) ...,<p><strong>Job Description</strong></p>\n<ul>\...,0,,[],"['Psychology', 'Discipline', 'Classroom', 'Cla...",,23622,...,2024-03-06T03:05:25.000Z,78104,[],737b2bbd5f7b6aa4a3e393db45fe9479/ANRADUS PTE. ...,https://static.mycareersfuture.gov.sg/images/c...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,https://api.mycareersfuture.gov.sg/v2/companie...,Job Description:\nIndustry/ Organization Type:...


In [30]:
print(job_data.iloc[1]['Job Description'])

Job Description:
Identify new business opportunities through outbound lead generation, research and prospecting
Manage and respond to inbound requests from prospective clients via cold calling and email
Secure meetings for the GWI sales organization
Analyse customer needs and identify potential solutions
Present and explain GWI product offerings
Prospect and make outbound calls and emails to potential clients

Job Requirements:
Proven experience of relevant work or internship
Excellent communication and presentation skills
Effective time management and organization skills
Good MS Office Skills


# Embedding Job Descriptions into ChromaDB

In [None]:
import os
import pandas as pd
import chromadb
from chromadb.utils import embedding_functions
from tqdm import tqdm


def _blank_if_missing(value):
    """Return "" for None, NaN/NaT and other empty values; otherwise return the value unchanged."""
    if value is None:
        return ""
    if pd.api.types.is_scalar(value):
        # NaN is truthy, so a plain `value if value else ""` lets it through.
        return "" if pd.isna(value) or not value else value
    try:
        return value if len(value) > 0 else ""
    except TypeError:
        return value


def _collect_records(frame, document_column, metadata_columns, id_prefix=""):
    """
    Build parallel (documents, metadatas, ids) lists from one source DataFrame.

    `metadata_columns` maps each metadata field name to the source column it is
    read from. Rows whose document is not a string (e.g. NaN) are skipped, since
    they cannot be embedded. Each id is `id_prefix` followed by the row's index label.
    """
    documents, metadatas, ids = [], [], []
    for index, row in frame.iterrows():
        document = row[document_column]
        if not isinstance(document, str):
            continue
        metadata = {
            field: _blank_if_missing(row[column])
            for field, column in metadata_columns.items()
        }
        jobtype = metadata["jobtype"]
        metadata["jobtype"] = jobtype if isinstance(jobtype, str) else str(jobtype)

        documents.append(document)
        metadatas.append(metadata)
        ids.append(id_prefix + str(index))
    return documents, metadatas, ids


def main():
    """
    Main function to embed job descriptions into ChromaDB.

    Reads the cleaned MyCareersFuture postings (JSON) and the Glassdoor postings
    (CSV), builds one document + metadata record per posting, and adds them in
    batches to the persistent "mycareersfuture_jd" collection stored under
    ./data/jd_vectordb.
    """
    # Read cleaned job descriptions and metadata from My Career Future
    df = pd.read_json("https://talenttrove.s3.ap-southeast-1.amazonaws.com/mycareersfuture_with_descriptions_cleaned.json")

    # Read additional job descriptions from Glassdoor
    df2 = pd.read_csv("https://talenttrove.s3.ap-southeast-1.amazonaws.com/ALL_Glass.csv")

    # Process job descriptions and metadata from My Career Future
    documents, metadatas, ids = _collect_records(
        df,
        "description",
        {
            "company_name": "company_name",
            "company_logo": "company_logo",
            "job_post_id": "jobPostId",
            "title": "title",
            "apply_url": "apply_url",
            "date": "updatedAt",
            "jobtype": "positionLevels",
        },
    )

    # Prefix for the Glassdoor ids: the largest id string of the first source.
    # This is a *string* maximum (e.g. "999" > "6224"), kept as-is so ids stay
    # stable across runs. `default` avoids a crash when the first source is empty.
    max_id = max((str(index) for index in df.index), default="")

    # Process additional job descriptions from Glassdoor
    gd_documents, gd_metadatas, gd_ids = _collect_records(
        df2,
        "cleaned_description",
        {
            "company_name": "employerNameFromSearch",
            "company_logo": "employer.squareLogoUrl",
            "job_post_id": "jobReqId",
            "title": "jobTitleText",
            "apply_url": "jobLink",
            "date": "discoverDate",
            "jobtype": "jobType",
        },
        id_prefix=str(max_id),
    )
    documents.extend(gd_documents)
    metadatas.extend(gd_metadatas)
    ids.extend(gd_ids)

    # Create directory for storing data if it doesn't exist
    path = os.path.join(os.getcwd(), "data")
    os.makedirs(path, exist_ok=True)

    # Initialize ChromaDB client and collection
    chroma_client = chromadb.PersistentClient(path=os.path.join(os.getcwd(), "data/jd_vectordb"))
    collection = chroma_client.get_or_create_collection(
        name="mycareersfuture_jd",
        embedding_function=embedding_functions.DefaultEmbeddingFunction(),
    )

    batch_size = 64  # Adjust batch size based on the maximum allowed
    # Iterate over batches of documents and add them to the collection
    for start in tqdm(range(0, len(documents), batch_size)):
        stop = start + batch_size
        batch_documents = documents[start:stop]
        batch_metadatas = metadatas[start:stop]
        batch_ids = ids[start:stop]

        # The three lists are built in lockstep, so each batch must line up
        assert len(batch_documents) == len(batch_metadatas) == len(batch_ids)

        # Add batch to collection
        collection.add(
            documents=batch_documents,
            metadatas=batch_metadatas,
            ids=batch_ids
        )


if __name__ == "__main__":
    main()

# Job Title and Company Name Extractors using FLAN-T5 and Gemini


In [48]:
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
import torch
from dotenv import load_dotenv
import google.generativeai as genai

load_dotenv()
class JobTitleCompanyNameExtractor:
    """
    Extract job titles and company names from email text using a pre-trained T5 model.
    """
    # Questions put to the model; kept as plain strings so they are interpolated
    # verbatim into the prompt.
    JOB_TITLE_QUESTION = "What is the job title? If not available, output None"
    COMPANY_QUESTION = "What is the name of the company? If not available, output None"

    def __init__(self, model="google/flan-t5-base"):
        """
        Load the seq2seq model and its tokenizer.

        Parameters:
            model (str): Name or path of the pre-trained model to load.
        """
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model)
        self.tokenizer = AutoTokenizer.from_pretrained(model)

    def _answer(self, question: str, email_text: str):
        """Ask `question` about `email_text` and return the decoded model output."""
        input_text = f"question: {question} context: {email_text}"
        inputs = self.tokenizer(input_text, return_tensors="pt")

        # Inference only: no gradients needed
        with torch.no_grad():
            outputs = self.model.generate(**inputs)
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def get_jobtitle(self, email_text: str):
        """
        Extracts the job title from the given email text.
        """
        return self._answer(self.JOB_TITLE_QUESTION, email_text)

    def get_company(self, email_text: str):
        """
        Extracts the company name from the given email text.
        """
        return self._answer(self.COMPANY_QUESTION, email_text)

class GeminiJobTitleCompanyNameExtractor:
    """
    Extract job titles and company names from email text using the Gemini API.
    """
    # Questions put to the model; kept as plain strings so they are interpolated
    # verbatim into the prompt.
    JOB_TITLE_QUESTION = "What is the job title? If not available, output None"
    COMPANY_QUESTION = "What is the name of the company? If not available, output None"

    def __init__(self, model="google/flan-t5-base"):
        """
        Configure the Gemini client from the GOOGLE_API_KEY environment variable.

        Parameters:
            model (str): Unused; the Gemini model name is fixed to "gemini-pro".
                The parameter is kept so the constructor signature matches
                JobTitleCompanyNameExtractor.
        """
        # Imported here because this cell's own import list does not include `os`.
        import os

        self.apikey = os.getenv("GOOGLE_API_KEY")
        genai.configure(api_key=self.apikey)
        self.model = genai.GenerativeModel("gemini-pro")
        print("Calling Gemini API")

    def _answer(self, question: str, email_text: str):
        """Ask `question` about `email_text` and return the text of the API response."""
        input_text = f"question: {question} context: {email_text}"
        response = self.model.generate_content(input_text)
        return response.text

    def get_jobtitle(self, email_text: str):
        """
        Extracts the job title from the given email text using the Gemini API.
        """
        return self._answer(self.JOB_TITLE_QUESTION, email_text)

    def get_company(self, email_text: str):
        """
        Extracts the company name from the given email text using the Gemini API.
        """
        return self._answer(self.COMPANY_QUESTION, email_text)

# Demo: run both extractors on one sample email and print their answers.
# Constructing the first extractor loads the FLAN-T5 model; constructing the second
# configures the Gemini client (it reads GOOGLE_API_KEY from the environment).
if __name__ == "__main__":
    # Example email text
    email_text = "Subject: HRT Application Status - Pratham Agarwala. Body: Hi Pratham, We want to thank you very much for your interest in Hudson River Trading and the Algorithm Developer role. We have reviewed your candidacy for this position, along with other available opportunities, and have decided not to move forward with your application at this time. We ask that you refrain from applying again during this campus recruiting cycle. The new recruiting season will start in July/August 2024. We would encourage you to stay in touch with us, because as HRT grows, it's possible our hiring needs will change. We're always eager to network with smart candidates with an interest in our industry. Thanks again for your interest and time, and good luck with your job search. Regards, Hudson River Trading"
    
    # Instantiate and utilize the JobTitleCompanyNameExtractor
    job_title_extractor = JobTitleCompanyNameExtractor()
    print(job_title_extractor.get_jobtitle(email_text))
    print(job_title_extractor.get_company(email_text))
    
    # Instantiate and utilize the GeminiJobTitleCompanyNameExtractor
    gemini_job_title_extractor = GeminiJobTitleCompanyNameExtractor()
    print(gemini_job_title_extractor.get_jobtitle(email_text))
    print(gemini_job_title_extractor.get_company(email_text))

HRT Application Status - Pratham Agarwala
Hi Pratham, We want to thank you very much for your interest in Hudson River Trading
Calling Gemini API
Algorithm Developer
Hudson River Trading


# Gmail Email Retrieval

In [49]:
import imaplib
import email
from datetime import datetime, timedelta
from email.header import decode_header
from tqdm import tqdm
import json

class Gmail:
    """
    A class to interact with Gmail via IMAP and retrieve email messages.

    Typical use: construct with credentials, call `authenticate()`, obtain message
    ids with `search_mail()` / `get_email_by_date()`, turn them into dictionaries
    with `parse_emails()`, then `close()` the connection.
    """
    def __init__(self, username, password):
        """
        Initialize the Gmail object with the provided username and password.
        """
        self.username = username
        self.password = password

    def authenticate(self):
        """
        Authenticate with the Gmail server and keep the connection on `self.mail`.
        """
        gmail_host = "imap.gmail.com"
        mail = imaplib.IMAP4_SSL(gmail_host)
        mail.login(self.username, self.password)
        self.mail = mail

    def search_mail(self, category="INBOX", search_criteria="ALL"):
        """
        Search for emails in the specified category using the given search criteria.

        Returns:
            list[bytes]: The matching message ids.
        """
        self.mail.select(category)
        _, email_ids = self.mail.search(None, search_criteria)
        email_ids = email_ids[0].split()
        return email_ids

    def get_email_by_date(
        self,
        from_date,
        category="INBOX",
        primary=True,
        to_date=None,
    ):
        """
        Get emails from a specific date range in the specified category.

        Parameters:
            from_date (str): Inclusive start date, formatted like "01-Feb-2024".
            category (str): Mailbox to search.
            primary (bool): Restrict the search to Gmail's Primary category.
            to_date (str | None): Exclusive end date in the same format. Defaults
                to tomorrow, computed when the method is called (a default
                evaluated in the signature would be frozen at class-definition
                time and go stale in a long-running kernel).

        Returns:
            list[bytes]: The matching message ids.
        """
        if to_date is None:
            to_date = (datetime.now() + timedelta(days=1)).strftime("%d-%b-%Y")
        primary_filter = ' X-GM-RAW "Category:Primary"' if primary else ""
        search_criteria = f'(SINCE "{from_date}" BEFORE "{to_date}"){primary_filter}'
        email_ids = self.search_mail(category=category, search_criteria=search_criteria)
        return email_ids

    @staticmethod
    def _decode_payload(part):
        """Return the decoded text of one message part, or None if it has no payload or cannot be decoded."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset)
        except LookupError:
            # Unknown charset label: fall back to UTF-8 and keep what can be read.
            return payload.decode("utf-8", errors="replace")
        except UnicodeDecodeError:
            print("UnicodeDecodeError")
            return None

    @staticmethod
    def _decode_header_value(raw_header):
        """Decode a possibly RFC 2047-encoded header into one string ("" if the header is absent)."""
        if raw_header is None:
            return ""
        fragments = []
        for value, charset in decode_header(raw_header):
            if isinstance(value, bytes):
                try:
                    value = value.decode(charset or "utf-8", errors="replace")
                except LookupError:
                    value = value.decode("utf-8", errors="replace")
            fragments.append(value)
        return "".join(fragments)

    @staticmethod
    def get_body(msg):
        """
        Extract the plain-text body of an email message.

        For multipart messages the last non-attachment text/plain part that can be
        decoded is returned; parts that fail to decode are skipped instead of
        aborting. Returns "" when no text could be extracted.
        """
        if not msg.is_multipart():
            text = Gmail._decode_payload(msg)
            return text if text is not None else ""

        body = ""
        for part in msg.walk():
            content_disposition = str(part.get("Content-Disposition"))
            if (
                part.get_content_type() != "text/plain"
                or "attachment" in content_disposition
            ):
                continue
            text = Gmail._decode_payload(part)
            if text is not None:
                body = text
        return body

    def parse_emails(self, ids: list):
        """
        Fetch and parse email messages and extract relevant information.

        Returns:
            list[dict]: One dict per fetched message with the keys "id", "subject",
            "sender", "date" and "body". Subject and sender are fully decoded
            strings. Ids for which the server returns no message data are skipped.
        """
        data = []
        for email_id in tqdm(ids):
            _, email_data = self.mail.fetch(email_id, "(RFC822)")
            # A successful fetch yields a (header, bytes) tuple first; anything
            # else means there is no message to parse for this id.
            if not email_data or not isinstance(email_data[0], tuple):
                continue
            raw_email = email_data[0][1]

            # Parse the email
            msg = email.message_from_bytes(raw_email)
            data.append(
                {
                    "id": email_id.decode("utf-8"),
                    "subject": self._decode_header_value(msg["Subject"]),
                    "sender": self._decode_header_value(msg["From"]),
                    "date": msg["Date"],
                    "body": self.get_body(msg),
                }
            )

        return data

    def close(self):
        """
        Close the selected mailbox and log out of the Gmail server.
        """
        self.mail.close()
        self.mail.logout()

# Email Classification and Prediction Script






In [54]:
import importlib
import sys
sys.path.append("/workspaces/IcharusAI")
from jobTracker.gmail import Gmail
from datetime import datetime
from setfit import SetFitModel
from bs4 import BeautifulSoup
import os
import pandas as pd
from jobTracker.flan import JobTitleCompanyNameExtractor, GeminiJobTitleCompanyNameExtractor
import gdown

# Google Drive folder URL from which the email classifier model is downloaded
email_classifier_model = "https://drive.google.com/drive/folders/1Jn_cjP1OjO5Ttj9-xs63o9cTPNhKwHOv?usp=drive_link"
# Package directory name; as a relative path it is resolved against the working directory
file_path = "jobTracker"
# Create the parent directory for downloaded models if it doesn't exist
os.makedirs(file_path + "/model", exist_ok=True)
# Local path of the email classifier model (the "classifer" spelling is part of the path)
model_path = file_path + "/model/email_classifer"


def download_model():
    """
    Fetch the email classifier model from Google Drive unless `model_path` already exists.
    """
    if not os.path.exists(model_path):
        print("Downloading model")
        gdown.download_folder(
            email_classifier_model,
            quiet=True,
            use_cookies=False,
            output=model_path,
        )
    else:
        print("Model already exists")


class JobClassifier:
    """
    Classify emails with the SetFit email classifier model.
    """

    def __init__(self) -> None:
        """
        Initialize the JobClassifier class.

        Downloads the model if needed, loads it from `model_path`, and creates the
        Gemini-based job title / company name extractor.
        """
        download_model()
        # Load the email classifier model
        self.model = SetFitModel.from_pretrained(model_path)
        # Initialize the JobTitleCompanyNameExtractor
        self.extractor = GeminiJobTitleCompanyNameExtractor()

    def infer(self, sentence):
        """
        Perform inference on a sentence using the email classifier model.

        Returns:
            str: The predicted class, as a string.
        """
        predtext = [sentence]
        predicted_class = self.model(predtext)
        return str(predicted_class.numpy()[0])

    def classify(self, email, preprocess=True):
        """
        Classify an email using the email classifier model.

        Parameters:
            email: With `preprocess=True`, a dict with "subject" and "body" keys;
                with `preprocess=False`, the already-cleaned text to classify.
            preprocess (bool): Whether to run `preprocess_email` first.

        Returns:
            tuple[str, str]: The email text (including the "Subject: ..." prefix
            when preprocessing was applied) and the predicted class.
        """
        if preprocess:
            # The model sees the body only; the subject-prefixed text is returned
            # to the caller.
            full_text, email = self.preprocess_email(email)
        else:
            # Nothing to strip: the given text is both the model input and the
            # returned text.
            full_text = email

        predicted_class = self.infer(email)
        return full_text, predicted_class

    def preprocess_email(self, email):
        """
        Preprocess an email before classification.

        Strips HTML from the body and collapses all whitespace runs to single spaces.

        Returns:
            tuple[str, str]: ("Subject: <subject>. Body: <body>", "<body>").
        """
        subject = email["subject"]
        if isinstance(subject, bytes):
            # Undecodable bytes are replaced rather than leaking a b'...' repr
            # into the text.
            subject = subject.decode("utf-8", errors="replace")
        body_text = str(BeautifulSoup(email["body"], "html.parser").text)
        final_string_without_subject = " ".join(body_text.split())
        final_string = "Subject: " + str(subject) + ". Body: " + final_string_without_subject
        return final_string, final_string_without_subject


if __name__ == "__main__":
    import getpass

    # Initialize JobClassifier
    jc = JobClassifier()
    # Credentials come from the environment (GMAIL_USERNAME / GMAIL_PASSWORD) or an
    # interactive prompt, so no account details are stored in the notebook.
    gmail_username = os.environ.get("GMAIL_USERNAME") or input("Gmail address: ")
    gmail_password = os.environ.get("GMAIL_PASSWORD") or getpass.getpass("Gmail password: ")
    gmail = Gmail(username=gmail_username, password=gmail_password)
    # Authenticate with Gmail
    gmail.authenticate()
    try:
        # Specify the date to retrieve emails from
        specified_date = datetime(2024, 2, 1)
        formatted_date = specified_date.strftime("%d-%b-%Y")
        # Get email IDs from the specified date
        ids = gmail.get_email_by_date(from_date=formatted_date)
        # Parse email content (first 20 messages only, to keep the demo quick)
        email_dict = gmail.parse_emails(ids[:20])
    finally:
        # Always release the IMAP connection, even if fetching fails
        gmail.close()
    preds = []
    # Classify each email
    for i in email_dict:
        out = jc.classify(i)
        preds.append(out)
    # Save predictions to CSV
    pd.Series(preds).to_csv("preds.csv")

Downloading model


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Calling Gemini API


100%|██████████| 20/20 [00:05<00:00,  3.60it/s]


# Job Stage Classifier

In [55]:
from jobTracker.gmail import Gmail
from datetime import datetime
from setfit import SetFitModel
import os
import pandas as pd
import gdown

# Google Drive folder URL from which the stage classifier model is downloaded
stage_classifier_model = "https://drive.google.com/drive/folders/1gq-9kA_MIa6KsULSjWC9lHyQXXgWFekT?usp=drive_link"

# Package directory name; as a relative path it is resolved against the working directory
file_path = "jobTracker"

# Create the parent directory for downloaded models if it doesn't exist
os.makedirs(file_path + "/model", exist_ok=True)

# Local path of the stage classifier model (the "classifer" spelling is part of the path).
# NOTE(review): the email-classifier cell earlier in this notebook also defines
# `model_path` and `download_model`. Running this cell rebinds both, so a JobClassifier
# created afterwards would download and load from this stage-classifier path.
model_path = file_path + "/model/stage_classifer"

def download_model():
    """
    Fetch the stage classifier model from Google Drive unless `model_path` already exists.
    """
    if not os.path.exists(model_path):
        print("Downloading model")
        gdown.download_folder(
            stage_classifier_model,
            quiet=True,
            use_cookies=False,
            output=model_path,
        )
    else:
        print("Model already exists")

class JobStageClassifier:
    """
    Predict the application stage of an email with the SetFit stage classifier model.
    """

    def __init__(self) -> None:
        """
        Make sure the model is available locally, then load it from `model_path`.
        """
        download_model()
        self.model = SetFitModel.from_pretrained(model_path)

    def infer(self, sentence):
        """
        Run the model on a single sentence and return the predicted label as a string.
        """
        prediction = self.model([sentence])
        first_label = prediction.numpy()[0]
        return str(first_label)

    def classify(self, email):
        """
        Classify the given email text; a thin wrapper around `infer`.
        """
        return self.infer(email)


In [61]:
!pip install python-docx

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Collecting python-docx
  Downloading python_docx-1.1.0-py3-none-any.whl.metadata (2.0 kB)
Downloading python_docx-1.1.0-py3-none-any.whl (239 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m239.6/239.6 kB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-docx
Successfully installed python-docx-1.1.0


# Job Recommendation 

In [None]:
import os
from docx import Document
from openai import OpenAI
import chromadb
import logging
from chromadb.utils import embedding_functions

class Recommendation:
    """
    Job recommendation engine.

    Turns a resume (Word document) into a generated job description via the
    OpenAI chat API and looks up similar job descriptions in the persistent
    Chroma collection ``mycareersfuture_jd``.
    """

    def __init__(
        self, resume, openai_key=None, jobtitle=None, ai_service=None, api_key=None
    ):
        """
        Initializes the Recommendation object.

        Parameters:
            resume (str): Path or I/O object of the resume.
            openai_key (str): API key for OpenAI API.
            jobtitle (str): Title of the job.
            ai_service (str): Name of the AI service requested by the caller.
                It is only stored; generation in this class always goes
                through the OpenAI client.
            api_key (str): Alias for ``openai_key``, accepted because the API
                endpoints pass the key under this name. ``openai_key`` wins
                when both are given.
        """
        self.resume = resume
        self.jobtitle = jobtitle
        self.ai_service = ai_service
        self.openai_key = openai_key if openai_key is not None else api_key
        self.client = OpenAI(api_key=self.openai_key)
        # The vector store lives under the current working directory.
        self.file_path = os.path.join(os.getcwd(), "data/jd_vectordb")
        self.chroma_client = chromadb.PersistentClient(path=self.file_path)
        self.collection = self.chroma_client.get_or_create_collection(
            name="mycareersfuture_jd",
            embedding_function=embedding_functions.DefaultEmbeddingFunction(),
        )

    def read_word_document(self):
        """
        Reads the content of the resume Word document.

        Returns:
            str: Text of every paragraph, each followed by a newline.
        """
        doc = Document(self.resume)
        return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)

    def get_generated_jd(self):
        """
        Generates a job description based on the provided resume.

        Returns:
            str: Generated job description, or an empty string when the
            OpenAI request fails (the error is logged and printed).
        """
        text = self.read_word_document()
        logging.info("Document Read")
        try:
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "system",
                        "content": "Based on the given resume above, create a suitable job posting for this resume. The job posting must include the job description, job responsibilities, and requirements such as qualifications and skills. Do not include the company name and location in this job posting.",
                    },
                    {
                        "role": "user",
                        "content": f"Resume: {text}\n\n---\n\nJob Description:",
                    },
                ],
                temperature=0.3,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
            )
            logging.info("CV Generated")
            return response.choices[0].message.content.strip()
        except Exception as e:
            # Best effort: callers treat "" as "generation failed".
            logging.warning(e)
            print(e)
            return ""

    def search_jd(self, jd, k=20):
        """
        Searches for job descriptions similar to the provided text.

        Parameters:
            jd (str): Job description text to search for.
            k (int): Number of results to return.

        Returns:
            dict: Query results containing documents, distances, and metadatas.
        """
        return self.collection.query(
            query_texts=[jd],
            n_results=k,
            include=["documents", "distances", "metadatas"],
        )


# Job Tracker

In [63]:
import requests
import pandas as pd
import os
from .gmail import Gmail
from .job_classifier import JobClassifier
from .job_stage import JobStageClassifier

class TrackFile:
    """CSV-backed store (``track.csv`` in the current working directory) of tracked job records."""

    # Columns written by add_record(); used as the header of a freshly created
    # file so that readers can rely on "title"/"company" being present.
    COLUMNS = [
        "text",
        "job",
        "title",
        "company",
        "stage",
        "date",
        "rejected",
        "logo",
        "location",
    ]

    def __init__(self):
        """Resolve the track file path and create the file if it is missing."""
        self.file_path = os.path.join(
            os.getcwd(),
            "track.csv",
        )
        self.initiate_file()

    def initiate_file(self):
        """Create the track file with the full column header when it does not exist yet."""
        if os.path.exists(self.file_path):
            return
        pd.DataFrame(columns=self.COLUMNS).to_csv(self.file_path, index=False)

    def read_file(self):
        """Return the track file as a DataFrame, or an empty DataFrame if the file is missing."""
        if os.path.exists(self.file_path):
            return pd.read_csv(self.file_path)
        return pd.DataFrame()

    def update_file(self, df: pd.DataFrame):
        """
        Prepend ``df`` to the stored records, sort by ``date`` (newest first),
        write the result back and return it.
        """
        track_data = pd.concat([df, self.read_file()], axis=0).sort_values(
            by="date", ascending=False
        )
        track_data.to_csv(self.file_path, index=False)
        return track_data

    def delete_file(self, index):
        """
        Drop the row labelled ``index`` from the track file and return the remaining rows.

        Raises:
            KeyError: if ``index`` is not a row label of the stored data.
        """
        track_data = self.read_file().drop(index)
        track_data.to_csv(self.file_path, index=False)
        return track_data

    def add_record(
        self,
        title,
        company,
        stage,
        date=None,
        rejected=0,
        logo="https://storage.googleapis.com/simplify-imgs/company/default/logo.png",
        location="SG",
        text=" ",
    ):
        """
        Insert one record at the top of the track file and return the full table.

        Parameters:
            title, company, stage: Values stored in the matching columns.
            date (str, optional): Record date. Defaults to today's date,
                computed at call time. A default in the signature would be
                evaluated once and go stale in a long-running process.
            rejected (int): Stored in the ``rejected`` column (default 0).
            logo (str): Logo URL; defaults to the placeholder logo.
            location (str): Stored in the ``location`` column.
            text (str): Stored in the ``text`` column.
        """
        if date is None:
            date = str(pd.to_datetime("today").date())
        track_data = self.read_file()
        new_record = pd.DataFrame(
            {
                "text": [text],
                "job": 1,
                "title": [title],
                "company": [company],
                "stage": [stage],
                "date": [date],
                "rejected": [rejected],
                "logo": [logo],
                "location": [location],
            }
        )
        track_data = pd.concat([new_record, track_data], axis=0)
        track_data.to_csv(self.file_path, index=False)
        return track_data

    def get_last_update_date(self):
        """
        Return the date to fetch from, formatted as ``%d-%b-%Y``.

        This is the day after the newest ``date`` in the track file. When the
        file is missing or holds no dates, it is yesterday instead.
        """
        latest = None
        if os.path.exists(self.file_path):
            # Read the file once; max() is NaN when the column has no values.
            latest = pd.read_csv(self.file_path)["date"].max()

        if latest is None or pd.isna(latest):
            date = (pd.to_datetime("today") - pd.DateOffset(days=1)).date()
        else:
            date = (pd.to_datetime(latest) + pd.DateOffset(days=1)).date()
        return date.strftime("%d-%b-%Y")

def get_logo_trustpilot(company_name):
    """
    Look up a company logo URL through Trustpilot's business-unit search.

    Parameters:
        company_name: Search query sent to Trustpilot.

    Returns:
        str: A Trustpilot logo URL when the search returns exactly one
        business unit with a logo; otherwise the placeholder logo URL.
        The lookup is best effort, so any failure also yields the placeholder.
    """
    default_logo = "https://storage.googleapis.com/simplify-imgs/company/default/logo.png"
    url = "https://www.trustpilot.com/api/consumersitesearch-api/businessunits/search"
    params = {
        "country": "US",
        "page": 1,
        "pageSize": 1,
        "query": company_name,
    }
    try:
        # Without a timeout a stalled connection would block the caller indefinitely.
        response = requests.get(
            url, params=params, headers={"user-agent": "Mozilla/5.0"}, timeout=10
        )
        units = pd.DataFrame(response.json().get("businessUnits", []))
        if units.shape[0] == 1:
            if pd.isna(units["logoUrl"].iloc[0]):
                print("No Logo Found")
                return default_logo
            return f'https://consumersiteimages.trustpilot.net/business-units/{units["businessUnitId"].iloc[0]}-198x149-1x.jpg'
    except Exception as e:
        # Keep the fallback, but report why the lookup failed instead of hiding it.
        print(f"Logo lookup failed for {company_name!r}: {e}")
    return default_logo

def _to_records(track_data):
    # Blank out missing title/company values (when those columns exist) and
    # convert the rows to a list of dicts. Tolerating absent columns keeps a
    # header-only track file from raising KeyError.
    for column in ("title", "company"):
        if column in track_data.columns:
            track_data[column] = track_data[column].fillna("")
    return track_data.to_dict(orient="records")


def update_track_data(gmail_username, gmail_api_key):
    """
    Pull new emails from Gmail, keep the job-related ones, enrich them with
    company, title, logo and stage, and merge them into the track file.

    Parameters:
        gmail_username: Passed to ``Gmail`` as ``username``.
        gmail_api_key: Passed to ``Gmail`` as ``password``.

    Returns:
        list[dict]: All track records after the update. When no job email is
        found, the existing records are returned unchanged.
    """
    track = TrackFile()
    latest_date = track.get_last_update_date()
    gmail = Gmail(username=gmail_username, password=gmail_api_key)
    gmail.authenticate()
    ids = gmail.get_email_by_date(from_date=latest_date)
    print("Fetching emails")
    email_dict = gmail.parse_emails(ids)
    print("Identifying Job Emails")
    classifier = JobClassifier()
    preds = [classifier.classify(message) for message in email_dict]
    dates = [pd.to_datetime(i["date"]).strftime("%Y-%m-%d") for i in email_dict]
    jobs = pd.DataFrame(preds, columns=["text", "job"])
    jobs["date"] = dates
    # Filter into a fresh frame (rather than mutating a slice in place) so the
    # column assignments below operate on an independent DataFrame.
    jobs = jobs[jobs["job"] != "0"].reset_index(drop=True)
    jobs["title"] = None
    jobs["company"] = None
    jobs["rejected"] = 0
    jobs["logo"] = (
        "https://storage.googleapis.com/simplify-imgs/company/default/logo.png"  # default placeholder logo
    )
    jobs["location"] = "Singapore"
    print("Identifying Company and Job title")
    for index, text in enumerate(jobs["text"]):
        jobs.loc[index, "company"] = classifier.extractor.get_company(text)
        jobs.loc[index, "title"] = classifier.extractor.get_jobtitle(text)
        jobs.loc[index, "logo"] = get_logo_trustpilot(jobs.loc[index, "company"])
    if jobs.shape[0] == 0:
        # Nothing new: return the stored records without loading the stage model.
        return _to_records(track.read_file())
    print("Stage Classfiying")
    stage_classifier = JobStageClassifier()
    stages = []
    for index, text in enumerate(jobs["text"]):
        out = stage_classifier.classify(text)
        if int(out) == 4:  # rejected
            jobs.loc[index, "rejected"] = 1
        stages.append(int(out))
    jobs["stage"] = stages
    print("Updating Track Data")
    return _to_records(track.update_file(jobs))


def get_track_data():
    """Return every stored track record as a list of dicts."""
    return _to_records(TrackFile().read_file())


def delete_track_data(index):
    """Delete the record at ``index`` and return the remaining records as a list of dicts."""
    return _to_records(TrackFile().delete_file(index))


# Python Function for Generating Explanations Using LLMs

In [None]:
from openai import OpenAI
import boto3
import logging
import os
import json
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

def explain_openai_gpt(jd_text, res, rec=True, ai_service='llama', api_key=None):
    """
    Generate explanations using OpenAI's GPT-3.5 or LLAMA model.

    Args:
        jd_text (str): The job description text.
        res (str): The resume text.
        rec (bool, optional): Indicates whether the job was recommended or not. Defaults to True.
            Selects the prompt for both services: "why recommended" when True,
            "why the user should not apply" when False.
        ai_service (str, optional): Specifies the AI service to use ('openai' or 'llama'). Defaults to 'llama'.
        api_key (str, optional): OpenAI API key. Required if ai_service is 'openai'. Defaults to None.

    Returns:
        str: The generated explanation, or an empty string when ``ai_service``
        is neither 'openai' nor 'llama'.

    Raises:
        Exception: If an error occurs during model invocation. The original
            error is logged and attached as the cause.

    Note:
        - If ai_service is 'openai', the function uses OpenAI's GPT-3.5 to generate the explanation.
        - If ai_service is 'llama', the function invokes the ``meta.llama2-13b-chat-v1``
          model through the AWS Bedrock runtime client, using AWS credentials
          read from the environment.
    """
    # Define the content based on whether the job description was recommended or not
    if rec:
        content =  "You are an Explainable Job Recommendation system. The resume submitted by the user is:\n {res}\n Please give reasons why the user was recommended the role with the following job description and job requirements:{jd}\n Do not use more than 3 lines. Use passive voice for the response".format(res=res,jd=jd_text)
    else:
        content = "The resume submitted by the user is:\n{res}\nAs a job recommendation system, give reasons why the user should not apply for the role with the following job description and job requirements:{jd}".format(res=res, jd=jd_text)

    try:
        if ai_service == 'openai':
            # Use OpenAI's GPT-3.5 to generate explanation
            client = OpenAI(api_key=api_key)
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "user",
                        "content": content,
                    }
                ],
                temperature=0.3,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
            )
            result = response.choices[0].message.content.strip()
        elif ai_service == 'llama':
            # Send the same rec-dependent prompt as the OpenAI branch, so that
            # rec=False is honoured here too.
            body = {
                "prompt": content,
                "temperature": 0.3,
                "top_p": 1,
            }
            # Invoke LLAMA model
            bedrock_runtime_client = boto3.client('bedrock-runtime',region_name='us-east-1', aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"))
            response = bedrock_runtime_client.invoke_model(
                modelId="meta.llama2-13b-chat-v1", body=json.dumps(body)
            )
            response_body = json.loads(response["body"].read())
            completion = response_body["generation"]
            logging.info("Explanation generated with LLAMA")
            return completion
        else:
            logging.warning("Invalid AI service specified.")
            return ""

        logging.info("Explanation for Recommendation generated")
        return result
    except Exception as e:
        logging.warning(e)
        # Chain the original error so the traceback shows the real cause.
        raise Exception("Error generating explanation") from e



# Converting the scripts to APIs to integrate with the frontend




In [None]:
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
import uvicorn
import os

# Importing necessary classes and functions from project modules
from jobTracker.recommendation import Recommendation
from jobTracker.track import (
    update_track_data,
    get_track_data,
    delete_track_data,
    TrackFile,
)
from jobTracker.explain import explain_openai_gpt

# Initialize FastAPI instance
app = FastAPI()

# CORS middleware configuration to allow requests from web applications hosted on different origins
# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. If this service is reachable beyond local development,
# consider listing the frontend origin(s) explicitly instead of "*". Confirm
# the intended deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # This allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Endpoint for recommending jobs based on resume
@app.post("/recommend-jobs/")
async def recommend_jobs(
    resume: UploadFile = File(...),
    ai_service: str = Form(...),  # AI service to use for recommendation
    api_key: Optional[str] = Form(None),  # Optional API key
    jobtitle: Optional[str] = Form(None),
):
    """
    Generate a job description from the uploaded resume and return it with
    the 20 most similar stored job descriptions.

    Returns a dict with ``generated_jd`` and ``job_recommendations``, or a
    dict with an ``error`` message when no job description could be generated.
    """
    import tempfile  # local import: only this endpoint needs it

    # Write the upload to a uniquely named temporary file. The client-supplied
    # filename is untrusted and may repeat across concurrent requests, so only
    # its extension is reused.
    suffix = os.path.splitext(resume.filename or "")[1]
    with tempfile.NamedTemporaryFile(
        mode="wb", prefix="temp_", suffix=suffix, delete=False
    ) as buffer:
        buffer.write(await resume.read())
        temp_file_path = buffer.name

    try:
        # Initialize the Recommendation class with provided parameters
        recommendation_engine = Recommendation(
            resume=temp_file_path, ai_service=ai_service, api_key=api_key, jobtitle=jobtitle
        )

        # Generate job description based on the resume
        generated_jd = recommendation_engine.get_generated_jd()
    finally:
        # Clean up the temporary file even when generation raises
        os.remove(temp_file_path)

    if not generated_jd:
        return {"error": "Failed to generate job description from the resume."}

    # Search for job recommendations based on the generated job description
    job_recommendations = recommendation_engine.search_jd(generated_jd, k=20)

    return {"generated_jd": generated_jd, "job_recommendations": job_recommendations}

# Endpoint for searching jobs based on job description
@app.post("/search-jobs/")
def search_jobs(
    jd: str = Form(...),
    ai_service: str = Form("openai"),  # 'openai' unless the client says otherwise
    api_key: Optional[str] = Form(None),  # may be omitted
):
    """
    Return the 10 stored job descriptions most similar to ``jd``.

    Only the search side of ``Recommendation`` is used here, so a placeholder
    resume path is supplied.
    """
    engine_kwargs = {"resume": "./", "ai_service": ai_service, "api_key": api_key}
    matches = Recommendation(**engine_kwargs).search_jd(jd, k=10)
    return {"job_recommendations": matches}

# Endpoint for explaining job recommendation based on job description and resume
@app.post("/explain-record/")
def explain_record(
    jd: str = Form(...),
    resume_text: str = Form(...),
    ai_service: str = Form("llama"),  # 'llama' when not specified
    api_key: Optional[str] = Form(None),  # may be omitted
):
    """Return the explanation produced by ``explain_openai_gpt`` for this job description and resume."""
    request_args = {
        "jd_text": jd,
        "res": resume_text,
        "ai_service": ai_service,
        "api_key": api_key,
    }
    return explain_openai_gpt(**request_args)

# Endpoint for updating track records based on emails
@app.post("/update-records/")
async def update_track(
    gmail_username: str = Form(...),
    gmail_password: str = Form(...),
):
    """Refresh the track records from the given Gmail account and return all records."""
    # The form field is called "password", but update_track_data expects it
    # under the name gmail_api_key.
    return update_track_data(gmail_username=gmail_username, gmail_api_key=gmail_password)

# Endpoint for getting track records
@app.get("/get-records/")
async def get_track():
    """Return every stored track record."""
    return get_track_data()

# Endpoint for deleting a track record by index
@app.post("/delete-record/")
def delete_track(index: int):
    """Remove the record at ``index`` and return the records that remain."""
    return delete_track_data(index)

# Endpoint for adding a new track record
@app.post("/add-record/")
def add_track(
    title: str,
    company: str,
    stage: str,
    location: str,
):
    """
    Add a track record and return all records.

    ``stage`` must be one of "Applied", "OA", "Interview", "Offer" or
    "Rejection"; any other value is answered with HTTP 400 instead of an
    unhandled KeyError.
    """
    from fastapi import HTTPException  # local import: only this endpoint raises it

    stage_map = {"Applied": 0, "OA": 1, "Interview": 2, "Offer": 3, "Rejection": 4}
    if stage not in stage_map:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown stage {stage!r}; expected one of {list(stage_map)}",
        )
    stage_code = stage_map[stage]
    rejected = 1 if stage_code == 4 else 0
    track_data = TrackFile().add_record(
        title=title,
        company=company,
        stage=stage_code,
        rejected=rejected,
        location=location,
    )
    # Same clean-up as get_track_data(): rows already in the file may have
    # missing title/company values, and NaN cannot be serialized in the response.
    for column in ("title", "company"):
        track_data[column] = track_data[column].fillna("")
    return track_data.to_dict(orient="records")

# Run the FastAPI app with uvicorn when this file is executed as a script.
# Host "0.0.0.0" binds to all network interfaces; the server listens on port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)


# Model Training for Job Email Classification
## Note: All the training data was generated using Generative AI

In [None]:
from datasets import load_dataset

# Load dataset from CSV file
dataset = load_dataset("csv", data_files="./train_data/Job_Email_Data.csv")


In [None]:
from sentence_transformers.losses import CosineSimilarityLoss

from setfit import SetFitModel, SetFitTrainer, sample_dataset

# Build a SetFit model on top of the pretrained sentence-transformers checkpoint
model = SetFitModel.from_pretrained("sentence-transformers/paraphrase-mpnet-base-v2")
# Create the trainer for the job-email dataset loaded in the previous cell
trainer = SetFitTrainer(
    model=model,
    train_dataset=dataset['train'],
    loss_class=CosineSimilarityLoss,
    metric="accuracy",
    batch_size=4,
    num_iterations=20, # The number of text pairs to generate for contrastive learning
    num_epochs=1, # The number of epochs to use for contrastive learning
    column_mapping={"text": "text", "label": "label"} # Map dataset columns to text/label expected by trainer
)

# Train the model. No eval_dataset is passed to the trainer and evaluate() is
# not called in this cell, so no accuracy is reported here.
trainer.train()

In [None]:
# Save the trained job-email classifier to the local directory "job_email_classification_model"
trainer.model._save_pretrained("job_email_classification_model")

# Model Training for Job Stage Classification
## Note: All the training data was generated using Generative AI

In [None]:
from datasets import load_dataset
dataset = load_dataset("csv", data_files="./train_data/email_classification_data.csv")

In [None]:
dataset['train'].to_pandas()

In [None]:
from sentence_transformers.losses import CosineSimilarityLoss

from setfit import SetFitModel, SetFitTrainer, sample_dataset



# Build a SetFit model on top of the pretrained sentence-transformers checkpoint
model = SetFitModel.from_pretrained("sentence-transformers/paraphrase-mpnet-base-v2")

# Create the trainer for the stage dataset loaded above; this dataset keeps
# its text in a "sentence" column, hence the different column mapping
trainer = SetFitTrainer(
    model=model,
    train_dataset=dataset['train'],
    loss_class=CosineSimilarityLoss,
    metric="accuracy",
    batch_size=4,
    num_iterations=20, # The number of text pairs to generate for contrastive learning
    num_epochs=1, # The number of epochs to use for contrastive learning
    column_mapping={"sentence": "text", "label": "label"} # Map dataset columns to text/label expected by trainer
)

# Train the model. No eval_dataset is passed to the trainer and evaluate() is
# not called in this cell, so no accuracy is reported here.
trainer.train()

In [None]:
# Save the trained stage classifier to the local directory "job_stage_classification_model"
trainer.model._save_pretrained("job_stage_classification_model")  # Both models are stored on Google Drive and used by the functions above