In [None]:
!pip install -q -U google-genai
!pip install pydantic



In [1]:
from datasets import load_dataset

# Load CoNLL-2003 by its dataset identifier
dataset = load_dataset("eriktks/conll2003")

# Bind every split to its own name; later cells read these variables directly
train_data, validation_data, test_data = (
    dataset[split_name] for split_name in ("train", "validation", "test")
)

# Show the first training record as a quick sanity check
print(train_data[0])

{'id': '0', 'tokens': ['EU', 'rejects', 'German', 'call', 'to', 'boycott', 'British', 'lamb', '.'], 'pos_tags': [22, 42, 16, 21, 35, 37, 16, 21, 7], 'chunk_tags': [11, 21, 11, 12, 21, 22, 11, 12, 0], 'ner_tags': [3, 0, 7, 0, 0, 0, 7, 0, 0]}


In [2]:
# Look at one training record in detail: the full record first, then the two
# fields used by the rest of the notebook
example = train_data[11001]
print("Example from the training data:", example, sep="\n")
print("Tokens:", example["tokens"])
print("Labels:", example["ner_tags"])

Example from the training data:
{'id': '11001', 'tokens': ['1886', '-', 'At', 'Skeleton', 'Canyon', 'in', 'Arizona', ',', 'Geronimo', ',', 'Apache', 'chief', 'and', 'leader', 'of', 'the', 'last', 'great', 'Red', 'Indian', 'rebellion', 'finally', 'surrendered', 'to', 'General', 'Nelson', 'Miles', '.'], 'pos_tags': [11, 8, 15, 22, 22, 15, 22, 6, 22, 6, 22, 21, 10, 21, 15, 12, 16, 16, 22, 22, 21, 30, 38, 35, 22, 22, 22, 7], 'chunk_tags': [11, 12, 13, 11, 12, 13, 11, 0, 11, 0, 11, 12, 0, 11, 13, 11, 12, 12, 12, 12, 12, 3, 21, 13, 11, 12, 12, 0], 'ner_tags': [0, 0, 0, 5, 6, 0, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 8, 0, 0, 0, 0, 0, 1, 2, 0]}
Tokens: ['1886', '-', 'At', 'Skeleton', 'Canyon', 'in', 'Arizona', ',', 'Geronimo', ',', 'Apache', 'chief', 'and', 'leader', 'of', 'the', 'last', 'great', 'Red', 'Indian', 'rebellion', 'finally', 'surrendered', 'to', 'General', 'Nelson', 'Miles', '.']
Labels: [0, 0, 0, 5, 6, 0, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 8, 0, 0, 0, 0, 0, 1, 2, 0]


In [3]:
# Report how many sentences each split holds
split_sizes = {
    "Training set size:": len(train_data),
    "Validation set size:": len(validation_data),
    "Test set size:": len(test_data),
}
for caption, size in split_sizes.items():
    print(caption, size)

Training set size: 14041
Validation set size: 3250
Test set size: 3453


{'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3, 'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8}

In [4]:
import csv
import os

def parse_response(tokens : list, response_labels : str, true_labels : list) -> list:
    """
    Turn the model's textual reply into per-token rows.

    The reply is expected to look like ``ner_tags: 0, 1, 2``. The label list is
    located with a regular expression, so text around it (a preamble that itself
    contains a colon, a Markdown code fence, square brackets, trailing notes) does
    not break parsing.

    Args:
        tokens: Tokens of the sentence that was sent to the model.
        response_labels: Raw text of the model's reply.
        true_labels: Gold label ids, at least one per token.

    Returns:
        A list of ``[token, predicted_label, true_label]`` lists, in token order.

    Raises:
        ValueError: If the reply is not a string or contains no list of integers.
        AssertionError: If the number of predicted labels differs from the number
            of tokens, if there are fewer gold labels than tokens, or if a
            predicted label is outside the range 0..8.
    """
    import re  # local import: the surrounding cell only imports csv and os

    if not isinstance(response_labels, str):
        raise ValueError("Model response is not text; cannot parse NER tags")

    int_list = r"-?\d+(?:\s*,\s*-?\d+)*"
    # Prefer the list that directly follows the 'ner_tags:' marker; otherwise fall
    # back to the first comma-separated run of integers anywhere in the reply.
    match = re.search(r"ner_tags\s*:\s*\[?\s*(" + int_list + r")", response_labels, re.IGNORECASE)
    if match is None:
        match = re.search(r"(" + int_list + r")", response_labels)
    if match is None:
        raise ValueError("No NER tags found in the model response")

    pred_labels = [int(piece) for piece in match.group(1).split(',')]

    # Raised explicitly (instead of via `assert`) so the checks keep the same
    # exception type but are not stripped when Python runs with -O.
    if len(pred_labels) != len(tokens):
        raise AssertionError("Length of tokens and NER tags do not match")
    if len(true_labels) < len(tokens):
        raise AssertionError("Fewer true labels than tokens")

    rows = []
    for token, pred_label, true_label in zip(tokens, pred_labels, true_labels):
        if not 0 <= pred_label <= 8:
            raise AssertionError("Predicted label is out of range")
        rows.append([token, pred_label, true_label])

    return rows

