In [None]:
import pandas as pd
import re
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import anthropic
import openai
import asyncio
from tenacity import retry, wait_random_exponential, stop_after_attempt
import os
import google.generativeai as genai
import time
from collections import defaultdict
from dotenv import load_dotenv
import json


In [None]:
import asyncio

# Shared asyncio limiter with three slots for concurrent LLM work.
semaphore = asyncio.Semaphore(value=3)

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Load environment variables from a local .env file so the os.getenv(...) API-key
# lookups in the client cell can find them.
load_dotenv()

In [None]:
# Root folder holding one sub-directory of per-table CSV files for each Spider2 database.
DB_CSVS_BASE_PATH = "Spider2/spider2-lite/resource/databases/csv_dbs"


In [None]:
# Column layout of an ARCADE-style intent dataset.
ARCADE_COLUMNS = [
    "nb_name", "work_dir", "nb_header", "intent_number",
    "intent", "code", "inputs", "outputs",
]
# Short model tag; the "40" in the name is kept as-is for backward compatibility.
OPEN_AI_40_MINI = "4o-mini"


In [None]:
# Shared async API clients; keys are read from the environment at creation time.
openai_client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
anthropic_client = AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
# No shared Gemini object: call_llm builds a genai.GenerativeModel per request,
# because the model name and generation config vary from call to call.

In [None]:
# --- Spider2-lite resource locations (relative to the notebook's working directory) ---
# Per-table metadata JSON files, laid out as <db>/<table>.json.
DB_METADATAS_PATH = "Spider2/spider2-lite/resource/databases/sqlite/"
# The SQLite databases must be downloaded manually (presumably from
# SPIDER2_LOCAL_DB_LINK) and unzipped into this folder.
DATABASES_PATH = "Spider2/spider2-lite/resource/databases/spider2-localdb/"
DB_CSVS_BASE_PATH = "Spider2/spider2-lite/resource/databases/csv_dbs"
EVALUATION_SET_PATH = "Spider2/spider2-lite/spider2-lite.jsonl"
SPIDER2_LOCAL_DB_LINK = "https://drive.usercontent.google.com/download?id=1coEVsCZq-Xvj9p2TnhBFoFTsY-UoYGmG&export=download&authuser=0&confirm=t&uuid=e4894821-9b03-4a4a-b574-9e931c7f6497&at=AEz70l4CupjM1wWNkGFVtYAST2Xs%3A1743423729461"
GOLD_RESULT_PATH = "Spider2/spider2-lite/evaluation_suite/gold/exec_result"

