# 4.4 Log Generalization and Difference

In [None]:
# !pip install pandas
# !pip install matplotlib

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os
import re

## -- Log Configuration --

In [None]:
# Logs Path
# Directory holding normal.csv and fault_<n>.csv (relative to the notebook).
path_to_logs = '../Collected_Logs/'

# Number of faults
# load_logs reads fault_1.csv ... fault_<num_faults>.csv from path_to_logs.
num_faults = 4

# Mapping faults and components
# fault_to_component_mapping: fault id -> component that fault targets; used to
# rewrite the 'fault-target' label (e.g. 'fault_1' -> 'flink-master').
fault_to_component_mapping = {
    'fault_1': 'flink-master',
    'fault_2': 'flink-worker',
    'fault_3': 'kafka-broker',
    'fault_4': 'functions'
}
# component_mapping: raw container name (the 'jsonPayload.container.name' value,
# renamed to 'component') -> short component name used in the analysis.
component_mapping = {
    '/components-master-1': 'flink-master',
    '/components-worker-1': 'flink-worker',
    '/components-kafka-1': 'kafka-broker',
    '/components-functions-1': 'functions',
    '/components-console-1': 'kafka-console'
}

# Phase time ranges
# Per targeted component: list of (start, end) crash windows as
# 'YYYY-MM-DD HH:MM:SS' strings. add_phase_flags treats naive bounds as UTC and
# labels logs inside a window 'crash', all others 'recover'.
# NOTE(review): flink-master lists 4 windows while the other components list 6 —
# confirm this matches the fault-injection schedule.
phase_ranges = {
    'flink-master': [('2024-11-08 22:50:04','2024-11-08 22:56:05'), ('2024-11-08 22:58:06','2024-11-08 23:04:07'), ('2024-11-08 23:06:08','2024-11-08 23:12:08'), ('2024-11-08 23:14:10','2024-11-08 23:20:10')],
    'flink-worker': [('2024-11-08 22:50:05', '2024-11-08 22:52:06'), ('2024-11-08 22:55:06', '2024-11-08 22:57:07'), ('2024-11-08 23:00:08', '2024-11-08 23:02:08'), ('2024-11-08 23:05:09', '2024-11-08 23:07:09'), ('2024-11-08 23:10:10', '2024-11-08 23:12:10'), ('2024-11-08 23:15:11', '2024-11-08 23:17:12')],
    'kafka-broker': [('2024-11-08 22:50:06', '2024-11-08 22:52:07'), ('2024-11-08 22:55:07', '2024-11-08 22:57:08'), ('2024-11-08 23:00:08', '2024-11-08 23:02:09'), ('2024-11-08 23:05:09', '2024-11-08 23:07:10'), ('2024-11-08 23:10:11', '2024-11-08 23:12:12'), ('2024-11-08 23:15:12', '2024-11-08 23:17:13')],
    'functions': [('2024-11-08 22:50:07', '2024-11-08 22:52:07'), ('2024-11-08 22:55:07', '2024-11-08 22:57:08'), ('2024-11-08 23:00:08', '2024-11-08 23:02:09'), ('2024-11-08 23:05:09', '2024-11-08 23:07:10'), ('2024-11-08 23:10:10', '2024-11-08 23:12:10'), ('2024-11-08 23:15:11', '2024-11-08 23:17:11')],
}

## 4.4.1 Log Preparation

### Extract All Logs

In [None]:
# Set 'fault-target' attribute on all log files
def load_logs(normal_file_path, fault_file_pattern, num_faults):
    """Read the normal export and every fault export into one DataFrame.

    Rows from ``normal_file_path`` are labelled 'normal' in a new
    'fault-target' column; rows from ``fault_file_pattern.format(i)`` are
    labelled 'fault_<i>' for i = 1..num_faults. A fault file that does not
    exist is reported with a printed message and skipped; the normal file is
    read without an existence check.

    Returns:
        The concatenated frame, re-indexed from 0.
    """
    def _read_labelled(path, label):
        frame = pd.read_csv(path)
        frame['fault-target'] = label
        return frame

    frames = [_read_labelled(normal_file_path, 'normal')]

    for fault_number in range(1, num_faults + 1):
        candidate = fault_file_pattern.format(fault_number)
        if not os.path.exists(candidate):
            print(f"File {candidate} not found.")
            continue
        frames.append(_read_labelled(candidate, f'fault_{fault_number}'))

    return pd.concat(frames, ignore_index=True)

