# Install dependencies

In [None]:


%pip install ollama
%pip install openai
%pip install transformers
%pip install tiktoken
%pip install huggingface-hub
%pip install python-dotenv


# Prepare data

In [None]:
import json
import sys
sys.path.append('../')  # Add the path to the my_packages module
from my_packages.data_processing.split_dataset import split_on_shots, split
from my_packages.data_processing.get_labels_data import used_libraries_from_dataset
from my_packages.analysis.analyze_datasets import analyze_library_distribution, analyze_instance_distribution, analyze_visual_node_types_distribution

def used_libraries_to_string(data):
    """Render library-function records as a single text block.

    Every item in ``data`` must expose the keys ``function_name`` and ``doc``.
    Each item becomes a ``Name: ...`` line and a ``Documentation: ...`` line
    followed by a blank line. An empty ``data`` yields an empty string.
    """
    entries = (
        f"Name: {func['function_name']}\nDocumentation: {func['doc']}\n\n"
        for func in data
    )
    return "".join(entries)

# Path to the JSON file holding the complete dataset (a file path, despite the variable name).
main_dataset_folder = '../data/mbpp_transformed_code_examples/sanitized-MBPP-midio.json'

# Load the main dataset into memory.
with open(main_dataset_folder, 'r') as file:
    dataset = json.load(file)
    
num_shot = 10 # Few-shot examples
eval_size_percentage = 0.5
# Split into train / validation / test with a fixed seed.
# NOTE(review): presumably num_shot samples go to the train split and write_to_file
# persists the splits to disk - confirm against split_on_shots.
train_data, val_data, test_data = split_on_shots(num_shot, eval_size_percentage, dataset, seed = 64, write_to_file=True)

# Library functions (nodes) taken from the training split only; their names and
# documentation are rendered into one string for use in the prompt context.
used_libraries_json = used_libraries_from_dataset(train_data)
explained_used_libraries = used_libraries_to_string(used_libraries_json)

# Distribution analyses over the three splits (bar charts, per the original note).
analyze_library_distribution(train_data, val_data, test_data)
analyze_instance_distribution(train_data, val_data, test_data)
analyze_visual_node_types_distribution(train_data, val_data, test_data)

# Prompt utils

In [43]:
import os
import re
def extract_code(response_text):
    """Return the Midio code contained in a model response.

    The content of the first fenced block opened with ```midio is returned
    without its fence and without surrounding whitespace. A response that has
    no such block is treated as bare code and returned stripped.
    """
    fenced = re.compile(r"```midio(.*?)```", re.DOTALL).search(response_text)
    # No fence found: fall back to the whole response text.
    payload = response_text if fenced is None else fenced.group(1)
    return payload.strip()

def read_file(_file):
    """Return the entire text content of the file located at ``_file``."""
    with open(_file) as handle:
        content = handle.read()
    return content

def get_prompt_responses(data: list, code_folder: str):
    """Build the prompts and reference responses for one dataset split.

    The prompt and response templates are read from ``templates/`` in the
    parent directory of the current working directory.

    Args:
        data: Iterable of task dicts. Each task must provide ``prompts`` (the
            first entry is used as the task description) and ``task_id``.
        code_folder: Directory containing the ``task_id_<id>.midio`` solution files.

    Returns:
        A ``(prompts, responses)`` tuple of two lists. A task whose solution
        file is missing gets the placeholder ``"File not found"``; any other
        read or formatting error is recorded as ``"Error: <message>"``.

    Raises:
        FileNotFoundError: If either template file does not exist.
    """
    script_path = os.path.dirname(os.getcwd())
    prompt_template_path = os.path.join(script_path, 'templates/prompts/FEW_SHOT_TEMPLATE.file')
    code_template_path = os.path.join(script_path, 'templates/responses/CODE_TEMPLATE.file')

    # Fail loudly: callers unpack the result into two names, so silently
    # returning None would only surface later as an opaque TypeError.
    required_templates = (
        ("Few_shot_template.file", prompt_template_path),
        ("Code_template.file", code_template_path),
    )
    for label, template_path in required_templates:
        if not os.path.exists(template_path):
            raise FileNotFoundError(f"{label} not found: {template_path}")

    prompt_template = read_file(prompt_template_path)
    code_template = read_file(code_template_path)

    prompts = [prompt_template.format(task_description=task['prompts'][0]) for task in data]
    responses = []
    for task in data:
        file_path = f"{code_folder}/task_id_{task['task_id']}.midio"
        try:
            with open(file_path, 'r') as file:
                solution_code = code_template.format(code=file.read().strip())
                responses.append(solution_code)
        except FileNotFoundError:
            # Keep prompts and responses aligned even when a solution is missing.
            responses.append("File not found")
        except Exception as e:
            responses.append(f"Error: {e}")
    return (prompts, responses)

