In [None]:
import pandas as pd
import numpy as np
import os

In [None]:
from datetime import datetime
import pytz
import time

tz = pytz.timezone('Asia/Shanghai')
def ts_to_date(timestamp):
    return datetime.fromtimestamp(timestamp//1000, tz).strftime('%Y-%m-%d')
#     return datetime.fromtimestamp(timestamp//1000, tz).strftime('%m%d')

def time_to_ts(ctime):
    try:
        timeArray = time.strptime(ctime, '%Y-%m-%d %H:%M:%S.%f')
    except:
        timeArray = time.strptime(ctime, '%Y-%m-%d %H:%M:%S')
    return int(time.mktime(timeArray))*1000

In [None]:
from multiprocessing import Process
import json

class TraceUtils:
    def __init__(self, data_dir, dataset='gaia'):
        self.data_dir = data_dir
        self.dataset = dataset
    
    def get_trace_by_day(self, day: int):
        day = f'0{day}' if day < 10 else str(day)
        temp = []
        for service in os.listdir(self.data_dir):
            filepath = os.path.join(self.data_dir, 
                                                 f'{service}/trace_{service}_2021-07-{day}.csv')
            if not os.path.exists(filepath):
                continue
            temp.append(pd.read_csv(filepath, index_col=0))
        data = pd.concat(temp, ignore_index=True)
        data['timestamp'] = data['timestamp'].apply(time_to_ts)
        return data

    def get_trace_by_ts(self, ts):
        date = ''.join(ts_to_date(ts).split('-')[1: ])
#         data = pd.read_csv(os.path.join(self.data_dir, f'{date}.csv')) # gaia
        data = pd.read_csv(os.path.join(self.data_dir, f'trace_{date}.csv')) # 21aiops
        return data
    
    # lagency/duration需要减去子调用的最大值
    def data_process(self, start_ts, end_ts):
        day = int(ts_to_date(start_ts).split('-')[-1])
        node = 'service_name' if self.dataset == 'gaia' else 'cmdb_id'
        if self.dataset == 'gaia':
            node = 'service_name'
            column = 'lagency'
            df = self.get_trace_by_day(day)
            if (start_ts is not None) and (end_ts is not None):
                sub_df = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
            else:
                sub_df = df
            csub_df = sub_df.groupby(['trace_id', 'parent_id'], as_index=False).max()
            csub_df = csub_df[[
                    'parent_id', node, column
                ]].rename(columns={'parent_id': 'span_id', node: f'c{node}', column: f'c{column}'})
            data = pd.merge(sub_df, csub_df, on='span_id', how='left').fillna(0)
            data['real_duration'] = data[column] - data[f'c{column}']
        elif self.dataset == '21aiops':
#             node = 'cmdb_id'
#             column = 'duration'
            df = self.get_trace_by_ts(start_ts)
            df['timestamp'] = df['timestamp'] if df['timestamp'].values[0] >= 1e12 else df['timestamp']*1000
            data = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
        else:
            raise Exception('Unknow dataset!')
        return data
    
class TraceAnalysis(Process):
    def __init__(self, cases, data_dir, pid, load, dataset='gaia', config=None):
        super().__init__()
        self.id = pid
        self.cases = cases.iloc[pid*load: (pid+1)*load]
        self.tu = TraceUtils(data_dir, dataset)
        self.config = config
        self.dataset = dataset
        if config is None:
            self.config = {}
            self.config['threshold'] = 50 # 认为异常的duration阈值
            self.config['minute'] = 60000
        self.res = dict(zip(list(self.cases.index), [set() for _ in range(len(self.cases))])) # 报告可疑微服务
        self.time_used = None
    
    def analysis(self):
        start_time = time.time()
        if self.dataset == 'gaia':
            node = 'service_name'
        elif self.dataset == '21aiops':
            node = 'cmdb_id'
        else:
            raise Exception('Unknow dataset!')
        for case_id, case in self.cases.iterrows():
            start_ts = time_to_ts(case['st_time'])
            end_ts = time_to_ts(case['ed_time'])
            df = self.tu.data_process(start_ts, end_ts) 
            chosen_df = df[df['real_duration']>self.config['threshold']]
            self.res[case_id].update(list(chosen_df[node].unique()))
            self.res[case_id].update(list(chosen_df[f'c{node}'].unique()))
            if 0 in self.res[case_id]:
                self.res[case_id].remove(0)
            if np.nan in self.res[case_id]:
                self.res[case_id].remove(np.nan)
        end_time = time.time()
        self.time_used = end_time - start_time
    
    def save_res(self, savepath):
        for key in self.res:
            self.res[key] = list(self.res[key])
        with open(savepath, 'w') as f:
            json.dump(self.res, f)
        print(f'{self.id} Time used: ', self.time_used)
        print('Save successfully!')
    
    def run(self):
        self.analysis()
        self.save_res(f'{self.dataset}/trace/{self.dataset}_trace_{self.id}.json')
        with open(f'{self.dataset}/trace/time_used_{self.id}', 'w') as f:
            f.write(f'{self.time_used}')

In [None]:
# 配置
trace_path = '/home/u2120210568/jupyterfiles/jinwa/unirca/new_trace/'
label_path = '/home/u2120210568/multi_rca/case/gaia_resplit.csv' # 故障case的路径
demo_labels = pd.read_csv(label_path, index_col=0)
demo_labels = demo_labels[demo_labels['data_type']=='test']
processes = []
for pid in range(20):
    processes.append(TraceAnalysis(demo_labels, trace_path, pid, 47))
    processes[-1].start()
    print(f'process {pid} starts...')
for pid in range(20):
    processes[pid].join()
    print(f'process {pid} ends.')

In [None]:
# 21挑战赛 配置
trace_path = '/home/u2120210568/jupyterfiles/linzihan/multi_rca/new/2021aiops/process_row_trace'
label_path = '/home/u2120210568/multi_rca/case/21aiops_resplit.csv'
demo_labels = pd.read_csv(label_path, index_col=0)
processes = []
for pid in range(10):
    processes.append(TraceAnalysis(demo_labels[demo_labels['data_type']=='test'], 
                                   trace_path, pid, 8, '21aiops'))
    processes[-1].start()
    print(f'process {pid} starts...')
for pid in range(10):
    processes[pid].join()
    print(f'process {pid} ends.')

In [None]:
demo_labels

In [None]:
from multiprocessing import Process
import json
from datetime import datetime
import pytz
import time
import os
import pandas as pd
import numpy as np

tz = pytz.timezone('Asia/Shanghai')
def ts_to_date(timestamp):
    return datetime.fromtimestamp(timestamp//1000, tz).strftime('%Y-%m-%d')

def time_to_ts(ctime):
    try:
        # 先尝试解析包含小时和分钟的格式
        timeArray = time.strptime(ctime, '%Y/%m/%d %H:%M')
    except ValueError:
        try:
            # 如果失败，尝试只有日期的格式
            timeArray = time.strptime(ctime, '%Y/%m/%d')
        except ValueError:
            # 如果仍然失败，记录错误或返回一个默认值
            print(f"Failed to parse date: {ctime}")
            return None
    return int(time.mktime(timeArray))*1000

class TraceUtils:
    def __init__(self, data_dir, dataset='gaia'):
        self.data_dir = data_dir
        self.dataset = dataset
    
    def get_trace_by_day(self, day: int):
        day = f'0{day}' if day < 10 else str(day)
        temp = []
        for service in os.listdir(self.data_dir):
            filepath = os.path.join(self.data_dir, 
                                                 f'{service}/trace_{service}_2021-07-{day}.csv')
            if not os.path.exists(filepath):
                continue
            temp.append(pd.read_csv(filepath, index_col=0))
        data = pd.concat(temp, ignore_index=True)
        data['timestamp'] = data['timestamp'].apply(time_to_ts)
        return data

    def get_trace_by_ts(self, ts):
        date = ''.join(ts_to_date(ts).split('-')[1: ])
        data = pd.read_csv(os.path.join(self.data_dir, f'trace_{date}.csv')) # 21aiops
        return data
    
    # lagency/duration需要减去子调用的最大值
    def data_process(self, start_ts, end_ts):
        day = int(ts_to_date(start_ts).split('-')[-1])
        node = 'service_name' if self.dataset == 'gaia' else 'cmdb_id'
        if self.dataset == 'gaia':
            node = 'service_name'
            column = 'lagency'
            df = self.get_trace_by_day(day)
            if (start_ts is not None) and (end_ts is not None):
                sub_df = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
            else:
                sub_df = df
            csub_df = sub_df.groupby(['trace_id', 'parent_id'], as_index=False).max()
            csub_df = csub_df[[
                    'parent_id', node, column
                ]].rename(columns={'parent_id': 'span_id', node: f'c{node}', column: f'c{column}'})
            data = pd.merge(sub_df, csub_df, on='span_id', how='left').fillna(0)
            data['real_duration'] = data[column] - data[f'c{column}']
        elif self.dataset == '21aiops':
            df = self.get_trace_by_ts(start_ts)
            df['timestamp'] = df['timestamp'] if df['timestamp'].values[0] >= 1e12 else df['timestamp']*1000
            data = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
        elif self.dataset == '22aiops':
            node = 'cmdb_id'
            column = 'duration'
            df = pd.read_csv('/Users/fengxiaoyu/Desktop/PDiagnose/初赛评分数据/trace.csv')
            if (start_ts is not None) and (end_ts is not None):
                sub_df = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
            else:
                sub_df = df
            csub_df = sub_df.groupby(['span_id', 'parent_span'], as_index=False).max()
            csub_df = csub_df[[
                    'parent_span', node, column
                ]].rename(columns={'parent_span': 'span_id', node: f'c{node}', column: f'c{column}'})
            data = pd.merge(sub_df, csub_df, on='span_id', how='left').fillna(0)
            data['real_duration'] = data[column] - data[f'c{column}']
        else:
            raise Exception('Unknow dataset!')
        return data
    
class TraceAnalysis():
    def __init__(self, cases, data_dir, pid, load, dataset='gaia', config=None):
        super().__init__()
        self.id = pid
        self.cases = cases.iloc[pid*load: (pid+1)*load]
        self.tu = TraceUtils(data_dir, dataset)
        self.config = config
        self.dataset = dataset
        if config is None:
            self.config = {}
            self.config['threshold'] = 50 # 认为异常的duration阈值
            self.config['minute'] = 60000
        self.res = dict(zip(list(self.cases.index), [set() for _ in range(len(self.cases))])) # 报告可疑微服务
        self.time_used = None
    
    def analysis(self):
        start_time = time.time()
        if self.dataset == 'gaia':
            node = 'service_name'
        elif self.dataset == '21aiops':
            node = 'cmdb_id'
        elif self.dataset == '22aiops':
            node = 'cmdb_id'
        else:
            raise Exception('Unknow dataset!')
        for case_id, case in self.cases.iterrows():
            start_ts = time_to_ts(case['start'])
            end_ts = time_to_ts(case['end'])
            df = self.tu.data_process(start_ts, end_ts) 
            chosen_df = df[df['real_duration']>self.config['threshold']]
            self.res[case_id].update(list(chosen_df[node].unique()))
            self.res[case_id].update(list(chosen_df[f'c{node}'].unique()))
            if 0 in self.res[case_id]:
                self.res[case_id].remove(0)
            if np.nan in self.res[case_id]:
                self.res[case_id].remove(np.nan)
        end_time = time.time()
        self.time_used = end_time - start_time
    
    def save_res(self, savepath):
        for key in self.res:
            self.res[key] = list(self.res[key])
        with open(savepath, 'w') as f:
            json.dump(self.res, f)
        print(f'{self.id} Time used: ', self.time_used)
        print('Save successfully!')
    
    def run(self):
        self.analysis()
        self.save_res(f'{self.dataset}/trace/{self.dataset}_trace_{self.id}.json')
        with open(f'{self.dataset}/trace/time_used_{self.id}', 'w') as f:
            f.write(f'{self.time_used}')

# 22挑战赛 配置
trace_path = '/Users/fengxiaoyu/Desktop/PDiagnose/初赛评分数据/trace.csv'
label_path = '/Users/fengxiaoyu/Desktop/PDiagnose/22AIOps_run_table.csv'
demo_labels = pd.read_csv(label_path, index_col=0)
processes = []
# 只创建一个实例
pid = 0
load = len(demo_labels)
analyzer = TraceAnalysis(demo_labels[(demo_labels['type'] == 'test') & (demo_labels['level'] == 'pod')], trace_path, pid, load, '22aiops')
analyzer.analysis()
analyzer.save_res(f'{analyzer.dataset}/trace/{analyzer.dataset}_trace_{analyzer.id}.json')

In [9]:
from multiprocessing import Process
import json
from datetime import datetime
import pytz
import time
import os
import pandas as pd
import numpy as np

tz = pytz.timezone('Asia/Shanghai')
def ts_to_date(timestamp):
    return datetime.fromtimestamp(timestamp//1000, tz).strftime('%Y-%m-%d')

def time_to_ts(ctime):
    try:
        # 先尝试解析包含小时和分钟的格式
        timeArray = time.strptime(ctime, '%Y/%m/%d %H:%M')
    except ValueError:
        try:
            # 如果失败，尝试只有日期的格式
            timeArray = time.strptime(ctime, '%Y/%m/%d')
        except ValueError:
            # 如果仍然失败，记录错误或返回一个默认值
            print(f"Failed to parse date: {ctime}")
            return None
    return int(time.mktime(timeArray))*1000

class TraceUtils:
    def __init__(self, data_dir, dataset='gaia'):
        self.data_dir = data_dir
        self.dataset = dataset
    
    def get_trace_by_day(self, day: int):
        day = f'0{day}' if day < 10 else str(day)
        temp = []
        for service in os.listdir(self.data_dir):
            filepath = os.path.join(self.data_dir, 
                                                 f'{service}/trace_{service}_2021-07-{day}.csv')
            if not os.path.exists(filepath):
                continue
            temp.append(pd.read_csv(filepath, index_col=0))
        data = pd.concat(temp, ignore_index=True)
        data['timestamp'] = data['timestamp'].apply(time_to_ts)
        return data

    def get_trace_by_ts(self, ts):
        date = ''.join(ts_to_date(ts).split('-')[1: ])
        data = pd.read_csv(os.path.join(self.data_dir, f'trace_{date}.csv')) # 21aiops
        return data
    
    # lagency/duration需要减去子调用的最大值
    def data_process(self, start_ts, end_ts):
        # day = int(ts_to_date(start_ts).split('-')[-1])
        # node = 'service_name' if self.dataset == 'gaia' else 'cmdb_id'
        if self.dataset == 'gaia':
            node = 'service_name'
            column = 'lagency'
            df = self.get_trace_by_day(day)
            if (start_ts is not None) and (end_ts is not None):
                sub_df = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
            else:
                sub_df = df
            csub_df = sub_df.groupby(['trace_id', 'parent_id'], as_index=False).max()
            csub_df = csub_df[[
                    'parent_id', node, column
                ]].rename(columns={'parent_id': 'span_id', node: f'c{node}', column: f'c{column}'})
            data = pd.merge(sub_df, csub_df, on='span_id', how='left').fillna(0)
            data['real_duration'] = data[column] - data[f'c{column}']
        elif self.dataset == '21aiops':
            df = self.get_trace_by_ts(start_ts)
            df['timestamp'] = df['timestamp'] if df['timestamp'].values[0] >= 1e12 else df['timestamp']*1000
            data = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
        elif self.dataset == '22aiops':
            node = 'cmdb_id'
            column = 'duration'
            df = pd.read_csv('/Users/fengxiaoyu/Desktop/PDiagnose/初赛评分数据/trace.csv')
            if (start_ts is not None) and (end_ts is not None):
                sub_df = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
            else:
                sub_df = df
            csub_df = sub_df.groupby(['span_id', 'parent_span'], as_index=False).max()
            csub_df = csub_df[[
                    'parent_span', node, column
                ]].rename(columns={'parent_span': 'span_id', node: f'c{node}', column: f'c{column}'})
            data = pd.merge(sub_df, csub_df, on='span_id', how='left').fillna(0)
            data['real_duration'] = data[column] - data[f'c{column}']
        elif self.dataset == 'platform':
            node = 'cmdb_id'
            column = 'duration'
            df = pd.read_csv('/Users/fengxiaoyu/Desktop/PDiagnose/平台数据集/trace/trace.csv')
            if (start_ts is not None) and (end_ts is not None):
                sub_df = df[(df['timestamp']>=start_ts)&(df['timestamp']<end_ts)]
            else:
                sub_df = df
            print(df.shape)
            csub_df = sub_df.groupby(['span_id', 'parent_span'], as_index=False).max()
            csub_df = csub_df[[
                    'parent_span', node, column
                ]].rename(columns={'parent_span': 'span_id', node: f'c{node}', column: f'c{column}'})
            data = pd.merge(sub_df, csub_df, on='span_id', how='left').fillna(0)
            data['real_duration'] = data[column] - data[f'c{column}']
        else:
            raise Exception('Unknow dataset!')
        return data
    
class TraceAnalysis():
    def __init__(self, cases, data_dir, pid, load, dataset='gaia', config=None):
        super().__init__()
        self.id = pid
        self.cases = cases.iloc[pid*load: (pid+1)*load]
        self.tu = TraceUtils(data_dir, dataset)
        self.config = config
        self.dataset = dataset
        if config is None:
            self.config = {}
            self.config['threshold'] = 50 # 认为异常的duration阈值
            self.config['minute'] = 60000
        self.res = dict(zip(list(self.cases.index), [set() for _ in range(len(self.cases))])) # 报告可疑微服务
        self.time_used = None
    
    def analysis(self):
        start_time = time.time()
        if self.dataset == 'gaia':
            node = 'service_name'
        elif self.dataset == '21aiops':
            node = 'cmdb_id'
        elif self.dataset == '22aiops':
            node = 'cmdb_id'
        elif self.dataset == 'platform':
            node = 'cmdb_id'
        else:
            raise Exception('Unknow dataset!')
        for case_id, case in self.cases.iterrows():
            start_ts = int(case['st_time'])*1000000
            end_ts = int(case['ed_time'])*1000000
            print(start_ts)
            df = self.tu.data_process(start_ts, end_ts) 
            chosen_df = df[df['real_duration']>self.config['threshold']]
            self.res[case_id].update(list(chosen_df[node].unique()))
            self.res[case_id].update(list(chosen_df[f'c{node}'].unique()))
            if 0 in self.res[case_id]:
                self.res[case_id].remove(0)
            if np.nan in self.res[case_id]:
                self.res[case_id].remove(np.nan)
        end_time = time.time()
        self.time_used = end_time - start_time
    
    def save_res(self, savepath):
        for key in self.res:
            self.res[key] = list(self.res[key])
        with open(savepath, 'w') as f:
            json.dump(self.res, f)
        print(f'{self.id} Time used: ', self.time_used)
        print('Save successfully!')
    
    def run(self):
        self.analysis()
        self.save_res(f'{self.dataset}/trace/{self.dataset}_trace_{self.id}.json')
        with open(f'{self.dataset}/trace/time_used_{self.id}', 'w') as f:
            f.write(f'{self.time_used}')

# 22挑战赛 配置
trace_path = '/Users/fengxiaoyu/Desktop/PDiagnose/平台数据集/trace/trace.csv'
label_path = '/Users/fengxiaoyu/Desktop/PDiagnose/run_table.csv'
demo_labels = pd.read_csv(label_path, index_col=0)
processes = []
# 只创建一个实例
pid = 0
load = len(demo_labels[(demo_labels['data_type'] == 'test')])
analyzer = TraceAnalysis(demo_labels[(demo_labels['data_type'] == 'test')], trace_path, pid, load, 'platform')
analyzer.analysis()
analyzer.save_res(f'{analyzer.dataset}/trace/{analyzer.dataset}_trace_{analyzer.id}.json')

1711079840000000
(39179309, 11)
1711079920000000
(39179309, 11)
1711081240000000
(39179309, 11)
1711097700000000
(39179309, 11)
1711099140000000
(39179309, 11)
1711109040000000
(39179309, 11)
1711110330000000
(39179309, 11)
1711115030000000
(39179309, 11)
1711116250000000
(39179309, 11)
1711119050000000
(39179309, 11)
1711120370000000
(39179309, 11)
1711126070000000
(39179309, 11)
1711129070000000
(39179309, 11)
1711132860000000
(39179309, 11)
1711134100000000
(39179309, 11)
1711137100000000
(39179309, 11)
1711138420000000
(39179309, 11)
1711141220000000
(39179309, 11)
1711142360000000
(39179309, 11)
1711143350000000
(39179309, 11)
1711144690000000
(39179309, 11)
1711150490000000
(39179309, 11)
1711156490000000
(39179309, 11)
1711160530000000
(39179309, 11)
1711161620000000
(39179309, 11)
1711167220000000
(39179309, 11)
1711168710000000
(39179309, 11)
1711171710000000
(39179309, 11)
1711174510000000
(39179309, 11)
1711177310000000
(39179309, 11)
1711180310000000
(39179309, 11)
17111831