In [None]:
import os
import pathlib
import re
import functools as ftools
import time

import numpy as np
import pandas as pd
import fasttext

In [None]:
# Root of the project tree. Taken from the PROJECT_DIR environment variable
# when it is set, otherwise falls back to ~/ml4logs.
PROJECT_DIR = pathlib.Path(os.getenv('PROJECT_DIR', default=pathlib.Path.home() / 'ml4logs'))

# Pre-trained fastText binaries, keyed by the short name that later prefixes
# the generated output files.
FASTTEXT_PATHS = {
    'hdfs_1':  PROJECT_DIR / 'models/embeddings/fasttext-skipgram-hdfs_1.bin',
    'hdfs_2':  PROJECT_DIR / 'models/embeddings/fasttext-skipgram-hdfs_2.bin',
    'hdfs_12': PROJECT_DIR / 'models/embeddings/fasttext-skipgram-hdfs_12.bin'
}
# Raw log file and the CSV holding the per-block labels.
HDFS_PATH = PROJECT_DIR / 'data/raw/HDFS_1.log'
LABEL_PATH = PROJECT_DIR / 'data/raw/HDFS_1_label.csv'

# Length of the embedding vectors (used to size accumulators further down).
DIM_EMBEDDING = 100

# Fail fast, and say which input is missing, before any expensive work starts.
# `is_file()` is already False for a path that does not exist, so a separate
# `exists()` check is unnecessary.
for path in (HDFS_PATH, LABEL_PATH, *FASTTEXT_PATHS.values()):
    assert path.is_file(), f'Required input file not found: {path}'

## Load raw logs and labels

In [None]:
# Load the complete raw log into memory as a list with one entry per line.
# Leading/trailing whitespace of the file as a whole is stripped first so that
# a final newline does not produce an empty last entry.
with HDFS_PATH.open() as log_file:
    logs = log_file.read().strip().split('\n')

In [None]:
# Per-block ground truth, indexed by block id so rows can be looked up by id.
labels = pd.read_csv(LABEL_PATH).set_index('BlockId')
# A duplicated block id would make a later `.loc` lookup return several rows
# for one block and silently misalign features and targets.
assert labels.index.is_unique, f'Duplicate BlockId values in {LABEL_PATH}'

# Encode the textual label as an integer target: Normal -> 0, Anomaly -> 1.
labels['Label'] = labels['Label'].map({'Normal': 0, 'Anomaly': 1})
# `map` turns every value missing from the dict into NaN without complaining,
# so check explicitly that nothing was lost.
assert labels['Label'].notna().all(), (
    f"Unexpected Label value(s) in {LABEL_PATH}; expected only 'Normal' or 'Anomaly'"
)

## Find block IDs in each log message

In [None]:
# For every log line, collect all block ids it mentions ("blk_" followed by an
# optionally negative integer). The result is a tuple aligned with `logs`:
# blocks[i] is the (possibly empty) list of ids found in logs[i].
block_id_pattern = re.compile(r'(blk_-?\d+)')
blocks = tuple(block_id_pattern.findall(line) for line in logs)

## Compute per-block embeddings for each fastText model and each merge method

In [None]:
def sum_per_block(embeddings, blocks):
    """Sum the embedding vectors of all log lines that mention each block.

    Parameters
    ----------
    embeddings : sequence of array-like
        One vector per log line; ``embeddings[i]`` belongs to line ``i``.
    blocks : sequence of iterables
        ``blocks[i]`` holds the block ids found in line ``i``. An id that is
        listed several times for one line contributes that line's vector once
        per occurrence.

    Returns
    -------
    dict
        Maps every block id to a float64 array with the element-wise sum of
        the vectors of the lines it appears in. Empty input gives ``{}``.
    """
    result = {}
    for i in range(len(embeddings)):
        vector = embeddings[i]
        for bid in blocks[i]:
            total = result.get(bid)
            if total is None:
                # Size the accumulator from the data itself instead of a
                # global constant, so any embedding dimension works.
                total = result[bid] = np.zeros(np.shape(vector))
            # Accumulate in place: no temporary array per log line.
            total += vector
    return result

