# Baseline Comparison & Evaluation

This notebook recreates the results of the baseline comparison and evaluation in the paper.

The previous notebook must be run first to generate the corpora and other artefacts that this notebook needs.

In [None]:
import collections as cll
import datetime as dt
import gc
import gzip
import itertools as it
import multiprocessing as mp
import multiprocessing.pool as mp_pool
import os
import pickle
import random
import sys
import typing as T

import numpy as np
import numpy.typing as T_np

import sklearn.metrics as metrics
import sklearn.utils

from sklearn.preprocessing import MultiLabelBinarizer
from iterstrat.ml_stratifiers import MultilabelStratifiedKFold

Global notebook options and variables.

In [None]:
# The path containing the corpora and other artefacts from the previous Jupyter
# notebook. The per-fold summaries cached by this notebook are also written to
# and read back from this directory.
ARTEFACTS_PATH = "./Artefacts/"

# This is the seed we used for our experiments. If you wish to change this,
# then be sure to delete any cached results you may have already generated.
# It will have no serious impact on the results at all.
SEED = 463

K_FOLDS = 5  # This controls the number of folds used in the cross-validation.
K_FOLD_LIMIT = K_FOLDS  # A debug option to investigate one fold or few folds.
TOP_CLASSES: T.Optional[int] = 10  # Top-supported classes outputted (summary).
AVERAGING_METHODS: T.Tuple[  # The paper used both `macro` and `micro`.
    T.Literal["macro", "micro", "weighted", "samples"], ...
] = ("macro", "micro", "weighted", "samples")

# Useful for debugging the pipelines with a small slice of data. When set, the
# slice is applied to the samples and their tags before any trimming.
SAMPLE_LIMIT: T.Optional[slice] = None

# Our experiments were conducted with the majority class trimmed to the size of
# the second largest class. This can be disabled, but it is most useful when
# the majority class dwarfs all other classes.
TRIM_MAJORITY_CLASS = True

# While we tried numerous classifiers, we reported k-NN results due to its
# simplicity and acceptable performance for how little hyperparameter tuning it
# requires. That said, we found XGBoost to outperform k-NN by a small but
# noticeable margin, and excluded it from the paper due to space constraints.
# See the `build_classifier` function for hyperparameter details.
CLASSIFIER_NAME: T.Literal[
    "k-NN",  # 10-NN with distance weighting in the paper.
    "XGBoost",
    "Dummy",  # Not to be confused with the "Dummy" approach in the paper.
] = "k-NN"

# In case a crash happens, we don't need to redo a fold as long as the seed is
# identical. When enabled, the most recent cached summary of a fold is returned
# instead of recomputing that fold.
USE_LAST_CACHED_SUMMARY = True

# Type aliases.
# `Score` maps a metric name to its value, `ClassScores` maps a class label to
# its `Score`, `Scores` is everything computed for one data set of one fold,
# and `Summary` is the full record stored for one fold.
Score = T.Dict[str, T.Optional[float]]
ClassScores = T.Dict[str, Score]
Scores = T.Dict[str, T.Union[T.Optional[float], Score, ClassScores]]
Summary = T.Dict[str, T.Union[str, int, dt.datetime, Scores]]

# An extremely poor attempt at taking advantage of Copy-on-Write behaviour in
# GNU/Linux, as multiprocessing is used; thus, `fork()` is involved. Reference
# counting often defeats this. These are populated during preparation of
# cross-validation for a run of an approach. Each list is indexed by fold.
x_train_folds: T.List[T.List[T.Sequence[T.Any]]] = []
y_train_folds: T.List[T.List[T.Sequence[T.Any]]] = []
x_test_folds: T.List[T.List[T.Sequence[T.Any]]] = []
y_test_folds: T.List[T.List[T.Sequence[T.Any]]] = []

# Resolves the set bit of a label (our samples have multiple labels) to the
# actual label name (IXP Scrubber filtering rule hashes in the paper's case).
# Refilled from the binariser's classes at the start of each cross-validation.
column_label_targets: T.List[str] = []

Helper functions.

In [None]:
# This is a hack to make the default multiprocessing pool not spawn daemon
# processes; hence, we can then use pools within child processes made by a
# pool. Will cause a lot of zombie processes or fail to exit cleanly if
# something ancestral crashes, as they're no longer marked daemonic.
class NoDaemonProcess(mp.Process):
    """A process whose `daemon` flag always reads `False` and ignores writes."""

    @property
    def daemon(self):
        # Always report the process as non-daemonic.
        return False

    @daemon.setter
    def daemon(self, _):
        # Deliberately discard the assigned value so the flag cannot be set.
        pass


class CustomPool(mp_pool.Pool):  # See above.
    """A pool whose worker processes are re-classed as `NoDaemonProcess`."""

    def Process(self, *args, **kwargs):
        # Let the parent pool build the worker as usual, then swap its class so
        # that the worker's `daemon` attribute always reads `False`.
        proc = super().Process(*args, **kwargs)  # type: ignore
        proc.__class__ = NoDaemonProcess
        return proc


# Serialises the output of `log` so that concurrent callers sharing this lock
# do not interleave their lines.
LOG_LOCK = mp.Lock()


def log(*args, **kwargs):
    """Print `args` behind a `[day-month hour:minute:second AM/PM]:` prefix.

    All positional and keyword arguments are forwarded to `print` unchanged;
    the whole call is made while holding `LOG_LOCK`.
    """
    LOG_LOCK.acquire()
    try:
        stamp = dt.datetime.now().strftime("%d-%m %I:%M:%S %p")
        print(f"[{stamp}]:", *args, **kwargs)
    finally:
        LOG_LOCK.release()


def load_pickle(path: str, compressed=True) -> T.Any:
    """Unpickle and return the object stored at `path`.

    The garbage collector is switched off while unpickling and afterwards put
    back into whatever state the caller had it in (it is only re-enabled if it
    was enabled on entry).

    NOTE: unpickling executes code embedded in the file; only use this on
    artefacts that you generated yourself.

    Args:
        path: The file to read. Must end with `.gz` when `compressed` is set.
        compressed: Whether the file is gzip-compressed.

    Returns:
        The unpickled object.
    """
    if compressed:
        assert path.endswith(".gz")
    opener = gzip.open if compressed else open

    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        with opener(path, "rb") as file:
            return pickle.load(file)
    finally:
        # Restore rather than force-enable, so that a caller who disabled the
        # collector deliberately does not have it switched back on behind
        # their back.
        if gc_was_enabled:
            gc.enable()


def save_pickle(object: T.Any, path: str, compress=True):
    """Pickle `object` to `path` using the highest available protocol.

    Args:
        object: The object to serialise.
        path: The destination file. Must end with `.gz` when `compress` is
            set.
        compress: Whether to gzip the output (compression level 6).
    """
    if compress:
        assert path.endswith(".gz")
        stream = gzip.open(path, "wb", compresslevel=6)
    else:
        stream = open(path, "wb")

    with stream as file:
        pickle.dump(object, file, protocol=pickle.HIGHEST_PROTOCOL)


def average_scores(
    fold_scores: T.Sequence[Scores],
    class_labels: T.Optional[T.Union[int, T.Sequence[str]]] = None,
) -> T.Dict[str, T.Tuple[float, float]]:
    """Reduce per-fold scores to a `(mean, standard deviation)` per metric.

    Only values that are `float` instances are aggregated. Averaged metrics
    are keyed as `[<Averaging>] <metric>` and per-class metrics as
    `|<class>| <metric>`.

    Args:
        fold_scores: One `Scores` mapping per fold.
        class_labels: Either the class labels to report, or an integer `n` to
            report the `n` classes with the highest support summed over all
            folds. Nothing per-class is reported when this is falsy.

    Returns:
        A mapping from the metric key to its `(mean, standard deviation)`
        across the folds in which it was present.
    """
    collected: T.DefaultDict[str, T.List[float]] = cll.defaultdict(list)

    for method in AVERAGING_METHODS:
        title = method.title()
        section = f"Average ({title})"
        for fold in fold_scores:
            for metric_name, value in fold[section].items():  # type: ignore
                if isinstance(value, float):
                    collected[f"[{title}] {metric_name}"].append(value)

    if class_labels:
        if isinstance(class_labels, int):
            # Rank the classes by their support summed over every fold and
            # keep the requested number of them.
            limit = class_labels
            support_totals: T.DefaultDict[str, float] = cll.defaultdict(float)
            for fold in fold_scores:
                per_class: ClassScores = fold["Classes"]  # type: ignore
                for label, label_metrics in per_class.items():
                    support = label_metrics.get("Support")
                    if isinstance(support, (int, float)):
                        support_totals[label] += float(support)
            ranked = sorted(
                support_totals.items(), key=lambda item: item[1], reverse=True
            )
            class_labels = tuple(item[0] for item in ranked[0:limit])

        for fold in fold_scores:
            per_class: ClassScores = fold["Classes"]  # type: ignore
            for label in class_labels:
                if label not in per_class:
                    continue
                for metric_name, value in per_class[label].items():
                    if isinstance(value, float):
                        collected[f"|{label}| {metric_name}"].append(value)

    summarised: T.Dict[str, T.Tuple[float, float]] = {}
    for key, values in collected.items():
        summarised[key] = (float(np.mean(values)), float(np.std(values)))
    return summarised


