In [127]:
import operator
import os
import re
from collections import defaultdict
from pathlib import Path
from typing import DefaultDict

from datasets import Dataset
from transformers import AutoTokenizer

In [128]:
# Root of the ml4logs data tree. Set the ML4LOGS_DATA_DIR environment variable to
# run this notebook on another machine instead of editing an absolute local path.
DATA_DIR = Path(os.environ.get('ML4LOGS_DATA_DIR', '/home/cernypro/dev/source/ml4logs/data'))
logfile_path = DATA_DIR / 'interim' / 'HDFS1' / 'val-data-HDFS1-cv1-1.log'

# Ordering log lines by timestamp

In [129]:
# Read every raw line (newline kept) of the log file into memory.
with logfile_path.open('r') as log_file:
    lines = list(log_file)

In [130]:
# Peek at the first raw line to check the log format.
lines[0]

'081111 051303 20680 INFO dfs.DataNode$DataXceiver: Receiving block blk_-2703192954496203619 src: /10.251.71.146:45393 dest: /10.251.71.146:50010\n'

In [60]:
# Leading "<date> <time> <pid> " fields of an HDFS line, e.g. "081111 110444 26 ".
# Only the date (presumably YYMMDD) and time (presumably HHMMSS) are captured.
# The third field looks like a process/thread id (sample values 26, 20680, 30 are
# not time-ordered). If it were used as a sort key, it would reorder lines that
# share the same second. It must still be present for a line to match.
# Python's sort is stable, so lines with equal timestamps keep their file order.
p_str = r'^(\d+) (\d+) \d+ '
p = re.compile(p_str)

In [61]:
# Pair each line with an integer tuple built from the regex's captured timestamp fields.
lines_with_stamps = [
    (tuple(int(field) for field in p.match(line).groups()), line)
    for line in lines
]

In [62]:
# Stable sort on the timestamp tuple only, then drop the tuple and keep the line.
sorted_lines = [line for _stamp, line in sorted(lines_with_stamps, key=lambda pair: pair[0])]

In [69]:
# Inspect a small window (10 lines, starting 100000 lines before the end) of the sorted output.
sorted_lines[-100000:-99990]

