In [2]:
from transformers import pipeline
from transformers.pipelines.pt_utils import KeyDataset
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from datasets import load_dataset
import pandas as pd
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# model_path = "./pretrained_llms/Llama-3.3-70B-Instruct"
# model_path = "./pretrained_llms/Llama-3.1-8B"
model_name = "Llama-3.1-8B-Instruct"
model_path = os.path.join("./pretrained_llms", model_name)
data_path = "./data"
data_name = "mteb/tweet_sentiment_extraction"
cache_dir = "./cache"

dataset = load_dataset(data_name, cache_dir=data_path, split='train')
# dataset = load_dataset(data_name, cache_dir=data_path, split='train')
# dataset = load_dataset(data_name, cache_dir=data_path, split='train', remove_columns=["id"])

./pretrained_llms/Llama-3.1-8B-Instruct


In [12]:
def analyze_sentiment_zero_shot(dataset, model, tokenizer, device, verbose=False, num_samples=None):
    """
    Analyze sentiment using zero-shot learning approach.
    
    Args:
        dataset: Dataset containing texts to analyze
        model: The language model
        tokenizer: The tokenizer
        device: Device to run the model on
        verbose: If True, prints each text and its prediction
        num_samples: Optional number of samples to process (None for all)
    
    Returns:
        list: List of dictionaries containing text and predictions
    """
    model.eval()
    model = model.to(device)  
    
    prompt_template = """[INST] Analyze the sentiment of the following text. Respond with exactly one word: either 'positive', 'negative', or 'neutral'.

Text: "{}"

Sentiment: [/INST]"""

    # Handle num_samples
    texts = dataset['text']
    if num_samples is not None:
        texts = texts[:num_samples]
    
    results = []
    total = len(texts)
    
    for i, text in enumerate(texts):
        prompt = prompt_template.format(text)
        inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
        
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=20,
                num_return_sequences=1,
                temperature=0.1,  # Lower temperature for more focused responses
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id
            )
            
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # print("-"*20)
        # print(response)
        # print("-"*20)
       
        sentiment = response[len(prompt):].strip().split()[0].lower()
        
        # Store result
        results.append({
            'text': text,
            'predicted_sentiment': sentiment
        })
        
        # Verbose output
        if verbose:
            print(f"\nText [{i+1}/{total}]: {text}")
            print(f"Predicted sentiment: {sentiment}")
            
        # Print progress every 10% if not verbose
        elif (i + 1) % max(1, total // 10) == 0:
            print(f"Progress: {(i + 1) / total:.1%}")
    
    # Final progress
    if not verbose:
        print("Analysis complete!")
    
    return results

In [4]:
def compare_with_baseline(dataset):
    # Load a pre-trained sentiment model
    baseline_classifier = pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",
        device=device
    )
    
    texts = dataset['text']
    
    for text in texts:
        result = baseline_classifier(text)[0]
        print(f"\nText: {text}")
        print(f"Baseline model prediction: {result}")

