In [1]:
import os
import openai
import psycopg2
import pandas as pd
import tiktoken  # for counting tokens
from scipy import spatial
import pretty_errors
import timeit
import logging
import time
import asyncio
from openai.error import OpenAIError
import json
from typing import Callable
from utils.preprocess import individual_preprocess
from dotenv import load_dotenv
from utils.prompts import *
from utils.SummariseJob import summarise_job_gpt
from utils.AsyncSummariseJob import async_summarise_description
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
from utils.handy import e5_base_v2_query, filter_last_two_weeks, append_parquet
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)  # for exponential backoff


# Credentials and data locations come from the local .env file so that no
# secret is stored in the notebook itself.
load_dotenv('.env')
openai.api_key = os.getenv("OPENAI_API_KEY")
# Postgres connection settings (consumed by psycopg2.connect in main()).
user = os.getenv("user")
password = os.getenv("password")
host = os.getenv("host")
port = os.getenv("port")
database = os.getenv("database")
# Directory prefix used when writing/reading the intermediate parquet files.
SAVE_PATH = os.getenv("SAVE_PATH")
# Location of the parquet file holding the pre-computed embeddings.
E5_BASE_V2_DATA = os.getenv("E5_BASE_V2_DATA")


#Start the timer
# NOTE: this is taken once, when this cell runs; ask() reports elapsed time
# relative to this instant, not relative to the start of each call.
start_time = timeit.default_timer()

# models
EMBEDDING_MODEL = "text-embedding-ada-002"
#GPT_MODEL = "gpt-3.5-turbo"
GPT_MODEL = "gpt-4"
#GPT_MODEL = "gpt-3.5-turbo-16k"

# The embedded file is loaded in the next cell.

# Log destination: overridable through the LOG_FILE_PATH environment variable
# so the notebook also runs on machines other than the original author's.
# The previous hard-coded location is kept as the default for compatibility.
LOG_FILE = os.getenv(
    "LOG_FILE_PATH",
    '/Users/juanreyesgarcia/Library/CloudStorage/OneDrive-FundacionUniversidaddelasAmericasPuebla/DEVELOPER/PROJECTS/DreamedJobAI/logs/LoggingGPT4.log',
)
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [2]:


embeddings_path = E5_BASE_V2_DATA

# Fail fast with an explicit message: an unset environment variable would
# otherwise reach pd.read_parquet as None and raise a far less obvious error.
if not embeddings_path:
    raise RuntimeError(
        "E5_BASE_V2_DATA is not set; add the path of the embeddings parquet file to .env"
    )

# Full table of embedded jobs as stored on disk.
df_unfiltered = pd.read_parquet(embeddings_path)

# Working frame used by every later cell (ask() binds it as its default `df`).
df = filter_last_two_weeks(df_unfiltered)


In [3]:

def ids_ranked_by_relatedness_e5(query: str,
    df: pd.DataFrame,
    min_n: int,
    top_n: int,
    relatedness_fn=lambda x, y: 1 - spatial.distance.cosine(x, y),
) -> tuple[tuple, tuple]:
    """Rank the rows of `df` by similarity to `query` and return one window of the ranking.

    Args:
        query: Text to compare against; it is embedded with `e5_base_v2_query`.
        df: Frame providing an "id" column and an "embedding" column.
        min_n: Start (inclusive) of the slice taken from the ranked list.
        top_n: End (exclusive) of the slice taken from the ranked list.
        relatedness_fn: Callable `(query_embedding, row_embedding) -> score`;
            higher means more related. Defaults to cosine similarity.

    Returns:
        `(ids, relatednesses)`: two tuples of equal length, ordered from most
        to least related. Both are empty when `df` has no rows or when the
        requested window lies beyond the end of the ranking.
    """
    # Nothing to rank: return the same empty result an out-of-range window
    # yields instead of failing on the `zip(*...)` unpacking below (and skip
    # the query embedding, which would be wasted work).
    if df.empty:
        return (), ()

    #the query is embedded using e5
    query_embedding = e5_base_v2_query(query=query)

    # Walk the two columns directly; this avoids building a Series per row
    # the way DataFrame.iterrows does.
    ids_and_relatednesses = [
        (row_id, relatedness_fn(query_embedding, embedding))
        for row_id, embedding in zip(df["id"], df["embedding"])
    ]
    # list.sort is stable, so equally-scored rows keep their frame order.
    ids_and_relatednesses.sort(key=lambda pair: pair[1], reverse=True)
    ids, relatednesses = zip(*ids_and_relatednesses)
    return ids[min_n:top_n], relatednesses[min_n:top_n]

In [4]:

