## Import libs

In [None]:
import json
import nltk
import os
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics import f1_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gensim.downloader as api
from sklearn.metrics import accuracy_score
import gc
from insights import load_data, extract_data, get_insights, plot_histogram, plot_bar_chart
from preprocessing import preprocessed_with_stopwords_with_lemming, preprocessed_without_stopwords_with_lemming, preprocessed_with_stopwords_without_lemming, preprocessed_without_stopwords_without_lemming
from classification import train_and_evaluate_nb, train_and_evaluate_ffnn, load_json_data, prepare_train_test_split, move_samples_between_sets
# pip install scipy==1.12 for word2vec

## Task 1: Extract insights from data

In [None]:
# Load the raw dataset and reduce every entry to its text and category.
data = load_data('data/Oppositional_thinking_analysis_dataset.json')
data_extracted = extract_data(data)

# Tabulate the extracted records and preview the first rows.
df = pd.DataFrame(data_extracted)
print(df.head())

# Add the feature columns that the plots below read.
df = get_insights(df)

# Histogram call order: (df, column, title, xlabel, ylabel).
plot_histogram(df, 'text_length', 'Text Length Distribution by Category', 'Number of Words', 'Frequency')

# Bar charts, drawn in list order: (column, title, axis label).
bar_chart_specs = [
    # Count-based features
    ('unique_word_count', 'Average Number of Unique Words by Category', 'Average Unique Words'),
    ('uppercase_word_count', 'Average Number of Uppercase Words by Category', 'Average Uppercase Words'),
    ('exclamation_count', 'Average Number of Exclamation Marks by Category', 'Average Exclamation Marks'),
    # Presence-based features
    ('contains_link', 'Proportion of Texts Containing Links by Category', 'Proportion Containing Links'),
    ('contains_parentheses', 'Proportion of Texts Containing Parentheses by Category', 'Proportion Containing Parentheses'),
    ('contains_quotation_marks', 'Proportion of Texts Containing Quotation Marks by Category', 'Proportion Containing Quotation Marks'),
    ('contains_5G', 'Proportion of Texts Containing "5G" or "5 G" by Category', 'Proportion Containing "5G" or "5 G"'),
    ('contains_bill_gates', 'Proportion of Texts Containing "Bill Gates" by Category', 'Proportion Containing "Bill Gates"'),
]
for column, title, axis_label in bar_chart_specs:
    plot_bar_chart(df, column, title, axis_label)

## Task 2: Pre-processing

### Load the input text file and extract texts and labels

In [None]:
# Download necessary NLTK data (stopword lists and the punkt tokenizer).
for nltk_resource in ('stopwords', 'punkt'):
    nltk.download(nltk_resource)

