In [1]:
import os
import docx
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig
from scipy.special import softmax
import numpy as np
import pandas as pd
import torch
import re

In [2]:
# Checkpoint identifier shared by the config, tokenizer, and classifier loaded below
MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
# Config is kept separately because later cells read its id2label mapping
config = AutoConfig.from_pretrained(MODEL)
tokenizer = AutoTokenizer.from_pretrained(MODEL)
model = AutoModelForSequenceClassification.from_pretrained(MODEL)

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [3]:
# Directory containing the transcript .docx files.
# A raw string keeps backslashes literal, so each separator is written once;
# doubling them inside r"..." would put two backslashes into the actual path.
directory_path = r"C:\Users\deeda\Desktop\Brandeis\Second Year\NLP_anlaysis\transcripts"

# Folder where the result CSV files are saved
save_file_path = r"C:\Users\deeda\Desktop\Brandeis\Second Year\NLP_anlaysis\Cardiff_chunk_results"


In [4]:
# Preprocessing function
def preprocess(text):
    """Replace user mentions and links with fixed placeholders.

    Each ``@`` followed by word characters becomes ``@user``; each ``http``
    followed by a run of non-whitespace characters becomes ``http``.
    Returns the rewritten string.
    """
    substitutions = (
        (r'@\w+', '@user'),
        (r'http\S+', 'http'),
    )
    cleaned = text
    # Apply the rules in order: mentions first, then links
    for pattern, placeholder in substitutions:
        cleaned = re.sub(pattern, placeholder, cleaned)
    return cleaned

# Load .docx file
def load_docx_text(file_path):
    """Open a .docx file and return its non-blank paragraphs joined by newlines."""
    document = docx.Document(file_path)
    kept_paragraphs = []
    for paragraph in document.paragraphs:
        content = paragraph.text
        # Paragraphs that are empty or whitespace-only are left out
        if content.strip():
            kept_paragraphs.append(content)
    return "\n".join(kept_paragraphs)

In [5]:
# Chunking function
def chunk_text(text, max_tokens=100, stride=20):
    """Split text into overlapping token windows and decode each back to a string.

    Args:
        text: Raw text; it is passed through ``preprocess`` before tokenizing.
        max_tokens: Maximum number of token ids in one window.
        stride: Number of token ids shared by consecutive windows; the window
            start advances by ``max_tokens - stride`` each step.

    Returns:
        list[str]: Decoded chunks in document order.

    Raises:
        ValueError: If ``stride >= max_tokens``. The window would then never
            move forward (a zero step) or the loop would not run at all
            (a negative step), silently yielding no chunks.
    """
    step = max_tokens - stride
    if step <= 0:
        raise ValueError(
            f"stride ({stride}) must be smaller than max_tokens ({max_tokens})"
        )

    token_ids = tokenizer(preprocess(text), return_tensors='pt', truncation=False)['input_ids'][0]
    total = len(token_ids)

    chunks = []
    for start in range(0, total, step):
        window = token_ids[start:start + max_tokens]
        chunks.append(tokenizer.decode(window, skip_special_tokens=True))
        # Stop once this window reaches the end, so no redundant tail chunk is emitted
        if start + max_tokens >= total:
            break
    return chunks

In [6]:
# Sentiment prediction function
def predict_sentiment(text_chunk):
    """Return softmax-normalised class scores for one chunk of text.

    The chunk is preprocessed, tokenized (truncated to at most 512 tokens),
    and run through the model with gradient tracking disabled. The result is
    a NumPy array holding the softmax of the first row of logits.
    """
    cleaned = preprocess(text_chunk)
    encoded = tokenizer(cleaned, return_tensors="pt", truncation=True, padding=True, max_length=512)
    with torch.no_grad():
        result = model(**encoded)
    first_row_logits = result.logits[0].numpy()
    return softmax(first_row_logits)


In [7]:
# Aggregate results
def analyze_transcript(file_path):
    """Score one transcript chunk by chunk and average the chunk scores.

    Returns:
        A ``(summary, chunk_rows)`` tuple, where ``summary`` is a dict with the
        file's base name, the label of the highest averaged score, and the
        three averaged scores, and ``chunk_rows`` is a list with one dict per
        chunk (chunk text, predicted label, three scores).
        ``None`` when the document text is blank, when chunking yields
        nothing, or when any exception occurs (the error is printed).
    """
    try:
        transcript = load_docx_text(file_path)
        # Blank documents are treated like documents with no chunks
        pieces = chunk_text(transcript) if transcript.strip() else []
        if not pieces:
            return None

        per_chunk_scores = [predict_sentiment(piece) for piece in pieces]
        chunk_rows = [
            {
                "chunk_text": piece,
                "predicted_label": config.id2label[int(probs.argmax())],
                "score_negative": probs[0],
                "score_neutral": probs[1],
                "score_positive": probs[2],
            }
            for piece, probs in zip(pieces, per_chunk_scores)
        ]

        # File-level score = element-wise mean over all chunk score vectors
        mean_scores = np.mean(per_chunk_scores, axis=0)
        summary = {
            "filename": os.path.basename(file_path),
            "overall_sentiment": config.id2label[int(mean_scores.argmax())],
            "score_negative": mean_scores[0],
            "score_neutral": mean_scores[1],
            "score_positive": mean_scores[2]
        }
        return summary, chunk_rows

    except Exception as err:
        # Best effort: report the failing file and let the caller skip it
        print(f"[Error] {file_path}: {err}")
        return None



In [8]:
# Process all files in folder
def analyze_folder(directory_path):
    """Run ``analyze_transcript`` on every .docx file directly inside a folder.

    Files are visited in sorted name order so repeated runs produce rows in
    the same order (``os.listdir`` makes no ordering guarantee). Names that
    start with ``~$`` are skipped: that is the prefix Word uses for its
    temporary lock files, which end in ``.docx`` but are not real documents.

    Args:
        directory_path: Folder to scan (not recursive).

    Returns:
        tuple[list[dict], list[dict]]: ``(summary_results, chunk_data)``.
        ``summary_results`` holds one summary dict per successfully analysed
        file; ``chunk_data`` holds every chunk row, each tagged with the
        ``"filename"`` it came from.
    """
    summary_results = []
    chunk_data = []

    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith(".docx") or filename.startswith("~$"):
            continue

        result = analyze_transcript(os.path.join(directory_path, filename))
        if not result:
            # analyze_transcript returns None for blank or unreadable files
            continue

        summary, chunks = result
        summary_results.append(summary)
        for chunk_row in chunks:
            chunk_row["filename"] = filename
            chunk_data.append(chunk_row)

    return summary_results, chunk_data



In [9]:
# Run analysis
# Scores every .docx file in directory_path; yields per-file summaries and per-chunk rows
summary_results, chunk_data = analyze_folder(directory_path)


In [10]:
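# Print summary (disabled)
# NOTE: the loop below unpacks each entry as (filename, label, scores), but
# analyze_transcript returns each summary as a dict, so this needs updating
# before it can be re-enabled.
#for filename, label, scores in summary_results:
    #print(f"\nFile: {filename}")
    #print(f"Overall Sentiment: {label}")
    #for i, label_name in config.id2label.items():
        #print(f"  {label_name}: {scores[i]:.4f}")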

In [11]:

# Convert results to DataFrames and save them as CSV.
# Create the output folder first: to_csv does not create missing directories,
# and a failure here would come only after the whole analysis has already run.
os.makedirs(save_file_path, exist_ok=True)
pd.DataFrame(summary_results).to_csv(os.path.join(save_file_path, "summary_sentiment_chunks.csv"), index=False)
pd.DataFrame(chunk_data).to_csv(os.path.join(save_file_path, "chunk_sentiment.csv"), index=False)


Average sentiment scores across movies

In [12]:
# Aggregate and Average Sentiment Scores across movies
from collections import defaultdict

def extract_participant_id(filename):
    """Return the text before the first ``_box.docx``, ``_piper.docx`` or
    ``_umbrella.docx`` in ``filename``, or ``None`` if none of them occurs.

    The pattern is matched from the start of the name and is not anchored at
    the end, so trailing characters after ``.docx`` do not prevent a match.
    """
    found = re.match(r"(.*?)_(box|piper|umbrella)\.docx", filename)
    if found is None:
        return None
    return found.group(1)

def summarize_participants(summary_results, include_percent=False):
    """Average each participant's per-file sentiment scores.

    Args:
        summary_results: Iterable of dicts with the keys ``"filename"``,
            ``"score_negative"``, ``"score_neutral"`` and ``"score_positive"``.
            Entries whose filename yields no participant id are ignored.
        include_percent: When True, each row also gets ``percent_negative``,
            ``percent_neutral`` and ``percent_positive``: the participant's
            summed score for that class divided by the sum over all three
            classes. Defaults to False, which keeps the averages-only output.

    Returns:
        list[dict]: One row per participant, in first-seen order, holding
        ``participant_id``, the three ``avg_score_*`` values, the optional
        ``percent_*`` values, and ``num_movies`` (number of files averaged).
    """
    labels = ("negative", "neutral", "positive")

    participant_scores = defaultdict(list)
    for entry in summary_results:
        pid = extract_participant_id(entry["filename"])
        if pid:
            participant_scores[pid].append(entry)

    participant_summary = []
    for pid, entries in participant_scores.items():
        n = len(entries)
        totals = {label: sum(e[f"score_{label}"] for e in entries) for label in labels}

        row = {"participant_id": pid}
        for label in labels:
            row[f"avg_score_{label}"] = totals[label] / n

        if include_percent:
            grand_total = totals["negative"] + totals["neutral"] + totals["positive"]
            for label in labels:
                # Guard against an all-zero total rather than dividing by zero
                row[f"percent_{label}"] = totals[label] / grand_total if grand_total else float("nan")

        row["num_movies"] = n
        participant_summary.append(row)
    return participant_summary


In [13]:
# Save participant-level averages
participant_summary = summarize_participants(summary_results)
participant_csv_path = os.path.join(save_file_path, "participant_avg_sentiment_chunks_just averages.csv")
participant_frame = pd.DataFrame(participant_summary)
participant_frame.to_csv(participant_csv_path, index=False)

print("Saved all sentiment analysis results based on chunk-level.")

Saved all sentiment analysis results based on chunk-level.


Sentiment scores per movie

In [14]:

def extract_movie_name(filename):
    """Return ``"box"``, ``"piper"`` or ``"umbrella"`` when ``filename`` ends
    with ``_<movie>.docx``; return ``"other"`` for any other name."""
    suffix_hits = (
        movie
        for movie in ("box", "piper", "umbrella")
        if filename.endswith(f"_{movie}.docx")
    )
    # First matching movie wins; "other" is the fallback when nothing matches
    return next(suffix_hits, "other")

# Group entries by movie
movie_groups = defaultdict(list)
copied_keys = ("filename", "overall_sentiment", "score_negative", "score_neutral", "score_positive")

for entry in summary_results:
    filename = entry["filename"]
    participant_id = extract_participant_id(filename)
    movie = extract_movie_name(filename)

    # Only keep files that carry a participant id and one of the three movie suffixes
    if not participant_id or movie not in ["box", "piper", "umbrella"]:
        continue

    record = {"participant_id": participant_id}
    record.update((key, entry[key]) for key in copied_keys)
    movie_groups[movie].append(record)

# Save one CSV per movie
for movie, records in movie_groups.items():
    csv_path = os.path.join(save_file_path, f"{movie}_participant_sentiment.csv")
    df = pd.DataFrame(records)
    df.to_csv(csv_path, index=False)
    print(f"Saved: {csv_path}")


Saved: C:\\Users\\deeda\\Desktop\\Brandeis\\Second Year\\NLP_anlaysis\\Cardiff_chunk_results\box_participant_sentiment.csv
Saved: C:\\Users\\deeda\\Desktop\\Brandeis\\Second Year\\NLP_anlaysis\\Cardiff_chunk_results\piper_participant_sentiment.csv
Saved: C:\\Users\\deeda\\Desktop\\Brandeis\\Second Year\\NLP_anlaysis\\Cardiff_chunk_results\umbrella_participant_sentiment.csv


Averages per Movie

In [15]:
# Group by movie type
from collections import defaultdict

# Bucket the per-file summaries by movie, using the filename suffix
movie_scores = defaultdict(list)

for entry in summary_results:
    movie = extract_movie_name(entry["filename"])
    if movie == "other":
        continue  # skip unmatched
    movie_scores[movie].append(entry)

# Compute average sentiment scores per movie
movie_summary = []
for movie, entries in movie_scores.items():
    n = len(entries)
    total_neg, total_neu, total_pos = (
        sum(e[key] for e in entries)
        for key in ("score_negative", "score_neutral", "score_positive")
    )
    total = total_neg + total_neu + total_pos

    movie_row = {"movie": movie}
    # avg_* divides by the number of files; percent_* divides by the combined score mass
    movie_row["avg_score_negative"] = total_neg / n
    movie_row["avg_score_neutral"] = total_neu / n
    movie_row["avg_score_positive"] = total_pos / n
    movie_row["percent_negative"] = total_neg / total
    movie_row["percent_neutral"] = total_neu / total
    movie_row["percent_positive"] = total_pos / total
    movie_row["num_participants"] = n
    movie_summary.append(movie_row)

# Save to CSV
df_movie = pd.DataFrame(movie_summary)
movie_csv_path = os.path.join(save_file_path, "movie_avg_sentiment_summary.csv")
df_movie.to_csv(movie_csv_path, index=False)

print("Saved average sentiment summary for each movie.")


Saved average sentiment summary for each movie.


Participants who have all 3 movies

In [16]:
# Group scores by participant, keyed by movie
required_movies = ("box", "piper", "umbrella")
participant_movies = defaultdict(dict)

for entry in summary_results:
    pid = extract_participant_id(entry["filename"])
    movie = extract_movie_name(entry["filename"])
    # extract_movie_name returns the truthy string "other" for unmatched files,
    # so test membership explicitly instead of relying on truthiness.
    if pid and movie in required_movies:
        participant_movies[pid][movie] = entry


# Average the three per-movie scores, only for participants who have all three movies
participant_summary = []
for pid, movies in participant_movies.items():
    if all(movie in movies for movie in required_movies):
        scores = [movies[movie] for movie in required_movies]
        n_movies = len(required_movies)
        avg_neg = sum(e["score_negative"] for e in scores) / n_movies
        avg_neu = sum(e["score_neutral"] for e in scores) / n_movies
        avg_pos = sum(e["score_positive"] for e in scores) / n_movies
        participant_summary.append({
            "participant_id": pid,
            "avg_score_negative": avg_neg,
            "avg_score_neutral": avg_neu,
            "avg_score_positive": avg_pos
        })


In [17]:
# Write the averaged scores for participants who have all three movies
all3_csv_path = os.path.join(save_file_path, "participants_all3_avg_sentiment.csv")
df_complete = pd.DataFrame(participant_summary)
df_complete.to_csv(all3_csv_path, index=False)

print("Saved participants who watched all 3 movies with averaged sentiment.")


Saved participants who watched all 3 movies with averaged sentiment.