def average_per_block(embeddings, blocks):
    """Average the embedding vectors of all log lines that mention each block.

    Parameters
    ----------
    embeddings : sequence of array-like
        One vector per log line; ``embeddings[i]`` belongs to line ``i``.
    blocks : sequence of iterables
        ``blocks[i]`` holds the block ids found in line ``i``. An id that is
        listed several times for one line is counted once per occurrence.

    Returns
    -------
    dict
        Maps every block id to a float64 array with the element-wise mean of
        the vectors of the lines it appears in. Empty input gives ``{}``.
    """
    result = {}
    counter = {}
    for i in range(len(embeddings)):
        vector = embeddings[i]
        for bid in blocks[i]:
            total = result.get(bid)
            if total is None:
                # Size the accumulator from the data itself instead of a
                # global constant, so any embedding dimension works.
                total = result[bid] = np.zeros(np.shape(vector))
            # Accumulate in place: no temporary array per log line.
            total += vector
            counter[bid] = counter.get(bid, 0) + 1
    # Turn the running sums into means.
    for bid, count in counter.items():
        result[bid] /= count
    return result

def max_per_block(embeddings, blocks):
    """Element-wise maximum of the embedding vectors belonging to each block.

    ``embeddings[i]`` is the vector of log line ``i`` and ``blocks[i]`` the
    block ids found in that line. Returns a dict mapping each block id to the
    element-wise ``np.fmax`` over the vectors of all lines mentioning it.
    """
    result = {}
    for line_idx, vector in enumerate(embeddings):
        for block_id in blocks[line_idx]:
            # The first time a block is seen, its own vector is the baseline.
            current = result[block_id] if block_id in result else vector
            result[block_id] = np.fmax(current, vector)
    return result

def min_per_block(embeddings, blocks):
    """Element-wise minimum of the embedding vectors belonging to each block.

    ``embeddings[i]`` is the vector of log line ``i`` and ``blocks[i]`` the
    block ids found in that line. Returns a dict mapping each block id to the
    element-wise ``np.fmin`` over the vectors of all lines mentioning it.
    """
    result = {}
    for line_idx, vector in enumerate(embeddings):
        for block_id in blocks[line_idx]:
            # The first time a block is seen, its own vector is the baseline.
            current = result[block_id] if block_id in result else vector
            result[block_id] = np.fmin(current, vector)
    return result

# Available strategies for merging per-line embeddings into one vector per
# block, keyed by the short name used in the output file names.
merge_methods = dict(
    sum=sum_per_block,
    average=average_per_block,
    max=max_per_block,
    min=min_per_block,
)

In [None]:
# Create the output directory up front so that a missing folder cannot make
# `np.savez` fail after the expensive embedding step has already run.
output_dir = PROJECT_DIR / 'data/processed'
output_dir.mkdir(parents=True, exist_ok=True)

# For every fastText model: embed each log line once, then aggregate the line
# vectors per block with every merge method and store one .npz per combination.
for fname, fasttext_path in FASTTEXT_PATHS.items():
    started_time = time.time()
    display('Processing {}'.format(fname))
    model = fasttext.load_model(str(fasttext_path))
    # One sentence vector per log line, stacked row-wise and aligned with `logs`.
    embeddings = np.stack(tuple(map(model.get_sentence_vector, logs)))
    for mname, merge_fn in merge_methods.items():
        display('---- Subprocessing {}'.format(mname))
        embeddings_per_block = merge_fn(embeddings, blocks)
        # Keys and values of a dict iterate in the same order, so blks[k]
        # is the block id that belongs to X[k].
        blks = tuple(embeddings_per_block)
        X = tuple(embeddings_per_block.values())
        # Single .loc call selecting rows and column together (no chained
        # indexing); raises KeyError if a block id has no label.
        Y = labels.loc[list(blks), 'Label'].to_numpy()
        assert len(Y) == len(X), 'Features and labels are misaligned'
        save_fname = f'{fname}-fasttext-{mname}.npz'
        np.savez(output_dir / save_fname, X=np.stack(X), Y=Y)
    display('Time elapsed: {:.2f}'.format(time.time() - started_time))