In [1]:
import pandas as pd
import json
import re
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer



import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer

nltk.download('stopwords')
nltk.download('punkt')


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/erikrubinov/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/erikrubinov/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [3]:
data = json.loads(open("Oppositional_thinking_analysis_dataset.json").read())

In [54]:
"""
Create Dataset
"""

def balance_data(data):
    df = pd.DataFrame(data)
    df.pop('id')
    train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['category'], random_state=42)

    # Handle class imbalance in the training set
    train_df_majority = train_df[train_df.category == 'CRITICAL']
    train_df_minority = train_df[train_df.category == 'CONSPIRACY']

    train_df_minority_upsampled = resample(train_df_minority, 
                                        replace=True,     
                                        n_samples=len(train_df_majority),    
                                        random_state=42)

    train_df_balanced = pd.concat([train_df_majority, train_df_minority_upsampled])

    return train_df_balanced, test_df



In [None]:
def preprocess_easy(text, lem_tag = True, stem_tag = False):
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'[^A-Za-z\s]', '', text)
    tokens = nltk.word_tokenize(text.lower())

    if lem_tag:
    # Lemmatization
        lemmatizer = WordNetLemmatizer()
        tokens = [lemmatizer.lemmatize(word) for word in tokens]

    if stem_tag:
        stemmer = PorterStemmer()
        tokens = [stemmer.stem(word) for word in tokens]

    return ' '.join(tokens)



def preprocess(text:str, lem_tag = True, stem_tag = False) -> int:

    # Lowercasing
    text = text.lower()
    text = re.sub(r'\d+', '', text) # remove decimals  
    text = re.sub(r'[\:\-\']', '', text)  # Remove specific punctuation
    text = re.sub(r'http\S+', '', text) # Remove URLs
    text = re.sub(r'\s+', ' ', text) # Remove extra whitespace
    text = re.sub(r'[^\w\s]', '', text) # Remove special characters
    text = re.sub(r'\d+\.\d+', '', text)  # Matches one or more digits followed by a dot and one or more digits
    text = re.sub(r'\bcom\b', '', text, flags=re.IGNORECASE)  # Matches "com" at word boundaries (whole word)


    # Tokenization
    tokens = word_tokenize(text)

    # Removing stop words
    stop_words = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stop_words]

    # Lemmatization
    if lem_tag:
    # Lemmatization
        lemmatizer = WordNetLemmatizer()
        tokens = [lemmatizer.lemmatize(word) for word in tokens]
    
    if stem_tag:
        stemmer = PorterStemmer()
        tokens = [stemmer.stem(word) for word in tokens]

    return ' '.join(tokens)

In [40]:
class TextPreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self, custom_preprocess_function, use_lemmatization=True, use_stemmanization = True):
        self.use_lemmatization = use_lemmatization
        self.use_stemmanization = use_stemmanization
        self.preprocess_function = custom_preprocess_function

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        res = [self.preprocess_function(text, self.use_lemmatization, self.use_stemmanization) for text in X]
        return res
    

    def __init__(self, text_transformer, ngram_range = (1,1),  additional_features = []):
        self.text_transformer = text_transformer
        self.additional_features = additional_features
        self.encoder = OneHotEncoder()
        self.vectorizer = CountVectorizer(ngram_range=ngram_range)


    def fit(self, X, y=None):
        # Fit the individual transformers
        self.text_transformer.fit(X['text'])
        if self.additional_features:
            self.encoder.fit(X[self.additional_features]) 

        return self

    def transform(self, X):
      
        text_features = self.text_transformer.transform(X['text'])
     
        if self.additional_features:
            additional_features = self.encoder.transform(X[self.additional_features]).toarray()
            text_features = np.hstack((text_features.toarray(), additional_features))
       
        return text_features
