# 00 · Prepare Data for LogBERT Pipeline

This notebook downloads public log datasets, applies regex normalization, mines templates with Drain3, and materializes tokenized Parquet splits ready for training and evaluation.

## Notebook Goals
- Install pinned dependencies for the workstation environment.
- Fetch HDFS and OpenStack log corpora with checksum verification and automatic mirror fallback.
- Apply regex-based normalization rules from `configs/data.yaml` and preview before/after examples.
- Mine log templates in streaming mode with Drain3 and persist template transitions as Parquet.
- Build Hugging Face datasets with BERT-compatible tokenization, time-based splits (80/10/10), and truncation stats.
- Save artifacts (tokenizer, processed Parquet, metadata) for downstream notebooks.

## 1. Environment Setup

In [1]:
import os, sys, subprocess
from pathlib import Path

if os.environ.get('SKIP_REQUIREMENTS', '0') == '1':
    print('SKIP_REQUIREMENTS=1 -> skipping pip install from requirements.txt')
else:
    req_path = Path('requirements.txt')
    if req_path.exists():
        print('[setup] Installing dependencies from requirements.txt ...')
        completed = subprocess.run([sys.executable, '-m', 'pip', 'install', '-r', str(req_path)])
        if completed.returncode != 0:
            raise RuntimeError('pip installation failed; inspect output above.')
    else:
        print('requirements.txt not found; skipping installation')

requirements.txt not found; skipping installation


In [2]:
import json
import math
import tarfile
import hashlib
import shutil
import textwrap
import gc
from pathlib import Path
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple

import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
from transformers import AutoTokenizer

import yaml
import requests

  import pynvml  # type: ignore[import]


In [3]:
# Optional: Install polars for faster data loading
# !pip install polars

import polars as pl

### Load Configuration and Prepare Folders

In [4]:
!ls

00_prepare_data.ipynb	02_finetune_openstack.ipynb  data
01_pretrain_hdfs.ipynb	artifacts


In [4]:
def load_yaml(path: Path) -> Dict:
    with path.open('r') as fh:
        return yaml.safe_load(fh)

data_config = load_yaml(Path('../configs/data.yaml'))
RAW_HDFS_DIR = Path(data_config['raw_paths']['hdfs'])
RAW_OPENSTACK_DIR = Path(data_config['raw_paths']['openstack'])
ARTIFACTS_DIR = Path(data_config['artifacts_dir'])
TOKENIZER_DIR = ARTIFACTS_DIR / 'tokenizer'
PARQUET_DIR = Path(data_config['preprocessing']['parquet_dir'])
METADATA_PATH = Path(data_config['preprocessing']['dataset_metadata'])
for folder in [RAW_HDFS_DIR, RAW_OPENSTACK_DIR, ARTIFACTS_DIR, TOKENIZER_DIR, PARQUET_DIR, METADATA_PATH.parent]:
    folder.mkdir(parents=True, exist_ok=True)
print('[setup] Configuration loaded and folders prepared.')

[setup] Configuration loaded and folders prepared.


## 2. Download Public Datasets

In [6]:
def stream_download(url: str, destination: Path) -> bool:
    chunk = 1 << 20
    try:
        with requests.get(url, stream=True, timeout=60) as resp:
            resp.raise_for_status()
            total = int(resp.headers.get('content-length', 0))
            with destination.open('wb') as fh, tqdm(total=total, unit='B', unit_scale=True, desc=f'download {destination.name}') as bar:
                for part in resp.iter_content(chunk_size=chunk):
                    if part:
                        fh.write(part)
                        bar.update(len(part))
        return True
    except Exception as exc:
        print(f'[warn] download failed from {url}: {exc}')
        return False

def sha256sum(path: Path) -> str:
    hash_obj = hashlib.sha256()
    with path.open('rb') as fh:
        for chunk in iter(lambda: fh.read(1 << 20), b''):
            hash_obj.update(chunk)
    return hash_obj.hexdigest()

def ensure_download(urls: List[str], target: Path, expected_sha: Optional[str] = None) -> Path:
    for url in urls:
        if stream_download(url, target):
            break
    else:
        raise RuntimeError(f'all download mirrors failed for {target.name}')
    checksum = sha256sum(target)
    if expected_sha and checksum != expected_sha:
        raise ValueError(f'sha mismatch for {target.name}: expected {expected_sha}, got {checksum}')
    return target

def maybe_update_sha(config_section: Dict, key: str, computed_sha: str) -> None:
    current = config_section.get(key)
    if isinstance(current, str) and current:
        return
    config_section[key] = computed_sha
    with Path('../configs/data.yaml').open('w') as fh:
        yaml.safe_dump(data_config, fh, sort_keys=False)
    print(f'[sha] recorded SHA256 for {key}: {computed_sha}')

In [7]:
hdfs_cfg = data_config['datasets']['hdfs']
archive_path = RAW_HDFS_DIR / hdfs_cfg['archive_name']
archive_path.parent.mkdir(parents=True, exist_ok=True)

# Check if archive already exists before downloading
if archive_path.exists():
    print('[download] HDFS archive already exists, skipping download')
    computed_sha = sha256sum(archive_path)
    maybe_update_sha(hdfs_cfg, 'sha256', computed_sha)
else:
    print('[download] HDFS corpus')
    ensure_download(hdfs_cfg['urls'], archive_path, hdfs_cfg.get('sha256') or None)
    computed_sha = sha256sum(archive_path)
    maybe_update_sha(hdfs_cfg, 'sha256', computed_sha)

if not (RAW_HDFS_DIR / hdfs_cfg['log_file']).exists():
    print('[extract] HDFS archive')
    with tarfile.open(archive_path, 'r:gz') as tf:
        tf.extractall(RAW_HDFS_DIR)
else:
    print('[extract] HDFS log already present')

hdfs_log_path = RAW_HDFS_DIR / hdfs_cfg['log_file']
print(f'[ready] HDFS log at {hdfs_log_path}')

