All text annotations are temporary, and for guiding John.

In [None]:
import bokeh.io
from collections import defaultdict
from dask.distributed import LocalCluster, Client, as_completed
import gzip
from hashlib import md5
import json
import numpy as np
import os
import pandas as pd
import panel as pn
from pathlib import Path
import scipy.sparse as ss
from sklearn.preprocessing import Normalizer
import sys
import thisnotthat as tnt
from tqdm.auto import tqdm
import umap
import vectorizers as vz
import vectorizers.transformers as vzt
import zstandard as zstd

In [None]:
bokeh.io.output_notebook()
pn.extension()

Dask makes things go zzzzzoom

In [None]:
cluster = LocalCluster(threads_per_worker=1)
client = Client(cluster)
client

We will work on host 501, processing all days. One can also choose days between 18 and 25.

In [None]:
HOST = 501
DAYS = ["*"]
HOSTNAME = f"SysClient{HOST:04d}.systemia.com"

The data lives as compressed JSON-lines chunks, Zstd-compressed. The following data engineering goes much less deep into token generation than the work we presented so far, so as to put the emphasis on the vectorization (not the data engineering).

In [None]:
schemas = {
    "FLOW": ["object", "action", ("src_ip", "ip"), ("dest_ip", "ip"), ("src_port", "port"), ("dest_port", "port"), "l4protocol", "direction"],
    "FILE": ["object", "action", ("file_path", "path"), "info_class", ("new_path", "path")],
    "HOST": ["object", "action"],
    "MODULE": ["object", "action", ("module_path", "path")],
    "REGISTRY": ["object", "action", ("key", "registry-key"), ("value", "registry-value"), ("type", "registry-type")],
    "SERVICE": ["object", "action", ("name", "service-name")],
    "SHELL": ["object", "action"],
    "TASK": ["object", "action", "path", ("task_name", "task-name")],
    "THREAD": ["object", "action"],
    "USER_SESSION": ["object", "action", ("user", "user-domain"), ("requesting_domain", "domain"), ("requesting_user", "user"), ("src_ip", "ip"), ("src_port", "port")],
    "PROCESS": {
        "CREATE": {
            "actorID": ["object", "action", ("image_path", "child"), ("image_path", "path")],
            "objectID": [("parent_image_path", "parent"), ("image_path", "process"), ("user", "user-domain")]
        },
        "OPEN": {
            "actorID": ["object", "action"]
        },
        "TERMINATE": {
            "actorID": ["object", "action"]
        }
    }
}

In [None]:
def iter_events(path_chunk):
    with zstd.open(path_chunk, mode="rt", encoding="utf-8") as file:
        for line in file:
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # Skip ill-formed records.
                pass

In [None]:
def extract_features(path_chunk):
    for event in iter_events(path_chunk):
        obj = event["object"]
        if obj not in schemas:
            continue
        schema = (
            schemas[obj].get(event["action"], {})
            if isinstance(schemas[obj], dict)
            else {"actorID": schemas[obj]}
        )
        for identifier, features in schema.items():
            tokens = []
            for feature in features:
                field, kind = (feature, feature) if isinstance(feature, str) else feature
                if value := event.get(field, ""):
                    tokens.append((kind, value))
            yield (pd.Timestamp(event["timestamp"]), event[identifier], tokens)

Given a data chunk, we return a data frame where its categorical tokens are already in the list form suitable for one-hot vectorization.

In [None]:
def tabulate_features(path_chunk):
    return pd.DataFrame(
        data=extract_features(path_chunk),
        columns=["timestamp", "process_id", "tokens"]
    ).astype({"process_id": "category"})

Where are my data chunks?

In [None]:
ROOT_HOSTNAME = Path("/data/optc/scipy2023") / HOSTNAME
CHUNKS = sorted(sum(
    [list(ROOT_HOSTNAME.glob(f"{day}/optc-eng.*.json.zstd")) for day in DAYS],
    []
))
len(CHUNKS)

The vectorization gambit is to do it by chunks, and combine the resulting sparse matrices afterwards, using nifty NgramVectorizer addition.

In [None]:
def vectorize_features(path_chunk):
    return vz.NgramVectorizer().fit(tabulate_features(path_chunk)["tokens"])

