# Read PDf 

In [1]:
def read_multiple_pdf(file_path: str) -> list:
    """
    Read multiple PDF files from the specified file path and extract the text from each page.

    Args:
        file_path (str): The directory path containing the PDF files.
    Returns:
        list: A list containing the extracted text from each page of the PDF files.
    """
    pdf_files = get_pdf_files(file_path)
    output = []
    for file in pdf_files:
        try:
            with open(file, "rb") as f:
                pdf_reader = PdfReader(f)
                count =get_filenames_from_dirput.append(page.extractText())
        except Exception as e:
            print(f"Error reading file '{file}': {str(e)}")
    return output

In [2]:
def read_single_pdf(file_path: str) -> str:
    """
    Read a single PDF file and extract the text from each page.

    Args:
        file_path (str): The path of the PDF file.

    Returns:
        str: A string containing the extracted text from each page of the PDF file.
    """

    output = []
    try:
        with open(file_path, "rb") as f:
            pdf_reader = PdfReader(f)
            count = len(pdf_reader.pages)
            for i in range(count):
                page = pdf_reader.pages[i]
                output.append(page.extractText())
    except Exception as e:
        print(f"Error reading file '{file_path}': {str(e)}")
    return str(" ".join(output))

# Text Cleaner


In [3]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import string

In [10]:
class TextCleaner:
    def __init__(self, raw_text):
        self.stopwords_set = set(stopwords.words(
            "english") + list(string.punctuation))
        self.lemmatizer = WordNetLemmatizer()
        self.raw_input_text = raw_text

    def clean_text(self) -> str:
        tokens = word_tokenize(self.raw_input_text.lower())
        tokens = [token for token in tokens if token not in self.stopwords_set]
        tokens = [self.lemmatizer.lemmatize(token) for token in tokens]
        cleaned_text = " ".join(tokens)
        return cleaned_text


# Utils

In [11]:
from uuid import uuid4
import re
import spacy

# Load the English Model
nlp = spacy.load("en_core_web_sm")

REGEX_PATTERNS = {
    "email_pattern": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "phone_pattern": r"\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}",
    "link_pattern": r"\b(?:https?://|www\.)\S+\b",
}


In [12]:
def generate_unique_id():
    """
    Generate a unique ID and return it as a string
    Returns:
        str: A string with a unique ID.
    """
    return str(uuid4())


In [4]:
class TextCleaner:
    """
    A class for cleaning a text by removing specific patterns
    """

    def remove_emails_links(text):
        """
        Clean the input text by removing specific patterns
        Args:
            text (str): The input text to clean
        Returns:
            str: The cleaned text
        """
        for pattern in REGEX_PATTERNS:
            text = re.sub(REGEX_PATTERNS[pattern], "", text)
        return text

    def clean_text(text):
        """
        Clean the input text by removing specific patterns

        Args:
            text(str) : the input to clean
        Returns:
            str: the cleaned text
        """
        text = TextCleaner.remove_emails_links(text)
        doc = nlp(text)
        for token in doc:
            if token.pos_ == "PUNCT":
                text = text.replace(token.text, "")
        return str(text)

    def remove_stopwords(text):
        """
        Clean the input text by removing stopwords.

        Args:
            text (str): The input text to clean.

        Returns:
            str: The cleaned text.
        """
        doc = nlp(text)
        for token in doc:
            if token.is_stop:
                text = text.replace(token.text, "")
        return text

In [5]:
class CountFrequency:
    def __init__(self, text):
        self.text = text
        self.doc = nlp(text)

    def count_frequency(self):
        """
        Count the frequency of words in the input text
        Returns:
            dict: a dictionary with the words as keys and the frequency as values
        """

        pos_freq = {}
        for token in self.doc:
            if token.pos_ in pos_freq:
                pos_freq[token.pos_] += 1
            else:
                pos_freq[token.pos_] = 1
        return pos_freq

# KeyTerms Extractor

In [7]:
import textacy
from textacy import extract

2023-10-01 09:52:55.137417: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-10-01 09:52:58.979287: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:996] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-10-01 09:52:58.987388: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:996] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-10-