def create_few_shot_prompt(train_prompts, train_responses, context_role="developer"):
    """Assemble the chat history used as few-shot context.

    The first message carries the instructions plus the list of allowed
    library functions (read from the notebook-level ``explained_used_libraries``)
    under ``context_role``. It is followed by one user/assistant message pair
    for each (prompt, response) pair; pairing stops at the shorter input.
    """
    guiding = "You are a Midio code generator and are going to solve some programming tasks for a node-based programming language called Midio. Always format output strictly inside code blocks. Generate only the code and no text, so that it can be directly executed in a compiler. Use minimal amount of library functions to solve the tasks.\n" 
    node_list = f"Only use the following library functions:\n {explained_used_libraries}\n\n"

    few_shots_messages = [{"role": context_role, "content": guiding + node_list}]
    for prompt, response in zip(train_prompts, train_responses):
        few_shots_messages.extend((
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": response},
        ))
    return few_shots_messages

# Prompts and reference solutions for each split, all read from the same code folder
# (evaluated in the order train, validation, test).
(train_prompts, train_responses), (val_prompts, val_responses), (test_prompts, test_responses) = (
    get_prompt_responses(split_data, '../data/mbpp_transformed_code_examples/only_files/')
    for split_data in (train_data, val_data, test_data)
)

# Few-shot conversation built from the training split only.
few_shot_messages = create_few_shot_prompt(train_prompts, train_responses)


# Open source models

Use the OpenAI Python library, but point it at an Ollama server so that open-source models can be used.

In [None]:
import ollama
from openai import OpenAI
# Base URL of the Ollama server.
host = 'http://localhost:11434'

# OpenAI-compatible client that talks to the Ollama server's /v1/ endpoint.
client = OpenAI(
  base_url=host+'/v1/',
  api_key='ollama' #Ignored with 'ollama'. To use GPT models, this key need to be set.
)

# Native Ollama client for pulling (downloading) models from code, e.g.
# ollama_client.pull('model_name'); this is not possible with the OpenAI client.
# The instance is bound to a name so the host-configured client is actually usable
# instead of being created and discarded.
ollama_client = ollama.Client(
  host=host,
)

## Models to use

In [45]:
from huggingface_hub import login
from dotenv import load_dotenv

# # Log in to Hugging Face
# load_dotenv("../.env")
# access_token_read = os.environ.get('HF_API_KEY')
# if access_token_read:
#     login(token=access_token_read)
#     print("Logged in to Hugging Face successfully!")
# else:
#     print("HF_API_KEY is not set in your environment variables.")

# Open-source models to evaluate. Each entry's "name" is the model tag that is
# passed on as the model identifier in later cells.
open_models = [
    # Llama models:
    {"name": "llama3.1:8b-instruct-fp16"},
    {"name": "llama3.3:70b-instruct-q8_0"},
    # {"name": "llama3.3:70b-instruct-fp16"},

    {"name": "mistral-small:22b-instruct-2409-fp16"},
    {"name": "codestral:22b-v0.1-q8_0"},
    # The entries below were not downloaded (per the original note).
    {"name": "mistral-large:123b-instruct-2407-q4_K_M"},
    
    {"name": "qwq:32b-preview-fp16"},
]