In [18]:
def evaluate_predictions(dataset, results):
    id2label = {0: "negative", 1: "neutral", 2: "positive"}
    label2id = {"negative": 0, "neutral": 1, "positive": 2}
    
    correct = 0
    total = len(results)
    confusion_matrix = {
        'negative': {'negative': 0, 'neutral': 0, 'positive': 0},
        'neutral': {'negative': 0, 'neutral': 0, 'positive': 0},
        'positive': {'negative': 0, 'neutral': 0, 'positive': 0}
    }
    
    for i, result in enumerate(results):
        true_label = id2label[dataset[i]['label']]
        # predicted = result['predicted_sentiment'].lower().strip()
        predicted = result['predicted_sentiment']        
        
        # Handle variations in predictions
        if 'positive' in predicted:
            predicted = 'positive'
        elif 'negative' in predicted:
            predicted = 'negative'
        elif 'neutral' in predicted:
            predicted = 'neutral'
        else:
            print(f"Warning: Unexpected prediction format: {predicted}")
            continue
            
        is_correct = true_label == predicted
        if is_correct:
            correct += 1
            
        # Update confusion matrix
        confusion_matrix[true_label][predicted] += 1
        
        print(f"\nText: {result['text']}")
        print(f"True label: {true_label}")
        print(f"Predicted: {predicted}")
        print(f"Correct: {is_correct}")
    
    # Calculate metrics
    accuracy = correct / total if total > 0 else 0
    
    # Print detailed results
    print("\n=== Evaluation Results ===")
    print(f"Total samples: {total}")
    print(f"Correct predictions: {correct}")
    print(f"Accuracy: {accuracy:.2%}")
    
    # Print confusion matrix
    print("\n=== Confusion Matrix ===")
    print("True\Pred\t\tNegative\tNeutral\t\tPositive")
    for true_label in ['negative', 'neutral', 'positive']:
        row = confusion_matrix[true_label]
        print(f"{true_label.title()}\t\t{row['negative']}\t\t{row['neutral']}\t\t{row['positive']}")
        
    # Calculate per-class metrics
    print("\n=== Per-Class Metrics ===")
    for label in ['negative', 'neutral', 'positive']:
        true_pos = confusion_matrix[label][label]
        false_pos = sum(conf[label] for l, conf in confusion_matrix.items() if l != label)
        false_neg = sum(confusion_matrix[label].values()) - true_pos
        
        precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) > 0 else 0
        recall = true_pos / (true_pos + false_neg) if (true_pos + false_neg) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        print(f"\n{label.title()} class:")
        print(f"Precision: {precision:.2%}")
        print(f"Recall: {recall:.2%}")
        print(f"F1-score: {f1:.2%}")
    
    return {
        'accuracy': accuracy,
        'confusion_matrix': confusion_matrix,
        'total_samples': total,
        'correct_predictions': correct
    }

In [22]:
def find_inconsistencies(dataset, results, id2label):
    inconsistent_data = []
    
    for i, result in enumerate(results):
        true_label = id2label[dataset[i]['label']]
        predicted = result['predicted_sentiment'].lower().strip()
        
        # Handle variations in predictions
        if 'positive' in predicted:
            predicted = 'positive'
        elif 'negative' in predicted:
            predicted = 'negative'
        elif 'neutral' in predicted:
            predicted = 'neutral'
        else:
            print(f"Warning: Unexpected prediction format: {predicted}")
            continue
            
        # If prediction doesn't match ground truth, add to list
        if true_label != predicted:
            inconsistent_data.append({
                'Ground Truth': true_label,
                'Predicted': predicted,
                'Text': dataset[i]['text']
            })
    
    # Create DataFrame
    df_inconsistencies = pd.DataFrame(inconsistent_data)
    
    # If there are inconsistencies, display summary
    if len(df_inconsistencies) > 0:
        print(f"\nFound {len(df_inconsistencies)} inconsistencies out of {len(dataset)} samples")
        print(f"Inconsistency rate: {(len(df_inconsistencies)/len(dataset)):.2%}")
    else:
        print("\nNo inconsistencies found!")
    
    return df_inconsistencies

id2label = {0: "negative", 1: "neutral", 2: "positive"}

df = find_inconsistencies(dataset, results, id2label)
# Display full DataFrame (not truncated)
pd.set_option('display.max_colwidth', None)
print("\nInconsistent predictions:")
print(df)


Found 38 inconsistencies out of 27481 samples
Inconsistency rate: 0.14%

Inconsistent predictions:
   Ground Truth Predicted  \