In [8]:
class KeytermExtractor:
    """
    A class for extracting keyterms from a given text using various algorithms.
    """

    def __init__(self, raw_text: str, top_n_values: int = 20):
        """
        Initialize the KeytermExtractor object.

        Args:
            raw_text (str): The raw input text.
            top_n_values (int): The number of top keyterms to extract.
        """
        self.raw_text = raw_text
        self.text_doc = textacy.make_spacy_doc(self.raw_text, lang="en_core_web_md")
        self.top_n_values = top_n_values

    def get_keyterms_based_on_textrank(self):
        """
        Extract keyterms using the TextRank algorithm.

        Returns:
            List[str]: A list of top keyterms based on TextRank.
        """
        return list(
            extract.keyterms.textrank(
                self.text_doc, normalize="lemma", topn=self.top_n_values
            )
        )

    def get_keyterms_based_on_sgrank(self):
        """
        Extract keyterms using the SGRank algorithm.

        Returns:
            List[str]: A list of top keyterms based on SGRank.
        """
        return list(
            extract.keyterms.sgrank(
                self.text_doc, normalize="lemma", topn=self.top_n_values
            )
        )

    def get_keyterms_based_on_scake(self):
        """
        Extract keyterms using the sCAKE algorithm.

        Returns:
            List[str]: A list of top keyterms based on sCAKE.
        """
        return list(
            extract.keyterms.scake(
                self.text_doc, normalize="lemma", topn=self.top_n_values
            )
        )

    def get_keyterms_based_on_yake(self):
        """
        Extract keyterms using the YAKE algorithm.

        Returns:
            List[str]: A list of top keyterms based on YAKE.
        """
        return list(
            extract.keyterms.yake(
                self.text_doc, normalize="lemma", topn=self.top_n_values
            )
        )

    def bi_gramchunker(self):
        """
        Chunk the text into bigrams.

        Returns:
            List[str]: A list of bigrams.
        """
        return list(
            textacy.extract.basics.ngrams(
                self.text_doc,
                n=2,
                filter_stops=True,
                filter_nums=True,
                filter_punct=True,
            )
        )

    def tri_gramchunker(self):
        """
        Chunk the text into trigrams.

        Returns:
            List[str]: A list of trigrams.
        """
        return list(
            textacy.extract.basics.ngrams(
                self.text_doc,
                n=3,
                filter_stops=True,
                filter_nums=True,
                filter_punct=True,
            )
        )

# Extractor

In [9]:
import re
import urllib.request

In [16]:
nlp = spacy.load("en_core_web_sm")


In [13]:
RESUME_SECTIONS = [
    "Contact Information",
    "Objective",
    "Summary",
    "Education",
    "Experience",
    "Skills",
    "Projects",
    "Certifications",
    "Licenses",
    "Awards",
    "Honors",
    "Publications",
    "References",
    "Technical Skills",
    "Computer Skills",
    "Programming Languages",
    "Software Skills",
    "Soft Skills",
    "Language Skills",
    "Professional Skills",
    "Transferable Skills",
    "Work Experience",
    "Professional Experience",
    "Employment History",
    "Internship Experience",
    "Volunteer Experience",
    "Leadership Experience",
    "Research Experience",
    "Teaching Experience",
]