## Calculate max tokens for each model

In [None]:

from my_packages.utils.tokens import find_max_tokens_code_tokenizer, find_max_tokens_code_api

# Folder with the reference solution files used to size the generation budget.
DATA_DIR = '../data/mbpp_transformed_code_examples/only_files'

# Pair every open model with the max-token value computed for it through the embeddings API.
open_models_to_test = []
for model_info in open_models:
    # ollama.pull(model_info["name"])
    max_tokens = find_max_tokens_code_api(DATA_DIR, model_info["name"], client.embeddings)
    open_models_to_test.append({"name": model_info["name"], "max_tokens": max_tokens})

print(open_models_to_test)

# GPT-models

## Models to use

In [26]:
# GPT models to evaluate.
# "tokenization" is presumably the name of the tiktoken encoding for the model - TODO confirm.
gpt_models = [
    {"name": "o1-preview", "tokenization": "o200k_base"},
    {"name": "gpt-4o", "tokenization": "o200k_base"},
]


## Calculate max tokens for each model

In [None]:
import tiktoken


# Pair every GPT model with the max-token value computed with its tokenizer.
gpt_models_to_test = []
for model_info in gpt_models:
    try:
        encoding = tiktoken.encoding_for_model(model_info["name"])
    except KeyError:
        # The installed tiktoken cannot map this model name to an encoding;
        # fall back to the encoding declared for the model in gpt_models.
        encoding = tiktoken.get_encoding(model_info["tokenization"])
    max_tokens = find_max_tokens_code_tokenizer(DATA_DIR, encoding)
    model_result = {
        "name": model_info["name"],
        "max_tokens": max_tokens
    }
    gpt_models_to_test.append(model_result)
print(gpt_models_to_test)

# Evaluation

## Functions for evaluation

In [38]:
from my_packages.evaluation.midio_compiler import compile_code, is_code_syntax_valid, is_code_semantically_valid, print_compiled_output
# from my_packages.utils.run_bash_script import run_bash_script_with_ssh
from my_packages.utils.ollama_utils import is_remote_server_reachable
import time
from subprocess import call