In [None]:
%%time
vzr_all = sum(
    (fut.result() for fut in tqdm(client.map(vectorize_features, CHUNKS), total=len(CHUNKS))),
    vz.NgramVectorizer()
)
event_matrix = vzr_all._train_matrix
event_matrix

Let's now group events by process.

In [None]:
def events_by_process(path_chunk):
    process_ids = tabulate_features(path_chunk)[["process_id"]]
    process_ids["event_index"] = pd.Series(process_ids.index).apply(lambda x: [x])
    return process_ids.groupby("process_id", as_index=False).sum()

In [None]:
%%time
process2ievent = {}
total_events = 0
for fut in tqdm(client.map(events_by_process, CHUNKS), total=len(CHUNKS)):
    processes = fut.result()
    for process_id, indices in processes[["process_id", "event_index"]].itertuples(index=False):
        process2ievent.setdefault(process_id, [])
        for index_row_chunk in indices:
            process2ievent[process_id].append(index_row_chunk + total_events)
    total_events += processes["event_index"].apply(len).sum()

len(process2ievent)

That's a *lot* of processes. Let's prune off those for which we don't have enough features (by weight) to reliably describe their behaviour.

In [None]:
features_per_event = np.array(event_matrix.sum(axis=1)).squeeze()
features_per_process = pd.Series({process_id: sum([features_per_event[i] for i in indices]) for process_id, indices in tqdm(process2ievent.items())})
features_per_process

Distribution of number of features per process:

In [None]:
features_per_process.apply(np.log10).hist(bins=range(-1, 6))

It does not make much sense to me to keep processes described by a total number of categorical features less than 10. So let's drop the guys from the first column.

We will do that while also putting together process vectors by summing event vectors.
This means a linear combination of the rows of the event matrix.
The fastest way of achieving that is by computing a projection matrix that we will multiply on the left of the event matrix.

In [None]:
%%time
irows = []
icols = []
process2irow = {}
irow2process = {}
irow_next = 0
for process_id, indices in tqdm(process2ievent.items()):
    if features_per_process.loc[process_id] >= 10:
        irow = irow_next
        irow_next += 1
        irows += [irow] * len(indices)
        icols += indices
        process2irow[process_id] = irow
        irow2process[irow] = process_id

projection = ss.coo_matrix((np.ones((len(irows),), dtype=np.int32), (irows, icols)), shape=(len(process2irow), event_matrix.shape[0])).tocsr()
assert set(np.array(projection.sum(axis=0)).squeeze()) <= {0, 1}
projection

In [None]:
process_matrix = (projection @ event_matrix).astype(np.float32)
process_matrix

The categories (_labels_) for our process instances are either the command line by which they were started, or when we can't find that, their related image path.
The former can only be found in `PROCESS-CREATE` events.
The latter is field common to all events, and its value should be shared by all events generated by any given process instance.

The way we associate labels to process instances is thus to extract the best label we can from every event.
We then tabulate these in association with their process ID, and use an *importance* ordinal to denote which label should take precedence.
We sort this table by importance, and drop process ID duplicates: what remains are the best guest we can take as label for every process.

In [None]:
def filter_labels(proposals):
    return proposals.sort_values("importance", ascending=True).drop_duplicates(subset=["process_id"], keep="first", ignore_index=True)

In [None]:
def label_processes(path_chunk):
    data = []
    for event in iter_events(path_chunk):
        if event["object"] == "PROCESS" and event["action"] == "CREATE":
            if command_line := event.get("command_line", ""):
                data.append((event["objectID"], 0, command_line))
            elif image_path := event.get("image_path", ""):
                data.append((event["objectID"], 10, image_path))
            if parent_image_path := event.get("parent_image_path", ""):
                data.append((event["actorID"], 10, parent_image_path))
        else:
            if image_path := event.get("image_path", ""):
                data.append((event["actorID"], 10, image_path))

    return filter_labels(pd.DataFrame(data=data, columns=["process_id", "importance", "label"]))

We then run this filtering iteratively across best proposals from every chunk, and come out the other end with every process instance labeled... or nearly.

In [None]:
labels_known = pd.DataFrame()
for fut in tqdm(client.map(label_processes, CHUNKS), total=len(CHUNKS)):
    labels_known = filter_labels(pd.concat([labels_known, fut.result()], ignore_index=True))
labels_known

Any process missing a label, now, we just consider we **don't know** what they are about.
Let's bin these together.