def print_scores(averaged_scores: T.Dict[str, T.Tuple[float, float]]):
    """Print the averaged scores as a table grouped by their key's first word.

    Keys starting with `[` are printed first, then keys starting with `|`,
    then everything else. A horizontal rule is printed whenever the first
    word of the key changes within a group, and once more at the very end.

    Args:
        averaged_scores: A mapping from the metric key to its
            `(mean, standard deviation)`.
    """
    rule = "\t" + "-" * 63
    group_markers = ("[", "|")

    for marker in group_markers + (None,):
        last_heading: T.Optional[str] = None
        for metric, score in averaged_scores.items():
            first = metric[0]
            if marker is None:
                selected = first not in group_markers
            else:
                selected = first == marker
            if not selected:
                continue

            heading = metric.split(" ", 1)[0]
            if heading != last_heading:
                print(rule)
                last_heading = heading

            label = f"{metric}:"
            print(f"\t{label:40s}\t{score[0]:.4f} ± {score[1]:.4f}")

    print(rule)


def print_summaries(
    summaries: T.Iterable[Summary],
    class_labels: T.Optional[T.Union[int, T.Sequence[str]]] = None,
    include_training=True,
):
    """Print the averaged scores and the timings of a cross-validation run.

    Args:
        summaries: The per-fold summaries. Any iterable is accepted, including
            one-shot iterators, as it is materialised once up front.
        class_labels: Forwarded to `average_scores` to select the classes
            whose individual scores are printed.
        include_training: Whether to print the training scores in addition to
            the testing scores.
    """
    # The folds are traversed once per data set and once per timing below; a
    # generator would be exhausted after the first traversal and every later
    # section would silently be computed from nothing.
    fold_summaries = tuple(summaries)

    print()
    for data_set in (
        ("Training", "Testing") if include_training else ("Testing",)
    ):
        log(f"{data_set} scores:")
        print_scores(
            average_scores(
                tuple(
                    summary[f"{data_set} Scores"]  # type: ignore
                    for summary in fold_summaries
                ),
                class_labels,
            )
        )
        print()

    for time_name, calculator in (
        ("Training", lambda x: x["End Fit Transform Time"] - x["Start Time"]),
        ("Testing", lambda x: x["End Time"] - x["Start Predict Score Time"]),
    ):
        fold_times = tuple(map(calculator, fold_summaries))
        for index, fold_time in enumerate(fold_times, start=1):
            print(f"{time_name} fold {index} time: {fold_time}")

        if not fold_times:
            # The mean of nothing is NaN, which `timedelta` refuses to accept.
            continue

        print(
            f"{time_name} average fold time:",
            dt.timedelta(
                seconds=float(
                    np.mean(
                        [fold_time.total_seconds() for fold_time in fold_times]
                    )
                )
            ),
        )

In [None]:
def label_counts(
    y_tags: T.List[str],
    labels: T.Dict[str, T.Dict[T.Optional[str], int]],
    print_top_n: T.Optional[int] = None,
):
    """Count how many of the given tags carry each label.

    Args:
        y_tags: The tags to count over; each must be a key of `labels`.
        labels: A mapping from a tag to the labels attached to it.
        print_top_n: When given, print the counts of that many of the most
            frequent labels (`None` is shown as `-`), followed by one line
            summarising the remaining labels if there are any.

    Returns:
        A tuple of the label-to-count dictionary and the `(label, count)`
        pairs sorted from the highest to the lowest count.
    """
    tally = cll.Counter(label for tag in y_tags for label in labels[tag])
    sorted_counts = sorted(  # Highest to lowest.
        tally.items(), key=lambda pair: pair[1], reverse=True
    )

    if print_top_n is not None:
        total = len(tally)
        for position, (label, count) in enumerate(sorted_counts):
            if position >= print_top_n:
                hidden = sum(pair[1] for pair in sorted_counts[print_top_n:])
                print(
                    "Rest of the labels "
                    f"({total - print_top_n}/{total}) not shown: "
                    f"{hidden:,}",
                )
                break
            shown = "-" if label is None else label
            print(f'Label "{shown}" occurs {count:,} times.')

    return dict(tally), sorted_counts


def trimmer(
    x: T.List[T.Any],
    y_tags: T.List[str],
    labels: T.Dict[str, T.Dict[T.Optional[str], int]],
    random_seed: int = SEED,
):
    """Shuffle the samples and trim the most supported label's documents.

    The steps are, in order:

    1. `x` and `y_tags` are shuffled together using `random_seed`.
    2. Every label supported by fewer than `K_FOLDS * 2` documents is merged
       into the null rule (`None`) label of the documents carrying it.
    3. Unless the documents matching no filtering rule are the minority,
       documents carrying the most supported label are removed in three
       passes of increasing aggressiveness until its support no longer
       exceeds that of the second most supported label.

    `labels` is modified in place: merged labels are rewritten and the entries
    of removed documents are deleted. Progress is printed along the way.

    Args:
        x: The samples, aligned with `y_tags`.
        y_tags: The tag of each sample; each must be a key of `labels`.
        labels: A mapping from a tag to its labels and their values.
        random_seed: The seed used for the shuffle.

    Returns:
        A tuple of the remaining samples, the remaining tags, and the names of
        up to `TOP_CLASSES` of the most supported labels (`None` is reported
        as `"-"`).

    Raises:
        ValueError: If every label lacks sufficient support.
    """
    x, y_tags = sklearn.utils.shuffle(
        x, y_tags, random_state=random_seed
    )  # type: ignore

    def match_counts():
        # Classifies every document by whether it carries only the null rule,
        # the null rule alongside other labels, or only non-null labels.
        no_matches = 0
        some_matches = 0
        all_matches = 0
        for tag in y_tags:
            if None in labels[tag]:
                if len(labels[tag]) > 1:
                    some_matches += 1
                else:
                    no_matches += 1
            elif labels[tag]:
                all_matches += 1

        print(f"{no_matches:,} documents with no filtering rule matches.")
        print(f"{some_matches:,} documents with some filtering rule matches.")
        print(f"{all_matches:,} documents with only filtering rule matches.")
        return no_matches, some_matches, all_matches

    log("Before trimming:")
    no_matches, some_matches, all_matches = match_counts()
    counts, sorted_counts = label_counts(
        y_tags, labels, print_top_n=TOP_CLASSES
    )

    # Remove classes with truly insufficient support. Ideally, we would remove
    # even more of these ultra rare classes to prevent the macro-averaged
    # metric scores from heavily decreasing based on these miniscule
    # appearances, but we keep them for the sake of completeness.
    # NOTE(review): `counts` and `sorted_counts` are not updated for the null
    # rule label when a rare label is merged into it below, so the support
    # figures used further down may under-count it. Confirm whether that is
    # acceptable.
    pop_off = 0
    for label, count in reversed(sorted_counts):  # Lowest support first.
        if count >= K_FOLDS * 2:
            break
        pop_off += 1
        print(
            f'"{label}" has less than {K_FOLDS * 2} support,',
            "substituting it with the null rule.",
        )
        for tag in y_tags:
            tag_labels = labels[tag]
            if label in tag_labels:
                # Fold the rare label's value into the null rule's value,
                # creating the null rule entry if the document lacks one.
                try:
                    tag_labels[None] += tag_labels[label]
                except KeyError:
                    tag_labels[None] = tag_labels[label]
                del tag_labels[label]
        del counts[label]

    # Drop the merged labels from the tail of the sorted counts.
    try:
        for _ in range(pop_off):
            sorted_counts.pop()
    except IndexError as exception:
        raise ValueError("All classes lack enough support.") from exception

    log(f"Label count mean: {np.mean(list(counts.values())):.2f}")
    log(f"Label count median: {np.median(list(counts.values())):.2f}")

    # NOTE: You can remove this if you still wish to trim the majority class.
    # We expect that the majority of documents will match no filtering rules.
    if no_matches < some_matches and no_matches < all_matches:
        log("No-filtering-rule documents is the minority.")
        # This should rarely occur (if ever) for a decently long dataset slice
        # based on the current IXP Scrubber rules, at least in our private IXP
        # dataset.
        return (
            x,
            y_tags,
            [
                top[0] if top[0] is not None else "-"
                for top in sorted_counts[0:TOP_CLASSES]
            ],
        )

    # NOTE(review): this raises `IndexError` when fewer than two labels remain
    # after the merge above.
    first_label, first_count = sorted_counts[0]
    second_label, second_count = sorted_counts[1]

    # Removed documents are marked with `None` in place and filtered out after
    # the three passes, so that the indexes stay aligned in the meantime.

    # First try removing the documents with only the largest label.
    index = 0
    while first_count > second_count and index < len(y_tags):
        tag = y_tags[index]
        tag_labels = labels[tag]
        if first_label in tag_labels and len(tag_labels) == 1:
            x[index] = None
            y_tags[index] = None  # type: ignore
            del labels[tag]
            first_count -= 1
        index += 1

    # Then try removing the documents with only the largest label and the next
    # largest label (if the previous step still includes the imbalance).
    index = 0
    while first_count > second_count and index < len(y_tags):
        tag = y_tags[index]
        if tag is None:
            index += 1
            continue  # Was removed in the previous step.
        tag_labels = labels[tag]
        if (
            first_label in tag_labels
            and second_label in tag_labels
            and len(tag_labels) == 2
        ):
            x[index] = None
            y_tags[index] = None  # type: ignore
            del labels[tag]
            first_count -= 1
            second_count -= 1
        index += 1

    # If the previous steps still include the imbalance, then try removing any
    # documents with the largest label until it is equal in support with the
    # second/next largest label.
    index = 0
    while first_count > second_count and index < len(y_tags):
        tag = y_tags[index]
        if tag is None:
            index += 1
            continue
        tag_labels = labels[tag]
        if first_label in tag_labels:
            x[index] = None
            y_tags[index] = None  # type: ignore
            del labels[tag]
            first_count -= 1
            # `tag_labels` still references the removed document's labels.
            if second_label in tag_labels:
                second_count -= 1
        index += 1

    x = [i for i in x if i is not None]
    y_tags = [i for i in y_tags if i is not None]

    log("After trimming:")
    match_counts()

    # The top classes are recounted from the documents that remain.
    return (
        x,
        y_tags,
        [
            top[0] if top[0] is not None else "-"
            for top in label_counts(y_tags, labels, print_top_n=TOP_CLASSES)[
                1
            ][0:TOP_CLASSES]
        ],
    )


