In [None]:
import wandb
import os
import json
from IPython.display import HTML

# Start a wandb run; the artifact below is requested through this run object.
run = wandb.init()

# Artifact reference and type, named once so they are easy to spot and change.
ARTIFACT_REFERENCE = 'nlp_and_interpretability/feature_explanations/pythia-70m_hh_rlhf_explanations_artifact:v2'
ARTIFACT_TYPE = 'dataset'

# Request the artifact and download it; `artifact_dir` holds whatever
# `download()` returns and is listed by the next cell.
artifact = run.use_artifact(ARTIFACT_REFERENCE, type=ARTIFACT_TYPE)
artifact_dir = artifact.download()

In [None]:
# Locate the JSON file in the downloaded artifact directory.
# os.listdir() returns entries in arbitrary order, so the names are sorted to
# make the choice deterministic if the directory ever holds several JSON files.
json_file_path = next(
    (
        os.path.join(artifact_dir, file_name)
        for file_name in sorted(os.listdir(artifact_dir))
        if file_name.endswith(".json")
    ),
    None,
)

# Fail right here with a clear message: merely printing would leave `data`
# undefined and the following cells would die with an unrelated NameError.
if not json_file_path:
    raise FileNotFoundError("No JSON file found in the artifact directory.")

# Decode explicitly as UTF-8 so the non-ASCII token text does not depend on the
# platform's default encoding.
with open(json_file_path, 'r', encoding='utf-8') as json_file:
    data = json.load(json_file)
#data

In [None]:
# Hand-written sample in the format parse_input() consumes: one record per
# line ("\n"), each record being token<TAB>activation.
# NOTE(review): 'Ġ' looks like a tokenizer's leading-space marker - confirm.
input_text = "I\t0.0\nĠread\t0.0\nĠit\t0.0\nĠsomewhere\t0.0\nĠand\t0.0\nĠit\t1.0\nĠwould\t2.0\nĠmake\t0.0\nI\t0.0\nĠhave\t0.0\nĠno\t0.0\nĠidea\t2.0\nĠwhen\t0.0\nĠmy\t0.0\nĠperiod\t0.0\nĠshould\t1.0\nI\t0.0\nĠreally\t0.0\nĠdon\t0.0\n't\t0.0\nĠknow\t2.0\nĠwhere\t1.0\nĠthis\t0.0\nĠmonth\t0.0\nNot\t0.0\nĠsure\t2.0\nĠif\t1.0\nĠI\t0.0\nĠmentioned\t0.0\nĠthis\t0.0\nĠbefore\t0.0\nĠbut\t0.0\nNot\t0.0\nĠsure\t2.0\nĠif\t1.0\nĠI\t0.0\nâĢĻ\t0.0\nve\t0.0\nĠshared\t0.0\nĠthis\t0.0"

def parse_input(input_text):
    """Parse an activation record into a list of ``(token, activation)`` pairs.

    Each non-blank line of ``input_text`` must have the form
    ``token<TAB>activation``. Every 'Ġ' character is removed from the token and
    the activation is converted to ``float``.

    Args:
        input_text: Newline-separated records, surrounding whitespace ignored.

    Returns:
        A list of ``(token, activation)`` tuples in input order. Empty or
        whitespace-only input yields an empty list.

    Raises:
        ValueError: If a non-blank line contains no tab, or the text after the
            last tab is not a valid float.
    """
    output = []
    for line in input_text.strip().split('\n'):
        # Blank lines (including the single '' produced by empty input) carry
        # no record; skipping them avoids a confusing unpacking error.
        if not line.strip():
            continue
        # Split on the LAST tab only, so a token that itself contains a tab
        # still yields exactly two fields.
        token, activation = line.rsplit('\t', 1)
        output.append((token.replace('Ġ', ''), float(activation)))
    return output

# Run the parser on the sample above; as the cell's last expression, the
# resulting list of (token, activation) pairs is shown as the cell output.
parse_input(input_text)