In [14]:
class DataExtractor:
    """
    A class for extracting various types of data from text.
    """

    def __init__(self, raw_text: str):
        """
        Initialize the DataExtractor object.

        Args:
            raw_text (str): The raw input text.
        """

        self.text = raw_text
        self.clean_text = TextCleaner.clean_text(self.text)
        self.doc = nlp(self.clean_text)

    def extract_links(self):
        """
        Find links of any type in a given string.

        Args:
            text (str): The string to search for links.

        Returns:
            list: A list containing all the found links.
        """
        link_pattern = r"\b(?:https?://|www\.)\S+\b"
        links = re.findall(link_pattern, self.text)
        return links

    def extract_links_extended(self):
        """
        Extract links of all kinds (HTTP, HTTPS, FTP, email, www.linkedin.com,
          and github.com/user_name) from a webpage.

        Args:
            url (str): The URL of the webpage.

        Returns:
            list: A list containing all the extracted links.
        """
        links = []
        try:
            response = urllib.request.urlopen(self.text)
            html_content = response.read().decode("utf-8")
            pattern = r'href=[\'"]?([^\'" >]+)'
            raw_links = re.findall(pattern, html_content)
            for link in raw_links:
                if link.startswith(
                    (
                        "http://",
                        "https://",
                        "ftp://",
                        "mailto:",
                        "www.linkedin.com",
                        "github.com/",
                        "twitter.com",
                    )
                ):
                    links.append(link)
        except Exception as e:
            print(f"Error extracting links: {str(e)}")
        return links

    def extract_names(self):
        """Extracts and returns a list of names from the given
        text using spaCy's named entity recognition.

        Args:
            text (str): The text to extract names from.

        Returns:
            list: A list of strings representing the names extracted from the text.
        """
        names = [ent.text for ent in self.doc.ents if ent.label_ == "PERSON"]
        return names

    def extract_emails(self):
        """
        Extract email addresses from a given string.

        Args:
            text (str): The string from which to extract email addresses.

        Returns:
            list: A list containing all the extracted email addresses.
        """
        email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"
        emails = re.findall(email_pattern, self.text)
        return emails

    def extract_phone_numbers(self):
        """
        Extract phone numbers from a given string.

        Args:
            text (str): The string from which to extract phone numbers.

        Returns:
            list: A list containing all the extracted phone numbers.
        """
        phone_number_pattern = (
            r"^(\+\d{1,3})?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$"
        )
        phone_numbers = re.findall(phone_number_pattern, self.text)
        return phone_numbers

    def extract_experience(self):
        """
        Extract experience from a given string. It does so by using the Spacy module.

        Args:
            text (str): The string from which to extract experience.

        Returns:
            str: A string containing all the extracted experience.
        """
        experience_section = []
        in_experience_section = False

        for token in self.doc:
            if token.text in RESUME_SECTIONS:
                if token.text == "Experience" or "EXPERIENCE" or "experience":
                    in_experience_section = True
                else:
                    in_experience_section = False

            if in_experience_section:
                experience_section.append(token.text)

        return " ".join(experience_section)

    def extract_position_year(self):
        """
        Extract position and year from a given string.

        Args:
            text (str): The string from which to extract position and year.

        Returns:
            list: A list containing the extracted position and year.
        """
        position_year_search_pattern = (
            r"(\b\w+\b\s+\b\w+\b),\s+(\d{4})\s*-\s*(\d{4}|\bpresent\b)"
        )
        position_year = re.findall(position_year_search_pattern, self.text)
        return position_year

    def extract_particular_words(self):
        """
        Extract nouns and proper nouns from the given text.

        Args:
            text (str): The input text to extract nouns from.

        Returns:
            list: A list of extracted nouns.
        """
        pos_tags = ["NOUN", "PROPN"]
        nouns = [token.text for token in self.doc if token.pos_ in pos_tags]
        return nouns

    def extract_entities(self):
        """
        Extract named entities of types 'GPE' (geopolitical entity) and 'ORG' (organization) from the given text.

        Args:
            text (str): The input text to extract entities from.

        Returns:
            list: A list of extracted entities.
        """
        entity_labels = ["GPE", "ORG"]
        entities = [
            token.text for token in self.doc.ents if token.label_ in entity_labels
        ]
        return list(set(entities))

# Parse Job Desc to Json

In [15]:
import json
import pathlib

In [18]:
SAVE_DIRECTORY = "Data/Processed/JobDescription"


In [19]:
class ParseJobDesc:
    def __init__(self, job_desc: str):
        self.job_desc_data = job_desc
        self.clean_data = TextCleaner.clean_text(self.job_desc_data)
        self.entities = DataExtractor(self.clean_data).extract_entities()
        self.key_words = DataExtractor(
            self.clean_data).extract_particular_words()
        self.pos_frequencies = CountFrequency(
            self.clean_data).count_frequency()
        self.keyterms = KeytermExtractor(
            self.clean).get_keyterms_based_on_sgrank()
        self.bi_grams = KeytermExtractor(self.clean_data).bi_gramchunker()
        self.tri_grmas = KeytermExtractor(self.clean_data).tri_gramchunker()

    def get_JSON(self) -> dict:
        """
        Returns a dictionary of job description data.
        """
        job_desc_dictionary = {
            "unique_id": generate_unique_id(),
            "job_desc_data": self.job_desc_data,
            "clean_data": self.clean_data,
            "entities": self.entities,
            "extracted_keywords": self.key_words,
            "keyterms": self.keyterms,
            "bi_grams": str(self.bi_grams),
            "tri_grams": str(self.tri_grmas),
            "pos_frequencies": self.pos_frequencies,
        }

        return job_desc_dictionary


# Parse Resume to json

In [21]:
import json
import os.path
import os
import pathlib

SAVE_DIRECTORY = "Data/Processed/Resumes"


