# NN 2.0

## Defining data functions

Import needed libraries

In [None]:
# imports
import pandas as pd
import tensorflow as tf
import tensorflow.keras as ks
import numpy as np

Get data from file and remove columns with text

In [None]:
def get_swissvotes_data()->pd.DataFrame:
    import re
    
    dataset = pd.read_csv("../data/formatted/swissvotes_dataset_after_1900_utf8.csv", sep=';')
    
    regex = re.compile("pdev_.*")
    to_excl = list(filter(regex.match, dataset.columns))
    
    dataset.drop(columns=to_excl, inplace=True)
    dataset.drop(columns=["legisjahr"], inplace=True)
    dataset.drop(columns=["titel_kurz_d", "titel_kurz_f", "titel_off_d", "titel_off_f", "stichwort"], inplace=True)
    dataset.drop(columns=["swissvoteslink", "anzahl", "anneepolitique", "bkchrono_de", "bkchrono_fr"], inplace=True)
    dataset.drop(columns=["curiavista_de", "curiavista_fr", "urheber", "bkresults_de", "bkresults_fr"], inplace=True)
    dataset.drop(columns=["bfsmap_de", "bfsmap_fr", "nach_cockpit_d", "nach_cockpit_f", "nach_cockpit_e"], inplace=True)
    dataset = dataset[dataset["anr"] < 646] # we don't care about future votes
    
    return dataset
print(f"Defined {get_swissvotes_data}")

