# CSV Reader

`read_large_csv(file_path)` lazily parses a CSV file row by row; the header row is printed rather than yielded.
To use it, run `for row in read_large_csv(path):`

In [None]:
import csv

def read_large_csv(file_path):
    """Lazily yield the data rows of a CSV file, one ``list[str]`` per row.

    The first row is treated as a header: it is printed together with the
    file path and is not yielded. Rows are streamed from disk, so the whole
    file is never held in memory.

    :param file_path: path of the UTF-8 encoded CSV file.
    :return: generator of rows; yields nothing for an empty file or a file
             that contains only the header row.
    """
    # newline='' is what the csv module requires: line breaks embedded in
    # quoted fields are then kept verbatim instead of being translated.
    with open(file_path, mode='r', encoding='utf-8', newline='') as f:
        reader = csv.reader(f)
        # A bare next() on an empty file would raise StopIteration inside this
        # generator, which Python 3.7+ converts to RuntimeError (PEP 479).
        header = next(reader, None)
        if header is None:
            return
        print(f"CSV Header of {file_path}: {header}")
        yield from reader

# Retrieve Anomaly Ground Truth from log files

Ground-truth anomaly logs come from [../../Datasets/dataset2/data/run/run_table_2021-07.csv](../../Datasets/dataset2/data/run/run_table_2021-07.csv) and [../../Datasets/dataset2/data/run/run_table_2021-08.csv](../../Datasets/dataset2/data/run/run_table_2021-08.csv).
The following script generates an **anomalies** list.
Each of its elements is `(anomaly_type, start_timestamp, end_timestamp, duration)`, where `duration` is in seconds.
`anomaly_type` is one of `Mem`, `Single Trace Wait`, `File`, `CPU`, `Access`.

In [None]:
# Retrieve anomaly ground truth from the fault-injection log files

from datetime import datetime, timedelta

injection_log_1 = "../../Datasets/dataset2/data/run/run_table_2021-07.csv"
injection_log_2 = "../../Datasets/dataset2/data/run/run_table_2021-08.csv"

# CPU anomalies whose reported duration exceeds this many seconds (14 days)
# are treated as bogus entries and ignored.
MAX_CPU_ANOMALY_DURATION = 24 * 60 * 60 * 14


def _parse_precise_ts(message, start_marker, end_marker):
    """Parse the '%Y-%m-%d %H:%M:%S.%f' timestamp between two markers of *message*."""
    start_idx = message.index(start_marker) + len(start_marker)
    end_idx = message.index(end_marker, start_idx)
    return datetime.strptime(message[start_idx:end_idx], "%Y-%m-%d %H:%M:%S.%f")


def _parse_seconds(message, marker):
    """Parse the float N written as '<marker>N seconds' inside *message*."""
    start_idx = message.index(marker) + len(marker)
    end_idx = message.index(" seconds", start_idx)
    return float(message[start_idx:end_idx])


def _parse_log_ts(log_entry):
    """Parse the second-resolution log timestamp: the part of log_entry[0] before ','."""
    return datetime.strptime(log_entry[0].split(',')[0], "%Y-%m-%d %H:%M:%S")


def _parse_injection_entry(log_entry):
    """Convert one WARNING log entry into an anomaly tuple.

    :param log_entry: the last CSV column split on ' | '; log_entry[0] holds
                      the log timestamp and log_entry[-1] the message text.
    :return: (anomaly_type, start_ts, end_ts, duration_seconds), or None when
             the entry is a memory-freed label, an over-long CPU anomaly, or an
             unrecognised message (unrecognised entries are printed).
    :raises ValueError: if a recognised message lacks the expected markers or
                        timestamp format.
    """
    message = log_entry[-1]
    if "[memory_anomalies]" in message:
        anomaly_type = "Mem"
        start_ts = _parse_precise_ts(message, "start at ", " and lasts")
        duration = _parse_seconds(message, "lasts ")
    elif "[normal memory freed label]" in message:
        return None
    elif "simulate the login failure of the QR code expired" in message:
        anomaly_type = "Single Trace Wait"
        start_ts = _parse_log_ts(log_entry)
        duration = _parse_seconds(message, "wait for ")
    elif "trigger the file moving program" in message:
        anomaly_type = "File"
        start_ts = _parse_precise_ts(message, "start with ", ", last for")
        duration = _parse_seconds(message, "last for ")
    elif "[cpu_anomalies]" in message:
        anomaly_type = "CPU"
        start_ts = _parse_precise_ts(message, "start at ", " and lasts")
        duration = _parse_seconds(message, "lasts ")
        # Check before computing end_ts: an absurd duration could overflow datetime.
        if duration > MAX_CPU_ANOMALY_DURATION:
            print(f"The anomaly is too long, ignored: {duration}s = {duration / 60 / 60}h = {duration / 60 / 60 / 24}d")
            return None
    elif "trigger an access permission denied exception, will lasts an hour" in message:
        anomaly_type = "Access"
        start_ts = _parse_log_ts(log_entry)
        duration = 60 * 60  # 1 hour, as stated in the message
    else:
        print(log_entry)
        return None
    return (anomaly_type, start_ts, start_ts + timedelta(seconds=duration), duration)


# anomaly_types: Mem, Single Trace Wait, File, CPU, Access
# anomalies = [(anomaly_type, start_timestamp, end_timestamp, duration), ...]
anomalies = []
anomaly_raw_cnt = 0  # number of WARNING entries seen, including ignored ones

for injection_log in (injection_log_1, injection_log_2):
    for row in read_large_csv(injection_log):
        if not row:  # csv.reader yields [] for blank lines
            continue
        log_entry = row[-1].split(' | ')

        # These logs are mostly '(Background on this error at: http://sqlalche.me/e/14/e3q8)\n'
        if len(log_entry) < 2:
            continue

        log_type = log_entry[1]
        if log_type in ("INFO", "ERROR"):
            continue
        if log_type != "WARNING":
            raise Exception(f"Unknown log type: {log_type}")

        anomaly_raw_cnt += 1
        anomaly = _parse_injection_entry(log_entry)
        if anomaly is not None:
            anomalies.append(anomaly)

print(f"Anomaly count: {anomaly_raw_cnt}, {len(anomalies)} anomalies found.")

# Retrieve End-to-end Anomaly Symptom from trace files

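End-to-end anomaly symptoms are derived from the trace files `../../Datasets/dataset2/data/trace/*.csv`.
The following script generates an **e2e_anomalies** list.
Each of its elements is the `(start_timestamp, end_timestamp)` of an anomalous trace.
Traces are grouped by call topology. A trace is anomalous when its status code is not 200/300, or when its end-to-end latency $x$ violates the 3-sigma principle $\mu - 3\sigma \leq x \leq \mu + 3\sigma$, where $\mu$ and $\sigma$ are the mean and standard deviation of the latencies of all traces with the same topology.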

In [None]:
import os
import sys

def list_files(root_dir):
    """Yield the path of every file below *root_dir*, recursing into subdirectories.

    Paths come out in ``os.walk`` (top-down) order; directories themselves are
    not yielded, and a missing *root_dir* simply yields nothing.
    """
    for dir_path, _subdirs, file_names in os.walk(root_dir):
        yield from (os.path.join(dir_path, name) for name in file_names)

# Load every span of every trace file, grouped by trace id:
# trace_dict = {trace_id: [(service_name, span_id, parent_span_id, start_ts, end_ts, status_code), ...]}


def _parse_span_ts(raw):
    """Parse a span timestamp that may or may not carry fractional seconds."""
    try:
        return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")


trace_dir = "../../Datasets/dataset2/data/trace/"
trace_dict = {}
for trace_file in list_files(trace_dir):
    print(f"Processing {trace_file}")
    for row_count, row in enumerate(read_large_csv(trace_file), start=1):
        if row_count % 500000 == 0:
            print(f"Processed {row_count} rows")

        trace_id = row[3]
        span = (
            row[2],                  # service name
            row[4],                  # span id
            row[5],                  # parent span id
            _parse_span_ts(row[6]),  # start timestamp
            _parse_span_ts(row[7]),  # end timestamp
            int(row[9]),             # status code
        )
        trace_dict.setdefault(trace_id, []).append(span)

print(f"Trace count: {len(trace_dict)}")

In [None]:
def build_topology(span_list):
    """Build a parent -> children adjacency map from a trace's spans.

    :param span_list: iterable of 6-tuples
        ``(service, span_id, parent_span_id, start_ts, end_ts, status_code)``;
        only the first three fields are used.
    :return: dict mapping every span id and every referenced parent id to
        ``{'service': str, 'children': [span_id, ...]}``. A parent id that never
        appears as a span itself keeps the placeholder service ``"Start"``.
    """
    nodes = {}
    for span in span_list:
        service, span_id, parent_id, _start, _end, _status = span
        parent_node = nodes.setdefault(parent_id, {'children': [], 'service': "Start"})
        child_node = nodes.setdefault(span_id, {'service': service, 'children': []})
        parent_node['children'].append(span_id)
        # Overwrites a "Start" placeholder if this span was seen as a parent first.
        child_node['service'] = service
    return nodes

def normalize_topology(topology, root_id='0'):
    """
    Standardize the topology for comparison by normalizing the ordering.

    Returns a canonical string ``service(child,child,...)`` for the subtree
    rooted at ``root_id``. Sibling strings are sorted, so traces with the same
    call structure produce the same string regardless of span order. Child ids
    missing from ``topology`` are ignored.

    A child that is already on the current root-to-node path (a cycle, e.g. a
    span recorded as its own parent) is skipped, so malformed traces no longer
    recurse forever.

    :param topology: mapping node id -> {'service': str, 'children': [id, ...]},
        as returned by build_topology.
    :param root_id: id of the node to start from. NOTE(review): presumably the
        parent_span_id that root spans carry in this dataset; confirm.
    :return: canonical topology string; "()" when ``root_id`` is absent.
    """
    def recursive_normalize(node, path):
        node_str = node.get('service', "")
        children_strs = []
        for child_id in node['children']:
            # Skip unknown ids and back-edges to an ancestor (cycles).
            if child_id not in topology or child_id in path:
                continue
            path.add(child_id)
            children_strs.append(recursive_normalize(topology[child_id], path))
            path.remove(child_id)
        children_strs.sort()
        return node_str + "(" + ",".join(children_strs) + ")"

    root_topology = topology.get(root_id, {'service': '', 'children': []})
    return recursive_normalize(root_topology, {root_id})

# Group traces by normalized call topology and summarise each trace as
# (earliest span start, latest span end, highest span status code):
# e2e_latency = {norm_topo: [(start_ts, end_ts, ret_code), ...]}
e2e_latency = dict()
error_code_set = set()  # every distinct span status code seen across all traces
for trace_id, spans in trace_dict.items():
    norm_topo = normalize_topology(build_topology(spans))
    status_codes = [span[-1] for span in spans]
    error_code_set.update(status_codes)

    start_ts = min(span[3] for span in spans)
    end_ts = max(span[4] for span in spans)
    # The numerically largest span status code represents the whole trace.
    ret_code = max(status_codes)

    e2e_latency.setdefault(norm_topo, []).append((start_ts, end_ts, ret_code))

print(f"Error code set: {error_code_set}")

# e2e_latency is keyed by topology, so its size is the number of distinct
# topologies, not the number of traces.
print(f"Topology count: {len(e2e_latency)}")

In [None]:
import numpy as np

# Flag end-to-end anomalies per topology. A trace is anomalous if its status
# code is neither 200 nor 300, or if its latency lies strictly outside
# [mean - 3*std, mean + 3*std] of the latencies of its topology.
e2e_anomalies = []
e2e_stat = {}  # norm_topo -> (mean latency, std latency), in seconds

print(len(e2e_latency))
for norm_topo, entries in e2e_latency.items():
    durations = [(entry[1] - entry[0]).total_seconds() for entry in entries]
    mean, std = np.mean(durations), np.std(durations)
    # Explicit sanity check (a bare `assert` is stripped under `python -O`):
    # every topology must have a positive mean latency.
    if not mean > 0:
        print(f"Anomaly: {norm_topo}, {mean}, {std}")
        print(entries)
        print(durations)
        raise AssertionError(f"Non-positive mean latency for topology {norm_topo}: {mean}")
    e2e_stat[norm_topo] = (mean, std)

NORMAL_STATUS_CODES = (200, 300)

for norm_topo, entries in e2e_latency.items():
    mean, std = e2e_stat[norm_topo]
    lower, upper = mean - 3 * std, mean + 3 * std
    for start_ts, end_ts, ret_code in entries:
        if ret_code not in NORMAL_STATUS_CODES:
            e2e_anomalies.append((start_ts, end_ts))
            continue

        duration = (end_ts - start_ts).total_seconds()
        # Strict comparisons: the 3-sigma range is inclusive of normal values.
        # With `<=`, every trace of a zero-variance topology (e.g. one seen
        # only once) would be flagged as anomalous.
        if duration < lower or duration > upper:
            e2e_anomalies.append((start_ts, end_ts))

e2e_anomalies = sorted(e2e_anomalies, key=lambda x: x[0])
print(len(e2e_anomalies))
for anomaly in e2e_anomalies:
    print(anomaly)

In [None]:
# Drop the large trace-derived intermediates; only e2e_anomalies is kept.
del trace_dict, error_code_set, e2e_latency, e2e_stat

In [None]:
import gc

# Force a full garbage-collection pass; this also frees unreachable reference
# cycles, which plain `del` statements cannot release on their own.
gc.collect()

# Retrieve Metric Anomaly Symptom from metric files

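Metric anomaly symptoms are derived from the metric files `../../Datasets/dataset2/data/metric/*.csv`.
The following script generates a **metric_anomalies** dict.
It maps each metric name to the list of timestamps at which that metric's value is anomalous; metrics without anomalies do not appear.
A value $x$ is anomalous when it violates the 3-sigma principle $\mu - 3\sigma \leq x \leq \mu + 3\sigma$, where $\mu$ and $\sigma$ are the mean and standard deviation of all values of that metric.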

In [None]:
# Read every metric file and flag 3-sigma outliers per metric.
# metric_anomalies = {metric_name: [timestamp, ...]}  (only metrics with outliers)
# metric_interval  = {metric_name: first non-zero gap between consecutive
#                     samples in seconds (0.0 if all gaps are zero, None if
#                     fewer than two samples)}
metric_dir = "../../Datasets/dataset2/data/metric/"
metric_anomalies = {}
metric_interval = {}

for metric_file in list_files(metric_dir):
    # The metric name is the file name minus its last two '_'-separated fields.
    metric_name = "_".join(os.path.basename(metric_file).split('_')[:-2])
    print(f"Processing metric: {metric_name}")

    # Column 0 is interpreted as epoch milliseconds, column 1 as the value.
    # NOTE(review): datetime.fromtimestamp yields naive *local* time, while the
    # log/trace timestamps are parsed as naive strings; confirm same timezone.
    raw_metric_list = [
        (datetime.fromtimestamp(float(row[0]) / 1000), float(row[1]))
        for row in read_large_csv(metric_file)
    ]
    if not raw_metric_list:
        # min()/max() below would raise on an empty series.
        print(f"No data in {metric_file}, skipped")
        continue

    raw_metric_list.sort(key=lambda x: x[0])
    raw_metric_val = [val for _, val in raw_metric_list]
    mean, std = np.mean(raw_metric_val), np.std(raw_metric_val)
    lower, upper = mean - 3 * std, mean + 3 * std
    print(f"Mean: {mean}, Std: {std}, Min: {min(raw_metric_val)}, Max: {max(raw_metric_val)}, Length: {len(raw_metric_val)}")

    last_ts, interval = None, None
    for ts, val in raw_metric_list:
        if last_ts is not None:
            current_interval = (ts - last_ts).total_seconds()
            if interval is None or interval == 0:
                interval = current_interval
            elif interval != current_interval:
                print(f"Interval mismatch: origin interval {interval}, current interval {current_interval}, current {ts} - {ts.timestamp()}, last {last_ts} - {last_ts.timestamp()}")
        last_ts = ts

        # Strict comparisons: with `<=`, a constant metric (std == 0) would
        # have every single sample flagged as an anomaly.
        if val < lower or val > upper:
            print(f"Anomaly: {ts} - {val}")
            metric_anomalies.setdefault(metric_name, []).append(ts)

    metric_interval[metric_name] = interval

    if metric_name not in metric_anomalies:
        print(f"No anomaly found in {metric_name}")
        print(f"Mean: {mean}, Std: {std}, Min: {min(raw_metric_val)}, Max: {max(raw_metric_val)}")
    else:
        print(f"Metric: {metric_name}, from total {len(raw_metric_val)} values, {len(metric_anomalies[metric_name])} anomalies found.")