<h1><center>Code NLP</center></h1>

##### Imports:

In [None]:
# Standard library imports
from collections import Counter
from dataclasses import dataclass
import csv
import random
import re
import string
from typing import List, Set

# Third-party imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from prettytable import PrettyTable

# NLTK imports
import nltk
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import sent_tokenize
from nltk.util import ngrams

# Gensim imports
from gensim.corpora.dictionary import Dictionary
from gensim.models.coherencemodel import CoherenceModel
from gensim.models.doc2vec import TaggedDocument

# Scikit-learn imports
from sklearn.feature_extraction.text import TfidfTransformer, CountVectorizer, TfidfVectorizer
from sklearn.linear_model import Ridge
from sklearn.metrics import (
    accuracy_score,
    adjusted_rand_score,
    calinski_harabasz_score,
    davies_bouldin_score,
    f1_score,
    mean_squared_error,
    r2_score,
    silhouette_score
)
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.manifold import trustworthiness

# Transformers and datasets imports
from datasets import Dataset
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    TextClassificationPipeline,
    AutoConfig
)

# Other ML/DL imports
import tensorflow_hub as hub
from umap.umap_ import UMAP
import hdbscan
from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
import pandas as pd

In [None]:
# Suppress warnings: the 'ignore' filter silences every Python warning from here on
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Download the NLTK data packages used in this notebook.
# NOTE(review): presumably these back word_tokenize, the stopword list,
# WordNetLemmatizer and SentimentIntensityAnalyzer respectively — confirm.
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('vader_lexicon')

##### Import Datasets:

In [None]:
# Import Data Frames
# `generated` holds the statements read from statements.csv; the cells below
# access its `statement` and `provider` columns.
with open ('0_data/statements.csv', 'r') as f:
    generated = pd.read_csv(f)

# `parliament` holds the labeled dataset; the cells below access its
# `translated_text` column.
with open ('0_data/final_labeled_dataset.csv', 'r') as f:
    parliament = pd.read_csv(f)

In [None]:
# Print the column/dtype/non-null summary of the generated statements
generated.info()

In [None]:
# Print the column/dtype/non-null summary of the parliament dataset
parliament.info()

In [None]:
# Show five randomly drawn generated statements (unseeded, so the draw varies per run)
generated.statement.sample(5)

In [None]:
# Show five randomly drawn parliament texts (unseeded, so the draw varies per run)
parliament.translated_text.sample(5)

In [None]:
# Count missing values per column in the generated data
generated.isna().sum()

In [None]:
# Count missing values per column in the parliament data
parliament.isna().sum()

##### Preprocessing Class:

In [None]:
@dataclass
class Preprocess:
    """
    Bundle the text-preprocessing helpers used throughout the notebook.

    The class offers basic cleaning (lowercasing and stripping every character
    that is not ``a-z`` or whitespace), tokenization via ``nltk.word_tokenize``,
    stopword removal, WordNet lemmatization and trigram generation. It needs
    the NLTK stopword corpus, the tokenizer models and WordNet to be available.

    :ivar _stopwords: Stopwords to filter out; words are lowercased before the
                      lookup, so entries should be lowercase. When left as
                      ``None`` the English NLTK stopword list is loaded.
    :type _stopwords: Set[str]
    :ivar _lemmatizer: Lemmatizer used to reduce words to their base forms.
                       When left as ``None`` a ``WordNetLemmatizer`` is created.
    :type _lemmatizer: WordNetLemmatizer
    """
    _stopwords: Set[str] = None
    _lemmatizer: WordNetLemmatizer = None

    def __post_init__(self):
        # Fill in defaults only for fields the caller did not provide, so that
        # custom stopwords or a custom lemmatizer passed to the constructor
        # are respected instead of being silently overwritten.
        if self._stopwords is None:
            self._stopwords = set(nltk.corpus.stopwords.words('english'))
        else:
            # Accept any iterable and guarantee O(1) membership tests.
            self._stopwords = set(self._stopwords)
        if self._lemmatizer is None:
            self._lemmatizer = WordNetLemmatizer()

    def rm_stopwords(self, text: str) -> str:
        """
        Remove stopwords from whitespace-separated text.

        The comparison is case-insensitive (each word is lowercased for the
        lookup), matching :meth:`lemmatize_doc`; surviving words keep their
        original casing.

        :param text: The input text to process.
        :type text: str
        :return: The remaining words joined by single spaces.
        :rtype: str
        """
        return ' '.join(word for word in text.split()
                        if word.lower() not in self._stopwords)

    def lemmatize_doc(self, tokens: List[str]) -> List[str]:
        """
        Filter and lemmatize a list of tokens.

        Tokens are dropped when they are not purely alphabetic, when their
        lowercase form is a stopword, or when they are 2 characters or shorter.
        Every remaining token is passed through the lemmatizer unchanged in case.

        :param tokens: Word tokens to be lemmatized.
        :type tokens: List[str]
        :return: Lemmatized tokens that passed all filters, in input order.
        :rtype: List[str]
        """
        return [self._lemmatizer.lemmatize(word) for word in tokens
                if word.isalpha() and word.lower() not in self._stopwords and len(word) > 2]

    def trigrams(self, text: str) -> List[tuple]:
        """
        Build all trigrams (three consecutive tokens) of the given text.

        The text is tokenized with :meth:`tokenize_doc`, i.e. lowercased first.

        :param text: The text to process.
        :type text: str
        :return: One tuple of three tokens per trigram; empty when the text
                 has fewer than three tokens.
        :rtype: List[tuple]
        """
        tokens = self.tokenize_doc(text)
        return list(ngrams(tokens, 3))

    @staticmethod
    def basic_clean(text: str) -> str:
        """
        Lowercase the text and delete every character except ``a-z`` and whitespace.

        The input is converted with ``str()`` first, so non-string values
        (for example numbers) are cleaned as their string representation.

        :param text: The input text to be cleaned.
        :type text: str
        :return: Lowercase text containing only ``a-z`` letters and whitespace.
        :rtype: str
        """
        return re.sub(r'[^a-z\s]', '', str(text).lower())

    @staticmethod
    def tokenize_doc(text: str) -> List[str]:
        """
        Lowercase the text and split it into tokens with ``nltk.word_tokenize``.

        :param text: The document or text fragment to tokenize.
        :type text: str
        :return: Lowercase tokens in document order.
        :rtype: List[str]
        """
        return word_tokenize(text.lower())