# Function to generate and evaluate responses
def evaluate(client, model, prompts, responses, seed, max_new_tokens=50, temperature=0.7, top_p=0.9):
    """Generate code for every prompt and measure how often it compiles.

    Each prompt is appended to the notebook-level ``few_shot_messages`` and sent
    through ``client.chat.completions.create``. The reply is stripped of ``//``
    sequences, reduced to its code block with ``extract_code`` and handed to the
    Midio compiler helpers. A failed request is retried up to three times; when
    the server (notebook-level ``host``) is unreachable, the SSH forwarding
    script is run, and if that fails a 60 second countdown precedes the next try.

    Args:
        client: Object exposing the OpenAI-style ``chat.completions.create`` call.
        model: Model name passed to the API.
        prompts: Task prompts to evaluate.
        responses: Reference responses paired with ``prompts`` (only used to
            bound the iteration; they are not compared with the output).
        seed: Sampling seed passed to the API.
        max_new_tokens: Token limit for each generation.
        temperature: Sampling temperature.
        top_p: Nucleus sampling parameter.

    Returns:
        ``(syntax_accuracy, semantic_accuracy)`` as fractions of ``len(prompts)``.
        ``(0.0, 0.0)`` when ``prompts`` is empty. ``-1`` when a prompt still has
        no response after all retry attempts.
    """
    correct_syntax = 0  # Number of samples that are syntactically correct
    correct_semantic = 0  # Number of samples that are both syntactically and semantically correct
    total = len(prompts)
    if total == 0:
        # Nothing to score; the division at the end would otherwise raise ZeroDivisionError.
        return (0.0, 0.0)

    # NOTE (from the original author): o1 models do not work with the developer or
    # system role; the few-shot context would need the "user" role for them.
    max_retries = 3

    for prompt, _true_response in zip(prompts, responses):
        retries = 0

        while retries < max_retries:
            try:
                print(f"Generating response..", end="\r")
                generated = client.chat.completions.create(
                    model=model,
                    messages=few_shot_messages + [{"role": "user", "content": prompt}],
                    max_tokens=max_new_tokens,
                    seed=seed,
                    temperature=temperature,
                    top_p=top_p,
                    stream=False,
                    stop=["```<|eot_id|>"]  # Ensure the response stops after the code block
                )
                # '//' outside main module can lead to compiler not ending properly
                filtered_generated = generated.choices[0].message.content.replace("//", "").strip()
                generated_code = extract_code(filtered_generated)
                break
            except Exception as e:
                retries += 1
                print(f"Attempt {retries} failed with error: {e}")
                if is_remote_server_reachable(host + "/api/tags"):
                    print("Server is reachable.")
                else:
                    # Try to re-establish the connection before the next attempt.
                    rc = call("../SSH_FORWARDING.sh")
                    if rc == 0:
                        print("Command executed successfully!")
                    else:
                        print(f"SSH command failed with return code {rc}")
                        print(f"Try to manually connect to the server again. ")
                        # Wait one minute before trying again; Ctrl+C skips the wait.
                        try:
                            for remaining in range(60, 0, -1):
                                print(f"Next try in {remaining} seconds", end="\r")  # Print on the same line
                                time.sleep(1)  # Wait for 1 second
                            print("Time's up!                          \n")  # Clear the line after completion
                        except KeyboardInterrupt:
                            print("\nCountdown interrupted!")

        else:
            # Reached only when every attempt failed (the loop ended without break).
            print("Failed to get a response from the server after " + str(retries) + " attempts.")
            return -1

        print(f"Generated response:\n {filtered_generated}")
        compiled = compile_code(generated_code)
        if is_code_syntax_valid(compiled):
            correct_syntax += 1
            print("Code parsed successfully!")
            # Semantic validity is only checked for code that parsed.
            if is_code_semantically_valid(compiled):
                correct_semantic += 1
                print("Code is semantically valid!")
        else:
            print("Compilation failed.\n")

    return (correct_syntax / total, correct_semantic / total)