In [22]:
class ParseResume:
    def __init__(self, resume: str):
        self.resume_data = resume
        self.clean_data = TextCleaner.clean_text(self.resume_data)
        self.entities = DataExtractor(self.clean_data).extract_entities()
        self.name = DataExtractor(self.clean_data[:30]).extract_names()
        self.experience = DataExtractor(self.clean_data).extract_experience()
        self.emails = DataExtractor(self.resume_data).extract_emails()
        self.phones = DataExtractor(self.resume_data).extract_phone_numbers()
        self.years = DataExtractor(self.clean_data).extract_position_year()
        self.key_words = DataExtractor(
            self.clean_data).extract_particular_words()
        self.pos_frequencies = CountFrequency(
            self.clean_data).count_frequency()
        self.keyterms = KeytermExtractor(
            self.clean_data).get_keyterms_based_on_sgrank()
        self.bi_grams = KeytermExtractor(self.clean_data).bi_gramchunker()
        self.tri_grams = KeytermExtractor(self.clean_data).tri_gramchunker()

    def get_JSON(self) -> dict:
        """
        Returns a dictionary of resume data.
        """
        resume_dictionary = {
            "unique_id": generate_unique_id(),
            "resume_data": self.resume_data,
            "clean_data": self.clean_data,
            "entities": self.entities,
            "extracted_keywords": self.key_words,
            "keyterms": self.keyterms,
            "name": self.name,
            "experience": self.experience,
            "emails": self.emails,
            "phones": self.phones,
            "years": self.years,
            "bi_grams": str(self.bi_grams),
            "tri_grams": str(self.tri_grams),
            "pos_frequencies": self.pos_frequencies,
        }

        return resume_dictionary


# JobDescription processor

In [17]:
READ_JOB_DESCRIPTION_FROM = "Data/JobDescription"
SAVE_DIRECTORY = "Data/Processed/JobDescription"

In [23]:
class JobDescriptionProcessor:
    def __init__(self, input_file):
        self.input_file = input_file
        self.input_file_name = os.path.join(
            READ_JOB_DESCRIPTION_FROM + self.input_file)

    def process(self) -> bool:
        try:
            resume_dict = self._read_resumes()
            self._write_json_file(resume_dict)
            return True
        except Exception as e:
            print(f"An error occurred: {str(e)}")
            return False

    def _read_resumes(self) -> dict:
        data = read_single_pdf(self.input_file_name)
        output = ParseResume(data).get_JSON
        return output

    def _write_json_file(self, resume_dictionary: dict):
        file_name = str(
            "JobDescription-"
            + self.input_file
            + resume_dictionary["unique_id"]
            + ".json"
        )
        save_dictionary_name = pathlib.Path(SAVE_DIRECTORY) / file_name
        json_object = json.dumps(resume_dictionary, sort_keys=True, indent=14)
        with open(save_dictionary_name, "w+") as outfile:
            outfile.write(json_object)


# Resume Processor 

In [24]:
READ_RESUME_FROM = "Data/Resumes/"
SAVE_DIRECTORY = "Data/Processed/Resumes"


In [20]:
class ResumeProcessor:
    def __init__(self, input_file):
        self.input_file = input_file
        self.input_file_name = os.path.join(READ_RESUME_FROM + self.input_file)

    def process(self) -> bool:
        try:
            resume_dict = self._read_resumes()
            self._write_json_file(resume_dict)
            return True
        except Exception as e:
            print(f"An error occurred: {str(e)}")
            return False

    def _read_resumes(self) -> dict:
        data = read_single_pdf(self.input_file_name)
        output = ParseResume(data).get_JSON()
        return output

    def _read_job_desc(self) -> dict:
        data = read_single_pdf(self.input_file_name)
        output = ParseJobDesc(data).get_JSON()
        return output

    def _write_json_file(self, resume_dictionary: dict):
        file_name = str(
            "Resume-" + self.input_file + resume_dictionary["unique_id"] + ".json"
        )
        save_directory_name = pathlib.Path(SAVE_DIRECTORY) / file_name
        json_object = json.dumps(resume_dictionary, sort_keys=True, indent=14)
        with open(save_directory_name, "w+") as outfile:
            outfile.write(json_object)

# Get Similarity Score

In [25]:
import yaml
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import Batch
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")


In [28]:
def find_path(folder_name):
    curr_dir = os.getcwd()
    while True:
        if folder_name in os.listdir(curr_dir):
            return os.path.join(curr_dir, folder_name)
        else:
            parent_dir = os.path.dirname(curr_dir)
            if parent_dir == "/":
                break
            curr_dir = parent_dir
    raise ValueError(f"Folder '{folder_name}' not found.")


