In [None]:
#basic imports
import pandas as pd
import nltk
import matplotlib.pyplot as plt
import numpy as np
import string

#machine learning imports
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split, cross_val_score, KFold, StratifiedKFold
from sklearn.naive_bayes import MultinomialNB, BernoulliNB, ComplementNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report, f1_score

#neural network imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.preprocessing import LabelEncoder



#nltk stuff, uncomment if needed
#nltk.download('punkt')
#nltk.download('stopwords')
#nltk.download('wordnet')

In [None]:
#read the annotated comments once; df_wbs starts as an independent copy of the same
#data so it can be relabelled later without touching df
df = pd.read_excel("data/finalannotation.xlsx")
df_wbs = df.copy()

In [None]:
df.columns

In [None]:
#columns removed from both frames
unused_columns = ["new class type", "new ethical issues", "Unnamed: 17"]

#errors="ignore" keeps this cell re-runnable: a second run (columns already gone)
#is a no-op instead of a KeyError
df = df.drop(columns=unused_columns, errors="ignore")
df_wbs = df_wbs.drop(columns=unused_columns, errors="ignore")

In [None]:
df.columns

In [None]:
wblist = ["0. for whistleblowing", "1. against whistleblowing", "2. neutral"]

#collapse the three stance labels into one "related" class and strip the numeric prefix
#from the "unrelated" label
stance_to_binary = {label: "related to whistleblowing" for label in wblist}
stance_to_binary["3. unrelated to whistleblowing"] = "unrelated to whistleblowing"

#Series.replace with a dict matches whole cell values literally, so the "." in the labels
#is never treated as a regex wildcard and no substring of another value can be rewritten;
#it is also safe to re-run because already-mapped values are not keys of the mapping
df_wbs["stance comment"] = df_wbs["stance comment"].replace(stance_to_binary)


df_wbs["stance comment"].value_counts()

In [None]:
#stopword set and lemmatizer are created on the first call and reused afterwards,
#instead of being rebuilt for every row passed through DataFrame.apply
_PREPROCESS_RESOURCES = {}


def preprocess_text(text):
    '''This function preprocesses the text to prepare it for text mining tasks.
    The text is first lowercased and tokenized, then stopwords are removed, and the text is lemmatized.

    Parameters:
        text: the raw document. Anything that is not a str (for example the NaN pandas
            uses for an empty cell) is treated as an empty document.

    Returns:
        A single string with the kept, lemmatized tokens joined by spaces; "" when the
        input is not a string or no token survives the filtering.
    '''
    #missing cells arrive as float NaN / None and have no .lower(); return an empty document
    if not isinstance(text, str):
        return ""

    if not _PREPROCESS_RESOURCES:
        _PREPROCESS_RESOURCES["stop_words"] = frozenset(stopwords.words('english'))
        _PREPROCESS_RESOURCES["lemmatizer"] = WordNetLemmatizer()
    stop_words = _PREPROCESS_RESOURCES["stop_words"]
    lemmatizer = _PREPROCESS_RESOURCES["lemmatizer"]

    #tokenize the lowercased text, keep purely alphabetic non-stopword tokens, lemmatize them
    tokens = word_tokenize(text.lower())
    lemmas = [
        lemmatizer.lemmatize(token)
        for token in tokens
        if token.isalpha() and token not in stop_words
    ]

    #create a single string from the tokens
    return ' '.join(lemmas)

In [None]:
df["cleaned_body"] = df["Body"].apply(preprocess_text)
df_wbs["cleaned_body"] = df_wbs["Body"].apply(preprocess_text)

In [None]:
df.sample(1)

In [None]:
df_related = df[df["stance comment"] != "3. unrelated to whistleblowing"]
df_related_binary = df_related[df_related["stance comment"] != "2. neutral"]

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer

#work on a plain python list of the cleaned comments and print a few sanity checks
text_data = df_related['cleaned_body'].tolist()
sample_documents = text_data[:5]
print(f'Number of documents: {len(text_data)}')
print(f'Sample documents: {sample_documents}')
print("\n")

