# Assignment 5

### A selection of interesting solutions

#### Task 2: one-hots via pandas get_dummies()

In [2]:
data = ['blue', 'yellow', 'blue', 'green', 'red', 'yellow']

def one_hot_encoding(string_list):
    """One-hot encode a list of strings with ``pd.get_dummies``.

    Args:
        string_list: Sequence of category labels (strings).

    Returns:
        tuple: ``(y, coding)``. ``y`` is the indicator DataFrame built from
        the distinct labels (one row and one column per label, in the sorted
        order produced by ``np.unique``). ``coding`` holds, for every element
        of ``string_list``, the column of ``y`` belonging to its label as a
        list of ints.
    """
    # np.unique returns the distinct labels in sorted order
    categories = np.unique(string_list)

    # dtype=int pins 0/1 integers; recent pandas versions default to booleans
    y = pd.get_dummies(categories, dtype=int)

    # look up the indicator column of each input label
    coding = [y.loc[:, label].tolist() for label in string_list]

    return y, coding

y, one_hot_encoded_data = one_hot_encoding(data)
y

Unnamed: 0,blue,green,red,yellow
0,1,0,0,0
1,0,1,0,0
2,0,0,1,0
3,0,0,0,1


#### Task 2: double/nested list comprehension

In [None]:
data = ['blue', 'yellow', 'blue', 'green', 'red', 'yellow']

def one_hot_encoding(string_list):
    """
    One-hot encode a list of strings.

    The distinct strings are sorted alphabetically and printed. Every input
    string is then mapped to a 0/1 vector with one slot per distinct string.
    The function is derived from the quiz of 4 January 2023.

    param: string_list
    return: list of lists with the one_hot_encoding
    """
    vocabulary = list(set(string_list))
    vocabulary.sort()
    print(vocabulary)

    encoded = []
    for word in string_list:
        row = []
        for candidate in vocabulary:
            # a single 1 marks the slot of the matching vocabulary entry
            row.append(1 if candidate == word else 0)
        encoded.append(row)

    return encoded

one_hot_encoded_data = one_hot_encoding(data)
one_hot_encoded_data

#### Task 3: Very neat solution with lots of built-in Python data structures and operations, e.g. filter, map.

In [None]:
import numpy as np
from collections import Counter
import pandas as pd

def bag_of_words(corpus):
    """Build a bag-of-words count matrix for a collection of documents.

    Each document is split on whitespace; every token is lower-cased and
    stripped of non-alphanumeric characters. Tokens that are empty after this
    clean-up (e.g. a stand-alone "-") are dropped so that the empty string
    never becomes a vocabulary entry.

    Args:
        corpus: Iterable of document strings.

    Returns:
        tuple: ``(vocabulary, counts)`` where ``vocabulary`` is the sorted
        list of distinct tokens and ``counts`` is an integer ``np.ndarray``
        of shape ``(number of documents, len(vocabulary))``.
    """
    def pre_process(token):
        # keep only the alphanumeric characters of the lower-cased token
        return "".join(filter(str.isalnum, token.lower()))

    # tokenize each document, skipping tokens that were pure punctuation
    tokenized = [
        [word for word in map(pre_process, document.split()) if word]
        for document in corpus
    ]
    # count the occurrences of each token per document
    token_counts = list(map(Counter, tokenized))
    # the vocabulary is the sorted union of all documents' tokens
    vocabulary = sorted(set().union(*tokenized))
    # Counter yields 0 for tokens a document does not contain
    rows = [[counts[word] for word in vocabulary] for counts in token_counts]
    # reshape keeps the matrix two-dimensional even without documents/tokens
    matrix = np.array(rows, dtype=int).reshape(len(tokenized), len(vocabulary))
    return vocabulary, matrix

bag_of_words(corpus)

#### Task 3: regular expressions

In [None]:
import re 
import numpy as np
def bag_of_words(corpus):
    """Count word occurrences per sentence using a regular expression.

    Every sentence is lower-cased and split on any character outside a-z.
    Sentences are tokenized one by one, so words of neighbouring sentences
    are never glued together. The vocabulary is sorted to give a stable
    column order and is printed, since it is not part of the return value.

    Args:
        corpus: Iterable of sentence strings.

    Returns:
        np.ndarray: One row per sentence and one column per vocabulary word
        (in the printed order) holding the occurrence counts.
    """
    splitter = re.compile(r'[^a-z]')

    # split once per sentence; empty strings come from adjacent separators
    tokenized = [
        [token for token in splitter.split(sentence.lower()) if token]
        for sentence in corpus
    ]

    words = sorted({token for tokens in tokenized for token in tokens})
    print(words)

    wordfreq = [[tokens.count(word) for word in words] for tokens in tokenized]
    return np.array(wordfreq)
bag_of_words(corpus)

