Import libraries for fetching the data

In [2]:
import os
import pandas as pd
import timeit
import rpy2.robjects as ro
from rpy2.robjects import r
from rpy2.robjects import pandas2ri
from rpy2.robjects.conversion import localconverter
import requests


Fetch and convert cipher data to pandas dataframe.

In [3]:
data_url = "https://nextcloud.sdu.dk/index.php/s/Zzjcqjopy5cTawn/download/data_33.Rdata"
data_file = "data.Rdata"

#download the file
req = requests.get(data_url, allow_redirects=True, stream=True)

#save downloaded file
with open(data_file,"wb") as rf:
     for chunk in req.iter_content(chunk_size=1024):
         # writing one chunk at a time to r file
         if chunk:
              rf.write(chunk)

r_data=r.load(data_file)

with localconverter(ro.default_converter + pandas2ri.converter):
    ciphers = ro.conversion.rpy2py(r['ciphers'])

Define utility functions.

In [4]:
import numpy as np
import random
import statistics

In [6]:
def take(arr, indexes):
    mask = np.ones(len(arr), dtype=bool)
    mask[indexes] = True
    return arr[mask]

def take_inverse(arr, indexes):
    mask = np.ones(len(arr), dtype=bool)
    mask[indexes] = False
    return arr[mask]

def all_persons_in(ciphers, split = 0.5):
    count = len(ciphers)
    samples = random.sample(range(0, count), int(count * split))
    train_data = take(ciphers, samples)
    test_data = take_inverse(ciphers, samples)
    return { "train_data": train_data, "test_data": test_data }

def disjunct(ciphers, amount):
    total_people = len(set(ciphers.T[0]))
    per_person = int(len(ciphers) / total_people)
    train_data = take(ciphers, range(0, per_person * amount))
    test_data = take_inverse(ciphers, range(0, per_person * amount))
    return { "train_data": train_data, "test_data": test_data }

def split(ciphers):
    person_index = ciphers.T[0]
    ground_truth = ciphers.T[1]
    data = []
    for cipher in ciphers:
        data.append(cipher[2:len(cipher)])
    return { "person": person_index, "truth":ground_truth, "data": data}

def compute_accuracy_folds(folds, ciphers, predictor, predictor_args=[]):
    results = []
    for fold in folds:
        train_raw = take(ciphers, fold)
        test_raw = take_inverse(ciphers, fold)

        train_data = take_inverse(train_raw, range(0,1))
        train_labels = take(train_raw, [1])
        test_data = take_inverse(test_raw, range(0,1))
        test_labels = take(test_raw, [1])

        predictions = predictor(train_data, test_data, train_labels, test_labels, predictor_args)
        cf = pd.crosstab(predictions, test_labels)
        accuracy = np.diag(cf).sum() / cf.to_numpy().sum()

        results.append(accuracy)

    mean = statistics.mean(results)
    stdev = statistics.stdev(results)
    return { "mean": mean, "stdev": stdev }

def k_folds(arr_size, folds):
    samples = random.sample(range(0, arr_size), arr_size)
    return np.array_split(samples, folds)

Perform PCA and other preprocessing on ciphers.

In [11]:
from sklearn.decomposition import PCA
ciphers_metadata = []
ciphers_data = []
cipher_length = len(ciphers.T)

for i in range(2, len(ciphers)):
    ciphers_metadata.extend([ciphers[i][0], ciphers[i][1]])
    ciphers_data.append(ciphers[i][2:cipher_length])

pca = PCA(n_components=100)
pca.fit(ciphers_data)
transformed = pca.transform(ciphers_data)

ciphers_transformed = []
for i in range(2, len(ciphers)):
    ciphers_transformed.append([])
    ciphers_transformed[i].append(ciphers_metadata[i][0])
    ciphers_transformed[i].append(ciphers_metadata[i][1])
    ciphers_transformed[i].extend(transformed[i - 2])



IndexError: list assignment index out of range

Perform cross-validated kNN analysis with and without preprocessing

In [123]:
from sklearn.neighbors import KNeighborsClassifier

# kNN functions
def knn_predictor(train_data, train_labels, test_data, test_labels, args):
    k, l = args['k'], args['l']
    neigh = KNeighborsClassifier(n_neighbors=k)
    neigh.fit(train_data, train_labels)
    return neigh.predict(test_data)

data = all_persons_in(ciphers, 0.5)
train_data_test = data["test_data"]
train_data = split(data["train_data"])
test_data = split(data["test_data"])
knn_predictor(train_data["data"], train_data["truth"], test_data["data"], test_data["truth"], {'k': 3, 'l':1})

col_0   0.0   1.0   2.0   3.0   4.0   5.0   6.0   7.0   8.0   9.0
row_0                                                            
0.0    3241     0    34    57    20    62    95     4    48    41
1.0       8  3240    62    88    90    60    50    47    46    97
2.0       2     1  3166    55     7    17     7    21    53    11
3.0       1     0     7  2951     1    51     5    11    71    22
4.0       6     2    10     2  3093    13     7     3    15   131
5.0       0     1     1    22     0  3032    20     1    69    11
6.0      33     0     6     7     4    54  3090     1    95     4
7.0       9     6    40    30     8    13     2  3210     5    39
8.0       6     0     8    18     0     5     6     2  2924     3
9.0      12     9     4    15    95    17     4     3    20  2904


In [None]:
# Without preprocessing

In [None]:
# With preprocessing