# Pre-processing pipelines
def create_pipelines(i,j,custom_preprocess ):
    pipelines = {
        f'CountVectorizer with preprocessing function {custom_preprocess.__name__} and no Lemmatization and ngram: {i}, {j}': Pipeline([
            ('preprocessor', TextPreprocessor(custom_preprocess_function=custom_preprocess, use_lemmatization=False, use_stemmanization= False)),
            ('vectorizer', CountVectorizer(ngram_range=(i, j))),
            ('classifier', MultinomialNB())
        ]),
        f'TfidfVectorizer with preprocessing function {custom_preprocess.__name__} and no Lemmatization and ngram: {i}, {j}': Pipeline([
            ('preprocessor', TextPreprocessor(custom_preprocess_function=custom_preprocess, use_lemmatization=False, use_stemmanization= False)),
            ('vectorizer', TfidfVectorizer(ngram_range=(i, j))),
            ('classifier', MultinomialNB())
        ])
    }
    return pipelines

# Function to train and evaluate a Naïve Bayes model
def train_and_evaluate(train_df, test_df, pipeline):
    X_train = train_df['text']
    #X_train = test_df[['text', 'uppercase_amount', 'comment_length']]

    y_train = train_df['category']
    X_test = test_df['text']
    y_test = test_df['category']
    
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    
    print(f"Results for {pipeline.named_steps['vectorizer'].__class__.__name__}:")
    print(classification_report(y_test, y_pred))


# Train and evaluate models with different pipelines
train_df_balanced = balance_data(data)[0]
test_df = balance_data(data)[1]
for preprocess_function in [preprocess, preprocess_easy]:
    for i in range(1,3):
        for j in range(1,3):
            if i<=j:
                pipelines = create_pipelines(i,j, preprocess_function)
                for name, pipeline in pipelines.items():
                    print(f"Evaluating {name}...")
                    train_and_evaluate(train_df_balanced, test_df, pipeline)

Evaluating CountVectorizer with preprocessing function preprocess and no Lemmatization and ngram: 1, 1...
Results for CountVectorizer:
              precision    recall  f1-score   support

  CONSPIRACY       0.78      0.77      0.78       276
    CRITICAL       0.88      0.89      0.88       524

    accuracy                           0.85       800
   macro avg       0.83      0.83      0.83       800
weighted avg       0.85      0.85      0.85       800

Evaluating TfidfVectorizer with preprocessing function preprocess and no Lemmatization and ngram: 1, 1...
Results for TfidfVectorizer:
              precision    recall  f1-score   support

  CONSPIRACY       0.75      0.86      0.80       276
    CRITICAL       0.92      0.85      0.88       524

    accuracy                           0.85       800
   macro avg       0.83      0.85      0.84       800
weighted avg       0.86      0.85      0.85       800

Evaluating CountVectorizer with preprocessing function preprocess and no Lem

In [81]:
## add feature uppercase percentage
def calculate_uppercase_percentage(text):
    uppercase_count = 0
    total_letters = 0
    
    for char in text:
        if char.isalpha():  # Check if the character is a letter
            total_letters += 1
            if char.isupper():  # Check if the letter is uppercase
                uppercase_count += 1
    
    if total_letters == 0:
        return 0
    uppercase_percentage = (uppercase_count / total_letters) * 100
    
    return uppercase_percentage

def classify_uppercase_percentage(percentage):
    if percentage < 6:
        return "low"
    elif 6 <= percentage <= 12:
        return "neutral"
    else:
        return "high"

# Update the data with the new key-value pair
updated_data = []
for comment in data:
    uppercase_percentage = calculate_uppercase_percentage(comment["text"])
    uppercase_amount = classify_uppercase_percentage(uppercase_percentage)
    updated_comment = comment.copy()
    updated_comment["uppercase_amount"] = uppercase_amount
    updated_data.append(updated_comment)

# Save the updated data to a new file
with open('Oppositional_thinking_analysis_dataset.json', 'w') as f:
    json.dump(updated_data, f, indent=4)


