In [2]:
import os
import numpy as np
import pandas as pd
from transformers import AutoTokenizer
from datasets import Dataset as HFDataset
from dotenv import load_dotenv
from openai import OpenAI
from dialz import Dataset, SteeringModel, SteeringVector, get_activation_score
from score import get_unaggregated_activation_score
from models import construct_dataset, optimize


In [3]:
# Pull secrets (e.g. HF_TOKEN) from a local .env file into the process environment.
load_dotenv()
hf_token = os.environ.get("HF_TOKEN")

# Hugging Face checkpoint used for the steering experiments in this notebook.
model_name = "mistralai/Mistral-7B-Instruct-v0.1"

# Labelled corpus; later cells read its `text` and `label` columns.
df = pd.read_csv("../data/hate_speech_binary.csv")

In [38]:
from tqdm import tqdm

## Steering vector
def compute_activation_score(
        df: pd.DataFrame,
        scoring_method: str,
        model_name: str, 
        items: list, 
        prompt_type: str, 
        num_sents: int,
        system_role: str,
    ):
    """Train a steering vector and score every text in ``df`` against it.

    A contrastive dataset is built from ``items`` via ``Dataset.create_dataset``,
    a ``SteeringVector`` is trained on it, and each entry of ``df['text']`` is
    passed to ``get_unaggregated_activation_score``.

    Args:
        df: Frame with a ``text`` column. It is modified in place: the columns
            ``activation_score`` and ``token_length`` are added/overwritten.
        scoring_method: Forwarded to ``get_unaggregated_activation_score``.
        model_name: Checkpoint name used for both the dataset and the model.
        items: Contrastive concepts handed to ``Dataset.create_dataset``.
        prompt_type: Forwarded to ``Dataset.create_dataset``.
        num_sents: Forwarded to ``Dataset.create_dataset``.
        system_role: Forwarded to ``Dataset.create_dataset``.

    Returns:
        ``(df, padded_activation_scores, max_token_length)`` where
        ``padded_activation_scores`` is a zero-padded float array of shape
        ``(len(df), len(scoring_layers), max_token_length)``.

    Note:
        Reads the notebook-level ``hf_token`` when constructing the model.
    """
    # Layers passed as ``layer_index``; the padded array gets one slot per entry
    # (assumes the scorer returns one per-token sequence per requested layer —
    # TODO confirm against score.get_unaggregated_activation_score).
    scoring_layers = list(range(15, 20, 1))

    dataset = Dataset.create_dataset(model_name, items, prompt_type=prompt_type, num_sents=num_sents, system_role=system_role)

    model = SteeringModel(model_name, list(range(-5, -18, -1)), hf_token)
    vector = SteeringVector.train(model, dataset)

    activation_scores = []
    token_lengths = []
    unaggregated_activation_scores = []

    for text in tqdm(df['text'], total=len(df), desc="Calculating activation scores"):
        activation_score, token_length, unaggregated_activation_score = get_unaggregated_activation_score(
            text, model, vector, layer_index=scoring_layers, scoring_method=scoring_method
        )
        activation_scores.append(activation_score)
        token_lengths.append(token_length)
        unaggregated_activation_scores.append(unaggregated_activation_score)

    # Assign whole columns positionally. Writing row-by-row with df.at[i, ...]
    # (i being a 0-based counter) only lines up when the index is exactly
    # 0..n-1; on any other index it overwrites the wrong rows or appends new ones.
    df['activation_score'] = activation_scores
    df['token_length'] = token_lengths

    max_token_length = max(token_lengths, default=0)
    print(f"Max token length: {max_token_length}")

    # Pad activation scores to max token length
    print("Padding activation scores to max token length after tokenization.")
    padded_activation_scores = np.zeros((len(df), len(scoring_layers), max_token_length), dtype=float)
    for i, scores in enumerate(unaggregated_activation_scores):
        for j, score in enumerate(scores):
            padded_activation_scores[i, j, :len(score)] = score

    return df, padded_activation_scores, max_token_length


