<a href="https://colab.research.google.com/github/honicky/deep-log-analysis/blob/main/Pythia%20Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pythia Analysis - train small models on HDFS data

* use tokenized version of preprocessed HDFS events
* start with very small pythia models, test increasing size
* start with fine-tuning, then consider resetting weights and training from scratch
* experiment with different tokenizers
  * https://chatgpt.com/share/67448f53-29a0-800f-9913-af22d6ed0894


In [None]:
try:
  from google.colab import userdata

  !git clone https://github.com/honicky/deep-log-analysis.git
  !mv deep-log-analysis/* .
  !rm -rf deep-log-analysis
except:
  pass

In [1]:
try:
    import logparser.Drain as Drain
except ImportError:
    %pip install requests git+https://github.com/logpai/logparser

%pip install transformers torch torchvision torchaudio wandb python-dotenv datasets


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import logparser.Drain as Drain


In [3]:
%load_ext autoreload
%autoreload 2
import dataloaders as dl


# Load secrets

If we are in colab, we get them from the `userdata` module, otherwise we get them from a .env file


In [4]:
import os
try:
  from google.colab import userdata
  os.environ["HF_WRITE_TOKEN"] = userdata.get('HF_WRITE_TOKEN')
  os.environ["WANDB_API_KEY"] = userdata.get('WANDB_API_KEY')
except ImportError:
  from dotenv import load_dotenv
  load_dotenv()


# Download and unzip the HDFS dataset

The functions check if the data is already downloaded and unzipped, and only download and unzip if they are not present.


In [23]:
import pandas as pd


dl.download_data(dl.datasets["HDFS"]["url"], dl.datasets["HDFS"]["zip_file_name"])

regenerate_data = False
if regenerate_data:
    from datasets import Dataset

    dl.unzip_data(dl.datasets["HDFS"]["zip_file_name"], dl.datasets["HDFS"]["file_name"])

    structured_file_path = dl.parse_dataset("HDFS")

    structured_df = pd.read_csv(structured_file_path)
    dl.add_hdfs_blockid_column(structured_df)
    structured_df.head()

    structured_dataset = Dataset.from_pandas(structured_df)
    structured_dataset.push_to_hub("honicky/log-analysis-hdfs-preprocessed", token=os.environ["HF_WRITE_TOKEN"])

else:
    from datasets import load_dataset

    structured_dataset = load_dataset("honicky/log-analysis-hdfs-preprocessed")
    structured_df = pd.DataFrame(structured_dataset['train'])


# Load the block labels

In [24]:
dl.unzip_data(dl.datasets["HDFS"]["zip_file_name"],"preprocessed/anomaly_label.csv", base_dir="data/hdfs" )

anomaly_label_df = pd.read_csv("data/hdfs/preprocessed/anomaly_label.csv")
anomaly_label_df.head()


Unnamed: 0,BlockId,Label
0,blk_-1608999687919862906,Normal
1,blk_7503483334202473044,Normal
2,blk_-3544583377289625738,Anomaly
3,blk_-9073992586687739851,Normal
4,blk_7854771516489510256,Normal


# Parse the parameter list

The parameter list is formatted as python code, so we need to use the `ast` library to parse it.

In [25]:
from ast import literal_eval

structured_df['ParsedParameterList'] = structured_df.ParameterList.apply(literal_eval)


In [26]:
event_id_mapping_pdf = (structured_df
 .EventId
 .value_counts()
 .reset_index()
 .reset_index()
 .rename(columns={"index":"NewEventId"})
 [["EventId", "NewEventId"]]
)

In [27]:
structured_with_event_id_pdf = structured_df.merge(event_id_mapping_pdf, on="EventId")
structured_with_event_id_pdf.head()

Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate,ParameterList,BlockId,ParsedParameterList,NewEventId
0,1,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.19.102:5...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.19.102:5410...",0
1,2,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,3d91fa85,BLOCK* NameSystem.allocateBlock: <*> <*>,['/mnt/hadoop/mapred/system/job_200811092030_0...,blk_-1608999687919862906,[/mnt/hadoop/mapred/system/job_200811092030_00...,6
2,3,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.10.6:405...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.10.6:40524,...",0
3,4,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.14.224:4...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.14.224:4242...",0
4,5,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,d38aa58d,PacketResponder <*> for block <*> <*>,"['1', 'blk_-1608999687919862906 terminating']",blk_-1608999687919862906,"[1, blk_-1608999687919862906 terminating]",2


## Construct blocks to parse

https://raw.githubusercontent.com/EleutherAI/pythia/refs/heads/main/utils/20B_tokenizer.json has the tokenizer configuration.  We will use the `<|sep|>` token to immediately precede the short event id.  We need to add the `<|sep|>` token to the tokenizer, because it is not in the default tokenizer.  This will hopefully help the attention mechanism attend to the event id specifically.  We have shortened the event id to the minimum length based on the number occurences.  This will gives an efficient coding that will be less complicated for the attention mechanism.

We can consider a more customized tokenizer as another experiment.  This might help because of the special characters and the dominance of numbers in the logs.


In [28]:
from transformers import GPTNeoXTokenizerFast
tokenizer = GPTNeoXTokenizerFast.from_pretrained("EleutherAI/pythia-14m")
tokenizer.add_special_tokens({"additional_special_tokens": ["<|sep|>"]})
tokenizer.sep_token = "<|sep|>"
tokenizer.sep_token_id
tokenizer.pad_token_id = tokenizer.eos_token_id # no pad token in default tokenizer, so add it here for collating / training


Double check that the tokenizer properly encodes the new special token

In [29]:

tokenizer.encode("<|sep|>")


[50277]

Review then tokenizer configuration, again to ensure the new special token is included


In [30]:
tokenizer

GPTNeoXTokenizerFast(name_or_path='EleutherAI/pythia-14m', vocab_size=50254, model_max_length=1000000000000000019884624838656, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'bos_token': '<|endoftext|>', 'eos_token': '<|endoftext|>', 'unk_token': '<|endoftext|>', 'sep_token': '<|sep|>', 'pad_token': '<|endoftext|>', 'additional_special_tokens': ['<|sep|>']}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("<|endoftext|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	1: AddedToken("<|padding|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	50254: AddedToken("                        ", rstrip=False, lstrip=False, single_word=False, normalized=True, special=False),
	50255: AddedToken("                       ", rstrip=False, lstrip=False, single_word=False, normalized=True, special=False),
	50256: AddedToken("                      ", rstrip=False, lstrip=False, s

In [31]:
structured_with_event_id_pdf['event_encoded'] = structured_with_event_id_pdf.apply(lambda row: f"{tokenizer.sep_token}{row['NewEventId']} {' '.join(param for param in row['ParsedParameterList'] if 'blk_' not in param)}", axis=1)
structured_with_event_id_pdf.head()


Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate,ParameterList,BlockId,ParsedParameterList,NewEventId,event_encoded
0,1,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.19.102:5...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.19.102:5410...",0,<|sep|>0 /10.250.19.102:54106 /10.250.19.102:5...
1,2,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,3d91fa85,BLOCK* NameSystem.allocateBlock: <*> <*>,['/mnt/hadoop/mapred/system/job_200811092030_0...,blk_-1608999687919862906,[/mnt/hadoop/mapred/system/job_200811092030_00...,6,<|sep|>6
2,3,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.10.6:405...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.10.6:40524,...",0,<|sep|>0 /10.250.10.6:40524 /10.250.10.6:50010
3,4,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.14.224:4...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.14.224:4242...",0,<|sep|>0 /10.250.14.224:42420 /10.250.14.224:5...
4,5,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,d38aa58d,PacketResponder <*> for block <*> <*>,"['1', 'blk_-1608999687919862906 terminating']",blk_-1608999687919862906,"[1, blk_-1608999687919862906 terminating]",2,<|sep|>2 1


In [32]:
encoded_blocks_series = structured_with_event_id_pdf.groupby("BlockId")['event_encoded'].apply(lambda x: "".join(x))
encoded_blocks_series.head()


BlockId
blk_-1000002529962039464    <|sep|>0 /10.251.123.1:41333 /10.251.123.1:500...
blk_-100000266894974466     <|sep|>6 <|sep|>0 /10.250.10.144:36204 /10.250...
blk_-1000007292892887521    <|sep|>0 /10.251.127.47:50228 /10.251.127.47:5...
blk_-1000014584150379967    <|sep|>0 /10.251.43.210:49254 /10.251.43.210:5...
blk_-1000028658773048709    <|sep|>0 /10.251.107.196:58917 /10.251.107.196...
Name: event_encoded, dtype: object

In [33]:
print(encoded_blocks_series.shape)
print(encoded_blocks_series.iloc[0])


(575061,)
<|sep|>0 /10.251.123.1:41333 /10.251.123.1:50010<|sep|>0 /10.251.123.1:53174 /10.251.123.1:50010<|sep|>0 /10.251.202.181:32980 /10.251.202.181:50010<|sep|>6 <|sep|>2 2<|sep|>3 3553241 /10.251.123.1<|sep|>2 0<|sep|>3 3553241 /10.251.202.181<|sep|>1 10.251.126.22:50010 3553241<|sep|>1 10.251.202.181:50010 3553241<|sep|>1 10.251.123.1:50010 3553241<|sep|>2 1<|sep|>3 3553241 /10.251.123.1


# Start with pretrained weights

The intuition is that the model will benefit some from understanding words and numbers (to some extent) when they appear, even if the structure of logs is very different from english sentences.  We can test this with an ablation study by randomizing the weights before training and then looking at the difference in the loss.

### Understanding Pythia Model Vocabulary Size Discrepancy

When loading a Pythia model from EleutherAI, I noticed a discrepancy between the model's embedding weight shape and the tokenizer vocabulary size:

```python
import torch
from transformers import GPTNeoXForCausalLM, GPTNeoXTokenizerFast

model = GPTNeoXForCausalLM.from_pretrained("EleutherAI/pythia-14m")
tokenizer = GPTNeoXTokenizerFast.from_pretrained("EleutherAI/pythia-14m")
model.get_input_embeddings().weight.data.shape
```

This outputs:
```
torch.Size([50304, 128])
```

However, the tokenizer's vocab size is:
```python
>>> tokenizer.vocab_size + len(tokenizer.added_tokens_encoder)
50279
```

Including special tokens, the vocab size is 50277.

The original 50304 dimensions confused me at first, but it turns out the size is padded in order to facilitate alignment with tensor cores. Specifically, `50304 = 2^7 * 3 * 131`, so the embedding size is a multiple of 128.

From [The Case for Co-Designing Model Architectures with Hardware](https://arxiv.org/pdf/2401.14489v2):

> Tensor Cores can be fully utilized when GEMM dimensions m, k, and n are multiples
> of 16 bytes and 128 bytes for V100 and A100 GPUs, respectively. Since a FP16
> element is 2 bytes, this corresponds to dimension sizes that are multiples of 8
> and 64 elements, respectively.

So it looks like the embedding size is a multiple of 64.

### Solution

Add padding to the embedding size to match the parallelization factor.
```
model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=64)
```



In [34]:
import torch

from transformers import GPTNeoXForCausalLM

def get_model():

    model = GPTNeoXForCausalLM.from_pretrained("EleutherAI/pythia-14m")
    model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=64)
    model.get_input_embeddings().weight.data.shape

    return model

model = get_model()

# Encode the blocks using the new tokenizer

In [35]:
encoded_blocks_pdf = encoded_blocks_series.to_frame()
encoded_blocks_pdf['encoded_block'] = encoded_blocks_pdf.event_encoded.apply(tokenizer.encode)


In [36]:
encoded_blocks_pdf

Unnamed: 0_level_0,event_encoded,encoded_block
BlockId,Unnamed: 1_level_1,Unnamed: 2_level_1
blk_-1000002529962039464,<|sep|>0 /10.251.123.1:41333 /10.251.123.1:500...,"[50277, 17, 1227, 740, 15, 21451, 15, 10683, 1..."
blk_-100000266894974466,<|sep|>6 <|sep|>0 /10.250.10.144:36204 /10.250...,"[50277, 23, 209, 50277, 17, 1227, 740, 15, 951..."
blk_-1000007292892887521,<|sep|>0 /10.251.127.47:50228 /10.251.127.47:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 11946, 1..."
blk_-1000014584150379967,<|sep|>0 /10.251.43.210:49254 /10.251.43.210:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 3079, 15..."
blk_-1000028658773048709,<|sep|>0 /10.251.107.196:58917 /10.251.107.196...,"[50277, 17, 1227, 740, 15, 21451, 15, 12224, 1..."
...,...,...
blk_999905757185707736,<|sep|>0 /10.251.39.160:41914 /10.251.39.160:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 1867, 15..."
blk_999915040208161699,<|sep|>0 /10.251.43.210:46583 /10.251.43.210:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 3079, 15..."
blk_999958959261325562,<|sep|>0 /10.251.203.246:56717 /10.251.203.246...,"[50277, 17, 1227, 740, 15, 21451, 15, 17490, 1..."
blk_999974850451006327,<|sep|>0 /10.251.126.5:32870 /10.251.126.5:500...,"[50277, 17, 1227, 740, 15, 21451, 15, 13381, 1..."


In [37]:
print(f"total token count: {encoded_blocks_pdf.encoded_block.apply(len).sum():,}")
encoded_blocks_pdf.encoded_block.apply(len).describe()

total token count: 137,942,766


count    575061.000000
mean        239.875015
std          85.098227
min          27.000000
25%         219.000000
50%         219.000000
75%         223.000000
max        5770.000000
Name: encoded_block, dtype: float64

In [38]:
encoded_blocks_pdf.encoded_block.apply(len).describe(percentiles=[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99])


count    575061.000000
mean        239.875015
std          85.098227
min          27.000000
1%           54.000000
5%          174.000000
10%         174.000000
25%         219.000000
50%         219.000000
75%         223.000000
90%         343.000000
95%         405.000000
99%         476.000000
max        5770.000000
Name: encoded_block, dtype: float64

In [40]:
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.backends.cuda.is_available()
    else "cpu"
)
print(f"Using device: {device}")

Using device: mps


# Train/val/test split

We are using a random split, so this means that we are assuming a stationary distribution for our logs.  We will add complexity later


In [41]:
from sklearn.model_selection import train_test_split

# Merge with anomaly labels
encoded_blocks_with_labels = encoded_blocks_pdf.merge(
    anomaly_label_df,
    left_index=True,
    right_on='BlockId'
)

# Split into train/test sets (80/20 split)
train_df, val_test_df = train_test_split(
    encoded_blocks_with_labels,
    test_size=0.2,
    random_state=42,
    stratify=encoded_blocks_with_labels['Label']
)

print(f"Training samples: {len(train_df)}")

# Split into train/test sets (80/20 split)
val_df, test_df = train_test_split(
    val_test_df,
    test_size=0.5,
    random_state=42,
)

print(f"Val samples:  {len(val_df)}")
print(f"Test samples: {len(val_df)}")

Training samples: 460048
Val samples:  57506
Test samples: 57506


In [43]:
train_df

Unnamed: 0,event_encoded,encoded_block,BlockId,Label
257494,<|sep|>0 /10.251.67.211:54457 /10.251.67.211:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 2251, 15...",blk_-4040947678439826686,Normal
49365,<|sep|>6 <|sep|>0 /10.251.106.37:36707 /10.251...,"[50277, 23, 209, 50277, 17, 1227, 740, 15, 214...",blk_1870752360007129176,Normal
7319,<|sep|>6 <|sep|>0 /10.251.121.224:40809 /10.25...,"[50277, 23, 209, 50277, 17, 1227, 740, 15, 214...",blk_-1999301527305082358,Normal
295080,<|sep|>0 /10.251.123.20:56258 /10.251.123.20:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 10683, 1...",blk_-2322520798745751605,Normal
64733,<|sep|>6 <|sep|>0 /10.251.107.242:55242 /10.25...,"[50277, 23, 209, 50277, 17, 1227, 740, 15, 214...",blk_-4090429635427697097,Normal
...,...,...,...,...
424427,<|sep|>0 /10.251.37.240:42153 /10.251.37.240:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 1787, 15...",blk_4272247743717120753,Normal
403348,<|sep|>0 /10.251.215.50:36443 /10.251.215.50:5...,"[50277, 17, 1227, 740, 15, 21451, 15, 21351, 1...",blk_1218092075075778522,Normal
253046,<|sep|>0 /10.250.11.53:53272 /10.250.11.53:500...,"[50277, 17, 1227, 740, 15, 9519, 15, 883, 15, ...",blk_-4591257497708039986,Normal
495499,<|sep|>0 /10.251.125.174:53652 /10.251.125.174...,"[50277, 17, 1227, 740, 15, 21451, 15, 9312, 15...",blk_-4092465791855115484,Normal


In [49]:
# Set up training parameters
BATCH_SIZE = 4  
MAX_LENGTH = 343  # Truncate sequences to manage memory
LEARNING_RATE = 1e-4
NUM_EPOCHS = 1



In [45]:
import os, wandb

wandb.login(key=os.getenv("WANDB_API_KEY"))


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
huggingface/tokenizers: The

True

In [62]:
import gc
import psutil

def print_memory_stats(prefix=""):
    """Detailed memory statistics"""
    if torch.cuda.is_available():
        allocated = torch.cuda.memory_allocated() / 1024**3
        reserved = torch.cuda.memory_reserved() / 1024**3
    elif torch.backends.mps.is_available():
        allocated = torch.mps.current_allocated_memory() / 1024**3
        reserved = torch.mps.driver_allocated_memory() / 1024**3
    else:
        allocated = reserved = 0

    print(f"\n{prefix} Memory Status:")
    print(f"├── Allocated: {allocated:.2f} GB (actively used by tensors)")
    print(f"├── Reserved:  {reserved:.2f} GB (held by driver)")
    print(f"├── Cached:    {(reserved - allocated):.2f} GB (reserved - allocated)")

    # System memory info
    vm = psutil.virtual_memory()
    print(f"└── System Available: {vm.available / 1024**3:.2f} GB")

def get_gpu_memory_metrics():
    """Get system metrics for logging"""
    if torch.cuda.is_available():
        return {
            "gpu_memory_allocated_gb": torch.cuda.memory_allocated() / (1024**3),
            "gpu_memory_reserved_gb": torch.cuda.memory_reserved() / (1024**3),
        }
    elif torch.backends.mps.is_available():
        return {
            "gpu_memory_allocated_gb": torch.mps.current_allocated_memory() / (1024**3),
            "gpu_memory_reserved_gb": torch.mps.driver_allocated_memory() / (1024**3),
        }
    return {
        "gpu_memory_allocated_gb": 0,
        "gpu_memory_reserved_gb": 0,
    }

def clear_memory():
    """Explicitly clear memory"""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    elif torch.backends.mps.is_available():
        torch.mps.empty_cache()

In [None]:
structured_with_event_id_pdf = structured_df.merge(event_id_mapping_pdf, on="EventId")
structured_with_event_id_pdf.head()

Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate,ParameterList,BlockId,ParsedParameterList,NewEventId
0,1,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.19.102:5...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.19.102:5410...",0
1,2,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,3d91fa85,BLOCK* NameSystem.allocateBlock: <*> <*>,['/mnt/hadoop/mapred/system/job_200811092030_0...,blk_-1608999687919862906,[/mnt/hadoop/mapred/system/job_200811092030_00...,6
2,3,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.10.6:405...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.10.6:40524,...",0
3,4,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.14.224:4...",blk_-1608999687919862906,"[blk_-1608999687919862906, /10.250.14.224:4242...",0
4,5,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,d38aa58d,PacketResponder <*> for block <*> <*>,"['1', 'blk_-1608999687919862906 terminating']",blk_-1608999687919862906,"[1, blk_-1608999687919862906 terminating]",2


In [79]:
# import numpy as np

# device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
# model = get_model().to(device)

# def find_optimal_batch_size(start_size=1, max_size=32):
#     """Find the largest batch size that fits in memory with detailed logging"""

#     print("Testing batch sizes...")
#     for batch_size in [2**i for i in range(start_size, int(np.log2(max_size)) + 1)]:
#         try:
#             print(f"\nTesting batch size {batch_size}")
#             print_memory_stats("Initial")

#             # Create test dataloader
#             test_loader = torch.utils.data.DataLoader(
#                 dataset,
#                 batch_size=batch_size,
#                 shuffle=True,
#                 collate_fn=dataloader.collate_fn
#             )

#             # Get batch
#             print("Loading batch...")
#             batch = next(iter(test_loader))
#             print_memory_stats("After batch load")

#             # Move to device
#             print("Moving to device...")
#             input_ids = batch['input_ids'].to(device)
#             attention_mask = batch['attention_mask'].to(device)
#             print_memory_stats("After moving to device")

#             # Forward pass
#             print("Forward pass...")
#             outputs = model(
#                 input_ids=input_ids,
#                 attention_mask=attention_mask,
#                 labels=input_ids
#             )
#             print_memory_stats("After forward pass")

#             # Backward pass
#             print("Backward pass...")
#             loss = outputs.loss
#             loss.backward()
#             print_memory_stats("After backward pass")

#             # Clean up
#             del outputs, loss, input_ids, attention_mask
#             torch.mps.empty_cache()
#             gc.collect()
#             print_memory_stats("After cleanup")

#         except RuntimeError as e:
#             print(f"\nBatch size {batch_size} failed!")
#             print(f"Error: {str(e)[:200]}...")
#             return batch_size//2

#     return max_size

# # Find optimal batch size with detailed memory tracking
# optimal_batch_size = find_optimal_batch_size()
# print(f"Optimal batch size: {optimal_batch_size}")


In [76]:
# Create DataLoader
class HDFSDataset(torch.utils.data.Dataset):
    def __init__(self, encoded_blocks, max_length):
        self.encoded_blocks = encoded_blocks
        self.max_length = max_length

    def __len__(self):
        return len(self.encoded_blocks)

    def __getitem__(self, idx):
        tokens = self.encoded_blocks.iloc[idx]['encoded_block']
        # Truncate if needed
        if len(tokens) > self.max_length:
            tokens = tokens[:self.max_length]

        # Convert to tensor and pad
        input_ids = torch.tensor(tokens, dtype=torch.long)
        attention_mask = torch.ones_like(input_ids)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
        }

def create_dataloader(encoded_pdf, tokenizer):

    dataset = HDFSDataset(encoded_pdf, MAX_LENGTH)
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=lambda x: {
            'input_ids': torch.nn.utils.rnn.pad_sequence(
                [item['input_ids'] for item in x],
                batch_first=True,
                padding_value=tokenizer.pad_token_id if tokenizer.pad_token_id else 0
            ),
            'attention_mask': torch.nn.utils.rnn.pad_sequence(
                [item['attention_mask'] for item in x],
                batch_first=True,
                padding_value=0
            )
        }
    )

    return dataloader

dataloader = create_dataloader(train_df, tokenizer)

In [77]:

print_memory_stats()


 Memory Status:
├── Allocated: 0.26 GB (actively used by tensors)
├── Reserved:  11.60 GB (held by driver)
├── Cached:    11.34 GB (reserved - allocated)
└── System Available: 2.65 GB


In [78]:
clear_memory()

In [89]:
def evaluate_model(model, dataloader, device):
    """
    Evaluate the model on the provided dataloader with detailed perplexity metrics
    """
    model.eval()
    total_loss = 0
    num_batches = 0
    all_perplexities = []
    
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=input_ids
            )
            
            # Calculate per-token perplexity
            loss = outputs.loss
            batch_perplexity = torch.exp(outputs.logits[..., :-1, :].log_softmax(-1).gather(
                -1, input_ids[..., 1:].unsqueeze(-1)
            ).squeeze(-1) * -1)
            
            # Mask out padding tokens
            mask = attention_mask[..., 1:].bool()
            valid_perplexities = batch_perplexity[mask].cpu().numpy()
            all_perplexities.extend(valid_perplexities.tolist())
            
            total_loss += loss.item()
            num_batches += 1
            
            wandb.log({
                "eval/batch_loss": loss.item(),
                **get_gpu_memory_metrics()
            })
    
    # Calculate percentiles
    percentiles = np.percentile(all_perplexities, [50, 75, 90, 95, 99, 100])
    
    # Log to terminal
    print("\nPerplexity Percentiles:")
    print(f"50th:       {percentiles[0]:.2f}")
    print(f"75th:       {percentiles[1]:.2f}")
    print(f"90th:       {percentiles[2]:.2f}")
    print(f"95th:       {percentiles[3]:.2f}")
    print(f"99th:       {percentiles[4]:.2f}")
    print(f"Max (100th): {percentiles[5]:.2f}")
    
    # Log to wandb
    wandb.log({
        "eval/avg_loss": total_loss / num_batches,
        "eval/perplexity_p50": percentiles[0],
        "eval/perplexity_p75": percentiles[1],
        "eval/perplexity_p90": percentiles[2],
        "eval/perplexity_p95": percentiles[3],
        "eval/perplexity_p99": percentiles[4],
        "eval/perplexity_max": percentiles[5],
    })
    
    return total_loss / num_batches

def train_model(model, dataloader, optimizer, device, steps=None, start_batch=0):
    """
    Train the model for a specified number of steps or until the dataloader is exhausted.
    
    Args:
        model: The model to train
        dataloader: DataLoader containing the training data
        optimizer: The optimizer to use
        device: The device to train on
        steps (int, optional): Number of steps to train. If None, train on all remaining batches
        start_batch (int): The batch index to start from (for resuming training)
    
    Returns:
        tuple: (global_step, batch_idx) - The current global step and batch index for resuming
    """
    model.train()
    global_step = start_batch
    total_loss = 0
    
    for batch_idx, batch in enumerate(dataloader, start=start_batch):
        # Check if we've reached the requested number of steps
        if steps is not None and (batch_idx - start_batch) >= steps:
            break
            
        # Move batch to device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        # Forward pass
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=input_ids
        )

        loss = outputs.loss
        total_loss += loss.item()

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print progress every 100 batches
        if batch_idx % 100 == 0:
            print(f"Batch {batch_idx}, Loss: {loss.item():.4f}")

        wandb.log({
            "train/batch_loss": loss.item(),
            "train/batch": batch_idx,
            **get_gpu_memory_metrics()
        }, step=global_step)
        
        global_step += 1

    avg_loss = total_loss / (batch_idx - start_batch + 1)
    print(f"Training complete. Average loss: {avg_loss:.4f}")
    wandb.log({
        "train/avg_loss": avg_loss,
    })
    
    return global_step, batch_idx

In [70]:
# Move model to MPS device if available, otherwise CPU
model = get_model().to(device)

# Set up optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

wandb.init(
    project="log-analysis-pythia",
    config={
        "batch_size": BATCH_SIZE,
        "max_length": MAX_LENGTH,
        "learning_rate": LEARNING_RATE,
        "epochs": NUM_EPOCHS,
        "model": "pythia-14m",
    }
)

for i in range()
current_step, current_batch = train_model(model, dataloader, optimizer, device, steps=10)


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Batch 0, Loss: 88.6503
Training complete. Average loss: 32.0207


In [71]:
# Resume training for another 500 steps
current_step, current_batch = train_model(model, dataloader, optimizer, device, steps=100, start_batch=current_batch)


Batch 100, Loss: 0.8032
Training complete. Average loss: 1.7085


In [73]:
print_memory_stats()


 Memory Status:
├── Allocated: 0.26 GB (actively used by tensors)
├── Reserved:  11.60 GB (held by driver)
├── Cached:    11.34 GB (reserved - allocated)
└── System Available: 2.60 GB


In [80]:
eval_dataloader = create_dataloader(val_df[:100], tokenizer)

In [87]:
def evaluate_model(model, dataloader, device):
    """
    Evaluate the model on the provided dataloader with detailed perplexity metrics
    """
    model.eval()
    total_loss = 0
    num_batches = 0
    all_perplexities = []
    
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=input_ids
            )
            
            # Calculate per-token perplexity
            loss = outputs.loss
            batch_perplexity = torch.exp(outputs.logits[..., :-1, :].log_softmax(-1).gather(
                -1, input_ids[..., 1:].unsqueeze(-1)
            ).squeeze(-1) * -1)
            
            # Mask out padding tokens
            mask = attention_mask[..., 1:].bool()
            valid_perplexities = batch_perplexity[mask].cpu().numpy()
            all_perplexities.extend(valid_perplexities.tolist())
            
            total_loss += loss.item()
            num_batches += 1
            
            wandb.log({
                "eval/batch_loss": loss.item(),
                **get_gpu_memory_metrics()
            })
    
    # Calculate percentiles
    percentiles = np.percentile(all_perplexities, [50, 75, 90, 95, 99, 100])
    
    # Log to terminal
    print("\nPerplexity Percentiles:")
    print(f"50th:       {percentiles[0]:.2f}")
    print(f"75th:       {percentiles[1]:.2f}")
    print(f"90th:       {percentiles[2]:.2f}")
    print(f"95th:       {percentiles[3]:.2f}")
    print(f"99th:       {percentiles[4]:.2f}")
    print(f"Max (100th): {percentiles[5]:.2f}")
    
    # Log to wandb
    wandb.log({
        "eval/avg_loss": total_loss / num_batches,
        "eval/perplexity_p50": percentiles[0],
        "eval/perplexity_p75": percentiles[1],
        "eval/perplexity_p90": percentiles[2],
        "eval/perplexity_p95": percentiles[3],
        "eval/perplexity_p99": percentiles[4],
        "eval/perplexity_max": percentiles[5],
    })
    
    return total_loss / num_batches


In [96]:
for i in range(20):
    current_step, current_batch = train_model(model, dataloader, optimizer, device, steps=100, start_batch=current_batch)

    eval_dataloader = create_dataloader(val_df[:100], tokenizer)
    evaluate_model(model, eval_dataloader, device)



Batch 200, Loss: 0.6343
Training complete. Average loss: 0.6529

Perplexity Percentiles:
50th:       1.03
75th:       1.43
90th:       8.40
95th:       154.32
99th:       5150.98
Max (100th): 381348099391488.00
Batch 300, Loss: 0.4443
Training complete. Average loss: 0.5414

Perplexity Percentiles:
50th:       1.01
75th:       1.25
90th:       4.67
95th:       91.25
99th:       2622.33
Max (100th): 457935040.00
Batch 400, Loss: 0.4773
Training complete. Average loss: 0.4629

Perplexity Percentiles:
50th:       1.00
75th:       1.13
90th:       4.25
95th:       80.53
99th:       1911.06
Max (100th): 622719467520.00
Batch 500, Loss: 0.3337
Training complete. Average loss: 0.4617

Perplexity Percentiles:
50th:       1.00
75th:       1.08
90th:       3.74
95th:       68.68
99th:       1574.25
Max (100th): 1313793900544.00
Batch 600, Loss: 0.4176
Training complete. Average loss: 0.4068

Perplexity Percentiles:
50th:       1.00
75th:       1.07
90th:       3.23
95th:       60.06
99th:       

In [97]:

# Save model to HuggingFace Hub
model_name = "pythia-14m-hdfs-logs"
model.push_to_hub(
    f"honicky/{model_name}", 
    token=os.environ["HF_WRITE_TOKEN"],
    commit_message=f"Trained {current_step} steps"
)

# Save tokenizer with the added special tokens
tokenizer.push_to_hub(
    f"honicky/{model_name}",
    token=os.environ["HF_WRITE_TOKEN"],
    commit_message="Tokenizer with added special tokens for HDFS logs"
)

# Save model config and training details
with open("README.md", "w") as f:
    f.write(f"""---
language: en
tags:
- log-analysis
- pythia
- hdfs
license: mit
datasets:
- honicky/log-analysis-hdfs-preprocessed
metrics:
- cross-entropy
- perplexity
base_model: EleutherAI/pythia-14m
---

# {model_name}

Fine-tuned Pythia-14m model for HDFS log analysis, specifically for anomaly detection.

## Model Description

This model is fine-tuned from `EleutherAI/pythia-14m` for analyzing HDFS log sequences. It's designed to understand and predict patterns in
HDFS log data so that we can detect anomalies using the perplexity of the log sequence. THhe HDFS sequence is handy because it has labels
so we can use it to validate that the model can predict anomalies. 

We will use this model to understand the ability of a small model to predict anomalies in a specific dataset.  We will study model scale
and experiment with tokenization, intialization, data set size, etc. to find a configuration that is minimal in size and fast, but can
effectively predict anomalies.  We will then attempt build a model that is more robust to different log formats.

## Training Details
- Base model: EleutherAI/pythia-14m
- Dataset: https://zenodo.org/records/8196385/files/HDFS_v1.zip?download=1 + preprocessed data at honicky/log-analysis-hdfs-preprocessed
- Batch size: {BATCH_SIZE}
- Max sequence length: {MAX_LENGTH}
- Learning rate: {LEARNING_RATE}
- Training steps: {current_step}

## Special Tokens
- Added `<|sep|>` token for event ID separation

## Intended Use
This model is intended for:
- Analyzing HDFS log sequences
- Detecting anomalies in log patterns
- Understanding system behavior through log analysis

## Limitations
- Model is specifically trained on HDFS logs and may not generalize to other log formats
- Limited to the context window size of {MAX_LENGTH} tokens


""")


# Push README
from huggingface_hub import HfApi
api = HfApi()
api.upload_file(
    path_or_fileobj="README.md",
    path_in_repo="README.md",
    repo_id=f"honicky/{model_name}",
    token=os.environ["HF_WRITE_TOKEN"],
    commit_message="Add model documentation"
)

model.safetensors: 100%|██████████| 56.3M/56.3M [00:04<00:00, 13.8MB/s]
No files have been modified since last commit. Skipping to prevent empty commit.


CommitInfo(commit_url='https://huggingface.co/honicky/pythia-14m-hdfs-logs/commit/8db8d8c92c9a229bff68b884262ad2a5c532181a', commit_message='Add model documentation', commit_description='', oid='8db8d8c92c9a229bff68b884262ad2a5c532181a', pr_url=None, repo_url=RepoUrl('https://huggingface.co/honicky/pythia-14m-hdfs-logs', endpoint='https://huggingface.co', repo_type='model', repo_id='honicky/pythia-14m-hdfs-logs'), pr_revision=None, pr_num=None)