In [None]:
# Initialize the shared Preprocess instance; tm_cleaning reads it as a global
preprocessor = Preprocess()

## Analysis

### Topic modeling

In [None]:
# Seed passed as random_state to UMAP and gp_minimize in the tuning helpers
SEED = 42

In [None]:
def tune_umap_parameters(embeddings, n_calls: int = 50, verbose: bool = False):
    """
    Tune UMAP hyperparameters by maximizing the trustworthiness of a 2-D projection.

    Scikit-Optimize's ``gp_minimize`` searches over ``n_neighbors`` (10-50),
    ``min_dist`` (0.0-0.3) and ``metric`` (only ``'euclidean'``). Each candidate
    is scored by fitting UMAP on ``embeddings`` and computing scikit-learn's
    ``trustworthiness`` between the input and the projection; the negated score
    is minimized. The best parameters and score are printed.

    :param embeddings: Data to reduce.
    :type embeddings: array-like of shape (n_samples, n_features)
    :param n_calls: Number of objective evaluations for the optimizer.
        Defaults to 50.
    :type n_calls: int
    :param verbose: Forwarded to ``gp_minimize`` to print progress.
        Defaults to False.
    :type verbose: bool
    :return: The best parameters and a new, *unfitted* 2-component UMAP
        reducer configured with them.
    :rtype: Tuple[Dict[str, Any], UMAP]
    """
    space = [
        Integer(10, 50, name='n_neighbors'),
        Real(0.0, 0.3, name='min_dist'),
        Categorical(['euclidean'], name='metric')
    ]

    # scikit-learn's trustworthiness requires n_neighbors < n_samples / 2 and
    # raises otherwise; the previous bound of n_samples - 1 broke on corpora
    # with fewer than 41 documents. Large corpora still use 20 neighbours.
    n_samples = len(embeddings)
    trust_neighbors = max(1, min(20, (n_samples - 1) // 2))

    def objective(params, n_components=2):
        n_neighbors, min_dist, metric = params

        reducer = UMAP(
            n_neighbors=int(n_neighbors),
            min_dist=float(min_dist),
            metric=metric,
            n_components=n_components,
            random_state=SEED
        )

        embedding = reducer.fit_transform(embeddings)

        trust_score = trustworthiness(
            embeddings,
            embedding,
            n_neighbors=trust_neighbors
        )

        # gp_minimize minimizes, so negate the score we want to maximize.
        return -float(trust_score)

    result = gp_minimize(
        objective,
        space,
        n_calls=n_calls,
        random_state=SEED,
        verbose=verbose
    )

    # Cast to native Python types so the values print cleanly and can be
    # passed on to UMAP regardless of the scalar types the optimizer returns.
    best_params = {
        'n_neighbors': int(result.x[0]),
        'min_dist': float(result.x[1]),
        'metric': str(result.x[2])
    }

    print("\nBest parameters:")
    for param, value in best_params.items():
        print(f"{param}: {value}")

    print(f"\nBest score: {-result.fun:.4f}")

    best_reducer = UMAP(
        **best_params,
        n_components=2,
        random_state=SEED
    )

    return best_params, best_reducer

In [None]:
def tune_hdbscan_parameters(embeddings, n_calls: int = 50, verbose: bool = False):
    """
    Tune HDBSCAN hyperparameters with Bayesian optimization (``gp_minimize``).

    The search covers ``min_cluster_size`` (3-15), ``min_samples`` (3-10),
    ``cluster_selection_epsilon`` (0.0-0.5) and ``metric`` (only
    ``'euclidean'``). Each candidate is scored by

    ``0.4 * mean pairwise ARI between runs + 0.4 * mean silhouette (noise
    excluded) - 0.2 * noise ratio of the last run``

    and the negated score is minimized. The best parameters, score, cluster
    count and noise share are printed.

    :param embeddings: Data to cluster, one row per sample.
    :type embeddings: numpy.ndarray or list of lists
    :param n_calls: Number of objective evaluations for the optimizer.
        Defaults to 50.
    :type n_calls: int
    :param verbose: Forwarded to ``gp_minimize`` to print progress.
        Defaults to False.
    :type verbose: bool
    :return: The best parameters and an HDBSCAN instance configured with them
        that has already been fitted on ``embeddings``.
    :rtype: tuple
    """
    # Boolean-mask indexing below needs an ndarray; without this a plain
    # list of lists (allowed by the docstring) raised a TypeError.
    embeddings = np.asarray(embeddings)

    space = [
        Integer(3, 15, name='min_cluster_size'),
        Integer(3, 10, name='min_samples'),
        Real(0.0, 0.5, name='cluster_selection_epsilon'),
        Categorical(['euclidean'], name='metric')
    ]

    def objective(params, n_runs=5):
        min_cluster_size, min_samples, cluster_selection_epsilon, metric = params

        cluster_results = []
        silhouette_scores = []

        # NOTE(review): every run fits the same data with the same parameters;
        # if HDBSCAN is deterministic here the stability term is constant.
        # Confirm whether resampling between runs was intended.
        for _ in range(n_runs):
            clusterer = hdbscan.HDBSCAN(
                min_cluster_size=int(min_cluster_size),
                min_samples=int(min_samples),
                cluster_selection_epsilon=float(cluster_selection_epsilon),
                metric=metric
            )

            labels = clusterer.fit_predict(embeddings)
            cluster_results.append(labels)

            # Silhouette is only defined for at least two clusters; noise
            # points (label -1) are excluded from it.
            n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            if n_clusters > 1:
                mask = labels != -1
                if np.sum(mask) > 1:
                    silhouette_scores.append(
                        silhouette_score(embeddings[mask], labels[mask])
                    )

        # Stability: adjusted Rand index over every unordered pair of runs.
        stability_scores = [
            adjusted_rand_score(first, second)
            for idx, first in enumerate(cluster_results)
            for second in cluster_results[idx + 1:]
        ]

        mean_stability = np.mean(stability_scores) if stability_scores else 0
        mean_silhouette = np.mean(silhouette_scores) if silhouette_scores else 0

        last_labels = cluster_results[-1]
        noise_ratio = np.sum(last_labels == -1) / len(last_labels)

        composite_score = (0.4 * mean_stability +
                           0.4 * mean_silhouette -
                           0.2 * noise_ratio)

        # gp_minimize minimizes, so negate the score we want to maximize.
        return -float(composite_score)

    result = gp_minimize(
        objective,
        space,
        n_calls=n_calls,
        random_state=SEED,
        verbose=verbose
    )

    # Cast to native Python types so the values print cleanly and are valid
    # HDBSCAN arguments regardless of the scalar types the optimizer returns.
    best_params = {
        'min_cluster_size': int(result.x[0]),
        'min_samples': int(result.x[1]),
        'cluster_selection_epsilon': float(result.x[2]),
        'metric': str(result.x[3])
    }

    print("\nBest HDBSCAN parameters:")
    for param, value in best_params.items():
        print(f"{param}: {value}")

    print(f"\nBest score: {-result.fun:.4f}")

    clusterer = hdbscan.HDBSCAN(**best_params)
    labels = clusterer.fit_predict(embeddings)
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    noise_points = int(np.sum(labels == -1))

    print(f"\nNumber of clusters: {n_clusters}")
    print(f"Number of noise points: {noise_points} ({noise_points/len(labels):.2%})")

    return best_params, clusterer

In [None]:
def get_closest_words(topic_vector, word_vectors, n=10):
    """
    Get the indices of the top `n` word vectors closest to a topic vector.

    Closeness is the cosine similarity between the topic vector and each row
    of `word_vectors`.

    :param topic_vector: A 1D numerical array representing the topic vector.
    :param word_vectors: A 2D numerical array where each row corresponds to a
        word vector.
    :param n: Number of closest words to retrieve. Defaults to 10. Values
        of 0 or less yield an empty result; values larger than the number of
        word vectors yield all indices.
    :return: A 1D numpy array with the indices of the top `n` closest word
        vectors in descending order of similarity.
    """
    # Guard explicitly: the slice [-0:] would otherwise select *every* index
    # instead of none.
    if n <= 0:
        return np.array([], dtype=np.intp)

    similarities = cosine_similarity([topic_vector], word_vectors)[0]
    return np.argsort(similarities)[-n:][::-1]

In [None]:
def tm_cleaning(doc):
    """
    Turn a raw document into lemmatized tokens for topic modeling.

    The document runs through the module-level ``preprocessor`` in three
    steps: ``basic_clean``, then ``tokenize_doc``, then ``lemmatize_doc``.

    :param doc: The text document to be cleaned and preprocessed.
    :type doc: str

    :return: The lemmatized tokens of the cleaned document.
    :rtype: list
    """
    cleaned_text = preprocessor.basic_clean(doc)
    raw_tokens = preprocessor.tokenize_doc(cleaned_text)
    return preprocessor.lemmatize_doc(raw_tokens)

In [None]:
# Split the generated statements into one frame per provider
df_chatgpt = generated[generated.provider == 'chatgpt']
df_deepseek = generated[generated.provider == 'deepseek']

In [None]:
# Clean every statement into lemmatized tokens (tm_cleaning) and wrap each
# token list in a TaggedDocument tagged with its position within the provider
docs_chatgpt = [TaggedDocument(doc, [i]) for i, doc in enumerate(df_chatgpt.statement.apply(tm_cleaning).tolist())]
docs_deepseek = [TaggedDocument(doc, [i]) for i, doc in enumerate(df_deepseek.statement.apply(tm_cleaning).tolist())]

In [None]:
# Convert the tagged documents back into plain strings by joining their tokens
# with spaces; items without a `words` attribute are passed through unchanged
texts_chatgpt = [' '.join(doc.words) if hasattr(doc, 'words') else doc for doc in docs_chatgpt]
texts_deepseek = [' '.join(doc.words) if hasattr(doc, 'words') else doc for doc in docs_deepseek]

In [None]:
# Create sentence embeddings with the Universal Sentence Encoder (TF Hub, v4)
# and standardize them.
# NOTE: the same StandardScaler object is re-fit for each provider, so every
# matrix is scaled with its own statistics and `scaler` ends up holding the
# DeepSeek fit.
embed = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4')
scaler = StandardScaler()
embeddings_chatgpt = scaler.fit_transform(embed(texts_chatgpt).numpy())
embeddings_deepseek = scaler.fit_transform(embed(texts_deepseek).numpy())

In [None]:
# Tune UMAP on the scaled ChatGPT embeddings; returns the best parameters
# and an unfitted reducer configured with them
best_umap_params_chatgpt, reducer_chatgpt = tune_umap_parameters(embeddings_chatgpt)

In [None]:
# Tune UMAP on the scaled DeepSeek embeddings; returns the best parameters
# and an unfitted reducer configured with them
best_umap_params_deepseek, reducer_deepseek = tune_umap_parameters(embeddings_deepseek)

In [None]:
# Tune HDBSCAN for the ChatGPT statements.
# NOTE(review): the search runs on the full-dimensional scaled embeddings,
# while the final clustering further down is fitted on the 2-D UMAP output —
# confirm the tuned parameters are meant to transfer between the two spaces.
best_hdbscan_params_chatgpt, clusterer_chatgpt = tune_hdbscan_parameters(embeddings_chatgpt)

In [None]:
# Tune HDBSCAN for the DeepSeek statements.
# NOTE(review): the search runs on the full-dimensional scaled embeddings,
# while the final clustering further down is fitted on the 2-D UMAP output —
# confirm the tuned parameters are meant to transfer between the two spaces.
best_hdbscan_params_deepseek, clusterer_deepseek = tune_hdbscan_parameters(embeddings_deepseek)

In [None]:
# Fit the tuned reducers and project the scaled sentence embeddings to 2-D
umap_embeddings_chatgpt = reducer_chatgpt.fit_transform(embeddings_chatgpt)
umap_embeddings_deepseek = reducer_deepseek.fit_transform(embeddings_deepseek)

In [None]:
# Re-fit the tuned HDBSCAN clusterers on the 2-D UMAP projections and keep
# one cluster label per document (-1 is treated as noise by the cells below)
cluster_labels_chatgpt = clusterer_chatgpt.fit_predict(umap_embeddings_chatgpt)
cluster_labels_deepseek = clusterer_deepseek.fit_predict(umap_embeddings_deepseek)

In [None]:
# Store the results per provider as (2-D UMAP projection, cluster labels)
umap_hdbscan_results = {
    'ChatGPT': (umap_embeddings_chatgpt, cluster_labels_chatgpt),
    'DeepSeek': (umap_embeddings_deepseek, cluster_labels_deepseek)
}

In [None]:
# Plot the 2-D UMAP projections side by side, one panel per provider,
# with points colored by their cluster label
fig, axes = plt.subplots(1, 2, figsize=(20, 8))
fig.suptitle('Document Clusters Comparison', fontsize=16, y=1.05)

# res[0] is the projection (columns 0/1 become x/y), res[1] the labels
for idx, (name, res) in enumerate(umap_hdbscan_results.items()):
    scatter = axes[idx].scatter(res[0][:, 0],
                               res[0][:, 1],
                               c=res[1],
                               cmap='Spectral',
                               alpha=0.6)
    fig.colorbar(scatter, ax=axes[idx])
    axes[idx].set_title(f'{name}')
    axes[idx].set_xlabel('UMAP 1')
    axes[idx].set_ylabel('UMAP 2')

plt.tight_layout()
plt.show()

In [None]:
# Calculate one topic vector per ChatGPT cluster: the mean of the scaled
# sentence embeddings of the documents assigned to that cluster.
# Noise (label -1) is excluded from the cluster count.
n_clusters_chatgpt = len(np.unique(cluster_labels_chatgpt[cluster_labels_chatgpt != -1]))

topic_vectors = []

# assumes cluster labels are exactly 0..n_clusters-1 — TODO confirm
for i in range(n_clusters_chatgpt):
    cluster_docs = embeddings_chatgpt[cluster_labels_chatgpt == i]
    centroid = np.mean(cluster_docs, axis=0)
    topic_vectors.append(centroid)

topic_vectors_chatgpt = np.array(topic_vectors)

In [None]:
# Calculate one topic vector per DeepSeek cluster: the mean of the scaled
# sentence embeddings of the documents assigned to that cluster.
# Noise (label -1) is excluded from the cluster count.
n_clusters_deepseek = len(np.unique(cluster_labels_deepseek[cluster_labels_deepseek != -1]))

topic_vectors = []

# assumes cluster labels are exactly 0..n_clusters-1 — TODO confirm
for i in range(n_clusters_deepseek):
    cluster_docs = embeddings_deepseek[cluster_labels_deepseek == i]
    centroid = np.mean(cluster_docs, axis=0)
    topic_vectors.append(centroid)

topic_vectors_deepseek = np.array(topic_vectors)

##### Results:

In [None]:
def top_words(cluster_labels,
              texts,
              topic_vectors,
              embeddings,
              n_words=10):
    """
    Print and return the highest-scoring words of every cluster.

    Documents are vectorized with ``CountVectorizer``. For each cluster id in
    ``0..n_clusters-1`` the term counts of its documents are summed and
    multiplied by ``log1p(total_docs / (cluster_size + 1))``; the ``n_words``
    terms with the highest score are reported per cluster.

    NOTE(review): the weight is one scalar per cluster, so the ranking inside
    a cluster equals the ranking by raw term count. If a term-level IDF
    (class-based TF-IDF) was intended, the weighting needs to be revisited.

    :param cluster_labels: Cluster label of each document; -1 marks noise and
        is ignored.
    :type cluster_labels: numpy.ndarray
    :param texts: Documents (strings), aligned with ``cluster_labels``.
    :type texts: list
    :param topic_vectors: Unused by this function; kept for interface
        compatibility.
    :type topic_vectors: numpy.ndarray
    :param embeddings: Unused by this function; kept for interface
        compatibility.
    :type embeddings: numpy.ndarray
    :param n_words: Number of top words per cluster. Defaults to 10. Values
        of 0 or less yield no words.
    :type n_words: int

    :return: The top words of all clusters, flattened into one list in
        cluster order.
    :rtype: list
    """
    cluster_labels = np.asarray(cluster_labels)
    n_clusters = len(np.unique(cluster_labels[cluster_labels != -1]))
    # Clamp so that 0 (or a negative value) selects nothing; the previous
    # slice [-0:] selected the whole vocabulary instead.
    n_words = max(int(n_words), 0)

    count_vectorizer = CountVectorizer()
    doc_term_matrix = count_vectorizer.fit_transform(texts)
    vocabulary = count_vectorizer.get_feature_names_out()

    total_docs = len(texts)
    c_tf_idf_matrix = np.zeros((n_clusters, len(vocabulary)))

    for cluster_id in range(n_clusters):
        cluster_docs = doc_term_matrix[cluster_labels == cluster_id]
        cluster_size = cluster_docs.shape[0]

        # A label without documents keeps its all-zero row.
        if cluster_size == 0:
            continue

        # Summed term counts of the cluster as a flat 1-D array.
        cluster_tf = np.asarray(cluster_docs.sum(axis=0)).ravel()
        c_tf_idf_matrix[cluster_id] = cluster_tf * np.log1p(total_docs / (cluster_size + 1))

    all_top_words = []

    for topic_idx, topic_scores in enumerate(c_tf_idf_matrix):
        # Highest score first, then keep the first n_words entries.
        ranked_idx = topic_scores.argsort()[::-1][:n_words]
        topic_top_words = [vocabulary[idx] for idx in ranked_idx]
        all_top_words.append(topic_top_words)

        print(f"\nTopic {topic_idx + 1} Top Words:")
        print(", ".join(topic_top_words))

    return [word for topic_top_words in all_top_words for word in topic_top_words]

In [None]:
# Print the top 10 words per ChatGPT cluster and keep them as one flat list
top_words_chatgpt = top_words(cluster_labels_chatgpt,
                              texts_chatgpt,
                              topic_vectors_chatgpt,
                              embeddings_chatgpt,
                              n_words=10)

In [None]:
# Print the top 10 words per DeepSeek cluster and keep them as one flat list
top_words_deepseek = top_words(cluster_labels_deepseek,
                               texts_deepseek,
                               topic_vectors_deepseek,
                               embeddings_deepseek,
                               n_words=10)

In [None]:
# Top words that appear only in the ChatGPT topics
unique_words_chatgpt = set(top_words_chatgpt) - set(top_words_deepseek)
print(f'Unique words in ChatGPT: {unique_words_chatgpt}')

# Top words that appear only in the DeepSeek topics
unique_words_deepseek = set(top_words_deepseek) - set(top_words_chatgpt)
print(f'Unique words deepseek: {unique_words_deepseek}')

# Top words shared by both providers
common_words = set(top_words_deepseek) & set(top_words_chatgpt)
print(f'Common words: {common_words}')

In [None]:
def evaluate_topic_modeling(cluster_labels, embeddings, texts, topic_vectors, model_results=None):
    """
    Evaluate a clustering-based topic model and print the resulting metrics.

    Four groups of metrics are computed:

    * clustering metrics (silhouette, Calinski-Harabasz, Davies-Bouldin) on
      all non-noise points; skipped (empty dict) unless there are at least two
      clusters and more points than clusters,
    * ``c_v`` coherence (gensim ``CoherenceModel``) of the 10 most frequent
      words of each topic,
    * distinctiveness: mean and maximum cosine similarity between *different*
      topic vectors (0.0 when there are fewer than two topics),
    * size metrics: standard deviation and range of the cluster sizes and the
      share of noise points.

    :param cluster_labels: Cluster label of each data point; -1 marks noise.
    :type cluster_labels: array-like of shape (n_samples,)
    :param embeddings: Representations of the data points used for the
        clustering metrics.
    :type embeddings: array-like of shape (n_samples, n_features)
    :param texts: Whitespace-tokenizable documents aligned with
        ``cluster_labels``.
    :type texts: list of str
    :param topic_vectors: One row per topic; row ``i`` belongs to cluster ``i``.
    :type topic_vectors: array-like of shape (n_topics, n_features)
    :param model_results: Unused by this function; kept for interface
        compatibility.
    :type model_results: dict, optional
    :return: Metrics grouped under the keys ``clustering_metrics``,
        ``coherence``, ``distinctiveness`` and ``size_metrics``.
    :rtype: dict
    """
    cluster_labels = np.asarray(cluster_labels)
    embeddings = np.asarray(embeddings)

    valid_mask = cluster_labels != -1
    valid_labels = cluster_labels[valid_mask]
    n_valid_clusters = len(np.unique(valid_labels))

    clustering_metrics = {}
    # The scikit-learn scores require 2 <= n_clusters <= n_samples - 1 and
    # raise otherwise, so checking only the number of points is not enough.
    if 1 < n_valid_clusters < len(valid_labels):
        valid_embeddings = embeddings[valid_mask]
        clustering_metrics = {
            'silhouette_score': silhouette_score(valid_embeddings, valid_labels),
            'calinski_harabasz_score': calinski_harabasz_score(valid_embeddings, valid_labels),
            'davies_bouldin_score': davies_bouldin_score(valid_embeddings, valid_labels)
        }

    tokenized_texts = [text.split() for text in texts]
    dictionary = Dictionary(tokenized_texts)

    # Top 10 most frequent words per topic; ties keep first-seen order.
    topic_words = []
    for topic_id in range(len(topic_vectors)):
        word_freq = Counter()
        for tokens, label in zip(tokenized_texts, cluster_labels):
            if label == topic_id:
                word_freq.update(tokens)
        topic_words.append([word for word, _ in word_freq.most_common(10)])

    coherence_model = CoherenceModel(
        topics=topic_words,
        texts=tokenized_texts,
        dictionary=dictionary,
        coherence='c_v'
    )

    coherence_metrics = {
        'c_v_coherence': coherence_model.get_coherence()
    }

    # Only off-diagonal entries compare two different topics. Averaging the
    # full matrix with a zeroed diagonal would bias the mean towards zero and
    # cap the maximum at zero from below.
    topic_similarities = cosine_similarity(topic_vectors)
    off_diagonal = topic_similarities[~np.eye(len(topic_similarities), dtype=bool)]
    if off_diagonal.size:
        mean_similarity = float(np.mean(off_diagonal))
        max_similarity = float(np.max(off_diagonal))
    else:
        # A single topic has nothing to be compared with.
        mean_similarity = 0.0
        max_similarity = 0.0
    distinctiveness_metrics = {
        'mean_similarity': mean_similarity,
        'max_similarity': max_similarity
    }

    # np.ptp raises on an empty array, so handle "everything is noise".
    if valid_labels.size:
        topic_sizes = np.bincount(valid_labels)
        size_std = np.std(topic_sizes)
        size_range = np.ptp(topic_sizes)
    else:
        size_std = 0.0
        size_range = 0
    size_metrics = {
        'size_std': size_std,
        'size_range': size_range,
        'noise_ratio': np.sum(cluster_labels == -1) / len(cluster_labels)
    }

    evaluation_results = {
        'clustering_metrics': clustering_metrics,
        'coherence': coherence_metrics,
        'distinctiveness': distinctiveness_metrics,
        'size_metrics': size_metrics
    }

    print("\nEvaluation Results")
    print("-" * 50)

    for category, metrics in evaluation_results.items():
        print(f"\n{category.replace('_', ' ').title()}:")
        for metric, value in metrics.items():
            print(f"{metric.replace('_', ' ').title()}: {value:.4f}")

    return evaluation_results

In [None]:
# Evaluate the ChatGPT topic model on the scaled sentence embeddings.
# NOTE: model_results is passed along, but the evaluate_topic_modeling body
# defined in this notebook never reads it.
eval_res_chatgpt = evaluate_topic_modeling(cluster_labels_chatgpt,
                                            embeddings_chatgpt,
                                            texts_chatgpt,
                                            topic_vectors_chatgpt,
                                            model_results=umap_hdbscan_results)

In [None]:
# Evaluate the DeepSeek topic model on the scaled sentence embeddings.
# NOTE: model_results is passed along, but the evaluate_topic_modeling body
# defined in this notebook never reads it.
eval_res_deepseek = evaluate_topic_modeling(cluster_labels_deepseek,
                                            embeddings_deepseek,
                                            texts_deepseek,
                                            topic_vectors_deepseek,
                                            model_results=umap_hdbscan_results)

### Sentiment Analysis

##### VADER:

In [None]:
# Sentiment analysis: one shared VADER analyzer, created once and reused
sia = SentimentIntensityAnalyzer()


def get_sentiment(text):
    """
    Return the VADER compound sentiment score of a text.

    The text is scored with the module-level ``SentimentIntensityAnalyzer``
    and only the ``'compound'`` entry of the result is returned; it ranges
    from -1 (most negative) to 1 (most positive).

    :param text: The text whose sentiment should be scored.
    :type text: str
    :return: The compound sentiment score.
    :rtype: float
    """
    return sia.polarity_scores(text)['compound']

In [None]:
# Apply VADER to the raw (uncleaned) texts and store the compound score per row
parliament['sentiment_vader'] = parliament['translated_text'].apply(get_sentiment)
generated['sentiment_vader'] = generated['statement'].apply(get_sentiment)

###### Results:

In [None]:
def get_descriptives(sentiment):
    """
    Compute descriptive statistics for the given sentiment data.

    A pandas Series is used as is; any other iterable (list, tuple,
    generator, ...) is first converted to a float Series, so the function
    accepts every input type its documentation promises. Missing values are
    ignored by all statistics, and the standard deviation is the sample
    standard deviation (pandas default).

    :param sentiment: Sentiment scores to summarize.
    :type sentiment: pandas.Series | list | iterable
    :return: Mean, standard deviation, minimum, maximum and count of
             non-missing values, in that order.
    :rtype: list[float | int]
    """
    if isinstance(sentiment, pd.Series):
        values = sentiment
    else:
        # Plain iterables have no .mean()/.std(); wrap them in a Series.
        values = pd.Series(list(sentiment), dtype=float)

    return [values.mean(), values.std(), values.min(), values.max(), values.count()]

# Descriptive statistics of the VADER compound score per text source
sentiment_vader_parliament = get_descriptives(parliament['sentiment_vader'])

# Boolean provider masks over `generated`
is_openai = generated['provider'].eq('chatgpt')
is_deepseek = generated['provider'].eq('deepseek')
sentiment_vader_deepseek = get_descriptives(generated.loc[is_deepseek, 'sentiment_vader'])
sentiment_vader_chatgpt = get_descriptives(generated.loc[is_openai, 'sentiment_vader'])

In [None]:
# Create table: one row per source, columns in the order returned by get_descriptives
sentiment_table = PrettyTable()
sentiment_table.field_names = ['Source', 'Mean', 'Std Dev', 'Min', 'Max', 'Count']
sentiment_table.add_row(['Original', *sentiment_vader_parliament])
sentiment_table.add_row(['ChatGPT', *sentiment_vader_chatgpt])
sentiment_table.add_row(['DeepSeek', *sentiment_vader_deepseek])

In [None]:
# Print table (the scores were computed on the raw, uncleaned texts)
print('Sentiment Analysis (Vader) Results:')
print('> uncleaned data')
print(sentiment_table)

##### RoBERTa (parlasent):

In [None]:
# Model id of the sentiment model; its tokenizer defines the token counts
sentiment_analyzer = "classla/xlm-r-parlasent"
tokenizer = AutoTokenizer.from_pretrained(sentiment_analyzer)


def get_token_count(text):
    """
    Return the number of tokens the module-level tokenizer produces for a text.

    Special tokens are not added, so only the tokens of the text itself count.

    :param text: The text to tokenize.
    :type text: str
    :return: Length of the ``input_ids`` list.
    :rtype: int
    """
    encoded = tokenizer(text, add_special_tokens=False)
    return len(encoded["input_ids"])


# Token count per parliament text, followed by its summary statistics
parliament['token_count'] = parliament['translated_text'].apply(get_token_count)
parliament['token_count'].describe()

In [None]:
# Calculate how many texts exceed the token limit (512 tokens)
# Only relevant for the parliament dataset as the generated dataset is capped at 400 tokens per piece
over_limit_count = (parliament['token_count'] > 512).sum()
# The share is taken relative to the parliament texts that were counted
# (it was previously divided by the size of the generated dataset).
print(f"Number of texts exceeding 512 tokens: {over_limit_count} out of {len(parliament)} ({over_limit_count/len(parliament)*100:.2f}%)")

In [None]:
# Initialize the model components
# (`config` is loaded here but not referenced again in this cell)
sentiment_analyzer = "classla/xlm-r-parlasent"
tokenizer = AutoTokenizer.from_pretrained(sentiment_analyzer)
config = AutoConfig.from_pretrained(sentiment_analyzer)
model = AutoModelForSequenceClassification.from_pretrained(sentiment_analyzer)

# Select the pipeline device: first CUDA GPU if available, otherwise
# Apple-Silicon MPS, otherwise CPU
if torch.cuda.is_available():
    device = 0 # CUDA
elif torch.backends.mps.is_available():
    device = 'mps' # Apple Silicon
else:
    device = -1 # CPU
print(f"Using device: {device}")

# Using only the last 510 tokens of the text for sentiment analysis
def top_token(text):
    """
    Return the text reduced to its last 510 tokens.

    The text is tokenized without special tokens, the last 510 token ids are
    kept, special tokens are added around them and the ids are decoded back
    to a string with the special tokens skipped again.

    NOTE(review): presumably 510 leaves room for two special tokens within a
    512-token model limit — confirm. The pipeline re-tokenizes the returned
    string, so the final token count is not guaranteed by this function.

    :param text: The text to shorten.
    :type text: str
    :return: The decoded text of the last (at most) 510 tokens.
    :rtype: str
    """
    tokens = tokenizer(text, add_special_tokens=False)["input_ids"]
    last_tokens = tokens[-510:]  # Truncate to last 510
    input_ids = tokenizer.build_inputs_with_special_tokens(last_tokens)
    return tokenizer.decode(input_ids, skip_special_tokens=True)

# Create the pipeline on the device selected above.
# NOTE(review): function_to_apply="none" presumably returns the raw model
# output as the score (no softmax/sigmoid) — confirm against the transformers docs.
sentiment_analysis = TextClassificationPipeline(
    model=model, 
    tokenizer=tokenizer, 
    return_all_scores=True,
    task='sentiment_analysis', 
    device=device,
    function_to_apply="none"
)

# Apply sentiment analysis row by row; [0] keeps the first element of the
# pipeline output for each text
generated['sentiment_bert'] = generated['statement'].apply(lambda x: sentiment_analysis(top_token(x))[0])
print('finished sentiment prediction for generated statements')

parliament['sentiment_bert'] = parliament['translated_text'].apply(lambda x: sentiment_analysis(top_token(x))[0])
print('finished sentiment prediction for original statements')


In [None]:
def extract_sentiment_parts(sentiment_data):
    """Return the `(label, score)` pair of a sentiment prediction.

    `sentiment_data` is either a dict or a list whose first element is the
    dict to read. A missing 'label' yields '' and a missing 'score' yields 0.0.
    """
    # Only the first entry of a list is considered
    entry = sentiment_data[0] if isinstance(sentiment_data, list) else sentiment_data
    return entry.get('label', ''), entry.get('score', 0.0)

# Split every stored prediction into a label column and a score column, for both datasets
for frame in (generated, parliament):
    frame['sentiment_bert_label'] = frame['sentiment_bert'].apply(lambda pred: extract_sentiment_parts(pred)[0])
    frame['sentiment_bert_score'] = frame['sentiment_bert'].apply(lambda pred: extract_sentiment_parts(pred)[1])

# Show the first rows of the two new columns
preview_columns = ['sentiment_bert_label', 'sentiment_bert_score']

print("Generated data sample:")
print(generated[preview_columns].head())

print("\nOriginal data sample:")
print(parliament[preview_columns].head())

###### Results:

In [None]:
# get_descriptives and the is_deepseek / is_openai boolean masks are defined above
parliament_scores = parliament['sentiment_bert_score']
deepseek_scores = generated[is_deepseek]['sentiment_bert_score']
chatgpt_scores = generated[is_openai]['sentiment_bert_score']

# One set of descriptives per text source
sentiment_roberta_parliament = get_descriptives(parliament_scores)
sentiment_roberta_deepseek = get_descriptives(deepseek_scores)
sentiment_roberta_chatgpt = get_descriptives(chatgpt_scores)

In [None]:
# Build the summary table: one row per text source
sentiment_table = PrettyTable()
sentiment_table.field_names = ['Source', 'Mean', 'Std Dev', 'Min', 'Max', 'Count']

table_rows = (
    ('Original', sentiment_roberta_parliament),
    ('ChatGPT', sentiment_roberta_chatgpt),
    ('DeepSeek', sentiment_roberta_deepseek),
)
for source_name, descriptives in table_rows:
    sentiment_table.add_row([source_name, *descriptives])

# Print table
print('Sentiment Analysis Results:')
print(sentiment_table)

### Extremity Regression

##### Ridge Regression:

In [None]:
def reg_preprocess_text(dataset):
    """Run the text-cleaning chain over every entry of `dataset`.

    Each entry passes through `preprocessor.basic_clean`, `rm_stopwords`,
    `tokenize_doc` and `lemmatize_doc`, in that order, and the outcome is
    joined into a single space-separated string.

    `dataset` has to provide `.map(func)`; the result of that call is returned.
    """
    def _clean_one(text):
        cleaned = preprocessor.basic_clean(text)
        without_stopwords = preprocessor.rm_stopwords(cleaned)
        tokens = preprocessor.tokenize_doc(without_stopwords)
        lemmas = preprocessor.lemmatize_doc(tokens)
        return ' '.join(lemmas)

    return dataset.map(_clean_one)

# Regression inputs: the translated speeches after preprocessing
X = reg_preprocess_text(parliament['translated_text'])

In [None]:
# Regression target: the 'label' column of the parliament data
y = parliament['label']

In [None]:
# Hold out 20% of the rows for testing; the seed is fixed so the split is repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.2,
)

In [None]:
# TF-IDF features over unigrams and bigrams, limited to 5000 features
vectorizer = TfidfVectorizer(
    ngram_range=(1, 2),
    max_features=5000,
)
# The vocabulary is fitted on the training texts only and reused for the test texts
X_train_vec = vectorizer.fit_transform(X_train)
X_test_vec = vectorizer.transform(X_test)

In [None]:
# Train Ridge regression model (created with its default settings)
# NOTE: this rebinds the name `model`, which earlier referred to the sentiment classifier
model = Ridge()
# Fit on the TF-IDF training features; as the cell's last expression its result is displayed
model.fit(X_train_vec, y_train)

In [None]:
# Predict the target for the held-out TF-IDF features
y_pred = model.predict(X_test_vec)

###### Results:

In [None]:
# Score the held-out predictions
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

# Report both metrics, one per line
for caption, value in (("Mean Squared Error:", mse), ("R^2 Score:", r2)):
    print(caption, value)

In [None]:
# Scatter the predictions against the true values
fig, ax = plt.subplots(figsize=(8, 5))
sns.scatterplot(x=y_test, y=y_pred, ax=ax)
ax.plot([-1, 1], [-1, 1], '--', color='gray')  # dashed y = x reference between -1 and 1
ax.set_title("Predicted vs. Actual Extremity")
ax.set_xlabel("Actual Extremity")
ax.set_ylabel("Predicted Extremity")
fig.tight_layout()
plt.show()

##### RoBERTa:

In [None]:
import os
# Set before the Trainer is built; presumably switches off Weights & Biases reporting — TODO confirm
os.environ["WANDB_DISABLED"] = "true"

In [None]:
# Keep only the parliament rows that have both a translated text and a label
has_text = parliament["translated_text"].notna()
has_label = parliament["label"].notna()
df = parliament[has_text & has_label]

In [None]:
# Hold out 20% of the filtered rows for testing, with a fixed seed
train_df, test_df = train_test_split(
    df,
    random_state=42,
    test_size=0.2,
)

In [None]:
# Rename the target column to "labels" and keep only the text and target columns
label_rename = {"label": "labels"}
model_columns = ["translated_text", "labels"]

train_df = train_df.rename(columns=label_rename)
test_df = test_df.rename(columns=label_rename)

# Convert both splits from DataFrames to Dataset objects
train_dataset = Dataset.from_pandas(train_df[model_columns])
test_dataset = Dataset.from_pandas(test_df[model_columns])

In [None]:
# Use RoBERTa for regression
# NOTE: this rebinds the notebook-level `tokenizer`; top_token() reads that same name,
# so calling it after this cell would tokenize with roberta-base instead.
tokenizer = AutoTokenizer.from_pretrained("roberta-base")

In [None]:
def tokenize(example):
    """Tokenize the "translated_text" field of `example` with the notebook-level tokenizer.

    The output is truncated and padded with `padding="max_length"`.
    """
    texts = example["translated_text"]
    return tokenizer(texts, truncation=True, padding="max_length")

In [None]:
# Tokenize both splits in batches
train_dataset, test_dataset = (
    split.map(tokenize, batched=True) for split in (train_dataset, test_dataset)
)

In [None]:
# Load model for regression: a single output treated as a continuous target
# The dropout rate is passed to from_pretrained so it is part of the config the model is
# built from. Assigning model.config.hidden_dropout_prob afterwards would not change the
# dropout layers that already exist.
model = AutoModelForSequenceClassification.from_pretrained(
    "roberta-base",
    num_labels=1,
    problem_type="regression",
    hidden_dropout_prob=0.3,  # reduce overfitting
)

In [None]:
# Define evaluation metrics
def compute_metrics(eval_pred):
    """Compute MSE and R^2 for the regression predictions.

    `eval_pred` unpacks into `(preds, labels)`. The predictions are flattened
    to one dimension before scoring.

    Returns a dict with the keys "mse" and "r2".
    """
    preds, labels = eval_pred
    # reshape(-1) keeps a 1-D array even for a single example, where squeeze()
    # would produce a 0-d array that the metric functions cannot handle
    preds = np.asarray(preds).reshape(-1)
    return {
        "mse": mean_squared_error(labels, preds),
        "r2": r2_score(labels, preds)
    }

In [None]:
# Hyper-parameters and output locations for fine-tuning
training_args = TrainingArguments(
    output_dir="./spectrum_bert_results",
    logging_dir="./spectrum_logs",
    num_train_epochs=12,
    learning_rate=3e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
)

In [None]:
# Bundle the model, settings, data splits and metric function into a Trainer
trainer = Trainer(
    args=training_args,
    model=model,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,  # the held-out split doubles as the evaluation set
)

In [None]:
# Fine-tune the model with the datasets and settings given to the Trainer
trainer.train()

###### Results:

In [None]:
# Evaluate; presumably uses the eval_dataset passed to the Trainer (the test split) — confirm
trainer.evaluate()

In [None]:
# Persist the model and the tokenizer to the "roberta_best" directory.
# NOTE(review): this writes the in-memory `model` as it stands after training; the visible
# TrainingArguments configure no best-checkpoint selection, so "best" is only a name — confirm.
model.save_pretrained("roberta_best")
tokenizer.save_pretrained("roberta_best")

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Texts to score with the fine-tuned regression model
statements = generated["statement"].tolist()

# Reload the tokenizer and the model from the directory they were saved to
model_path = "roberta_best"
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSequenceClassification.from_pretrained(model_path)

model.eval()


def _predict_extremity(text):
    """Return the model's single output value for one statement."""
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():  # inference only, no gradients needed
        return model(**encoded).logits.item()


# Predict one statement at a time, keeping the order of `statements`
extremity_scores = [_predict_extremity(text) for text in statements]

generated["extremity_score"] = extremity_scores

In [None]:
# Per-provider density of the predicted scores, each curve normalised on its own
fig, ax = plt.subplots(figsize=(8, 5))
sns.kdeplot(
    data=generated,
    x="extremity_score",
    hue="provider",
    fill=True,
    common_norm=False,
    alpha=0.5,
    ax=ax,
)
ax.axvline(0, linestyle="--", color="gray")  # reference line at zero
ax.set_title("Distribution of Predicted Extremity Scores by Provider")
ax.set_xlabel("Extremity Score (-1 = Left, +1 = Right)")
ax.set_ylabel("Density")
fig.tight_layout()
plt.show()

In [None]:
# One box per provider, with a dashed reference line at zero
ax = sns.boxplot(data=generated, x="provider", y="extremity_score")
ax.set_title("Extremity Score Distribution per LLM")
ax.axhline(0, linestyle="--", color="gray")
ax.set_ylabel("Predicted Extremity")
plt.show()

In [None]:
# Summary statistics of the predicted scores, one row per provider
summary_stats = ["mean", "std", "min", "max", "median"]
scores_by_provider = generated.groupby("provider")["extremity_score"]
print(scores_by_provider.agg(summary_stats))

In [None]:
# Share of ChatGPT statements with a negative (left-leaning) predicted score
# NOTE(review): earlier cells select ChatGPT rows through the `is_openai` mask — confirm that
# the provider column really holds "chatgpt", otherwise this selection is empty.
chatgpt_df = generated[generated["provider"].str.lower() == "chatgpt"]
num_left_leaning = (chatgpt_df["extremity_score"] < 0).sum()
total = len(chatgpt_df)
if total:
    print(f"ChatGPT statements leaning left (< 0): {num_left_leaning} out of {total} ({(num_left_leaning/total)*100:.2f}%)")
else:
    # Without matching rows the percentage would be 0/0, so say so explicitly
    print("ChatGPT statements leaning left (< 0): no rows with provider 'chatgpt' found")