# Classify the party affiliation of the president

In [None]:
""" Import the data into dataframe """
from ast import literal_eval
from pathlib import Path

import pandas as pd
pd.set_option('display.max_rows', 500)  # We want to see the whole dataframe

# Locations of the raw speech texts and of the accompanying metadata tables
dataset_folder = Path('../state-of-the-union-dataset')
full_text_folder = dataset_folder.joinpath('txt')
meta_folder = dataset_folder.joinpath('meta')

# Each speech file is named "<president>_<year>.txt"; collect text, name and year in parallel lists
speeches, speakers, years = [], [], []
for speech_path in full_text_folder.glob('*.txt'):
    speeches.append(speech_path.read_text())
    speaker, speech_year = speech_path.stem.split('_')
    speakers.append(speaker)
    years.append(int(speech_year))

# One row per speech, indexed (and sorted) by the year taken from the file name
df = pd.DataFrame(data={'President': speakers, 'Text': speeches}, index=years)
df = df.sort_index()

# Read metadata
year_pattern = r',\s([0-9]{4})'  # four digits that follow a comma and one whitespace character
presidents = pd.read_csv(meta_folder / 'presidents.csv', converters={"Party": literal_eval})
presidents['First Year'] = presidents['Term Start'].str.extract(year_pattern).astype("int")
# NOTE(review): 'Last Year' is cast to float instead of int, presumably so that a
# 'Term End' without a year (e.g. a term that has not ended) can become NaN - confirm.
presidents['Last Year'] = presidents['Term End'].str.extract(year_pattern).astype("float")
speeches_meta = pd.read_csv(meta_folder / 'speeches-meta.csv')

In [None]:
def handle_special_party_case(year, president):
    """ Pick a single party for a president whose record lists several parties.

    Only presidents whose 'Last Name' is 'Adams', 'Tyler' or 'Johnson' are
    covered; for them the first entry of president['Party'] is returned.
    The `year` argument is accepted but not consulted.

    Raises NotImplementedError for every other last name.
    """
    first_party_wins = ('Adams', 'Tyler', 'Johnson')
    if president['Last Name'] not in first_party_wins:
        raise NotImplementedError("[handle_special_party_case] Unhandled special case!")
    return president['Party'][0]

def add_meta(row):
    """ Adds meta information to a row in the dataframe

    The row's index label (`row.name`) is the year of the speech and
    row['President'] is the speaker's last name. The matching entry of the
    module-level `presidents` table supplies the values written to the row's
    'First Name' and 'Party' fields.

    Returns the same row with 'First Name' and 'Party' set.
    Raises ValueError when no single president can be determined for the row.
    """
    year = row.name
    last_name = row['President']

    term_start = presidents['First Year']
    # A missing 'Last Year' (NaN) is treated as a term without an end; a plain
    # comparison against NaN is always False, so such a president would never match.
    term_end = presidents['Last Year'].fillna(float('inf'))

    # Preferred lookup: the speaker's last name within an inclusive term window.
    # In a year where one term ends and the next begins, the year alone cannot
    # tell who gave the speech, so the name taken from the file decides.
    by_name = presidents[(term_start <= year) & (term_end >= year) & (presidents['Last Name'] == last_name)]
    if len(by_name) == 1:
        president = by_name.iloc[0]
    else:
        # Fallback (e.g. the names are spelled differently): year-only lookup on
        # the half-open window [First Year, Last Year).
        by_year = presidents[(term_start <= year) & (term_end > year)]
        if len(by_year) != 1:
            raise ValueError(
                f"[add_meta] Expected exactly one president for {last_name!r} in {year}, "
                f"found {len(by_year)}"
            )
        president = by_year.iloc[0]

    row['First Name'] = president['First Name(s)']

    # 'Party' holds a list; more than one entry needs a manual decision
    parties = president['Party']
    if len(parties) > 1:
        row['Party'] = handle_special_party_case(year, president)
    else:
        row['Party'] = parties[0]

    return row

# Enrich every speech with the president's first name and party
df = df.apply(add_meta, axis='columns')