In [None]:
labels = pd.Series(irow2process, name="process_id").to_frame().merge(labels_known[["process_id", "label"]], on="process_id", how="left").fillna("(unknown)")
labels

Now, not all features are _useful_ for characterizing the process instances.
*Orphan features* are too few for their sharing to denote similarity between more than a very small group of processes.
*Spurious features* are too often associated to processes to help differentiate between them (like stop words).
So a quick thresholding might help compress our very large feature space.

In [None]:
feature_importance = pd.Series(np.array(process_matrix.sum(axis=0)).squeeze())
sum(feature_importance == 0)

So, the pruning of the set of processes already leaves 41 features completely useless.

In [None]:
feature_importance.loc[feature_importance > 0].apply(np.log10).hist(bins=[-2,-1,0,1,2,3,4,5])

Most features, by a large factor, are orphans; we seem not to have any spurious feature, as none is associated to more than 10000 process instances.

Let's take a more detailed look at the first column of the previous histogram.

In [None]:
feature_importance.loc[feature_importance < 10].hist(bins=np.linspace(0, 10, 10) - 0.5)

Again, most of these rarely used features are literal orphans: associated to one or two processes.
Let's cut off any that's not tied to at least 3 processes.

In [None]:
%%time
col2token = []
token2col = {}
indices_keep = []
for i, count in enumerate(feature_importance):
    if count > 3:
        indices_keep.append(i)
        token = vzr_all.column_index_dictionary_[i]
        index_new = len(col2token)
        col2token.append(token)
        token2col[token] = index_new

reduced_matrix = process_matrix[:, indices_keep].copy()
reduced_matrix

Ok, has this feature space reduction killed the representation of processes?
I'm hoping the total feature weight for any process is at least 5 (e.g. 5 tokens associated to it across all events that characterize it).

In [None]:
features_per_process_redux = np.array(reduced_matrix.sum(axis=1)).squeeze()
assert np.min(features_per_process_redux) > 5.0

Now, it's always easier to compute the compressed vector representation on the subset of unique process vectors.

In [None]:
%%time

def md5_list(it):
    return struct.unpack("<QQ", md5(memoryview(np.array(it))).digest())

reduced_lil = reduced_matrix.tolil()
hh = np.zeros(shape=(reduced_matrix.shape[0], 4), dtype=np.uint64)
for i, indices_values in enumerate(zip(reduced_lil.rows, reduced_lil.data)):
    hh[i, :] = sum((md5_list(it) for it in indices_values), ())
_, index_u, inverse_u, counts_u = np.unique(hh, axis=0, return_index=True, return_inverse=True, return_counts=True)
index_u.shape, inverse_u.shape

In [None]:
unique_matrix = reduced_matrix[index_u, :]
unique_matrix

I have tried running the information weight transform on the matrix of unique process vectors,
but the result seems to confuse UMAP **a lot**.
UMAP would crash on that matrix by putting way too many vectors under one particular leaf of the RP tree:
Leland mused that the hyperplanes used to spread the vectors between the RP trees were doing a poor job.
I didn't have the time to truly debug this, so I moved on with directly compressing the matrix of unique process vectors.

The protomap only contains the unique vectors.

In [None]:
%%time
process_protomap = umap.UMAP(n_components=2, metric="cosine", densmap=True, dens_lambda=4, n_epochs=800, verbose=True).fit_transform(Normalizer(norm="l1").fit_transform(unique_matrix))
process_protomap

The full map is the protomap reduplicated.

In [None]:
process_map = process_protomap[inverse_u, :]
process_map.shape

The following will visualize a map of all process instances, where we color the most frequent process classes (top 12).

In [None]:
processes_top12 = labels.groupby("label", as_index=False).agg({"process_id": "count"}).sort_values("process_id", ascending=False).head(12)
processes_top12

In [None]:
list(processes_top12["label"])

In [None]:
process_map_study = process_map[labels.loc[labels["label"] == 'C:\\Windows\\SYSTEM32\\cmd.exe /c "C:\\ncr\\DeleteArchiveSecurity.bat"'].index, :]
process_map_study.shape

In [None]:
plot = tnt.BokehPlotPane(process_map_study, width=900, height=900, show_legend=False)
editor = tnt.LabelEditorWidget([])
editor.link_to_plot(plot)
summary = tnt.DataSummaryPane()
pn.Row(plot, editor, summary)

Ready for demos of exploring the map contents using summarizers.