In [0]:
%run /Workspace/Users/ajkayode@outlook.com/ehr-project/ehr-project-bundle/envsetup/Configs

In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from typing import Iterator

#broadcast_conf = spark.sparkContext.broadcast(f"{ats_configs.catalog}.{ats_configs.db}")
broadcast_index = spark.sparkContext.broadcast(f"{ats_configs.vector_index_name}")

@pandas_udf("array<bigint>")
def profile_search(content: Iterator[pd.Series]) -> Iterator[pd.Series]:
    from databricks.vector_search.client import VectorSearchClient

    vs = VectorSearchClient()
    #index = vs.get_index(index_name = ats_configs.vector_index_name)
    index = vs.get_index(index_name = broadcast_index.value)

    def get_profiles(content):
        ids = index.similarity_search(query_text=content, columns=["id"], num_results=2)
        return [int(data[0]) for data in ids["result"]["data_array"]]

    for series in content:
        yield series.apply(get_profiles)
   

In [0]:
class ProfileJDMatching:
    def __init__(self):
        spark.conf.set(
            "spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true"
        )

    def get_start_time(self):
        start_time = (
            spark.sql(
                f"""
                select execution_time as start_time 
            from {ats_configs.jobs_metadate_table_name} 
            where job_name = '{ats_configs.jd_profile_job_name}'
            order by execution_time desc
            """
                ).first()
                .asDict()["start_time"]
                .strftime("%Y-%m-%d %H:%M:%S")
        )

        return start_time

    def get_load_date(self):
        load_date = (
            spark.sql(
                f"""
                select date_add(last_load_date, 1) as load_date 
                from {ats_configs.jobs_metadate_table_name} 
                where job_name = '{ats_configs.jd_profile_job_name}'
                order by last_load_date desc
                """).first()
                .asDict()["load_date"]
                .strftime("%Y-%m-%d %H:%M:%S")
        )

        return load_date

    def get_end_time(self):
        end_time = (
            spark.sql(
                f"""
                select current_timestamp() as end_time
                """).first()
                .asDict()["end_time"]
                .strftime("%Y-%m-%d %H:%M:%S")
        )
        return end_time

    def update_metadata(self, end_time, load_date):
        print(f"Updating metadata for {ats_configs.jd_profile_job_name}")
        spark.sql(
            f"""
            insert into {ats_configs.jobs_metadate_table_name}
            values('{ats_configs.jd_profile_job_name}', 
            '{load_date}', 
            '{end_time}', 
            'Job Execution Completed.' )""")
        print(f"Updated metadata for {ats_configs.jd_profile_job_name}")

    def get_prompt(self):
        prompt = f"""
        For the candidate resume below, compare it to the provided job description.
        Provide your output as a JSON object with:
        name (from the resume)

        email (from the resume)

        phone (from the resume)

        fit_score (number from 0 to 10, where 10 is a perfect fit)

        matched_skills (list of skills present in both resume and JD)

        missing_skills (list of required JD skills not found in the resume)

        evaluation (a short, objective summary of the candidate’s fit, mentioning strengths and gaps)

        When comparing skills:
        - Focus the fit score and evaluation primarily on technical and role-specific skills.
        - Do not penalize a candidate for missing foundational technical skills (such as object-oriented programming, data structures, algorithms) or soft skills (such as communication skills, analytical skills, teamwork) if their education, job titles, or work experience clearly imply these skills.
        - Infer such skills from relevant job titles, degrees, leadership, or collaborative work
        - Only list these as missing if there is clear evidence the candidate lacks them or if their experience is too junior to reasonably assume them.

        Return a list of such JSON objects, one per resume.
        """
        return prompt


    def get_jd_profile_matching(self):
        from pyspark.sql.functions import expr, explode

        # Read change data from the JD silver layer
        load_date = self.get_load_date()
        start_time= self.get_start_time()
        end_time= self.get_end_time()

        #read change data from bronze layer table
        silver_jd_df = (
            spark.read
            .option("readChangeFeed", "true")
            .option("startingTimestamp", start_time)
            .option("endingTimestamp", end_time)
            .table(ats_configs.jd_silver_table_name)
        )

        # Search for marching profiles for each JD from vector index and Save matching in gold staging table
        jd_profile_df = (silver_jd_df.withColumn("profile_id", explode(profile_search("json_context")))
                                    .selectExpr('id as jd_id', 
                                                'profile_id')
                                    .write.mode("overwrite")
                                    .saveAsTable(f"{ats_configs.jd_profile_table_name}_stg")
        )

        # Read from gold staging and use LLM to analyze each profile with JD and summarize for fitness
        prompt = self.get_prompt()

        jd_profile_df = spark.read.table(f"{ats_configs.jd_profile_table_name}_stg")
        profile_df = spark.read.table(ats_configs.profile_silver_table_name)
        jd_df = spark.read.table(ats_configs.jd_silver_table_name)

        matching_jd_profile_df = (
            jd_profile_df.alias("stg_df")
            .join(profile_df.alias("profile"), jd_profile_df.profile_id == profile_df.id)
            .join(jd_df.alias("jd"), jd_profile_df.jd_id == jd_df.id)
            .selectExpr("stg_df.jd_id",
                    "jd.source as jd_source",
                    "jd.json_context as jd_extract",
                    "stg_df.profile_id",
                    "profile.source as profile_source",
                    "profile.json_context as profile_extract"
        )
        )

        jd_profile_summary_df = matching_jd_profile_df.withColumn("summary",
                                                                  expr(f"""ai_query(endpoint => '{ats_configs.chat_model_endpoint_name}',
                                                                    request => CONCAT('{prompt}', 
                                                                                        'Resume: ',profile_extract,
                                                                                        'Job Description: ', jd_extract)) 
                                                                        """),
                                                                  ).withColumn("generated_date", expr("current_timestamp()"))
        # Save the result in the Gold layer
        jd_profile_summary_df.write.mode("append").saveAsTable(ats_configs.jd_profile_table_name)

        # Update job metadata for gold layer execution
        self.update_metadata(end_time, load_date)

In [0]:
ProfileDJMatch = ProfileJDMatching()
ProfileDJMatch.get_jd_profile_matching()