"""
ids, relatednesses = ids_ranked_by_relatedness_e5(abstract_cv, df, min_n=0, top_n=20)
for id, relatedness in zip(ids, relatednesses):
    logging.info(f"ID: {id} has the following {relatedness=:.3f}")"""



'\nids, relatednesses = ids_ranked_by_relatedness_e5(abstract_cv, df, min_n=0, top_n=20)\nfor id, relatedness in zip(ids, relatednesses):\n    logging.info(f"ID: {id} has the following {relatedness=:.3f}")'

In [5]:
# Token counting helper built on tiktoken
def num_tokens(text: str, model: str = GPT_MODEL) -> int:
    """Count the tokens `text` occupies under the tiktoken encoding registered for `model`."""
    return len(tiktoken.encoding_for_model(model).encode(text))


In [6]:
async def async_query_summary(
    query: str,
    df: pd.DataFrame,
    model: str,
    token_budget: int,
    min_n: int,
    top_n: int
) -> tuple[str, str, list[dict], float]:
    """Build the job-description message for GPT from the most related jobs.

    The jobs ranked `min_n:top_n` for `query` are summarised concurrently with
    `async_summarise_description`; their summaries are then appended to the
    introduction prompt one by one until adding the next one would exceed
    `token_budget` (counted with `num_tokens` for `model`).

    Args:
        query: The user's text (also returned unchanged as `query_user`).
        df: Frame providing "id" and "description" columns (plus whatever
            `ids_ranked_by_relatedness_e5` needs).
        model: Model name used for token counting.
        token_budget: Maximum tokens allowed for introduction + summaries + query.
        min_n: Start (inclusive) of the ranking window.
        top_n: End (exclusive) of the ranking window.

    Returns:
        `(query_user, message, job_summaries, total_cost_summaries)` where
        `job_summaries` is a list of `{"id", "summary"}` dicts and
        `total_cost_summaries` is the summed cost of every summary requested.
    """
    # Only the ids are needed here; the relatedness scores are discarded.
    ids, _ = ids_ranked_by_relatedness_e5(query, df, min_n=min_n, top_n=top_n)
    query_user = f"{query}"
    message = introduction_prompt

    # One summarisation coroutine per ranked job, all awaited concurrently.
    tasks = [
        async_summarise_description(df[df['id'] == job_id]['description'].values[0])
        for job_id in ids
    ]
    results = await asyncio.gather(*tasks)

    # Every summary above has already been requested, so all of them count
    # towards the cost, including those that end up not fitting the budget.
    total_cost_summaries = sum(cost for _summary, cost, _elapsed in results)

    job_summaries = []
    for job_id, (job_description_summary, _cost, _elapsed) in zip(ids, results):
        # Recorded before the budget check, so the summary that overflows the
        # budget is still kept in job_summaries (unchanged behaviour).
        job_summaries.append({
            "id": job_id,
            "summary": job_description_summary
        })

        next_id = f'\nID:<{job_id}>\nJob Description:---{job_description_summary}---\n'
        if num_tokens(message + next_id + query_user, model=model) > token_budget:
            break
        message += next_id
    return query_user, message, job_summaries, total_cost_summaries