def build_binariser(
    y_tags: T.Iterable[str],
    labels: T.Dict[str, T.Dict[T.Optional[str], int]],
) -> T.Tuple[MultiLabelBinarizer, T_np.NDArray[T.Any]]:
    """Fit a dense multi-label binariser on the labels of the given tags.

    The null rule label (`None`) is represented by the class name `"-"`.

    Args:
        y_tags: The tags of the samples, in sample order; each must be a key
            of `labels`.
        labels: A mapping from a tag to the labels attached to it.

    Returns:
        A tuple of the fitted binariser and the binarised labels, with one row
        per tag.
    """
    label_sets = [
        ["-" if label is None else label for label in labels[tag]]
        for tag in y_tags
    ]
    binariser = MultiLabelBinarizer(sparse_output=False)
    binarised_labels = binariser.fit_transform(label_sets)
    return binariser, binarised_labels  # type: ignore


def scorer(
    y_truth,
    y_predictions,
    average: T.Literal["macro", "micro", "samples", "weighted"] = "weighted",
) -> Scores:
    """Compute the per-class and the averaged multi-label scores.

    Args:
        y_truth: The binarised ground-truth labels.
        y_predictions: The binarised predicted labels.
        average: Not used; the scores are computed for every method listed in
            `AVERAGING_METHODS` regardless of this value.

    Returns:
        A mapping with a `"Classes"` entry holding the scores of each class
        named in `column_label_targets`, plus one `"Average (<Method>)"` entry
        per averaging method.
    """
    class_scores: ClassScores = {label: {} for label in column_label_targets}

    # Stacking the four per-class arrays into one array means that the support
    # ends up with the same dtype as the other three rows.
    per_class_prfs = np.asarray(
        metrics.precision_recall_fscore_support(
            y_truth, y_predictions, average=None, zero_division=0
        )
    )
    for column, (precision, recall, f1_score, support) in enumerate(
        zip(*per_class_prfs)
    ):
        entry = class_scores[column_label_targets[column]]
        entry["Precision"] = precision
        entry["Recall"] = recall
        entry["F1 Score"] = f1_score
        entry["Support"] = support

    per_class_jaccard = np.asarray(
        metrics.jaccard_score(
            y_truth, y_predictions, average=None, zero_division=0
        )
    )
    for column, jaccard_score in enumerate(per_class_jaccard):
        class_scores[column_label_targets[column]][
            "Jaccard Score"
        ] = jaccard_score

    per_class_confusion = metrics.multilabel_confusion_matrix(
        y_truth, y_predictions
    )
    for column, confusion_matrix in enumerate(per_class_confusion):
        entry = class_scores[column_label_targets[column]]
        entry["False Negatives"] = confusion_matrix[1][0]
        entry["True Negatives"] = confusion_matrix[0][0]
        entry["False Positives"] = confusion_matrix[0][1]
        entry["True Positives"] = confusion_matrix[1][1]

    scores: Scores = {"Classes": class_scores}

    # These do not depend on the averaging method, and are simply repeated in
    # each averaged section.
    exact_match_ratio = float(metrics.accuracy_score(y_truth, y_predictions))
    hamming_loss = float(metrics.hamming_loss(y_truth, y_predictions))

    for method in AVERAGING_METHODS:
        precision, recall, f1_score, _ = (
            metrics.precision_recall_fscore_support(
                y_truth, y_predictions, average=method, zero_division=0
            )
        )
        jaccard = metrics.jaccard_score(
            y_truth, y_predictions, average=method, zero_division=0
        )

        scores[f"Average ({method.title()})"] = {
            "Precision": float(precision),
            "Recall": float(recall),
            "F1 Score": float(f1_score),
            "Jaccard Score": float(jaccard),
            "Exact Match Ratio": exact_match_ratio,
            "Hamming Loss": hamming_loss,
        }

    return scores


def load_cached_summary(name: str, fold_index: int) -> T.Optional[Summary]:
    """Load the most recent cached summary of a fold, if there is one.

    Cached summaries are files in `ARTEFACTS_PATH` named
    `<name>_fold-<fold_index>_summary.<timestamp>.pickle.gz`, where the
    timestamp is an integer. Files that match the prefix and suffix but whose
    timestamp is not an integer are ignored rather than aborting the run.

    Args:
        name: The name of the approach.
        fold_index: The zero-based index of the fold.

    Returns:
        The summary with the highest timestamp, or `None` if nothing is
        cached for this fold.
    """
    prefix = f"{name}_fold-{fold_index}_summary."
    suffix = ".pickle.gz"

    newest: T.Optional[T.Tuple[int, str]] = None
    for file in os.listdir(ARTEFACTS_PATH):
        if not (file.startswith(prefix) and file.endswith(suffix)):
            continue
        try:
            timestamp = int(file[len(prefix) : -len(suffix)])
        except ValueError:
            continue  # Not one of our caches; e.g. a renamed or backup file.
        if newest is None or timestamp > newest[0]:
            newest = (timestamp, file)

    if newest is None:
        return None
    return load_pickle(os.path.join(ARTEFACTS_PATH, newest[1]))


def load_cached_run(name: str):
    """Collect the latest cached summary of each of the `K_FOLDS` folds.

    Folds without a cached summary are left out, so the returned list may be
    shorter than `K_FOLDS`.

    Args:
        name: The name of the approach.

    Returns:
        The cached summaries that were found, ordered by fold index.
    """
    candidates = (
        load_cached_summary(name, fold_index) for fold_index in range(K_FOLDS)
    )
    summaries: T.List[Summary] = [
        candidate for candidate in candidates if candidate is not None
    ]
    return summaries


