In [None]:
%load_ext lab_black
from pathlib import Path
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from matplotlib import ticker as tck
import seaborn as sns
import ipywidgets
import pydantic
import typing
import datetime
import itertools
import collections
import json
import gzip
import tqdm
import multiprocessing
from efficient_apriori import apriori
import math
import locale

# Switch every locale category to German so locale-aware formatting
# (locale.format_string, matplotlib tick labels) follows German conventions.
locale.setlocale(locale.LC_ALL, "de_DE")
# Override the locale's digit grouping through a private attribute of the
# `locale` module: "." as thousands separator, digits grouped in threes.
locale._override_localeconv["thousands_sep"] = "."
locale._override_localeconv["grouping"] = [3, 3, 0]
# Make matplotlib's default tick formatter honour the locale set above.
plt.rcParams["axes.formatter.use_locale"] = True
sns.set_theme(style="ticks")
# Centimetre-to-inch factor; figure sizes are written as `<centimetres> * cm`.
cm = 1 / 2.54
# NOTE(review): 29.7 x 42 corresponds to ISO A3 in centimetres, not A4
# (21 x 29.7) — confirm which paper size is intended before using this.
a4 = 29.7, 42


class InfoboxProperty(pydantic.BaseModel):
    """Identifies the infobox property that a change refers to."""

    # Optional type tag of the property. The default is spelled out so the
    # field is optional by declaration rather than by pydantic v1's implicit
    # "Optional means default None" rule (matches the style of InfoboxChange).
    propertyType: typing.Optional[str] = None
    # Property name; used as the item label when bucketing changes.
    name: str


class InfoboxChange(pydantic.BaseModel):
    """A single property change recorded inside an infobox revision."""

    # The property this change applies to.
    property: InfoboxProperty
    # Optional timestamp; presumably the moment the new value stopped being
    # valid — TODO confirm against the data producer.
    valueValidTo: typing.Optional[datetime.datetime] = None
    # Value after / before the change; either may be absent.
    currentValue: typing.Optional[str] = None
    previousValue: typing.Optional[str] = None


class User(pydantic.BaseModel):
    """Author of a revision; both identifying fields may be missing."""

    # Explicit None defaults keep both fields optional by declaration instead
    # of relying on pydantic v1's implicit default for Optional annotations.
    username: typing.Optional[str] = None
    id: typing.Optional[int] = None


class InfoboxRevision(pydantic.BaseModel):
    """One revision of an infobox, parsed from a single JSON line.

    Every Optional field carries an explicit ``None`` default so that
    optionality is declared uniformly and does not depend on pydantic v1's
    implicit default for Optional annotations.
    """

    revisionId: int
    pageTitle: str
    # Property changes contained in this revision.
    changes: typing.Sequence[InfoboxChange]
    # Timestamp of the revision; its calendar date is the bucket key used by
    # the process_* functions.
    validFrom: datetime.datetime
    attributes: typing.Optional[typing.Dict[str, str]] = None
    # Page the infobox belongs to; used as an item when bucketing by page.
    pageID: int
    revisionType: typing.Optional[str] = None
    key: str
    # Infobox template name, if known.
    template: typing.Optional[str] = None
    position: typing.Optional[int] = None
    user: typing.Optional[User] = None
    validTo: typing.Optional[datetime.datetime] = None


class ChangeBuckets(pydantic.BaseModel):
    """Changed items grouped into named buckets for one input source."""

    # Name of the file the buckets were built from; the merged cache written
    # by this notebook uses "all".
    filename: str
    # Bucket key -> items in that bucket. The producers in this notebook use
    # ISO dates as keys and sorted tuples of items as values.
    changes: typing.Dict[str, typing.Sequence[typing.Hashable]]


def sliding(seq, window_size):
    """Yield every contiguous slice of ``seq`` that is ``window_size`` long.

    Slices are produced left to right, advancing one position at a time.
    Nothing is yielded when ``seq`` is shorter than ``window_size``.
    """
    final_start = len(seq) - window_size
    start = 0
    while start <= final_start:
        stop = start + window_size
        yield seq[start:stop]
        start += 1


def overlapping_groups(groups, window_size):
    """Merge each run of ``window_size`` consecutive groups into one set.

    The keys of ``groups`` are taken in their existing order. Every window of
    consecutive keys contributes one entry to the result: the key is the
    first key of the window and the value is the union of the items of all
    groups inside the window.
    """
    ordered_keys = tuple(groups.keys())
    merged = {}
    for window in sliding(ordered_keys, window_size):
        combined = set()
        for key in window:
            # update() accepts any iterable, so tuple-valued groups work too.
            combined.update(groups[key])
        merged[window[0]] = combined
    return merged