# Read the raw dataset from disk.
with open('data/Oppositional_thinking_analysis_dataset.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# Keep only the text and the category of every entry.
data_extracted = [
    {'text': entry['text'], 'category': entry['category']}
    for entry in data
]

# Numeric label assigned to each category name.
category_mapping = {'CRITICAL': 0, 'CONSPIRACY': 1}

### Preprocess the input texts in different ways and save each example in a json file

In [None]:
# 3 min
def preprocess_and_save(data, preprocess_func, file_name):
    """Preprocess every text, encode the categories and write JSON lines.

    Args:
        data: Records convertible to a DataFrame with 'text' and
            'category' columns.
        preprocess_func: Callable applied to each raw text.
        file_name: Path of the JSON-lines file to write.

    Returns:
        The DataFrame that was written: the numeric 'category' column and
        the 'preprocessed_text' column (the raw 'text' column is dropped).

    Raises:
        KeyError: If a category is not a key of ``category_mapping``.
    """
    frame = pd.DataFrame(data)
    frame['preprocessed_text'] = frame['text'].apply(preprocess_func)
    frame = frame.drop(columns='text')
    # Replace category names by their numeric labels.
    frame['category'] = frame['category'].apply(lambda label: category_mapping[label])
    # One JSON record per line; non-ASCII characters are kept as-is.
    frame.to_json(file_name, orient='records', lines=True, force_ascii=False)
    print(frame.head())
    return frame

# Preprocessing variants as (name, preprocessing function, output path).
preprocessing_configs = [
    (
        'preprocessed_with_stopwords_with_lemming',
        preprocessed_with_stopwords_with_lemming,
        'data/preprocessed_with_stopwords_with_lemming.json',
    ),
    (
        'preprocessed_without_stopwords_with_lemming',
        preprocessed_without_stopwords_with_lemming,
        'data/preprocessed_without_stopwords_with_lemming.json',
    ),
    (
        'preprocessed_with_stopwords_without_lemming',
        preprocessed_with_stopwords_without_lemming,
        'data/preprocessed_with_stopwords_without_lemming.json',
    ),
    (
        'preprocessed_without_stopwords_without_lemming',
        preprocessed_without_stopwords_without_lemming,
        'data/preprocessed_without_stopwords_without_lemming.json',
    ),
]

# Run every variant, write its file and keep the resulting DataFrame by name.
preprocessed_data = {}
for config_name, preprocess_func, output_path in preprocessing_configs:
    preprocessed_frame = preprocess_and_save(data_extracted, preprocess_func, output_path)
    preprocessed_data[config_name] = preprocessed_frame

## Task 3: Vectorization and Text classification

### Load data with different pre-processing steps

In [None]:
# Paths of the preprocessed JSON files written in Task 2, one per
# combination of stop-word handling and lemmatization.
data_files = [
    # Lemmatized variants
    'data/preprocessed_with_stopwords_with_lemming.json',
    'data/preprocessed_without_stopwords_with_lemming.json',
    # Non-lemmatized variants
    'data/preprocessed_with_stopwords_without_lemming.json',
    'data/preprocessed_without_stopwords_without_lemming.json',
]

### Define different vectorization Models

In [None]:
# Vectorizers to compare, as (vectorizer instance, display name) pairs:
# TF-IDF first, then pure n-gram count vectorizers for n = 1..4
# (unigrams, bigrams, trigrams, four-grams).
vectorizers = [(TfidfVectorizer(), 'TfidfVectorizer')]
vectorizers += [
    (CountVectorizer(ngram_range=(n, n)), f'CountVectorizer{n}')
    for n in range(1, 5)
]

### Naïve Bayes Model with different vectorizers and pre-processing steps

#### Train Naïve Bayes Model

In [None]:
# Classification reports and misclassification counts, keyed by
# data file and then by vectorizer name.
results = {}
misclassifications = {}
nb_res_path = "data/nb_results"
os.makedirs(nb_res_path, exist_ok=True)

# Iterate over each data file and each vectorizer
for data_file in data_files:
    # Load data
    data = load_json_data(data_file)

    # Prepare train and test sets
    X_train, X_test, y_train, y_test = prepare_train_test_split(data, "nb")

    print(f"\nData file: {data_file}")
    print("Original train set length:", len(X_train))
    print("Original test set length:", len(X_test))

    # Move samples of the 'CRITICAL' category (labeled as 0)
    X_train, y_train, X_test, y_test = move_samples_between_sets(X_train, y_train, X_test, y_test)

    print("\nAfter undersampling:")
    print("Train set length:", len(X_train))
    print("Test set length:", len(X_test))
    print("Train and test sets loaded for preprocessed data file. Training with this preprocessed file:\n")
    print(20*"-")
    # Dictionary to store classification reports and misclassification info
    results[data_file] = {}
    misclassifications[data_file] = {}

    # File-name prefix identifying the preprocessing variant. Without it every
    # data file would write to the same "<vectorizer>_misclassified.json" and
    # only the examples of the last data file would survive.
    data_stem = os.path.splitext(os.path.basename(data_file))[0]

    # Iterate over each vectorizer for this data file
    for vectorizer, vectorizer_name in vectorizers:
        report, misclassified_data_nb, misclassified_count, total = train_and_evaluate_nb(vectorizer, X_train['preprocessed_text'], X_test['preprocessed_text'], y_train, y_test, data_file, vectorizer_name)
        results[data_file][vectorizer_name] = report
        misclassifications[data_file][vectorizer_name] = (misclassified_count, total)
        # Save misclassified examples to one JSON file per (data file, vectorizer).
        # UTF-8 with ensure_ascii=False keeps non-ASCII text readable, matching
        # how the preprocessed files themselves are written.
        misclassified_path = os.path.join(nb_res_path, f"{data_stem}_{vectorizer_name}_misclassified.json")
        with open(misclassified_path, 'w', encoding='utf-8') as f:
            json.dump(misclassified_data_nb, f, indent=4, ensure_ascii=False)
    print(150*"*")


#### Visualize Statistics for Bayes

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Convert results to a DataFrame for visualization
def results_to_dataframe(results):
    """Flatten nested classification reports into a long-format DataFrame.

    Args:
        results: Mapping ``{data_file: {vectorizer_name: report}}`` where each
            report maps a label to a dict of scores. Report entries whose
            value is not a dict are ignored.

    Returns:
        DataFrame with one row per (data file, vectorizer, metric, label) and
        the columns 'Data File', 'Vectorizer', 'Label', 'Metric', 'Score'.
        Rows are grouped by metric first, then by label.
    """
    metric_names = ('precision', 'recall', 'f1-score')
    rows = []
    for data_file, vectorizer_reports in results.items():
        for vectorizer_name, report in vectorizer_reports.items():
            # Only dict-valued entries carry per-metric scores.
            labelled_scores = [
                (label, scores)
                for label, scores in report.items()
                if isinstance(scores, dict)
            ]
            rows.extend(
                {
                    'Data File': data_file,
                    'Vectorizer': vectorizer_name,
                    'Label': label,
                    'Metric': metric,
                    'Score': scores[metric],
                }
                for metric in metric_names
                for label, scores in labelled_scores
            )
    return pd.DataFrame(rows)

def _annotate_bar_heights(axis, number_format):
    """Label every bar patch of *axis* with its height, 9 points above the top."""
    for bar in axis.patches:
        bar_height = bar.get_height()
        bar_center = bar.get_x() + bar.get_width() / 2.
        axis.annotate(
            format(bar_height, number_format),
            (bar_center, bar_height),
            ha='center', va='center',
            xytext=(0, 9),
            textcoords='offset points',
        )


# Long-format table of the classification scores.
df_results = results_to_dataframe(results)

# One record per (data file, vectorizer) with its misclassification counts.
misclass_records = [
    {
        'Data File': data_file,
        'Vectorizer': vectorizer_name,
        'Misclassified': misclassified,
        'Total': total,
    }
    for data_file, vectorizer_data in misclassifications.items()
    for vectorizer_name, (misclassified, total) in vectorizer_data.items()
]
df_misclassifications = pd.DataFrame(misclass_records)

# Figure 1: scores per vectorizer, one bar colour per data file.
plt.figure(figsize=(14, 8))
score_plot = sns.barplot(data=df_results, x='Vectorizer', y='Score', hue='Data File', errorbar=None)
_annotate_bar_heights(score_plot, '.2f')

plt.title('Scores Comparison per Vectorizer and Data File')
plt.xlabel('Vectorizer')
plt.ylabel('Score')
plt.legend(title='Data File', bbox_to_anchor=(1.05, 1), loc='upper left')

plt.show()

# Figure 2: number of misclassified samples per vectorizer and data file.
plt.figure(figsize=(14, 8))
misclass_plot = sns.barplot(data=df_misclassifications, x='Vectorizer', y='Misclassified', hue='Data File', errorbar=None)
_annotate_bar_heights(misclass_plot, '.0f')

plt.title('Number of Misclassifications per Vectorizer and Data File')
plt.xlabel('Vectorizer')
plt.ylabel('Number of Misclassifications')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

# Figure 3: heatmap of the scores, rows = (data file, vectorizer), columns = metric.
pivot_table = df_results.pivot_table(
    values='Score',
    index=['Data File', 'Vectorizer'],
    columns='Metric',
)
sns.heatmap(pivot_table, annot=True, cmap='coolwarm', cbar=True)
plt.title('Heatmap of Vectorizer Performance')
plt.show()

In [None]:
# Saved files with misclassified texts

### FFNN Models

##### Define FFNN with first 4 vectorizers.

In [None]:
# Hyperparameters of the feed-forward network.
# input_size is not set here: it equals the output width of the vectorizer in use.
hidden_size1, hidden_size2 = 128, 64
output_size = 1
dropout_rate = 0.4

# Optimisation settings.
learning_rate = 0.001
batch_size = 32
epochs = 50

# Device configuration: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# PyTorch Dataset class
class TextDataset(Dataset):
    """Dataset that vectorizes one raw text per item with a fitted vectorizer.

    Args:
        X: Texts, indexed positionally through ``.iloc``.
        y: Labels aligned with ``X``, also indexed through ``.iloc``.
        vectorizer: Fitted object whose ``transform([text])`` returns a
            matrix with a ``toarray()`` method (expected shape
            ``(1, n_features)``).
    """

    def __init__(self, X, y, vectorizer):
        self.X = X
        self.y = y
        self.vectorizer = vectorizer

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(features, label)`` as float32 tensors for sample *idx*."""
        text = self.X.iloc[idx]
        label = self.y.iloc[idx]
        # Take row 0 of the single-row matrix instead of calling squeeze():
        # squeeze() would also drop the feature axis when the vocabulary has
        # exactly one term, producing a 0-d array instead of shape (1,).
        features = self.vectorizer.transform([text]).toarray()[0]
        return (
            torch.as_tensor(features, dtype=torch.float32),
            torch.tensor(label, dtype=torch.float32),
        )
    
# FFNN Model class
class FFNN(nn.Module):
    """Feed-forward classifier with two hidden layers and a sigmoid output.

    Each hidden stage is Linear -> ReLU -> Dropout; the output stage is
    Linear -> Sigmoid, so the forward pass returns values in (0, 1).

    Args:
        input_size: Number of input features.
        hidden_size1: Width of the first hidden layer.
        hidden_size2: Width of the second hidden layer.
        output_size: Number of output units.
        dropout_rate: Dropout probability applied after each hidden layer.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, output_size, dropout_rate=0.5):
        super().__init__()
        # First hidden stage.
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_rate)
        # Second hidden stage.
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout_rate)
        # Output stage.
        self.fc3 = nn.Linear(hidden_size2, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Run *x* through both hidden stages and return sigmoid outputs."""
        hidden = self.dropout1(self.relu1(self.fc1(x)))
        hidden = self.dropout2(self.relu2(self.fc2(hidden)))
        return self.sigmoid(self.fc3(hidden))


##### Train and evaluate FFNN Model with first 4 vectorizers.

In [None]:
# Free cached GPU memory and collect garbage left over from earlier cells.
torch.cuda.empty_cache()
gc.collect()

# Iterate over each data file and each vectorizer
for data_file in data_files:
    data = load_json_data(data_file)

    X_train, X_test, y_train, y_test = prepare_train_test_split(data, "ffnn")

    X_train, y_train, X_test, y_test = move_samples_between_sets(X_train, y_train, X_test, y_test)

    for vectorizer, vectorizer_name in vectorizers:
        # Fit the vectorizer on the training texts. Only the feature count is
        # needed here (the datasets vectorize each sample on access), so the
        # matrix is left sparse instead of being densified with .toarray(),
        # which can exhaust memory for large n-gram vocabularies.
        n_features = vectorizer.fit_transform(X_train).shape[1]

        # Create datasets and dataloaders
        train_dataset = TextDataset(X_train, y_train, vectorizer)
        test_dataset = TextDataset(X_test, y_test, vectorizer)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        print(f"\nData file: {data_file} | Vectorizer: {vectorizer_name}")

        # Initialize a fresh model, criterion, and optimizer for this combination
        model = FFNN(input_size=n_features,
                     hidden_size1=hidden_size1,
                     hidden_size2=hidden_size2,
                     output_size=output_size,
                     dropout_rate=dropout_rate)

        model.to(device)

        # BCELoss expects probabilities, which this FFNN variant produces
        # through its final sigmoid.
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        # Train and evaluate FFNN model
        train_and_evaluate_ffnn(model, train_loader, test_loader, criterion, optimizer, device)

##### Define FFNN Model for word2vec.

In [None]:
# Hyperparameters of the word2vec FFNN.
input_size = 300  # must equal the dimensionality of the word vectors
hidden_size1, hidden_size2 = 128, 64
output_size = 1  # Single output for binary classification
dropout_rate = 0.5

class TextDataset(Dataset):
    """Dataset over precomputed text embeddings and their labels.

    Args:
        texts: Indexable collection of embedding vectors (e.g. a 2-D tensor).
        labels: Indexable collection of labels aligned with ``texts``.
    """

    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(embedding, label)`` as float32 tensors for sample *idx*."""
        text = self.texts[idx]
        label = self.labels[idx]
        # as_tensor accepts tensors, arrays and plain numbers alike. Unlike
        # torch.tensor(), it does not emit the "copy construct from a tensor"
        # UserWarning on every item when the inputs are already tensors, and
        # it skips the copy when the dtype already matches.
        return (
            torch.as_tensor(text, dtype=torch.float32),
            torch.as_tensor(label, dtype=torch.float32),
        )

class FFNN(nn.Module):
    """Feed-forward classifier with two hidden layers that returns raw logits.

    Each hidden stage is Linear -> ReLU -> Dropout, followed by a final Linear
    layer. No sigmoid is applied, so pair it with ``BCEWithLogitsLoss``.

    Args:
        input_size: Number of input features.
        hidden_size1: Width of the first hidden layer.
        hidden_size2: Width of the second hidden layer.
        output_size: Number of output units.
        dropout_rate: Dropout probability applied after each hidden layer.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, output_size, dropout_rate=0.5):
        super().__init__()
        # First hidden stage.
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_rate)
        # Second hidden stage.
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout_rate)
        # Output layer (logits).
        self.fc3 = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        """Run *x* through both hidden stages and return the unnormalised logits."""
        hidden = self.dropout1(self.relu1(self.fc1(x)))
        hidden = self.dropout2(self.relu2(self.fc2(hidden)))
        return self.fc3(hidden)

In [None]:
# Define training function
def train(model, criterion, optimizer, train_loader, epochs=10):
    """Train *model* on *train_loader* and print the mean batch loss per epoch.

    Args:
        model: Network to optimise; it is moved to the global ``device``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        epochs: Number of passes over ``train_loader``.
    """
    model.to(device).train()
    for epoch_index in range(epochs):
        epoch_loss = 0.0
        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            # Add a trailing axis so the labels match the model's output shape.
            batch_labels = batch_labels.to(device).unsqueeze(1)

            optimizer.zero_grad()
            loss = criterion(model(batch_inputs), batch_labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
        print(f'Epoch {epoch_index+1}/{epochs}, Loss: {epoch_loss / len(train_loader)}')

# Define evaluation function
def evaluate(model, test_loader):
    """Evaluate *model* on *test_loader* and print accuracy, F1 and error count.

    The model is expected to return logits; a sigmoid followed by rounding
    turns them into 0/1 predictions.

    Args:
        model: Network to evaluate; it is moved to the global ``device``.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.
    """
    # Evaluation mode disables dropout. In training mode the predictions
    # would be randomly perturbed and the reported metrics would be
    # non-deterministic.
    model.to(device).eval()
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            predicted = torch.round(torch.sigmoid(outputs))  # Apply sigmoid to outputs
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    accuracy = accuracy_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions)
    # normalize=False returns the number of correct predictions.
    misclassified = len(all_labels) - accuracy_score(all_labels, all_predictions, normalize=False)
    print(f'Accuracy: {accuracy}, F1 Score: {f1}, Misclassified Samples: {misclassified}')

In [None]:
# Device configuration: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

# Build the FFNN from the current hyperparameters and show its layers.
model = FFNN(
    input_size=input_size,
    hidden_size1=hidden_size1,
    hidden_size2=hidden_size2,
    output_size=output_size,
    dropout_rate=dropout_rate,
)
print(model)

##### Data Preparation Functions for word2vec

In [None]:
# Load the pretrained Word2Vec vectors through the gensim downloader.
# 'word2vec-google-news-300' can be replaced with another model name of your choice;
# NOTE: the FFNN input_size (300) must then be changed to match the new vector dimensionality.
word2vec_model = api.load('word2vec-google-news-300')

In [None]:
# Function to convert text to Word2Vec embeddings
def text_to_w2v_vector(text):
    """Return the mean Word2Vec embedding of the words in *text*.

    The text is split on whitespace and words that are not in the vocabulary
    of the global ``word2vec_model`` are skipped.

    Args:
        text: Text whose words are looked up in the model.

    Returns:
        1-D tensor holding the element-wise mean of the known word vectors,
        or a zero vector of the model's dimensionality when no word is known.
    """
    known_vectors = [
        word2vec_model.get_vector(word)
        for word in text.split()
        if word in word2vec_model.key_to_index
    ]
    if not known_vectors:
        # Size the fallback from the model rather than hard-coding 300, so it
        # stays consistent if a different embedding model is loaded.
        return torch.zeros(word2vec_model.vector_size)
    return torch.mean(torch.stack([torch.tensor(v) for v in known_vectors]), dim=0)

##### Training and Evaluating FFNN with word2vec as vectorizer

In [None]:
batch_size = 32
epochs = 20

print("word2vec")
for data_file in data_files:
    print(100*"*")
    print(data_file)
    data = load_json_data(data_file)
    X_train, X_test, y_train, y_test = prepare_train_test_split(data, "m")
    X_train, y_train, X_test, y_test = move_samples_between_sets(X_train, y_train, X_test, y_test)

    # Convert train and test data to Word2Vec embeddings
    X_train_w2v = torch.stack([text_to_w2v_vector(text) for text in X_train])
    X_test_w2v = torch.stack([text_to_w2v_vector(text) for text in X_test])

    # Convert labels to tensors
    y_train_tensor = torch.tensor(y_train.values)
    y_test_tensor = torch.tensor(y_test.values)

    # Create datasets
    train_dataset = TextDataset(X_train_w2v, y_train_tensor)
    test_dataset = TextDataset(X_test_w2v, y_test_tensor)

    # Create data loaders (use the batch_size defined above, not a second literal)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Start every data file from freshly initialised weights. Reusing one
    # model across files would carry over what it learned from the previous
    # preprocessing variants, so the per-file results would not be comparable.
    model = FFNN(input_size, hidden_size1, hidden_size2, output_size, dropout_rate).to(device)

    # Initialize loss function and optimizer (the model outputs logits)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the model
    train(model, criterion, optimizer, train_loader, epochs)

    # Evaluate the model
    evaluate(model, test_loader)


## Task 4: Textual similarity

### Bonus Task: Textual similarity