# 02 - Preprocessing & Feature Engineering

This notebook covers the full pipeline: parsing, templating, temporal feature extraction, and scaling.

### Pipeline Steps:
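1. **Streaming Log Parsing**: Read the HDFS v1 log (roughly 11 million lines) incrementally.
2. **Log Key Discovery**: Replace variable fields (block IDs, IP addresses, numbers) with placeholders so each message maps to an event template, which reduces dimensionality.
3. **Temporal Engineering**: Compute each block's session duration from its first and last timestamps.
4. **Feature Normalization**: Standardize the features with `StandardScaler` before modeling.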
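
Step 2 can be illustrated with a small regex-based masker. This is only a minimal sketch and not the code in `preprocess`: the function name `to_template` and the three patterns are assumptions, chosen to produce placeholders such as `BLOCK_ID`, `IP_ADDR`, and `NUM`. The real module's rules may differ in order or detail.

```python
import re

# Hypothetical masking rules, applied in this order so that block IDs and
# IP addresses are replaced before the generic number rule sees their digits.
_BLOCK = re.compile(r"blk_-?\d+")
_IP = re.compile(r"/?\d{1,3}(?:\.\d{1,3}){3}(?::\d+)?")
_NUM = re.compile(r"\d+")


def to_template(message: str) -> str:
    """Replace variable fields in a log message with fixed placeholders."""
    message = _BLOCK.sub("BLOCK_ID", message)
    message = _IP.sub("IP_ADDR", message)
    message = _NUM.sub("NUM", message)
    return message


raw_a = "PacketResponder 1 for block blk_38865049064139660 terminating"
raw_b = "PacketResponder 2 for block blk_-6952295868487656571 terminating"

# Two different raw messages collapse to the same event template.
template_a = to_template(raw_a)
template_b = to_template(raw_b)
print(template_a)
```

Because every message with the same structure maps to one template, the number of distinct feature columns depends on the number of templates rather than on the number of raw messages.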

In [1]:
import os
import sys

import pandas as pd

# Add the src directory to the module search path so the local preprocess module can be imported
# (the relative path assumes the notebook is run from its own directory)
sys.path.append('../src')
import preprocess

# Configuration for data files
LOG_FILE = '../data/HDFS.log'
LABEL_FILE = '../data/anomaly_label.csv'
OUTPUT_FILE = '../data/X_features.csv'

## 1. Execution of the Pipeline

The function `parse_hdfs_logs` now includes:
- **Session Duration calculation** (end_time - start_time per block).
- **Feature Scaling** using `sklearn.preprocessing.StandardScaler`.
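
The two additions listed above can be sketched on toy data. This is a hedged illustration rather than the body of `parse_hdfs_logs`: the `events` frame, its `Timestamp` column name, and the timestamp values are assumptions made for the example.

```python
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Toy parsed events, one row per (block, log line). Values are illustrative only.
events = pd.DataFrame({
    "BlockId": ["blk_1", "blk_1", "blk_2", "blk_2", "blk_2"],
    "Timestamp": pd.to_datetime([
        "2008-11-09 20:35:18", "2008-11-09 20:35:28",
        "2008-11-09 20:36:00", "2008-11-09 20:36:30", "2008-11-09 20:37:00",
    ]),
})

# Session duration per block: last timestamp minus first timestamp, in seconds.
span = events.groupby("BlockId")["Timestamp"].agg(["min", "max"])
features = pd.DataFrame({
    "SessionDuration": (span["max"] - span["min"]).dt.total_seconds()
})

# StandardScaler centers each column at 0 and divides by its population
# standard deviation, so every column ends up with unit variance.
scaled = pd.DataFrame(
    StandardScaler().fit_transform(features),
    index=features.index,
    columns=features.columns,
)
print(scaled)
```

If the data is later split into training and test sets, fitting the scaler on the training portion only avoids leaking test-set statistics into the features.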

In [2]:
# Verify that both input files exist before starting the heavy parsing process
missing = [p for p in (LOG_FILE, LABEL_FILE) if not os.path.exists(p)]
if not missing:
    # Process the ~11 million log lines (Parsing -> Templating -> Temporal Features -> Scaling)
    df_final = preprocess.parse_hdfs_logs(LOG_FILE, LABEL_FILE)

    # Save the cleaned, scaled, and labeled dataset for EDA and Modeling
    # (the DataFrame index is written as the first CSV column)
    df_final.to_csv(OUTPUT_FILE)
    print(f'\nSuccess: Dataset ready for ML saved at: {OUTPUT_FILE}')

    # Display the first few rows of the processed feature matrix
    display(df_final.head())
else:
    print(f'Error: Input file(s) not found: {missing}. Please ensure they are in the data folder.')

Reading log file: ../data/HDFS.log
Processed 1000000 lines...
Processed 2000000 lines...
Processed 3000000 lines...
Processed 4000000 lines...
Processed 5000000 lines...
Processed 6000000 lines...
Processed 7000000 lines...
Processed 8000000 lines...
Processed 9000000 lines...
Processed 10000000 lines...
Processed 11000000 lines...
Creating Feature Matrix (Crosstab)...
Calculating temporal features (Session Duration)...
Applying Normalization (StandardScaler)...
Dataset ready for ML saved at: ../data/X_features.csv
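
The progress messages above are consistent with reading the log one line at a time. A minimal sketch of that pattern follows. It assumes the usual HDFS v1 line layout (`YYMMDD HHMMSS pid LEVEL component: message`); the generator name `iter_block_events` and the `report_every` parameter are hypothetical and not taken from `preprocess`.

```python
import io
import re
from datetime import datetime

BLOCK_RE = re.compile(r"blk_-?\d+")


def iter_block_events(lines, report_every=1_000_000):
    """Yield (block_id, timestamp, message) tuples one log line at a time."""
    for n, line in enumerate(lines, start=1):
        if n % report_every == 0:
            print(f"Processed {n} lines...")
        # Assumed layout: date, time, pid, level, component, then the message.
        parts = line.rstrip("\n").split(" ", 5)
        if len(parts) < 6:
            continue  # skip lines that do not match the assumed layout
        date, time, _pid, _level, _component, message = parts
        try:
            ts = datetime.strptime(date + time, "%y%m%d%H%M%S")
        except ValueError:
            continue  # skip lines with an unparsable timestamp
        # A message may mention several blocks; emit one event per distinct ID.
        for block_id in dict.fromkeys(BLOCK_RE.findall(message)):
            yield block_id, ts, message


# A file object opened with open(LOG_FILE) could be passed in the same way;
# an in-memory sample is used here so the sketch runs without the dataset.
sample = io.StringIO(
    "081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block "
    "blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010\n"
    "not a log line\n"
)
events = list(iter_block_events(sample))
print(events[0][0], events[0][1])
```

Iterating over the file object keeps only one line in memory at a time, which matters more than parsing speed for a log of this size.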


BlockId,Adding an already existing block BLOCK_ID,BLOCK* NameSystem.addStoredBlock: Redundant addStoredBlock request received for BLOCK_ID on NUM.NUM.NUM.NUM:NUM size NUM,BLOCK* NameSystem.addStoredBlock: addStoredBlock request received for BLOCK_ID on NUM.NUM.NUM.NUM:NUM size NUM But it does not belong to any file.,BLOCK* NameSystem.addStoredBlock: blockMap updated: NUM.NUM.NUM.NUM:NUM is added to BLOCK_ID size NUM,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_NUM_NUM/job.jar. BLOCK_ID,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_NUM_NUM/job.split. BLOCK_ID,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_NUM_NUM/job.xml. BLOCK_ID,BLOCK* NameSystem.allocateBlock: /user/root/grep-temp-NUM/_logs/history/ip-NUM-NUM-NUM-NUM.ecNUM.internal_NUM_job_NUM_NUM_conf.xml. BLOCK_ID,BLOCK* NameSystem.allocateBlock: /user/root/grep-temp-NUM/_logs/history/ip-NUM-NUM-NUM-NUM.ecNUM.internal_NUM_job_NUM_NUM_root_grep-search. BLOCK_ID,BLOCK* NameSystem.allocateBlock: /user/root/grep-temp-NUM/_temporary/_task_NUM_NUM_r_NUM_NUM/part-NUM. BLOCK_ID,...,writeBlock BLOCK_ID received exception java.io.IOException: Connection reset by peer,writeBlock BLOCK_ID received exception java.io.IOException: Could not read from stream,writeBlock BLOCK_ID received exception java.io.IOException: Interrupted receiveBlock,writeBlock BLOCK_ID received exception java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=IP_ADDR remote=IP_ADDR]. NUM millis timeout left.,writeBlock BLOCK_ID received exception java.net.NoRouteToHostException: No route to host,writeBlock BLOCK_ID received exception java.net.SocketTimeoutException,writeBlock BLOCK_ID received exception java.net.SocketTimeoutException: NUM millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=IP_ADDR remote=IP_ADDR],writeBlock BLOCK_ID received exception java.net.SocketTimeoutException: NUM millis timeout while waiting for channel to be ready for write. ch : java.nio.channels.SocketChannel[connected local=IP_ADDR remote=IP_ADDR],writeBlock BLOCK_ID received exception java.nio.channels.ClosedByInterruptException,SessionDuration
blk_-1000002529962039464,-0.002949,-0.041169,-0.045836,0.023158,-0.006594,-0.00646,-0.00646,-0.002284,-0.002284,-0.001865,...,-0.008991,-0.07511,-0.004023,-0.003489,-0.001319,-0.004755,-0.00417,-0.002949,-0.003489,-0.938474
blk_-100000266894974466,-0.002949,-0.041169,-0.045836,0.023158,-0.006594,-0.00646,-0.00646,-0.002284,-0.002284,-0.001865,...,-0.008991,-0.07511,-0.004023,-0.003489,-0.001319,-0.004755,-0.00417,-0.002949,-0.003489,0.792114
blk_-1000007292892887521,-0.002949,-0.041169,-0.045836,0.023158,-0.006594,-0.00646,-0.00646,-0.002284,-0.002284,-0.001865,...,-0.008991,-0.07511,-0.004023,-0.003489,-0.001319,-0.004755,-0.00417,-0.002949,-0.003489,-0.93853
blk_-1000014584150379967,-0.002949,-0.041169,-0.045836,0.023158,-0.006594,-0.00646,-0.00646,-0.002284,-0.002284,-0.001865,...,-0.008991,-0.07511,-0.004023,-0.003489,-0.001319,-0.004755,-0.00417,-0.002949,-0.003489,0.820179
blk_-1000028658773048709,-0.002949,-0.041169,-0.045836,0.023158,-0.006594,-0.00646,-0.00646,-0.002284,-0.002284,-0.001865,...,-0.008991,-0.07511,-0.004023,-0.003489,-0.001319,-0.004755,-0.00417,-0.002949,-0.003489,0.407477
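
The log output mentions a crosstab step. A per-block matrix of event counts of the kind shown above can be sketched with `pd.crosstab`. The toy `events` frame and its `BlockId` and `Event` column names are assumptions for illustration; the actual intermediate data in `preprocess` may be organized differently.

```python
import pandas as pd

# Toy templated events, one row per (block, log line). Values are illustrative only.
events = pd.DataFrame({
    "BlockId": ["blk_1", "blk_1", "blk_1", "blk_2"],
    "Event": [
        "Receiving block BLOCK_ID",
        "Receiving block BLOCK_ID",
        "Verification succeeded for BLOCK_ID",
        "Receiving block BLOCK_ID",
    ],
})

# Rows are blocks, columns are event templates, cells are occurrence counts.
# Templates a block never produced are filled with 0.
counts = pd.crosstab(events["BlockId"], events["Event"])
print(counts)
```

After standardization, an event that is absent from almost every block yields the same small negative value in most rows, which may explain the repeated values in the rows printed above.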
