### Imports

In [None]:
import math
from collections import Counter

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
import spacy
from tqdm import tqdm_notebook as tqdm

In [None]:
# Hook tqdm into pandas so the `progress_apply` call used for tokenisation
# further down shows a progress bar.
tqdm().pandas()

### Config

In [None]:
# Plot styling. matplotlib 3.6 renamed the bundled seaborn style sheets to
# "seaborn-v0_8-*" and later releases dropped the old names, so try the new
# name first and fall back to the old one on older matplotlib versions.
try:
    plt.style.use("seaborn-v0_8-whitegrid")
except OSError:
    plt.style.use("seaborn-whitegrid")

# Font handling for the exported PostScript/PDF figures.
plt.rc('ps', fonttype=42)
plt.rc('pdf', fonttype=42)
plt.rcParams.update({'font.size': 14})
plt.rcParams['ps.useafm'] = True
plt.rcParams['pdf.use14corefonts'] = True
# Draw minus signs as an ASCII hyphen instead of the Unicode minus glyph.
plt.rcParams['axes.unicode_minus'] = False

### Load data

In [None]:
# Load the pre-processed texts.
# NOTE: unpickling can execute arbitrary code; only load this file if it was
# produced by a trusted pipeline.
texts_df = pd.read_pickle("data/processed/texts.p")

In [None]:
texts_df

### Prepare data

#### Fix authors

In [None]:
# Map variant spellings of an author name (key) onto the spelling to use
# (value), so both variants are counted as a single author afterwards.
author_fix = {
    "Bachiller D. P. Gatell": "Bachiller D. P. Gatell.",
    "Eliza Haywood": "Eliza Fowler Haywood",
}
# Only cells that exactly equal a key are replaced; all others stay as they are.
texts_df["author"] = texts_df["author"].replace(author_fix)

#### Get known authors

In [None]:
# Distinct values of the `author` column, as a plain Python list.
all_authors = texts_df.author.unique().tolist()

In [None]:
# An author counts as "known" when the name contains none of the placeholder
# markers used for anonymous, bracketed or missing attributions.
known_authors = [
    a
    for a in all_authors
    if not any(
        marker in a
        for marker in ("Anonym", "Anonyme", "Anónimo", "[", "missing")
    )
]

#### Create tokens

In [None]:
# Load the small spaCy pipelines for German, French, Spanish and Italian.
# The models have to be installed beforehand for `spacy.load` to find them.
de_nlp = spacy.load("de_core_news_sm")
fr_nlp = spacy.load("fr_core_news_sm")
es_nlp = spacy.load("es_core_news_sm")
it_nlp = spacy.load("it_core_news_sm")

In [None]:
# spaCy pipeline per language. The keys are compared against the language
# element of each row's index in `create_tokens`, so they must match the
# values of the `language` column exactly; other languages are not tokenised.
nlp_to_use = {
    "German": de_nlp,
    "French": fr_nlp,
    "Spanish; Castilian": es_nlp,
    "Italian": it_nlp
}

In [None]:
# One row per (filename, author, language): all text fragments of a file are
# joined with single spaces into one string.
text_by_file_df = (
    texts_df.groupby(["filename", "author", "language"])["text"]
    .apply(" ".join)
    .to_frame()
)

In [None]:
text_by_file_df

In [None]:
def create_tokens(row):
    """Tokenise and POS-tag the text of one file.

    Args:
        row: A row of the per-file frame. ``row.name`` is the index tuple
            whose third element is the language, and ``row["text"]`` holds
            the text to process.

    Returns:
        A list of ``(token text, coarse POS tag)`` tuples for every token
        containing at least one alphabetic character, or ``None`` when
        ``nlp_to_use`` has no pipeline for the row's language.
    """
    language = row.name[2]
    if language not in nlp_to_use:
        return None

    pipeline = nlp_to_use[language]
    # Tokens without any letter (punctuation, numbers, whitespace) are skipped.
    return [
        (token.text, token.pos_)
        for token in pipeline(row["text"])
        if any(char.isalpha() for char in token.text)
    ]

# Tokenise every file row by row (with a progress bar); each entry is the
# return value of `create_tokens` for that file.
tokens_df = text_by_file_df.progress_apply(create_tokens, axis=1)

In [None]:
tokens_df.head(10)

### John Burrows' Delta