def training_and_testing_scores(
    name: str,
    fold_index: int,
    transformer,
    classifier,
    transformer_fit_transform_args=None,
    transformer_fit_transform_kwargs=None,
    classifier_fit_args=None,
    classifier_fit_kwargs=None,
) -> Summary:
    """Fit and score one fold, reusing or refreshing its cached summary.

    The fold's data is read from the module-level `*_folds` lists. When
    `USE_LAST_CACHED_SUMMARY` is set and the latest cached summary of the fold
    was produced with the current `CLASSIFIER_NAME`, it is returned as-is.
    A cached summary recorded under another classifier name is ignored, as
    reusing it would report the other classifier's scores.

    Args:
        name: The name of the approach; also used in the cache file name.
        fold_index: The zero-based index of the fold.
        transformer: An optional object with `fit_transform` and `transform`
            methods applied to the samples, or `None` to use them directly.
        classifier: An object with `fit` and `predict` methods.
        transformer_fit_transform_args: Extra positional arguments for
            `transformer.fit_transform`.
        transformer_fit_transform_kwargs: Extra keyword arguments for
            `transformer.fit_transform`.
        classifier_fit_args: Extra positional arguments for `classifier.fit`.
        classifier_fit_kwargs: Extra keyword arguments for `classifier.fit`.

    Returns:
        The summary of the fold: its scores on the training and the testing
        data, and the timestamps taken around fitting and predicting.
    """
    # `None` rather than `[]`/`{}` defaults, which would be shared by every
    # call of this function.
    if transformer_fit_transform_args is None:
        transformer_fit_transform_args = ()
    if transformer_fit_transform_kwargs is None:
        transformer_fit_transform_kwargs = {}
    if classifier_fit_args is None:
        classifier_fit_args = ()
    if classifier_fit_kwargs is None:
        classifier_fit_kwargs = {}

    if USE_LAST_CACHED_SUMMARY:
        cached_summary = load_cached_summary(name, fold_index)
        if cached_summary:
            # Summaries lacking the key are accepted as they were before.
            cached_classifier = cached_summary.get(
                "Classifier", CLASSIFIER_NAME
            )
            if cached_classifier == CLASSIFIER_NAME:
                log(
                    f"WARNING: Using cached summary for fold {fold_index + 1}."
                )
                return cached_summary
            log(
                f"WARNING: Ignoring cached summary for fold {fold_index + 1}; "
                f'it was generated with classifier "{cached_classifier}" '
                f'instead of "{CLASSIFIER_NAME}".'
            )

    start_time = dt.datetime.now(tz=dt.timezone.utc)

    x_train = (
        transformer.fit_transform(
            x_train_folds[fold_index],
            y_train_folds[fold_index],
            *transformer_fit_transform_args,
            **transformer_fit_transform_kwargs,
        )
        if transformer is not None
        else x_train_folds[fold_index]
    )

    classifier.fit(
        x_train,
        y_train_folds[fold_index],
        *classifier_fit_args,
        **classifier_fit_kwargs,
    )

    end_fit_transform_time = dt.datetime.now(tz=dt.timezone.utc)

    # Scoring the training data happens between the two timed sections, and
    # is therefore part of neither the training nor the testing time.
    training_scores = scorer(
        y_train_folds[fold_index], classifier.predict(x_train)
    )

    start_predict_score_time = dt.datetime.now(tz=dt.timezone.utc)

    testing_scores = scorer(
        y_test_folds[fold_index],
        classifier.predict(
            transformer.transform(x_test_folds[fold_index])
            if transformer is not None
            else x_test_folds[fold_index]
        ),
    )

    end_time = dt.datetime.now(tz=dt.timezone.utc)

    summary: Summary = {
        "Name": name,
        "Fold": fold_index,
        "Classifier": CLASSIFIER_NAME,
        "Start Time": start_time,
        "End Fit Transform Time": end_fit_transform_time,
        "Start Predict Score Time": start_predict_score_time,
        "End Time": end_time,
        "Training Scores": training_scores,
        "Testing Scores": testing_scores,
    }

    # This is just in case something goes wrong with the Jupyter kernel. It
    # caches the results of the current fold so that we can load it again in
    # case of a crash, or we can simply reload the results. Failing to write
    # the cache is reported, but does not discard the computed summary.
    try:
        save_pickle(
            summary,
            os.path.join(
                ARTEFACTS_PATH,
                (
                    f"{name}_fold-{fold_index}_summary."
                    f"{round(dt.datetime.now().timestamp())}.pickle.gz"
                ),
            ),
        )
    except Exception as exception:
        log(f"Error saving scores: {exception}", file=sys.stderr)

    return summary


def cross_validator(
    x,
    y,
    binariser: MultiLabelBinarizer,
    classification_function: T.Callable[[int], Summary],
    simultaneous_folds: T.Optional[int] = K_FOLDS,
    fold_limit: int = K_FOLD_LIMIT,
    worker_mode: T.Literal["thread", "process"] = "process",
):
    """Split the data into stratified folds and classify each of them.

    The module-level fold lists and `column_label_targets` are emptied and
    refilled, which is how `classification_function` gets at the data of the
    fold whose index it is given.

    Args:
        x: The samples.
        y: The binarised labels, aligned with `x`.
        binariser: The fitted binariser; its classes name the label columns.
        classification_function: Called with a fold index; returns the
            summary of that fold.
        simultaneous_folds: The size of the worker pool. The folds are run
            one after another when this is falsy or one, or when only a
            single fold was split.
        fold_limit: Stop splitting after this many folds.
        worker_mode: Whether the pool consists of processes or threads.

    Returns:
        The summaries of the folds, ordered by fold index.

    Raises:
        ValueError: If a pool is needed and `worker_mode` is unknown.
    """
    if SAMPLE_LIMIT is not None:
        log(
            f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.",
            file=sys.stderr,
        )

    for shared_list in (
        x_train_folds,
        y_train_folds,
        x_test_folds,
        y_test_folds,
        column_label_targets,
    ):
        shared_list.clear()
    column_label_targets.extend(binariser.classes_)

    random.seed(SEED)
    np.random.seed(SEED)

    splitter = MultilabelStratifiedKFold(
        n_splits=K_FOLDS, shuffle=True, random_state=SEED
    )

    fold_count = 0
    for train_indexes, test_indexes in splitter.split(x, y):
        log(f"Splitting fold {fold_count + 1}...")

        for destination, source, indexes in (
            (x_train_folds, x, train_indexes),
            (y_train_folds, y, train_indexes),
            (x_test_folds, x, test_indexes),
            (y_test_folds, y, test_indexes),
        ):
            destination.append([source[index] for index in indexes])

        fold_count += 1
        if fold_count >= fold_limit:
            break

    log("Beginning classification...")
    fold_indexes = range(fold_count)

    use_pool = (
        bool(simultaneous_folds) and simultaneous_folds > 1 and fold_count > 1
    )
    if not use_pool:
        return list(map(classification_function, fold_indexes))

    if worker_mode == "process":
        with CustomPool(simultaneous_folds, maxtasksperchild=1) as pool:
            return pool.map(classification_function, fold_indexes, chunksize=1)

    if worker_mode == "thread":
        with mp_pool.ThreadPool(simultaneous_folds) as pool:
            return pool.map(classification_function, fold_indexes, chunksize=1)

    raise ValueError(f"Unknown worker mode: {worker_mode}")


def build_classifier():
    """Create a new, unfitted classifier as selected by `CLASSIFIER_NAME`.

    The classifier's package is only imported once it is actually selected.

    Returns:
        A 10-nearest-neighbours classifier with distance weighting for
        `"k-NN"`, a histogram-based XGBoost classifier for `"XGBoost"`, or a
        scikit-learn dummy classifier for `"Dummy"`.

    Raises:
        ValueError: If `CLASSIFIER_NAME` is none of the above.
    """
    log(f'Using classifier "{CLASSIFIER_NAME}".')

    if CLASSIFIER_NAME == "k-NN":
        from sklearn.neighbors import KNeighborsClassifier

        knn_options = {"n_neighbors": 10, "n_jobs": -1, "weights": "distance"}
        return KNeighborsClassifier(**knn_options)

    if CLASSIFIER_NAME == "XGBoost":
        from xgboost import XGBClassifier

        xgboost_options = {
            "tree_method": "hist",
            "n_jobs": -1,
            "random_state": SEED,
        }
        return XGBClassifier(**xgboost_options)

    if CLASSIFIER_NAME == "Dummy":
        from sklearn.dummy import DummyClassifier

        return DummyClassifier(random_state=SEED)

    raise ValueError(f"Unknown classifier: {CLASSIFIER_NAME}")

# Dummy
Always predicts the null rule label "-" (no filtering rules matched) regardless of the input.

In [None]:
# The dummy approach reads the same labels file as the baseline heuristic.
DUMMY_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.baseline.labels.pickle.gz"
)

In [None]:
# Maps each document tag to its labels; the null rule label is `None`.
log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    DUMMY_LABELS_PATH
)
log("Loaded labels.")

In [None]:
log("Organising data and labels...")
# The dummy approach does not look at its input, so each sample is merely a
# placeholder aligned with its tag.
x: T.List[T.Any] = [True for _ in range(len(labels))]
y_tags: T.List[str] = list(labels)

if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x = x[SAMPLE_LIMIT]
    y_tags = y_tags[SAMPLE_LIMIT]

if TRIM_MAJORITY_CLASS:
    log("Trimming...")
    # The trimmer also returns the names of the most supported classes.
    x, y_tags, top_classes = trimmer(x, y_tags, labels)
else:
    # Without trimming, only the number of classes to summarise is known here.
    top_classes = TOP_CLASSES

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
def dummy_classification(fold_index: int) -> Summary:
    """Score one fold with a classifier that only predicts the null rule.

    The constant prediction has the `"-"` column set and every other label
    column unset.
    """
    from sklearn.dummy import DummyClassifier

    null_rule_only = np.array(
        [int(label == "-") for label in column_label_targets]
    )
    classifier = DummyClassifier(
        strategy="constant", constant=null_rule_only, random_state=SEED
    )
    return training_and_testing_scores("Dummy", fold_index, None, classifier)


dummy_fold_summaries = cross_validator(x, y, binariser, dummy_classification)

print_summaries(dummy_fold_summaries, top_classes)

# Baseline Heuristic

In [None]:
# The per-document port statistics used by the baseline heuristic.
BASELINE_DATA_PATH = os.path.join(
    ARTEFACTS_PATH, "month.baseline.data.pickle.gz"
)

# The labels of the documents above.
BASELINE_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.baseline.labels.pickle.gz"
)

# This is based on the domain knowledge we specify in the paper.
AMPLIFICATION_PORTS = (
    19, 53, 69, 88, 123, 137, 138, 139, 161, 389, 853, 5353, 11211,
)  # fmt: skip

# The keys of the amplification ports as source ports; note that this is
# post-amplification.
AMPLIFICATION_SOURCE_PORTS = tuple(
    f"{port}->" for port in AMPLIFICATION_PORTS
)

