In [None]:
import kagglehub
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import jinja2
import os
from pathlib import Path
from dotenv import load_dotenv
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler
from sklearn.ensemble import IsolationForest

from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
from drain3.file_persistence import FilePersistence


In [None]:
# path = kagglehub.dataset_download("boltzmannbrain/nab")
# print("Path to dataset files: ", path)

In [None]:
# Load key/value pairs from a .env file into the process environment.
# Later cells read their data locations from it via os.getenv
# (AWS_DIR_PATH, MVAD_CSV_PATH, BIG_LOG_PATH).
load_dotenv()

In [None]:
# Directory holding the CloudWatch CSV files, configured through the
# AWS_DIR_PATH environment variable (e.g. in .env) instead of a hardcoded path.
_aws_dir_env = os.getenv("AWS_DIR_PATH")
if not _aws_dir_env:
    # Path(None) would raise an unhelpful TypeError; fail with a clear message instead.
    raise RuntimeError("AWS_DIR_PATH is not set; define it in .env or the environment.")
AWS_dir_path = Path(_aws_dir_env)

# Sort the paths so the order (and therefore which file ends up first in the
# loaded list) is identical on every run; directory listing order is arbitrary.
all_AWS_paths = sorted(
    entry for entry in AWS_dir_path.iterdir() if entry.name.endswith(".csv")
)

In [None]:
# Read every CSV; the list keeps the same order as all_AWS_paths.
all_AWS_df_list = [pd.read_csv(my_path) for my_path in all_AWS_paths]

# Pair each frame with its source path directly instead of pulling captions
# from a separate generator, so a caption can never drift from its frame.
for my_path, df in zip(all_AWS_paths, all_AWS_df_list):
    std_scaler = StandardScaler()  # fresh scaler per file: each series is standardised on its own
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Double brackets select a one-column DataFrame (2-D), which the scaler requires.
    df['std_value'] = std_scaler.fit_transform(df[['value']])

    # Show the last two rows, captioned with the file name.
    display(df.tail(2).style.set_caption(my_path.name))

In [None]:
# Use the first loaded frame as the input for the single-series experiments below.
inp_df = all_AWS_df_list[0]

