In [None]:
# INSTALL IF NEEDED:

# !pip install spacy

In [None]:
# CONNECT TO COLAB IF NEEDED:

# from google.colab import drive
# import os

# drive.mount('/content/drive')
# os.chdir('./drive/MyDrive/data/tue_lai')

# Load the data

In [None]:
import pandas as pd

# Relative path: resolved against the current working directory
# (the optional Colab cell above can chdir into the data folder).
data_path = 'political_leaning.csv'
data = pd.read_csv(data_path)

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Column that is label-encoded and used as the prediction target.
TARGET_COL = 'political_leaning'
# Column whose values are passed to the feature extractors as model input.
INDEPENDENT_COL = 'post'

def label_encode(df, col_name):
    """
    Integer-encode one column of a DataFrame with a freshly fitted LabelEncoder.

    The input frame is left untouched: the encoded column is written to a copy.
    This keeps the cell idempotent. Encoding in place meant that re-running the
    cell re-fitted the encoder on already-encoded integers, so ``classes_``
    no longer held the original label names.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``col_name``.
    col_name : hashable
        Name of the column to encode.

    Returns
    -------
    (pandas.DataFrame, LabelEncoder)
        A copy of ``df`` with ``col_name`` replaced by integer codes, and the
        fitted encoder (its ``classes_`` maps codes back to original labels).
    """
    label_encoder = LabelEncoder()
    encoded_df = df.copy()
    encoded_df[col_name] = label_encoder.fit_transform(df[col_name])
    return encoded_df, label_encoder


# Encode the target labels; `le.classes_` keeps the original label names for plotting.
df, le = label_encode(data, TARGET_COL)
# Fixed seed so the split (and every score below) is reproducible across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(
    df[INDEPENDENT_COL], df[TARGET_COL], random_state=0
)

### Create preprocessing pipelines

In [None]:
import nltk
from nltk.probability import FreqDist
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.tag import pos_tag
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize

from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import VotingClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.base import BaseEstimator, TransformerMixin, ClassifierMixin

import numpy as np

# Tokenizer, stop-word list and POS-tagger resources used by the NLTK calls below.
nltk.download("punkt")
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')

STOP_WORDS = set(stopwords.words('english'))
# Reference TF-IDF model used by StylometricFeatures.tfidf_centrality.
# Fitted on the training posts only: fitting on the full corpus would let
# test-set vocabulary and document frequencies leak into the features.
CENTRALITY = TfidfVectorizer()
# Mean TF-IDF weight of every vocabulary term over the training posts (1 x vocab matrix).
CORPUS_AVG_TFIDF = CENTRALITY.fit_transform(X_train.tolist()).mean(axis=0)

In [None]:
def scale_array(array):
    """
    Min-max scale a flat sequence of numbers.

    The values are reshaped into a single column and passed through a freshly
    fitted ``MinMaxScaler``; the result is returned as an ``(n, 1)`` array.
    """
    column = np.array(array).reshape(-1, 1)
    min_max = MinMaxScaler()
    scaled_column = min_max.fit_transform(column)
    return scaled_column