# The three "generic" ranges below cover every other source port, and never
# include an amplification port. We excluded port zero for the first range, as
# it is technically invalid.
GENERIC_SYSTEM_SOURCE_PORTS = frozenset(
    f"{port}->"
    for port in range(1, 1024)
    if port not in AMPLIFICATION_PORTS
)

GENERIC_USER_SOURCE_PORTS = frozenset(
    f"{port}->"
    for port in range(1024, 49152)
    if port not in AMPLIFICATION_PORTS
)

GENERIC_DYNAMIC_SOURCE_PORTS = frozenset(
    f"{port}->"
    for port in range(49152, 65536)
    if port not in AMPLIFICATION_PORTS
)

In [None]:
# Maps each document tag to its per-port triples of counts.
log("Loading data...")
data: T.Dict[str, T.Dict[str, T.Tuple[int, int, int]]] = load_pickle(
    BASELINE_DATA_PATH
)
log("Loaded data.")

# Maps each document tag to its labels; the null rule label is `None`.
log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    BASELINE_LABELS_PATH
)
log("Loaded labels.")

In [None]:
log("Organising data and labels...")
x: T.List[T_np.NDArray[T.Any]] = []
y_tags: T.List[str] = []

# Each document becomes one feature vector: three counts per amplification
# source port, followed by three summed counts per generic port range.
log("Transforming...")
for tag, tag_counts in data.items():
    x_tag: T.List[int] = []

    # Amplification ports are kept individually; ports that the document lacks
    # contribute zeros so that the vector layout stays fixed.
    for port_range in (AMPLIFICATION_SOURCE_PORTS,):
        for port in port_range:
            x_tag.extend(tag_counts.get(port, (0, 0, 0)))

    # All other ports are aggregated per range.
    for port_range in (
        GENERIC_SYSTEM_SOURCE_PORTS,
        GENERIC_USER_SOURCE_PORTS,
        GENERIC_DYNAMIC_SOURCE_PORTS,
    ):
        flow_count = 0
        byte_count = 0
        packet_count = 0
        for port, port_counts in tag_counts.items():
            if port in port_range:
                flow_count += port_counts[0]
                byte_count += port_counts[1]
                packet_count += port_counts[2]
        x_tag.extend((flow_count, byte_count, packet_count))

    x.append(np.array(x_tag, dtype=np.float64))
    y_tags.append(tag)
log("Transformed.")

if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x = x[SAMPLE_LIMIT]
    y_tags = y_tags[SAMPLE_LIMIT]

if TRIM_MAJORITY_CLASS:
    log("Trimming...")
    # The trimmer also returns the names of the most supported classes.
    x, y_tags, top_classes = trimmer(x, y_tags, labels)
else:
    # Without trimming, only the number of classes to summarise is known here.
    top_classes = TOP_CLASSES

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
def baseline_classification(fold_index: int) -> Summary:
    """Score one fold of the baseline features with a newly built classifier.

    No transformer is involved; the feature vectors are used as they are.
    """
    classifier = build_classifier()
    return training_and_testing_scores(
        "Baseline", fold_index, None, classifier
    )


baseline_fold_summaries = cross_validator(
    x, y, binariser, baseline_classification
)

print_summaries(baseline_fold_summaries, top_classes)

# Word2Vec

In [None]:
import gensim

from gensim.models.callbacks import CallbackAny2Vec

In [None]:
# The corpus that the Word2Vec model is trained on.
WORD2VEC_CORPUS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.word2vec.data.pickle.gz"
)

# The trained model is saved here, and loaded from here if it already exists.
WORD2VEC_MODEL_PATH = os.path.join(ARTEFACTS_PATH, "month.word2vec.model")

WORD2VEC_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.word2vec.labels.pickle.gz"
)

# The Word2Vec hyperparameters.
VECTOR_LENGTH = 100
CONTEXT_WINDOW_SIZE = 10
EPOCHS = 15
SKIPGRAM_LEARNING_MODEL = True
NEGATIVE_SAMPLES = 5  # 5 is the Gensim default.
NEGATIVE_SAMPLING_EXPONENT = 0.75  # 0.75 is the Gensim default.

In [None]:
# Train the Word2Vec model once and save it; later runs load the saved model.
# NOTE: delete the saved model to retrain it after changing the corpus or any
# of the hyperparameters.
if not os.path.exists(WORD2VEC_MODEL_PATH):
    log("Loading corpus...")
    corpus: T.Dict[str, T.List[str]] = load_pickle(WORD2VEC_CORPUS_PATH)
    log("Loaded corpus.")

    class EpochsCallback(CallbackAny2Vec):
        """Logs the training loss reported by the model after every epoch."""

        def __init__(self):
            self.epoch = 1

        def on_epoch_end(self, model):
            loss = model.get_latest_training_loss()
            if loss != 0.0:
                log(f"Epoch {self.epoch} completed, loss is {loss:.3f}.")
            else:
                log(f"Epoch {self.epoch} completed, loss is unavailable.")
            self.epoch += 1

    log("Generating Word2Vec model...")
    model = gensim.models.Word2Vec(
        # The sentences are read once to build the vocabulary and once more
        # per epoch, so they must be re-iterable. A one-shot iterator (as
        # `iter(corpus.values())` is) would already be spent after the
        # vocabulary pass, leaving nothing to train on.
        sentences=list(corpus.values()),
        # Passed explicitly so that the constant actually takes effect.
        vector_size=VECTOR_LENGTH,
        window=CONTEXT_WINDOW_SIZE,
        epochs=EPOCHS,
        sg=int(SKIPGRAM_LEARNING_MODEL),
        ns_exponent=NEGATIVE_SAMPLING_EXPONENT,
        negative=NEGATIVE_SAMPLES,
        # Fall back to hierarchical softmax if negative sampling is disabled.
        hs=int(NEGATIVE_SAMPLES == 0),
        min_count=1,
        max_vocab_size=None,
        compute_loss=True,
        workers=min(os.cpu_count() or 8, 16),
        callbacks=[EpochsCallback()],
        seed=SEED,
    )

    log("Generated Word2Vec model. Saving...")
    model.save(WORD2VEC_MODEL_PATH)
    log("Saved Word2Vec model.")
else:
    log("Loading model...")
    model = gensim.models.Word2Vec.load(WORD2VEC_MODEL_PATH)
    log("Loaded model.")

log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    WORD2VEC_LABELS_PATH
)
log("Loaded labels.")

In [None]:
log("Organising data and labels...")
x: T.List[T_np.NDArray[T.Any]] = []
y_tags: T.List[str] = []

# Each labelled tag is represented by the vector the model holds for it as a
# word. Tags that the model has no vector for are logged and left out.
for tag, tag_labels in labels.items():
    try:
        x.append(model.wv[tag])
        y_tags.append(tag)
    except KeyError:
        log(f"Untrained tag (word) '{tag}' and labels: {tag_labels}")

if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x = x[SAMPLE_LIMIT]
    y_tags = y_tags[SAMPLE_LIMIT]

if TRIM_MAJORITY_CLASS:
    log("Trimming...")
    # The trimmer also returns the names of the most supported classes.
    x, y_tags, top_classes = trimmer(x, y_tags, labels)
else:
    # Without trimming, only the number of classes to summarise is known here.
    top_classes = TOP_CLASSES

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
def word2vec_classification(fold_index: int) -> Summary:
    """Score one fold of the Word2Vec vectors with a newly built classifier.

    No transformer is involved; the vectors are used as they are.
    """
    classifier = build_classifier()
    return training_and_testing_scores(
        "Word2Vec", fold_index, None, classifier
    )


word2vec_fold_summaries = cross_validator(
    x, y, binariser, word2vec_classification
)

print_summaries(word2vec_fold_summaries, top_classes)

# Doc2Vec

In [None]:
import gensim

from gensim.models.callbacks import CallbackAny2Vec
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.estimator_checks import check_estimator

In [None]:
# Corpus and label artefacts read by the Doc2Vec experiment (generated by the
# previous notebook and stored under `ARTEFACTS_PATH`).
DOC2VEC_CORPUS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard.data.pickle.gz"
)

DOC2VEC_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard.labels.pickle.gz"
)

# Doc2Vec hyperparameters. They become the constructor defaults of the
# `Doc2Vec` wrapper class defined further down.
# NOTE: `CONTEXT_WINDOW_SIZE`, `EPOCHS`, `NEGATIVE_SAMPLES` and
# `NEGATIVE_SAMPLING_EXPONENT` are also read by the Word2Vec training cell, so
# running this cell rebinds them for any later re-run of that cell.
VECTOR_LENGTH = 100
CONTEXT_WINDOW_SIZE = 10
EPOCHS = 25
# Passed to Gensim as `dm=int(...)`: True selects distributed memory (PV-DM).
DISTRIBUTED_MEMORY_LEARNING_MODEL = True
NEGATIVE_SAMPLES = 5  # 5 is the Gensim default.
NEGATIVE_SAMPLING_EXPONENT = 0.75  # 0.75 is the Gensim default.