[download] HDFS archive already exists, skipping download
[extract] HDFS log already present
[ready] HDFS log at data/hdfs/raw/HDFS.log


In [8]:
openstack_cfg = data_config['datasets']['openstack']
RAW_OPENSTACK_DIR.mkdir(parents=True, exist_ok=True)
paths: Dict[str, Path] = {}
print('[download] OpenStack normal logs')
normal_target = RAW_OPENSTACK_DIR / 'openstack_normal.log'
normal_urls = openstack_cfg['normal_urls']
normal_sha_cfg = (openstack_cfg.get('sha256') or {}).get('normal')
normal_acquired = False
if normal_target.exists():
    print('[cache] Detected existing openstack_normal.log, skipping download.')
    normal_acquired = True
    normal_urls = []
    maybe_update_sha(openstack_cfg['sha256'], 'normal', sha256sum(normal_target))
if not normal_acquired:
    for url in normal_urls:
        try:
            if url.endswith('.tar.gz'):
                tmp_tar = RAW_OPENSTACK_DIR / 'openstack_bundle.tar.gz'
                ensure_download([url], tmp_tar, None)
                with tarfile.open(tmp_tar, 'r:gz') as tf:
                    member = next((m for m in tf.getmembers() if m.name.endswith('openstack_normal.log')), None)
                    if member is None:
                        raise ValueError('openstack_normal.log not found inside archive')
                    tf.extract(member, RAW_OPENSTACK_DIR)
                    extracted_path = RAW_OPENSTACK_DIR / member.name
                    final_path = RAW_OPENSTACK_DIR / 'openstack_normal.log'
                    final_path.write_bytes(extracted_path.read_bytes())
                    if extracted_path.exists():
                        extracted_path.unlink()
                    extracted_dir = extracted_path.parent
                    if extracted_dir != RAW_OPENSTACK_DIR and extracted_dir.exists():
                        shutil.rmtree(extracted_dir, ignore_errors=True)
                tmp_tar.unlink(missing_ok=True)
                normal_acquired = True
            else:
                ensure_download([url], normal_target, normal_sha_cfg)
                normal_acquired = True
            if normal_acquired:
                maybe_update_sha(openstack_cfg['sha256'], 'normal', sha256sum(normal_target))
                break
        except Exception as exc:
            print(f'[warn] fallback download failed from {url}: {exc}')
if not normal_acquired:
    raise RuntimeError('All download mirrors failed for OpenStack normal logs')
paths['normal'] = normal_target

print('[download] OpenStack abnormal logs')
abnormal_target = RAW_OPENSTACK_DIR / 'openstack_abnormal.log'
if abnormal_target.exists():
    print('[cache] Detected existing openstack_abnormal.log, skipping download.')
else:
    ensure_download(openstack_cfg['abnormal_urls'], abnormal_target, (openstack_cfg.get('sha256') or {}).get('abnormal'))
    maybe_update_sha(openstack_cfg['sha256'], 'abnormal', sha256sum(abnormal_target))
paths['abnormal'] = abnormal_target
print(f'[ready] OpenStack logs -> {paths}')


[download] OpenStack normal logs
[cache] Detected existing openstack_normal.log, skipping download.
[download] OpenStack abnormal logs
[cache] Detected existing openstack_abnormal.log, skipping download.
[ready] OpenStack logs -> {'normal': PosixPath('data/openstack/raw/openstack_normal.log'), 'abnormal': PosixPath('data/openstack/raw/openstack_abnormal.log')}


## 3. Load and Inspect Raw Logs