#vectorizer settings: document-frequency bounds, vocabulary cap, unigrams + bigrams
tfidf_settings = dict(
    max_df=0.95,
    min_df=2,
    max_features=5000,
    ngram_range=(1, 2),
)
vectorizer = TfidfVectorizer(**tfidf_settings)

#learn the vocabulary and build the document-term matrix in one step
X = vectorizer.fit_transform(text_data)

print(f'Shape of TF-IDF matrix: {X.shape}')



In [None]:
#column sums of the TF-IDF matrix give one aggregate weight per vocabulary term
term_sums = X.sum(axis=0)

#pair every term with its summed weight, then order from heaviest to lightest
term_freq = [
    (term, term_sums[0, column])
    for term, column in vectorizer.vocabulary_.items()
]
term_freq.sort(key=lambda pair: pair[1], reverse=True)

print("Amount of unique terms: " + str(len(term_freq)))

#histogram of the summed weights to investigate the performance of the vectorizer
frequencies = [weight for _, weight in term_freq]
plt.hist(frequencies, bins=50)
plt.xlabel('TF-IDF Score')
plt.ylabel('Number of Terms')
plt.title('Distribution of Term Frequencies')
plt.show()


In [None]:
def classification(model, sampling, data, dataframe):
    '''Train one classifier on TF-IDF features, print its scores and append them to a results frame.

    The data is split 50% train / 25% test / 25% validation (two consecutive 50/50 splits
    with random_state=42). The TF-IDF vocabulary is fitted on the training part only.

    Parameters:
        model: one of "Multinomial", "Bernoulli", "Complement", "LogReg", "SVC", "LinearSVC",
            "KNN", "RandomForest", "DecTree"; a non-string value is used as the estimator as-is.
        sampling: "no_sampling", "SMOTE" or "OverSampler"; a non-string value is used as the
            sampler object as-is.
        data: DataFrame with a "cleaned_body" text column and a "stance comment" label column.
        dataframe: results DataFrame collected so far (may be empty).

    Returns:
        A new DataFrame: `dataframe` plus one row with the model, the sampling technique and
        the test/validation accuracy and weighted F1 (rounded to 4 decimals).

    Raises:
        ValueError: if `model` or `sampling` is a string that is not one of the names above.
    '''
    #lambdas so nothing is instantiated until the requested names have been validated
    model_factories = {
        "Multinomial": lambda: MultinomialNB(),
        "Bernoulli": lambda: BernoulliNB(),
        "Complement": lambda: ComplementNB(),
        "LogReg": lambda: LogisticRegression(),
        "SVC": lambda: SVC(),
        "LinearSVC": lambda: LinearSVC(),
        "KNN": lambda: KNeighborsClassifier(),
        #seeded like the splits and samplers so repeated runs give the same scores
        "RandomForest": lambda: RandomForestClassifier(random_state=42),
        "DecTree": lambda: DecisionTreeClassifier(random_state=42),
    }
    sampler_factories = {
        "SMOTE": lambda: SMOTE(random_state=42),
        "OverSampler": lambda: RandomOverSampler(random_state=42),
    }

    #fail early with a clear message instead of crashing later on a plain string
    if isinstance(model, str) and model not in model_factories:
        raise ValueError(
            f"Unknown model {model!r}; expected one of {sorted(model_factories)}"
        )
    if isinstance(sampling, str) and sampling != "no_sampling" and sampling not in sampler_factories:
        raise ValueError(
            f"Unknown sampling {sampling!r}; expected 'no_sampling' or one of {sorted(sampler_factories)}"
        )

    if isinstance(model, str):
        model = model_factories[model]()

    if isinstance(sampling, str):
        sampler = None if sampling == "no_sampling" else sampler_factories[sampling]()
    else:
        sampler = sampling

    #first split data into train (50%) + temporary (50%), then the temporary half is
    #split into test (25%) + validation (25%)
    df_train, df_temp = train_test_split(data, test_size=0.5, random_state=42)
    df_test, df_val = train_test_split(df_temp, test_size=0.5, random_state=42)

    vectorizer = TfidfVectorizer(
        max_df=0.95,
        min_df=2,
        max_features=5000,
        ngram_range=(1, 2),
        )

    X_train = vectorizer.fit_transform(df_train['cleaned_body'].tolist())
    y_train = df_train["stance comment"]

    X_test = vectorizer.transform(df_test['cleaned_body'].tolist())
    y_test = df_test["stance comment"]

    X_val = vectorizer.transform(df_val['cleaned_body'].tolist())
    y_val = df_val["stance comment"]

    kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

    if sampler is None:
        cv_estimator = model
        X_fit, y_fit = X_train, y_train
    else:
        #resampling has to happen inside each CV fold: oversampling the whole training set
        #first puts copies / synthetic neighbours of the same comment on both sides of a
        #fold boundary and inflates the cross-validation F1
        from imblearn.pipeline import Pipeline as ImbPipeline

        cv_estimator = ImbPipeline([("sampler", sampler), ("model", model)])
        X_fit, y_fit = sampler.fit_resample(X_train, y_train)

    #cross_val_score clones the estimator, so `model` itself is still unfitted afterwards
    cross_val_scores = cross_val_score(cv_estimator, X_train, y_train, cv=kf, scoring='f1_weighted')

    #final model is trained on the (optionally resampled) full training part
    model.fit(X_fit, y_fit)

    print(f'Mean Cross-Validation F1: {np.mean(cross_val_scores):.4f}')
    print(f'Standard Deviation of Cross-Validation F1: {np.std(cross_val_scores):.4f}')

    y_pred = model.predict(X_test)

    #add the unseen data script to this function comment if you want to do something else
    unseen_predictions = model.predict(X_val)

    #compute every metric once and reuse it for printing and for the results row
    test_accuracy = accuracy_score(y_test, y_pred)
    test_f1 = f1_score(y_test, y_pred, average='weighted')
    val_accuracy = accuracy_score(y_val, unseen_predictions)
    val_f1 = f1_score(y_val, unseen_predictions, average='weighted')

    #print accuracy and classification report
    print("Test score: ")
    print(f"{str(model)} Accuracy: {test_accuracy}")
    print(f"{str(model)} F1: {test_f1}")
    print(classification_report(y_test, y_pred))

    print("Validation score: ")
    print(f"{str(model)} Accuracy: {val_accuracy}")
    print(f"{str(model)} F1: {val_f1}")
    print(classification_report(y_val, unseen_predictions))

    #store the results as plain strings/numbers (not estimator objects) so the results
    #frame can be displayed and exported without carrying fitted models around
    result_row = {
        'Model name': str(model),
        'Sampling technique': "no_sampling" if sampler is None else str(sampler),
        'Test accuracy': round(test_accuracy, 4),
        'Test F1': round(test_f1, 4),
        'Validation accuracy': round(val_accuracy, 4),
        'Validation F1': round(val_f1, 4)
    }

    #DataFrame.append was removed in pandas 2.0; concat works on old and new versions
    return pd.concat([dataframe, pd.DataFrame([result_row])], ignore_index=True)

    