In [None]:
# Load the Doc2Vec corpus (annotated as tag -> list of words) and its labels
# (annotated as tag -> {optional label: integer}).
# NOTE(review): `load_pickle` presumably unpickles these files, so only load
# artefacts you generated yourself.
log("Loading corpus...")
corpus: T.Dict[str, T.List[str]] = load_pickle(DOC2VEC_CORPUS_PATH)
log("Loaded corpus.")

log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    DOC2VEC_LABELS_PATH
)
log("Loaded labels.")

In [None]:
# Wrap every corpus entry in a TaggedDocument (tagged with its own key) and
# keep the keys in a parallel list, in the corpus' iteration order.
log("Organising data and labels...")
x: T.List[gensim.models.doc2vec.TaggedDocument] = [
    gensim.models.doc2vec.TaggedDocument(words, [tag])
    for tag, words in corpus.items()
]
y_tags: T.List[str] = list(corpus)

# Optional debugging slice, applied identically to both parallel lists.
if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x, y_tags = x[SAMPLE_LIMIT], y_tags[SAMPLE_LIMIT]

# With trimming enabled, `trimmer` supplies the data, the tags and the number
# of top classes to report; otherwise the notebook-wide setting is used.
if not TRIM_MAJORITY_CLASS:
    top_classes = TOP_CLASSES
else:
    log("Trimming...")
    x, y_tags, top_classes = trimmer(x, y_tags, labels)

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
class Doc2Vec(BaseEstimator, TransformerMixin):
    """Scikit-learn style transformer wrapping Gensim's ``Doc2Vec``.

    ``fit`` trains a model on a sequence of ``TaggedDocument`` objects.
    ``transform`` returns one vector per document: the trained document
    vectors when it is handed the very same object that was fitted, and
    vectors inferred with ``infer_vector`` for anything else.

    NOTE(review): a fixed ``random_state`` alone is not expected to make runs
    reproducible while more than one worker thread is used; see the Gensim
    documentation before relying on bit-identical results.
    """

    class EpochsCallback(CallbackAny2Vec):
        """Logs the end of each training epoch together with the latest loss."""

        def __init__(self):
            self.epoch = 1  # One-based counter, only used in the log lines.

        def on_epoch_end(self, model):
            loss = model.get_latest_training_loss()
            # A loss of exactly zero is reported as "unavailable" rather than
            # printed as a number.
            if loss != 0.0:
                log(f"Epoch {self.epoch} completed, loss is {loss:.3f}.")
            else:
                log(f"Epoch {self.epoch} completed, loss is unavailable.")
            self.epoch += 1

    def __init__(
        self,
        vector_length=VECTOR_LENGTH,
        context_window_size=CONTEXT_WINDOW_SIZE,
        epochs=EPOCHS,
        negative_samples=NEGATIVE_SAMPLES,
        negative_sampling_exponent=NEGATIVE_SAMPLING_EXPONENT,
        distributed_memory_learning_model=DISTRIBUTED_MEMORY_LEARNING_MODEL,
        random_state: T.Optional[int] = SEED,
    ):
        # Parameters are stored verbatim; all work is deferred to `fit`.
        self.vector_length = vector_length
        self.context_window_size = context_window_size
        self.epochs = epochs
        self.negative_samples = negative_samples
        self.negative_sampling_exponent = negative_sampling_exponent
        self.distributed_memory_learning_model = (
            distributed_memory_learning_model
        )
        self.random_state = random_state

    def _more_tags(self):
        return {"X_types": [T.List[gensim.models.doc2vec.TaggedDocument]]}

    def fit(self, x, y=None):
        """Train the Doc2Vec model on ``x`` and return ``self``.

        ``x`` is a sequence of ``TaggedDocument`` objects; ``y`` is ignored.
        The trained vector of each document's first tag is stored in the
        transformation cache, so that transforming the same object later
        returns the trained vectors instead of re-inferring them.
        """
        log("Setting up Doc2Vec model...")

        self.doc2vec_model = gensim.models.Doc2Vec(
            documents=x,
            vector_size=self.vector_length,
            window=self.context_window_size,
            epochs=self.epochs,
            dm=int(self.distributed_memory_learning_model),
            ns_exponent=self.negative_sampling_exponent,
            negative=self.negative_samples,
            # Hierarchical softmax only when negative sampling is disabled.
            hs=int(self.negative_samples == 0),
            dbow_words=0,
            min_count=1,
            max_vocab_size=None,
            compute_loss=True,
            workers=min(os.cpu_count() or 8, 16),  # Hardly helps.
            callbacks=[self.EpochsCallback()],
            seed=self.random_state,
        )

        # NOTE(review): the cache key is the identity and length of the input
        # object. `id()` values can be reused once an object is garbage
        # collected, so a different input of equal length could in principle
        # hit a stale entry; keep fitted inputs alive while the transformer is
        # in use.
        self._transformation_cache: T.Dict[
            T.Tuple[int, int], T.List[T_np.NDArray[T.Any]]
        ] = {
            (id(x), len(x)): [
                self.doc2vec_model.dv[document.tags[0]] for document in x
            ]
        }

        log("Fit complete.")
        return self

    def transform(self, x, y=None, cache_transformation=True):
        """Return one vector per document in ``x``.

        A cached result (from ``fit`` or from an earlier ``transform`` of the
        same object) is returned as-is. Otherwise the vectors are inferred
        from the documents' words, and stored in the cache only when
        ``cache_transformation`` is true. An empty input yields an empty list.

        Raises ``NotFittedError`` before ``fit`` has been called and
        ``ValueError`` if the first element is not a ``TaggedDocument``.
        """
        if not hasattr(self, "doc2vec_model"):
            raise NotFittedError("Fit the transformer first.")

        # Nothing to transform; also avoids indexing into an empty input.
        if len(x) == 0:
            return []

        if not isinstance(x[0], gensim.models.doc2vec.TaggedDocument):
            raise ValueError("Expected a list of TaggedDocuments.")

        cache_key = (id(x), len(x))
        cached = self._transformation_cache.get(cache_key)
        if cached:
            log("Transformation already cached.")
            return cached

        log(f"Transforming {len(x):,} documents...")
        # Report progress roughly five times; clamp to one so that inputs with
        # fewer than five documents do not cause a modulo by zero.
        progress_count = max(1, len(x) // 5)

        inferred: T.List[T_np.NDArray[T.Any]] = []
        for count, document in enumerate(x, start=1):
            inferred.append(self.doc2vec_model.infer_vector(document.words))
            if count % progress_count == 0:
                log(f"{count:,} out of {len(x):,} documents transformed...")

        if cache_transformation:
            self._transformation_cache[cache_key] = inferred
        return inferred


check_estimator(Doc2Vec())

In [None]:
def doc2vec_classification(fold_index: int) -> Summary:
    """Score a single cross-validation fold for the Doc2Vec baseline.

    A fresh ``Doc2Vec`` transformer and a fresh classifier are built for
    every fold.
    """
    transformer = Doc2Vec()
    classifier = build_classifier()
    return training_and_testing_scores(
        "Doc2Vec", fold_index, transformer, classifier
    )


doc2vec_fold_summaries = cross_validator(
    x,
    y,
    binariser,
    doc2vec_classification,
    simultaneous_folds=1,
)

print_summaries(doc2vec_fold_summaries, top_classes)

# Latent Semantic Analysis


In [None]:
import gensim

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.estimator_checks import check_estimator
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD

In [None]:
# Corpus and label artefacts read by the LSA experiment. These are the same
# files as the ones used by the Doc2Vec experiment.
LSA_CORPUS_PATH = os.path.join(ARTEFACTS_PATH, "month.standard.data.pickle.gz")

LSA_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard.labels.pickle.gz"
)

# Selects the hand-written Gensim transformer instead of the scikit-learn
# pipeline in the classification cells below.
USE_GENSIM = False  # The paper uses scikit-learn's implementation.

# The untuned hyperparameters used in the paper. This was our first attempt.
NGRAM_RANGE = (1, 3)  # Inclusive (smallest n, largest n) of the n-grams.
SVD_COMPONENTS = 100  # Number of components kept by the SVD step.

In [None]:
# Load the LSA corpus (annotated as tag -> list of words) and its labels
# (annotated as tag -> {optional label: integer}).
# NOTE(review): `load_pickle` presumably unpickles these files, so only load
# artefacts you generated yourself.
log("Loading corpus...")
corpus: T.Dict[str, T.List[str]] = load_pickle(LSA_CORPUS_PATH)
log("Loaded corpus.")

log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    LSA_LABELS_PATH
)
log("Loaded labels.")

In [None]:
# Turn every document into an immutable tuple of words and keep the corpus
# keys in a parallel list, both in the corpus' iteration order.
log("Organising data and labels...")
x: T.List[T.Tuple[str, ...]] = [tuple(words) for words in corpus.values()]
y_tags: T.List[str] = list(corpus)

# Optional debugging slice, applied identically to both parallel lists.
if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x, y_tags = x[SAMPLE_LIMIT], y_tags[SAMPLE_LIMIT]

