In [None]:
import sys
import os
import json
import copy
import re

class Span:
    """One captured request/response exchange attributed to a process.

    Attributes:
        start_time: Timestamp supplied for the request side.
        end_time: Timestamp supplied for the response side.
        tgid: Thread-group id supplied by the caller.
        pid: Process/thread id supplied by the caller.
        protocol: Protocol name; only "Thrift" and "HTTP1" are understood by
            ``parse_traceid``.
        component_name: Name of the component that produced the span.
        content: Request payload text the trace id is extracted from.
        direction: Direction label supplied by the caller.
        trace_id: Trace id parsed from ``content``, or None when absent.
        duration: ``end_time - start_time``, or None when either is missing.
        span_id: Filled in later by callers; None on construction.
    """

    def __init__(self, start_time, end_time, tgid, pid, protocol, component_name, content, direction):
        self.start_time = start_time
        self.end_time = end_time
        self.tgid = tgid
        self.pid = pid
        self.protocol = protocol
        self.component_name = component_name
        self.content = content
        self.direction = direction
        self.trace_id = self.parse_traceid()
        # A record without a request or response timestamp has no defined
        # duration; keep the span instead of failing on None arithmetic.
        if start_time is None or end_time is None:
            self.duration = None
        else:
            self.duration = end_time - start_time
        self.span_id = None

    def parse_traceid(self):
        """Extract the trace id from ``self.content``.

        Returns:
            For "Thrift": the text between the last "uber-trace-id" marker
            and the following ":" (whitespace-stripped). For "HTTP1": the
            lowercase UUID following an "X-Request-ID:" header. None when the
            payload is not a string, the marker is missing, or the protocol
            is not one of the two above.
        """
        content = self.content
        if not isinstance(content, str):
            # Missing payloads arrive as None; there is nothing to parse.
            return None
        if self.protocol == "Thrift":
            if 'uber-trace-id' not in content:
                return None
            tail = content.split("uber-trace-id")[-1]
            return tail.split(":")[0].strip()
        if self.protocol == "HTTP1":
            pattern = r"X-Request-ID:\s*([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})"
            match = re.search(pattern, content)
            return match.group(1) if match else None
        return None

    def __str__(self):
        """Return a one-line, human-readable description of the span."""
        return (f"Span(start_time={self.start_time}, end_time={self.end_time}, "
                f"tgid={self.tgid}, pid={self.pid}, protocol={self.protocol}, component_name={self.component_name}, "
                f"trace_id={self.trace_id}, duration={self.duration}, direction={self.direction}) ")