#### Task 5: comprehension and zipped arrays

In [None]:
def precision(y_true, y_predicted):
    """Return tp / (tp + fp) for binary labels, or 0 if nothing was predicted positive."""
    tp = 0
    fp = 0
    # walk both label sequences in lockstep
    for actual, guess in zip(y_true, y_predicted):
        if guess == 1:
            if actual == 1:
                tp += 1
            elif actual == 0:
                fp += 1

    predicted_positive = tp + fp
    if predicted_positive == 0:
        return 0
    return tp / predicted_positive

print(precision(np.array([1,0,1,1]), np.array([1,1,0,0])))

def recall(y_true, y_predicted):
    """Return tp / (tp + fn) for binary labels, or 0 if there are no actual positives."""
    tp = 0
    fn = 0
    # walk both label sequences in lockstep
    for actual, guess in zip(y_true, y_predicted):
        if actual == 1:
            if guess == 1:
                tp += 1
            elif guess == 0:
                fn += 1

    actual_positive = tp + fn
    if actual_positive == 0:
        return 0
    return tp / actual_positive

print(recall(np.array([1,0,1,1]), np.array([1,1,0,0])))

#### Task 5: alternative compact solution using the intersection operator

In [None]:
def precision(y_true, y_predicted):
    """Compute the precision tp / (tp + fp) of a binary prediction.

    Args:
        y_true: Array-like of true labels (1 = positive, 0 = negative).
        y_predicted: Array-like of predicted labels, same length as y_true.

    Returns:
        The precision, or 0 when nothing was predicted positive.
    """
    # convert so that plain lists are compared element-wise as well
    y_true = np.asarray(y_true)
    y_predicted = np.asarray(y_predicted)

    # Count the number of true positives
    true_positives = np.sum((y_true == 1) & (y_predicted == 1))
    # Count the number of false positives
    false_positives = np.sum((y_true == 0) & (y_predicted == 1))

    # Check the denominator first: dividing 0 by 0 would emit a
    # RuntimeWarning and produce NaN
    predicted_positives = true_positives + false_positives
    if predicted_positives == 0:
        return 0
    return true_positives / predicted_positives

def recall(y_true, y_predicted):
    """Compute the recall tp / (tp + fn) of a binary prediction.

    Args:
        y_true: Array-like of true labels (1 = positive, 0 = negative).
        y_predicted: Array-like of predicted labels, same length as y_true.

    Returns:
        The recall, or 0 when there are no actual positives.
    """
    # convert so that plain lists are compared element-wise as well
    y_true = np.asarray(y_true)
    y_predicted = np.asarray(y_predicted)

    # Count the number of true positives
    true_positives = np.sum((y_true == 1) & (y_predicted == 1))
    # Count the number of false negatives
    false_negatives = np.sum((y_true == 1) & (y_predicted == 0))

    # Check the denominator first: dividing 0 by 0 would emit a
    # RuntimeWarning and produce NaN
    actual_positives = true_positives + false_negatives
    if actual_positives == 0:
        return 0
    return true_positives / actual_positives

#### Task 5: insanely neat and carefully thought-through

In [None]:
import numpy as np

def input_check(y_true: np.ndarray, y_predicted: np.ndarray) -> None:
    """Validate the label arrays used for precision and recall.

    Both arguments have to be numpy arrays of identical shape (n,), where n
    is the number of labels, and may only contain binary values (0 or 1).
    Args:
        y_true (np.ndarray): y_true array
        y_predicted (np.ndarray): y_predicted array
    Raises:
        TypeError: If an argument is not of type np.ndarray
        ValueError: If the shapes differ, if y_true is not 1-dimensional,
            or if an array holds non-binary values.
    """
    labelled = (('y_true', y_true), ('y_predicted', y_predicted))

    # both inputs have to be numpy arrays (y_true is examined first)
    for label, candidate in labelled:
        if isinstance(candidate, np.ndarray):
            continue
        raise TypeError(
            f'For {label} expected np.ndarray type, got {type(candidate)}.')

    # the two arrays must agree in shape
    if y_true.shape != y_predicted.shape:
        raise ValueError(
            'y_true and y_predicted array are not of same shape. This is forbidden.')

    # only one-dimensional input is accepted
    if y_true.ndim != 1:
        raise ValueError('y_true is not a 1d array. This is forbidden.')

    # an array is binary iff a round trip through bool leaves it unchanged
    # (https://stackoverflow.com/a/40597324/12785394)
    for label, candidate in labelled:
        as_bool = candidate.astype(bool)
        if not np.array_equal(candidate, as_bool):
            raise ValueError(f'{label} is not binary. This is forbidden.')