In [117]:
# add custom feature: comment length
def classify_comment_lenght(length):
    if length < 190:
        return "short"
    elif 190 <= length <= 560:
        return "average"
    else:
        return "long"

# Update the data with the new key-value pair
updated_data = []
for comment in data:
    uppercase_amount = classify_comment_lenght(len(comment["text"]))
    updated_comment = comment.copy()
    updated_comment["comment_length"] = uppercase_amount
    updated_data.append(updated_comment)

# Save the updated data to a new file
with open('Oppositional_thinking_analysis_dataset.json', 'w') as f:
    json.dump(updated_data, f, indent=4)


In [73]:

class TextPreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self, custom_preprocess_function = preprocess,  use_lemmatization=True, use_stem=False):
        self.custom_preprocess_function = custom_preprocess
        self.use_lemmatization = use_lemmatization
        self.use_stem = use_stem

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_train_processed = X.copy()
        X_train_processed['text'] = X_train_processed['text'].apply(
            lambda x: self.custom_preprocess_function(x, lem_tag=self.use_lemmatization, stem_tag=self.use_stem))
        return X_train_processed


class CombinedFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, text_transformer, ngram_range = (1,1),  additional_features = []):
        self.text_transformer = text_transformer
        self.additional_features = additional_features
        self.encoder = OneHotEncoder()
        self.vectorizer = CountVectorizer(ngram_range=ngram_range)


    def fit(self, X, y=None):
        # Fit the individual transformers
        self.text_transformer.fit(X['text'])
        if self.additional_features:
            self.encoder.fit(X[self.additional_features]) 

        return self

    def transform(self, X):
      
        text_features = self.text_transformer.transform(X['text'])
     
        if self.additional_features:
            additional_features = self.encoder.transform(X[self.additional_features]).toarray()
            text_features = np.hstack((text_features.toarray(), additional_features))
       
        return text_features

# Pre-processing pipelines

custom_preprocess = preprocess


def create_pipelines(i,j,custom_preprocess, custom_feautures : list ):
    pipelines = {
        f'CountVectorizer with preprocessing function {custom_preprocess.__name__} and no Lemmatization and ngram: {i}, {j} with custom features: {custom_feautures}': Pipeline([
            ('preprocessor',  TextPreprocessor(custom_preprocess_function=custom_preprocess, use_lemmatization=False, use_stem= False)),
            ('features', CombinedFeatures(CountVectorizer(ngram_range=(i, j)), custom_feautures)),
            ('classifier', MultinomialNB())
        ]),
        f'CountVectorizer with preprocessing function {custom_preprocess.__name__} and no Lemmatization and ngram: {i}, {j} with custom features: {custom_feautures}': Pipeline([
            ('preprocessor', TextPreprocessor(custom_preprocess_function=custom_preprocess, use_lemmatization=False, use_stem= False)),
            ('features', CombinedFeatures(TfidfVectorizer(ngram_range=(i, j)),custom_feautures)),
            ('classifier', MultinomialNB())
        ]),
        f'CountVectorizer with preprocessing function {custom_preprocess.__name__} and no Lemmatization and ngram: {i}, {j} with custom features: {custom_feautures}': Pipeline([
            ('preprocessor',  TextPreprocessor(custom_preprocess_function=custom_preprocess, use_lemmatization=True, use_stem= True)),
            ('features', CombinedFeatures(CountVectorizer(ngram_range=(i, j)), custom_feautures)),
            ('classifier', MultinomialNB())
        ]),
        f'CountVectorizer with preprocessing function {custom_preprocess.__name__} and no Lemmatization and ngram: {i}, {j} with custom features: {custom_feautures}': Pipeline([
            ('preprocessor', TextPreprocessor(custom_preprocess_function=custom_preprocess, use_lemmatization=True, use_stem= True)),
            ('features', CombinedFeatures(TfidfVectorizer(ngram_range=(i, j)), custom_feautures)),
            ('classifier', MultinomialNB())
        ])
        
    }   
    return pipelines