In [9]:
def read_hdfs_log(path: Path, sample_fraction: float = 0.5) -> pd.DataFrame:
    """Read HDFS log file efficiently with sampling"""
    print(f"[hdfs] Reading {sample_fraction*100:.0f}% of HDFS log file...")
    
    # First, count total lines for sampling
    with open(path, 'r', encoding='utf-8', errors='ignore') as f:
        total_lines = sum(1 for _ in f if _.strip())
    
    target_lines = int(total_lines * sample_fraction)
    step = max(1, total_lines // target_lines)
    
    print(f"[hdfs] Total lines: {total_lines:,}, sampling every {step} lines -> {target_lines:,} records")
    
    # Read with sampling
    sampled_lines = []
    with open(path, 'r', encoding='utf-8', errors='ignore') as f:
        for i, line in enumerate(f):
            if line.strip() and i % step == 0:
                sampled_lines.append(line.strip())
                if len(sampled_lines) >= target_lines:
                    break
    
    df = pd.DataFrame({'raw': sampled_lines})
    
    # HDFS format: YYMMDD HHMMSS (e.g., "081109 203518")
    def parse_hdfs_timestamp(timestamp_str):
        try:
            if len(timestamp_str) >= 13:
                date_part = timestamp_str[:6]  # YYMMDD
                time_part = timestamp_str[7:13]  # HHMMSS
                # Convert to YYYY-MM-DD HH:MM:SS format
                year = '20' + date_part[:2]
                month = date_part[2:4]
                day = date_part[4:6]
                hour = time_part[:2]
                minute = time_part[2:4]
                second = time_part[4:6]
                formatted = f"{year}-{month}-{day} {hour}:{minute}:{second}"
                return pd.to_datetime(formatted)
        except:
            pass
        return pd.NaT
    
    df['timestamp'] = df['raw'].apply(parse_hdfs_timestamp)
    df['content'] = df['raw']
    result_df = df.dropna(subset=['timestamp']).reset_index(drop=True)
    print(f"[hdfs] Processed {len(result_df):,} valid records")
    return result_df

def read_openstack_logs(paths: Dict[str, Path]) -> pd.DataFrame:
    frames = []
    for label, path in paths.items():
        print(f"[openstack] Reading {label} logs...")
        # Read the log file as plain text lines
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = [line.strip() for line in f if line.strip()]
        
        df = pd.DataFrame({'raw': lines})
        df['timestamp'] = pd.to_datetime(df['raw'].str.extract(r'(\d{4}-\d{2}-\d{2} [^ ]+)')[0], errors='coerce')
        df['label'] = 1 if label == 'abnormal' else 0
        df['content'] = df['raw']
        valid_df = df.dropna(subset=['timestamp'])
        print(f"[openstack] {label}: {len(valid_df):,} valid records")
        frames.append(valid_df)
    return pd.concat(frames, ignore_index=True).sort_values('timestamp').reset_index(drop=True)

In [10]:
# Alternative: Fast Polars-based implementation
def read_hdfs_log_fast(path: Path, sample_fraction: float = 0.5) -> pd.DataFrame:
    """Ultra-fast HDFS log reading using Polars with streaming"""
    print(f"[hdfs-fast] Reading {sample_fraction*100:.0f}% of HDFS log with Polars...")
    
    try:
        # Read the entire file as text lines first (Polars way)
        df_pl = pl.read_csv(
            path,
            has_header=False,
            new_columns=['raw'],
            separator='\x00',  # Use null separator so each line is one field
            ignore_errors=True,
            truncate_ragged_lines=True,
            encoding='utf8-lossy'
        )
        
        # Filter out empty lines
        df_pl = df_pl.filter(pl.col('raw').str.len_chars() > 0)
        
        # Sample the DataFrame
        if sample_fraction < 1.0:
            df_pl = df_pl.sample(fraction=sample_fraction, seed=42)
        
        # Convert to pandas for compatibility
        df = df_pl.to_pandas()
        
        # Parse timestamps (vectorized)
        def parse_hdfs_timestamp_vectorized(series):
            # Extract timestamp part more efficiently
            timestamp_part = series.str[:13]
            
            # Vectorized parsing using pandas datetime
            parsed = pd.to_datetime(
                '20' + timestamp_part.str[:2] + '-' + 
                timestamp_part.str[2:4] + '-' + 
                timestamp_part.str[4:6] + ' ' +
                timestamp_part.str[7:9] + ':' + 
                timestamp_part.str[9:11] + ':' + 
                timestamp_part.str[11:13],
                errors='coerce'
            )
            return parsed
        
        df['timestamp'] = parse_hdfs_timestamp_vectorized(df['raw'])
        df['content'] = df['raw']
        
        result_df = df.dropna(subset=['timestamp']).reset_index(drop=True)
        print(f"[hdfs-fast] Processed {len(result_df):,} valid records (Polars)")
        return result_df
        
    except ImportError:
        print("[hdfs-fast] Polars not available, falling back to pandas method")
        return read_hdfs_log(path, sample_fraction)
    except Exception as e:
        print(f"[hdfs-fast] Polars method failed ({e}), falling back to pandas")
        return read_hdfs_log(path, sample_fraction)

In [11]:
# Alternative: Memory-mapped reading for huge files
def read_hdfs_log_mmap(path: Path, sample_fraction: float = 0.5) -> pd.DataFrame:
    """Memory-mapped reading for very large files"""
    import mmap
    print(f"[hdfs-mmap] Reading {sample_fraction*100:.0f}% with memory mapping...")
    
    sampled_lines = []
    with open(path, 'rb') as f:  # Open in binary mode for mmap
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # Count lines first by reading the entire mmap and counting newlines
            total_lines = mm[:].count(b'\n')
            target_lines = int(total_lines * sample_fraction)
            step = max(1, total_lines // target_lines)
            
            print(f"[hdfs-mmap] Total lines: {total_lines:,}, sampling every {step} -> {target_lines:,}")
            
            # Reset to beginning and sample
            mm.seek(0)
            line_num = 0
            for line in iter(mm.readline, b""):
                if line_num % step == 0:
                    line_str = line.decode('utf-8', errors='ignore').strip()
                    if line_str:
                        sampled_lines.append(line_str)
                        if len(sampled_lines) >= target_lines:
                            break
                line_num += 1
    
    # Process sampled data
    df = pd.DataFrame({'raw': sampled_lines})
    
    # Vectorized timestamp parsing
    timestamp_parts = df['raw'].str[:13]
    df['timestamp'] = pd.to_datetime(
        '20' + timestamp_parts.str[:2] + '-' + 
        timestamp_parts.str[2:4] + '-' + 
        timestamp_parts.str[4:6] + ' ' +
        timestamp_parts.str[7:9] + ':' + 
        timestamp_parts.str[9:11] + ':' + 
        timestamp_parts.str[11:13],
        errors='coerce'
    )
    df['content'] = df['raw']
    
    result_df = df.dropna(subset=['timestamp']).reset_index(drop=True)
    print(f"[hdfs-mmap] Processed {len(result_df):,} valid records")
    return result_df

# Use the fastest available method
print("[loading] Choosing fastest loading method...")
try:
    # Try Polars first (fastest)
    hdfs_df = read_hdfs_log_fast(hdfs_log_path, sample_fraction=0.5)
except:
    try:
        # Fall back to memory mapping
        hdfs_df = read_hdfs_log_mmap(hdfs_log_path, sample_fraction=0.5)
    except:
        # Final fallback to pandas
        hdfs_df = read_hdfs_log(hdfs_log_path, sample_fraction=0.5)

openstack_df = read_openstack_logs(paths)
print(f'[dataset] HDFS records: {len(hdfs_df):,} | OpenStack records: {len(openstack_df):,}')

[loading] Choosing fastest loading method...
[hdfs-fast] Reading 50% of HDFS log with Polars...
[hdfs-fast] Processed 5,587,814 valid records (Polars)
[openstack] Reading normal logs...
[hdfs-fast] Processed 5,587,814 valid records (Polars)
[openstack] Reading normal logs...
[openstack] normal: 189,386 valid records
[openstack] Reading abnormal logs...
[openstack] abnormal: 18,434 valid records
[dataset] HDFS records: 5,587,814 | OpenStack records: 207,820
[openstack] normal: 189,386 valid records
[openstack] Reading abnormal logs...
[openstack] abnormal: 18,434 valid records
[dataset] HDFS records: 5,587,814 | OpenStack records: 207,820


In [12]:
display(hdfs_df[['timestamp','content']].head())
display(openstack_df[['timestamp','label','content']].head())

Unnamed: 0,timestamp,content
0,2008-11-10 12:12:08,081110 121208 31 INFO dfs.FSNamesystem: BLOCK*...
1,2008-11-11 02:32:38,081111 023238 19 INFO dfs.FSDataset: Deleting ...
2,2008-11-10 07:00:10,081110 070010 7424 INFO dfs.DataNode$DataXceiv...
3,2008-11-10 13:04:40,081110 130440 11588 WARN dfs.DataNode$DataXcei...
4,2008-11-10 22:06:43,081110 220643 32 INFO dfs.FSNamesystem: BLOCK*...


Unnamed: 0,timestamp,label,content
0,2017-05-14 19:39:01.445,1,nova-api.log.2017-05-14_21:27:04 2017-05-14 19...
1,2017-05-14 19:39:01.650,1,nova-api.log.2017-05-14_21:27:04 2017-05-14 19...
2,2017-05-14 19:39:02.007,1,nova-compute.log.2017-05-14_21:27:09 2017-05-1...
3,2017-05-14 19:39:02.924,1,nova-api.log.2017-05-14_21:27:04 2017-05-14 19...
4,2017-05-14 19:39:03.166,1,nova-compute.log.2017-05-14_21:27:09 2017-05-1...


## 4. Regex Normalization

In [13]:
import re

class LogNormalizer:
    def __init__(self, rules: Iterable[Dict[str, str]]):
        self.rules = [(rule['name'], re.compile(rule['pattern']), rule['replace']) for rule in rules]

    def apply(self, text: str) -> str:
        result = text
        for _, pattern, repl in self.rules:
            result = pattern.sub(repl, result)
        return result

    def normalize_series(self, series: pd.Series) -> pd.Series:
        return series.astype(str).apply(self.apply)

normalizer = LogNormalizer(data_config['normalizer']['rules'])

# For performance, let's first work with a sample to test
print("[normalization] Processing samples first for verification...")
hdfs_sample = hdfs_df.head(1000)  # Use first 1000 records for testing
openstack_sample = openstack_df.head(1000)

hdfs_sample_norm = normalizer.normalize_series(hdfs_sample['content'])
openstack_sample_norm = normalizer.normalize_series(openstack_sample['content'])

# Display preview with samples
preview = pd.DataFrame({
    'original': hdfs_sample['content'].head(5), 
    'normalized': hdfs_sample_norm.head(5)
})
print("HDFS normalization preview:")
display(preview)

print(f"\nSample sizes - HDFS: {len(hdfs_sample):,}, OpenStack: {len(openstack_sample):,}")
print("Ready to process full datasets. Continue to next cell when satisfied with preview.")

[normalization] Processing samples first for verification...
HDFS normalization preview:


Unnamed: 0,original,normalized
0,081110 121208 31 INFO dfs.FSNamesystem: BLOCK*...,<NUM_1e3> <NUM_1e3> <PORT> INFO dfs.FSNamesyst...
1,081111 023238 19 INFO dfs.FSDataset: Deleting ...,<NUM_1e3> <NUM_1e3> <PORT> INFO dfs.FSDataset:...
2,081110 070010 7424 INFO dfs.DataNode$DataXceiv...,<NUM_1e3> <NUM_1e3> <PORT> INFO dfs.DataNode$D...
3,081110 130440 11588 WARN dfs.DataNode$DataXcei...,<NUM_1e3> <NUM_1e3> <PORT> WARN dfs.DataNode$D...
4,081110 220643 32 INFO dfs.FSNamesystem: BLOCK*...,<NUM_1e3> <NUM_1e3> <PORT> INFO dfs.FSNamesyst...



Sample sizes - HDFS: 1,000, OpenStack: 1,000
Ready to process full datasets. Continue to next cell when satisfied with preview.


## 5. Template Mining with Drain3

In [20]:
drain_cfg = data_config['drain3']

# Configure Drain3 template miner directly (correct API)
template_config = TemplateMinerConfig()
template_config.drain_depth = drain_cfg['depth']
template_config.drain_sim_th = drain_cfg['st']  # similarity threshold
template_config.drain_max_children = drain_cfg['max_children']
template_config.drain_extra_delimiters = drain_cfg['extra_delimiters']

# Create TemplateMiner with config parameter (not template_config)
template_miner = TemplateMiner(config=template_config)

records = []
transition_counts = {}
ngram = drain_cfg.get('template_transition_ngram', 3)
print('[drain3] streaming normalized HDFS logs')
cluster_trace: List[str] = []

# First, we need to normalize the data before template mining
print("[drain3] Applying normalization to full dataset...")
hdfs_df['normalized'] = normalizer.normalize_series(hdfs_df['content'])

for row in tqdm(hdfs_df.to_dict('records'), desc='Drain3 HDFS'):
    result = template_miner.add_log_message(row['normalized'])
    cluster_id = result['cluster_id']
    records.append({
        'timestamp': row['timestamp'],
        'template_id': cluster_id,
        'template': result['template_mined'],
        'content': row['normalized']
    })
    # Convert cluster_id to string for transition tracking
    cluster_trace.append(str(cluster_id))

for idx in range(len(cluster_trace) - ngram + 1):
    key = tuple(cluster_trace[idx: idx + ngram])
    transition_counts[key] = transition_counts.get(key, 0) + 1

transition_output = Path(drain_cfg['transition_output'])
transition_output.parent.mkdir(parents=True, exist_ok=True)

template_df = pd.DataFrame(records)
index_path = transition_output.with_name('template_index.parquet')
template_df.to_parquet(index_path, index=False)

if transition_counts:
    transition_df = pd.DataFrame([
        {'ngram': '->'.join(key), 'count': count} for key, count in transition_counts.items()
    ])
    transition_df.to_parquet(transition_output, index=False)
    display(transition_df.sort_values('count', ascending=False).head(10))
else:
    print('[drain3] no transitions recorded; dataset may be small')

[drain3] streaming normalized HDFS logs
[drain3] Applying normalization to full dataset...


Drain3 HDFS:   0%|          | 0/5587814 [00:00<?, ?it/s]

Unnamed: 0,ngram,count
221,6->6->6,20608
117,6->8->6,20591
490,8->6->6,20490
1058,19->19->19,20404
137,7->6->6,20401
130,6->6->7,20377
375,6->6->8,20364
1031,6->19->19,20360
981,6->6->19,20328
292,7->7->6,20308


## 6. Tokenizer with Special Tokens

In [21]:
tokenizer_cfg = data_config['tokenizer']
tokenizer = AutoTokenizer.from_pretrained(tokenizer_cfg['base_model'], use_fast=True)
special_tokens = data_config['tokens']['special']
added = tokenizer.add_special_tokens({'additional_special_tokens': special_tokens})
TOKENIZER_DIR.mkdir(parents=True, exist_ok=True)
tokenizer.save_pretrained(TOKENIZER_DIR)
print(f'[tokenizer] added {added} special tokens and saved to {TOKENIZER_DIR}')
print('sample tokens:', tokenizer.tokenize(hdfs_df['normalized'].iloc[0])[:30])



[tokenizer] added 9 special tokens and saved to artifacts/tokenizer
sample tokens: ['<NUM_1e3>', '<NUM_1e3>', '<PORT>', 'info', 'd', '##fs', '.', 'f', '##s', '##name', '##sy', '##ste', '##m', ':', 'block', '*', 'names', '##yst', '##em', '.', 'adds', '##tore', '##db', '##lock', ':', 'block', '##ma', '##p', 'updated', ':']


## 7. Build Tokenized Parquet Splits

In [8]:
# Load existing processed data efficiently to avoid memory issues
from datasets import Dataset
from transformers import AutoTokenizer
import gc

print("[setup] Loading existing artifacts...")

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_DIR, use_fast=True)
print(f"[tokenizer] Loaded from {TOKENIZER_DIR}")

# Load template index (processed HDFS data with template_ids)
template_df = pd.read_parquet(ARTIFACTS_DIR / 'drain3' / 'template_index.parquet')
print(f"[template] Loaded {len(template_df):,} template-indexed records")

# Load HDFS data efficiently in chunks to avoid memory issues
def load_hdfs_efficiently():
    """Load only the essential HDFS data we need for tokenization"""
    print("[hdfs] Loading HDFS data efficiently...")
    
    # We already have processed template data, so we just need the basic structure
    hdfs_essential = template_df[['timestamp', 'template_id', 'content']].copy()
    
    # Add normalized content (reapply normalization if needed)
    print("[hdfs] Applying normalization...")
    
    # Load normalizer rules
    normalizer_rules = data_config['normalizer']['rules']
    import re
    
    class LogNormalizer:
        def __init__(self, rules):
            self.rules = [(rule['name'], re.compile(rule['pattern']), rule['replace']) for rule in rules]

        def apply(self, text: str) -> str:
            result = text
            for _, pattern, repl in self.rules:
                result = pattern.sub(repl, result)
            return result

    normalizer = LogNormalizer(normalizer_rules)
    
    # Apply normalization in chunks to avoid memory issues
    chunk_size = 10000
    normalized_chunks = []
    
    for i in range(0, len(hdfs_essential), chunk_size):
        chunk = hdfs_essential.iloc[i:i+chunk_size]
        chunk_norm = chunk['content'].apply(normalizer.apply)
        normalized_chunks.append(chunk_norm)
        
        if (i // chunk_size + 1) % 10 == 0:
            print(f"  Processed {i+len(chunk):,}/{len(hdfs_essential):,} records...")
    
    hdfs_essential['normalized'] = pd.concat(normalized_chunks, ignore_index=True)
    return hdfs_essential

hdfs_df = load_hdfs_efficiently()

def tokenize_dataframe_efficient(df: pd.DataFrame, max_length: int, batch_size: int = 1000) -> Tuple[pd.DataFrame, float]:
    """Memory-efficient tokenization with batching"""
    print(f"[tokenizer] Processing {len(df):,} records in batches of {batch_size:,}")
    
    all_input_ids = []
    all_attention_masks = []
    truncated_count = 0
    
    # Process in batches to avoid memory issues
    for i in range(0, len(df), batch_size):
        batch_df = df.iloc[i:i+batch_size]
        batch_texts = list(batch_df['normalized'])
        
        # Tokenize batch
        encodings = tokenizer(
            batch_texts,
            padding=False,
            truncation=True,
            max_length=max_length,
            return_attention_mask=True
        )
        
        # Count truncated sequences in this batch
        batch_truncated = sum(len(ids) == max_length for ids in encodings['input_ids'])
        truncated_count += batch_truncated
        
        # Collect results
        all_input_ids.extend(encodings['input_ids'])
        all_attention_masks.extend(encodings['attention_mask'])
        
        # Progress update
        if (i // batch_size + 1) % 10 == 0:
            print(f"  Tokenized {i+len(batch_df):,}/{len(df):,} records...")
        
        # Clean up batch to free memory
        del encodings, batch_texts
        gc.collect()
    
    # Create result DataFrame
    tokens = pd.DataFrame({
        'input_ids': all_input_ids,
        'attention_mask': all_attention_masks,
        'labels': all_input_ids.copy(),  # For MLM, labels = input_ids
    })
    
    # Add template_id if available
    if 'template_id' in df.columns:
        tokens['template_id'] = df['template_id'].values
    else:
        tokens['template_id'] = ''  # Empty string for datasets without templates
    
    # Add other columns
    if 'label' in df.columns:
        tokens['anomaly_label'] = df['label'].astype(int).values
    else:
        tokens['anomaly_label'] = 0
    
    tokens['timestamp'] = df['timestamp'].values
    tokens['raw'] = df['content'].values
    tokens['normalized'] = df['normalized'].values
    
    trunc_rate = truncated_count / max(len(df), 1)
    print(f"[tokenizer] Completed. Truncation rate: {trunc_rate:.4f}")
    
    return tokens, trunc_rate

def time_splits(df: pd.DataFrame, splits_cfg: Dict[str, float]) -> Dict[str, pd.DataFrame]:
    """Create time-based splits"""
    df_sorted = df.sort_values('timestamp').reset_index(drop=True)
    n = len(df_sorted)
    train_end = int(n * splits_cfg['train'])
    val_end = train_end + int(n * splits_cfg['val'])
    return {
        'train': df_sorted.iloc[:train_end],
        'val': df_sorted.iloc[train_end:val_end],
        'test': df_sorted.iloc[val_end:]
    }

print("[ready] All functions loaded. Ready to process datasets...")

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


[setup] Loading existing artifacts...
[tokenizer] Loaded from artifacts/tokenizer
[template] Loaded 5,587,814 template-indexed records
[hdfs] Loading HDFS data efficiently...
[hdfs] Applying normalization...
[template] Loaded 5,587,814 template-indexed records
[hdfs] Loading HDFS data efficiently...
[hdfs] Applying normalization...
  Processed 100,000/5,587,814 records...
  Processed 100,000/5,587,814 records...
  Processed 200,000/5,587,814 records...
  Processed 200,000/5,587,814 records...
  Processed 300,000/5,587,814 records...
  Processed 300,000/5,587,814 records...
  Processed 400,000/5,587,814 records...
  Processed 400,000/5,587,814 records...
  Processed 500,000/5,587,814 records...
  Processed 500,000/5,587,814 records...
  Processed 600,000/5,587,814 records...
  Processed 600,000/5,587,814 records...
  Processed 700,000/5,587,814 records...
  Processed 700,000/5,587,814 records...
  Processed 800,000/5,587,814 records...
  Processed 800,000/5,587,814 records...
  Processe

In [6]:
# Create HDFS dataset splits with memory-efficient processing
print("[splits] Creating HDFS time-based splits...")

# Create time-based splits
splits_cfg = data_config['splits']
hdfs_splits = time_splits(hdfs_df, splits_cfg)

print(f"[splits] Created splits:")
for name, split_df in hdfs_splits.items():
    print(f"  {name}: {len(split_df):,} records ({len(split_df)/len(hdfs_df)*100:.1f}%)")

# Process each split separately to avoid memory issues
hdfs_stats = {}
max_length = data_config['tokenizer']['max_length']
batch_size = 2000  # Smaller batch size for memory efficiency

for split_name, split_df in hdfs_splits.items():
    print(f"\n[processing] Starting {split_name} split ({len(split_df):,} records)...")
    
    # Tokenize with batching
    tokens_df, trunc = tokenize_dataframe_efficient(split_df, max_length, batch_size)
    
    # Save Parquet file
    file_path = PARQUET_DIR / f'hdfs_{split_name}.parquet'
    tokens_df.to_parquet(file_path, index=False)
    print(f"[save] Parquet saved: {file_path}")
    
    # Create HuggingFace dataset (remove large text columns to save memory)
    hf_df = tokens_df.drop(columns=['raw', 'normalized']).copy()
    ds = Dataset.from_pandas(hf_df, preserve_index=False)
    hf_path = str(PARQUET_DIR / f'hdfs_{split_name}_hf')
    ds.save_to_disk(hf_path)
    print(f"[save] HuggingFace dataset saved: {hf_path}")
    
    # Collect statistics
    hdfs_stats[split_name] = {
        'count': len(tokens_df),
        'avg_length': float(tokens_df['input_ids'].map(len).mean()),
        'truncation_rate': round(trunc, 4)
    }
    
    print(f"[stats] {split_name}: {hdfs_stats[split_name]}")
    
    # Clean up to free memory
    del tokens_df, hf_df, ds, split_df
    gc.collect()

print(f"\n[complete] All HDFS splits processed successfully!")
display(pd.DataFrame(hdfs_stats).T)

[splits] Creating HDFS time-based splits...
[splits] Created splits:
  train: 4,470,251 records (80.0%)
  val: 558,781 records (10.0%)
  test: 558,782 records (10.0%)

[processing] Starting train split (4,470,251 records)...
[tokenizer] Processing 4,470,251 records in batches of 2,000
  Tokenized 20,000/4,470,251 records...
  Tokenized 40,000/4,470,251 records...
  Tokenized 60,000/4,470,251 records...
  Tokenized 80,000/4,470,251 records...
  Tokenized 100,000/4,470,251 records...
  Tokenized 120,000/4,470,251 records...
  Tokenized 140,000/4,470,251 records...
  Tokenized 160,000/4,470,251 records...
  Tokenized 180,000/4,470,251 records...
  Tokenized 200,000/4,470,251 records...
  Tokenized 220,000/4,470,251 records...
  Tokenized 240,000/4,470,251 records...
  Tokenized 260,000/4,470,251 records...
  Tokenized 280,000/4,470,251 records...
  Tokenized 300,000/4,470,251 records...
  Tokenized 320,000/4,470,251 records...
  Tokenized 340,000/4,470,251 records...
  Tokenized 360,000/4

Saving the dataset (0/10 shards):   0%|          | 0/4470251 [00:00<?, ? examples/s]

[save] HuggingFace dataset saved: artifacts/datasets/hdfs_train_hf
[stats] train: {'count': 4470251, 'avg_length': 44.4681468669209, 'truncation_rate': 0.0}

[processing] Starting val split (558,781 records)...
[tokenizer] Processing 558,781 records in batches of 2,000
  Tokenized 20,000/558,781 records...
  Tokenized 40,000/558,781 records...
  Tokenized 60,000/558,781 records...
  Tokenized 80,000/558,781 records...
  Tokenized 100,000/558,781 records...
  Tokenized 120,000/558,781 records...
  Tokenized 140,000/558,781 records...
  Tokenized 160,000/558,781 records...
  Tokenized 180,000/558,781 records...
  Tokenized 200,000/558,781 records...
  Tokenized 220,000/558,781 records...
  Tokenized 240,000/558,781 records...
  Tokenized 260,000/558,781 records...
  Tokenized 280,000/558,781 records...
  Tokenized 300,000/558,781 records...
  Tokenized 320,000/558,781 records...
  Tokenized 340,000/558,781 records...
  Tokenized 360,000/558,781 records...
  Tokenized 380,000/558,781 reco

Saving the dataset (0/2 shards):   0%|          | 0/558781 [00:00<?, ? examples/s]

[save] HuggingFace dataset saved: artifacts/datasets/hdfs_val_hf
[stats] val: {'count': 558781, 'avg_length': 46.11112940490103, 'truncation_rate': 0.0}

[processing] Starting test split (558,782 records)...
[tokenizer] Processing 558,782 records in batches of 2,000
  Tokenized 20,000/558,782 records...
  Tokenized 40,000/558,782 records...
  Tokenized 60,000/558,782 records...
  Tokenized 80,000/558,782 records...
  Tokenized 100,000/558,782 records...
  Tokenized 120,000/558,782 records...
  Tokenized 140,000/558,782 records...
  Tokenized 160,000/558,782 records...
  Tokenized 180,000/558,782 records...
  Tokenized 200,000/558,782 records...
  Tokenized 220,000/558,782 records...
  Tokenized 240,000/558,782 records...
  Tokenized 260,000/558,782 records...
  Tokenized 280,000/558,782 records...
  Tokenized 300,000/558,782 records...
  Tokenized 320,000/558,782 records...
  Tokenized 340,000/558,782 records...
  Tokenized 360,000/558,782 records...
  Tokenized 380,000/558,782 records

Saving the dataset (0/2 shards):   0%|          | 0/558782 [00:00<?, ? examples/s]

[save] HuggingFace dataset saved: artifacts/datasets/hdfs_test_hf
[stats] test: {'count': 558782, 'avg_length': 44.91538918576475, 'truncation_rate': 0.0}

[complete] All HDFS splits processed successfully!


Unnamed: 0,count,avg_length,truncation_rate
train,4470251.0,44.468147,0.0
val,558781.0,46.111129,0.0
test,558782.0,44.915389,0.0


In [9]:
# Load and process OpenStack data efficiently
print("[openstack] Loading OpenStack logs...")

# Load OpenStack data
def read_openstack_logs() -> pd.DataFrame:
    """Load OpenStack logs efficiently"""
    # Get paths from config
    normal_path = RAW_OPENSTACK_DIR / 'openstack_normal.log'
    abnormal_path = RAW_OPENSTACK_DIR / 'openstack_abnormal.log'
    
    frames = []
    paths = {'normal': normal_path, 'abnormal': abnormal_path}
    
    for label, path in paths.items():
        print(f"[openstack] Reading {label} logs from {path}...")
        
        # Read the log file as plain text lines
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = [line.strip() for line in f if line.strip()]
        
        df = pd.DataFrame({'raw': lines})
        df['timestamp'] = pd.to_datetime(df['raw'].str.extract(r'(\d{4}-\d{2}-\d{2} [^ ]+)')[0], errors='coerce')
        df['label'] = 1 if label == 'abnormal' else 0
        df['content'] = df['raw']
        valid_df = df.dropna(subset=['timestamp'])
        print(f"[openstack] {label}: {len(valid_df):,} valid records")
        frames.append(valid_df)
    
    result = pd.concat(frames, ignore_index=True).sort_values('timestamp').reset_index(drop=True)
    print(f"[openstack] Total: {len(result):,} records")
    return result

# Load OpenStack data
openstack_df = read_openstack_logs()

# Apply normalization to OpenStack data
print("[openstack] Applying normalization...")
normalizer_rules = data_config['normalizer']['rules']
import re

class LogNormalizer:
    def __init__(self, rules):
        self.rules = [(rule['name'], re.compile(rule['pattern']), rule['replace']) for rule in rules]

    def apply(self, text: str) -> str:
        result = text
        for _, pattern, repl in self.rules:
            result = pattern.sub(repl, result)
        return result

normalizer = LogNormalizer(normalizer_rules)
openstack_df['normalized'] = openstack_df['content'].apply(normalizer.apply)

# Create time-based splits
print("[openstack] Creating time-based splits...")
openstack_splits = time_splits(openstack_df, splits_cfg)

print(f"[openstack] Split sizes:")
for name, split_df in openstack_splits.items():
    print(f"  {name}: {len(split_df):,} records ({len(split_df)/len(openstack_df)*100:.1f}%)")

# Process each split with memory-efficient tokenization
openstack_stats = {}
max_length = data_config['tokenizer']['max_length']
batch_size = 2000  # Same batch size as HDFS

for split_name, split_df in openstack_splits.items():
    print(f"\n[processing] Starting OpenStack {split_name} split ({len(split_df):,} records)...")
    
    # Tokenize with batching
    tokens_df, trunc = tokenize_dataframe_efficient(split_df, max_length, batch_size)
    
    # Save Parquet file
    file_path = PARQUET_DIR / f'openstack_{split_name}.parquet'
    tokens_df.to_parquet(file_path, index=False)
    print(f"[save] Parquet saved: {file_path}")
    
    # Create HuggingFace dataset
    hf_df = tokens_df.drop(columns=['raw', 'normalized']).copy()
    ds = Dataset.from_pandas(hf_df, preserve_index=False)
    hf_path = str(PARQUET_DIR / f'openstack_{split_name}_hf')
    ds.save_to_disk(hf_path)
    print(f"[save] HuggingFace dataset saved: {hf_path}")
    
    # Collect statistics
    openstack_stats[split_name] = {
        'count': len(tokens_df),
        'avg_length': float(tokens_df['input_ids'].map(len).mean()),
        'truncation_rate': round(trunc, 4),
        'anomaly_rate': float(tokens_df['anomaly_label'].mean()) if 'anomaly_label' in tokens_df.columns else 0.0
    }
    
    print(f"[stats] {split_name}: {openstack_stats[split_name]}")
    
    # Clean up to free memory
    del tokens_df, hf_df, ds, split_df
    gc.collect()

print(f"\n[complete] All OpenStack splits processed successfully!")
display(pd.DataFrame(openstack_stats).T)

[openstack] Loading OpenStack logs...
[openstack] Reading normal logs from data/openstack/raw/openstack_normal.log...
[openstack] normal: 189,386 valid records
[openstack] Reading abnormal logs from data/openstack/raw/openstack_abnormal.log...
[openstack] abnormal: 18,434 valid records
[openstack] Total: 207,820 records
[openstack] Applying normalization...
[openstack] normal: 189,386 valid records
[openstack] Reading abnormal logs from data/openstack/raw/openstack_abnormal.log...
[openstack] abnormal: 18,434 valid records
[openstack] Total: 207,820 records
[openstack] Applying normalization...
[openstack] Creating time-based splits...
[openstack] Split sizes:
  train: 166,256 records (80.0%)
  val: 20,782 records (10.0%)
  test: 20,782 records (10.0%)

[processing] Starting OpenStack train split (166,256 records)...
[tokenizer] Processing 166,256 records in batches of 2,000
[openstack] Creating time-based splits...
[openstack] Split sizes:
  train: 166,256 records (80.0%)
  val: 20,78

Saving the dataset (0/1 shards):   0%|          | 0/166256 [00:00<?, ? examples/s]

[save] HuggingFace dataset saved: artifacts/datasets/openstack_train_hf
[stats] train: {'count': 166256, 'avg_length': 81.68753007410258, 'truncation_rate': 0.0, 'anomaly_rate': 0.1108772014243095}

[processing] Starting OpenStack val split (20,782 records)...
[tokenizer] Processing 20,782 records in batches of 2,000

[processing] Starting OpenStack val split (20,782 records)...
[tokenizer] Processing 20,782 records in batches of 2,000
  Tokenized 20,000/20,782 records...
  Tokenized 20,000/20,782 records...
[tokenizer] Completed. Truncation rate: 0.0000
[save] Parquet saved: artifacts/datasets/openstack_val.parquet
[tokenizer] Completed. Truncation rate: 0.0000
[save] Parquet saved: artifacts/datasets/openstack_val.parquet


Saving the dataset (0/1 shards):   0%|          | 0/20782 [00:00<?, ? examples/s]

[save] HuggingFace dataset saved: artifacts/datasets/openstack_val_hf
[stats] val: {'count': 20782, 'avg_length': 81.9116543162352, 'truncation_rate': 0.0, 'anomaly_rate': 0.0}

[processing] Starting OpenStack test split (20,782 records)...
[tokenizer] Processing 20,782 records in batches of 2,000

[processing] Starting OpenStack test split (20,782 records)...
[tokenizer] Processing 20,782 records in batches of 2,000
  Tokenized 20,000/20,782 records...
  Tokenized 20,000/20,782 records...
[tokenizer] Completed. Truncation rate: 0.0000
[save] Parquet saved: artifacts/datasets/openstack_test.parquet
[tokenizer] Completed. Truncation rate: 0.0000
[save] Parquet saved: artifacts/datasets/openstack_test.parquet


Saving the dataset (0/1 shards):   0%|          | 0/20782 [00:00<?, ? examples/s]

[save] HuggingFace dataset saved: artifacts/datasets/openstack_test_hf
[stats] test: {'count': 20782, 'avg_length': 83.08338947165817, 'truncation_rate': 0.0, 'anomaly_rate': 0.0}

[complete] All OpenStack splits processed successfully!

[complete] All OpenStack splits processed successfully!


Unnamed: 0,count,avg_length,truncation_rate,anomaly_rate
train,166256.0,81.68753,0.0,0.110877
val,20782.0,81.911654,0.0,0.0
test,20782.0,83.083389,0.0,0.0


## 8. Persist Metadata

In [None]:
metadata = {
    'generated_at': datetime.utcnow().isoformat() + 'Z',
    'hdfs': hdfs_stats,
    'openstack': openstack_stats,
    'tokenizer_dir': str(TOKENIZER_DIR),
    'template_index': str(Path(data_config['drain3']['transition_output']).with_name('template_index.parquet')),
    'template_transition': str(Path(data_config['drain3']['transition_output']))
}
METADATA_PATH.write_text(json.dumps(metadata, indent=2))
print(f'[metadata] saved to {METADATA_PATH}')

[metadata] saved to artifacts/metadata/datasets.json


: 

## Artifacts Produced
- Tokenizer with special tokens -> `artifacts/tokenizer/`
- HDFS Parquet splits and HF datasets -> `artifacts/datasets/hdfs_*.parquet`, `*_hf/`
- OpenStack Parquet splits and HF datasets -> `artifacts/datasets/openstack_*.parquet`, `*_hf/`
- Drain3 template index and transitions -> `artifacts/drain3/`
- Dataset metadata summary -> `artifacts/metadata/datasets.json`

Continue with notebook `01_pretrain_hdfs.ipynb` for multi-GPU MLM pretraining.