In [None]:
!pip install datasets

In [None]:
!huggingface-cli login

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import torch
import matplotlib.pyplot as plt
from datasets import Dataset
from tqdm import tqdm
import random

In [None]:
# Which classification task to run: 1 = left/right orientation, anything else = governing/opposition.
task = 2

# Both tasks read from the same Drive folder; only the file name differs.
data_path = "drive/MyDrive/463_hw2_data/"
if task == 1:
    # NOTE(review): task 1 reads a "test" file while task 2 reads a "train" file.
    # The evaluation cell reads data["label"], so confirm this file has that column.
    data_path = data_path + "test_data_orientation.tsv"
else:
    data_path = data_path + "train_data_power.tsv"


data = pd.read_csv(data_path, sep='\t')

data = data.dropna(subset=["text_en"])  # Remove rows where text_en is NaN
data = data[data["text_en"].str.strip() != ""]  # Remove rows where text_en is empty or whitespace

data = data.reset_index(drop=True)

sample_size = 1000
seed = 42

# Seed the other sources of randomness used later in the notebook (sampling-based
# generation and the random fallback label), not only the pandas sample below.
random.seed(seed)
torch.manual_seed(seed)

# DataFrame.sample raises ValueError when n exceeds the number of rows (sampling
# without replacement), so cap the request at what survived the filtering above.
data = data.sample(n=min(sample_size, len(data)), random_state=seed).reset_index(drop=True)

In [None]:
# Two parallel column sets: the English translation and the original-language text.
# Every row carries the task id so the prompt builder can pick the right question.
data_en = dict(text=data["text_en"], task=[task] * len(data))
data_original = dict(text=data["text"], task=[task] * len(data))

# Wrap each column set in a Dataset.
dataset_en = Dataset.from_dict(data_en)
dataset_original = Dataset.from_dict(data_original)

# Define a function to generate prompts
def create_prompt(text, task):
    """Build the classification prompt for a single speech.

    Args:
        text: The speech text to embed in the prompt.
        task: 1 for the left/right question, 2 for the governing/opposition question.

    Returns:
        The prompt string, or None when ``task`` is neither 1 nor 2.
    """
    if task == 1:
        question = "Is the speaker's party leaning left (0) or right (1)?"
    elif task == 2:
        question = "Is the speaker's party governing (0) or in opposition (1)?"
    else:
        # Unknown task: fall through without a prompt.
        return None
    return f"{question}\nText: {text}\nAnswer, only say 0 or 1:"

def generate_prompts(example):
    """Attach a ``"prompt"`` entry to one example, based on its ``"task"`` value.

    Args:
        example: A mapping with a ``"text"`` entry and a ``"task"`` entry.

    Returns:
        The same mapping. A ``"prompt"`` entry is added when the task is 1 or 2;
        for any other task value the mapping is returned unchanged.
    """
    speech = example["text"]
    task_id = example["task"]

    if task_id == 1:
        question = "Is the speaker's party leaning left (0) or right (1)?"
    elif task_id == 2:
        question = "Is the speaker's party governing (0) or in opposition (1)?"
    else:
        return example

    # The prompt ends with "Answer:" so the model's continuation is the label.
    example["prompt"] = f"{question}\nText: {speech}\nAnswer:"
    return example

# Add the "prompt" column to both datasets (English first, then original language).
dataset_en, dataset_original = (
    split.map(generate_prompts) for split in (dataset_en, dataset_original)
)

In [None]:
# Load the pre-trained causal language model
model_name = "meta-llama/Llama-3.1-8B-Instruct"

# Decide the device once and hand it to the pipeline, so the CPU fallback is
# actually honoured instead of the pipeline insisting on GPU index 0.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

causal_pipeline = pipeline(
    "text-generation",
    model=model_name,
    device=device,
    torch_dtype=torch.float16,
    batch_size=4,
)

# Ensure the tokenizer used in causal_pipeline has a pad_token_id
causal_pipeline.tokenizer.pad_token_id = causal_pipeline.tokenizer.eos_token_id

# Batched generation with a decoder-only model must pad on the left; with right
# padding the model would continue from pad tokens for the shorter prompts.
causal_pipeline.tokenizer.padding_side = "left"


In [None]:
# Define batch size
batch_size = 4