class StylometricFeatures(BaseEstimator, TransformerMixin):
    """
    A class to extract the following stylometric features, one row per post:
    1. avg sentence length (characters per sentence),
    2. avg token length (characters per token),
    3. stop-word ratio,
    4. TF-IDF centrality (only when ``include_tfidf`` is True),
    5. relative frequencies of the post's 20 most common POS tags.

    Columns 1-4 are min-max scaled. The scaling statistics are learned in
    ``fit`` and re-used in ``transform``, so training and test data share the
    same scale (re-fitting the scaler on every transformed batch would scale a
    test set by its own min/max and collapse a single-sample batch to zeros).

    NOTE(review): the POS columns are ordered by per-post frequency rank
    (most frequent tag first), not by tag identity, so a given column does not
    refer to the same tag in every row.

    By default it doesn't use tfidf, but there is a parameter to do so.
    """

    # Number of POS-frequency columns emitted per post (shorter lists are zero-padded).
    N_POS_TAGS = 20

    def __init__(self, include_tfidf=False):
        self.include_tfidf = include_tfidf

    def fit(self, X, y=None):
        """Learn the min/max of every scalar feature on ``X``; returns ``self``."""
        self.scaler_ = MinMaxScaler().fit(self._scalar_features(X))
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit and transform in one pass so the posts are tokenised only once."""
        raw = self._scalar_features(X)
        self.scaler_ = MinMaxScaler().fit(raw)
        return self._assemble(self.scaler_.transform(raw), X)

    def tfidf_centrality(self, text):
        """
        Sum, over the vocabulary, of (TF-IDF mass of ``text``) - (corpus average).

        ``text`` is an iterable of strings handed to ``CENTRALITY.transform``;
        ``transform`` passes the post's token list, so every token is
        vectorised as its own document and the rows are then summed.
        """
        if len(text) == 0:
            # No tokens: the summed TF-IDF row is all zeros, so only the
            # corpus average remains (the vectorizer rejects empty input).
            return float(-np.sum(CORPUS_AVG_TFIDF))
        text_matrix = CENTRALITY.transform(text)
        text_tf = text_matrix.sum(axis=0)
        return float(np.sum(text_tf - CORPUS_AVG_TFIDF))

    def stop_word_ratio(self, post):
        """Share of tokens in ``post`` that are stop words; 0.0 for an empty post."""
        if not post:
            return 0.0
        # STOP_WORDS is lowercase, so compare case-insensitively ("The" counts too).
        return sum(1 for word in post if word.lower() in STOP_WORDS) / len(post)

    def avg_word_length(self, post):
        """Mean length in characters of the tokens in ``post``; 0.0 if empty."""
        if not post:
            return 0.0
        return sum(len(word) for word in post) / len(post)

    def avg_sent_length(self, post):
        """Mean length in characters of the sentences in ``post``; 0.0 if empty."""
        if not post:
            return 0.0
        return sum(len(sent) for sent in post) / len(post)

    def part_of_speech_distribution(self, text):
        """
        Relative frequencies of the ``N_POS_TAGS`` most common POS tags of ``text``.

        Returns a list of exactly ``N_POS_TAGS`` floats, most frequent tag
        first, zero-padded when the text uses fewer distinct tags.
        """
        pos_tags = pos_tag(word_tokenize(text))
        pos_freq = FreqDist(tag for _word, tag in pos_tags)
        total_tags = sum(pos_freq.values())
        vals = [freq / total_tags for _tag, freq in pos_freq.most_common(self.N_POS_TAGS)]
        return vals + [0.0] * (self.N_POS_TAGS - len(vals))  # for varied length texts

    def _scalar_features(self, X):
        """Unscaled per-post scalar features as an (n_posts, n_features) float array."""
        sentences = [sent_tokenize(text) for text in X]
        words = [word_tokenize(text) for text in X]

        columns = [
            [self.avg_sent_length(post) for post in sentences],
            [self.avg_word_length(post) for post in words],
            [self.stop_word_ratio(post) for post in words],
        ]
        if self.include_tfidf:
            columns.append([self.tfidf_centrality(post) for post in words])

        # List of cols --> array of rows
        return np.array(columns, dtype=float).T

    def _assemble(self, scaled, X):
        """Append the POS-frequency columns of ``X`` to the scaled scalar features."""
        pos_dists = np.array(
            [self.part_of_speech_distribution(text) for text in X], dtype=float
        )
        return np.hstack([scaled, pos_dists])

    def transform(self, X):
        """
        Build the feature matrix for the posts in ``X``.

        Returns a float array of shape (n_posts, n_scalar + N_POS_TAGS), where
        n_scalar is 3, or 4 when ``include_tfidf`` is set.
        """
        raw = self._scalar_features(X)
        scaler = getattr(self, "scaler_", None)
        if scaler is None:
            # Never fitted: keep the legacy behaviour of scaling within the batch.
            scaled = MinMaxScaler().fit_transform(raw)
        else:
            scaled = scaler.transform(raw)
        return self._assemble(scaled, X)

In [None]:
import spacy

# Shared spaCy pipeline, loaded once; PosFeatures streams posts through it and reads token.pos.
SPACY = spacy.load("en_core_web_sm")


class PosFeatures(BaseEstimator, TransformerMixin):
    """
    A class to extract POS features: for each post returns a list of corresponding POS tags

    Each post becomes one fixed-length row of spaCy POS ids (``token.pos``),
    truncated or right-padded with 0 to ``sentence_size`` entries. The ids are
    shifted so that the smallest id seen in the first batch transformed after
    fitting maps to 1, which keeps 0 free as the padding value.

    Parameters
    ----------
    include_tfidf : bool
        Not used by this transformer; kept so existing constructor calls work.
    length_percentile : float
        Percentile of the whitespace-token counts of the fitted posts that
        defines the row length.

    NOTE(review): the offset is learned from data, so a later batch containing
    an id below it would wrap around in the uint8 output -- confirm that the
    training data covers the full tag range.
    """
    def __init__(self, include_tfidf=False, length_percentile=95):
        self.include_tfidf = include_tfidf
        self.length_percentile = length_percentile
        self._standartization_factor = 0

    def transform(self, X, *_):
        """Return a (len(X), sentence_size) uint8 matrix of shifted POS ids."""
        # Local import keeps the cell self-contained; NotFittedError derives from
        # AttributeError, which is what an unfitted instance raised before.
        from sklearn.exceptions import NotFittedError

        if getattr(self, "sentence_size", None) is None:
            raise NotFittedError("Fitting required")

        # Create the output matrix
        result = np.zeros((len(X), self.sentence_size), dtype='uint8')

        for i, doc in enumerate(SPACY.pipe(X, batch_size=50)):
            # Store the POS-tags
            tags = np.fromiter((token.pos for token in doc), dtype='uint8', count=len(doc))

            # Pad and truncate data, if necessary, and store them in result
            last_index = min(len(tags), self.sentence_size)
            result[i, :last_index] = tags[:last_index]

        valid = result != 0

        # Generate the factor one time to ensure applying the same factor at the next transformations.
        # Skipped while no tag has been seen yet (np.min of an empty selection would raise).
        if self._standartization_factor == 0 and valid.any():
            self._standartization_factor = int(result[valid].min()) - 1

        # Standardize all valid elements to count from 1
        result[valid] -= self._standartization_factor
        return result

    def fit(self, X, *_):
        """Choose the row length from ``X`` and forget any previously learned offset."""
        # Define an optimal sentence size covering a specific percent of all sample
        self.sentence_size = int(np.percentile([len(t.split()) for t in X], self.length_percentile))
        # A refit must not keep the offset derived from earlier data.
        self._standartization_factor = 0
        return self

    def fit_transform(self, X, *_):
        """Fit on ``X`` and return its transformed matrix."""
        self.fit(X)
        return self.transform(X)

# Create custom classifiers

In [None]:
import torch
from torch import nn
from torch import optim
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class RNNClassifier(nn.Module):
    """
    Recurrent sequence classifier: RNN encoder followed by a linear layer and softmax.

    Parameters
    ----------
    rnn_type : {'gru', 'lstm', 'simple'}
        Which recurrent layer to use (``nn.GRU``, ``nn.LSTM`` or ``nn.RNN``).
    hidden_size, input_size, dropout, num_layers, bidirectional
        Forwarded unchanged to the recurrent layer.
    output_size : int
        Number of classes.

    ``forward`` takes a float tensor of shape (batch, seq_len, input_size) and
    returns class probabilities of shape (batch, output_size).
    """

    def __init__(self, rnn_type, hidden_size, output_size, input_size, dropout, num_layers, bidirectional):
        super().__init__()
        assert (rnn_type in ['gru', 'lstm', 'simple']), "Invalid RNN type"

        rnn_params = {
            "hidden_size": hidden_size,
            "input_size": input_size,
            "dropout": dropout,
            "num_layers": num_layers,
            "bidirectional": bidirectional,
            # Inputs arrive as (batch, seq, feature). Without this flag the RNN
            # treats the batch axis as time and recurs across samples.
            "batch_first": True,
        }

        if rnn_type == 'gru':
            self.rnn = nn.GRU(**rnn_params)
        elif rnn_type == 'lstm':
            self.rnn = nn.LSTM(**rnn_params)
        else:
            self.rnn = nn.RNN(**rnn_params)

        self.rnn_type = rnn_type
        self.bidirectional = bool(bidirectional)
        self.fc = nn.Linear(
            (1 + bidirectional) * hidden_size,
            output_size
        )
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, input_seq):
        """Return class probabilities, shape (batch, output_size), for a (batch, seq, feature) input."""
        # Unpacking also enforces a 3-D input.
        batch_size, _seq_len, _input_size = input_seq.shape

        _, hidden = self.rnn(input_seq)
        if self.rnn_type == 'lstm':
            # LSTM returns (h_n, c_n); only the hidden state is used.
            hidden, _cell = hidden

        # `hidden` is (num_layers * num_directions, batch, hidden_size); the last
        # entries belong to the top layer. For a bidirectional RNN the backward
        # direction's final state is taken here, because its output at the last
        # time step has only seen a single token.
        if self.bidirectional:
            final_state = torch.cat((hidden[-2], hidden[-1]), dim=-1)
        else:
            final_state = hidden[-1]

        output = self.fc(final_state).view(batch_size, -1)
        return self.softmax(output)


In [None]:
from torch.utils.data import DataLoader

class RnnSklearnWrapper(BaseEstimator, ClassifierMixin):
    """
    scikit-learn style wrapper that trains an ``RNNClassifier`` on integer-coded sequences.

    ``X`` is a 2-D integer array (one row per sample, one id per time step) and
    ``Y`` holds integer class labels; both are assumed to count from 0, since
    the layer sizes are derived as ``max + 1``.

    Parameters
    ----------
    batch_size, epochs : int
        Mini-batch size and number of passes over the training data.
    dropout, rnn_type, hidden_size, num_layers, bidirectional
        Forwarded to ``RNNClassifier``.
    lr, weight_decay : float
        Adam hyper-parameters. The defaults reproduce the previously hard-coded
        values. NOTE(review): weight_decay=0.9 is an unusually strong penalty --
        confirm it is intended.
    """

    # The running training loss is printed (and reset) every LOG_EVERY batches.
    LOG_EVERY = 10

    def __init__(self,
                 batch_size=32,
                 epochs=3,
                 dropout=0,
                 rnn_type='gru',
                 hidden_size=300,
                 num_layers=1,
                 bidirectional=False,
                 lr=0.001,
                 weight_decay=0.9):

        self.batch_size = batch_size
        self.epochs = epochs
        self.dropout = dropout
        self.rnn_type = rnn_type
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.lr = lr
        self.weight_decay = weight_decay
        # Same attribute that fit() fills and predict() checks.
        self.model = None

    def fit(self, X, Y=None):
        """Train a fresh model on ``X``/``Y`` and return ``self``."""
        assert (Y is not None), "Y is required"
        # Plain ints: numpy scalars (uint8 in particular) are unsafe as layer sizes.
        self.num_tags = int(np.max(X)) + 1

        rnn_params = {
            "rnn_type": self.rnn_type,
            "input_size": self.num_tags,
            "output_size": int(np.max(Y)) + 1,
            "hidden_size": self.hidden_size,
            "dropout": self.dropout,
            "num_layers": self.num_layers,
            "bidirectional": self.bidirectional
        }
        self.model = RNNClassifier(**rnn_params).to(DEVICE)
        self.model.train()
        optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        # The model already outputs probabilities, so use NLL on their log.
        # CrossEntropyLoss would apply a second softmax on top of them.
        criterion = nn.NLLLoss()
        step = 0
        running_loss = 0.
        for _ in range(self.epochs):
            for X_batch, y_batch in DataLoader(list(zip(X, Y)), batch_size=self.batch_size, shuffle=True):
                X_batch, y_batch = self.one_hot_encode(X_batch, self.num_tags).float().to(DEVICE), y_batch.long().to(DEVICE)

                optimizer.zero_grad()

                probs = self.model(X_batch)
                # Clamp before the log so a zero probability cannot produce -inf.
                loss = criterion(torch.log(probs.clamp_min(1e-12)), y_batch)
                loss.backward()
                optimizer.step()

                step += 1
                running_loss += loss.item()
                if step % self.LOG_EVERY == 0:
                    # Average over exactly the batches accumulated since the last print.
                    last_loss = running_loss / self.LOG_EVERY
                    print('Batch {} loss: {}'.format(step, last_loss))
                    running_loss = 0.
        return self

    def predict(self, X, y=None):
        """Return the most probable class index for every row of ``X``."""
        if getattr(self, "model", None) is None:
            raise RuntimeError("Fitting required before prediction!")

        self.model.eval()
        preds = []
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            for X_batch in DataLoader(X, batch_size=self.batch_size):
                X_batch = self.one_hot_encode(X_batch, self.num_tags).float().to(DEVICE)
                preds.append(self.model(X_batch))

        if not preds:
            return np.array([], dtype=int)

        preds = torch.cat(preds, dim=0).cpu().numpy()
        return np.argmax(preds, axis=1)


    def one_hot_encode(self, tensor, num_classes):
        """One-hot encode a (batch, seq_len) id tensor into a (batch, seq_len, num_classes) float tensor."""
        batch_size, seq_len = tensor.size()
        num_classes = int(num_classes)
        # Row i of the identity matrix is the one-hot vector of id i.
        eye = torch.eye(num_classes)
        one_hot_encoded = eye[tensor.long()]
        return one_hot_encoded.view(batch_size, seq_len, num_classes)

### Evaluate

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import numpy as np

def evaluate_clf(clf, X, y_true, classes, normalize=True, cmap=plt.cm.Blues):
    """
    Print a classification report and plot the confusion matrix of ``clf`` on ``X``.

    Parameters
    ----------
    clf : fitted estimator exposing ``predict``.
    X : inputs passed to ``clf.predict``.
    y_true : ground-truth labels for ``X``.
    classes : sequence of tick labels, one per confusion-matrix row/column.
    normalize : bool
        When True every row is divided by its total, so cells show the share
        of each true class.
    cmap : matplotlib colormap used for the heat map.
    """
    y_pred = clf.predict(X)
    print(classification_report(y_true, y_pred))

    cm = confusion_matrix(y_true, y_pred)

    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # A class that is predicted but never occurs in y_true has an all-zero
        # row; leave it at 0 instead of dividing by zero and plotting NaN.
        cm = np.divide(
            cm.astype('float'),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )
        title = 'Normalized Confusion Matrix'
    else:
        title = 'Confusion Matrix'

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=classes, yticklabels=classes,
           title=title,
           ylabel='True label',
           xlabel='Predicted label')

    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Annotate every cell; switch to white text on dark cells for legibility.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")

    fig.tight_layout()
    plt.show()

# Training & evaluating

### Baseline: TFIDF with SVM

In [None]:
from sklearn.svm import LinearSVC

svm = LinearSVC()

# Baseline as announced by the section title: plain TF-IDF features fed to a
# linear SVM. (The pipeline previously used StylometricFeatures here, which
# duplicated the feature set of the stylometry experiments below.)
clf_base = Pipeline([
    ('features', TfidfVectorizer()),
    ('classifier', svm)
])

In [None]:
# Fit the baseline pipeline on the training split (the trailing ';' hides the estimator repr).
clf_base.fit(X_train, y_train);

In [None]:
# Classification report and confusion matrix on the held-out split.
evaluate_clf(clf_base, X_test, y_test, classes=le.classes_)

### Pure stylometry 1: StylometricFeatures + RandomForest

In [None]:
from sklearn.ensemble import RandomForestClassifier

# Shallow random forest (depth 2) with a fixed seed for reproducibility.
rf = RandomForestClassifier(
    max_depth=2, random_state=0
)

# Stylometric features (no TF-IDF column) followed by the random forest.
clf_style_rf = Pipeline([
    ('features',  StylometricFeatures()),
    ('classifier', rf)
])

In [None]:
# Fit the stylometry + random-forest pipeline on the training split.
clf_style_rf.fit(X_train, y_train);

In [None]:
# Classification report and confusion matrix on the held-out split.
evaluate_clf(clf_style_rf, X_test, y_test, classes=le.classes_)

### Pure stylometry 2.0: PosFeatures + RandomForest

In [None]:
# Fresh shallow random forest; rebinding `rf` keeps this pipeline independent of the one above.
rf = RandomForestClassifier(
    max_depth=2, random_state=0
)

# POS-id sequences (one fixed-length row per post) fed directly to the forest.
clf_pos_rf = Pipeline([
        ('pre', PosFeatures()),
        ('rf', rf)
])

In [None]:
# Fit the POS-sequence + random-forest pipeline on the training split.
clf_pos_rf.fit(X_train, y_train);

In [None]:
# Classification report and confusion matrix on the held-out split.
evaluate_clf(clf_pos_rf, X_test, y_test, classes=le.classes_)

### Pure stylometry 2.1: PosFeatures + RNN

In [None]:
# Three-layer bidirectional GRU trained for 5 epochs; dropout is passed through to the recurrent layer.
rnn = RnnSklearnWrapper(
    epochs=5,
    rnn_type='gru',
    dropout=0.3,
    num_layers=3,
    bidirectional=True
)


# POS-id sequences fed to the recurrent classifier.
clf_pos_rnn = Pipeline([
        ('pre', PosFeatures()),
        ('rnn', rnn)
])

In [None]:
# Train the POS-sequence + RNN pipeline on the training split (prints the running loss while training).
clf_pos_rnn.fit(X_train, y_train);

In [None]:
# Classification report and confusion matrix on the held-out split.
evaluate_clf(clf_pos_rnn, X_test, y_test, classes=le.classes_)

### Stylometry with TFIDF 1: StylometricFeatures + RandomForest

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Fresh shallow random forest with a fixed seed.
rf = RandomForestClassifier(
    max_depth=2, random_state=0
)

# Same stylometric features as before plus the TF-IDF centrality column.
# NOTE(review): this rebinds `clf_style_rf`, replacing the pipeline defined in the
# "Pure stylometry 1" section -- consider a distinct name so both stay available.
clf_style_rf = Pipeline([
    ('features', StylometricFeatures(include_tfidf=True)),
    ('classifier', rf)
])

In [None]:
# Fit the stylometry + TF-IDF centrality pipeline on the training split.
clf_style_rf.fit(X_train, y_train);

In [None]:
# Classification report and confusion matrix on the held-out split.
evaluate_clf(clf_style_rf, X_test, y_test, classes=le.classes_)