def evaluate_models(client, models_to_test, temperatures=[0.5, 0.7, 0.9], top_ps=[0.2, 0.5, 1.0], seeds=[3, 75, 346]):
    """Tune sampling parameters on the validation split, then score the test split.

    For every model, each (temperature, top_p) combination is evaluated on the
    notebook-level ``val_prompts`` / ``val_responses`` with a fixed seed. The
    combination with the highest semantic accuracy is then evaluated on
    ``test_prompts`` / ``test_responses`` once per entry in ``seeds``.

    ``evaluate`` returns ``-1`` when the server could not be reached; such runs
    are skipped with a message instead of being unpacked as a result tuple.

    Args:
        client: Client object passed on to ``evaluate``.
        models_to_test: Dicts with the keys ``name`` and ``max_tokens``.
        temperatures: Temperatures to try during validation.
        top_ps: ``top_p`` values to try during validation.
        seeds: Seeds used for the test runs.

    Returns:
        Dict mapping model name to a list of per-seed result dicts. Note that
        the stored validation syntax accuracy is the best value over all
        combinations, tracked independently of the selected parameters.
    """
    results = {}

    for model in models_to_test:
        print(f"Model: {model['name']}")

        SEED = 42 # During Validation Phase for reproducibility

        best_syntax_accuracy = 0
        best_semantic_accuracy = 0
        best_params = {"temperature": 0.7, "top_p": 0.9}

        for temp in temperatures:
            for top_p in top_ps:
                outcome = evaluate(
                    client,
                    model['name'],
                    val_prompts,
                    val_responses,
                    SEED,
                    model["max_tokens"],
                    temp,
                    top_p
                )
                if outcome == -1:
                    # Failure sentinel from evaluate: no accuracies to compare.
                    print(f"Validation run failed for temp={temp} and top_p={top_p}; skipping this combination.")
                    continue
                syntax_accuracy, semantic_accuracy = outcome
                if syntax_accuracy > best_syntax_accuracy:
                    best_syntax_accuracy = syntax_accuracy
                if semantic_accuracy > best_semantic_accuracy:
                    best_semantic_accuracy = semantic_accuracy
                    best_params = {"temperature": temp, "top_p": top_p}

        print(f"Best Hyperparameters for {model['name']}: {best_params}, Validation Syntax Accuracy: {best_syntax_accuracy:.2f}, Validation Semantic Accuracy: {best_semantic_accuracy:.2f}")

        # Test the model on the test set with different seeds and the best hyperparameters.
        for seed in seeds:
            print(f"\nTesting with Seed: {seed}")

            outcome = evaluate(
                client,
                model['name'],
                test_prompts,
                test_responses,
                seed,
                model["max_tokens"],
                temperature=best_params["temperature"],
                top_p=best_params["top_p"]
            )
            if outcome == -1:
                # Failure sentinel from evaluate: record nothing for this seed.
                print(f"Test run failed for {model['name']} with seed {seed}; skipping this seed.")
                continue
            test_syntax_accuracy, test_semantic_accuracy = outcome

            print(f"Test Syntax Accuracy for {model['name']}: {test_syntax_accuracy:.2f}")
            print(f"Test Semantic Accuracy for {model['name']}: {test_semantic_accuracy:.2f}")

            if model["name"] not in results:
                results[model["name"]] = []

            results[model["name"]].append({
                "seed": seed,
                "validation_syntax_accuracy": best_syntax_accuracy,
                "validation_semantic_accuracy": best_semantic_accuracy,
                "test_syntax_accuracy": test_syntax_accuracy,
                "test_semantic_accuracy": test_semantic_accuracy
            })

    # Print the final results
    print("\nFinal Results:")
    for model_name, runs in results.items():
        print(f"Model: {model_name}")
        for run in runs:
            val_syn = run["validation_syntax_accuracy"]
            val_sem = run["validation_semantic_accuracy"]
            test_syn = run["test_syntax_accuracy"]
            test_sem = run["test_semantic_accuracy"]
            print(
                f"  Seed {run['seed']}: "
                f"Val Syntax Acc: {val_syn:.2f}, "
                f"Val Semantic Acc: {val_sem:.2f}, "
                f"Test Syntax Acc: {test_syn:.2f}, "
                f"Test Semantic Acc: {test_sem:.2f}"
            )

    return results



## Run evaluation

### Open source models

In [None]:
from my_packages.utils.ollama_utils import is_remote_server_reachable
from openai import OpenAI

# Address of the Ollama server.
host = 'http://localhost:11434'

# Check the connection first; when it is down, run the SSH forwarding script.
if not is_remote_server_reachable(host + "/api/tags"):
    rc = call("../SSH_FORWARDING.sh")
    print("Ollama server is not reachable. Batch job might have finished. Try running bash script again.")
else:
    print("Server is reachable.")


# OpenAI-compatible client pointed at the Ollama server.
client = OpenAI(
  base_url=host+'/v1/',
  api_key='ollama' #Ignored with 'ollama'. To use GPT models, this key need to be set.
)
evaluate_models(client, open_models_to_test)



### GPT models

In [None]:

from openai import OpenAI
from dotenv import load_dotenv

# Load environment variables from .env file
# Read the OpenAI API key from the .env file one directory up.
load_dotenv("../.env")
openai_token = os.getenv('OPENAI_API_KEY')

# Without a key there is nothing to run; otherwise evaluate the GPT models.
if not openai_token:
    print("OPENAI_API_KEY is not set in your environment variables.")
else:
    client = OpenAI(
        api_key=openai_token  # Use the OpenAI API key
    )
    evaluate_models(client, gpt_models_to_test)