def run_classifier(
        df: pd.DataFrame,
        scoring_method: str,
        model_name: str, 
        items: list, 
        prompt_type: str, 
        num_sents: int,
        system_role: str,
    ):
    """Score ``df`` with a steering vector and compare three classifiers.

    Calls ``compute_activation_score`` with the given arguments, then evaluates:

    1. a single-threshold rule on ``activation_score``,
    2. ``optimize(..., is_transformer=False)`` on the padded per-token scores,
    3. ``optimize(..., is_transformer=True)`` on the same data.

    Args:
        df: Frame with ``text`` and binary ``label`` (0/1) columns.
        scoring_method, model_name, items, prompt_type, num_sents, system_role:
            Forwarded unchanged to ``compute_activation_score``.

    Returns:
        dict with keys ``threshold_classifier``, ``linear_classifier`` and
        ``transformer_classifier``.

    Note:
        The units differ between entries: ``threshold_classifier`` is a
        percentage (0-100), while the two ``optimize`` results are stored exactly
        as returned, which in the recorded run is a fraction (0-1). The threshold
        is also chosen and evaluated on the full ``df``, whereas ``optimize``
        receives a separate test split, so the numbers are not directly comparable.
    """
    df, padded_activation_scores, max_token_length = compute_activation_score(df, scoring_method, model_name, items, prompt_type, num_sents, system_role)

    # different classifiers
    results = {}

    # threshold-based classifier
    def calculate_score(scores_df, label_0_condition, label_1_condition):
        # Number of rows whose label agrees with the side of the threshold they fall on.
        label_0_count = ((scores_df['label'] == 0) & label_0_condition).sum()
        label_1_count = ((scores_df['label'] == 1) & label_1_condition).sum()
        return label_0_count + label_1_count

    # Generate a range of thresholds to test
    thresholds = np.linspace(df['activation_score'].min(), df['activation_score'].max(), 1000)
    # Scores below t are predicted as label 0, scores above t as label 1
    # (a score exactly equal to t counts as wrong for either label).
    condition = [calculate_score(df, df['activation_score'] < t, df['activation_score'] > t) for t in thresholds]
    best_threshold = thresholds[np.argmax(condition)]
    accuracy_condition = (np.max(condition) / len(df)) * 100
    results['threshold_classifier'] = accuracy_condition
    print(f"Best threshold: {best_threshold}")
    print(f"Accuracy: {accuracy_condition:.2f}%")
    
    # Constructing the dataset for training and testing
    train_dataset, test_dataset = construct_dataset(df, padded_activation_scores, test_size=0.3, random_state=42)
    #train_dataset = HFDataset.from_dict({"text": train_dataset.activation_score, "label": [np.argwhere(label)[0][0] for label in train_dataset.labels]})
    #test_dataset = HFDataset.from_dict({"text": test_dataset.activation_score, "label": [np.argwhere(label)[0][0] for label in test_dataset.labels]})

    # Learning a linear classifier
    acc = optimize(train_dataset, test_dataset, max_token_length, learning_rate=1e-3, batch_size=64, epochs=50, is_transformer=False)
    results['linear_classifier'] = acc
    # optimize() reports a fraction (0.86 in the recorded run), so scale it before
    # attaching a percent sign; previously this printed "0.86%".
    print(f"Accuracy for linear layer: {acc * 100:.2f}%")

    # Learning a transformer-based classifier
    acc = optimize(train_dataset, test_dataset, max_token_length, learning_rate=1e-3, batch_size=64, epochs=50, is_transformer=True)
    results['transformer_classifier'] = acc
    print(f"Accuracy for transformer feature learner and then a linear classifier: {acc * 100:.2f}%")
    return results


In [23]:
# Fixed-seed 500-row subsample with a fresh 0..499 index.
df_2 = df.sample(n=500, random_state=42).reset_index(drop=True)

# Relative class frequencies, computed once and reused for both labels.
label_shares = df_2['label'].value_counts(normalize=True)
label_1_percentage = label_shares[1] * 100
label_0_percentage = label_shares[0] * 100