def get_table_metadata(database_name, table_name):
    """
    Load the JSON metadata of a single table.

    Reads ``<DB_METADATAS_PATH>/<database_name>/<table_name>.json`` as UTF-8.

    Args:
        database_name (str): Database folder name under DB_METADATAS_PATH.
        table_name (str): Table name; selects the ``<table_name>.json`` file.

    Returns:
        The parsed JSON value, or None if opening or parsing raised any
        exception. The exception is printed in that case.
    """
    metadata_path = os.path.join(DB_METADATAS_PATH, database_name, f"{table_name}.json")
    try:
        with open(metadata_path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as err:
        print(err)
        return None

def get_table_dtype_map(database_name, table_name, sql=False, mappings_path="spider_dtype_mappings.json"):
    """
    Build a ``{column_name: type}`` map for one table from its metadata JSON.

    Args:
        database_name (str): Database folder name passed to get_table_metadata.
        table_name (str): Table whose metadata is read.
        sql (bool): If True, return the raw column types stored in the metadata.
            Otherwise, each type is lower-cased and translated through the JSON
            mapping file at ``mappings_path``. Types missing from the mapping
            become ``"object"``.
        mappings_path (str): Path of the JSON file mapping lower-cased SQL type
            names to pandas dtype strings.

    Returns:
        dict: The column -> type map. It is empty when the table's metadata
        could not be loaded. Returns None (after printing the exception) when,
        in non-SQL mode, the mapping file cannot be read or applied.
    """
    # get_table_metadata returns None for a missing or unreadable metadata file.
    # Treat that as "no columns known" instead of raising AttributeError on None.get().
    table_metadata = get_table_metadata(database_name, table_name) or {}
    dtype_map = dict(
        zip(
            table_metadata.get("column_names", []),
            table_metadata.get("column_types", []),
        )
    )
    if sql:
        return dtype_map
    try:
        with open(mappings_path, "r") as f:
            mappings = json.load(f)
        # str() guards against non-string (e.g. null) type entries in the metadata.
        return {column: mappings.get(str(sql_type).lower(), "object") for column, sql_type in dtype_map.items()}
    except Exception as e:
        print(e)
        return None

def load_csv_database(database_name, rows_limit=10, as_dict=False):
    """
    Load a CSV-dumped database into a dict mapping table name -> table data.

    The database directory is looked up under ``DB_CSVS_BASE_PATH``. Spider2
    database names mix ``-`` and ``_``, so the name is also tried with hyphens
    swapped for underscores and vice versa. Each ``*.csv`` file in the
    directory becomes one table, named after the file stem.

    Args:
        database_name (str): Name of the database folder.
        rows_limit (int | None): Keep only the first ``rows_limit`` rows of each
            table. A negative value (e.g. -1) or None keeps every row.
        as_dict (bool): If True, each table is a list of row dicts
            (``DataFrame.to_dict(orient="records")``) instead of a DataFrame.

    Returns:
        dict | None: ``{table_name: DataFrame or list[dict]}``, or None (after
        printing "Failed to get database") when no matching directory exists.
    """
    # Vary only the database-name component. The base path itself contains both
    # "-" and "_" ("spider2-lite", "csv_dbs"), so rewriting the whole joined path
    # would send the fallbacks to directories that do not exist.
    name_variants = [database_name, database_name.replace("-", "_"), database_name.replace("_", "-")]
    candidates = [os.path.join(DB_CSVS_BASE_PATH, name) for name in name_variants]
    database_path = next((path for path in candidates if os.path.exists(path)), None)
    if database_path is None:
        print("Failed to get database")
        return None

    tables = {}
    for file_name in os.listdir(database_path):
        if not file_name.endswith(".csv"):
            continue
        table_name = os.path.splitext(file_name)[0]
        # Column dtypes are inferred by pd.read_csv. The metadata-based dtype map
        # (get_table_dtype_map) is not applied, so it is not looked up here.
        table = pd.read_csv(os.path.join(database_path, file_name))
        if rows_limit is not None and rows_limit >= 0:
            table = table.iloc[:rows_limit]
        tables[table_name] = table.to_dict(orient="records") if as_dict else table
    return tables

In [None]:
async def call_llm(provider, prompt="", model=None, temperature=0.0, max_tokens=512, messages=None):
    """
    Send a chat request to one of the supported LLM providers and return the reply text.

    Args:
        provider (str): "openai", "anthropic" or "google" (case-insensitive).
        prompt (str): Single user prompt. Used only when ``messages`` is not given.
        model (str | None): Provider model id. None selects a per-provider default.
        temperature (float): Sampling temperature.
        max_tokens (int): Maximum number of tokens to generate.
        messages (list[dict] | None): Chat history of ``{"role", "content"}`` dicts.
            For Anthropic and Gemini, "system" entries are removed from the turn
            list and sent through the provider's dedicated system-prompt parameter.

    Returns:
        str: The reply text. Returns "Unknown provider." for an unsupported
        provider, or "Error: <exception>" if anything raised (errors are
        deliberately returned rather than propagated).
    """
    try:
        messages = messages or [{"role": "user", "content": prompt}]
        provider_name = provider.lower()

        if provider_name == "openai":
            # Reuse the shared module-level client; building a new AsyncOpenAI per
            # call opened a fresh connection pool that was never closed.
            response = await openai_client.chat.completions.create(
                model=model or "gpt-4o-mini",
                store=False,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            return response.choices[0].message.content

        if provider_name not in ("anthropic", "google"):
            return "Unknown provider."

        # Anthropic and Gemini reject a "system" role inside the turn list.
        # Pull system text out and pass it via their system-prompt parameters instead.
        system_text = "\n\n".join(m["content"] for m in messages if m["role"] == "system")
        chat_messages = [m for m in messages if m["role"] != "system"]

        if provider_name == "anthropic":
            system_kwargs = {"system": system_text} if system_text else {}
            response = await anthropic_client.messages.create(
                model=model or "claude-3-5-haiku-20241022",
                messages=chat_messages,
                temperature=temperature,
                max_tokens=max_tokens,
                **system_kwargs,
            )
            return response.content[0].text

        # provider_name == "google"
        system_kwargs = {"system_instruction": system_text} if system_text else {}
        gemini_model = genai.GenerativeModel(
            model_name=model or "gemini-2.0-flash-thinking-exp-01-21",
            generation_config={
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": max_tokens,
                "response_mime_type": "text/plain",
            },
            **system_kwargs,
        )
        # Gemini names the assistant role "model" and expects a list of "parts" per turn.
        contents = [
            {"role": "model" if m["role"] == "assistant" else "user", "parts": [m["content"]]}
            for m in chat_messages
        ]
        response = await gemini_model.generate_content_async(contents)
        return response.text

    except Exception as e:
        return f"Error: {e}"

In [None]:
# async def call_llm(provider, prompt="", model=None, temperature=0.0, max_tokens=512):
#     try:
#         if provider.lower() == "openai":
#             client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
#             response = await client.chat.completions.create(
#                     model=model or "gpt-4o-mini",
#                     store=False,
#                     messages=[{"role": "user", "content": prompt}],
#                     temperature=temperature,
#                     max_tokens=max_tokens
#                 )
#             return response.choices[0].message.content

#         elif provider.lower() == "anthropic":
#             response = await anthropic_client.messages.create(
#                 model=model or "claude-3-5-haiku-20241022",
#                 messages=[{"role": "user", "content": prompt}],
#                 temperature=temperature,
#                 max_tokens=max_tokens,
#             )
#             return response.content[0].text

#         elif provider.lower() == "google":
#             gemini_model = genai.GenerativeModel(
#                 model_name=model or "gemini-2.0-flash-thinking-exp-01-21",
#                 generation_config={
#                     "temperature": temperature,
#                     "top_p": 0.95,
#                     "top_k": 40,
#                     "max_output_tokens": max_tokens,
#                     "response_mime_type": "text/plain",
#                 }
#             )
#             response = await gemini_model.generate_content_async(prompt)
#             # time.sleep(6)
#             return response.text

#         else:
#             return "Unknown provider."

#     except Exception as e:
#         return f"Error: {e}"

In [None]:
def get_db_description(input_dict, input_dtypes, db_name=""):
    """
    Describe a database's tables and their column types as prompt text.

    Args:
        input_dict (dict): Mapping of table name -> sample data. Only the keys
            (table names, in order) are used.
        input_dtypes (dict | None): Mapping of table name -> {column name: dtype}.
        db_name (str): Database name for the intro sentence. It is omitted when empty.

    Returns:
        str: An intro sentence, followed by one "Table: ...\\nColumns: ..."
        paragraph per table.
    """
    dtypes_by_table = input_dtypes or {}
    db_label = f"{db_name} database" if db_name else "database"
    description = f"The following is a list of tables in the {db_label}, along with a list of the columns in the table, along with their types, where available, in parentheses.\n\n"

    for table_name in input_dict:
        column_types = dtypes_by_table.get(table_name)
        if column_types:
            # Join so the last column is not followed by a dangling ", ".
            columns_text = ", ".join(f"{column_name} ({dtype})" for column_name, dtype in column_types.items())
        else:
            columns_text = "No column information available."
        description += f"Table: {table_name}\nColumns: {columns_text}\n\n"
    return description

In [None]:
def make_header(db_name, work_dir=""):
    """
    Build the pre-written code that loads a database and sets OUTPUT_DIR.

    The generated code binds a dict of DataFrames (one per table, loaded via
    ``load_csv_database(db_name, rows_limit=-1)``) to a variable named after the
    database. It also sets ``OUTPUT_DIR`` to ``"<work_dir>/output.csv"``.

    Args:
        db_name (str): Database name passed to load_csv_database.
        work_dir (str): Directory that should receive the output CSV.

    Returns:
        str: Python source code for the header.
    """
    import keyword

    # db_name becomes a Python variable in the generated code. Names such as
    # "Db-IMDB" (or ones starting with a digit) would make it a SyntaxError, so
    # the name is sanitised. The original name is still passed to load_csv_database.
    var_name = re.sub(r"\W", "_", db_name)
    if not var_name.isidentifier() or keyword.iskeyword(var_name):
        var_name = f"_{var_name}"
    header = f"""
import pandas as pd



{var_name} = dict()
for table, table_data in load_csv_database({db_name!r}, rows_limit=-1).items():
    {var_name}[table] = pd.DataFrame(table_data)
OUTPUT_DIR = f"{work_dir}/output.csv"
"""
    return header

In [None]:
def get_task_description(intent, task):
    """
    Build the task-instruction part of a prompt.

    Args:
        intent (str): The user's natural-language intent, appended after the instruction.
        task (str): One of "gen_steps", "gen_single_shot" or "gen_from_steps"
            (case-insensitive).

    Returns:
        str: ``"\\n<instruction>\\n<intent>\\n"``.

    Raises:
        ValueError: If ``task`` is not one of the supported task names.
    """
    task_prompts = {
        "gen_steps": "Generate a numbered list of steps, in plain text, easily implementable with pandas and python to accomplish the following task. Your final step should be to write the resulting data to a csv file to the path specified in `OUTPUT_DIR`. Do not generate any code at this time",
        "gen_single_shot": "Generate python code that uses the pandas library to accomplish the following task:",
        "gen_from_steps": "Generate python code for each of the pregenerated steps you will be provided with to fulfill the following user intent:",
    }
    task_key = task.lower()
    if task_key not in task_prompts:
        # ValueError subclasses Exception, so callers catching the old bare Exception still work.
        raise ValueError(f"Unknown task {task!r}; expected one of {sorted(task_prompts)}")

    prompt = f"""
{task_prompts[task_key]}
{intent}
"""
    return prompt

In [None]:
q = get_task_description(intent="make me an omelette", task="gen_single_shot")
print(q)  # show the assembled instruction text

In [None]:
def parse_generated_steps(llm_response):
    """
    Parse a numbered list out of an LLM response.

    A step is any line that starts with digits, an optional ".", and at least
    one space or tab; the rest of that line is the step text.

    Args:
        llm_response (str): The LLM-generated response with a numbered list.
            Non-string input (None, NaN) yields an empty list.

    Returns:
        list[str]: The stripped, non-empty step texts, in order.
    """
    if not isinstance(llm_response, str):
        return []
    # Use [ \t]+ rather than \s+: under MULTILINE, \s would also match a newline,
    # so a bare "1." line would swallow the following line as its text.
    step_pattern = re.compile(r"^\d+\.?[ \t]+(.*)", flags=re.MULTILINE)
    # strip() also drops the trailing "\r" left by Windows line endings.
    steps = [match.strip() for match in step_pattern.findall(llm_response)]
    return [step for step in steps if step]
    

In [None]:
# Quick check: two numbered lines should parse into ["step 1", "step 2"].
parse_generated_steps("""1. step 1\n2. step 2\n""")

In [None]:
def make_full_prompt(db_name, input_dict, output_dict, input_dtypes, intent, task, work_dir=""):
    """
    Assemble the complete prompt: database description, task, and pre-written header.

    Args:
        db_name (str): Database name, used in the description and the header.
        input_dict (dict): Table name -> sample data (its keys list the tables).
        output_dict: Expected outputs. Accepted for interface compatibility but not
            used in the prompt.
        input_dtypes (dict): Table name -> {column: dtype}.
        intent (str): The user's intent.
        task (str): Task name understood by get_task_description.
        work_dir (str): Directory whose "output.csv" becomes OUTPUT_DIR in the header.

    Returns:
        str: The prompt text.
    """
    header = make_header(db_name, work_dir)
    task_description = get_task_description(intent, task)
    # Pass db_name so the description names the database
    # (otherwise it reads "... tables in the  database").
    db_description = get_db_description(input_dict=input_dict, input_dtypes=input_dtypes, db_name=db_name)

    full_prompt = f"""
{db_description}
{task_description}

The following code has been pre-written. As such, you do not need to handle loading the data:
{header}
Be concise. Generate only valid Python code. Don’t include any other explanations other than Python comments. Don't make any assumptions about currently existing code
"""
    return full_prompt


# def make_multi_turn_prompt(db_name, input_dict, output_dict, input_dtypes, intent, task):
#     header = make_header(db_name)
#     task_description = get_task_description(intent, task)
#     db_description = get_db_description(input_dict=input_dict, input_dtypes=input_dtypes)    

#     multi_prompt = f"""The following is your task: {task_description}.

# A sequence of steps to achieve this task has been pre-generated. Your role is to execute each step using Python to accomplish the task. The subsequent messages will provide individual steps toward the goal, which you must implement.

# {db_description}

# The following code has been pre-written. As such, you do not need to handle loading the data:
# {header}    
# """
#     return multi_prompt
    

In [None]:
def make_full_prompt_from_df_row(df_row, task):
    """
    Build the prompt for one dataset row and make sure its output directory exists.

    The output directory is ``output/<task>/<df_row["work_dir"]>``. It is
    created even when ``task`` is unsupported.

    Args:
        df_row: Row (Series or mapping) with keys db_name, inputs, outputs,
            d_types, intent and work_dir.
        task (str): "gen_single_shot", "gen_steps" or "gen_from_steps".

    Returns:
        str: The prompt from make_full_prompt, or "" for any other task.
    """
    db_name, input_dict, output_dict, input_dtypes, intent = (
        df_row["db_name"],
        df_row["inputs"],
        df_row["outputs"],
        df_row["d_types"],
        df_row["intent"],
    )
    work_dir = f"output/{task}/{df_row['work_dir']}"
    # The header built from work_dir sets OUTPUT_DIR to <work_dir>/output.csv,
    # so the folder has to exist before any generated code writes there.
    os.makedirs(work_dir, exist_ok=True)

    if task not in ("gen_single_shot", "gen_steps", "gen_from_steps"):
        return ""
    return make_full_prompt(
        db_name=db_name,
        input_dict=input_dict,
        output_dict=output_dict,
        input_dtypes=input_dtypes,
        intent=intent,
        task=task,
        work_dir=work_dir,
    )
    # elif task in ("gen_from_steps"):
    #     return make_multi_turn_prompt(db_name=db_name,
    #                             input_dict=input_dict,
    #                             output_dict=output_dict,
    #                             input_dtypes=input_dtypes,
    #                             intent=intent,
    #                             task=task)

In [None]:
# Spider2 ETL task table, with rows whose "d_types" map is empty dropped.
# NOTE: read_pickle can execute arbitrary code, so this local file is assumed to be trusted.
# For quick iterations, append .sample(15) or .iloc[:5] to the chain.
llm_etl_spider = (
    pd.DataFrame(pd.read_pickle("datasets/llm_etl_spider2_df.pickle"))
    .pipe(lambda frame: frame[frame["d_types"] != {}])
)

In [None]:
# Preview the "gen_from_steps" prompt for the first task (this also creates its output directory).
q = make_full_prompt_from_df_row(df_row=llm_etl_spider.iloc[0], task="gen_from_steps")
print(q)

In [None]:
# Display the task table loaded and filtered above.
llm_etl_spider

In [None]:
# Model alias -> (provider, model id) pair in the form call_llm accepts.
model_options = {"gpt-4o-mini": ("openai", "gpt-4o-mini")}

In [None]:
async def run_task_row(df_row, task, provider, model, temperature=0.0, max_tokens=512):
    """
    Build the prompt for one dataset row and return the LLM's reply text.

    Args:
        df_row: Dataset row understood by make_full_prompt_from_df_row.
        task (str): Prompt task name.
        provider (str): LLM provider for call_llm.
        model (str | None): Model id for call_llm.
        temperature (float): Sampling temperature.
        max_tokens (int): Token budget for the reply.

    Returns:
        str: Whatever call_llm returns.
    """
    return await call_llm(
        provider=provider,
        prompt=make_full_prompt_from_df_row(df_row=df_row, task=task),
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
    )


async def process_dataframe_rows(df: pd.DataFrame, task, provider, model, column_name=None, in_place=True,
                                 temperature=0.0, max_tokens=512):
    """
    Run run_task_row concurrently for every row of ``df`` and attach the replies as a column.

    Args:
        df (pd.DataFrame): Rows to process. Each row is passed to run_task_row.
        task (str): Prompt task, e.g. "gen_steps" or "gen_single_shot".
        provider (str): LLM provider, e.g. "openai".
        model (str | None): Model id (None selects the provider default).
        column_name (str | None): Name of the result column. Defaults to
            ``f"{task}_{provider}_{model}"``.
        in_place (bool): Kept for backward compatibility only. The result is always
            a new DataFrame built with ``DataFrame.assign``; ``df`` itself is
            never modified, whatever this flag is set to.
        temperature (float): Sampling temperature forwarded to every request.
        max_tokens (int): Token budget forwarded to every request.

    Returns:
        pd.DataFrame: ``df`` plus one column holding the replies, in row order.
    """
    # All rows are requested at once (no concurrency cap).
    tasks = [
        run_task_row(df_row=row, task=task, provider=provider, model=model,
                     temperature=temperature, max_tokens=max_tokens)
        for _, row in df.iterrows()
    ]
    results = await asyncio.gather(*tasks)

    column_name = column_name or f"{task}_{provider}_{model}"
    # assign() already returns a new frame, so no defensive copy is needed.
    return df.assign(**{column_name: results})

In [None]:
# Count missing values per column of the task table.
missing_values = llm_etl_spider.isnull().sum()
print(missing_values)

In [None]:
# Same empty-"d_types" filter as at load time, kept as a separately named frame.
llm_etl_spider_filtered = llm_etl_spider.loc[llm_etl_spider["d_types"] != {}]

In [None]:
# Build (but do not send) the single-shot prompt for the first task.
prompt = make_full_prompt_from_df_row(llm_etl_spider.iloc[0], "gen_single_shot")


In [None]:
# Inspect the columns available on each task row.
llm_etl_spider_filtered.columns

In [None]:
def build_arcade_like_dataset(df, save_file_name=None):
    """
    Explode each task's generated step list into ARCADE-style intent rows (one row per step).

    Every input row becomes one "notebook" named
    ``dataset_<db_name>/notebook_<k>``, where ``k`` counts notebooks per database.
    ``intent_number`` is a running counter shared across all notebooks.

    Args:
        df (pd.DataFrame): Rows with gen_steps, db_name, code, inputs, outputs,
            d_types and work_dir columns.
        save_file_name (str | None): If given, the result is also pickled to
            ``f"{save_file_name}.pickle"``.

    Returns:
        pd.DataFrame: One row per parsed step. The original task's work_dir is
        kept in the ``spider_task_id`` column.
    """
    records = []
    next_intent_id = 0
    notebooks_per_db = defaultdict(int)

    for _, task_row in df.iterrows():
        steps = parse_generated_steps(task_row["gen_steps"])
        db_name = task_row["db_name"]
        notebook_dir = f"dataset_{db_name}/notebook_{notebooks_per_db[db_name]}"
        notebooks_per_db[db_name] += 1
        notebook_header = make_header(db_name, notebook_dir)

        for step in steps:
            records.append(
                {
                    "nb_name": f"{notebook_dir}/annotated.ipynb",
                    "work_dir": notebook_dir,
                    "nb_header": notebook_header,
                    "intent_number": next_intent_id,
                    "intent": step,
                    "code": task_row["code"],
                    "inputs": task_row["inputs"],
                    "outputs": task_row["outputs"],
                    "db_name": db_name,
                    "d_types": task_row["d_types"],
                    "spider_task_id": task_row["work_dir"],
                }
            )
            next_intent_id += 1

    arcade_like_df = pd.DataFrame(records)
    if save_file_name:
        arcade_like_df.to_pickle(f"{save_file_name}.pickle")
    return arcade_like_df

In [None]:
# q = build_arcade_like_dataset(df, save_file_name="gemini.steps.spider2.pickle")

In [None]:
# Generation pass: for each provider, request step lists ("gen_steps") and
# single-shot code ("gen_single_shot") for every task, then explode the steps
# into an ARCADE-style dataset. Uses top-level `await`, so it only runs where
# that is supported (e.g. IPython/Jupyter).
model = None  # None -> call_llm falls back to the provider's default model
for provider in ["openai"]:  # other providers previously tried: "anthropic", "google"
    # Reload the input dataset on every pass so one provider's result columns never leak into the next.
    this_df = pd.DataFrame(pd.read_pickle("datasets/llm_etl_spider2_df.pickle"))
    # First 15 tasks only; the filter below may leave fewer.
    this_df = this_df.iloc[:15]
    # Drop tasks without column-type information (empty "d_types").
    this_df = this_df[this_df["d_types"] != {}]
    for task in ["gen_steps", "gen_single_shot"]:
        # Each pass returns a new frame with a column named after the task holding the raw LLM replies.
        this_df = await process_dataframe_rows(this_df, task=task, provider=provider, model=model, column_name=task, in_place=True)
    # Despite the file name, this pickle holds both the "gen_steps" and "gen_single_shot" columns.
    this_df.to_pickle(f"datasets/{provider}.single_shot.spider2.pickle")
    # build_arcade_like_dataset parses the "gen_steps" column produced above.
    arcade_df = build_arcade_like_dataset(this_df)
    arcade_df.to_pickle(f"datasets/{provider}.arcade.spider2.pickle")
        

In [None]:
# ARCADE-style step dataset written by the OpenAI generation pass.
# To experiment on a subset, slice it, e.g. llm_etl_steps_oai.iloc[:29].
llm_etl_steps_oai = pd.read_pickle("datasets/openai.arcade.spider2.pickle")

In [None]:
# Display the ARCADE-style step dataset (presumably one row per generated step, as built by build_arcade_like_dataset).
llm_etl_steps_oai

In [None]:
def clean_code_markers(code_string):
    """
    Extract executable code from an LLM reply that may contain Markdown code fences.

    Resolution order:
      1. If the reply has closed fenced blocks (```lang ... ```), the bodies of
         all of them are joined with newlines. Prose outside the fences is dropped.
      2. If there is an opening fence without a closing one (e.g. the reply was
         truncated by the token limit), everything after the opening fence line is kept.
      3. Otherwise, "```python" and "```" markers are removed and the text is stripped.

    Args:
        code_string (str | None): The raw LLM reply.

    Returns:
        str: The cleaned code ("" for None/empty input).
    """
    if not code_string:
        return ""

    fenced_bodies = re.findall(r"```[^\n`]*\n(.*?)```", code_string, flags=re.DOTALL)
    if fenced_bodies:
        return "\n".join(body.strip() for body in fenced_bodies).strip()

    opening_fence = re.search(r"```[^\n`]*\n", code_string)
    if opening_fence:
        return code_string[opening_fence.end():].replace("```", "").strip()

    cleaned = code_string.replace("```python", "").strip()
    return cleaned.replace("```", "").strip()

In [None]:
def execute_intent_code(exec_state, code, verbose=False):
    """
    Execute ``code`` in the namespace ``exec_state`` and summarise the resulting variables.

    Args:
        exec_state (dict): Namespace passed to ``exec``. It is mutated in place, so
            state carries over between successive calls (e.g. the notebook header
            first, then each generated step).
        code (str): Python source to run.
        verbose (bool): Print the namespace and code before execution, and the
            namespace and outputs after it.

    Returns:
        tuple[dict, dict]: ``(outputs, exec_state)``. ``outputs`` maps each
        non-dunder variable to a summary:
          * int/float/str/bool/tuple values are stored as-is under their own name;
          * DataFrames are stored under ``"#<name>"`` as the JSON
            (orient="records") of their first 4 rows;
          * Series are stored under ``"*<name>"`` as the JSON of their first 4 values.
        If the code raises, the code and error are printed and ``outputs`` is
        ``{"_error": str(exception)}``.

    Example:
        outputs, state = execute_intent_code({"pd": pd}, nb_header)
        outputs, state = execute_intent_code(state, step_code)
    """
    # SECURITY: `code` (typically LLM-generated) runs through exec() with full
    # interpreter privileges and no sandboxing.
    try:
        # Non-interactive backend so plots produced by the code are never displayed.
        plt.switch_backend('Agg')

        if verbose:
            print("IN STATE")
            print(exec_state)
            print("CODE")
            print(code)

        exec(code, exec_state)

        outputs = {}
        for key, value in exec_state.items():
            if key.startswith("__"):
                continue
            if isinstance(value, (int, float, str, bool, tuple)):
                outputs[key] = value
            elif isinstance(value, pd.DataFrame):
                outputs[f"#{key}"] = str(value.iloc[:4].to_json(orient="records"))
            elif isinstance(value, pd.Series):
                outputs[f"*{key}"] = str(value.iloc[:4].to_json())

        if verbose:
            print("OUT STATE")
            print(exec_state)
            print("OUTPUTS")
            print(outputs)

    except Exception as e:
        print(code)
        print("Error in executing code: ", e)
        outputs = {"_error": str(e)}
    finally:
        # Close figures even when exec() raised part-way through; otherwise
        # figures from failed steps accumulate in memory.
        plt.close('all')

    return outputs, exec_state


def build_output_description_str(output_dict):
    """
    Render the summary from execute_intent_code as a bullet list for the next prompt.

    Keys prefixed with "#" are rendered as DataFrame samples, keys prefixed with
    "*" as Series samples, "_error" as an error line, and anything else as
    ``name: value``.

    Args:
        output_dict (dict): The ``outputs`` dict from execute_intent_code.

    Returns:
        str: A header line followed by one "- ..." line per entry, each newline-terminated.
    """
    lines = ["The following variables are in the namespace following the previous step:"]
    for name, summary in output_dict.items():
        prefix = name[:1]
        if prefix == "#":
            lines.append(f"- DataFrame '{name[1:]}' (sample): {summary}")
        elif prefix == "*":
            lines.append(f"- Series '{name[1:]}' (sample): {summary}")
        elif name == "_error":
            lines.append(f"- Error: {summary}")
        else:
            lines.append(f"- {name}: {summary}")
    return "\n".join(lines) + "\n"

In [None]:
async def run_multiturn_code_gen(
    provider,
    model,
    system_prompt: str,
    header: str,
    steps: list,
    temperature=0.7,
    include_output=False,
    max_tokens=512,
    delay_seconds=9,
):
    """
    Generate and execute code for a sequence of steps as one multi-turn LLM conversation.

    The notebook ``header`` runs first. Then, for each step:
      1. The step text is sent as a user turn, optionally prefixed with a summary of
         the variables produced by the previous execution.
      2. The reply is stripped of Markdown fences and executed in the same namespace.
      3. The cleaned reply is appended as the assistant turn.

    Args:
        provider (str): LLM provider passed to call_llm.
        model (str | None): Model id passed to call_llm.
        system_prompt (str): Content of the initial system message.
        header (str): Python code executed before the first step.
        steps (sequence of str): Step descriptions, one user turn each (must support len()).
        temperature (float): Sampling temperature for every request.
        include_output (bool): Prefix each step with the previous execution's variable summary.
        max_tokens (int): Token budget for every request.
        delay_seconds (float): Pause before each request, as simple rate limiting.
            0 disables the pause.

    Returns:
        list[str]: One entry per step. Each is the generated code, or "exec_error"
        for the step whose code failed and every step after it. If the header
        fails, every entry is "exec_error".
    """
    messages = [{"role": "system", "content": system_prompt}]
    responses = []

    exec_state = {"pd": pd, "load_csv_database": load_csv_database}
    output, exec_state = execute_intent_code(exec_state, header)
    if "_error" in output:
        return ["exec_error"] * len(steps)

    output_str = build_output_description_str(output)

    for step in steps:
        user_content = f"{output_str}\n{step}" if include_output else step
        messages.append({"role": "user", "content": user_content})
        if delay_seconds:
            # Non-blocking pause. time.sleep() here would freeze the whole event loop
            # and serialise every conversation gathered alongside this one.
            await asyncio.sleep(delay_seconds)
        response = await call_llm(
            provider=provider,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            messages=messages,
        )

        assistant_reply = clean_code_markers(response)
        output, exec_state = execute_intent_code(exec_state, assistant_reply)
        if "_error" in output:
            responses.extend(["exec_error"] * (len(steps) - len(responses)))
            return responses
        output_str = build_output_description_str(output)
        messages.append({"role": "assistant", "content": assistant_reply})
        responses.append(assistant_reply)

    return responses

In [None]:
# sample.iloc[:2]

In [None]:
async def process_and_merge_groups_async(df, groupby_columns, new_column_name, process_function):
    """
    Split ``df`` by ``groupby_columns``, compute a new column per group concurrently, and re-concatenate.

    Args:
        df (pd.DataFrame): The input dataframe. It is not modified.
        groupby_columns (str | list): Column label(s) to group by.
        new_column_name (str): Name of the column added to each group.
        process_function (callable): Async function taking one group DataFrame and
            returning a list or Series with one value per row of that group.

    Returns:
        pd.DataFrame: All processed groups concatenated in groupby (sorted-key)
        order with a fresh RangeIndex. If there are no groups, an empty frame
        with df's columns plus the new column is returned.
    """
    async def process_group(group):
        values = await process_function(group)
        # assign() returns a new frame, so the slice handed out by groupby is never
        # written to (avoids SettingWithCopyWarning / chained assignment).
        return group.assign(**{new_column_name: values})

    processed_groups = await asyncio.gather(
        *(process_group(group) for _, group in df.groupby(groupby_columns))
    )

    if not processed_groups:
        # pd.concat([]) would raise "No objects to concatenate".
        return df.iloc[0:0].assign(**{new_column_name: []})
    return pd.concat(processed_groups, ignore_index=True)

In [None]:
async def process_steps_multiturn_no_fb(df, provider="openai", model=None):
    """
    Generate code for one notebook's steps as a multi-turn conversation (no feedback loop).

    Args:
        df (pd.DataFrame): All step rows of a single notebook. The first row
            supplies db_name and work_dir; the "intent" column supplies the steps.
        provider (str): LLM provider (default "openai").
        model (str | None): Model id (None selects the provider default).

    Returns:
        list[str]: One generated-code string (or "exec_error") per step, from
        run_multiturn_code_gen.
    """
    first_row = df.iloc[0]
    # Rebuild the header instead of using the stored "nb_header", so that
    # OUTPUT_DIR points at output/gen_from_steps/<work_dir>. That is the directory
    # make_full_prompt_from_df_row(..., task="gen_from_steps") creates.
    # (Single quotes inside the f-string keep this valid before Python 3.12.)
    header = make_header(first_row["db_name"], f"output/gen_from_steps/{first_row['work_dir']}")
    system_prompt = make_full_prompt_from_df_row(first_row, task="gen_from_steps")
    return await run_multiturn_code_gen(
        provider=provider,
        model=model,
        system_prompt=system_prompt,
        header=header,
        steps=df["intent"],
        include_output=True,
        temperature=0.0,
    )

In [None]:
# processed_df = await process_and_merge_groups_async(sample, groupby_columns="nb_name", new_column_name="gen_step_code", process_function=process_steps_multiturn_no_fb)

In [None]:
sample = llm_etl_steps_oai
async with semaphore:
    processed_df = await process_and_merge_groups_async(
        sample, 
        groupby_columns="nb_name", 
        new_column_name="gen_step_code", 
        process_function=process_steps_multiturn_no_fb
    )

In [None]:
# Display the step rows together with their generated code ("gen_step_code" column).
processed_df

In [None]:
# Print the code generated for the 7th step row (positional index 6).
print(processed_df["gen_step_code"].iloc[6])

In [None]:
# Keep one representative row per generated notebook (work_dir).
gg = llm_etl_steps_oai.drop_duplicates(subset="work_dir")

In [None]:
import os
import shutil

# Collect every generated output.csv into gen_exec_results/<spider_task_id>.csv.
target_folder = "gen_exec_results"
os.makedirs(target_folder, exist_ok=True)

for _, gg_row in gg.iterrows():
    destination_path = os.path.join(target_folder, f"{gg_row['spider_task_id']}.csv")

    # process_steps_multiturn_no_fb sets OUTPUT_DIR to
    # output/gen_from_steps/<work_dir>/output.csv, so that path is checked first.
    # A layout with an extra "output/" level is also checked, in case results
    # were written there.
    candidate_paths = [
        f"output/gen_from_steps/{gg_row['work_dir']}/output.csv",
        f"output/gen_from_steps/output/{gg_row['work_dir']}/output.csv",
    ]
    source_path = next((path for path in candidate_paths if os.path.exists(path)), None)

    if source_path is not None:
        shutil.copy(source_path, destination_path)
    else:
        # No result was produced: write a placeholder so every task still has a CSV.
        with open(destination_path, 'w') as f:
            f.write("output,")