In [3]:
import pandas as pd
import json
import json
from tqdm import tqdm
import pandas as pd
import sys
sys.path.append("../src")
import prompt_utils
import os
import random

# Prompt configurations. Each pair below names an output directory for one prompt
# variant (*_path) and the prompt_utils function that builds that prompt (*_func).
# NOTE(review): the numbers in parentheses look like previously measured scores for
# each variant; the metric is not shown in this file — confirm before relying on them.

# Vicuna (4-bit)
# With rules, classification only (0.76)
vicuna_with_rules_classification_only_path = "../data/vicuna_4bit/generic_prompt_with_rules_only_classification/"
vicuna_with_rules_classification_only_func = prompt_utils.get_vicuna_prompt_with_rules_only_classification

# OpenAssistant LLaMA 30B (4-bit)
# Classification only, V03 (0.81)
oa_classification_only_v03_path = "../data/openassistant_llama_30b_4bit/generic_prompt_without_context_only_classification_v03/"
oa_classification_only_v03_func = prompt_utils.get_openassistant_llama_30b_4bit_without_context_only_classification_v03
# Few-shot with 1 positive and 1 negative example (0.79)
oa_1pos_1neg_path_path = "../data/openassistant_llama_30b_4bit/generic_prompt_few_shot_prompt_only_classification_1_pos_1_neg_example/"
oa_1pos_1neg_path_func = prompt_utils.get_openassistant_llama_30b_4bit_few_shot_prompt_only_classification_1_pos_1_neg_example

# OpenAI text-davinci-003
# Elaboration first, V02 (0.94)
davinci_elaboration_first_v02_path = "../data/openai_text_davinci_003/generic_prompt_without_context_elaboration_first_v02/"
davinci_elaboration_first_v02_func = prompt_utils.get_openai_prompt_without_context_elaboration_first_v02

# Labeled dataset: a JSON file holding "train", "test" and "valid" entries.
labeled_data_filename = "../data/labeled_data/generic_test_0.json"

with open(labeled_data_filename) as f:
    data = json.load(f)

# One DataFrame per split (train, test, valid — in that order), then stacked into
# a single frame. The concatenation keeps each split's own row index.
dfs = [pd.DataFrame(data[split_name]) for split_name in ("train", "test", "valid")]
df = dfs[-1]  # the "valid" split, as left behind by the original step-by-step loading
df_all = pd.concat(dfs)

# Topic labels. The order matters: the request loop indexes balanced_dfs and
# prompt_utils.RULES by a label's position in this list.
all_labels = [
    "War/Terror",
    "Conspiracy Theory",
    "Education",
    "Election Campaign",
    "Environment",
    "Government/Public",
    "Health",
    "Immigration/Integration",
    "Justice/Crime",
    "Labor/Employment",
    "Macroeconomics/Economic Regulation",
    "Media/Journalism",
    "Religion",
    "Science/Technology",
]

# One sample DataFrame per label, in all_labels order.
balanced_dfs = prompt_utils.generate_binary_balanced_dfs(all_labels, df_all)

In [None]:
## Change output_folder, models_to_test_names, and model_funcs to match the models you want to test
output_folder = vicuna_with_rules_classification_only_path
models_to_test_names = ["generic_prompt_with_rules_classification_only"]
model_funcs = [vicuna_with_rules_classification_only_func]
# When True, prompt functions without few-shot examples additionally receive
# prompt_utils.RULES[idx] for the label being processed.
rules = True