In [None]:
outputdf = pd.DataFrame()

outputresult = classification("Complement", "no_sampling", df_related_binary, outputdf)

In [None]:
#every classifier name understood by classification(), evaluated without resampling
model_list = [
    "Multinomial",
    "Bernoulli",
    "Complement",
    "LogReg",
    "SVC",
    "LinearSVC",
    "KNN",
    "RandomForest",
    "DecTree",
]

result_df = pd.DataFrame()

#each call returns the results frame extended by one row for that model
for model_name in model_list:
    result_df = classification(model_name, "no_sampling", df_related, result_df)

result_df

In [None]:
result_df.to_excel("results/classificationscores3classes.xlsx")

In [None]:
df.sample(1)

# NEURAL NETWORK STUFF

In [None]:
df_related_binary["stance comment"].value_counts()

In [None]:
df_wbs["stance comment"].value_counts()

In [None]:
df_related["stance comment"].value_counts()

In [None]:
#prepares everything the neural-network cells below read: the three data splits as
#TF-IDF tensors, the encoded labels, and one DataLoader per split

#dataset used by all NN cells; NOTE(review): the binary cells use a single output unit,
#so presumably this should be df_related_binary before running them - confirm
nn_data = df_related #change if needed

#two consecutive 50/50 splits: 50% train, 25% test, 25% validation
df_train, df_temp = train_test_split(nn_data, test_size=0.5, random_state=42)  
df_test, df_val = train_test_split(df_temp, test_size=0.5, random_state=42)