# With trimming enabled, `trimmer` supplies the data, the tags and the number
# of top classes to report; otherwise the notebook-wide setting is used.
if not TRIM_MAJORITY_CLASS:
    top_classes = TOP_CLASSES
else:
    log("Trimming...")
    x, y_tags, top_classes = trimmer(x, y_tags, labels)

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
class LatentSemanticAnalysis(BaseEstimator, TransformerMixin):  # For Gensim.
    """Scikit-learn style LSA transformer built from Gensim components.

    Documents are sequences of tokens. ``fit`` expands them into n-grams,
    builds a Gensim dictionary, a TF-IDF model and an LSI model on top of it;
    ``transform`` maps documents to dense vectors of ``n_components`` values.
    """

    def __init__(
        self,
        n_components=SVD_COMPONENTS,
        ngram_range: T.Tuple[int, int] = NGRAM_RANGE,
        ngram_joiner: str = " ",
        random_state: T.Optional[int] = SEED,
        filter_extremes=False,
    ):
        # Parameters are stored verbatim; all work is deferred to `fit`.
        self.n_components = n_components
        self.ngram_range = ngram_range
        self.ngram_joiner = ngram_joiner
        self.random_state = random_state
        self.filter_extremes = filter_extremes

    def _more_tags(self):
        return {"X_types": [T.List[T.Iterable[str]]]}

    def _ngram_generator(self, x: T.Sequence[str]):
        """Yield every n-gram of ``x`` for each n in the inclusive range.

        All n-grams of the smallest size come first, then the next size, and
        so on. Each n-gram is its tokens joined with ``ngram_joiner``.
        """
        for n in range(self.ngram_range[0], self.ngram_range[1] + 1):
            # Zipping the sequence against its own shifted copies produces
            # the sliding windows of length n.
            for gram in zip(*[x[i:] for i in range(n)]):
                yield self.ngram_joiner.join(gram)

    def fit(self, x, y=None):
        """Build the dictionary, TF-IDF and LSI models from ``x``.

        ``x`` is a sequence of tokenised documents; ``y`` is ignored.
        Returns ``self``.
        """
        # Compare as a tuple so an equivalent list (e.g. `[1, 1]`) is also
        # recognised as "unigrams only" and the expansion is skipped.
        if tuple(self.ngram_range) != (1, 1):
            log("Generating n-grams above unigrams...")
            x = [list(self._ngram_generator(document)) for document in x]

        log("Generating dictionary...")
        self.dictionary = gensim.corpora.Dictionary(x, prune_at=2**64)

        if self.filter_extremes:
            log(f"{len(self.dictionary):,} unique terms. Filtering...")
            self.dictionary.filter_extremes(no_below=2)
            log(f"{len(self.dictionary):,} unique terms after filtering.")
        else:
            log(f"{len(self.dictionary):,} unique terms.")

        log(f"Generating corpus of {len(x):,} documents...")
        self.corpus = list(map(self.dictionary.doc2bow, x))

        log("Setting up TF-IDF model...")
        self.tf_idf_model = gensim.models.TfidfModel(self.corpus)

        log("Setting up LSA model...")
        self.lsa_model = gensim.models.LsiModel(
            self.tf_idf_model[self.corpus],
            onepass=False,  # TODO: Which is faster time-wise?
            id2word=self.dictionary,
            num_topics=self.n_components,
            random_seed=self.random_state,
        )

        log("Fit complete.")
        return self

    def transform(self, x, y=None):
        """Project documents into the fitted LSA space.

        ``x`` is a sequence of tokenised documents; a single document given
        as a sequence of strings is treated as a one-document batch. Returns
        an array of shape ``(len(x), n_components)``; an empty input yields
        an array with zero rows.

        Raises ``NotFittedError`` before ``fit`` has been called.
        """
        if not hasattr(self, "lsa_model"):
            raise NotFittedError("Fit the transformer first.")

        # Nothing to transform; also avoids indexing into an empty input.
        if len(x) == 0:
            return np.empty((0, self.n_components), dtype=np.float32)

        if isinstance(x[0], str):
            x = [x]

        log(f"Transforming {len(x):,} documents...")

        vectors = []
        for raw_document in x:
            # The n-gram expansion is always applied here; for a (1, 1) range
            # it simply yields the tokens themselves.
            bag_of_words = self.dictionary.doc2bow(
                list(self._ngram_generator(raw_document))
            )
            weighted = self.tf_idf_model[bag_of_words]
            vectors.append(
                gensim.matutils.sparse2full(
                    self.lsa_model[weighted], self.n_components
                )
            )

        return np.reshape(np.array(vectors), (len(x), self.n_components))


check_estimator(LatentSemanticAnalysis())

In [None]:
def lsa_classification(fold_index: int) -> Summary:
    """Score a single cross-validation fold for the LSA baseline.

    Depending on ``USE_GENSIM`` the transformer is either the Gensim-based
    ``LatentSemanticAnalysis`` class or a scikit-learn TF-IDF + truncated SVD
    pipeline. Both read ``NGRAM_RANGE`` and ``SVD_COMPONENTS`` at call time.
    """
    if USE_GENSIM:
        import logging

        # Route Gensim's own (verbose) logging into a file in the artefacts
        # directory instead of the notebook output.
        logging.basicConfig(
            format="%(asctime)s : %(levelname)s : %(message)s",
            level=logging.DEBUG,
            filename=os.path.join(ARTEFACTS_PATH, "lsa.gensim.log"),
            filemode="w",
        )

        log("LSA via Gensim.")
        # Pass the hyperparameters explicitly: the constructor defaults were
        # frozen when the class was defined and would otherwise ignore any
        # later change to the notebook constants, unlike the scikit-learn
        # branch below.
        transformer = LatentSemanticAnalysis(
            ngram_range=NGRAM_RANGE, n_components=SVD_COMPONENTS
        )
    else:
        log("LSA via scikit-learn.")
        transformer = make_pipeline(
            TfidfVectorizer(
                ngram_range=NGRAM_RANGE,
                lowercase=False,
                # Documents are already tokenised, so tokenising and
                # preprocessing are identity functions.
                tokenizer=lambda x: x,
                preprocessor=lambda x: x,
                token_pattern=None,  # type: ignore
                use_idf=True,
                sublinear_tf=True,
            ),
            TruncatedSVD(
                algorithm="arpack",
                n_components=SVD_COMPONENTS,
                random_state=SEED,
            ),
            verbose=True,
        )

    return training_and_testing_scores(
        "LSA",
        fold_index,
        transformer,
        build_classifier(),
    )


lsa_fold_summaries = cross_validator(
    x, y, binariser, lsa_classification, simultaneous_folds=1
)

print_summaries(lsa_fold_summaries, top_classes)

In [None]:
# Grid search over the SVD size and the vectoriser's `min_df`. One list of
# fold summaries is collected per combination, vector length varying slowest.
gridsearch_fold_summaries = []
for vector_length in (100, 200, 300):
    for minimum_frequency in (10, 1000, 10000):
        log(
            f"Gridsearching LSA with vector length {vector_length} "
            f"and minimum frequency {minimum_frequency}..."
        )

        def gridsearch_lsa_classification(fold_index: int) -> Summary:
            """Score one fold for the current grid point (scikit-learn only).

            Reads `vector_length` and `minimum_frequency` from the enclosing
            loop at call time.
            """
            if USE_GENSIM:
                raise RuntimeError("Gensim not supported for this experiment.")

            # Documents are already tokenised, hence the identity functions.
            vectoriser = TfidfVectorizer(
                ngram_range=NGRAM_RANGE,
                lowercase=False,
                tokenizer=lambda tokens: tokens,
                preprocessor=lambda document: document,
                token_pattern=None,  # type: ignore
                use_idf=True,
                sublinear_tf=True,
                min_df=minimum_frequency,
            )
            decomposer = TruncatedSVD(
                algorithm="arpack",
                n_components=vector_length,
                random_state=SEED,
            )
            pipeline = make_pipeline(vectoriser, decomposer, verbose=True)

            name = f"Gridsearch-V{vector_length}-MinDF{minimum_frequency}-LSA"
            return training_and_testing_scores(
                name, fold_index, pipeline, build_classifier()
            )

        gridsearch_fold_summaries.append(
            cross_validator(
                x,
                y,
                binariser,
                gridsearch_lsa_classification,
                simultaneous_folds=1,
            )
        )