In [7]:
#@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
async def ask(
    #This query is your question, only parameter to fill in function
    query: str,
    min_n: int,
    top_n: int,
    df: pd.DataFrame = df,
    model: str = GPT_MODEL,
    token_budget: int = 8192,
    log_gpt_messages: bool = True,
) -> str:
    """Answer `query` with a chat model, using summaries of the most related jobs.

    Steps: build the job message with `async_query_summary`, persist the
    summaries with `append_parquet`, send system/user/assistant messages to
    the chat completion endpoint, log token usage and approximate cost, log
    the relatedness of the ranked ids, and return the model's reply.

    Args:
        query: The user's text.
        min_n: Start (inclusive) of the ranking window.
        top_n: End (exclusive) of the ranking window.
        df: Frame of jobs; defaults to the module-level `df` as it was when
            this function was defined.
        model: Chat model name; also selects the price table for the cost log.
        token_budget: Token limit handed to `async_query_summary`.
            NOTE(review): the default equals the whole 8k context of gpt-4 and
            leaves no headroom for the system prompt or completion; confirm.
        log_gpt_messages: When true, the full message list is logged.

    Returns:
        The content of the first choice of the chat completion.
    """
    query_user, job_id_description, job_summaries, total_cost_summaries = await async_query_summary(query, df, model=model, token_budget=token_budget, min_n=min_n, top_n=top_n)

    #Save summaries in a df & then parquet -> append data if function called more than once
    df_summaries = pd.DataFrame(job_summaries)
    append_parquet(df_summaries, 'summaries')

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{delimiters}{query_user}{delimiters}"},
        {"role": "assistant", "content": job_id_description}
    ]

    if log_gpt_messages:
        logging.info(messages)
    # Use the async variant: the synchronous `create` would block the event
    # loop for the whole duration of the request.
    response = await openai.ChatCompletion.acreate(
        model=model,
        messages=messages,
        temperature=0
    )
    response_message = response["choices"][0]["message"]["content"]

    usage = response['usage']
    total_tokens = usage['total_tokens']
    prompt_tokens = usage['prompt_tokens']
    completion_tokens = usage['completion_tokens']
    logging.info(f"\nOPERATION: GPT-3.5 TURBO SUMMARISING. \nTOTAL COST: ${total_cost_summaries} USD")

    #Approximate cost
    # USD per 1000 tokens as (prompt, completion). Keyed on the `model`
    # actually used for the request, not on the module-level default.
    price_per_1k = {
        "gpt-4": (0.03, 0.06),
        "gpt-3.5-turbo": (0.0015, 0.002),
        "gpt-3.5-turbo-16k": (0.003, 0.004),
    }
    if model in price_per_1k:
        prompt_price, completion_price = price_per_1k[model]
        prompt_cost = round((prompt_tokens / 1000) * prompt_price, 3)
        completion_cost = round((completion_tokens / 1000) * completion_price, 3)
        # Re-round the sum only to strip binary floating-point noise.
        cost_classify = round(prompt_cost + completion_cost, 3)
        logging.info(f"\nOPERATION: {model} CLASSIFICATION \nPROMPT TOKENS USED:{prompt_tokens}\nCOMPLETION TOKENS USED:{completion_tokens}\nTOTAL TOKENS USED:{total_tokens}\nCOST FOR CLASSIFYING: ${cost_classify} USD")

    #relatednesses
    # The ranking is recomputed here purely for logging, because
    # async_query_summary does not return the scores.
    ids, relatednesses = ids_ranked_by_relatedness_e5(query=query, df=df, min_n=min_n, top_n=top_n)
    for job_id, relatedness in zip(ids, relatednesses):
        logging.info(f"ID: {job_id} has the following {relatedness=:.3f}")

    # Measured from the module-level `start_time`, i.e. since the setup cell ran.
    elapsed_time = (timeit.default_timer() - start_time) / 60
    logging.info(f"\nGPT-3.5 TURBO & GPT-4 finished summarising and classifying! all in: {elapsed_time:.2f} minutes \n")

    return response_message

In [8]:
async def check_output_GPT4(input_cv: str, min_n:int, top_n:int) -> list:
    """Call `ask` until it returns parseable JSON, up to six attempts.

    Args:
        input_cv: Text forwarded to `ask` as the query.
        min_n: Start (inclusive) of the ranking window forwarded to `ask`.
        top_n: End (exclusive) of the ranking window forwarded to `ask`.

    Returns:
        The object decoded from the model's reply. If no attempt yields valid
        JSON, a one-element list holding a record with empty "id",
        "suitability" and "explanation" fields.
    """
    default_json = json.loads('[{"id": "", "suitability": "", "explanation": ""}]')
    max_attempts = 6

    for attempt in range(1, max_attempts + 1):
        try:
            python_string = await ask(query=input_cv, min_n=min_n, top_n=top_n)
        except OpenAIError as e:
            logging.warning(f"{e}. Retrying in 10 seconds. Number of retries: {attempt}")
            # Non-blocking pause: time.sleep would freeze the event loop.
            await asyncio.sleep(10)
            continue
        except Exception as e:
            logging.warning(f"{e}. Retrying in 5 seconds. Number of retries: {attempt}")
            await asyncio.sleep(5)
            continue

        try:
            data = json.loads(python_string)
        except (json.JSONDecodeError, TypeError) as e:
            # Previously swallowed silently; log it so malformed replies are
            # visible, then ask again straight away.
            logging.warning(f"Response is not a valid json object ({e}). Number of retries: {attempt}")
            continue

        logging.info(f"Response is a valid json object. Done in loop number: {attempt}")
        return data

    logging.error("Check logs!!!! Main function was not callable. Setting json to default")
    return default_json


In [9]:
def set_dataframe_display_options():
    """Remove pandas' display limits so frames are rendered in full."""
    # Options whose limit is lifted by setting them to None:
    # column count, row count, total width and per-cell width.
    unlimited_options = (
        'display.max_columns',
        'display.max_rows',
        'display.width',
        'display.max_colwidth',
    )
    for option_name in unlimited_options:
        pd.set_option(option_name, None)

    # Keep wide frames on a single line instead of wrapping them.
    pd.set_option('display.expand_frame_repr', False)