# Usage: stack normal.csv and fault_1.csv ... fault_<num_faults>.csv from
# path_to_logs into one frame labelled by 'fault-target'.
normal_file_path = path_to_logs + 'normal.csv'
# '{}' is filled with the fault number inside load_logs.
fault_file_pattern = path_to_logs + 'fault_{}.csv'
combined_logs_df = load_logs(normal_file_path, fault_file_pattern, num_faults)

# Preview (rendered as the cell output).
combined_logs_df.head()

### Extract Relevant Attributes

In [None]:
# Filter the logs and only keep the needed columns
def filter_and_reorder_columns(logs_df):
    """Return only the columns used downstream, in a fixed order.

    Order: 'timestamp', 'fault-target', 'jsonPayload.container.name',
    'jsonPayload.message'. Every other column is dropped; a missing one
    raises ``KeyError``.
    """
    kept = [
        'timestamp',
        'fault-target',
        'jsonPayload.container.name',
        'jsonPayload.message',
    ]
    return logs_df.loc[:, kept]

# Usage: keep the four needed columns and rename the container name to 'component'.
attributes_logs_df = filter_and_reorder_columns(combined_logs_df).rename(columns={'jsonPayload.container.name': 'component'})

attributes_logs_df.head()

### Remove Logs with Missing Values

In [None]:
# Remove incomplete logs
def drop_rows_with_missing_values(logs_df):
    """Return the rows of *logs_df* that have no missing value in any column.

    The result is an independent copy. Callers can therefore add or overwrite
    columns on it without pandas' SettingWithCopyWarning and without touching
    ``logs_df``.
    """
    # dropna() returns a boolean-indexed selection that pandas flags as a
    # possible view of logs_df; .copy() detaches it.
    return logs_df.dropna(how='any').copy()

# Usage: drop rows with any missing field, then replace raw container names and
# fault ids with the short component names from the configuration cell.
processed_logs_df = drop_rows_with_missing_values(attributes_logs_df)
processed_logs_df['component'] = processed_logs_df['component'].replace(component_mapping)
# From here on 'fault-target' holds a component name (or 'normal'), not 'fault_<n>'.
processed_logs_df['fault-target'] = processed_logs_df['fault-target'].replace(fault_to_component_mapping)

processed_logs_df.head()

In [None]:
# Export result to CSV file
# Stage 4.4.1 output: cleaned and relabelled logs.
processed_logs_df.to_csv('../Results/Log_Generalization/4.4.1-collected_logs.csv', index=False)

## 4.4.2 Log Generalization

### Extract Severity Level

In [None]:
# Set 'severity_level' attribute
# Extracted from 'jsonPayload.message' (the message text itself is kept as-is)
def extract_and_remove_severity_level(logs_df):
    """Add a 'severity_level' column parsed from 'jsonPayload.message'.

    The first FATAL/ERROR/WARN/WARNING/INFO word in the message (matched
    case-insensitively) is upper-cased and stored; WARNING is normalised to
    WARN. Rows whose message is not a string or contains no such word are
    dropped.

    NOTE: despite the function name, the severity word is NOT removed from
    the message; 'jsonPayload.message' is returned unchanged.

    Args:
        logs_df: DataFrame with a 'jsonPayload.message' column. It is not
            modified.

    Returns:
        A new frame containing the kept rows plus 'severity_level'. It is
        empty (rather than raising) when the input is empty.
    """
    severity_pattern = re.compile(r'\b(FATAL|ERROR|WARN|WARNING|INFO)\b', re.IGNORECASE)

    def _severity_of(message):
        if not isinstance(message, str):
            return None
        match = severity_pattern.search(message)
        if match is None:
            return None
        severity = match.group(0).upper()
        return 'WARN' if severity == 'WARNING' else severity

    # Copy so the caller's frame (possibly a filtered slice) is not mutated.
    logs_df = logs_df.copy()
    # map() instead of zip(*apply(...)): the latter cannot unpack an empty result.
    logs_df['severity_level'] = logs_df['jsonPayload.message'].map(_severity_of)

    # Remove incomplete logs (no severity found, or missing message).
    return logs_df.dropna(subset=['severity_level', 'jsonPayload.message'])