def precision(y_true: np.ndarray, y_predicted: np.ndarray) -> float:
    """Calculate the precision of a binary prediction:
    precision = tp / (tp + fp), with the true positives tp and the false
    positives fp.
    Args:
        y_true (np.ndarray): True labels (1->positive, 0->negative)
        y_predicted (np.ndarray): Predicted labels (1->positive, 0->negative)
    Returns:
        float: Calculated precision; 0.0 if nothing was predicted positive.
    Raises:
        TypeError: Propagated from input_check for non-array input.
        ValueError: Propagated from input_check for mismatching shapes,
            non-1d input or non-binary values.
    """
    # validation errors propagate unchanged to the caller
    input_check(y_true, y_predicted)

    # The input is known to be binary, so a bool cast is lossless. It also
    # lets the bitwise operators work on float arrays holding 0.0 / 1.0.
    actual_positive = y_true.astype(bool)
    predicted_positive = y_predicted.astype(bool)

    # calculate the true positives tp
    tp = np.sum(actual_positive & predicted_positive)

    # calculate the false positives fp
    fp = np.sum(~actual_positive & predicted_positive)

    # calculate the denominator beforehand for 0 division catch
    denominator = tp + fp

    if denominator == 0:
        # divide by 0 case
        return 0.0

    return float(tp / denominator)


def recall(y_true: np.ndarray, y_predicted: np.ndarray) -> float:
    """Calculate the recall of a binary prediction:
    recall = tp / (tp + fn), with the true positives tp and the false
    negatives fn.
    Args:
        y_true (np.ndarray): True labels (1->positive, 0->negative)
        y_predicted (np.ndarray): Predicted labels (1->positive, 0->negative)
    Returns:
        float: Calculated recall; 0.0 if there are no actual positives.
    Raises:
        TypeError: Propagated from input_check for non-array input.
        ValueError: Propagated from input_check for mismatching shapes,
            non-1d input or non-binary values.
    """
    # perform input checks; validation errors propagate unchanged
    input_check(y_true, y_predicted)

    # The input is known to be binary, so a bool cast is lossless. It also
    # lets the bitwise operators work on float arrays holding 0.0 / 1.0.
    actual_positive = y_true.astype(bool)
    predicted_positive = y_predicted.astype(bool)

    # calculate the true positives tp
    tp = np.sum(actual_positive & predicted_positive)

    # calculate the false negatives fn
    fn = np.sum(actual_positive & ~predicted_positive)

    # calculate the denominator beforehand for 0 division catch
    denominator = tp + fn

    if denominator == 0:
        # divide by 0 case
        return 0.0

    return float(tp / denominator)

#### Task 6: function defined in inner scope of an outer function; the inner one is then applied on the dataframe

In [None]:
PATH = 'data/bundestags_parlamentsprotokolle.csv.gzip'
PATH_STOPWORDS = 'data/stopwords.txt'


def load_data(path: str) -> pd.DataFrame:
    """Return a pd.DataFrame with the requested government column from the
    csv file containing parliament speeches.
    Args:
        path (str): path to the compressed csv file
    Returns:
        pd.DataFrame: With government column extended DataFrame containing
            parliament speeches.
    Raises:
        ValueError: If a row has a 'wahlperiode' other than 17 or 18.
    """
    # parties counted as government, per legislative period
    coalitions = {
        17: ('cducsu', 'fdp'),
        18: ('cducsu', 'spd'),
    }

    # Load from the compressed csv file given by the caller (not from a
    # module-level constant); it already contains the index col at position 0.
    df = pd.read_csv(path, compression='gzip', index_col=0)

    def categorizer(row) -> bool:
        """Categorizer for pd.apply. Returns True if party belongs to
        government, otherwise returns False.
        """
        try:
            coalition = coalitions[row['wahlperiode']]
        except KeyError:
            raise ValueError("Invalid value in column 'wahlperiode'.") from None
        return row['partei'] in coalition

    # apply the categorizer row-wise, creating the column 'government'
    df['government'] = df.apply(categorizer, axis=1)

    return df

#### Task 6: Data loading routine using lambda, Random Search (instead of Grid Search), Stratified k-fold (so that the classes are balanced throughout all the folds)

*Note*: the metrics reports printed after each fold are fairly misleading, since they show 1.0 precision, recall and accuracy. The final report is sensible though.

In [None]:
def Government_True_Or_False(Tempwahlperiode,Temppartei):
    '''Function to extend the column "government" in the dataframe.

    Returns True if the party is cducsu or fdp in wahlperiode 17, or cducsu
    or spd in wahlperiode 18; returns False in every other case.
    '''
    if Tempwahlperiode == 17:
        return Temppartei in ('cducsu', 'fdp')
    if Tempwahlperiode == 18:
        return Temppartei in ('cducsu', 'spd')
    # any other wahlperiode never counts as government
    return False

DATADIR = "data"