In [None]:
class NeuronExplanation:
    """Bundle of a neuron's name, its token activations and its explanation."""

    def __init__(self, neuron_name, tokens_and_activations, explanation):
        """Store the three fields.

        ``tokens_and_activations`` is stored via its own ``.copy()``, so the
        instance does not share the caller's container object.
        """
        self.neuron_name = neuron_name
        self.tokens_and_activations = tokens_and_activations.copy()
        self.explanation = explanation

    def __str__(self):
        """Return the three fields, one per line, as labelled text."""
        labelled_fields = (
            f'Neuron_name: {self.neuron_name}',
            f'Tokens_and_activations: {self.tokens_and_activations}',
            f'Explanation: {self.explanation}',
        )
        return ',\n'.join(labelled_fields)

# One NeuronExplanation per entry of `data`: the key is the neuron name, the
# value supplies the explanation text and the raw activation record, which is
# parsed into (token, activation) pairs.
all_neuron_explanations = []
for neuron_name, record in data.items():
    all_neuron_explanations.append(
        NeuronExplanation(
            explanation=record['explanation'],
            tokens_and_activations=parse_input(record['original_activations']),
            neuron_name=neuron_name,
        )
    )

In [None]:
# Number of (token, activation) pairs held by each neuron, in list order.
lengths = list(map(lambda ne: len(ne.tokens_and_activations), all_neuron_explanations))
#lengths

In [None]:
import random
from textwrap import dedent

class TestQuestion:
    """A multiple-choice annotation question about one neuron.

    The question shows the neuron's tokens shaded by activation, followed by
    four candidate explanations (the true one plus distractors, in random
    order) and a fixed fifth "None of the above" choice.
    """

    def __init__(self, true_explanation, scrambled_explanations, tokens_and_activations):
        """Build the shuffled option list and the HTML prompt.

        Args:
            true_explanation: The explanation that belongs to these activations.
            scrambled_explanations: Distractor explanations. At most
                ``max_options - 1`` of them (the first ones) become options;
                missing ones are filled with a padding option.
            tokens_and_activations: Sequence of ``(token, activation)`` pairs.
        """
        self.true_explanation = true_explanation
        self.scrambled_explanations = scrambled_explanations
        self.tokens_and_activations = tokens_and_activations
        # HTML for the tokens, each shaded by its activation.
        self.markdown = self.prepare_markdown(self.tokens_and_activations)
        # Number of explanation slots rendered in the prompt.
        self.max_options = 4

        # The prompt renders exactly `max_options` explanations. Surplus
        # distractors are dropped BEFORE shuffling; otherwise the true
        # explanation could land on an index the prompt never displays.
        distractors = list(self.scrambled_explanations)[: self.max_options - 1]
        self.options = [self.true_explanation] + distractors
        if len(self.options) < self.max_options:
            self.options += (self.max_options - len(self.options)) * ["Padding option - please ignore"]
        random.shuffle(self.options)
        # 0-based position of the true explanation after shuffling.
        self.true_option = self.options.index(true_explanation)

        # The prompt is HTML, so the option texts are escaped for display;
        # `self.options` itself keeps the raw texts.
        shown = [self._escape_html(option) for option in self.options]
        self.prompt = (f"""Below is a sequence of texts, alongwith the activations for a neuron.
Red being the darkest. Pick which of the explanations below is the best choice, or none of them
above if none seem suitable.<br>
Token Activations:
<br>{self.markdown}
<br><br>
Explanation 1: {shown[0]}
<br><br>
Explanation 2: {shown[1]}
<br><br>
Explanation 3: {shown[2]}
<br><br>
Explanation 4: {shown[3]}
<br><br>
Explanation 5: None of the above.
"""
)

    @staticmethod
    def _escape_html(text):
        """Return ``text`` as a string with ``&``, ``<`` and ``>`` HTML-escaped."""
        # Local import keeps the class usable even if only this cell is re-run.
        import html
        return html.escape(str(text), quote=False)

    def to_json(self):
        """Return the question as a dict with prompt, options and true_option."""
        result = {
            "prompt": self.prompt,
            "options": self.options,
            "true_option": self.true_option
        }
        return result

    def __str__(self):
        return f'True explanation: {self.true_explanation} \n {len(self.tokens_and_activations)}'

    def activation_to_color(self, activation, min_activation, max_activation):
        """Map an activation to a CSS ``rgba(...)`` string on a transparent-to-red scale.

        The activation is min-max normalised to [0, 1]; the normalised value
        drives both the red channel (scaled to 0-255) and the alpha channel.
        When all activations are equal a fully transparent colour is returned.
        """
        if max_activation - min_activation == 0:  # Avoid division by zero
            return 'rgba(0, 255, 0, 0)'  # Alpha 0: fully transparent if no range

        normalized = (activation - min_activation) / (max_activation - min_activation)
        red_value = int(normalized * 255)  # Scale to 0-255 for the red channel
        return f'rgba({red_value}, 0, 0, {normalized})'  # From transparent to red

    def prepare_markdown(self, tokens_and_activations):
        """Render the tokens as HTML ``<span>`` elements shaded by activation.

        Args:
            tokens_and_activations: Sequence of ``(token, activation)`` pairs.

        Returns:
            One ``<span>`` per token, each followed by a space; an empty string
            when there are no tokens.
        """
        pairs = list(tokens_and_activations)
        if not pairs:
            # min()/max() below would raise on an empty sequence.
            return ""

        activations = [activation for _, activation in pairs]
        min_activation = min(activations)
        max_activation = max(activations)

        spans = []
        for word, activation in pairs:
            color = self.activation_to_color(activation, min_activation, max_activation)
            # Tokens are raw text: escape them so characters such as '<' or
            # '&' are displayed literally instead of being parsed as markup.
            spans.append(
                f'<span style="background-color: {color}; padding: 2px;">{self._escape_html(word)}</span> '
            )
        return "".join(spans)

