# A model for predicting the probability of a voter's opinion change based on tweeter posts

We will predict based on a dataset from kaggle about Twitter posts during the 2020 US election. 
Goal: By analyzing tweets, identify those users who are most likely to change their minds, cluster them to recommend what kind of messages will have a greater effect on them.

In [1]:
# System and utilities
import sys
import re
from datetime import datetime
from tqdm import tqdm

# Data manipulation
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Language detection
from langdetect import detect, DetectorFactory
from langdetect.lang_detect_exception import LangDetectException

# PyTorch
import torch
from torch.utils.data import DataLoader, Dataset

# Transformers
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification

# Sklearn
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    classification_report
)
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier

# XGBoost
from xgboost import XGBClassifier

# Other
from scipy.stats import entropy
from scipy.sparse import hstack


  from .autonotebook import tqdm as notebook_tqdm


Since our data consists of two datasets: one from the #Trump dataset and one from the #Biden dataset, to simplify the model and to select potentially only those users who are in America, we determine in which language the tweet is written and combine these two datasets 

In [None]:
def safe_detect(text):
    try:
        if not isinstance(text, str) or len(text.strip()) < 5:
            return "unknown"
        return detect(text)
    except LangDetectException:
        return "unknown"

df_trump = pd.read_csv('hashtag_donaldtrump.csv', lineterminator='\n')
df_biden= pd.read_csv('hashtag_joebiden.csv', lineterminator='\n')
tqdm.pandas(desc="Detecting language")
# Trump
df_trump['lang'] = df_trump['tweet'].progress_apply(safe_detect)
df_trump_en = df_trump[df_trump['lang'] == 'en'].reset_index(drop=True)
df_trump_en.to_csv('hashtag_donaldtrump_en.csv', index=False)
df_trump = pd.read_csv('hashtag_donaldtrump_en.csv')
#Biden 
df_biden['lang'] = df_biden['tweet'].progress_apply(safe_detect)
df_biden_en = df_biden[df_biden['lang'] == 'en'].reset_index(drop=True)
df_biden_en.to_csv('hashtag_donaldtrump_en.csv', index=False)
df_biden = pd.read_csv('hashtag_donaldtrump_en.csv')

#Add a labeled column with candidate name
df_trump['candidate'] = 'Trump'  
df_biden['candidate'] = 'Biden'  

combined_df = pd.concat([df_trump, df_biden], axis=0)




# Sentiment Analysis of Tweets Using RoBERTa
This block performs automatic sentiment annotation of tweets using the pre-trained model *cardiffnlp/twitter-roberta-base-sentiment.* The choice of this model is motivated by several factors:

1. **Domain-specific training:** The model has been fine-tuned specifically on English-language tweets, making it well-suited for handling the informal language, abbreviations, hashtags, and mentions typical of Twitter.

2. **Modern architecture:** Based on the RoBERTa transformer, this model improves classification accuracy compared to earlier models such as BERT.

3. **Three-class sentiment output:** The model classifies each tweet into one of three categories — negative (0), neutral (1), or positive (2) — which aligns well with our goal of detecting political leanings and emotional intensity.

This sentiment labeling serves as a crucial preprocessing step in identifying users who may be emotionally volatile or politically ambivalent, which helps inform the target audience detection system.

In [None]:
model_name = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

df = combined_df
tweets = df['tweet'].dropna().tolist()

class TweetDataset(Dataset):    
    def __init__(self, texts, tokenizer, max_len=128):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt'
        )
=
batch_size = 128
dataset = TweetDataset(tweets, tokenizer)
dataloader = DataLoader(dataset, batch_size=batch_size)

labels = []
scores = []

with torch.no_grad():
    for batch in tqdm(dataloader):
        input_ids = batch['input_ids'].squeeze(1).to(device)
        attention_mask = batch['attention_mask'].squeeze(1).to(device)
        
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
        
        label = torch.argmax(probs, dim=1).cpu().numpy()
        score = torch.max(probs, dim=1).values.cpu().numpy()
        
        labels.extend(label)
        scores.extend(score)

df = df.loc[:len(labels)-1] 
df['roberta_label'] = labels
df['roberta_score'] = scores

# 
df.to_csv("annotated_tweets_all.csv", index=False)


After running the sentiment analysis pipeline using the cardiffnlp/twitter-roberta-base-sentiment model, each tweet in the dataset has been enriched with two new attributes:

**roberta_label:** the predicted sentiment class for each tweet:

0 = Negative

1 = Neutral

2 = Positive

**roberta_score:** the confidence score of the model, representing the probability assigned to the predicted sentiment class (ranging from 0 to 1).