# Apply the display settings as soon as this cell runs
set_dataframe_display_options()

In [None]:
async def main():
    min_n=0
    top_n=10

    # Define the suitable categories
    suitable_categories = ['Highly Suitable', 'Moderately Suitable', 'Potentially Suitable']

    # Initialize the dataframe
    df_appended = pd.DataFrame()

    # Continue to call the function until we have 10 suitable jobs
    counter = 0
    while True:
        checked_json = await check_output_GPT4(input_cv=abstract_cv, min_n=min_n, top_n=top_n)
        
        # Convert the JSON to a dataframe and append it to the existing dataframe
        df_original = pd.read_json(json.dumps(checked_json))
        df_appended = pd.concat([df_appended, df_original], ignore_index=True)
        
        counter += 1
        logging.info(f"Looking for suitable jobs. Current loop: {counter}")

        logging.info(f"Current min_n: {min_n}. Current top_n: {top_n}")

        # Increment the counters
        min_n += 10
        top_n += 10

        # Filter the dataframe to only include the suitable jobs
        df_most_suitable = df_appended[df_appended['suitability'].isin(suitable_categories)] if 'suitability' in df_appended.columns else pd.DataFrame()
        
        df_appended.to_parquet(SAVE_PATH + f"/df_appended.parquet", index=False)
        df_most_suitable.to_parquet(SAVE_PATH + f"/df_most_suitable.parquet", index=False)

        # Break the loop if we have 10 suitable jobs
        if len(df_most_suitable) >= 10:
            break

    logging.info(f"\nDF APPENDED:\n{df_appended}")
    
    #Get the ids
    def ids_df_most_suitable(df: pd.DataFrame = df_most_suitable) -> str:
        ids = ""
        for _, row in df.iterrows():
            if "id" in row:
                if ids:
                    ids += ", "
                ids += f"'{row['id']}'"

        return f"({ids})"

    ids_most_suitable = ids_df_most_suitable()
    logging.info(f"Getting the ids from the json object: {type(ids_most_suitable)}, {ids_most_suitable}")

    def find_jobs_per_ids(ids:str, table: str = "main_jobs") -> pd.DataFrame:
        conn = psycopg2.connect(user=user, password=password, host=host, port=port, database=database)
        # Create a cursor object
        cur = conn.cursor()
        #TABLE SHOULD EITHER BE "main_jobs" or "test"
        cur.execute( f"SELECT id, title, link, location FROM {table} WHERE id IN {ids}")

        # Fetch all rows from the table
        rows = cur.fetchall()

        # Separate the columns into individual lists
        all_ids = [row[0] for row in rows]
        all_titles = [row[1] for row in rows]
        all_links = [row[2] for row in rows]
        all_locations = [row[3] for row in rows]

        df = pd.DataFrame({
            'id': all_ids,
            'title': all_titles,
            'link': all_links,
            'location': all_locations
        })
                # Close the database connection
        cur.close()
        conn.close()

        return df

    df_postgre = find_jobs_per_ids(ids=ids_most_suitable)

    #Read the parquet with ids & summaries
    df_summaries = pd.read_parquet(SAVE_PATH + "/summaries.parquet")
    #Merge it with the data in postgre
    df_postgre_summaries = df_postgre.merge(df_summaries, on='id', how='inner')
    #Merge with most suitable df so you have all the rows
    df = df_postgre_summaries.merge(df_most_suitable, on="id", how='inner')

    logging.info(f"\nALL ROWS:\n{df}")


    def sort_df_by_suitability(df: pd.DataFrame = df) -> pd.DataFrame:
        custom_order = {
            'Highly Suitable': 1,
            'Moderately Suitable': 2,
            'Potentially Suitable': 3
        }
        df['suitability_rank'] = df['suitability'].map(custom_order)
        sorted_df = df.sort_values(by='suitability_rank')
        sorted_df = sorted_df.drop(columns='suitability_rank')
        return sorted_df

    sorted_df = sort_df_by_suitability()

    filename = "/final_user_df"
    
    sorted_df.to_parquet(SAVE_PATH + f"{filename}.parquet", index=False)

    logging.info(f"\nSORTED DF:\n{sorted_df}.\n\nThis df has been saved in ...{filename}.parquet\n\n\n")

await main()