# Usage: add 'severity_level' and keep only rows whose message carries a severity word.
severity_logs_df = extract_and_remove_severity_level(processed_logs_df)

severity_logs_df.head()

### Remove Variable Patterns

In [None]:
# Clean 'jsonPayload.message' from dynamic content
def template_all_logs(logs_df):
    """Add 'templated_message': the message with dynamic tokens deleted.

    Each regex in ``patterns`` is applied in insertion order. Every match is
    deleted (replaced with ''), not substituted by a placeholder, and the
    leftover whitespace is not collapsed. Non-string messages (e.g. NaN) are
    passed through unchanged instead of raising ``TypeError``.

    NOTE(review): pattern order matters. 'special' strips '@', '#', '~' and
    '$' early, so 'hex_id', 'sender_id', 'recipient_id' and 'method_call'
    (which all require one of those characters) can no longer match. The
    order is kept as-is because changing it changes every template
    downstream.

    Args:
        logs_df: DataFrame with a 'jsonPayload.message' column.

    Returns:
        A copy of ``logs_df`` with 'templated_message' added; the input frame
        is not modified.
    """
    patterns = {
        'severity' : r'\b(FATAL|ERROR|WARN|WARNING|INFO)\b',
        'timestamp': r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?',
        'special': r'[@#~$]',
        'uuid': r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}',
        'random_string': r'-[a-zA-Z0-9]{6}',
        'hex_id': r'@[a-fA-F0-9]{8}',
        'sender_id': r'\[@akka\.tcp:@:\w+\]',
        'recipient_id': r'\[@akka:#\]',
        'path': r'/[\w\-/.]+',
        'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        'port': r'\b\d{2,5}\b',
        'number': r'\b\d+(\.\d+)?\b',
        'error_code': r'\b[A-Z0-9]+\b',
        'mac_address': r'(?:[0-9A-Fa-f]{2}[:-]){5}(?:[0-9A-Fa-f]{2})',
        'jar_details': r'\[.*?\.jar:.*?\]',
        'method_call': r'\$\w+\.\w+\(.java:\)',
        'special_in_address': r'\(registry, proxy, [A-Za-z0-9]+\)',
        'job_id': r'\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z0-9]+\b',
        'job_id_2': r'\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z0-9]{8,}\b',
        'topic_id_': r'_[A-Za-z0-9]+(?=,|\.|$)',
        'task_id_': r'\b[A-Za-z0-9]{32}\b'
    }

    # Compile once per call rather than resolving each pattern for every row.
    compiled_patterns = [re.compile(pattern) for pattern in patterns.values()]

    def automatic_log_templating(log_message):
        if not isinstance(log_message, str):
            return log_message
        for pattern in compiled_patterns:
            log_message = pattern.sub('', log_message)
        return log_message

    # Copy: the input may be a filtered slice; assigning a column to it in
    # place mutates the caller's data and triggers SettingWithCopyWarning.
    logs_df = logs_df.copy()
    logs_df['templated_message'] = logs_df['jsonPayload.message'].apply(automatic_log_templating)

    return logs_df

# Usage: derive 'templated_message', then drop the raw message column so later
# stages compare logs by template only.
templated_logs_df = template_all_logs(severity_logs_df)
if 'jsonPayload.message' in templated_logs_df.columns:
    templated_logs_df = templated_logs_df.drop(columns=['jsonPayload.message'])

templated_logs_df.head()

In [None]:
# Export result to CSV file
# Stage 4.4.2 output: templated (generalized) logs.
templated_logs_df.to_csv('../Results/Log_Generalization/4.4.2-generalized_logs.csv', index=False)

## 4.4.3 Log Difference

### Extract Normal Logs with Distinct Message Templates