In [None]:
def run_iso(my_df, window=5, contamination="auto"):
    """Flag anomalous sliding windows of ``std_value`` with an Isolation Forest.

    Every run of ``window`` consecutive ``std_value`` samples becomes one
    feature vector. The forest's verdict for a window is stored on the row
    where that window ends, in the ``anomaly_window`` column (1 = anomalous,
    0 = normal). The first ``window - 1`` rows have no complete window ending
    on them and stay 0.

    Args:
        my_df: DataFrame with a numeric ``std_value`` column. Modified in
            place: ``anomaly_window`` is added or overwritten.
        window: Number of consecutive samples per feature vector (>= 1).
        contamination: Forwarded to ``IsolationForest``.

    Returns:
        None. The result is written into ``my_df``.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    # Pull the values out once instead of re-slicing the Series per window.
    series = my_df['std_value'].to_numpy()
    flags = np.zeros(len(series), dtype=np.int64)

    # With fewer samples than one window there is nothing to score.
    if len(series) >= window:
        # Shape (n - window + 1, window): includes the final window that ends
        # on the last sample, so no padding value has to be invented for it.
        X = np.lib.stride_tricks.sliding_window_view(series, window)

        iso = IsolationForest(contamination=contamination, random_state=1)
        y_pred = iso.fit_predict(X)

        # Window k covers rows k .. k + window - 1, so its verdict belongs to
        # row k + window - 1. fit_predict returns -1 for outliers.
        flags[window - 1:] = (y_pred == -1)

    # Positional assignment of a full-length array works for any index type.
    my_df['anomaly_window'] = flags


In [None]:
# Score the selected series with 7-sample windows; run_iso writes the
# 'anomaly_window' column into inp_df in place.
run_iso(inp_df, window=7)
# inp_df

In [None]:
# Preview the first rows whose window was flagged as anomalous.
inp_df.loc[inp_df["anomaly_window"].eq(1)].head()

In [None]:
# Line plot of the standardised series with flagged rows overlaid as red crosses.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(
    inp_df['timestamp'],
    inp_df['std_value'],
    color='blue',
    label='Standard-scaled Value',
    zorder=1,
)

# Rows marked by run_iso; drawn above the line (higher zorder).
anomalies = inp_df.loc[inp_df['anomaly_window'] == 1]
ax.scatter(
    anomalies['timestamp'],
    anomalies['std_value'],
    marker='x',
    color='red',
    label='Anomaly',
    zorder=2,
)

ax.set_xlabel("Timestamp")
ax.set_ylabel("Standard-scaled value")
ax.legend()
ax.grid(True)
plt.show()


## MVAD dataset

In [None]:
# Sentinel for the load cell that follows: the CSV is only read while this is
# None. Re-running this cell clears the loaded frame.
merged_df = None

In [None]:
# Location of the merged MVAD CSV, configured through MVAD_CSV_PATH (e.g. in .env).
_mvad_csv_env = os.getenv("MVAD_CSV_PATH")
if not _mvad_csv_env:
    # Path(None) would raise an unhelpful TypeError; fail with a clear message instead.
    raise RuntimeError("MVAD_CSV_PATH is not set; define it in .env or the environment.")
MVAD_csv_path = Path(_mvad_csv_env)

# Identity check (`is None`), not `== None`: once merged_df holds a DataFrame,
# `==` compares element-wise and `if` on that result raises
# "The truth value of a DataFrame is ambiguous", so re-running this cell
# would crash instead of reusing the already loaded frame.
if merged_df is None:
    merged_df = pd.read_csv(MVAD_csv_path)



In [None]:
# Summary statistics of the raw merged frame.
merged_df.describe()

In [None]:
# Parse the timestamp column in place so later cells can compare it against
# pd.Timestamp values and plot it on a time axis.
merged_df["timestamp"] = pd.to_datetime(merged_df["timestamp"])
merged_df.head()

In [None]:
# Rows whose identifier column equals "pc2".
is_pc2 = merged_df["identifier"].eq("pc2")
pc2_df = merged_df.loc[is_pc2]
pc2_df.head()

In [None]:
# Rows whose identifier column equals "pc1", followed by their summary statistics.
is_pc1 = merged_df["identifier"].eq("pc1")
pc1_df = merged_df.loc[is_pc1]
pc1_df.describe()

In [None]:
# Numeric telemetry columns that get standardised.
scalable_columns = ["cpu_temperature", "cpu_usage", "cpu_load", "memory_usage", "battery_level", "cpu_power"]

# Fit the scaler on pc1 only and rebuild a frame that keeps pc1's row index,
# so the unscaled timestamp/anomaly columns line up when copied over.
std_scaler_pc1 = StandardScaler()
pc1_scaled_values = std_scaler_pc1.fit_transform(pc1_df[scalable_columns])
pc1_scaled = pd.DataFrame(
    pc1_scaled_values,
    index=pc1_df.index,
    columns=scalable_columns,
)
pc1_scaled["timestamp"] = pc1_df["timestamp"]
pc1_scaled["anomaly"] = pc1_df["anomaly"]


In [None]:
# Column order: timestamp first, scaled features in the middle, anomaly label last.
reordered_columns = ["timestamp", *scalable_columns, "anomaly"]
pc1_scaled = pc1_scaled.loc[:, reordered_columns]

In [None]:
# Show the first scaled rows, then summary statistics of the scaled frame.
display(pc1_scaled.head())
pc1_scaled.describe()

In [None]:
# Standardised cpu_temperature of pc1 over the full time range.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(
    pc1_scaled['timestamp'],
    pc1_scaled['cpu_temperature'],
    color='blue',
    label='Standard-scaled Value of cpu_temperature',
    zorder=1,
)

# No legend is drawn; the label is kept so one can be enabled with ax.legend().
ax.set_xlabel("Timestamp")
ax.set_ylabel("Standard-scaled value")
ax.grid(True)
plt.show()

In [None]:
# Keep only pc1 rows recorded before 2024-12-02 for a closer look.
pc1_cutoff = pd.Timestamp("2024-12-02")
small_slice = pc1_df.loc[pc1_df["timestamp"].lt(pc1_cutoff)]

In [None]:
# Unscaled cpu_temperature for the early pc1 slice.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(
    small_slice['timestamp'],
    small_slice['cpu_temperature'],
    color='red',
    label='Value of cpu_temperature',
    zorder=1,
)

# No legend is drawn; the label is kept so one can be enabled with ax.legend().
ax.set_xlabel("Timestamp")
ax.set_ylabel("Unscaled value")
ax.grid(True)
plt.show()

In [None]:
# Keep only pc2 rows recorded before 2024-11-30 for a closer look.
pc2_cutoff = pd.Timestamp("2024-11-30")
small_slice_pc2 = pc2_df.loc[pc2_df["timestamp"].lt(pc2_cutoff)]

In [None]:
# Unscaled cpu_temperature for the early pc2 slice.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(
    small_slice_pc2['timestamp'],
    small_slice_pc2['cpu_temperature'],
    color='red',
    label='Value of cpu_temperature',
    zorder=1,
)

# No legend is drawn; the label is kept so one can be enabled with ax.legend().
ax.set_xlabel("Timestamp")
ax.set_ylabel("Unscaled value")
ax.grid(True)
plt.show()

## After clarification of input data, log-parsing attempt


In [None]:
import re
# Matches the "proc[pid]: message" tail of a log line.
# (?P<name>...) is Python's named-capture-group syntax, so the pieces can be
# read back with match.group("proc"), match.group("pid") and match.group("rest").
log_pattern = re.compile(r'^(?P<proc>\S+)\[(?P<pid>\d+)\]:\s+(?P<rest>.*)$')

# Drain3 template miner with a default configuration. It is handed a
# FilePersistence on "drain3_state.bin" — presumably so mined templates
# survive kernel restarts (confirm against the Drain3 docs).
config = TemplateMinerConfig()
persistence = FilePersistence("drain3_state.bin")
template_miner = TemplateMiner(persistence, config)


In [None]:
# Parse each line of the sample log into header fields plus a mined template.
# Layout handled here: the first two tokens form the date, then time, node,
# and finally a "proc[pid]: message" tail matched by log_pattern.
with open("exampleLog.txt", "r") as f:
    for line in f:
        parts = line.split(maxsplit=4)
        if len(parts) < 5:
            # Blank or truncated line: indexing parts[...] would raise
            # IndexError and abort the whole run, so skip it instead.
            continue

        # Named log_date/log_time so the generic names `date`/`time` are not
        # claimed at notebook level.
        log_date = f"{parts[0]} {parts[1]}"
        log_time = parts[2]
        node = parts[3]

        # Split the tail into process name, pid and free-text message.
        match = log_pattern.match(parts[4])
        if not match:
            continue

        msg = match.group("rest")
        result = template_miner.add_log_message(msg)
        print("Template:", result["template_mined"])
        parsed = {
            "date": log_date,
            "time": log_time,
            "node": node,
            "proc": match.group("proc"),
            "pid": int(match.group("pid")),
            "rest": msg
        }
        print(parsed)

In [12]:
import tqdm
from itertools import islice
from datetime import datetime

In [31]:
# Ordinal codes for log levels, ordered from TRACE (0) up to FATAL (5).
level_map = {"TRACE":0, "DEBUG":1, "INFO":2, "WARN":3, "ERROR":4, "FATAL":5}

# Separate Drain3 miner for the HDFS log, using its own state file
# ("hdfs_drain3_state.bin") and a default configuration.
hdfs_drain3_config = TemplateMinerConfig()
hdfs_drain3_persistence = FilePersistence("hdfs_drain3_state.bin")
hdfs_drain3_template_miner = TemplateMiner(hdfs_drain3_persistence, hdfs_drain3_config)

In [44]:
def preprocess_line(line):
    """Split one log line into its fields and mine the message's Drain3 template.

    Expected whitespace-separated layout::

        <date> <time> <thread> <level> <origin>: <message ...>

    Args:
        line: Raw log line; a trailing newline is fine.

    Returns:
        dict with the keys ``date_time``, ``thread``, ``level``,
        ``log_origin``, ``message``, ``template`` and ``template_id``.
        All header fields are returned as strings.

    Raises:
        ValueError: If the line has fewer than the five header fields.
    """
    parts = line.split(maxsplit=5)
    if len(parts) < 5:
        raise ValueError(f"Malformed log line (expected at least 5 fields): {line!r}")

    date, time_of_day, thread, level, origin = parts[:5]

    # Drop the trailing ':' only when it is really there; a blind [:-1] would
    # cut a genuine character off an origin that has no colon.
    log_origin = origin[:-1] if origin.endswith(":") else origin

    # A line may end right after the origin; treat that as an empty message.
    message = parts[5].strip() if len(parts) > 5 else ""

    template_obj = hdfs_drain3_template_miner.add_log_message(message)

    parsed = {
        "date_time": f"{date} {time_of_day}",
        "thread": thread,
        "level": level,
        "log_origin": log_origin,
        "message": message,
        "template": template_obj['template_mined'],
        "template_id": template_obj['cluster_id']
    }
    return parsed

def chunk_and_read(chunk_size = 1000):
    """Lazily yield the file named by BIG_LOG_PATH as lists of raw lines.

    Each yielded list holds up to ``chunk_size`` lines (newlines included);
    iteration stops once the file is exhausted.
    """
    with open(os.getenv("BIG_LOG_PATH"), "r") as log_file:
        # iter(callable, sentinel) keeps pulling batches until an empty list
        # signals end of file.
        yield from iter(lambda: list(islice(log_file, chunk_size)), [])

def process_in_lines(limiter=50):
    """Print the first ``limiter`` lines of BIG_LOG_PATH, each followed by its parsed dict."""
    with open(os.getenv("BIG_LOG_PATH"), "r") as log_file:
        for line_no, raw_line in enumerate(log_file):
            # Stop once `limiter` lines have been handled.
            if line_no == limiter:
                break
            print(raw_line, end="")
            print(preprocess_line(raw_line), end="\n\n")


In [46]:
# Smoke test: echo and parse the first 50 lines (the default limiter) of the big log.
process_in_lines()

081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010
{'date_time': '081109 203518', 'thread': '143', 'level': 'INFO', 'log_origin': 'dfs.DataNode$DataXceiver', 'message': 'Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010', 'template': 'Receiving block <*> src: <*> dest: <*>', 'template_id': 1}

081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906
{'date_time': '081109 203518', 'thread': '35', 'level': 'INFO', 'log_origin': 'dfs.FSNamesystem', 'message': 'BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906', 'template': 'BLOCK* NameSystem.allocateBlock: <*> <*>', 'template_id': 2}

081109 203519 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.10.6:4052

In [None]:
#next steps

"""
Chunk the log by lines,
save each chunk as a DataFrame and convert columns: datetime for the timestamp, int(thread), one-hot encoded template ids, ordinal-encoded levels
create sliding-window features
train models batchwise (IsolationForest first, then SVM and maybe an LSTM autoencoder); keep parquet snapshots of the DataFrame from before the temporal features are added
set up a process to quickly retrain/integrate when a new template is found
Further automate this for reuse across multiple different log formats
"""

### Spacy 