#alternative two-way split (train/test only), kept for reference
#df_train, df_test = train_test_split(nn_data, test_size=0.35, random_state=42)  

#labels of the full dataset (all splits); the training cells read this for the number
#of classes and for class weights
y = nn_data["stance comment"]

text_data = df_train['cleaned_body'].tolist()
    
vectorizer = TfidfVectorizer(max_df=0.95, min_df=2, max_features=5000, ngram_range=(1, 2))

#vocabulary and idf weights are learned from the training split only; test and
#validation are transformed with that fitted vectorizer
X_train = vectorizer.fit_transform(text_data)
y_train = df_train["stance comment"]

X_test = vectorizer.transform(df_test['cleaned_body'].tolist())
y_test = df_test["stance comment"]

X_val = vectorizer.transform(df_val['cleaned_body'].tolist())
y_val = df_val["stance comment"]

#resampling toggle: to oversample, pick one of the two samplers here and also switch to
#the commented-out *_resampled lines below; as written no resampling is applied and
#`sampling` is only printed by the training cells
#sampling = RandomOverSampler(random_state=42)
#sampling = SMOTE(random_state=42)
sampling = "no_sampling"

#X_train_resampled, y_train_resampled = sampling.fit_resample(X_train, y_train)


#string labels -> integer ids; the encoder is fitted on the training labels, so a label
#that only occurs in test/validation makes transform() fail
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
#y_train_encoded = label_encoder.fit_transform(y_train_resampled)
y_test_encoded = label_encoder.transform(y_test)
y_val_encoded = label_encoder.transform(y_val)

#convert data to torch tensors (the sparse TF-IDF matrices are made dense first)
#X_train_tensor = torch.tensor(X_train_resampled.toarray(), dtype=torch.float32)
X_train_tensor = torch.tensor(X_train.toarray(), dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
X_test_tensor = torch.tensor(X_test.toarray(), dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_encoded, dtype=torch.long)
X_val_tensor = torch.tensor(X_val.toarray(), dtype=torch.float32)
y_val_tensor = torch.tensor(y_val_encoded, dtype=torch.long)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

#batches of 16; only the training loader is shuffled
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

#mapping labels for easy understanding: original label string -> encoded integer id
label_mapping = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))

In [None]:
len(y_train)

In [None]:
label_mapping

In [None]:
class SimpleNN(nn.Module):
    '''Feed-forward network with a single hidden layer: Linear -> ReLU -> Linear.

    The forward pass returns the raw output of the second linear layer; no softmax or
    sigmoid is applied inside the module.
    '''

    def __init__(self, input_dim, hidden_dim, output_dim):
        '''Create the two linear layers and the activation between them.

        input_dim: number of features per sample
        hidden_dim: number of units in the hidden layer
        output_dim: number of output units
        '''
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        '''Apply fc1, then ReLU, then fc2 to x and return the result.'''
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

In [None]:
#USE THIS ONE FOR MULTICLASS DATA
#trains SimpleNN with unweighted cross-entropy, then reports accuracy, weighted F1 and a
#classification report for the test split and for the validation split

#seed torch so weight initialisation and the shuffling of train_loader repeat between runs
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_dim = X_train.shape[1]
hidden_dim = 100
output_dim = len(y.unique())
model = SimpleNN(input_dim, hidden_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for texts, labels in train_loader:
        texts, labels = texts.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    #mean batch loss of this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

#test split: one pass collects the accuracy counts and the per-sample predictions
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []

    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device)
        outputs = model(texts)
        #predicted class = index of the largest logit
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

    test_f1_score = f1_score(y_true, y_pred, average='weighted')
    print(f'Test Set Accuracy: {100 * correct / total:.2f}%')
    print(f'Test Set F1 Score (weighted): {test_f1_score:.2f}')
    print(classification_report(y_true, y_pred))