cwd = find_path("Resume_Matcher")
READ_RESUME_FROM = os.path.join(cwd, "Data", "Processed", "Resumes")
READ_JOB_DESCRIPTION_FROM = os.path.join(
    cwd, "Data", "Processed", "JobDescription")
config_path = os.path.join(cwd, "scripts", "similiarty")


In [29]:
def read_doc(path):
    with open(path) as f:
        try:
            data = json.load(f)
        except Exception as e:
            data = {}
    return data

In [30]:
class QdrantSearch:
    def __init__(self, resumes, jd):
        self.qdrant_key = ""
        self.qdrant_url = ""
        self.resumes = resumes
        self.jd = jd
        self.qdrant = QdrantClient(url=self.qdrant_url, api_key=self.qdrant_key)
        # Vector_size = 4096
        vector_size = 384
        self.qdrant.recreate_collection(
            collection_name="collection_resume_matcher",
            vectors_config=models.VectorParams(
                size=vector_size, distance=models.Distance.COSINE
            ),
        )

    def get_embedding(self, text):
        try:
            embeddings = model.encode([text])
            return list(map(float, embeddings[0])), len(embeddings[0])
        except Exception as e:
            print("ERROR while Embeddign")

    def update_qdrant(self):
        vectors = []
        ids = []
        for i, resume in enumerate(self.resumes):
            vector, size = self.get_embedding(resume)
            vectors.append(vector)
            ids.append(i)
        try:
            self.qdrant.upsert(
                collection_name="collection_resume_matcher",
                points=Batch(
                    ids=ids,
                    vectors=vectors,
                    payloads=[{"text": resume} for resume in self.resumes],
                ),
            )
        except Exception as e:
            print(
                f"Error updating the vectors to the qdrant collection: {e}",
                exc_info=True,
            )

    def search(self):
        vector, _ = self.get_embedding(self.jd)
        hits = self.qdrant.search(
            collection_name="collection_resume_matcher", query_vector=vector, limit=30
        )
        results = []
        for hit in hits:
            result = {"text": str(hit.payload)[:30], "score": hit.score}
            results.append(result)
        return results

In [31]:
def get_similiarty_score(resume_string, job_description_string):
    qdrant_search = QdrantSearch([resume_string], job_description_string)
    qdrant_search.update_qdrant()
    search_result = qdrant_search.search()
    return search_result

In [None]:
resume_dict = read_doc(
    READ_RESUME_FROM + ""
)

# Run First

In [36]:
PROCESSED_RESUMES_PATH = "Data/Processed/Resumes"
PROCESSED_JOB_DESCRIPTIONS_PATH = "Data/Processed/JobDescription"


In [35]:
def read_json(filename):
    with open(filename) as f:
        data = json.load(f)
    return data

In [37]:
def remove_old_files(files_path):
    for filename in os.listdir(files_path):
        try:
            file_path = os.path.join(files_path, filename)
            if os.path.isfile(file_path):
                os.rename(file_path)
        except Exception as e:
            print(f"Error removing file '{file_path}': {str(e)}")
    print("Finished removing old files")

In [51]:
# try:
print(PROCESSED_RESUMES_PATH)
# remove_old_files(PROCESSED_RESUMES_PATH)
file_names = get_filenames_from_dir(
    "/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Resumes"
)
# print(file_names)
# except:
# print("Error removing old files")

for file in file_names:
    processor = ResumeProcessor(file)
    # print(file)
    success = processor.process()
print("Parsing of the resumes is now complete")

Data/Processed/Resumes
Error reading file '/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Processed/Resumesalfred_pennyworth_pm.pdf': [Errno 2] No such file or directory: '/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Processed/Resumesalfred_pennyworth_pm.pdf'
An error occurred: 'str' object has no attribute 'raw_input_text'
Error reading file '/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Processed/Resumesjohn_doe.pdf': [Errno 2] No such file or directory: '/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Processed/Resumesjohn_doe.pdf'
An error occurred: 'str' object has no attribute 'raw_input_text'
Error reading file '/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Processed/Resumesbarry_allen_fe.pdf': [Errno 2] No such file or directory: '/home/kareem/hacking/research/nlp_projects/repotech/Resume_Matcher/Data/Processed/Resumesbarry_allen_fe.pdf'
An error occur

# Streamlit App