['081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_-8577959545943422945 is added to invalidSet of 10.251.214.112:50010\n',
 '081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_-8577959545943422945 is added to invalidSet of 10.251.37.240:50010\n',
 '081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_-3157828367431892896 is added to invalidSet of 10.251.121.224:50010\n',
 '081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_-3157828367431892896 is added to invalidSet of 10.251.198.196:50010\n',
 '081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_-3157828367431892896 is added to invalidSet of 10.251.67.4:50010\n',
 '081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_2097261219794367550 is added to invalidSet of 10.250.15.198:50010\n',
 '081111 110444 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_2097261219794367550 is added to invalidSet of 10.251.214.130:50010\n',

In [64]:
# The sorted copy sits next to the source file, with a "-time-ordered" suffix on the stem.
new_file_name = f'{logfile_path.stem}-time-ordered.log'
new_path = logfile_path.with_name(new_file_name)

In [65]:
# Lines already end with "\n", so concatenating them reproduces the file layout.
new_path.write_text(''.join(sorted_lines));

# Splitting each block's lines into contexts

In [131]:
# Bounds on the number of log lines per context. Each block is cut into contexts of
# MAX_CONTEXT_SIZE lines. A trailing remainder is kept only if it has at least
# MIN_CONTEXT_SIZE lines.
MIN_CONTEXT_SIZE, MAX_CONTEXT_SIZE = 2, 10

In [132]:
def load_grouped(file_path: Path) -> DefaultDict[str, list]:
    """Group the lines of a log file by HDFS block id.

    Each line is filed under the first ``blk_<digits>`` token found in it
    (e.g. ``blk_-1608999687919862906``). Lines keep their trailing newline and
    their original file order within each group.

    Args:
        file_path: Path of the log file to read.

    Returns:
        A ``defaultdict(list)`` mapping block id -> list of lines. Keys are in
        order of first appearance in the file.

    Raises:
        ValueError: If a line contains no block id. The message includes the
            file path and 1-based line number.
    """
    traces = defaultdict(list)

    regex = re.compile(r'(blk_-?\d+)')  # pattern eg. blk_-1608999687919862906

    with file_path.open(mode='r') as f:
        for line_no, line in enumerate(f, start=1):
            match = regex.search(line)
            if match is None:
                # Fail with context instead of an opaque AttributeError on None.group().
                raise ValueError(f'{file_path}:{line_no}: no block id found in line {line!r}')
            # NOTE(review): only the first block id in a line is used; confirm no
            # line references several blocks.
            traces[match.group()].append(line)
    return traces

In [133]:
# block id -> the lines mentioning it, in file order.
blocks = load_grouped(file_path=logfile_path)

In [134]:
def create_contexts_from_blocks(block_dict: dict, min_context_size: int, max_context_size: int) -> list:
    """Split each block's lines into consecutive, non-overlapping contexts.

    The lines of every block are cut, in order, into chunks of
    ``max_context_size`` lines. All full chunks are kept. The trailing remainder
    is kept only if it is non-empty and has at least ``min_context_size`` lines.
    Empty contexts are never produced.

    Args:
        block_dict: Mapping of block id -> iterable of log lines (e.g. the output
            of ``load_grouped``). Only the values are used.
        min_context_size: Minimum length of a trailing, partial context.
        max_context_size: Length of a full context. Must be >= 1.

    Returns:
        List of contexts (each a list of lines), ordered by block, then by
        position within the block.

    Raises:
        ValueError: If ``max_context_size`` is smaller than 1.
    """
    if max_context_size < 1:
        raise ValueError(f'max_context_size must be >= 1, got {max_context_size}')
    contexts = []
    for block_lines in block_dict.values():
        current = []
        for line in block_lines:
            current.append(line)
            if len(current) == max_context_size:
                contexts.append(current)
                current = []
        # Keep the leftover tail only if it is non-empty and long enough.
        if current and len(current) >= min_context_size:
            contexts.append(current)
    return contexts
        

In [109]:
# Small sample ('tst0', 'tst1' -> lines of the first two blocks) for manual experiments.
# zip() stops after two blocks without materializing every key. If fewer than two
# blocks exist, it yields fewer entries instead of raising IndexError.
tst = {f'tst{i}': block_lines for i, block_lines in zip(range(2), blocks.values())}

In [135]:
# One entry per context: a list of raw log lines from a single block.
chunks_lines = create_contexts_from_blocks(
    blocks,
    min_context_size=MIN_CONTEXT_SIZE,
    max_context_size=MAX_CONTEXT_SIZE,
)

In [136]:
# Single-column dataset; each row's 'chunk' holds one context (list of lines).
chunk_columns = {'chunk': chunks_lines}
chunks_dataset = Dataset.from_dict(chunk_columns)

In [137]:
# Pretrained checkpoint whose tokenizer is used for all log lines below.
TOKENIZER_CHECKPOINT = "distilbert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_CHECKPOINT)

In [138]:
%%time
tokenized_chunks = [tokenizer(chunk, add_special_tokens=False, truncation=True, return_attention_mask=False)['input_ids'] for chunk in chunks_lines]

CPU times: user 4min 32s, sys: 42.2 s, total: 5min 14s
Wall time: 1min 47s


In [139]:
# Should match len(chunks_lines): the tokenization cell appends one entry per context.
len(tokenized_chunks)

115988

In [140]:
# Number of contexts produced from all blocks.
len(chunks_lines)

115988

In [143]:
# First row: a 'chunk' holding raw log lines that share one block id.
chunks_dataset[0]

{'chunk': ['081111 051303 20680 INFO dfs.DataNode$DataXceiver: Receiving block blk_-2703192954496203619 src: /10.251.71.146:45393 dest: /10.251.71.146:50010\n',
  '081111 051303 30 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /user/root/rand5/_temporary/_task_200811101024_0011_m_000849_0/part-00849. blk_-2703192954496203619\n',
  '081111 051304 20827 INFO dfs.DataNode$DataXceiver: Receiving block blk_-2703192954496203619 src: /10.251.111.228:46287 dest: /10.251.111.228:50010\n',
  '081111 051304 20881 INFO dfs.DataNode$DataXceiver: Receiving block blk_-2703192954496203619 src: /10.251.71.146:47540 dest: /10.251.71.146:50010\n',
  '081111 051343 20828 INFO dfs.DataNode$PacketResponder: PacketResponder 0 for block blk_-2703192954496203619 terminating\n',
  '081111 051343 20828 INFO dfs.DataNode$PacketResponder: Received block blk_-2703192954496203619 of size 67108864 from /10.251.111.228\n',
  '081111 051343 20882 INFO dfs.DataNode$PacketResponder: Received block blk_-27031929

In [142]:
%%time
tokenized_dataset = chunks_dataset.map(lambda example: {'chunk': tokenizer(example['chunk'], add_special_tokens=False, truncation=True, return_attention_mask=False)['input_ids'] })

HBox(children=(FloatProgress(value=0.0, max=115988.0), HTML(value='')))


CPU times: user 4min 54s, sys: 45.4 s, total: 5min 40s
Wall time: 2min 23s