# Move the descriptive columns to the front; 'President' is re-inserted as 'Last Name'
leading_columns = [('First Name', 'First Name'), ('Last Name', 'President'), ('Party', 'Party')]
for position, (new_label, old_label) in enumerate(leading_columns):
    df.insert(position, new_label, df.pop(old_label))


In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Download the 'punkt' and 'stopwords' NLTK resources
for nltk_resource in ('punkt', 'stopwords'):
    nltk.download(nltk_resource)

# English stopwords kept as a set for membership tests
stopwords_ = set(stopwords.words('english'))

In [None]:
# Expand some contractions
# Every expansion starts with a space so the words stay separated
# ("we're" -> "we are"); regex=False treats the patterns as plain substrings.
contractions = {"'ll": " will", "'ve": " have", "'re": " are"}
expanded_text = df["Text"]
for contraction, expansion in contractions.items():
    expanded_text = expanded_text.str.replace(contraction, expansion, regex=False)
df['Expanded Text'] = expanded_text


In [None]:
# Tokenize every expanded speech with NLTK's word_tokenize
expanded_texts = df['Expanded Text']
df['Tokens'] = expanded_texts.apply(word_tokenize)

In [None]:
import string

# Tokens to discard: English stopwords, single punctuation characters and the
# tokens '--', "''" and '``'. Built once here instead of once per token.
ignored_tokens = stopwords_.union(string.punctuation, ['--', "''", '``'])


def _clean_tokens(tokens):
    """ Lowercase the tokens and drop every token listed in `ignored_tokens` """
    lowered = (token.lower() for token in tokens)
    return [token for token in lowered if token not in ignored_tokens]


df['Cleaned Tokens'] = df['Tokens'].apply(_clean_tokens)

In [None]:
from nltk import pos_tag

# Run NLTK's part-of-speech tagger over the cleaned tokens of every speech
cleaned_tokens = df['Cleaned Tokens']
df['Tagged Tokens'] = cleaned_tokens.apply(pos_tag)

In [None]:
from nltk.stem import WordNetLemmatizer

# Download the 'wordnet' and 'omw-1.4' NLTK resources before creating the lemmatizer
for corpus_name in ('wordnet', 'omw-1.4'):
    nltk.download(corpus_name)

lemmatizer = WordNetLemmatizer()

# Part-of-speech tag -> part-of-speech code handed to the lemmatizer
# ('a' for the JJ* tags, 'n' for NN*, 'r' for RB*/WRB, 'v' for VB*)
_TAG_TO_LEMMATIZER_POS = {
    **dict.fromkeys(('JJ', 'JJR', 'JJS'), 'a'),
    **dict.fromkeys(('NN', 'NNS', 'NNP', 'NNPS'), 'n'),
    **dict.fromkeys(('RB', 'RBR', 'RBS', 'WRB'), 'r'),
    **dict.fromkeys(('VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'), 'v'),
}


def lemmatize_tokens(tagged_tokens):
    """ Lemmatize a sequence of (token, tag) pairs.

    Tokens whose tag has an entry in the lookup table are lemmatized with the
    module-level `lemmatizer` using the mapped part-of-speech code; tokens with
    any other tag are kept unchanged. Returns the resulting tokens as a list,
    in input order.
    """
    return [
        lemmatizer.lemmatize(token, _TAG_TO_LEMMATIZER_POS[tag]) if tag in _TAG_TO_LEMMATIZER_POS else token
        for token, tag in tagged_tokens
    ]
    

# Lemmatize the tagged tokens of every speech
tagged_tokens_per_speech = df['Tagged Tokens']
df['Lemmatized'] = tagged_tokens_per_speech.apply(lemmatize_tokens)

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

from gensim import corpora, models
from gensim.matutils import corpus2dense

import numpy as np