# For every configured model: query the model once per (label, sampled tweet) pair,
# store the raw response in a "<label>_pred" column of a copy of df_all, and write the
# result to <output_folder><model_name>/generic_test_0.csv (periodically and after
# each label). The last request_params are dumped next to it as request_params.json.
for model_name, model_func in zip(models_to_test_names, model_funcs):

    func_name = model_func.__name__
    output_folder_tmp = f"{output_folder}{model_name}/"
    output_path = os.path.join(output_folder_tmp, 'generic_test_0.csv')
    os.makedirs(output_folder_tmp, exist_ok=True)

    # NOTE(review): cross_validation_idx is never used below, so each of the four
    # passes repeats all requests and overwrites the same output files — confirm
    # whether a per-fold input/output file was intended.
    for cross_validation_idx in range(1,5):

        print("Starting with model: " + model_name)
        print("----------------------------------")
        df_all_tmp = df_all.copy()
        df_all_tmp['normalized_tweet'] = None

        for idx, label in enumerate(all_labels):

            sample_df = balanced_dfs[idx]

            print("Starting requesting for label: " + label + "\n")

            new_column_name = f'{label}_pred'
            df_all_tmp[new_column_name] = None
            request_params = prompt_utils.get_base_request_params()

            i = 0  # number of rows processed so far for this label
            for index, row in tqdm(sample_df.iterrows(), total=sample_df.shape[0]):

                tweet_text = prompt_utils.normalize_tweet_simplified(row['text'])
                df_all_tmp.loc[lambda df: df['id'] == row["id"], 'normalized_tweet'] = tweet_text

                pos_example_tweet = prompt_utils.get_positive_example(sample_df, label, row["text"])
                neg_example_tweet = prompt_utils.get_negative_example(sample_df, label, row["text"])

                pos_example_tweet = prompt_utils.normalize_tweet_simplified(pos_example_tweet)
                neg_example_tweet = prompt_utils.normalize_tweet_simplified(neg_example_tweet)

                # Select the call signature from the prompt function's name.
                # '1_pos_1_neg_example' must be tested before '1_neg_example':
                # the latter is a substring of the former and would shadow it.
                if '1_pos_1_neg_example' in func_name:
                    prompt, followup = model_func(tweet_text, label, pos_example_tweet, neg_example_tweet)
                elif '1_pos_example' in func_name:
                    prompt, followup = model_func(tweet_text, label, pos_example_tweet)
                elif '1_neg_example' in func_name:
                    prompt, followup = model_func(tweet_text, label, neg_example_tweet)
                elif '1_random_example' in func_name:
                    example_tweet = random.choice([pos_example_tweet, neg_example_tweet])
                    example_tweet_label = 1 if example_tweet == pos_example_tweet else 0
                    prompt, followup = model_func(tweet_text, label, example_tweet, example_tweet_label)
                elif '_random_example' in func_name:
                    # The number of examples is the token right before "_random_example"
                    # (e.g. "..._3_random_example" -> 3). The first "_"-separated token of
                    # the name cannot be used: an identifier never starts with a digit.
                    name_prefix = func_name.partition("_random_example")[0]
                    n = int(name_prefix.split("_")[-1])
                    examples = prompt_utils.get_random_examples(sample_df, label, row["text"], n)
                    prompt, followup = model_func(tweet_text, label, examples)
                else:
                    # Prompt functions without few-shot examples also return the
                    # (possibly updated) request_params.
                    if rules:
                        prompt, followup, request_params = model_func(tweet_text, label, prompt_utils.RULES[idx], request_params)
                    else:
                        prompt, followup, request_params = model_func(tweet_text, label, request_params)

                # Set after the prompt call because request_params may have been replaced above.
                request_params["stopping_strings"] = ["### Human:", "Human:", "###"]
                response = prompt_utils.get_response(request_params, prompt, "")

                # TODO: if followup is needed for a second call, manually adjust how the response is parsed to followup with a second prompt
                if followup != "":
                    response = prompt_utils.get_response(request_params, followup + response, "")

                # Store the response in this label's prediction column for the matching tweet id
                df_all_tmp.loc[lambda df: df['id'] == row["id"], new_column_name] = response

                i += 1
                # Checkpoint the DataFrame to CSV after every 100 processed rows
                if i % 100 == 0:
                    df_all_tmp.to_csv(output_path, index=False)
                    print(f"Saved progress at index {index}")
                    print("Sample Tweet: ", tweet_text)
                    print("Sample Annotation: ", response)

            # Save the DataFrame once the label is finished
            df_all_tmp.to_csv(output_path, index=False)

        # Save the request_params as a JSON file in the output folder
        with open(os.path.join(output_folder_tmp, 'request_params.json'), 'w') as f:
            json.dump(request_params, f, indent=4)