In [None]:
def get_rechtsform_onehot(data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    tensor = tf.one_hot(data["rechtsform"], 5).numpy();
    result = pd.DataFrame(tensor, columns=["ref_obl", "ref_fak", "initiative", "gegen_entw", "stichfr"], index=data.index)
    
    return result.astype(int)
print(f"Defined {get_rechtsform_onehot}")

In [None]:
def get_politikbereich_multihot(data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    polber = data[["d1e1", "d2e1", "d3e1"]]
    polber = polber.replace('.', 0)
    polber = polber.astype(int)
    
    # the names of the columns (they're a bit long)
    cols = ["Staatsordnung", "Aussenpolitik", "Sicherheitspolitik", "Wirtschaft"]
    cols += ["Landwirtschaft", "Öffentliche Finanzen", "Energie", "Verkehr und Infrastruktur"]
    cols += ["Umwelt und Lebensraum", "Sozialpolitik", "Bildung und Forschung", "Kultur, Religion, Medien"]
    
    result = pd.DataFrame(columns=cols, index = data.index)
    for i in range(len(result)):
        row = np.zeros(12)
        for p in polber.iloc[i]:
            if p != 0:
                row[p-1] = 1
        result.iloc[i] = row
    return result.astype(int)

print(f"Defined {get_politikbereich_multihot}")

In [None]:
def get_department_onehot(data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    dep_single = data["dep"].replace('.', 2) # voting at age 18 is the only vote with a '.' and it's dep of inner
    dep_single = dep_single.astype(int)
    tensor = tf.one_hot(dep_single, 8).numpy()
    result = pd.DataFrame(tensor, columns=["EDA", "EDI", "EJPD", "VBS", "EFD", "WBF", "UVEK", "BK"], index=data.index)
    
    return result.astype(int)

print(f"Defined {get_department_onehot}")

In [None]:
def get_bundesrat_onehot(data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    tensor = tf.one_hot(data["br_pos"].replace('.', 3).astype(int), 3).numpy()
    result = pd.DataFrame(tensor, columns=["Für", "Dagegen", "Keine"], index=data.index)
    return result.astype(int)

print(f"Defined {get_bundesrat_onehot}")

In [None]:
def get_legislatur(low:int, high:int, data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    leg = data["legislatur"]
    def my_map(x:int, x_min:int=leg.min(0), x_max:int=leg.max(0), y_min:int=low, y_max:int=high)->float:
        return (x-x_min)/(x_max-x_min)*(y_max-y_min)+y_min
    
    normalized = data[["legislatur"]].applymap(my_map)
    return normalized

print(f"Defined {get_legislatur}")

In [None]:
def get_parlament_onehot(data:pd.DataFrame = get_swissvotes_data()["nr_pos"])->pd.DataFrame:
    tensor = tf.one_hot(data.astype(int), 3).numpy()
    result = pd.DataFrame(tensor, columns=["Für", "Dagegen", "Keine"], index=data.index)
    return result.astype(int)

print(f"Defined {get_parlament_onehot}")

In [None]:
def get_parties(data:pd.DataFrame = get_swissvotes_data())->list:
    import re
    
    regex_incl = re.compile("p_.*")
    regex_excl = re.compile("p_others_.*")
    
    parties_pre = list(filter(regex_incl.match, data.columns))
    parties = [p for p in parties_pre if not regex_excl.match(p)]
    return parties

print(f"Defined {get_parties}")

In [None]:
def normalize_party_reco(data:pd.DataFrame = get_swissvotes_data(), names:list = get_parties())->pd.DataFrame:
    # deal with unwanted values first
    normalized = data[names].replace(".", 0)
    normalized.replace(np.nan, 0, inplace=True)
    normalized = normalized.astype(int)
    normalized.replace([3,4,5,66,9999], 0, inplace=True)
    
    result = pd.DataFrame(index=normalized.index)
    
    for p in names: # go through parties and create one hot encoding
        tensor = tf.one_hot(normalized[p], 3).numpy()
        temp = pd.DataFrame(tensor, columns=[p+"_neutral", p+"_ja", p+"_nein"], index=result.index)
        result = result.join(temp)

    return result.astype(int)
print(f"Defined {normalize_party_reco}")

In [None]:
def get_vote_result(data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    result = data["annahme"].replace('.', 0)
    return result.astype(int)

print(f"Defined {get_vote_result}")

In [None]:
# creates a dataframe 
def get_canton_results(data:pd.DataFrame = get_swissvotes_data())->pd.DataFrame:
    import re
    regex = re.compile(".*_annahme")
    canton_names = list(filter(regex.match, data.columns))
    return data[canton_names].replace('.', 0).astype(int)

print(f"Defined {get_canton_results}")

## Training the net

In [None]:
# get data
def get_data():
    swissvotes = get_swissvotes_data()
    # the inputs used by the neural net are:
        # Rechtsform (one hot),
        # Politikbereich (multi hot),
        # Department (one hot),
        # Position of the Bundesrat (one hot),
        # legislatur (normalized from 1-10),
        # Position of Nationalrat (one hot),
        # Position of Ständerat (one hot),
        # Party recommendations (one hot)
    in_rchtfrm = get_rechtsform_onehot()
    in_poltber = get_politikbereich_multihot()
    in_deprtmt = get_department_onehot()
    in_burapos = get_bundesrat_onehot()
    in_narapos = get_parlament_onehot()
    in_strapos = get_parlament_onehot(swissvotes["sr_pos"])
    in_parties = normalize_party_reco()
    
    inputs = pd.concat([in_rchtfrm, in_poltber, in_deprtmt, in_burapos, in_narapos, in_strapos, in_parties], axis=1)
    
    # the outputs are:
        # result of the votes (binary),
        # result on a canton level (binary)
    out_result = get_vote_result()
    out_canton = get_canton_results()
    
    outputs = pd.concat([out_result, out_canton], axis=1)
    
    return swissvotes, inputs, outputs

print(f"Defined {get_data}")

In [None]:
def create_model(input_size:int = len(get_data()[1].columns), hidden:list=[100, 50, 20],
                 output_size:int = len(get_data()[2].columns), activation:str="relu",
                 activation_output:str="sigmoid", 
                 optimizer=ks.optimizers.SGD(learning_rate=0.1), 
                 loss=ks.losses.BinaryCrossentropy())->ks.models.Sequential:
    model = ks.models.Sequential()
    
    model.add(ks.layers.Dense(units=input_size, activation=activation, name="Input"))
    
    for i in range(len(hidden)):
        model.add(ks.layers.Dense(units=hidden[i], activation=activation, name="Hidden_"+str(i)))
        model.add(ks.layers.Dropout(rate=.1, name="Dropout_"+str(i)))
        
    model.add(ks.layers.Dense(units=output_size, activation=activation_output, name="Output"))
    
    model.compile(optimizer=optimizer, loss=loss, metrics=[ks.metrics.BinaryAccuracy(), 
                                                           ks.metrics.FalseNegatives()])
    
    return model

print(f"Defined {create_model}")

In [None]:
def train_model(model:ks.models.Sequential, inputs:pd.DataFrame=get_data()[1], 
                outputs:pd.DataFrame=get_data()[2], test_size:float=0.6, 
                batch_size:int=50, epochs:int=75, shuffle:bool=True)->tuple:
    from sklearn.model_selection import train_test_split as tss
    in_train, in_test, out_train, out_test = tss(inputs, outputs, test_size=test_size)
    
    history = model.fit(x=in_train, y=out_train, batch_size=batch_size, epochs=epochs, shuffle=shuffle)
    
    return history, in_test, out_test

print(f"Defined {train_model}")

In [None]:
model = create_model(hidden=[10, 20], 
                     loss=ks.losses.MeanAbsoluteError(), activation_output="sigmoid")

history, in_test, out_test = train_model(model, epochs=125)

print(model.summary())

In [None]:
model.evaluate(x=in_test, y=out_test)