#validation split: predictions must come from val_loader and the report must use the
#validation lists, otherwise the "validation" F1/report just repeat the test numbers
with torch.no_grad():
    correct = 0
    total = 0
    y_pred_val = []
    y_true_val = []

    for texts, labels in val_loader:
        texts, labels = texts.to(device), labels.to(device)
        outputs = model(texts)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        y_pred_val.extend(predicted.cpu().numpy())
        y_true_val.extend(labels.cpu().numpy())

    print("\n")
    print(str(sampling))
    print("\n")

    val_f1_score = f1_score(y_true_val, y_pred_val, average='weighted')
    print(f'Validation Set Accuracy: {100 * correct / total:.2f}%')
    print(f'Validation Set F1 Score (weighted): {val_f1_score:.2f}')
    print(classification_report(y_true_val, y_pred_val))


In [None]:
#USE THIS ONE FOR MULTICLASS DATA WITH CLASS WEIGHT
#same training/evaluation as the unweighted multiclass cell, but the loss weights each
#class by the inverse of its share in the training labels

#seed torch so weight initialisation and the shuffling of train_loader repeat between runs
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_dim = X_train.shape[1]
hidden_dim = 100
output_dim = len(y.unique())
model = SimpleNN(input_dim, hidden_dim, output_dim).to(device)

#loss function with class weights
#counts come from the encoded training labels, so position i is the class the network
#outputs at index i, the weights follow whatever is actually trained on (including a
#resampled y_train_encoded), and test/validation label frequencies are not used
class_counts = np.bincount(y_train_encoded, minlength=output_dim).tolist()
class_weights = [sum(class_counts) / count for count in class_counts]
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)


optimizer = optim.Adam(model.parameters(), lr=0.001)


num_epochs = 25
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for texts, labels in train_loader:
        texts, labels = texts.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    #mean batch loss of this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

#test split: one pass collects the accuracy counts and the per-sample predictions
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []

    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device)
        outputs = model(texts)
        #predicted class = index of the largest logit
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

    test_f1_score = f1_score(y_true, y_pred, average='weighted')
    print(f'Test Set Accuracy: {100 * correct / total:.2f}%')
    print(f'Test Set F1 Score (weighted): {test_f1_score:.2f}')
    print(classification_report(y_true, y_pred))


#validation split: predictions must come from val_loader and the report must use the
#validation lists, otherwise the "validation" F1/report just repeat the test numbers
with torch.no_grad():
    correct = 0
    total = 0
    y_pred_val = []
    y_true_val = []

    for texts, labels in val_loader:
        texts, labels = texts.to(device), labels.to(device)
        outputs = model(texts)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        y_pred_val.extend(predicted.cpu().numpy())
        y_true_val.extend(labels.cpu().numpy())

    print("\n")
    print(str(sampling))
    print("\n")

    val_f1_score = f1_score(y_true_val, y_pred_val, average='weighted')
    print(f'Validation Set Accuracy: {100 * correct / total:.2f}%')
    print(f'Validation Set F1 Score (weighted): {val_f1_score:.2f}')
    print(classification_report(y_true_val, y_pred_val))


In [None]:
#BINARY CLASSIFICATION WITH CLASS WEIGHTS
#trains SimpleNN with one output logit and a positively-weighted BCE loss, then reports
#accuracy, weighted F1 and a classification report on the test and validation splits
#NOTE(review): the loss expects labels 0/1, so the data-preparation cell presumably has
#to be run with a two-class nn_data first - confirm before trusting the scores

#seed torch so weight initialisation and the shuffling of train_loader repeat between runs
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_dim = X_train.shape[1]
hidden_dim = 100
output_dim = 1  
model = SimpleNN(input_dim, hidden_dim, output_dim).to(device)