# Creating Buckets

In [None]:
def process_pageid(file):
    """Bucket the page IDs of one revision file by revision day.

    Args:
        file: ``pathlib.Path`` of a text file holding one JSON-encoded
            ``InfoboxRevision`` per line.

    Returns:
        ChangeBuckets: ``filename`` is ``file.name``; ``changes`` maps each
        ISO date (the date part of ``validFrom``) to the sorted tuple of
        distinct page IDs revised on that day, dates in ascending order.
    """
    groups = collections.defaultdict(set)
    # Decode explicitly as UTF-8 (the JSON interchange encoding): the default
    # text encoding follows the process locale, which this notebook changes.
    with open(file, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            revision = InfoboxRevision.parse_raw(line)
            groups[revision.validFrom.date().isoformat()].add(revision.pageID)
    return ChangeBuckets(
        filename=file.name,
        changes={day: tuple(sorted(groups[day])) for day in sorted(groups)},
    )


def process_property(file):
    """Bucket the changed property names of one revision file by day.

    Args:
        file: ``pathlib.Path`` of a text file holding one JSON-encoded
            ``InfoboxRevision`` per line.

    Returns:
        ChangeBuckets: ``filename`` is ``file.name``; ``changes`` maps each
        ISO date (the date part of ``validFrom``) to the sorted tuple of
        distinct property names changed on that day, dates ascending.
    """
    groups = collections.defaultdict(set)
    # Decode explicitly as UTF-8 (the JSON interchange encoding): the default
    # text encoding follows the process locale, which this notebook changes.
    with open(file, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            revision = InfoboxRevision.parse_raw(line)
            day = revision.validFrom.date().isoformat()
            groups[day].update(change.property.name for change in revision.changes)
    return ChangeBuckets(
        filename=file.name,
        changes={day: tuple(sorted(groups[day])) for day in sorted(groups)},
    )


def process_template_property(file):
    """Bucket (template, property name) pairs of one revision file by day.

    Args:
        file: ``pathlib.Path`` of a text file holding one JSON-encoded
            ``InfoboxRevision`` per line.

    Returns:
        ChangeBuckets: ``filename`` is ``file.name``; ``changes`` maps each
        ISO date (the date part of ``validFrom``) to the sorted tuple of
        distinct ``(template, property name)`` pairs changed on that day,
        dates ascending.
    """
    groups = collections.defaultdict(set)
    # Decode explicitly as UTF-8 (the JSON interchange encoding): the default
    # text encoding follows the process locale, which this notebook changes.
    with open(file, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            revision = InfoboxRevision.parse_raw(line)
            day = revision.validFrom.date().isoformat()
            # str() turns a missing template into "None", which keeps every
            # pair comparable when the bucket is sorted below.
            template = str(revision.template)
            groups[day].update(
                (template, change.property.name) for change in revision.changes
            )
    return ChangeBuckets(
        filename=file.name,
        changes={day: tuple(sorted(groups[day])) for day in sorted(groups)},
    )


def process_page_property(file):
    """Bucket (page ID, property name) pairs of one revision file by day.

    Args:
        file: ``pathlib.Path`` of a text file holding one JSON-encoded
            ``InfoboxRevision`` per line.

    Returns:
        ChangeBuckets: ``filename`` is ``file.name``; ``changes`` maps each
        ISO date (the date part of ``validFrom``) to the sorted tuple of
        distinct ``(pageID, property name)`` pairs changed on that day,
        dates ascending.
    """
    groups = collections.defaultdict(set)
    # Decode explicitly as UTF-8 (the JSON interchange encoding): the default
    # text encoding follows the process locale, which this notebook changes.
    with open(file, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            revision = InfoboxRevision.parse_raw(line)
            day = revision.validFrom.date().isoformat()
            groups[day].update(
                (revision.pageID, change.property.name) for change in revision.changes
            )
    return ChangeBuckets(
        filename=file.name,
        changes={day: tuple(sorted(groups[day])) for day in sorted(groups)},
    )


# Daily buckets of changed page IDs, cached on disk as gzip-compressed JSON.
fname = "./changesets-pageid.json.gz"

if not Path(fname).exists():
    # Cache miss: bucket every extracted revision file in worker processes
    # and merge the per-file buckets day by day.
    groups = collections.defaultdict(set)
    files = [
        x
        for x in sorted(
            Path("../../matched-infoboxes-extracted/").rglob("*.output.json")
        )
        if x.is_file()
    ]
    with multiprocessing.Pool(4) as p:
        # Bucket by page ID so the content matches the cache file name and
        # the ID-based analysis below. Page IDs are plain ints and survive
        # the JSON round-trip; tuple items (e.g. from process_page_property)
        # would come back as lists, which are not hashable.
        imap = p.imap(process_pageid, files)
        for cb in tqdm.tqdm(imap, total=len(files)):
            for k, v in cb.changes.items():
                groups[k].update(v)
    del files
    # Freeze into a date-ordered dict of sorted tuples — the same shape the
    # cache yields when it is read back.
    groups = {k: tuple(sorted(groups[k])) for k in sorted(groups.keys())}
    with open(fname, "wb") as f:
        f.write(
            gzip.compress(
                ChangeBuckets(filename="all", changes=groups)
                .json(indent=None, separators=(",", ":"))
                .encode("utf-8")
            )
        )
else:
    # Cache hit: load the previously merged buckets.
    with open(fname, "rb") as f:
        groups = ChangeBuckets.parse_raw(
            gzip.decompress(f.read()).decode("utf-8")
        ).changes

## Min/Max Support Filtering Analysis

In [None]:
# Count in how many buckets each item occurs. A Counter over the flattened
# buckets avoids a module-level loop variable named `id` (which would shadow
# the builtin for the rest of the notebook) and keeps `freqs` a DataFrame
# from its first assignment on.
bucket_counts = collections.Counter(itertools.chain.from_iterable(groups.values()))

freqs = pd.DataFrame(bucket_counts.items(), columns=["ID", "Count"])
# Share of all buckets in which the item occurs.
freqs["Frequency"] = freqs["Count"] / len(groups)
freqs.sort_values("Frequency", ascending=False)

In [None]:
@ipywidgets.interact(
    bins=ipywidgets.IntSlider(
        value=50, min=5, max=100, step=5, continuous_update=False
    ),
    lower=ipywidgets.FloatSlider(
        value=0.001,
        min=0.0,
        max=1.0,
        step=0.001,
        readout_format=".1%",
        continuous_update=False,
    ),
    upper=ipywidgets.FloatSlider(
        value=0.1,
        min=0.0,
        max=1.0,
        step=0.001,
        readout_format=".1%",
        continuous_update=False,
    ),
)
def plot_frequency_hist(bins, lower, upper):
    """Plot the item-frequency histogram, split by a support filter.

    Args:
        bins: Number of histogram bins.
        lower: Items with a frequency below this value count as filtered.
        upper: Items with a frequency above this value count as filtered.

    The share of filtered items is annotated in the top-right corner. The
    global ``freqs`` frame is only read; the filter flag lives on a copy.
    """
    _, ax = plt.subplots(figsize=(20 * cm, 15 * cm), dpi=100)
    # assign() returns a new frame, so moving a slider never leaves a stale
    # "Filtered" column behind on the shared `freqs` DataFrame.
    flagged = freqs.assign(
        Filtered=(freqs["Frequency"] < lower) | (freqs["Frequency"] > upper)
    )
    sns.histplot(
        data=flagged,
        x="Frequency",
        hue="Filtered",
        stat="percent",
        bins=bins,
        multiple="stack",
        log_scale=(False, True),  # linear x axis, logarithmic y axis
        ax=ax,
    )
    sns.move_legend(
        ax,
        "center left",
        bbox_to_anchor=(1, 0.5),
        ncol=1,
        frameon=False,
    )
    ax.annotate(
        # The mean of the boolean flag is the fraction of filtered items.
        f"Filtered: {flagged['Filtered'].mean():.2%} IDs",
        xy=(1, 1),
        xycoords="axes fraction",
        xytext=(0, 0),
        textcoords="offset points",
        ha="right",
        va="top",
    )
    ax.xaxis.set_major_formatter(tck.PercentFormatter(xmax=1))
    ax.yaxis.set_major_formatter(tck.PercentFormatter(xmax=100, decimals=4))
    sns.despine(ax=ax)
    plt.show()

# Apriori Association Rule Mining

In [None]:
# Build overlapping multi-day transactions, hold out the most recent 20 % as
# test data and mine association rules on the remainder.
n_days = 5
data = tuple(overlapping_groups(groups, n_days).values())
test_size = math.ceil(len(data) * 0.2)
split_at = len(data) - test_size
train_data, test_data = data[:split_at], data[split_at:]
del data, test_size, split_at
itemsets, rules = apriori(
    train_data,
    min_support=0.01 * n_days,
    min_confidence=0.85,
    max_length=4,
)
del n_days

# One row per rule, indexed by its (LHS, RHS) itemsets.
rule_rows = [
    (
        frozenset(rule.lhs),
        frozenset(rule.rhs),
        rule.confidence,
        rule.support,
        rule.lift,
        rule.conviction,
    )
    for rule in rules
]
df = pd.DataFrame(
    rule_rows,
    columns=["LHS", "RHS", "Confidence", "Support", "Lift", "Conviction"],
)
df = df.set_index(["LHS", "RHS"]).sort_index()
del rule_rows
display(df.describe().T.style.format("{:.2f}"))
display(df.sort_values("Lift", ascending=False))

# Evaluation per Association Rule

In [None]:
# Confusion counts of every rule on the held-out transactions. A rule
# "predicts" its RHS whenever its LHS is contained in a transaction; the RHS
# being contained is the ground truth. Outcome keys are (actual, predicted).
outcome_keys = {
    "TN": (False, False),
    "FP": (False, True),
    "FN": (True, False),
    "TP": (True, True),
}
outcome_counts = {name: [] for name in outcome_keys}
for lhs, rhs in df.index:
    tally = collections.Counter((rhs <= s, lhs <= s) for s in test_data)
    for name, key in outcome_keys.items():
        outcome_counts[name].append(tally[key])
# Write whole columns at once: this avoids row-wise .loc lookups with
# frozenset labels and also works when no rule was mined (empty frame).
for name, values in outcome_counts.items():
    df[name] = np.asarray(values, dtype=int)
# Undefined ratios (0 / 0) are reported as 0.
df["Precision"] = (df["TP"] / (df["TP"] + df["FP"])).fillna(0)
df["Recall"] = (df["TP"] / (df["TP"] + df["FN"])).fillna(0)
df["F1"] = (
    2 * (df["Precision"] * df["Recall"]) / (df["Precision"] + df["Recall"])
).fillna(0)
df["Accuracy"] = (
    (df["TP"] + df["TN"]) / df[["TP", "FP", "TN", "FN"]].sum(axis=1)
).fillna(0)

df.sort_values(["Precision", "F1", "Recall", "Accuracy"], ascending=False)

In [None]:
@ipywidgets.interact(
    x=ipywidgets.ToggleButtons(options=["Confidence", "Support", "Lift"]),
    y=ipywidgets.ToggleButtons(options=["Precision", "Recall", "F1", "Accuracy"]),
    support=ipywidgets.FloatRangeSlider(
        value=(0.0, 1.0),
        min=0.0,
        max=1.0,
        step=0.001,
        readout_format=".1%",
        continuous_update=False,
    ),
    confidence=ipywidgets.FloatRangeSlider(
        value=(0.0, 1.0),
        min=0.0,
        max=1.0,
        step=0.001,
        readout_format=".1%",
        continuous_update=False,
    ),
    q=ipywidgets.IntSlider(value=4, min=1, max=10, step=1, continuous_update=False),
)
def plot_correlation(x, y, support, confidence, q):
    """Plot a rule statistic against an evaluation metric.

    Draws two joint plots for the rules of ``df`` whose support and
    confidence lie inside the selected ranges: a 2-D histogram, and a scatter
    plot coloured by the interval of ``y`` each rule falls into.

    Args:
        x: Rule statistic on the x axis ("Confidence", "Support" or "Lift").
        y: Evaluation metric on the y axis.
        support: ``(low, high)`` inclusive bounds on the rule support.
        confidence: ``(low, high)`` inclusive bounds on the rule confidence.
        q: Number of equal-width intervals of [0, 1] used for colouring.
    """
    temp = df[
        df["Support"].between(*support) & df["Confidence"].between(*confidence)
    ].copy()
    if temp.empty:
        # Axis limits cannot be derived from an empty selection.
        print("No rules within the selected support/confidence ranges.")
        return

    def _format_joint_axes(grid):
        """Clamp the joint axes to the data / [0, 1] and label them in percent."""
        grid.ax_joint.set_xlim(temp[x].min(), temp[x].max())
        if x in {"Confidence", "Support"}:
            grid.ax_joint.xaxis.set_major_formatter(tck.PercentFormatter(xmax=1))
        grid.ax_joint.set_ylim(0, 1)
        grid.ax_joint.yaxis.set_major_formatter(tck.PercentFormatter(xmax=1))

    # jointplot is figure-level and creates its own axes, so no `ax` is
    # passed (the name was never defined inside this function anyway).
    g = sns.jointplot(
        x=x,
        y=y,
        data=temp,
        kind="hist",
        height=20 * cm,
    )
    _format_joint_axes(g)
    plt.show()

    # Cut on fixed edges over [0, 1] so each "≤ p%" label really describes
    # its interval; an integer bin count would bin over the observed range.
    edges = np.linspace(0, 1, q + 1)
    temp[f"{y} Interval"] = pd.cut(
        temp[y],
        bins=edges,
        include_lowest=True,  # keep metric values of exactly 0
        labels=[f"≤ {edge:.0%}" for edge in edges[1:]],
    )
    g = sns.jointplot(
        x=x,
        y=y,
        hue=f"{y} Interval",
        palette=sns.color_palette("mako", n_colors=q),
        data=temp,
        kind="scatter",
        height=20 * cm,
        marginal_kws={"common_norm": False},
    )
    _format_joint_axes(g)
    plt.show()

# Evaluation per Predictor

In [None]:
# One "predictor" per distinct RHS: the collection of all LHS itemsets of the
# rules that share this RHS.
df2 = (
    df.reset_index()[["LHS", "RHS"]]
    .groupby("RHS")["LHS"]
    .apply(
        lambda lhs: tuple(
            # Frozensets compare by subset (a partial order), so sort them by
            # size and sorted content to get a deterministic order.
            sorted(set(lhs), key=lambda itemset: (len(itemset), sorted(itemset)))
        )
    )
    .reset_index()
    .set_index("RHS")
    .sort_index()
)

# Confusion counts per predictor on the held-out transactions; outcome keys
# are (actual, predicted). Itemsets are matched with the subset test `<=`:
# `itemset in s` would ask whether the frozenset itself is an element of the
# transaction, which is never the case.
outcome_keys = {
    "TN": (False, False),
    "FP": (False, True),
    "FN": (True, False),
    "TP": (True, True),
}
outcome_counts = {name: [] for name in outcome_keys}
for rhs, antecedents in zip(df2.index, df2["LHS"]):
    # NOTE(review): the predictor fires only when *all* LHS itemsets are
    # contained in the transaction (kept from the original) — confirm that
    # `any` is not what was intended.
    tally = collections.Counter(
        (rhs <= s, all(lhs <= s for lhs in antecedents)) for s in test_data
    )
    for name, key in outcome_keys.items():
        outcome_counts[name].append(tally[key])
# Whole-column writes avoid .loc lookups with frozenset labels and also work
# for an empty frame.
for name, values in outcome_counts.items():
    df2[name] = np.asarray(values, dtype=int)
# Undefined ratios (0 / 0) are reported as 0.
df2["Precision"] = (df2["TP"] / (df2["TP"] + df2["FP"])).fillna(0)
df2["Recall"] = (df2["TP"] / (df2["TP"] + df2["FN"])).fillna(0)
df2["F1"] = (
    2 * (df2["Precision"] * df2["Recall"]) / (df2["Precision"] + df2["Recall"])
).fillna(0)
df2["Accuracy"] = (
    (df2["TP"] + df2["TN"]) / df2[["TP", "FP", "TN", "FN"]].sum(axis=1)
).fillna(0)

display(df2.sort_values(["F1", "Precision", "Recall", "Accuracy"], ascending=False))


def _ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Micro-averaged metrics over all predictors, from the pooled counts.
total_tp, total_fp, total_tn, total_fn = (
    df2[name].sum() for name in ("TP", "FP", "TN", "FN")
)
total_precision = _ratio(total_tp, total_tp + total_fp)
total_recall = _ratio(total_tp, total_tp + total_fn)
total_f1 = _ratio(2 * total_precision * total_recall, total_precision + total_recall)
total_accuracy = _ratio(
    total_tp + total_tn, total_tp + total_fp + total_tn + total_fn
)

print(
    "Predictor totals:",
    f'Precision: {locale.format_string("%.2f%%", 100 * total_precision, True)}',
    f'Recall: {locale.format_string("%.2f%%", 100 * total_recall, True)}',
    f'F1: {locale.format_string("%.2f%%", 100 * total_f1, True)}',
    f'Accuracy: {locale.format_string("%.2f%%", 100 * total_accuracy, True)}',
    sep="\n",
)

In [None]:
# Relate the number of evaluated (predictor, transaction) outcomes to the
# number of items contained in the test transactions.
confusion_totals = df2[["TP", "FP", "TN", "FN"]].sum()
changes_predictable = confusion_totals.sum()  # == len(df2) * len(test_data)
total_changes = 0
for transaction in test_data:
    total_changes += len(transaction)
predictable_share = 100 * changes_predictable / total_changes
report_lines = [
    f'Changes predictable: {locale.format_string("%d", changes_predictable, True)}',
    f'Changes happened: {locale.format_string("%d", total_changes, True)}',
    f'--> {locale.format_string("%.2f%%", predictable_share, True)}',
]
print("\n".join(report_lines))