class TopicModeler(BaseEstimator, TransformerMixin):
    """ A topic modeler with an sklearn-compatible interface

    Fits a gensim dictionary, a TF-IDF model and an LSI model on tokenized
    documents and converts documents into dense vectors of LSI topic weights.

    Parameters
    ----------
    num_topics : int, default=10
        Number of topics requested from the LSI model; also the number of
        columns returned by `transform`.
    """
    def __init__(self, num_topics=10):
        self.num_topics = num_topics

    def fit(self, X, y=None):
        """ Fit dictionary, TF-IDF and LSI models on `X`.

        `X` is an iterable of token lists (e.g. a pandas Series with one list
        of tokens per document). `y` is ignored. Returns `self`.
        """
        documents = list(X)
        self.dictionary = corpora.Dictionary(documents)
        corpus = [self.dictionary.doc2bow(tokens) for tokens in documents]
        self.tfidf = models.TfidfModel(corpus)
        corpus_tfidf = self.tfidf[corpus]
        # Passing the dictionary gives the LSI model the term ids directly
        # instead of leaving it to derive them from the corpus.
        self.lsi_model = models.LsiModel(corpus_tfidf, id2word=self.dictionary, num_topics=self.num_topics)
        return self

    def transform(self, X):
        """ Return an array of shape (number of documents, num_topics) with topic weights.

        `X` is an iterable of token lists, as in `fit`.
        """
        documents = list(X)
        corpus = [self.dictionary.doc2bow(tokens) for tokens in documents]
        lsi_corpus = self.lsi_model[self.tfidf[corpus]]

        # The LSI output is a sparse list of (topic id, weight) pairs per document
        # and may contain fewer than num_topics entries. corpus2dense places every
        # weight in the row of its topic id and fills the rest with zeros, so all
        # documents get the same width and columns always refer to the same topic.
        topics_by_document = corpus2dense(
            lsi_corpus,
            num_terms=self.num_topics,
            num_docs=len(corpus),
            dtype=np.float64,
        )
        # corpus2dense returns (topics, documents); sklearn expects (documents, topics)
        return topics_by_document.T

In [None]:
""" Prepare the Pipeline """

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

def _passthrough(tokens):
    """ Return the input unchanged """
    return tokens


vectorizer = TfidfVectorizer(
    input='content',
    lowercase=False,
    preprocessor=_passthrough,  # We did the preprocessing ourselves, so just pass everything through
    tokenizer=_passthrough,     # We also did the tokenization ourselves
    ngram_range=(2, 2),         # Use bigrams
    max_features=1000,          # Limit the vocabulary to 1000 features (bigrams)
)

# Both feature sets are computed from the same 'Lemmatized' column
feature_extractor = ColumnTransformer(transformers=[
    ('tfidf', vectorizer, 'Lemmatized'),
    ('topics', TopicModeler(), 'Lemmatized'),
])

# Feature extraction followed by a support vector classifier
pipe = Pipeline(steps=[
    ('feature_extraction', feature_extractor),
    ('classifier', SVC()),
])

In [None]:
""" Holdout validate pipeline """
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split


# Only speeches whose 'Party' is 'Democratic' or 'Republican' take part in the classification
two_party_df = df[df['Party'].isin(['Democratic', 'Republican'])]

# A fixed seed makes the holdout split (and the metrics reported on it) reproducible;
# stratifying keeps the party proportions similar in the training and test parts.
df_train, df_test = train_test_split(
    two_party_df,
    test_size=0.1,
    random_state=42,
    stratify=two_party_df['Party'],
)

In [None]:
""" Grid search hyperparamters """
from sklearn.model_selection import GridSearchCV

# Candidate values: number of LSI topics plus C / gamma for the RBF-kernel SVC
topic_counts = list(range(5, 30, 5))
param_grid = {
    "feature_extraction__topics__num_topics": topic_counts,
    'classifier__C': [100, 1000, 10e3, 10e4],
    'classifier__gamma': [0.001, 0.01, 0.1],
    'classifier__kernel': ['rbf'],
}

# Cross-validated search over the whole pipeline, using 8 parallel jobs
search = GridSearchCV(estimator=pipe, param_grid=param_grid, n_jobs=8, verbose=3)
train_labels = df_train['Party']
search.fit(df_train, train_labels)

print(f"Best parameter (CV score={search.best_score_:0.3f}):")
print(search.best_params_)


In [None]:
# Compare the fitted search's predictions with the true parties of the held-out speeches
true_parties = df_test['Party']
predicted_parties = search.predict(df_test)
print(classification_report(true_parties, predicted_parties))