def parse_json(file_path):
    """Load a span dump and group its Thrift/HTTP1 spans per process.

    Args:
        file_path: Path of a UTF-8 JSON file whose top-level object holds a
            "spans" list; each entry carries a "req" and a "resp" object.

    Returns:
        A dict shaped as
        ``{tgid: {"incoming": {dst_addr: [Span, ...]},
                  "outgoing": {dst_addr: [Span, ...]}}}``.
        Spans whose request direction is "Ingress" go to "incoming", "Egress"
        to "outgoing"; any other direction is reported on stdout and dropped
        (the tgid entry is still created). Records whose protocol is missing
        or is not "Thrift"/"HTTP1" are skipped.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    span_map = {}
    for record in data.get("spans", []):
        # `or {}` also covers an explicit null for "req"/"resp".
        req = record.get("req") or {}
        resp = record.get("resp") or {}

        protocol = req.get("protocol")
        if protocol not in ('Thrift', 'HTTP1'):
            continue

        tgid = req.get("tgid")
        direction = req.get("direction")
        # Only the part of "comm" before the first backslash is kept.
        component_name = (req.get("comm") or "").split("\\")[0]
        # Spans are bucketed by the request's destination address; a record
        # without a quintuple lands under the key None instead of raising.
        dstip = (req.get("quintuple") or {}).get("dst_addr")

        span_obj = Span(
            req.get("timestamp_ns"),
            resp.get("timestamp_ns"),
            tgid,
            req.get("pid"),
            protocol,
            component_name,
            req.get("payload"),
            direction,
        )

        per_process = span_map.setdefault(tgid, {"incoming": {}, "outgoing": {}})
        if direction == "Ingress":
            per_process["incoming"].setdefault(dstip, []).append(span_obj)
        elif direction == "Egress":
            per_process["outgoing"].setdefault(dstip, []).append(span_obj)
        else:
            print(f"Unknown direction: {direction}")
    return span_map
    

# For every process that has both incoming and outgoing spans: keep only the
# outgoing spans whose trace id was also seen on an incoming span, number the
# spans per destination address in start-time order, and replace raw
# timestamps by their rank among all timestamps of that process.
for rps in [100]:
    # file_path = f"/home/ubuntu/gyt-file/DeepTrace/tests/output/rps{rps}-spans.json"
    file_path = "/home/ubuntu/gyt-file/DeepTrace/tests/output/spans.json"
    spans = parse_json(file_path)

    for tgid, span_map in spans.items():
        if not span_map['incoming'] or not span_map['outgoing']:
            continue

        # Collect the incoming trace ids up front so the outgoing filter does
        # not depend on the iteration order of span_map.
        # NOTE(review): a trace id of None on any incoming span also lets
        # outgoing spans without a trace id through - confirm that is intended.
        incoming_trace_ids = {
            span.trace_id
            for ip_spans in span_map['incoming'].values()
            for span in ip_spans
        }

        all_spans = []
        for direction in ("incoming", "outgoing"):
            span_list = span_map[direction]
            for ip in span_list:
                if direction == "outgoing":
                    span_list[ip] = [
                        span for span in span_list[ip]
                        if span.trace_id in incoming_trace_ids
                    ]
                print(f'tgid: {tgid} direction: {direction} ip: {ip} count: {len(span_list[ip])}')

                span_list[ip] = sorted(span_list[ip], key=lambda x: x.start_time)
                # span_id is the position within this (direction, ip) bucket;
                # enumerate avoids a linear list.index() lookup per span.
                for position, span in enumerate(span_list[ip]):
                    span.span_id = position
                all_spans.extend(span_list[ip])

        all_spans = sorted(all_spans, key=lambda x: x.start_time)
        timestamps = sorted(
            [span.start_time for span in all_spans] + [span.end_time for span in all_spans]
        )
        # Rank of a timestamp = index of its first occurrence in the sorted
        # list (the same value list.index() returned, without the rescans).
        first_rank = {}
        for rank, timestamp in enumerate(timestamps):
            first_rank.setdefault(timestamp, rank)
        # From here on start_time/end_time hold ranks, not raw timestamps;
        # span.duration still holds the value computed at construction.
        for span in all_spans:
            span.start_time = first_rank[span.start_time]
            span.end_time = first_rank[span.end_time]
        for span in all_spans:
            print(span.direction, span.start_time, span.end_time, span.trace_id, span.span_id)
    # processed_spans = fifo(copy.deepcopy(spans))
    # service_acc1 = service_acc(processed_spans)
    # e2e_acc1 = e2e_acc(processed_spans)
    # print("-" * 50)
    # print(f"RPS: {rps}")
    # print("Service Accuracy:")
    # print("-" * 30)
    # for tgid, acc in service_acc1.items():
    #     print(f" TGID: {tgid:<10} | Accuracy: {acc:.2f}")
    # print("-" * 30)
    # print("End-to-End Accuracy:")
    # print(f" {e2e_acc1:.2f}")

In [None]:

import sys
import os
# Directory that anchors the relative path to the association sources.
try:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # __file__ is not defined when this code runs as a notebook cell;
    # fall back to the kernel's working directory.
    BASE_DIR = os.getcwd()
UTILS_DIR = os.path.join(BASE_DIR, "..", "..", "association", "src")
# Guard the append so re-running the cell does not pile up duplicate entries.
if os.path.normpath(UTILS_DIR) not in sys.path:
    sys.path.append(os.path.normpath(UTILS_DIR))

from utils import Span, read_all_spans, service_acc, e2e_acc
from fifo import fifo
import copy


if __name__ == "__main__":
    # json is not among this cell's imports; import it here so the cell does
    # not depend on another cell having been run first.
    import json

    # Export the spans of each load level to spans-<rps>.json in the current
    # working directory.
    # for rps in [100, 200, 300, 400, 500]:
    for rps in [100]:
        index_name = f"rps-{rps}-spans"
        spans = read_all_spans(index_name)

        with open(f"spans-{rps}.json", "w", encoding="utf-8") as f:
            # `default=vars` is only consulted for values json cannot encode
            # natively. The read_all_spans defined elsewhere in this notebook
            # returns Span objects, which are then written as their attribute
            # dicts instead of aborting the dump with a TypeError.
            # NOTE(review): confirm what utils.read_all_spans returns.
            json.dump(spans, f, indent=4, default=vars)
        # print(spans)
      

In [None]:
from elasticsearch import Elasticsearch

class Span:
    """A span built from one stored span document.

    Every attribute is read from ``span_json`` with ``.get``, so a key that
    is absent from the document yields None.

    Attributes:
        start_time, end_time: Values of the "start_time" / "end_time" keys.
        tgid, pid: Process identifiers from the document.
        protocol: Protocol name from the document.
        component_name: Component name from the document.
        req_content, resp_content: Request / response payloads.
        trace_id: Trace id stored in the document (not re-parsed here).
        direction: Direction label from the document.
        association_tracid: Initialised to None; not assigned anywhere else
            in this class.
        src_ip, dst_ip, src_port, dst_port: Connection endpoints.
        duration: ``end_time - start_time``, or None when either is missing.
    """

    def __init__(self, span_json):
        self.start_time = span_json.get("start_time")
        self.end_time = span_json.get("end_time")
        self.tgid = span_json.get("tgid")
        self.pid = span_json.get("pid")
        self.protocol = span_json.get("protocol")
        self.component_name = span_json.get("component_name")
        self.req_content = span_json.get("req_content")
        self.resp_content = span_json.get("resp_content")
        self.trace_id = span_json.get("trace_id")
        self.direction = span_json.get("direction")
        self.association_tracid = None
        self.src_ip = span_json.get("src_ip")
        self.dst_ip = span_json.get("dst_ip")
        self.src_port = span_json.get("src_port")
        self.dst_port = span_json.get("dst_port")
        # Documents lacking a timestamp have no defined duration; keep the
        # span instead of failing on None arithmetic.
        if self.start_time is None or self.end_time is None:
            self.duration = None
        else:
            self.duration = self.end_time - self.start_time

    def parse_traceid(self):
        """Extract a trace id from the request payload.

        The request payload (``req_content``) is the text that is searched;
        this class has no ``content`` attribute.

        Returns:
            For "Thrift": the text between the last "uber-trace-id" marker
            and the following ":" (whitespace-stripped). For "HTTP1": the
            lowercase UUID following an "X-Request-ID:" header. None when the
            payload is not a string, the marker is missing, or the protocol
            is not one of the two above.
        """
        content = self.req_content
        if not isinstance(content, str):
            return None
        if self.protocol == "Thrift":
            if 'uber-trace-id' not in content:
                return None
            tail = content.split("uber-trace-id")[-1]
            return tail.split(":")[0].strip()
        if self.protocol == "HTTP1":
            pattern = r"X-Request-ID:\s*([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})"
            match = re.search(pattern, content)
            return match.group(1) if match else None
        return None

    def __str__(self):
        """Return a one-line, human-readable description of the span."""
        return (f"Span(start_time={self.start_time}, end_time={self.end_time}, "
                f"tgid={self.tgid}, pid={self.pid}, protocol={self.protocol}, component_name={self.component_name}, "
                f"trace_id={self.trace_id}, duration={self.duration}, direction={self.direction}) ")



def read_all_spans(index_name):
    """Read every span document of an Elasticsearch index, grouped per process.

    Args:
        index_name: Name of the index to read.

    Returns:
        A dict shaped as
        ``{tgid: {"incoming": {dst_ip: [Span, ...]},
                  "outgoing": {dst_ip: [Span, ...]}}}``.
        Only "Thrift" and "HTTP1" spans are kept. "Ingress" spans go to
        "incoming", "Egress" spans to "outgoing"; spans with any other
        direction are dropped (their tgid entry is still created).
    """
    # Imported locally: the notebook only imports the Elasticsearch client.
    from elasticsearch.helpers import scan

    es = Elasticsearch(hosts=["http://127.0.0.1:9200"])

    # A single search request returns at most `size` hits, so the former
    # `"size": 10000` query silently truncated larger indices. scan() pages
    # through the whole result set with the scroll API instead.
    query = {"query": {"match_all": {}}}

    span_map = {}
    for hit in scan(es, index=index_name, query=query):
        source = hit["_source"]  # _source holds the stored document itself
        # Filter on the raw document first so documents of other protocols
        # are never turned into Span objects.
        if source.get("protocol") not in ('Thrift', 'HTTP1'):
            continue

        span_obj = Span(source)
        print(span_obj)

        per_process = span_map.setdefault(span_obj.tgid, {"incoming": {}, "outgoing": {}})
        if span_obj.direction == "Ingress":
            per_process["incoming"].setdefault(span_obj.dst_ip, []).append(span_obj)
        elif span_obj.direction == "Egress":
            per_process["outgoing"].setdefault(span_obj.dst_ip, []).append(span_obj)
    return span_map

# Fetch the grouped spans from the index whose name is derived from `rps`.
rps = 100
span_map = read_all_spans(index_name=f"rps-{rps}-spans")



In [9]:
import json
import re

def parse_traceid(content):
    """Return the first lowercase UUID-shaped token in ``content``.

    Args:
        content: Payload text to search. Non-string values (for example a
            missing payload given as None) are treated as containing no id.

    Returns:
        The first substring matching the 8-4-4-4-12 lowercase-hex pattern,
        or None when there is no such substring.
    """
    if not isinstance(content, str):
        return None
    pattern = r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
    # Only the first hit is used, so stop at it rather than collecting all.
    match = re.search(pattern, content)
    return match.group(0) if match else None

# Consistency check on one capture file:
#   1. report spans whose request and response payloads carry different ids;
#   2. per process and connection, list requests and responses in timestamp
#      order and flag two consecutive messages of the same type.
rps = 500
file_path = f"/home/ubuntu/gyt-file/DeepTrace/tests/output/rps{rps}-spans.json"
with open(file_path, "r", encoding="utf-8") as file:
    data = json.load(file)

# Span records of the capture (empty list when the key is absent).
spans = data.get("spans", [])

# From here on `data` is the grouping:
# tgid -> "src:port -> dst:port" -> [req, resp, req, resp, ...]
data = {}
for span in spans:
    request = span['req']
    req_tracied = parse_traceid(request['payload'])
    response = span['resp']
    resp_tracied = parse_traceid(response['payload'])

    tgid = request['tgid']
    quintuple = request['quintuple']
    src_ip = quintuple['src_addr']
    dst_ip = quintuple['dst_addr']
    src_port = quintuple['src_port']
    dst_port = quintuple['dst_port']
    five_tuple = f"{src_ip}:{src_port} -> {dst_ip}:{dst_port}"

    if req_tracied != resp_tracied:
        print(f"Trace ID mismatch: {req_tracied} != {resp_tracied} {tgid} {five_tuple}")

    messages = data.setdefault(tgid, {}).setdefault(five_tuple, [])
    # Tag both halves so they can still be told apart after being merged
    # into one list.
    request['type'] = 'req'
    response['type'] = 'resp'
    messages.extend((request, response))

for tgid, five_tuple_map in data.items():
    for five_tuple, req_resps in five_tuple_map.items():
        req_resps = sorted(req_resps, key=lambda x: x['timestamp_ns'])
        print(f"TGID: {tgid}, Five Tuple: {five_tuple}")
        last_type = None
        for req_resp in req_resps:
            if last_type != req_resp['type']:
                print(f"{req_resp['type']}: {parse_traceid(req_resp['payload'])} {req_resp['timestamp_ns']}")
            else:
                # Same message type twice in a row on one connection.
                print(f"{req_resp['type']}: {parse_traceid(req_resp['payload'])} {req_resp['timestamp_ns']} {'error *'*20}")
            last_type = req_resp['type']
        print("-" * 50)



Trace ID mismatch: e66cced8-e756-4a83-b605-79dc524f9120 != d864df86-de5d-43b4-9101-913816be1592 3056354 3232235524:40568 -> 3232235522:5000
Trace ID mismatch: d864df86-de5d-43b4-9101-913816be1592 != 5e78e34b-0056-4506-8d53-b56652398681 3056354 3232235524:40568 -> 3232235522:5000
Trace ID mismatch: 5e78e34b-0056-4506-8d53-b56652398681 != be27c398-1e3f-4629-8fc3-07032c36e414 3056354 3232235524:40568 -> 3232235522:5000
Trace ID mismatch: be27c398-1e3f-4629-8fc3-07032c36e414 != b9c44778-dc31-4f0e-aead-9fab6c8802ca 3056354 3232235524:40568 -> 3232235522:5000
Trace ID mismatch: b9c44778-dc31-4f0e-aead-9fab6c8802ca != c7b69eae-d162-41a1-b657-1fa22e4545c8 3056354 3232235524:40568 -> 3232235522:5000
Trace ID mismatch: c7b69eae-d162-41a1-b657-1fa22e4545c8 != 8b5a2aad-4ae6-44c6-9376-860febb02648 3056354 3232235524:40568 -> 3232235522:5000
Trace ID mismatch: ac1f1519-8450-46fb-a785-b1025420989d != 58805d66-fb8c-49d8-916c-0f4f4071ef5a 3056354 3232235524:40452 -> 3232235522:5000
Trace ID mismatch: 8