In [11]:
"""
async def main():

    #TODO: This needs to be user's cv
    #checked_json = await check_output_GPT4(input_cv=abstract_cv, min_n=0, top_n=10)

    #Try block needs to be here

    def ids_json_loads(data: list[dict[str, str, str]] = None) -> str:
        if data is None:
            data = checked_json
            logging.info(f"type of the json object: {type(data)} Data: {data}")
            #print(type(exp), exp)
        
        ids = ""
        for item in data:
            if "id" in item:
                if ids:
                    ids += ", "
                ids += f"'{item['id']}'"

        return f"({ids})"

    ids_ready = ids_json_loads()
    logging.info(f"Getting the ids from the json object: {type(ids_ready)}, {ids_ready}")

    def ids_json_loads(df: pd.DataFrame = df_most_suitable) -> str:

        ids = ""
        for _, row in df.iterrows():
            if "id" in row:
                if ids:
                    ids += ", "
                ids += f"'{row['id']}'"

        return f"({ids})"

    ids_ready = ids_json_loads()
    logging.info(f"Getting the ids from the json object: {type(ids_ready)}, {ids_ready}")


    def find_jobs_per_ids(ids:str, table: str = "test") -> pd.DataFrame:
        conn = psycopg2.connect(user=user, password=password, host=host, port=port, database=database)
        # Create a cursor object
        cur = conn.cursor()
        #TABLE SHOULD EITHER BE "main_jobs" or "test"
        cur.execute( f"SELECT id, title, link, location FROM {table} WHERE id IN {ids}")

        # Fetch all rows from the table
        rows = cur.fetchall()

        # Separate the columns into individual lists
        all_ids = [row[0] for row in rows]
        all_titles = [row[1] for row in rows]
        all_links = [row[2] for row in rows]
        all_locations = [row[3] for row in rows]

        df = pd.DataFrame({
            'id': all_ids,
            'title': all_titles,
            'link': all_links,
            'location': all_locations
        })


        # Close the database connection
        cur.close()
        conn.close()

        return df

    df_postgre = find_jobs_per_ids(ids=ids_ready)
    #Read the parquet
    df_summaries = pd.read_parquet(SAVE_PATH + "/summaries.parquet")

    df = df_postgre.merge(df_summaries, on='id', how='inner')

    logging.info(f"RELATED JOBS & THEIR SUMMARIES: \n {df}")

    def adding_all_data(df: pd.DataFrame, suitable_jobs: list) -> pd.DataFrame:
        for index, row in df.iterrows():
            entry_id = row['id']
            for json_item in suitable_jobs:
                if int(json_item['id']) == entry_id:
                    suitability = json_item['suitability']
                    explanation = json_item['explanation']
                    df.at[index, 'suitability'] = suitability
                    df.at[index, 'explanation'] = explanation
                    break
        return df

    updated_data = adding_all_data(df=df, suitable_jobs=checked_json)

    logging.info(f"ALL COLUMNS: \n {updated_data}")

    def sort_df_by_suitability(df: pd.DataFrame = df) -> pd.DataFrame:
        custom_order = {
            'Highly Suitable': 1,
            'Moderately Suitable': 2,
            'Potentially Suitable': 3,
            'Marginally Suitable': 4,
            'Not Suitable': 5
        }
        df['suitability_rank'] = df['suitability'].map(custom_order)
        sorted_df = df.sort_values(by='suitability_rank')
        sorted_df = sorted_df.drop(columns='suitability_rank')
        return sorted_df

    sorted_df = sort_df_by_suitability()

    filename = "/final_user_df"
    
    sorted_df.to_parquet(SAVE_PATH + f"{filename}.parquet", index=False)


    logging.info(f"SORTED DF:\n {sorted_df}. \n This df has been saved in ...{filename}.parquet")

if __name__ == "__main__":
	asyncio.run(main())
        """

'\nasync def main():\n\n    #TODO: This needs to be user\'s cv\n    #checked_json = await check_output_GPT4(input_cv=abstract_cv, min_n=0, top_n=10)\n\n    #Try block needs to be here\n\n    def ids_json_loads(data: list[dict[str, str, str]] = None) -> str:\n        if data is None:\n            data = checked_json\n            logging.info(f"type of the json object: {type(data)} Data: {data}")\n            #print(type(exp), exp)\n        \n        ids = ""\n        for item in data:\n            if "id" in item:\n                if ids:\n                    ids += ", "\n                ids += f"\'{item[\'id\']}\'"\n\n        return f"({ids})"\n\n    ids_ready = ids_json_loads()\n    logging.info(f"Getting the ids from the json object: {type(ids_ready)}, {ids_ready}")\n\n    def ids_json_loads(df: pd.DataFrame = df_most_suitable) -> str:\n\n        ids = ""\n        for _, row in df.iterrows():\n            if "id" in row:\n                if ids:\n                    ids += ", "\n    