def save_to_csv(tokens : list, pred_labels : list, true_labels : list, filename : str) -> None:
    """
    Append one ``token, pred, true`` row per token to a CSV file.

    The parent directory is created when missing, and the header row is written
    only when the file does not exist yet or is still empty. The file is written as
    UTF-8 so tokens with non-ASCII characters do not depend on the platform's
    default encoding.

    Args:
        tokens: Tokens of the sentence.
        pred_labels: Predicted label for each token.
        true_labels: Gold label for each token.
        filename: Path of the CSV file to append to.

    Raises:
        IndexError: If ``pred_labels`` or ``true_labels`` is shorter than ``tokens``.
    """
    # Build the rows before touching the file so a length mismatch cannot leave a
    # header-only file behind.
    rows = [[tokens[i], pred_labels[i], true_labels[i]] for i in range(len(tokens))]

    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)

    needs_header = not os.path.isfile(filename) or os.path.getsize(filename) == 0
    with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        if needs_header:
            writer.writerow(['token', 'pred', 'true'])
        writer.writerows(rows)

In [5]:
def store_predicted_labels(pred_labels : list, votes : list) -> None:
    """
    Record one round of predictions in the per-token vote lists.

    The label at position ``k`` of ``pred_labels`` is appended to ``votes[k]``, so
    ``votes`` is modified in place and nothing is returned.
    """
    for position, label in enumerate(pred_labels):
        votes[position].append(label)

In [None]:
#from pydantic import BaseModel
from google import genai

# Zero-shot NER with majority voting: each sentence is sent to the model several
# times and, for every token, the most frequently predicted label is kept.
model = "gemma-3-27b-it"
# One client serves every request; it was previously rebuilt before each call.
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
# The results file lives in "data/", so make sure that folder exists before saving.
os.makedirs("data", exist_ok=True)

for j in range(5):
    tokens = train_data[j]['tokens']
    true_labels = train_data[j]['ner_tags']
    votes = [[] for _ in range(len(tokens))]
    for i in range(5):
        # Send request to Gemma
        response = client.models.generate_content(
            model=model,
            contents=f"""Given the following NER tags: {{'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3, 'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8}}, determine the Named Entity Recognition (NER) tags for the following sentence.
            The sentence is: '{tokens}'
            This sentence contains exactly {len(tokens)} tokens.

            Print only the number associated with the NER tag for each of the {len(tokens)} tokens, using the tag-to-number mapping provided above.
            Your answer MUST follow the format: ner_tags: 0, 1, 2, 0, 0, 0
            The number of output NER tags MUST be exactly {len(tokens)}, one for each token in the order they appear in the sentence.
            Do not include any other text or explanations.
            """
        )

        # Parse the response into [token, predicted, true] rows
        data = parse_response(tokens, response.text, true_labels)
        # Store this round's predicted labels (second column) for voting
        store_predicted_labels([row[1] for row in data], votes)
    # Extract for each token the most voted label
    majority_labels = [max(set(vote), key=vote.count) for vote in votes]
    save_to_csv(tokens, majority_labels, true_labels, "data/data_test.csv")

In [None]:
import csv
import os