# Function to train and evaluate a Naïve Bayes model
def train_and_evaluate(train_df, test_df, pipeline):
    X_train = train_df[['text', 'uppercase_amount', 'comment_length']]
    y_train = train_df['category']
    X_test = test_df[['text']]
    y_test = test_df['category']
    
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    
    print(f"Results for {pipeline.named_steps['features'].text_transformer.__class__.__name__}:")
    print(classification_report(y_test, y_pred))

# Train and evaluate models with different pipelines
for feature_list in [['uppercase_amount', 'comment_length'], ['comment_length'], ['uppercase_amount']]:
    for preprocess_function in [preprocess, preprocess_easy]:
        for i in range(1,3):
            for j in range(1,3): 
                if i<=j
                pipelines = create_pipelines(i,j, preprocess_function, feature_list)
                for name, pipeline in pipelines.items():
                    train_df_balanced = balance_data(data)[0]
                    test_df = balance_data(data)[1]
                    print(f"Evaluating {name}...")
                    train_and_evaluate(train_df_balanced, test_df, pipeline)

Evaluating CountVectorizer with preprocessing function preprocess and no Lemmatization and ngram: 1, 1 with custom features: ['uppercase_amount', 'comment_length']...
Results for TfidfVectorizer:
              precision    recall  f1-score   support

  CONSPIRACY       0.73      0.83      0.78       276
    CRITICAL       0.90      0.84      0.87       524

    accuracy                           0.83       800
   macro avg       0.82      0.83      0.82       800
weighted avg       0.84      0.83      0.84       800

Evaluating CountVectorizer with preprocessing function preprocess and no Lemmatization and ngram: 1, 2 with custom features: ['uppercase_amount', 'comment_length']...
Results for TfidfVectorizer:
              precision    recall  f1-score   support

  CONSPIRACY       0.70      0.91      0.79       276
    CRITICAL       0.95      0.79      0.86       524

    accuracy                           0.83       800
   macro avg       0.82      0.85      0.83       800
weighted 

ValueError: Invalid value for ngram_range=(2, 1) lower boundary larger than the upper boundary.

In [73]:
# Assuming the preprocess and preprocess2 functions are defined as before

class TextPreprocessor(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return [preprocess(text) for text in X]

# Combine features using ColumnTransformer
column_transformer = ColumnTransformer(
    transformers=[
        ('text_vect', TfidfVectorizer(stop_words='english'), 'text'),
        ('uppercase_ohe', OneHotEncoder(), ['uppercase_amount'])
    ],
    remainder='drop'  # This drops the columns that are not specified
)

# Define the pipeline with the ColumnTransformer
pipeline = Pipeline([
    ('features', column_transformer),
    ('classifier', MultinomialNB())
])


df = pd.DataFrame(data)
train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['category'], random_state=42)

# Handle class imbalance
train_df_majority = train_df[train_df.category == 'CRITICAL']
train_df_minority = train_df[train_df.category == 'CONSPIRACY']
train_df_minority_upsampled = resample(train_df_minority, replace=True, n_samples=len(train_df_majority), random_state=42)
train_df_balanced = pd.concat([train_df_majority, train_df_minority_upsampled])

def train_and_evaluate(train_df, test_df, pipeline):
    X_train = train_df[['text', 'uppercase_amount']]
    y_train = train_df['category']
    X_test = test_df[['text', 'uppercase_amount']]
    y_test = test_df['category']
    
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    
    print("Results:")
    print(classification_report(y_test, y_pred))

# Evaluate the model
train_and_evaluate(train_df_balanced, test_df, pipeline)

Results:
              precision    recall  f1-score   support

  CONSPIRACY       0.77      0.83      0.80       276
    CRITICAL       0.91      0.87      0.89       524

    accuracy                           0.85       800
   macro avg       0.84      0.85      0.84       800
weighted avg       0.86      0.85      0.86       800



[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/erikrubinov/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