#trying to avoid the model being biased towards majority class by adjusting weights
#pos_weight multiplies the loss of the positive class (encoded label 1), so it is the
#ratio negatives/positives of the training labels; counting by encoded label avoids
#value_counts(), whose order is by frequency and says nothing about which class is 1
n_negative = int((y_train_encoded == 0).sum())
n_positive = int((y_train_encoded == 1).sum())
pos_weight = torch.tensor(n_negative / n_positive, dtype=torch.float).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 25
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for texts, labels in train_loader:
        texts, labels = texts.to(device), labels.to(device).float()  #make labels float for BCEWithLogitsLoss

        optimizer.zero_grad()
        #squeeze only the output dimension: a bare squeeze() would also drop the batch
        #dimension of a final batch holding a single sample and break the loss
        outputs = model(texts).squeeze(1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    #mean batch loss of this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []

    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device).float()
        outputs = model(texts).squeeze(1)
        predictions = torch.round(torch.sigmoid(outputs))  #sigmoid for binary classification
        total += labels.size(0)
        correct += (predictions == labels).sum().item()
        y_pred.extend(predictions.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

    print(f'Test Set Accuracy: {100 * correct / total:.2f}%')
    test_f1_score = f1_score(y_true, y_pred, average='weighted')
    print(f'Test Set F1 Score (weighted): {test_f1_score:.2f}')
    print(classification_report(y_true, y_pred))


#same evaluation on the validation split; y_pred / y_true are reset and reused
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []

    for texts, labels in val_loader:
        texts, labels = texts.to(device), labels.to(device).float()
        outputs = model(texts).squeeze(1)
        predictions = torch.round(torch.sigmoid(outputs))  #sigmoid for binary classification
        total += labels.size(0)
        correct += (predictions == labels).sum().item()
        y_pred.extend(predictions.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

    print("\n")
    print(str(sampling))
    print("\n")

    print(f'Validation Set Accuracy: {100 * correct / total:.2f}%')
    val_f1_score = f1_score(y_true, y_pred, average='weighted')
    print(f'Validation Set F1 Score (weighted): {val_f1_score:.2f}')
    print(classification_report(y_true, y_pred))
    
    


In [None]:
#BINARY CLASSIFICATION WITHOUT CLASS WEIGHTS
#trains SimpleNN with one output logit and an unweighted BCE loss, then reports
#accuracy, weighted F1 and a classification report on the test and validation splits
#NOTE(review): the loss expects labels 0/1, so the data-preparation cell presumably has
#to be run with a two-class nn_data first - confirm before trusting the scores

#seed torch so weight initialisation and the shuffling of train_loader repeat between runs
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_dim = X_train.shape[1]
hidden_dim = 100
output_dim = 1  #binary classification
model = SimpleNN(input_dim, hidden_dim, output_dim).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 25
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for texts, labels in train_loader:
        texts, labels = texts.to(device), labels.to(device).float()  #make labels float for BCEWithLogitsLoss

        optimizer.zero_grad()
        #squeeze only the output dimension: a bare squeeze() would also drop the batch
        #dimension of a final batch holding a single sample and break the loss
        outputs = model(texts).squeeze(1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    #mean batch loss of this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []

    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device).float()
        outputs = model(texts).squeeze(1)
        predictions = torch.round(torch.sigmoid(outputs))  #sigmoid for binary classification
        total += labels.size(0)
        correct += (predictions == labels).sum().item()
        y_pred.extend(predictions.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

    print(f'Test Set Accuracy: {100 * correct / total:.2f}%')
    test_f1_score = f1_score(y_true, y_pred, average='weighted')
    print(f'Test Set F1 Score (weighted): {test_f1_score:.2f}')
    print(classification_report(y_true, y_pred))


#same evaluation on the validation split; y_pred / y_true are reset and reused
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []

    for texts, labels in val_loader:
        texts, labels = texts.to(device), labels.to(device).float()
        outputs = model(texts).squeeze(1)
        predictions = torch.round(torch.sigmoid(outputs))  #sigmoid for binary classification
        total += labels.size(0)
        correct += (predictions == labels).sum().item()
        y_pred.extend(predictions.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

    print("\n")
    print(str(sampling))
    print("\n")

    print(f'Validation Set Accuracy: {100 * correct / total:.2f}%')
    val_f1_score = f1_score(y_true, y_pred, average='weighted')
    print(f'Validation Set F1 Score (weighted): {val_f1_score:.2f}')
    print(classification_report(y_true, y_pred))
    