def inference(dataset):
    """Run the text-generation pipeline over every prompt in ``dataset``.

    Args:
        dataset: Object whose ``dataset["prompt"]`` yields the prompt strings.

    Returns:
        A flat list with one pipeline result per prompt, in prompt order.

    Notes:
        Generation uses sampling (``do_sample=True``), so outputs vary between
        runs unless the random generators are seeded beforehand.
    """
    # Read the column once; indexing it inside the loop would rebuild it per batch.
    prompts = list(dataset["prompt"])
    results = []

    for start in tqdm(range(0, len(prompts), batch_size), desc="Processing Batches", unit="batch"):
        batch_prompts = prompts[start:start + batch_size]
        # pad_token_id=128001 is presumably a Llama 3 special-token id — TODO confirm
        # it matches the pad token configured on the pipeline's tokenizer.
        batch_results = causal_pipeline(
            batch_prompts,
            max_new_tokens=100,
            do_sample=True,
            temperature=0.8,
            top_p=0.8,
            pad_token_id=128001,
        )
        results.extend(batch_results)

    return results

# First pass: prompts built from the English text column.
print("Inference on English text")
results_en = inference(dataset=dataset_en)

# Second pass: prompts built from the original-language text column.
print("Inference on original text")
results_original = inference(dataset=dataset_original)

In [None]:
# Process results
def make_predictions(results):
    """Turn raw generation outputs into 0/1 labels for the current ``task``.

    For each output, the text after the last ``"Answer:"`` is lower-cased and
    searched for the cues of each label (the digit or the label word). The label
    whose cue appears earliest wins, so trailing explanation text such as
    "... 10 seats" cannot override a leading "1". When no cue is found, a random
    label is used.

    Args:
        results: Iterable of pipeline outputs; each item is indexed as
            ``item[0]["generated_text"]``.

    Returns:
        A list of int labels (0 or 1), one per output. For a ``task`` other than
        1 or 2, "Invalid task" is printed per output and nothing is appended.
    """
    # Cues per task: label -> substrings that indicate it (matched case-insensitively).
    cues_by_task = {
        1: {0: ("0", "left"), 1: ("1", "right")},
        2: {0: ("0", "governing"), 1: ("1", "opposition")},
    }

    predictions = []
    for output in results:
        # str.lower() returns a new string, so the result must be kept.
        answer = output[0]["generated_text"].split("Answer:")[-1].strip().lower()

        if task not in cues_by_task:
            print("Invalid task")
            continue

        # Position of the earliest cue for each label that occurs at all.
        first_hit = {}
        for label, cues in cues_by_task[task].items():
            positions = [pos for pos in (answer.find(cue) for cue in cues) if pos >= 0]
            if positions:
                first_hit[label] = min(positions)

        if first_hit:
            predictions.append(min(first_hit, key=first_hit.get))
        else:
            # No recognisable answer: fall back to a coin flip.
            predictions.append(random.randint(0, 1))

    return predictions


# Parse both result sets into labels (English first, then original language).
predictions_en, predictions_original = map(
    make_predictions, (results_en, results_original)
)

In [None]:
# Evaluate the results
def evaluate(true_labels, predictions, task_name, text_type):
    """Print a classification report and plot the confusion matrix for one run.

    Args:
        true_labels: Ground-truth labels (expected to be 0/1).
        predictions: Predicted labels, aligned with ``true_labels``.
        task_name: Human-readable task name used in the heading and plot title.
        text_type: Which text variant was classified; used in the heading and title.
    """
    print(f"\nEvaluation for {task_name} ({text_type}):")
    print(classification_report(true_labels, predictions, zero_division=0))

    # Pin the label order: without it the matrix only covers labels that actually
    # occur, which can be smaller than 2x2 and then disagrees with display_labels.
    cm = confusion_matrix(true_labels, predictions, labels=[0, 1])

    # Plot the confusion matrix
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Class 0", "Class 1"])
    disp.plot(cmap=plt.cm.Blues, values_format="d")
    plt.title(f"Confusion Matrix for {task_name} ({text_type})")
    plt.show()

# Ground truth comes from the "label" column of the sampled frame.
true_labels = data["label"]
task_name = "Task 1 - Political Ideology" if task == 1 else "Task 2 - Government Status"

# Score the English run, then the original-language run, against the same labels.
evaluate(true_labels, predictions_en, task_name=task_name, text_type="English")
evaluate(true_labels, predictions_original, task_name=task_name, text_type="Original Language")