In [32]:
import os
import re
import json
import pandas as pd
from collections import defaultdict
from tqdm import tqdm
# -------------------- MAPPING DES TEMPLATES -------------------- #
def mapping(file_names, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    for file_name in file_names:
        log_templates_file = os.path.join(output_dir, file_name)
        log_temp = pd.read_csv(log_templates_file).sort_values(by="Occurrences", ascending=False)
        log_temp_dict = {event: f"E{idx + 1}" for idx, event in enumerate(log_temp["EventId"])}
        output_file = os.path.join(output_dir, f"{file_name.replace('.csv', '')}.json")
        with open(output_file, "w") as f:
            json.dump(log_temp_dict, f)
        print(f"Mapping saved to {output_file}")

# -------------------- PRÉTRAITEMENT DES LOGS STRUCTURÉS -------------------- #
def process_log_files(input_dir, output_dir, json_filename, structured_log_filename, anomaly_label_filename, output_filename):
    json_file_path = os.path.join(output_dir, json_filename)
    anomaly_label_path = os.path.join(input_dir, anomaly_label_filename)
    structured_log_path = os.path.join(output_dir, structured_log_filename)

    df_structured = pd.read_csv(structured_log_path)
    with open(json_file_path, 'r') as json_file:
        event_mapping = json.load(json_file)
    df_labels = pd.read_csv(anomaly_label_path)
    df_labels['Label'] = df_labels['Label'].replace({'Normal': 'Success', 'Anomaly': 'Fail'})

    df_structured['BlockId'] = df_structured['Content'].apply(lambda x: re.search(r'blk_(|-)[0-9]+', x).group(0) if re.search(r'blk_(|-)[0-9]+', x) else None)
    df_structured = df_structured.dropna(subset=['BlockId'])
    df_structured['EventId'] = df_structured['EventId'].apply(lambda x: event_mapping.get(x, x))
    df_structured = pd.merge(df_structured, df_labels, on='BlockId', how='left')

    columns = ['BlockId', 'Label'] + [col for col in df_structured.columns if col not in ['BlockId', 'Label']]
    df_structured = df_structured[columns]

    output_path = os.path.join(output_filename)
    df_structured.to_csv(output_path, index=False)
    print(f"Processed log saved to {output_path}")

# -------------------- SAMPLING PAR SESSION -------------------- #
def hdfs_sampling(file_names, input_dir, output_dir, window='session', window_size=0):
    assert window == 'session', "Only window=session is supported."
    os.makedirs(output_dir, exist_ok=True)

    for file_name in file_names:
        input_path = os.path.join(input_dir, file_name)
        output_path = os.path.join(output_dir, file_name.replace('.csv', '_sequence.csv'))

        struct_log = pd.read_csv(input_path, engine='c', na_filter=False, memory_map=True, dtype={'Time': str})
        struct_log['Time'] = struct_log['Time'].str.zfill(6)
        struct_log['Date'] = struct_log['Date'].astype(str).str.zfill(6)
        struct_log['BlockId'] = struct_log['Content'].str.extract(r'(blk_-?\d+)')
        struct_log['EventId'] = struct_log['EventId'].fillna('')
        struct_log['Label'] = struct_log['Label'].apply(lambda x: 1 if x == 'Fail' else 0)

        data_dict, time_dict, date_dict, type_count = defaultdict(list), defaultdict(list), defaultdict(list), defaultdict(int)

        grouped = struct_log.groupby('BlockId')
        for block_id, group in tqdm(grouped, total=len(grouped)):
            data_dict[block_id] = group['EventId'].tolist()
            time_dict[block_id] = pd.to_datetime(group['Time'], format='%H%M%S', errors='coerce').dropna()
            date_dict[block_id] = group['Date'].tolist()
            type_count[block_id] = group['Label'].sum()

        rows = []
        for block_id, events in tqdm(data_dict.items(), total=len(data_dict)):
            features = [event for event in events if event]
            times = time_dict[block_id]
            dates = date_dict[block_id]
            time_intervals = [(times.iloc[i] - times.iloc[i - 1]).total_seconds() for i in range(1, len(times))] if len(times) > 1 else []
            latency = (times.iloc[-1] - times.iloc[0]).total_seconds() if len(times) > 1 else 0
            label = 'Fail' if type_count[block_id] > 0 else 'Success'
            first_date = dates[0] if dates else ''
            first_time = times.iloc[0].strftime('%H%M%S') if not times.empty else ''

            rows.append({
                "BlockId": block_id,
                "Label": label,
                "Type": type_count[block_id],
                "Features": str(features),
                "Date": first_date,
                "Time": first_time,
                "TimeInterval": str(time_intervals),
                "Latency": latency
            })

        data_df = pd.DataFrame(rows, columns=['BlockId', 'Label', 'Type', 'Features', 'Date', 'Time', 'TimeInterval', 'Latency'])
        data_df.to_csv(output_path, index=False)
        print(f"Saved: {output_path}")


In [33]:
import os

from google.colab import drive
drive.mount('/content/drive', force_remount=True)


path = '/content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_train.log_structured.csv'
print("✅ Existe :", os.path.exists(path))


target_dir = "/content/drive/MyDrive/ProjetEts/HDFS_results/"

for f in os.listdir(target_dir):
    print(f)


Mounted at /content/drive
✅ Existe : True
Data_Preprocessing.py
Event_occurence_matrix_HDFS_train.csv
Event_occurence_matrix_HDFS_valid.csv
Event_occurence_matrix_HDFS_test.csv
Event_occurence_matrix_HDFS_test_predicted.csv
HDFS_train.log_structured_blk_sequence.csv
HDFS_valid.log_structured_blk_sequence.csv
HDFS_test.log_structured_blk_sequence.csv
Event_traces.csv
Event_occurrence_matrix.csv
anomaly_label.csv
HDFS_train.log_structured.csv
HDFS_train.log_templates.csv
HDFS_valid.log_structured.csv
HDFS_valid.log_templates.csv
HDFS_test.log_structured.csv
HDFS_test.log_templates.csv
HDFS_train.log_structured_blk.csv
HDFS_valid.log_structured_blk.csv
HDFS_test.log_structured_blk.csv
HDFS_train.log_templates.json
HDFS_test.log_templates.json
HDFS_valid.log_templates.json


In [34]:
import os
import re
import json
import pandas as pd
from collections import defaultdict
from tqdm import tqdm

# ========================
# Fonctions Utilitaires
# ========================

def mount_drive():
    from google.colab import drive
    drive.mount("/content/drive", force_remount=True)
    print("Drive mounted")

print(os.path.exists('/content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_train.log_structured.csv'))


def process_log_files(input_dir, output_dir, json_filename, structured_log_filename, anomaly_label_filename, output_filename):
    json_file_path = os.path.join(output_dir, json_filename)
    anomaly_label_path = os.path.join(input_dir, anomaly_label_filename)
    structured_log_path = os.path.join(output_dir, structured_log_filename)

    df_structured = pd.read_csv(structured_log_path)
    with open(json_file_path, 'r') as json_file:
        event_mapping = json.load(json_file)
    df_labels = pd.read_csv(anomaly_label_path)
    df_labels['Label'] = df_labels['Label'].replace({'Normal': 'Success', 'Anomaly': 'Fail'})

    df_structured['BlockId'] = df_structured['Content'].apply(lambda x: re.search(r'blk_(|-)[0-9]+', x).group(0) if re.search(r'blk_(|-)[0-9]+', x) else None)
    df_structured = df_structured.dropna(subset=['BlockId'])
    df_structured['EventId'] = df_structured['EventId'].apply(lambda x: event_mapping.get(x, x))
    df_structured = pd.merge(df_structured, df_labels, on='BlockId', how='left')

    columns = ['BlockId', 'Label'] + [col for col in df_structured.columns if col not in ['BlockId', 'Label']]
    df_structured = df_structured[columns]

    df_structured.to_csv(output_filename, index=False)
    print(f"✅ Fichier généré : {output_filename}")

def hdfs_sampling(file_names, input_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    for file_name in file_names:
        input_path = os.path.join(input_dir, file_name)
        output_path = os.path.join(output_dir, file_name.replace('.csv', '_sequence.csv'))

        struct_log = pd.read_csv(input_path, engine='c', na_filter=False, memory_map=True, dtype={'Time': str})
        struct_log['Time'] = struct_log['Time'].str.zfill(6)
        struct_log['Date'] = struct_log['Date'].astype(str).str.zfill(6)
        struct_log['BlockId'] = struct_log['Content'].str.extract(r'(blk_-?\d+)')
        struct_log['EventId'] = struct_log['EventId'].fillna('')
        struct_log['Label'] = struct_log['Label'].apply(lambda x: 1 if x == 'Fail' else 0)

        data_dict = defaultdict(list)
        time_dict = defaultdict(list)
        date_dict = defaultdict(list)
        type_count = defaultdict(int)

        grouped = struct_log.groupby('BlockId')
        for block_id, group in tqdm(grouped, total=len(grouped)):
            data_dict[block_id] = group['EventId'].tolist()
            time_dict[block_id] = pd.to_datetime(group['Time'], format='%H%M%S', errors='coerce').dropna()
            date_dict[block_id] = group['Date'].tolist()
            type_count[block_id] = group['Label'].sum()

        rows = []
        for block_id, events in tqdm(data_dict.items(), total=len(data_dict)):
            features = [event for event in events if event]
            times = time_dict[block_id]
            dates = date_dict[block_id]
            if len(times) > 1:
                time_intervals = [(times.iloc[i] - times.iloc[i - 1]).total_seconds() for i in range(1, len(times))]
                latency = (times.iloc[-1] - times.iloc[0]).total_seconds()
            else:
                time_intervals = []
                latency = 0
            label = 'Fail' if type_count[block_id] > 0 else 'Success'
            first_date = dates[0] if dates else ''
            first_time = times.iloc[0].strftime('%H%M%S') if not times.empty else ''
            rows.append({
                "BlockId": block_id,
                "Label": label,
                "Type": type_count[block_id],
                "Features": str(features),
                "Date": first_date,
                "Time": first_time,
                "TimeInterval": str(time_intervals),
                "Latency": latency
            })

        data_df = pd.DataFrame(rows)
        data_df.to_csv(output_path, index=False)
        print(f"✅ HDFS sampling terminé : {output_path}")

def generate_event_occurrence_matrix(log_files, event_traces_files, input_dir, output_dir, event_columns=None):
    if event_columns is None:
        event_columns = [f"E{i}" for i in range(1, 30)]
    anomaly_label_file = os.path.join(input_dir, "anomaly_label.csv")
    anomaly_labels = pd.read_csv(anomaly_label_file)
    anomaly_labels['Label'] = anomaly_labels['Label'].apply(lambda x: 'Fail' if x == 'Anomaly' else 'Success')
    label_dict = anomaly_labels.set_index('BlockId')['Label'].to_dict()

    for log_file, event_traces_file in zip(log_files, event_traces_files):
        output_file = os.path.join(output_dir, f"Event_occurence_matrix_{log_file.replace('.log', '')}.csv")
        print(f"Processing {log_file}...")
        event_traces = pd.read_csv(event_traces_file)
        occurrence_matrix = []

        for _, row in event_traces.iterrows():
            block_id = row['BlockId']
            label = label_dict.get(block_id, 'Unknown')
            event_list = re.findall(r"E\d+", row['Features'])
            event_counts = {event: event_list.count(event) for event in event_columns}
            occurrence_matrix.append({
                "BlockId": block_id,
                "Label": label,
                "Type": int(row['Type']) if pd.notna(row['Type']) else 0,
                "Time": row.get('Time', ''),
                "Date": row.get('Date', ''),
                **event_counts
            })

        occurrence_matrix_df = pd.DataFrame(occurrence_matrix)
        occurrence_matrix_df.to_csv(output_file, index=False)
        print(f"✅ Matrice d'occurrence sauvegardée : {output_file}")

# ========================
# Execution du pipeline
# ========================

input_dir = '/content/drive/MyDrive/ProjetEts/HDFS_results/'
output_dir = '/content/drive/MyDrive/ProjetEts/HDFS_results/'

process_log_files(input_dir, output_dir, 'HDFS_train.log_templates.json', 'HDFS_train.log_structured.csv', 'anomaly_label.csv', os.path.join(output_dir, 'HDFS_train.log_structured_blk.csv'))
process_log_files(input_dir, output_dir, 'HDFS_valid.log_templates.json', 'HDFS_valid.log_structured.csv', 'anomaly_label.csv', os.path.join(output_dir, 'HDFS_valid.log_structured_blk.csv'))
process_log_files(input_dir, output_dir, 'HDFS_test.log_templates.json',  'HDFS_test.log_structured.csv',  'anomaly_label.csv', os.path.join(output_dir, 'HDFS_test.log_structured_blk.csv'))

hdfs_sampling([
    'HDFS_train.log_structured_blk.csv',
    'HDFS_valid.log_structured_blk.csv',
    'HDFS_test.log_structured_blk.csv'
], output_dir, output_dir)

generate_event_occurrence_matrix(
    ['HDFS_train.log', 'HDFS_valid.log', 'HDFS_test.log'],
    [
        os.path.join(output_dir, 'HDFS_train.log_structured_blk_sequence.csv'),
        os.path.join(output_dir, 'HDFS_valid.log_structured_blk_sequence.csv'),
        os.path.join(output_dir, 'HDFS_test.log_structured_blk_sequence.csv')
    ],
    '/content/drive/MyDrive/ProjetEts/',
    output_dir
)



True
✅ Fichier généré : /content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_train.log_structured_blk.csv
✅ Fichier généré : /content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_valid.log_structured_blk.csv
✅ Fichier généré : /content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_test.log_structured_blk.csv


100%|██████████| 333405/333405 [04:33<00:00, 1220.31it/s]
100%|██████████| 333405/333405 [03:00<00:00, 1842.43it/s]


✅ HDFS sampling terminé : /content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_train.log_structured_blk_sequence.csv


100%|██████████| 191205/191205 [02:30<00:00, 1274.42it/s]
100%|██████████| 191205/191205 [01:03<00:00, 3018.13it/s]


✅ HDFS sampling terminé : /content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_valid.log_structured_blk_sequence.csv


100%|██████████| 180558/180558 [02:30<00:00, 1202.86it/s]
100%|██████████| 180558/180558 [01:00<00:00, 2965.62it/s]


✅ HDFS sampling terminé : /content/drive/MyDrive/ProjetEts/HDFS_results/HDFS_test.log_structured_blk_sequence.csv
Processing HDFS_train.log...
✅ Matrice d'occurrence sauvegardée : /content/drive/MyDrive/ProjetEts/HDFS_results/Event_occurence_matrix_HDFS_train.csv
Processing HDFS_valid.log...
✅ Matrice d'occurrence sauvegardée : /content/drive/MyDrive/ProjetEts/HDFS_results/Event_occurence_matrix_HDFS_valid.csv
Processing HDFS_test.log...
✅ Matrice d'occurrence sauvegardée : /content/drive/MyDrive/ProjetEts/HDFS_results/Event_occurence_matrix_HDFS_test.csv