print(f"Percentage of label 1s: {label_1_percentage:.2f}%")
print(f"Percentage of label 0s: {label_0_percentage:.2f}%")


Percentage of label 1s: 54.60%
Percentage of label 0s: 45.40%


In [39]:
# Baseline run: 'max_token' scoring with the hate-speech vs. loving-words contrast.
run_classifier(
    df=df_2,
    scoring_method='max_token',
    model_name=model_name,
    items=['hate speech', 'loving words'],
    prompt_type='sentence-starters',
    num_sents=400,
    system_role='You are an example of how someone would respond with ',
)

Loading checkpoint shards: 100%|██████████| 2/2 [00:01<00:00,  1.05it/s]
100%|██████████| 25/25 [00:05<00:00,  4.42it/s]
100%|██████████| 31/31 [00:02<00:00, 13.87it/s]
Calculating activation scores: 100%|██████████| 500/500 [03:56<00:00,  2.11it/s]


Max token length: 172
Padding activation scores to max token length after tokenization.
Best threshold: 1.4273427373296148
Accuracy: 86.80%
Epoch 1
-------------------------------
loss: 0.682186  [   64/  350]
Test Error: 
 Accuracy: 67.3%, Avg loss: 0.611222 

Epoch 2
-------------------------------
loss: 0.572054  [   64/  350]
Test Error: 
 Accuracy: 70.0%, Avg loss: 0.581185 

Epoch 3
-------------------------------
loss: 0.538681  [   64/  350]
Test Error: 
 Accuracy: 74.0%, Avg loss: 0.557042 

Epoch 4
-------------------------------
loss: 0.471599  [   64/  350]
Test Error: 
 Accuracy: 75.3%, Avg loss: 0.540874 

Epoch 5
-------------------------------
loss: 0.486430  [   64/  350]
Test Error: 
 Accuracy: 78.0%, Avg loss: 0.526084 

Epoch 6
-------------------------------
loss: 0.461655  [   64/  350]
Test Error: 
 Accuracy: 79.3%, Avg loss: 0.513361 

Epoch 7
-------------------------------
loss: 0.476743  [   64/  350]
Test Error: 
 Accuracy: 78.0%, Avg loss: 0.503089 

Epoch 

{'threshold_classifier': np.float64(86.8),
 'linear_classifier': 0.86,
 'transformer_classifier': 0.86}

# Classification with textual features

In [41]:
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
import numpy as np
import evaluate

# Text-only baseline: fine-tune BERT on the raw sentences of the same split.
#
# run_classifier keeps its train/test split in local variables, so
# `train_dataset` / `test_dataset` do not exist at notebook scope on a fresh
# kernel. Rebuild the split here with the same calls and arguments that
# run_classifier uses so this cell survives Restart & Run All.
# NOTE(review): this repeats the slow activation scoring; consider caching the
# split if that becomes a bottleneck.
scored_df, padded_activation_scores, _ = compute_activation_score(
    df_2.copy(),
    'max_token',
    model_name,
    ['hate speech', 'loving words'],
    'sentence-starters',
    400,
    'You are an example of how someone would respond with ',
)
train_dataset, test_dataset = construct_dataset(scored_df, padded_activation_scores, test_size=0.3, random_state=42)

# NOTE(review): `model` and `tokenizer` are reused later in the notebook for the
# steering model and its tokenizer; those cells reassign both names before use.
model = AutoModelForSequenceClassification.from_pretrained("google-bert/bert-base-cased", num_labels=2)
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-cased")

def tokenize(examples):
    """Tokenize a batch of examples, padding/truncating to the model's max length."""
    return tokenizer(examples["text"], padding="max_length", truncation=True)


def to_hf_dataset(split):
    """Convert a split exposing `.text` and `.labels` into a tokenized HF dataset.

    Each entry of `split.labels` is reduced to the index of its first non-zero
    element (presumably a one-hot vector — verify against models.construct_dataset).
    """
    class_ids = [np.argwhere(label)[0][0] for label in split.labels]
    return HFDataset.from_dict({"text": split.text, "label": class_ids}).map(tokenize, batched=True)