def generate_test_questions(all_neuron_explanations, k=3):
    """Create one TestQuestion per neuron, with randomly drawn distractors.

    Args:
        all_neuron_explanations: Objects exposing ``explanation`` and
            ``tokens_and_activations`` attributes.
        k: Desired number of distractor explanations per question. Fewer are
            used when not enough distinct other explanations exist.

    Returns:
        A list of TestQuestion objects, in the order of the input.
    """
    # Distinct explanation texts in first-seen order. De-duplicating keeps the
    # same text from appearing twice among one question's distractors.
    # (dict.fromkeys needs hashable values; explanations are assumed to be strings.)
    unique_explanations = list(dict.fromkeys(ne.explanation for ne in all_neuron_explanations))

    test_questions = []
    for ne in all_neuron_explanations:
        true_explanation = ne.explanation

        # Candidate distractors: every distinct explanation except the true one.
        candidates = [exp for exp in unique_explanations if exp != true_explanation]

        # Cap k by the size of the ACTUAL candidate pool. Capping by
        # len(all) - 1 over-counts when several neurons share the true
        # explanation, which made random.sample raise ValueError.
        scrambled_explanations = random.sample(candidates, k=min(k, len(candidates)))

        test_questions.append(
            TestQuestion(
                true_explanation=true_explanation,
                scrambled_explanations=scrambled_explanations,
                tokens_and_activations=ne.tokens_and_activations,
            )
        )

    return test_questions

In [None]:
# Distractor sampling and option shuffling both draw from `random`; seeding
# here makes re-running this cell regenerate the same questions and file.
RANDOM_SEED = 0
random.seed(RANDOM_SEED)

test_questions = generate_test_questions(all_neuron_explanations, k=3)
all_json = [question.to_json() for question in test_questions]

import json

# Write all questions as a single JSON array, one object per question.
with open("annotation_dataset.json", "w", encoding="utf-8") as f_out:
    json.dump(all_json, f_out)