In [None]:
# Extract a single occurrence of each message in the normal logs
def extract_distinct_normal_logs(logs_df):
    """Return the first normal-run row for each distinct 'templated_message'.

    Rows with ``fault-target == 'normal'`` are kept and then de-duplicated on
    'templated_message'. The first occurrence wins and the original index
    labels are preserved.
    """
    is_normal = logs_df['fault-target'].eq('normal')
    normal_only = logs_df.loc[is_normal]
    return normal_only.drop_duplicates(subset=['templated_message'])

# Usage: one representative row per distinct template seen in the normal run.
distinct_normal_logs_df = extract_distinct_normal_logs(templated_logs_df)

distinct_normal_logs_df.head()

### Extract Fault Logs

In [None]:
# Extract fault logs
def remove_normal_logs(logs_df):
    """Return only the rows whose 'fault-target' is not 'normal'.

    The original index labels are preserved.
    """
    is_fault_scenario = logs_df['fault-target'].ne('normal')
    return logs_df.loc[is_fault_scenario]

# Usage: all rows recorded under a fault scenario (fault-target != 'normal').
fault_logs_df = remove_normal_logs(templated_logs_df)

fault_logs_df.head()

### Remove Fault Logs that Appear in Normal Logs

In [None]:
# Extract fault logs that do not exist in normal logs
def remove_fault_logs_present_in_normal(distinct_normal_logs, fault_logs):
    """Drop fault rows whose 'templated_message' also occurs in the normal logs.

    Args:
        distinct_normal_logs: frame whose 'templated_message' values form the
            normal-behaviour vocabulary.
        fault_logs: fault-scenario rows to filter.

    Returns:
        The rows of ``fault_logs`` with a template never seen in normal
        operation, keeping the original index labels.
    """
    normal_vocabulary = distinct_normal_logs['templated_message'].unique()
    seen_in_normal = fault_logs['templated_message'].isin(normal_vocabulary)
    return fault_logs.loc[~seen_in_normal]

# Usage: keep only fault-scenario logs whose template never appears in the normal run.
diff_logs_df = remove_fault_logs_present_in_normal(distinct_normal_logs_df, fault_logs_df)

diff_logs_df.head()

In [None]:
# Export result to CSV file
# Stage 4.4.3 output: fault-only (difference) logs.
diff_logs_df.to_csv('../Results/Log_Generalization/4.4.3-difference_logs.csv', index=False)

## 4.4.4 Log Enrichment

### Phase

In [None]:
def add_phase_flags(logs_df, phase_ranges):
    """Set a 'phase' attribute ('crash' or 'recover') on every log row.

    A row is 'crash' when its timestamp lies inside any (start, end) window
    in ``phase_ranges[row['fault-target']]``, with both ends inclusive. Every
    other row is 'recover': before the first window, between windows, after
    the last one, or with a fault-target that has no entry in
    ``phase_ranges``.

    Args:
        logs_df: DataFrame with a 'fault-target' column and a 'timestamp'
            column in the '%Y-%m-%dT%H:%M:%S.%fZ' format. It is not modified.
        phase_ranges: mapping fault-target -> list of (start, end) values
            accepted by ``pd.Timestamp``.

    Returns:
        A copy of ``logs_df`` with 'timestamp' parsed to UTC datetimes and a
        new 'phase' column.
    """
    def _as_utc(value):
        # Naive window bounds are read as UTC to match the 'Z' log timestamps.
        # NOTE(review): confirm the windows were recorded in UTC, not local time.
        ts = pd.Timestamp(value)
        return ts.tz_localize('UTC') if ts.tzinfo is None else ts

    # Copy so the caller's frame (often a filtered slice) is not mutated; this
    # also avoids SettingWithCopyWarning.
    logs_df = logs_df.copy()
    logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'], format='%Y-%m-%dT%H:%M:%S.%fZ', utc=True)

    # Vectorised: parse each window once instead of once per row, and stay
    # well-defined on an empty frame (row-wise apply returned a DataFrame there).
    in_crash = pd.Series(False, index=logs_df.index)
    for fault_type, ranges in phase_ranges.items():
        is_fault = logs_df['fault-target'] == fault_type
        for crash_start, crash_end in ranges:
            # Series.between is inclusive on both ends by default.
            in_window = logs_df['timestamp'].between(_as_utc(crash_start), _as_utc(crash_end))
            in_crash = in_crash | (is_fault & in_window)

    logs_df['phase'] = in_crash.map({True: 'crash', False: 'recover'})

    return logs_df