train_dataset_hf = to_hf_dataset(train_dataset)
test_dataset_hf = to_hf_dataset(test_dataset)

metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Turn (logits, labels) from the Trainer into an accuracy metric."""
    logits, labels = eval_pred
    # convert the logits to their predicted class
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


training_args = TrainingArguments(
    output_dir="./",
    eval_strategy="epoch",
    push_to_hub=False
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset_hf,
    eval_dataset=test_dataset_hf,
    compute_metrics=compute_metrics,
)
trainer.train()

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at google-bert/bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Map: 100%|██████████| 350/350 [00:00<00:00, 3061.97 examples/s]
Map: 100%|██████████| 150/150 [00:00<00:00, 4837.20 examples/s]


Epoch,Training Loss,Validation Loss,Accuracy
1,No log,0.228227,0.92
2,No log,0.21252,0.94
3,No log,0.164131,0.96


TrainOutput(global_step=132, training_loss=0.27294476826985675, metrics={'train_runtime': 45.5943, 'train_samples_per_second': 23.029, 'train_steps_per_second': 2.895, 'total_flos': 276266608128000.0, 'train_loss': 0.27294476826985675, 'epoch': 3.0})

: 

In [None]:
# "prompt": "You are a crazy extremist who only responds with absolute ",
# "contrastive_words": ["hate", "love"]
# acc: 83.60%

# NOTE(review): these calls originally went through `test_dataset(...)`, which is
# not defined anywhere in this notebook (the name only exists as a local variable
# inside run_classifier), so the cell raised NameError on a fresh kernel.
# A later cell reads `test4['activation_score']`, i.e. it expects a scored
# DataFrame, so the first return value of compute_activation_score is used here.
# Confirm this matches the intent of the removed helper.
def _score_sample(scoring_method):
    """Return a scored copy of df_2 for the given scoring method."""
    # Work on a copy: compute_activation_score writes its columns into the frame
    # it is given, and test2..test5 must not end up aliasing one another.
    scored_df, _, _ = compute_activation_score(
        df_2.copy(),
        scoring_method,
        model_name,
        ['hate speech', 'loving words'],
        'sentence-starters',
        400,
        'You are an example of how someone would respond with ',
    )
    return scored_df


print('Test 2')
test2 = _score_sample('mean')

print('Test 3')
test3 = _score_sample('final_token')

print('Test 4')
test4 = _score_sample('max_token')

print('Test 5')
test5 = _score_sample('median_token')


In [None]:
# Sweep the scoring methods (mean, final_token, max_token, median_token)
# for each prompt / contrastive-word setup.
# TODO: also test all the different layer options.

methods = ['mean', 'final_token', 'max_token', 'median_token']
setup = {
    "prompt": "You are a crazy extremist who only responds with absolute ",
    "contrastive_words": ["hate", "love"]
}
setup_2 = {
    "prompt": "You are an example of how someone would respond with ",
    "contrastive_words": ["hate speech", "loving words"]
}

# NOTE(review): the original loops called `test_dataset(...)`, which is not
# defined in this notebook. Only the printed accuracies were used, so
# run_classifier is called instead — confirm this is the intended replacement.
# NOTE(review): prompt_type is 'starters' here but 'sentence-starters' in the
# earlier run_classifier call — verify which value Dataset.create_dataset expects.
for config in (setup, setup_2):
    # The second pass previously re-read `setup`, so `setup_2` was never evaluated.
    for method in methods:
        print(method)
        # Pass a copy so repeated runs do not overwrite each other's score columns in df_2.
        run_classifier(df_2.copy(), method, model_name, config["contrastive_words"], 'starters', 400, config["prompt"])


In [None]:
# Summary statistics of the activation scores from the 'max_token' run (test4).
test4_scores = test4['activation_score']
median = test4_scores.median()
max_value = test4_scores.max()
min_value = test4_scores.min()

print(f"Median: {median}")
print(f"Max: {max_value}")
print(f"Min: {min_value}")


In [None]:
# Build the contrastive dataset and train a notebook-level steering vector
# that the visualisation and generation cells below reuse.
# NOTE(review): prompt type is 'starters' here but 'sentence-starters' in the
# run_classifier call above — confirm both are accepted by Dataset.create_dataset.
contrast_items = ['hate speech', 'loving words']
steering_layers = list(range(-5, -18, -1))  # layer ids -5 down to -17

dataset = Dataset.create_dataset(model_name, contrast_items, 'starters', 400, 'You are an example of how someone would respond with ')

model = SteeringModel(model_name, steering_layers, hf_token)
vector = SteeringVector.train(model, dataset)


In [None]:

# Take the first 10 texts of each class from the full corpus.
label_0_texts = df.loc[df['label'] == 0].head(10)
label_1_texts = df.loc[df['label'] == 1].head(10)
print(len(label_1_texts))

# One pass per class: header line, then visualisation and layer-20 score per text.
sections = [
    ("Activation visualization and scores for label 0 texts:", label_0_texts),
    ("Activation visualization and scores for label 1 texts:", label_1_texts),
]
for section_number, (header, label_rows) in enumerate(sections):
    if section_number > 0:
        # Visual separator between the two classes.
        print("=====================================")
        print("=====================================")

    print(header)
    for i, row in label_rows.iterrows():
        text = row['text']
        activation_visualization = model.visualize_activation(input_text=text, control_vector=vector)
        activation_score = get_activation_score(text, model, vector, layer_index=20)
        print(f"Row {i}:\nText: {text}\nActivation Visualization:\n{activation_visualization}\nActivation Score: {activation_score}\n")

In [None]:
# Running log of every string generated through generate_with_vector in this session.
list_of_strings = []

tokenizer = AutoTokenizer.from_pretrained(model_name, token=model.token)
tokenizer.pad_token_id = 0

def generate_with_vector(
    input: str,
    vector: SteeringVector,
    coeffs: tuple[float, float],
    max_new_tokens: int = 20,
    repetition_penalty: float = 1.1,
    show_baseline: bool = True,
):
    """Generate text with and without steering and save the outputs to a file.

    Uses the notebook-level ``model`` and ``tokenizer``. Greedy decoding
    (``do_sample=False``) is used for every generation.

    Args:
        input: Prompt text. Its first 10 characters also name the output file.
        vector: Steering vector passed to ``model.set_control``.
        coeffs: ``(positive, negative)`` steering coefficients; the first must
            be > 0 and the second < 0.
        max_new_tokens: Generation length limit.
        repetition_penalty: Forwarded to ``model.generate``.
        show_baseline: When true, also generate once after ``model.reset()``.

    Side effects:
        Prints each generation, appends it to the global ``list_of_strings``,
        and writes this call's generations (one per line) to
        ``output_<first 10 chars of input>.txt`` in the working directory.
    """
    import re

    positive_coeff, negative_coeff = coeffs
    assert positive_coeff > 0
    assert negative_coeff < 0

    encoded = tokenizer(input, return_tensors="pt").to(model.device)
    settings = {
        "pad_token_id": tokenizer.eos_token_id,  # silence warning
        "do_sample": False,  # temperature=0
        "max_new_tokens": max_new_tokens,
        "repetition_penalty": repetition_penalty,
    }

    generated = []

    def _generate_and_record():
        # Generate once and reuse the text for printing and logging; the
        # original ran the (deterministic) generation twice per variant.
        text = tokenizer.decode(model.generate(**encoded, **settings).squeeze()).strip()
        print(text)
        generated.append(text)
        list_of_strings.append(text)

    try:
        if show_baseline:
            print("==baseline ---------------------------------------------------")
            model.reset()
            _generate_and_record()

        print("\n++control ---------------------------------------------------")
        model.set_control(vector, positive_coeff)
        _generate_and_record()

        print("\n--control ---------------------------------------------------")
        model.set_control(vector, negative_coeff)
        _generate_and_record()
    finally:
        # Always remove the steering hook, even if generation fails, so later
        # cells do not run against a silently steered model.
        model.reset()

    # Keep the file name valid when the prompt contains path separators or other
    # special characters; letters, digits, '_', '-', '.' and spaces pass through.
    file_stem = re.sub(r"[^\w\-. ]", "_", input[:10])
    # Write only this call's generations (the global log may hold earlier prompts)
    # and fix the encoding so non-ASCII model output does not depend on the platform default.
    with open(f"output_{file_stem}.txt", "w", encoding="utf-8") as f:
        for string in generated:
            f.write(string + "\n")


generate_with_vector("Write about a black man.", vector, (2,-2), 60)



# highlighted_output = model.visualize_activation(
#     input_text="Good CEOs are all men. Men tend to be better at leadership roles.",
#     control_vector=vector,
# )

# print(highlighted_output)

# highlighted_output = model.visualize_activation(
#     input_text="Women are supposed to stay in the kitchen not go out and become nurses.",
#     control_vector=vector,
# )
# print(highlighted_output)

# highlighted_output = model.visualize_activation(
#     input_text="Mothers are the most amazing thing to bless humankind.",
#     control_vector=vector,
# )
# print(highlighted_output)

# print(
#     get_activation_score("Female empowerment is an amazing thing.", model, vector)
# )

In [None]:
# OpenAI client; the API key is read from the environment (populated by load_dotenv).
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def classify_hate_speech(text):
    """Ask the OpenAI model whether ``text`` is hate speech.

    Args:
        text: The text to classify.

    Returns:
        ``1`` or ``0`` when the model's reply is exactly that digit (ignoring
        surrounding whitespace), otherwise ``None``. ``None`` is also returned
        when the request raises; the error is printed rather than propagated so
        a single failure does not abort a whole ``DataFrame.apply`` run.
    """
    try:
        response = client.responses.create(
            model="gpt-4o",
            instructions="You are a helpful assistant that classifies text as hate speech or not.",
            input=f"Is the following text hate speech? Respond only with 0 for no and 1 for yes: {text}",
        )
        # Strip whitespace/newlines so replies like "1\n" are not discarded;
        # an empty/None reply falls through to the None branch.
        result = (response.output_text or "").strip()
        return int(result) if result in ('0', '1') else None
    except Exception as e:
        print(f"Error processing text: {text}\nError: {e}")
        return None

# Classify every sampled text (one API request per row) into a new `openai`
# column on a copy of df_2, leaving df_2 itself untouched.
openai_results = df_2.assign(openai=df_2['text'].apply(classify_hate_speech))
openai_results.head()



In [None]:
# How many texts received no usable prediction.
none_count = openai_results['openai'].isna().sum()
print(f"Number of None values in predictions: {none_count}")

# Keep only rows that actually have a prediction.
valid_predictions = openai_results.dropna(subset=['openai'])

# Agreement between prediction and ground-truth label.
correct_count = valid_predictions['openai'].eq(valid_predictions['label']).sum()
total_valid = len(valid_predictions)
if total_valid > 0:
    accuracy = (correct_count / total_valid) * 100
else:
    accuracy = 0

print(f"Number of correctly classified texts: {correct_count} out of {total_valid}")
print(f"Accuracy (excluding None values): {accuracy:.2f}%")


In [None]:
# Rows without a prediction (classify_hate_speech returned None).
prediction_missing = openai_results['openai'].isna()
none_rows = openai_results[prediction_missing]
print("Texts with None predictions:")
for idx, row in none_rows.iterrows():
    print(f"Index {idx}: {row['text']}")

# Rows that received a prediction which disagrees with the ground truth.
label_disagrees = openai_results['openai'] != openai_results['label']
misclassified = openai_results[~prediction_missing & label_disagrees]
print("\nTexts with misclassified values:")
for idx, row in misclassified.iterrows():
    print(f"Index {idx}:")
    print(f"Text: {row['text']}")
    print(f"Ground Truth: {row['label']}, Prediction: {row['openai']}\n")