In [2]:
# Open the files received after processing through RobErta. 
robert = pd.read_csv('annotated_tweets_all.csv')
filtered_users = robert.groupby('user_id').filter(lambda x: len(x) >= 2)
razmetka = pd.read_excel('Разметка.xlsx')

In [3]:

def classify_position(label, candidate):
    if label == 0 and candidate == 'Biden':
        return 'Against Biden'
    elif label == 2 and candidate == 'Trump':
        return 'Against Biden'
    elif label == 2 and candidate == 'Biden':
        return 'Pro Biden'
    elif label == 0 and candidate == 'Trump':
        return 'Against Trump'
    elif label == 2 and candidate == 'Trump':
        return 'Pro Trump'
    elif label == 0 and candidate == 'Biden':
        return 'Pro Trump'
    elif label == 1:
        return 'Neutral'
    else:
        return 'Neutral'
        

def get_seiz_state(label, score):
    if label == 1 and score < 0.5:
        return 'S'
    elif label == 1 and 0.5 <= score < 0.8:
        return 'E'
    elif label in [0, 2] and score > 0.8:
        return 'I'
    elif label in [0, 2] and score < 0.6:
        return 'Z'
    else:
        return 'E'

def count_position_changes(positions):
    return sum(1 for i in range(1, len(positions)) if positions[i] != positions[i - 1])

def avg_time_between_changes(times, positions):
    changes = [i for i in range(1, len(positions)) if positions[i] != positions[i - 1]]
    if len(changes) < 2:
        return np.nan
    timestamps = [datetime.strptime(str(times[i]), "%Y-%m-%d %H:%M:%S") for i in changes]
    deltas = [(timestamps[i] - timestamps[i - 1]).total_seconds() for i in range(1, len(timestamps))]
    return np.mean(deltas) if deltas else np.nan

def seiz_state_variability(states):
    return len(set(states)) / len(states) if states else 0

def roberta_low_conf_ratio(scores):
    return sum(score < 0.5 for score in scores) / len(scores)

def stance_entropy(positions):
    dist = pd.Series(positions).value_counts(normalize=True)
    return entropy(dist)

def stance_reversals(positions):
    return sum(positions[i] == positions[i-2] and positions[i] != positions[i-1]
               for i in range(2, len(positions)))


def advanced_user_features(user_df):
    positions = list(user_df['position'])
    times = list(user_df['created_at'])
    seiz_states = list(user_df['seiz_state'])
    scores = list(user_df['roberta_score'])

    n_tweets = len(positions)
    n_changes = count_position_changes(positions)
    change_freq = n_changes / (n_tweets - 1) if n_tweets > 1 else 0
    time_change_avg = avg_time_between_changes(times, positions)
    time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
    score_confidence = 1 - np.mean(scores)
    score_std = np.std(scores)
    seiz_var = seiz_state_variability(seiz_states)
    seiz_dist = pd.Series(seiz_states).value_counts(normalize=True)
    seiz_ent = entropy(seiz_dist)
    share_neutral = positions.count("Neutral") / n_tweets
    unique_positions = len(set(positions))
    first_time = pd.to_datetime(user_df["created_at"].min())
    last_time = pd.to_datetime(user_df["created_at"].max())
    days_span = (last_time - first_time).days + 1
    tweet_density = n_tweets / days_span if days_span > 0 else n_tweets


    low_conf_ratio = roberta_low_conf_ratio(scores)
    stance_ent = stance_entropy(positions)
    reversals = stance_reversals(positions)

    return pd.Series({
        "user_id": user_df["user_id"].iloc[0],
        "change_freq": change_freq,
        "time_weight": time_weight,
        "score_confidence": score_confidence,
        "score_std": score_std,
        "seiz_var": seiz_var,
        "seiz_entropy": seiz_ent,
        "num_tweets": n_tweets,
        "share_neutral": share_neutral,
        "unique_positions": unique_positions,
        "days_span": days_span,
        "tweet_density": tweet_density,
        "low_conf_ratio": low_conf_ratio,
        "stance_entropy": stance_ent,
        "stance_reversals": reversals
    })

In [4]:
filtered_users['position'] = filtered_users.apply(lambda row: classify_position(row['roberta_label'], row['candidate']), axis=1)
filtered_users['seiz_state'] = filtered_users.apply(lambda row: get_seiz_state(row['roberta_label'], row['roberta_score']), axis=1)

user_features_df = filtered_users.groupby('user_id').apply(advanced_user_features).reset_index(drop=True)
labeled_df = user_features_df.merge(razmetka, on="user_id", how="inner")