In [None]:
def calculate_z_scores(corpus, num_features=100):
    """Compute the Burrows' Delta reference statistics for a corpus.

    Args:
        corpus: pandas Series mapping each author (index) to the list of
            that author's tokens. Tokens only need to be hashable.
        num_features: Number of corpus-wide most frequent tokens to use as
            features. Defaults to 100.

    Returns:
        A tuple ``(features, corpus_features, feature_zscores)``:

        * ``features``: the selected tokens, most frequent first.
        * ``corpus_features``: ``{feature: {"Mean": ..., "StdDev": ...}}``
          with the mean and sample standard deviation of the feature's
          relative frequency across authors.
        * ``feature_zscores``: ``{author: {feature: z-score}}``.

        A feature whose standard deviation is zero (identical frequency for
        all authors, or only one author) gets a z-score of ``0.0`` because
        it cannot discriminate between authors.
    """
    # Count every author's tokens once; the corpus-wide counts are the sum.
    corpus_counts = Counter()
    author_counts = {}
    author_totals = {}
    for author, tokens in corpus.items():
        counts = Counter(tokens)
        author_counts[author] = counts
        author_totals[author] = len(tokens)
        corpus_counts.update(counts)

    features = [token for token, _ in corpus_counts.most_common(num_features)]

    # Relative frequency of each feature per author.
    feature_freqs = {}
    for author, counts in author_counts.items():
        total = author_totals[author]
        feature_freqs[author] = {
            feature: (counts[feature] / total if total else 0.0)
            for feature in features
        }

    authors = list(feature_freqs)
    n_authors = len(authors)

    # Mean and sample standard deviation of each feature across authors.
    corpus_features = {}
    for feature in features:
        freqs = [feature_freqs[author][feature] for author in authors]
        mean = sum(freqs) / n_authors
        if n_authors > 1:
            squared_diffs = sum((freq - mean) * (freq - mean) for freq in freqs)
            stdev = math.sqrt(squared_diffs / (n_authors - 1))
        else:
            # The sample standard deviation is undefined for a single author.
            stdev = 0.0
        corpus_features[feature] = {"Mean": mean, "StdDev": stdev}

    # z-score of each feature per author.
    feature_zscores = {}
    for author in authors:
        zscores = {}
        for feature in features:
            stats = corpus_features[feature]
            stdev = stats["StdDev"]
            if stdev:
                zscores[feature] = (
                    feature_freqs[author][feature] - stats["Mean"]
                ) / stdev
            else:
                zscores[feature] = 0.0
        feature_zscores[author] = zscores

    return features, corpus_features, feature_zscores
 
def calculate_authorship(test_case_tokens, features, corpus_features, feature_zscores, authors):
    """Attribute a text to the candidate author with the lowest Delta.

    Args:
        test_case_tokens: List of (hashable) tokens of the text to attribute.
        features: Feature tokens, as returned by ``calculate_z_scores``.
        corpus_features: ``{feature: {"Mean": ..., "StdDev": ...}}`` as
            returned by ``calculate_z_scores``.
        feature_zscores: ``{author: {feature: z-score}}`` as returned by
            ``calculate_z_scores``.
        authors: Iterable of candidate authors; each must be a key of
            ``feature_zscores``.

    Returns:
        The author whose z-scores have the smallest mean absolute difference
        to the z-scores of the test text. On a tie the first such author in
        ``authors`` wins. Returns ``None`` when ``authors`` is empty.
    """
    counts = Counter(test_case_tokens)
    total = len(test_case_tokens)

    # z-scores of the test text relative to the corpus statistics. A feature
    # with zero standard deviation carries no information and scores 0.0.
    test_case_zscores = {}
    for feature in features:
        stats = corpus_features[feature]
        stdev = stats["StdDev"]
        freq = counts[feature] / total if total else 0.0
        if stdev:
            test_case_zscores[feature] = (freq - stats["Mean"]) / stdev
        else:
            test_case_zscores[feature] = 0.0

    n_features = len(features)
    pred_author = None
    lowest_delta = None
    for author in authors:
        author_zscores = feature_zscores[author]
        delta = sum(
            math.fabs(test_case_zscores[feature] - author_zscores[feature])
            for feature in features
        )
        if n_features:
            delta /= n_features
        # Strict comparison keeps the earliest author on ties.
        if lowest_delta is None or delta < lowest_delta:
            lowest_delta = delta
            pred_author = author
    return pred_author

### Test performance

In [None]:
# Languages to evaluate; each entry is compared against the language element
# of the token index when the corpus is filtered per language.
languages = ["German", "French", "Italian", "Spanish; Castilian"]