In [None]:
# Inspect one question: print its answer key (index of the true option and
# the true explanation), then render its full HTML prompt as the cell output.
item_no = 73
selected_question = test_questions[item_no]
print(selected_question.true_option)
print(selected_question.true_explanation)
html_output = selected_question.prompt
HTML(html_output)


In [None]:
# Render only the activation-shaded token HTML (no options) of question 25.
HTML(test_questions[25].markdown)

### Parse annotations.

In [None]:
import json

# Load the collected annotations. The encoding is given explicitly so that any
# non-ASCII text in the file is decoded the same way on every platform.
with open("final_annotations.json", "r", encoding="utf-8") as f_in:
    all_annotations = json.load(f_in)

In [None]:
# Module-level accumulators that process_annotations() appends to:
#   true_labels      - `true_option` + 1 for every processed item
#   annotator_labels - annotator id (`completed_by`) -> that annotator's labels
true_labels = []
annotator_labels = {1: [], 2: []}

def calculate_agreement_rate(ground_truth, labels):
    """Return the fraction of positions at which the two label lists are equal.

    Args:
        ground_truth: Reference labels.
        labels: Labels to compare against the reference, position by position.

    Returns:
        Number of equal positions divided by the list length.

    Raises:
        ValueError: If the two lists differ in length.
    """
    total = len(ground_truth)
    if total != len(labels):
        raise ValueError("Both lists must be of the same length")

    # Keep only the positions where both lists hold an equal label.
    matching_pairs = [pair for pair in zip(ground_truth, labels) if pair[0] == pair[1]]
    return len(matching_pairs) / total


def process_annotations(annotations, num_annotations=40):
    """Collect true labels and per-annotator labels from annotation items.

    Appends to the module-level ``true_labels`` list and ``annotator_labels``
    dict; nothing is returned.

    Args:
        annotations: List of items. Each item has a ``'data'`` dict containing
            ``'true_option'`` and an ``'annotations'`` list whose entries carry
            ``'completed_by'`` (annotator id) and ``'result'`` (the chosen
            label is read from ``result[0]['value']['choices'][0]``).
        num_annotations: Only the first ``num_annotations`` items are processed.
    """
    # Iterate over the argument that was passed in, not the notebook-level
    # `all_annotations` global, so the function works on any list of items.
    for annotation_element in annotations[:num_annotations]:
        item_annotations = annotation_element['annotations']
        item_data = annotation_element['data']
        # The stored option index is shifted by one to line up with the
        # annotators' labels.
        true_labels.append(int(item_data['true_option']) + 1)
        for curr_annotation in item_annotations:
            target = curr_annotation['completed_by']
            annotator_label = int(curr_annotation['result'][0]['value']['choices'][0])
            # setdefault: accept annotator ids that were not pre-registered
            # instead of failing with KeyError.
            annotator_labels.setdefault(target, []).append(annotator_label)
    # NOTE(review): an item lacking an annotation from some annotator leaves
    # that annotator's list shorter than `true_labels`, which
    # calculate_agreement_rate() rejects - confirm every item is fully annotated.

# Fill `true_labels` and `annotator_labels` from the loaded annotations
# (default limit: the first 40 items).
process_annotations(all_annotations)

In [None]:
# Per annotator: the share of answers equal to 5 (the prompt's "None of the
# above" choice) and the agreement rate against the true labels.
none_of_the_above = {}

for target, curr_labels in annotator_labels.items():
    none_of_the_above[target] = curr_labels.count(5) / len(curr_labels)
    agreement_rate = calculate_agreement_rate(true_labels, curr_labels)
    print(f'Agreement rate is {agreement_rate} for target {target}')

In [None]:
# Agreement between annotator 1 and annotator 2 (fraction of items on which
# their labels are equal), shown as the cell output.
calculate_agreement_rate(annotator_labels[1], annotator_labels[2])