0       neutral  positive   
1       neutral  positive   
2       neutral  negative   
3      negative   neutral   
4      negative   neutral   
5       neutral  negative   
6       neutral  positive   
7       neutral  negative   
8      negative   neutral   
9      negative   neutral   
10     negative  positive   
11      neutral  negative   
12     negative   neutral   
13      neutral  positive   
14     negative   neutral   
15      neutral  negative   
16      neutral  negative   
17      neutral  positive   
18      neutral  negative   
19      neutral  negative   
20     negative   neutral   
21      neutral  positive   
22     negative   neutral   
23      neutral  negative   
24     positive   neutral   
25      neutral  negative   
26      neutral  positive   
27      neutral  positive   
28      neutral  positive   
29      neutral  negative   
3

In [6]:
model = AutoModelForCausalLM.from_pretrained(
    model_path, 
    torch_dtype="auto",
    cache_dir=cache_dir
)
tokenizer = AutoTokenizer.from_pretrained(
    model_path,
    add_eos_token=True,
    cache_dir=cache_dir
)

if tokenizer.pad_token_id is None:
    print("No pad token found, setting pad token to eos token")
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id
    tokenizer.padding_side = "right"
    model.resize_token_embeddings(len(tokenizer))
    model.config.pad_token_id = tokenizer.pad_token_id


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

No pad token found, setting pad token to eos token


In [16]:
# print("=== Testing Raw Outputs ===")
# test_raw_outputs(dataset, model, tokenizer, device)

print("\n=== Testing with Instruction Prompts ===")
# results = analyze_sentiment_zero_shot(dataset, model, tokenizer, device)
results = analyze_sentiment_zero_shot(dataset, model, tokenizer, device, verbose=False, num_samples=100)

# print("\n=== Comparing with Baseline Model ===")
# compare_with_baseline(dataset)


=== Testing with Instruction Prompts ===
Progress: 10.0%
Progress: 20.0%
Progress: 30.0%
Progress: 40.0%
Progress: 50.0%
Progress: 60.0%
Progress: 70.0%
Progress: 80.0%
Progress: 90.0%
Progress: 100.0%
Analysis complete!


In [19]:
metrics = evaluate_predictions(dataset, results)


Text:  I`d have responded, if I were going
True label: neutral
Predicted: neutral
Correct: True

Text:  Sooo SAD I will miss you here in San Diego!!!
True label: negative
Predicted: negative
Correct: True

Text: my boss is bullying me...
True label: negative
Predicted: negative
Correct: True

Text:  what interview! leave me alone
True label: negative
Predicted: negative
Correct: True

Text:  Sons of ****, why couldn`t they put them on the releases we already bought
True label: negative
Predicted: negative
Correct: True

Text: http://www.dothebouncy.com/smf - some shameless plugging for the best Rangers forum on earth
True label: neutral
Predicted: positive
Correct: False

Text: 2am feedings for the baby are fun when he is all smiles and coos
True label: positive
Predicted: positive
Correct: True

Text: Soooo high
True label: neutral
Predicted: positive
Correct: False

Text:  Both of you
True label: neutral
Predicted: neutral
Correct: True

Text:  Journey!? Wow... u just became cooler.

In [25]:
print(model.config.eos_token_id)

128001


In [None]:
dataset[0]

In [None]:
model = AutoModelForSequenceClassification.from_pretrained(model_path, num_labels=3, torch_dtype="auto")
tokenizer = AutoTokenizer.from_pretrained(model_path,
                                          add_eos_token=True,
                                          cache_dir=cache_dir)
if tokenizer.pad_token_id is None:
    print("No pad token found, setting pad token to eos token")
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id
    tokenizer.padding_side = "right"
    model.resize_token_embeddings(len(tokenizer))
    model.config.pad_token_id = tokenizer.pad_token_id
    
classifier = pipeline(task="sentiment-analysis", model=model, tokenizer=tokenizer, device=device, padding=True, truncation=True, max_length=512)

In [None]:
for out in classifier(KeyDataset(dataset, "text"), batch_size=8):
    print(out)

In [None]:
tokens = classifier.tokenizer(["Example text", "I am a boy"], padding=True, truncation=True)
print(tokens)
print(classifier.tokenizer.pad_token)