features = [
    "change_freq", "score_std", "seiz_entropy", "share_neutral", "num_tweets",
    "time_weight", "unique_positions", "days_span", "tweet_density",
    "low_conf_ratio", "stance_entropy", "stance_reversals"
]
X = labeled_df[features]
y = labeled_df["changed_opinion"]


X = X.fillna(X.mean())


X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.25, random_state=42)


xgb = XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)
logreg = LogisticRegression(max_iter=1000)
voting = VotingClassifier(estimators=[('xgb', xgb), ('lr', logreg)], voting='soft')

xgb.fit(X_train, y_train)
logreg.fit(X_train, y_train)
voting.fit(X_train, y_train)


xgb_pred = xgb.predict(X_test)
xgb_proba = xgb.predict_proba(X_test)[:, 1]

logreg_pred = logreg.predict(X_test)
logreg_proba = logreg.predict_proba(X_test)[:, 1]

voting_pred = voting.predict(X_test)
voting_proba = voting.predict_proba(X_test)[:, 1]


xgb_report = classification_report(y_test, xgb_pred, output_dict=True)
logreg_report = classification_report(y_test, logreg_pred, output_dict=True)
voting_report = classification_report(y_test, voting_pred, output_dict=True)

xgb_roc = roc_auc_score(y_test, xgb_proba)
logreg_roc = roc_auc_score(y_test, logreg_proba)
voting_roc = roc_auc_score(y_test, voting_proba)


results = {
    "XGBoost ROC AUC": xgb_roc,
    "LogReg ROC AUC": logreg_roc,
    "Voting ROC AUC": voting_roc,
    "XGBoost F1 (1)": xgb_report["1"]["f1-score"],
    "LogReg F1 (1)": logreg_report["1"]["f1-score"],
    "Voting F1 (1)": voting_report["1"]["f1-score"]
}
print("📊 Final Evaluation Results:")
for k, v in results.items():
    print(f"{k}: {v:.4f}")



X_all = user_features_df[features].copy()
X_all = X_all.fillna(X_all.mean())

user_features_df["predicted_label"] = logreg.predict(X_all)
user_features_df["probability_1"] = logreg.predict_proba(X_all)[:, 1]


user_features_df.to_csv("model_predictions_for_all_users.csv", index=False)

print("✅ File 'model_predictions_for_all_users.csv' saved successfully.")

  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_avg) else 0
  time_weight = 1 / (1 + np.log1p(time_change_avg)) if not np.isnan(time_change_

📊 Final Evaluation Results:
XGBoost ROC AUC: 0.7249
LogReg ROC AUC: 0.7763
Voting ROC AUC: 0.7551
XGBoost F1 (1): 0.4682
LogReg F1 (1): 0.5274
Voting F1 (1): 0.5230
✅ File 'model_predictions_for_all_users.csv' saved successfully.


In [None]:

device = 0 if torch.cuda.is_available() else -1
print(f"✅ Using device: {'GPU' if device == 0 else 'CPU'}")

tweets_df = pd.read_csv("annotated_tweets_all.csv")
pred_df = pd.read_csv("model_predictions_for_all_users.csv")


change_users = pred_df[pred_df["probability_1"] > 0.5]["user_id"]
filtered_tweets = tweets_df[tweets_df["user_id"].isin(change_users)].copy()

def clean_text(text):
    text = str(text).lower()
    text = re.sub(r"http\S+", "", text)
    text = re.sub(r"@\w+", "", text)
    text = re.sub(r"#", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

filtered_tweets["clean_text"] = filtered_tweets["tweet"].apply(clean_text)


classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli", device=device)

labels = ["Economy", "Public Health / COVID-19", "Corruption / Integrity",
          "Social Justice", "Foreign Policy / Security", "Other"]


tqdm.pandas()
filtered_tweets["topic"] = filtered_tweets["clean_text"].progress_apply(
    lambda x: classifier(x, labels)["labels"][0]
)


topic_counts = filtered_tweets["topic"].value_counts()
print("📈 Topic Distribution:\n", topic_counts)

filtered_tweets.to_csv("classified_change_tweets_ml.csv", index=False)
topic_counts.to_csv("topic_distribution_ml.csv")

✅ Using device: GPU


Device set to use cuda:0
100%|██████████| 302008/302008 [6:52:28<00:00, 12.20it/s]  


📈 Topic Distribution:
 topic
Other                        226632
Economy                       22515
Corruption / Integrity        20699
Social Justice                15391
Foreign Policy / Security     12137
Public Health / COVID-19       4634
Name: count, dtype: int64