In [None]:
# Leave-one-out evaluation: per language, hold out a few texts of known authors
# and check whether each one is attributed to its true author.
# `test_results[lang]` maps the index tuple of a held-out text to the
# predicted author.
test_results = {}
for lang in languages:
    test_results[lang] = {}
    # reduce tokens to the current language
    lang_tokens_df = tokens_df.loc[tokens_df.index.get_level_values("language") == lang]

    # reduce to known authors
    known_authors_tokens_df = lang_tokens_df.loc[
        lang_tokens_df.index.get_level_values("author").isin(known_authors)
    ]

    # extract test cases: up to 5 texts per author, always leaving at least
    # one text of that author in the reference corpus
    test_cases_per_author = known_authors_tokens_df.groupby("author").size().apply(lambda x: min(x - 1, 5))
    test_cases = []
    for a, n in test_cases_per_author.items():
        if n > 0:
            author_texts = known_authors_tokens_df.loc[
                known_authors_tokens_df.index.get_level_values("author") == a
            ]
            # NOTE(review): the sample is unseeded, so the selected test cases
            # (and the reported accuracy) differ between runs.
            test_cases.append(author_texts.sample(n))
    if not test_cases:
        # pd.concat raises on an empty list; nothing to evaluate here.
        print("found no testcases for {}".format(lang))
        continue
    test_cases_df = pd.concat(test_cases)
    print("found {} testcases out of {}".format(len(test_cases_df), len(known_authors_tokens_df)))

    # calculate authorship for test cases
    # (`Series.items()` replaces `iteritems()`, which pandas 2.0 removed)
    for tc_index, tc_tokens in tqdm(test_cases_df.items(), total=len(test_cases_df)):
        # rebuild the reference corpus without the held-out text, grouped by author
        known_authors_combined_tokens_df = known_authors_tokens_df.drop(tc_index).groupby("author").sum()

        # calc z-scores
        features, corpus_features, feature_zscores = calculate_z_scores(known_authors_combined_tokens_df)

        # calc authorship
        pred_author = calculate_authorship(
            tc_tokens, features, corpus_features, feature_zscores, known_authors_combined_tokens_df.index
        )
        test_results[lang][tc_index] = pred_author
    

In [None]:
# Share of held-out texts attributed to their true author, per language.
# The true author is the second element of each result's index tuple.
accuracy_by_language = {}
for lang in languages:
    lang_results = test_results[lang]
    num_correct = sum(1 for index, predicted in lang_results.items() if index[1] == predicted)
    # A language without test cases gets NaN instead of a ZeroDivisionError.
    accuracy_by_language[lang] = num_correct / len(lang_results) if lang_results else float("nan")
# Build the Series in one go with an explicit dtype; an empty `pd.Series()`
# without dtype is deprecated.
plot_results = pd.Series(accuracy_by_language, dtype=float)

In [None]:
# Bar chart of the per-language accuracy, written to a PDF file.
ax = plot_results.plot.bar(title="Stylometry Performance", figsize=(5, 5))
ax.set_ylabel("Ratio of Correctly Detected Authorship")
plt.tight_layout()
plt.savefig("stylometry.pdf")

### Detect anonymous

In [None]:
# Files whose author is exactly "Anonym", with an empty column that will
# receive the predicted author.
# NOTE(review): the known-author filter also treats "Anonyme", "Anónimo",
# "[" and "missing" as unknown, but only the exact label "Anonym" is selected
# here - confirm that the other labels are meant to be left out.
is_anonymous = text_by_file_df.index.get_level_values("author") == "Anonym"
all_anonymous_df = text_by_file_df.loc[is_anonymous].copy()
all_anonymous_df["probable_author"] = None

In [None]:
# Languages for which anonymous works are attributed (same list as used for
# the performance test above; redefined so this section can run on its own).
languages = ["German", "French", "Italian", "Spanish; Castilian"]

In [None]:
# Attribute every anonymous work to the closest known author of the same
# language. `detection_results` maps the index tuple of an anonymous work to
# the predicted author.
detection_results = {}
for lang in languages:
    # reduce tokens to the current language (mask built from tokens_df's own index)
    lang_tokens_df = tokens_df.loc[tokens_df.index.get_level_values("language") == lang]
    lang_authors = lang_tokens_df.index.get_level_values("author")

    # reduce to known authors
    known_authors_tokens_df = lang_tokens_df.loc[lang_authors.isin(known_authors)]

    # get anonymous works
    anonymous_authors_tokens_df = lang_tokens_df.loc[lang_authors == "Anonym"]

    # nothing to attribute, or nobody to attribute it to
    if known_authors_tokens_df.empty or anonymous_authors_tokens_df.empty:
        continue

    # group by author
    known_authors_combined_tokens_df = known_authors_tokens_df.groupby("author").sum()

    # calculate z-scores
    features, corpus_features, feature_zscores = calculate_z_scores(known_authors_combined_tokens_df)

    # calc authorship for anonymous work
    # (`Series.items()` replaces `iteritems()`, which pandas 2.0 removed)
    for uw_index, uw_tokens in tqdm(anonymous_authors_tokens_df.items(), total=len(anonymous_authors_tokens_df)):
        pred_author = calculate_authorship(
            uw_tokens, features, corpus_features, feature_zscores, known_authors_combined_tokens_df.index
        )
        detection_results[uw_index] = pred_author


In [None]:
# Write each predicted author into the row of the matching anonymous work.
# `i` is the index tuple used as key in `detection_results`, `v` the
# predicted author.
for i, v in detection_results.items():
    all_anonymous_df.loc[i, "probable_author"] = v

In [None]:
all_anonymous_df.loc[(" ", "Anonym", "Spanish; Castilian"),]