In [40]:
MONGO_URI = 'mongodb://admin:password@localhost:27017/'
MONGO_DB_NAME = 'chaos_star_bench_data'
TRACES_COLLECTION_NAME = 'traces'
EVENTS_COLLECTION_NAME = 'events'

In [41]:
from pymongo import MongoClient, ASCENDING

client = MongoClient(MONGO_URI)
db = client[MONGO_DB_NAME]
traces_collection = db[TRACES_COLLECTION_NAME]

In [42]:
events_collection = db[EVENTS_COLLECTION_NAME]
events_collection.create_index([("timestamp", ASCENDING)])

'timestamp_1'

In [43]:
cursor = traces_collection.find({}, {"_id": 0}).batch_size(1000)
for document in cursor:
    trace = document['entries']
    print(trace)
    break

[{'traceID': '5f4e18fb31de0988', 'spanID': '52df398c46c836e3', 'flags': 1, 'operationName': 'compose_unique_id_server', 'references': [{'refType': 'CHILD_OF', 'traceID': '5f4e18fb31de0988', 'spanID': '67ef910725e76d43'}], 'startTime': 1732931863777001, 'startTimeMillis': 1732931863777, 'duration': 23, 'tags': [{'key': 'internal.span.format', 'type': 'string', 'value': 'proto'}], 'logs': [], 'process': {'serviceName': 'unique-id-service', 'tags': [{'key': 'jaeger.version', 'type': 'string', 'value': 'C++-0.5.0'}, {'key': 'hostname', 'type': 'string', 'value': 'unique-id-service'}, {'key': 'ip', 'type': 'string', 'value': '10.244.3.9'}]}}, {'traceID': '5f4e18fb31de0988', 'spanID': '97734e9fdac3f804', 'flags': 1, 'operationName': 'get_followers_client', 'references': [{'refType': 'CHILD_OF', 'traceID': '5f4e18fb31de0988', 'spanID': 'f1a0bccf52ba3dbc'}], 'startTime': 1732931863781257, 'startTimeMillis': 1732931863781, 'duration': 1302, 'tags': [{'key': 'internal.span.format', 'type': 'stri

In [44]:
class TraceEntry:
    def __init__(self, trace_entry):
        self.trace_id = trace_entry['traceID']
        self.span_id = trace_entry['spanID']
        self.operation_name = trace_entry['operationName']
        self.service_name = trace_entry['process']['serviceName']
        
        ip_entries = [
            tag_entry for tag_entry in trace_entry['process']['tags'] 
            if tag_entry['key'] == 'ip'
        ]
        ip_entry = ip_entries[0] if len(ip_entries) > 0 else {}
        self.ip = ip_entry.get('value', None)
        
        self.call_time = trace_entry['startTimeMillis']
        self.duration = trace_entry['duration']
        self.parent = None
        
    def set_parent(self, parent):
        self.parent = parent

In [45]:
num_missing_parent_spans = 0
num_incomplete_traces = 0

In [46]:
def convert_trace_to_entries(trace_arr):
    global num_missing_parent_spans
    global num_incomplete_traces
    
    missing_spans_exist = False
    
    trace_index = {}
    
    for entry in trace_arr:
        trace_obj = TraceEntry(entry)
        trace_index[trace_obj.span_id] = trace_obj
    
    for entry in trace_arr:
        references = entry['references']
        span_id = entry['spanID']
        parent_references = [
            reference for reference in references 
            if reference['refType'] == 'CHILD_OF'
        ]
        parent_reference = parent_references[0] if len(parent_references) > 0 else None
        parent_span_id = parent_reference['spanID'] if parent_reference else None
        if parent_span_id is not None:
            try:
                trace_index[span_id].set_parent(trace_index[parent_span_id])
            except KeyError as _:
                num_missing_parent_spans += 1
                missing_spans_exist = True
                continue
    
    if missing_spans_exist:        
        num_incomplete_traces += 1
        
    return trace_index.values()

In [47]:
class TraceLogEntry:
    def __init__(self, trace_entry):
        self.trace_id = trace_entry.trace_id
        self.span_id = trace_entry.span_id
        
        self.upstream = trace_entry.parent.service_name
        self.downstream = trace_entry.service_name
        
        self.upstream_ip = trace_entry.parent.ip
        self.downstream_ip = trace_entry.ip
        
        self.operation_name = trace_entry.operation_name
        self.timestamp = trace_entry.call_time
        self.duration = trace_entry.duration
        
    def __str__(self):
        return (
            f"TraceLogEntry(\n"
            f"  Trace ID: {self.trace_id}\n"
            f"  Span ID: {self.span_id}\n"
            f"  Upstream: {self.upstream} (IP: {self.upstream_ip})\n"
            f"  Downstream: {self.downstream} (IP: {self.downstream_ip})\n"
            f"  Operation: {self.operation_name}\n"
            f"  Timestamp: {self.timestamp}\n"
            f"  Duration: {self.duration}ms\n"
            f")"
        )
    
    def to_dict(self):
        return {
            'trace_id': self.trace_id,
            'span_id': self.span_id,
            'upstream': self.upstream,
            'downstream': self.downstream,
            'upstream_ip': self.upstream_ip,
            'downstream_ip': self.downstream_ip,
            'operation_name': self.operation_name,
            'timestamp': self.timestamp,
            'duration': self.duration
        }

In [48]:
def convert_trace_to_logs(trace):
    trace_logs = []
    trace_entries = convert_trace_to_entries(trace)
    for trace_entry in trace_entries:
        if trace_entry.parent is not None:
            trace_logs.append(TraceLogEntry(trace_entry).to_dict())
            
    return trace_logs

In [49]:
trace_logs = convert_trace_to_logs(trace)

In [50]:
print(trace_logs[0])

{'trace_id': '5f4e18fb31de0988', 'span_id': '52df398c46c836e3', 'upstream': 'compose-post-service', 'downstream': 'unique-id-service', 'upstream_ip': '10.244.0.8', 'downstream_ip': '10.244.3.9', 'operation_name': 'compose_unique_id_server', 'timestamp': 1732931863777, 'duration': 23}


In [51]:
cursor = traces_collection.find({}, {"_id": 0}).batch_size(1000)

batch_size = 100_000
batch = []

for document in cursor:
    trace = document['entries']
    trace_logs = convert_trace_to_logs(trace)
    
    batch += trace_logs
    
    if len(batch) >= batch_size:
        events_collection.insert_many(batch[:batch_size])
        print(f"Inserted {batch_size} documents.")
        batch = batch[batch_size:]
        
if len(batch) > 0:
    events_collection.insert_many(batch)
    print(f"Inserted {len(batch)} documents.")
    
print("All traces processed and inserted.")

Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
Inserted 100000 documents.
I

In [52]:
num_incomplete_traces, num_missing_parent_spans

(15816, 29554)