def load_data():
    '''Function that loads the dataset into a dataframe and adds the column
    "government" to the dataframe by calling the function
    Government_True_Or_False.
    '''
    # read the csv from DATADIR and discard the column 'Unnamed: 0'
    csv_path = DATADIR + '/bundestags_parlamentsprotokolle.csv'
    raw = pd.read_csv(csv_path)
    DF = raw.drop('Unnamed: 0', axis=1)

    def is_government(row):
        # delegate the decision to the module-level helper
        return Government_True_Or_False(row['wahlperiode'], row['partei'])

    # add the column "government" row by row
    DF['government'] = DF.apply(is_government, axis=1)
    return DF

In [None]:
def train_bundestag(df):
    """Train a random forest on the 'government' label and print its scores.

    Steps, in order:
      1. label-encode the columns 'sprecher' and 'partei',
      2. hash the lower-cased 'text' column into 20 numeric feature columns,
      3. run a randomized hyperparameter search on the wahlperiode-17 rows,
      4. fit and score one model per stratified fold of the training split,
      5. score the model of the last fold on the wahlperiode-18 rows.

    Args:
        df: DataFrame holding at least the columns 'sprecher', 'partei',
            'text', 'wahlperiode' and 'government'. It is not modified;
            the function works on a copy.

    Returns:
        None. All results are printed.
    """
    # work on a copy so the caller's dataframe is left untouched
    df = df.copy()

    #Label encode the column "sprecher"
    le = LabelEncoder()
    df['sprecher'] = le.fit_transform(df['sprecher']) # label encode the column "sprecher"
    df['partei'] = le.fit_transform(df['partei']) # label encode the column "partei"
    # NOTE(review): 'partei' stays in the feature matrix. Within a single
    # wahlperiode the government label appears to be fully determined by the
    # party, which would explain perfect per-fold scores -- confirm and
    # consider dropping the column from the features.

    #Count vectorizer
    df['text'] = df['text'].astype(str).str.lower() # convert the column "text" to lower case
    AllText = df['text'].tolist() # convert the column "text" to a list
    hv = HashingVectorizer(n_features=20) # initialize the hashing vectorizer
    Vectors = hv.fit_transform(AllText).toarray().tolist() # fit the hashing vectorizer to the list and convert the result to a list
    df[['vectorized Text '+str(i) for i in range(20)]] = pd.DataFrame(Vectors, index= df.index) # add the list to the dataframe
    df = df.drop('text',axis=1) # drop the column "text"

    #Hyperparameter optimization
    n_estimators = [int(x) for x in np.linspace(start = 50, stop = 150, num = 10)]
    max_features = ['sqrt']
    random_grid = {'n_estimators': n_estimators,
                   'max_features': max_features,
                   'n_jobs':[-1]
                  }

    #Split data based upon wahlperiode: train on 17, evaluate on 18
    wahlperiode_17 = df[df['wahlperiode'] == 17]
    wahlperiode_18 = df[df['wahlperiode'] == 18]

    X = wahlperiode_17.drop('government',axis=1)
    y = wahlperiode_17['government']

    # A single split serves both the search and the k-fold loop below;
    # repeating it with the same random_state would give the same partition.
    # NOTE(review): X_test / y_test are never evaluated in this function.
    X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.2,random_state=42)

    #Random forest classifier
    rf = RandomForestClassifier()
    #Randomized search cross validation
    rf_random = RandomizedSearchCV(estimator = rf, param_distributions = random_grid, n_iter = 6, cv = 2, verbose=1, random_state=42, n_jobs = -1)
    rf_random.fit(X_train, y_train)
    BestParams = rf_random.best_params_
    print('Hyperparameter Optimization finished.')

    # Stratified K-Fold
    skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    Val = 0
    # Training the model
    print("Results on 17 Bundestag:")
    for train_index, test_index in skf.split(X_train, y_train): # split the training data into training and validation data
        Val += 1
        TX_train, TX_test = X_train.iloc[train_index, :], X_train.iloc[test_index, :]
        Ty_train, Ty_test = y_train.iloc[train_index], y_train.iloc[test_index]
        rf = RandomForestClassifier(**BestParams)
        rf.fit(TX_train,Ty_train)
        y_pred = rf.predict(TX_test)
        # Print scores
        print('Fold Number: ',Val)
        print('Precision: ',precision(Ty_test,y_pred))
        print('Recall: ',recall(Ty_test,y_pred))
        print(classification_report(Ty_test,y_pred))
        print()

    X = wahlperiode_18.drop('government',axis=1)
    y = wahlperiode_18['government']

    # rf is the model fitted in the last fold of the loop above
    print("Results on 18 Bundestag:")
    y_pred = rf.predict(X)
    # Print scores
    print('Precision: ',precision(y,y_pred))
    print('Recall: ',recall(y,y_pred))
    print(classification_report(y,y_pred))
    print()