In [None]:
# Repeat the LSA experiment for alternative n-gram ranges. One list of fold
# summaries is collected per range, in the order listed.
ngram_fold_summaries = []
for ngram_range in (
    (1, 1),
    (1, 2),
    # (1, 3),  # This is the default we present; no need to recalculate it.
    (2, 2),
    (2, 3),
    (3, 3),
):
    log(f"Trying LSA with n-gram range {ngram_range}...")

    def ngram_lsa_classification(fold_index: int) -> Summary:
        """Score one fold for the current n-gram range (scikit-learn only).

        Reads `ngram_range` from the enclosing loop at call time.
        """
        if USE_GENSIM:
            raise RuntimeError("Gensim not supported for this experiment.")

        # Documents are already tokenised, hence the identity functions.
        vectoriser = TfidfVectorizer(
            ngram_range=ngram_range,
            lowercase=False,
            tokenizer=lambda tokens: tokens,
            preprocessor=lambda document: document,
            token_pattern=None,  # type: ignore
            use_idf=True,
            sublinear_tf=True,
        )
        decomposer = TruncatedSVD(
            algorithm="arpack",
            n_components=SVD_COMPONENTS,
            random_state=SEED,
        )
        pipeline = make_pipeline(vectoriser, decomposer, verbose=True)

        name = f"Ngram-{ngram_range[0]}-{ngram_range[1]}-LSA"
        return training_and_testing_scores(
            name, fold_index, pipeline, build_classifier()
        )

    ngram_fold_summaries.append(
        cross_validator(
            x,
            y,
            binariser,
            ngram_lsa_classification,
            simultaneous_folds=1,
        )
    )

# Latent Semantic Analysis (without Domain Knowledge)

In [None]:
import gensim

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.estimator_checks import check_estimator
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD

In [None]:
# Corpus and label artefacts for the LSA experiment without domain knowledge
# (the "standard-ndk" files under `ARTEFACTS_PATH`).
LSA_NDK_CORPUS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard-ndk.data.pickle.gz"
)

LSA_NDK_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard-ndk.labels.pickle.gz"
)


# These rebind the notebook-wide LSA settings; the values are the same as in
# the LSA section with domain knowledge.
USE_GENSIM = False  # False selects the scikit-learn pipeline.

NGRAM_RANGE = (1, 3)  # Inclusive (smallest n, largest n) of the n-grams.
SVD_COMPONENTS = 100  # Number of components kept by the SVD step.

In [None]:
# Load the no-domain-knowledge corpus (annotated as tag -> list of words) and
# its labels (annotated as tag -> {optional label: integer}).
# NOTE(review): `load_pickle` presumably unpickles these files, so only load
# artefacts you generated yourself.
log("Loading corpus...")
corpus: T.Dict[str, T.List[str]] = load_pickle(LSA_NDK_CORPUS_PATH)
log("Loaded corpus.")

log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    LSA_NDK_LABELS_PATH
)
log("Loaded labels.")

In [None]:
# Turn every document into an immutable tuple of words and keep the corpus
# keys in a parallel list, both in the corpus' iteration order.
log("Organising data and labels...")
x: T.List[T.Tuple[str, ...]] = [tuple(words) for words in corpus.values()]
y_tags: T.List[str] = list(corpus)

# Optional debugging slice, applied identically to both parallel lists.
if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x, y_tags = x[SAMPLE_LIMIT], y_tags[SAMPLE_LIMIT]

# With trimming enabled, `trimmer` supplies the data, the tags and the number
# of top classes to report; otherwise the notebook-wide setting is used.
if not TRIM_MAJORITY_CLASS:
    top_classes = TOP_CLASSES
else:
    log("Trimming...")
    x, y_tags, top_classes = trimmer(x, y_tags, labels)

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
def lsa_ndk_classification(fold_index: int) -> Summary:
    """Score a single cross-validation fold for LSA without domain knowledge.

    Uses the scikit-learn TF-IDF + truncated SVD pipeline unless
    ``USE_GENSIM`` is set, in which case the ``LatentSemanticAnalysis`` class
    from the domain knowledge section must already have been defined.
    """
    if not USE_GENSIM:
        log("LSA-NDK via scikit-learn.")
        # Documents are already tokenised, hence the identity functions.
        vectoriser = TfidfVectorizer(
            ngram_range=NGRAM_RANGE,
            lowercase=False,
            tokenizer=lambda tokens: tokens,
            preprocessor=lambda document: document,
            token_pattern=None,  # type: ignore
            use_idf=True,
            sublinear_tf=True,
        )
        decomposer = TruncatedSVD(
            algorithm="arpack",
            n_components=SVD_COMPONENTS,
            random_state=SEED,
        )
        transformer = make_pipeline(vectoriser, decomposer, verbose=True)
    else:
        # Referencing the bare name checks that the class cell has been run.
        try:
            LatentSemanticAnalysis  # type: ignore
        except NameError:
            raise NotImplementedError(
                "Run the domain knowledge version's "
                "Gensim implementation block first."
            )

        log("LSA-NDK via Gensim.")
        transformer = LatentSemanticAnalysis(
            ngram_range=NGRAM_RANGE, n_components=SVD_COMPONENTS
        )

    classifier = build_classifier()
    return training_and_testing_scores(
        "LSA-NDK", fold_index, transformer, classifier
    )


lsa_ndk_fold_summaries = cross_validator(
    x,
    y,
    binariser,
    lsa_ndk_classification,
    simultaneous_folds=1,
)

print_summaries(lsa_ndk_fold_summaries, top_classes)

# Latent Semantic Analysis (best hyperparameters)

In [None]:
import gensim

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.estimator_checks import check_estimator
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD

In [None]:
# NOTE(review): this "best hyperparameters" section rebinds the
# `LSA_NDK_*` names to the same "standard-ndk" artefacts as the section
# without domain knowledge -- confirm that the NDK corpus is the intended
# input here.
LSA_NDK_CORPUS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard-ndk.data.pickle.gz"
)

LSA_NDK_LABELS_PATH = os.path.join(
    ARTEFACTS_PATH, "month.standard-ndk.labels.pickle.gz"
)


USE_GENSIM = False  # False selects the scikit-learn pipeline.

# These differ from the earlier LSA sections, which use (1, 3) and 100.
NGRAM_RANGE = (1, 1)  # Unigrams only.
SVD_COMPONENTS = 200  # Number of components kept by the SVD step.

In [None]:
# Load the corpus (annotated as tag -> list of words) and its labels
# (annotated as tag -> {optional label: integer}) from the `LSA_NDK_*` paths.
# NOTE(review): `load_pickle` presumably unpickles these files, so only load
# artefacts you generated yourself.
log("Loading corpus...")
corpus: T.Dict[str, T.List[str]] = load_pickle(LSA_NDK_CORPUS_PATH)
log("Loaded corpus.")

log("Loading labels...")
labels: T.Dict[str, T.Dict[T.Optional[str], int]] = load_pickle(
    LSA_NDK_LABELS_PATH
)
log("Loaded labels.")

In [None]:
# Turn every document into an immutable tuple of words and keep the corpus
# keys in a parallel list, both in the corpus' iteration order.
log("Organising data and labels...")
x: T.List[T.Tuple[str, ...]] = [tuple(words) for words in corpus.values()]
y_tags: T.List[str] = list(corpus)

# Optional debugging slice, applied identically to both parallel lists.
if SAMPLE_LIMIT is not None:
    log(f"Warning: Using a sample limit: '{SAMPLE_LIMIT}'.", file=sys.stderr)
    x, y_tags = x[SAMPLE_LIMIT], y_tags[SAMPLE_LIMIT]

# With trimming enabled, `trimmer` supplies the data, the tags and the number
# of top classes to report; otherwise the notebook-wide setting is used.
if not TRIM_MAJORITY_CLASS:
    top_classes = TOP_CLASSES
else:
    log("Trimming...")
    x, y_tags, top_classes = trimmer(x, y_tags, labels)

binariser, y = build_binariser(y_tags, labels)
log("Organised data and labels.")

In [None]:
def lsa_best_classification(fold_index: int) -> Summary:
    """Score a single cross-validation fold for the best LSA configuration.

    Uses the scikit-learn TF-IDF + truncated SVD pipeline unless
    ``USE_GENSIM`` is set, in which case the ``LatentSemanticAnalysis`` class
    from the domain knowledge section must already have been defined.
    """
    if not USE_GENSIM:
        log("LSA-BEST via scikit-learn.")
        # Documents are already tokenised, hence the identity functions.
        vectoriser = TfidfVectorizer(
            ngram_range=NGRAM_RANGE,
            lowercase=False,
            tokenizer=lambda tokens: tokens,
            preprocessor=lambda document: document,
            token_pattern=None,  # type: ignore
            use_idf=True,
            sublinear_tf=True,
        )
        decomposer = TruncatedSVD(
            algorithm="arpack",
            n_components=SVD_COMPONENTS,
            random_state=SEED,
        )
        transformer = make_pipeline(vectoriser, decomposer, verbose=True)
    else:
        # Referencing the bare name checks that the class cell has been run.
        try:
            LatentSemanticAnalysis  # type: ignore
        except NameError:
            raise NotImplementedError(
                "Run the domain knowledge version's "
                "Gensim implementation block first."
            )

        log("LSA-BEST via Gensim.")
        transformer = LatentSemanticAnalysis(
            ngram_range=NGRAM_RANGE, n_components=SVD_COMPONENTS
        )

    classifier = build_classifier()
    return training_and_testing_scores(
        "LSA-BEST", fold_index, transformer, classifier
    )


lsa_best_fold_summaries = cross_validator(
    x,
    y,
    binariser,
    lsa_best_classification,
    simultaneous_folds=1,
)

print_summaries(lsa_best_fold_summaries, top_classes)