# Usage: label each difference log 'crash' or 'recover' from the configured windows.
phases_logs_df = add_phase_flags(diff_logs_df, phase_ranges)

phases_logs_df.head()

### Calculate Log Frequency Change

In [None]:
# Set frequency related attributes (frequency, frequency_phase, frequency_change)
def calculate_consolidated_frequency_change(logs_df, time_column='timestamp', time_window='H'):
    """Summarise how often each log template occurs, overall and per phase.

    Rows are keyed by (fault-target, component, severity_level,
    templated_message, phase). For each key the rows are counted per
    ``time_window`` bin of ``time_column`` and then consolidated into:

    * ``frequency_phase``: total rows for the key (sum of the per-bin counts).
    * ``frequency_change``: sum of the differences between successive
      per-bin counts in time order. This telescopes to (last bin count -
      first bin count) and is 0 when the key appears in a single bin.
    * ``frequency``: total rows for (fault-target, component,
      severity_level, templated_message) across all phases.

    Args:
        logs_df: DataFrame with ``time_column`` and the key columns above.
            It is not modified, so the function can be re-run safely.
        time_column: name of the timestamp column.
        time_window: pandas offset alias for the bin size. The default 'H'
            (hourly) is deprecated in pandas >= 2.2 in favour of 'h'.

    Returns:
        One row per (fault-target, component, severity_level,
        templated_message, phase).
    """
    message_keys = ['fault-target', 'component', 'severity_level', 'templated_message']
    phase_keys = message_keys + ['phase']

    # Work on a copy: the previous in-place set_index moved time_column into
    # the caller's index, so re-running the cell raised KeyError.
    logs_df = logs_df.copy()
    logs_df[time_column] = pd.to_datetime(logs_df[time_column])

    overall_frequency_df = logs_df.groupby(message_keys).size().reset_index(name='frequency')

    # Grouper(key=...) bins the column directly; no set_index needed.
    frequency_by_time = (
        logs_df.groupby([pd.Grouper(key=time_column, freq=time_window)] + phase_keys)
        .size()
        .reset_index(name='frequency_phase')
    )

    # Rows are ordered by bin first, so diff() compares each row with the
    # previous row of the same key; a key's first row yields 0.
    frequency_by_time['frequency_change'] = (
        frequency_by_time.groupby(phase_keys)['frequency_phase'].diff().fillna(0)
    )

    consolidated_df = (
        frequency_by_time.drop(columns=[time_column])
        .groupby(phase_keys)
        .agg({'frequency_phase': 'sum', 'frequency_change': 'sum'})
        .reset_index()
    )

    return pd.merge(consolidated_df, overall_frequency_df, on=message_keys, how='left')

# Usage: count logs in 10-minute bins. '10min' is the same offset as the former
# '10T', an alias deprecated in pandas 2.2.
frequency_change_logs_df = calculate_consolidated_frequency_change(phases_logs_df, time_window='10min')

frequency_change_logs_df.head()

### Component-specific

In [None]:
# Set 'component-specific' attribute
# True if log appears in only one component, else False
def add_component_specific_column(logs_df):
    """Flag templates that are emitted by exactly one component.

    Counts distinct 'component' values per 'templated_message' across the
    whole frame and adds a boolean 'component-specific' column (True when
    the count is 1). Returns the merged frame; ``logs_df`` itself is not
    modified.
    """
    per_message = (
        logs_df.groupby('templated_message')['component']
        .nunique()
        .reset_index(name='component_count')
    )
    enriched = logs_df.merge(per_message, how='left', on='templated_message')
    enriched['component-specific'] = enriched['component_count'].eq(1)
    return enriched.drop(columns='component_count')

# Usage: runs on the consolidated per-template frequency table, not on raw log rows.
component_specific_logs_df = add_component_specific_column(frequency_change_logs_df)

component_specific_logs_df.head()

### Fault-specific