def save_basic_decomposed_qa_to_csv(tokens: list, pred_labels: dict, true_labels: dict, filename: str) -> None:
    """
    Save the token, predicted labels, and true labels to a CSV file for Basic Decomposed-QA.

    Each token is looked up by its text in the comma-separated entity strings; the
    first label (in dictionary order) that lists the token wins, and tokens listed
    nowhere get 'O'. An entity string equal to 'None' means "no entities" and is
    never matched against tokens. The header row is written only when the file does
    not exist yet or is still empty.

    Args:
        tokens (list): List of tokens in the sentence.
        pred_labels (dict): Dictionary of predicted labels in the format {'PER': 'entity1, entity2', ...}.
        true_labels (dict): Dictionary of true labels in the format {'PER': 'entity1, entity2', ...}.
        filename (str): Path to the CSV file (relative to the 'data' folder).
    """
    def first_label_by_token(labels: dict) -> dict:
        # Split every entity string once and remember the first label per token.
        lookup = {}
        for label, entities in labels.items():
            if entities == 'None':  # sentinel for "no entities", not a token
                continue
            for entity in entities.split(', '):
                lookup.setdefault(entity, label)
        return lookup

    predicted_by_token = first_label_by_token(pred_labels)
    true_by_token = first_label_by_token(true_labels)

    # Prepare one row per token, falling back to 'O' when no label lists it
    data = [
        [token, predicted_by_token.get(token) or 'O', true_by_token.get(token) or 'O']
        for token in tokens
    ]

    # Ensure the 'data' directory exists
    os.makedirs("data", exist_ok=True)

    # Full path to the file in the 'data' directory
    filepath = os.path.join("data", filename)

    needs_header = not os.path.isfile(filepath) or os.path.getsize(filepath) == 0

    # Single append-mode open: header first when needed, then the rows
    with open(filepath, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        if needs_header:
            writer.writerow(['token', 'pred', 'true'])
        writer.writerows(data)

In [None]:
# Integer tag id -> readable BIO tag; the id is the tag's position in this list
label_mapping = dict(enumerate([
    'O',
    'B-PER', 'I-PER',
    'B-ORG', 'I-ORG',
    'B-LOC', 'I-LOC',
    'B-MISC', 'I-MISC',
]))

# Entity classes (no B-/I- distinction) paired with a human-readable description
ner_labels = dict(
    PER='Person',
    ORG='Organization',
    LOC='Location',
    MISC='Miscellaneous',
)

def _entity_token_indices(tokens: list, answer: str) -> set:
    """
    Map a comma-separated entity answer onto token positions.

    Each entity is split on whitespace and every contiguous occurrence of that word
    sequence in ``tokens`` is marked, so a multi-word entity such as
    'Peter Blackburn' covers both of its tokens instead of matching none. A
    single-word entity marks every token equal to it. An answer of 'None' marks
    nothing.
    """
    if answer.strip().rstrip('.').lower() == 'none':
        return set()
    covered = set()
    for entity in answer.split(', '):
        words = entity.split()
        if not words:
            continue
        width = len(words)
        for start in range(len(tokens) - width + 1):
            if list(tokens[start:start + width]) == words:
                covered.update(range(start, start + width))
    return covered


# One client serves every request; building a new one per voting round is unnecessary.
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

# Iterate over the first 5 examples of the dataset
for j in range(5):
    tokens = train_data[j]['tokens']
    true_labels = train_data[j]['ner_tags']
    votes = [[] for _ in range(len(tokens))]  # Initialize votes for majority voting

    # Build a dictionary for the true labels. It does not depend on the voting
    # round, so it is built once per sentence.
    true_label_dict = {label: [] for label in ner_labels.keys()}
    for token, true_label in zip(tokens, true_labels):
        readable_label = label_mapping[true_label]
        if readable_label != 'O':  # Ignore tokens without labels
            main_label = readable_label.split('-')[-1]  # Get PER, ORG, LOC, MISC
            true_label_dict[main_label].append(token)

    # Convert lists to comma-separated strings
    for key in true_label_dict:
        true_label_dict[key] = ', '.join(true_label_dict[key]) if true_label_dict[key] else 'None'

    # Repeat the process 5 times for majority voting
    for iteration in range(5):
        context = f"The sentence is: '{' '.join(tokens)}'\n"  # Initial context
        results = []  # (label, answer) pairs in question order

        # Iterate over each NER label
        for label, description in ner_labels.items():
            # Build the question for the current label
            question = f"Question: What are the named entities labeled as '{description}' in the text? You only have to answer with the entity name, if there are multiple entities then separate them with a comma. If there are no entities, answer with 'None'. Finally, remember that a token can only be classified once.\n"

            # Send the request to the model
            response = client.models.generate_content(
                model="gemma-3-27b-it",
                contents=f"{context}\n{question}\n"
            )

            # Save the response and update the context. A reply without text is
            # treated as "no entities" instead of crashing on None.strip().
            answer = (response.text or 'None').strip()
            results.append((label, answer))
            context += f"\n{question}\n{answer}\n"

        # Resolve every answer to token positions once, keeping question order so
        # the first label that claims a token wins.
        label_hits = [(label, _entity_token_indices(tokens, answer)) for label, answer in results]
        for token_idx in range(len(tokens)):
            predicted = next((label for label, hits in label_hits if token_idx in hits), 'O')
            # Add the predicted label for this token to the votes
            votes[token_idx].append(predicted)

    # Perform majority voting for each token. Iterating the vote list itself (not a
    # set of it) makes ties resolve to the earliest-seen label; set order for
    # strings changes between kernel sessions because of hash randomisation.
    final_labels = [max(vote, key=vote.count) for vote in votes]

    # Build the final predicted labels in the desired format
    final_label_dict = {label: [] for label in ner_labels.keys()}
    for token, final_label in zip(tokens, final_labels):
        if final_label != 'O':  # Ignore tokens without labels
            final_label_dict[final_label].append(token)

    # Convert lists to comma-separated strings
    for key in final_label_dict:
        final_label_dict[key] = ', '.join(final_label_dict[key]) if final_label_dict[key] else 'None'

    # Print the results for the current sample
    print(f"Results for sample {j + 1}:")
    print(f"Tokens: {tokens}")
    print("Predicted Labels:")
    print(final_label_dict)
    print("True Labels:")
    print(true_label_dict)
    print("\n")

    # Save the results to a CSV file
    save_basic_decomposed_qa_to_csv(tokens, final_label_dict, true_label_dict, "basic_decomposed_qa_results.csv")