In [None]:
# Set 'fault-specific' attribute
# True if the log appears in only one fault scenario, else False
def add_fault_specific_column(logs_df):
    """Flag templates that occur under exactly one fault target.

    Counts distinct 'fault-target' values per 'templated_message' across the
    whole frame and adds a boolean 'fault-specific' column (True when the
    count is 1). Returns the merged frame; ``logs_df`` itself is not
    modified.
    """
    per_message = (
        logs_df.groupby('templated_message')['fault-target']
        .nunique()
        .reset_index(name='fault_count')
    )
    enriched = logs_df.merge(per_message, how='left', on='templated_message')
    enriched['fault-specific'] = enriched['fault_count'].eq(1)
    return enriched.drop(columns='fault_count')

# Usage: add 'fault-specific' (template seen under exactly one fault-target).
fault_specific_logs_df = add_fault_specific_column(component_specific_logs_df)

fault_specific_logs_df.head()

### Rarity

In [None]:
# Set 'rare' attribute
def add_rare_column(logs_df):
    """Add a boolean 'rare' column, in place.

    A row is rare when it is both 'component-specific' and 'fault-specific'
    and its overall 'frequency' is exactly 1.

    Returns:
        ``logs_df`` itself, with 'rare' added.
    """
    only_one_component = logs_df['component-specific'].eq(True)
    only_one_fault = logs_df['fault-specific'].eq(True)
    seen_once = logs_df['frequency'].eq(1)
    logs_df['rare'] = only_one_component & only_one_fault & seen_once
    return logs_df

# Usage: needs the 'component-specific', 'fault-specific' and 'frequency' columns.
rare_logs_df = add_rare_column(fault_specific_logs_df)

rare_logs_df.head()

### Direct Cause

In [None]:
# Set 'direct_cause' attribute
# True if the 'fault-target' and 'component' attributes match, else False
def add_direct_cause_column(logs_df, fault_to_component_mapping):
    """Add a boolean 'direct_cause' column: the log comes from the faulted component.

    'fault-target' values are first translated through
    ``fault_to_component_mapping`` (e.g. 'fault_1' -> 'flink-master'). The
    comparison therefore works whether or not the caller already replaced
    fault ids with component names. Values that are not keys of the mapping
    (already-mapped component names, 'normal') are compared unchanged.

    Args:
        logs_df: DataFrame with 'fault-target' and 'component' columns;
            modified in place.
        fault_to_component_mapping: fault id -> component name. It may be
            empty or None, in which case the raw values are compared.

    Returns:
        ``logs_df`` itself, with 'direct_cause' added.
    """
    faulted_component = logs_df['fault-target']
    if fault_to_component_mapping:
        faulted_component = faulted_component.replace(fault_to_component_mapping)

    # Vectorised comparison: also well-defined on an empty frame, where the
    # previous row-wise DataFrame.apply returned a DataFrame that could not be
    # assigned to a single column.
    logs_df['direct_cause'] = logs_df['component'] == faulted_component

    return logs_df

# Usage: 'fault-target' already holds component names here (mapped during preparation).
cause_logs_df = add_direct_cause_column(rare_logs_df, fault_to_component_mapping)

cause_logs_df.head()

## -- Log Summary --

In [None]:
# Filter the logs and only keep the needed attributes
def adjust_columns(logs_df):
    """Project the enriched logs onto the final summary columns, in order.

    Output order: severity_level, templated_message, direct_cause, rare,
    fault-target, fault-specific, component, component-specific, phase,
    frequency_change, frequency, frequency_phase. Other columns are dropped;
    a missing one raises ``KeyError``.
    """
    summary_columns = (
        'severity_level', 'templated_message', 'direct_cause', 'rare',
        'fault-target', 'fault-specific', 'component', 'component-specific',
        'phase', 'frequency_change', 'frequency', 'frequency_phase',
    )
    return logs_df.loc[:, list(summary_columns)]

# Usage: final column selection and order for the enriched summary.
generalized_logs_df = adjust_columns(cause_logs_df)

generalized_logs_df.head()

# Export Results

In [None]:
# Export result to CSV file
# Stage 4.4.4 output: enriched log summary.
generalized_logs_df.to_csv('../Results/Log_Generalization/4.4.4-enriched_logs.csv', index=False)