In [1]:
from typing import List, Dict, Any
import json
import requests
import time

# A trace and a span as plain JSON objects from the Jaeger query API
# (kept as untyped dicts; no schema is enforced here).
Trace = Dict[str, Any]
Span = Dict[str, Any]

def build_jaeger_trace_endpoint(params : List[str], base_url: str = "http://localhost:6686") -> str:
    """Build a Jaeger query-API ``/api/traces`` URL from pre-formatted query parameters.

    Args:
        params: ``key=value`` strings, already URL-safe; they are joined with ``&``.
        base_url: scheme, host and port of the Jaeger query service. A trailing ``/``
            is tolerated.

    Returns:
        The full request URL.
    """
    # Build the pieces outside the f-string: reusing the f-string's own quote character
    # inside ``{...}`` is only legal from Python 3.12 (PEP 701).
    root = base_url.rstrip("/")
    query = "&".join(params)
    return f"{root}/api/traces?{query}"

def time_range_from_lookback_mins(lookback_minutes: int) -> tuple[int,int]:
    """Return ``(start_us, end_us)`` covering the last ``lookback_minutes`` minutes up to now.

    Both bounds are Unix-epoch microseconds, the same unit ``time_range_in_mins_ago``
    produces, so the result can be passed anywhere that function's result is used.
    """
    # Read the clock once and do the offset in integer microseconds, so the span is exactly
    # ``lookback_minutes`` long.
    end_us = int(time.time() * 1_000_000)
    start_us = end_us - lookback_minutes * 60 * 1_000_000
    return (start_us, end_us)

def time_range_in_mins_ago(start_mins: int, end_mins : int) -> tuple[int,int]:
    """Return ``(start_us, end_us)``: the instants ``start_mins`` and ``end_mins`` minutes before now.

    Values are Unix-epoch microseconds, truncated to int. The clock is read separately for
    each bound (start first, then end).
    """
    def _epoch_us_minutes_ago(minutes):
        seconds_then = time.time() - minutes * 60
        return int(seconds_then * 1_000_000)

    return (_epoch_us_minutes_ago(start_mins), _epoch_us_minutes_ago(end_mins))

def build_jaeger_spans_endpoint_url(limit: int,  service : str, namespace : str, operation : str, time_range: tuple[int,int], lookback_minutes : int, tags : Dict[str,str]) -> str:
    """Build the Jaeger trace-search URL for one service/operation.

    Args:
        limit: maximum number of traces to return.
        service: Jaeger service name.
        namespace: added to the tag filter as ``service.namespace``.
        operation: operation (span) name.
        time_range: ``(start_us, end_us)`` in epoch microseconds, e.g. from
            ``time_range_in_mins_ago``.
        lookback_minutes: sent as ``lookback=<seconds>s``.
        tags: extra tag filters. Keys and values are inserted verbatim, so they must
            already be URL-safe (see ``make_timestamp_jaeger_suitable``). The dict is
            not modified.

    Returns:
        The full request URL.
    """
    lookback_seconds = 60 * lookback_minutes
    start_us, end_us = time_range

    # Copy so the caller's dict is left untouched. A service.namespace already present in
    # `tags` is overridden but keeps its position in the filter.
    all_tags = {**tags, "service.namespace": namespace}
    # The tag filter is a JSON object percent-encoded by hand:
    # %7B = "{", %7D = "}", %3A = ":", %2C = ",".
    full_tags = "%2C".join(f'"{key}"%3A"{value}"' for key, value in all_tags.items())

    params = [
        f"limit={limit}",
        f"lookback={lookback_seconds}s",
        f"service={service}",
        f"operation={operation}",
        f"start={start_us}",
        f"end={end_us}",
        f"tags=%7B{full_tags}%7D",
    ]
    return build_jaeger_trace_endpoint(params)

def scrape_jaeger_endpoint(url : str, timeout: float = 300.0) -> Dict[str, Any]:
    """GET ``url`` and return the decoded JSON response body.

    Args:
        url: full request URL, e.g. from ``build_jaeger_spans_endpoint_url``.
        timeout: seconds ``requests`` waits on connect/read before raising
            ``requests.exceptions.Timeout``, so a stalled server cannot hang the cell forever.

    Returns:
        The parsed JSON object; callers in this notebook read its ``"data"`` field.

    Raises:
        requests.exceptions.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()

def get_traces(limit: int, service: str, namespace: str, op_name: str, time_range: tuple[int,int], lookback_minutes: int, tags: Dict[str, str]):
    """Search Jaeger for traces of ``op_name`` and return the response's ``"data"`` list.

    Arguments are forwarded unchanged to ``build_jaeger_spans_endpoint_url``.
    """
    response_body = scrape_jaeger_endpoint(
        build_jaeger_spans_endpoint_url(limit, service, namespace, op_name, time_range, lookback_minutes, tags)
    )
    return response_body["data"]

def get_spans_list_from_trace_by_name(trace : Trace, op_name: str, code_namespace: "str | None" = None) -> List[Span]:
    """Return every span in ``trace["spans"]`` whose ``operationName`` equals ``op_name``, in order.

    Args:
        trace: a trace object with a ``"spans"`` list.
        op_name: operation name to match.
        code_namespace: if given, keep only spans whose first ``code.namespace`` tag has
            this value; spans without that tag are excluded.

    Returns:
        The matching spans (possibly empty).
    """
    def _first_code_namespace(span):
        return next((tag["value"] for tag in span["tags"] if tag["key"] == "code.namespace"), None)

    return [
        span
        for span in trace["spans"]
        if span["operationName"] == op_name
        and (code_namespace is None or _first_code_namespace(span) == code_namespace)
    ]

def get_one_value_from_span_tags_by_key(span : Span, key: str):
    """Return the value of the first tag in ``span["tags"]`` whose ``"key"`` equals ``key``.

    Raises:
        KeyError: if the span has no tag with that key.
    """
    for tag in span["tags"]:
        if tag["key"] == key:
            return tag["value"]
    # Raise KeyError rather than letting StopIteration escape: inside a generator it turns
    # into RuntimeError (PEP 479), and inside iterator code such as map() it silently ends
    # the iteration.
    raise KeyError(f"span {span.get('operationName', '?')!r} has no tag {key!r}")

def get_one_span_from_trace_by_name(trace : Trace, op_name: str) -> Span:
    """Return the first span in ``trace["spans"]`` whose ``operationName`` equals ``op_name``.

    Raises:
        LookupError: if no span has that operation name.
    """
    for span in trace["spans"]:
        if span["operationName"] == op_name:
            return span
    # Explicit error instead of a leaked StopIteration, which PEP 479 would turn into
    # RuntimeError when this is called from a generator.
    raise LookupError(f"trace has no span named {op_name!r}")

def get_one_span_from_trace_by_name_and_code_namespace(trace : Trace, op_name: str, code_namespace: str) -> Span:
    """Return the first span named ``op_name`` whose first ``code.namespace`` tag equals ``code_namespace``.

    Spans with the right name but no ``code.namespace`` tag are skipped rather than
    aborting the search.

    Raises:
        LookupError: if no span matches both conditions.
    """
    for span in trace["spans"]:
        if span["operationName"] != op_name:
            continue
        span_namespace = next(
            (tag["value"] for tag in span["tags"] if tag["key"] == "code.namespace"), None
        )
        if span_namespace == code_namespace:
            return span
    raise LookupError(f"trace has no span named {op_name!r} with code.namespace {code_namespace!r}")



In [2]:
#url = build_jaeger_spans_endpoint_url(10,"digitiser-aggregator", "pipeline_1", "Frame", 60,dict());
#url = build_jaeger_spans_endpoint_url(100,"digitiser-aggregator", "pipeline_1", "Frame", time_range_from_lookback_mins(60), 60, {"frame_is_expired": "true"});
#print(url)
#value = scrape_jaeger_endpoint(url)
#print(value)


In [3]:
from datetime import datetime

def get_time_range_us_from_timestamp(timestamp: str, secs_under: int, secs_over: int) -> tuple[int,int]:
    """Return ``(start_us, end_us)`` from ``secs_under`` s before to ``secs_over`` s after ``timestamp``.

    ``timestamp`` must look like ``YYYY-MM-DDTHH:MM:SS``, optionally followed by fractional
    seconds and a UTC offset (``Z``, ``+HH:MM`` or ``-HH:MM``). Fractional seconds are
    ignored. An offset, when present, is honoured; without one the time is interpreted in
    the machine's local time zone. Results are Unix-epoch microseconds.

    Raises:
        ValueError: if ``timestamp`` is not in that format.
    """
    import re

    match = re.fullmatch(
        r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.\d+)?(Z|[+-]\d{2}:?\d{2})?",
        timestamp.strip(),
    )
    if match is None:
        raise ValueError(f"unrecognised timestamp: {timestamp!r}")
    whole_seconds, utc_offset = match.groups()
    if utc_offset:
        parsed = datetime.strptime(whole_seconds + utc_offset, "%Y-%m-%dT%H:%M:%S%z")
    else:
        parsed = datetime.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    # datetime.timestamp() is portable; strftime('%s') is a glibc-only extension.
    secs = parsed.timestamp()
    ts_start = int((secs - secs_under) * 1_000_000)
    ts_end = int((secs + secs_over) * 1_000_000)
    return (ts_start, ts_end)

def make_timestamp_jaeger_suitable(timestamp):
    """Percent-encode ``timestamp`` so it can be inserted verbatim into a Jaeger ``tags=`` value.

    Each ``:`` becomes ``%3A``. Each ``+`` becomes ``%5C%5C%2B``, which decodes to two
    backslashes followed by ``+`` (presumably escaping ``+`` for Jaeger's tag matching;
    TODO confirm).
    """
    # A single-pass translation matches two chained replace() calls, because neither
    # replacement string contains the other target character.
    substitutions = {ord("+"): "%5C%5C%2B", ord(":"): "%3A"}
    return timestamp.translate(substitutions)
    

def find_traces_by_span_metadata_timestamp(limit: int, service: str, namespace: str, lookback_minutes: int, span_name: str, timestamp: str, window_secs: int = 10_000):
    """Find traces of ``span_name`` whose ``metadata_timestamp`` tag equals ``timestamp``.

    The search covers ``window_secs`` seconds either side of ``timestamp``.

    Args:
        limit: maximum number of traces to return.
        service: Jaeger service name.
        namespace: ``service.namespace`` tag value.
        lookback_minutes: forwarded as the query's ``lookback``.
        span_name: operation name to search.
        timestamp: the raw (un-encoded) timestamp, as stored in the span tag.
        window_secs: half-width of the search window in seconds (default 10000 s, about 2.8 h).

    Returns:
        The list of matching traces (the Jaeger response's ``"data"`` field).
    """
    search_range = get_time_range_us_from_timestamp(timestamp, window_secs, window_secs)
    encoded_timestamp = make_timestamp_jaeger_suitable(timestamp)
    return get_traces(limit, service, namespace, span_name, search_range, lookback_minutes, {"metadata_timestamp": encoded_timestamp})

#def find_spans_in_traces_by_span_metadata_timestamp(limit: int, service: str, namespace: str, span_name: str, timestamp: str) -> List[Span]:
#    traces = find_traces_by_span_metadata_timestamp(limit, service, namespace, 60, span_name, timestamp)
#    return [get_one_span_from_trace_by_name_and_service(trace, "process_digitiser_trace_message", service) for trace in traces]


#for trace in traces:
#    num_digitisers = len(get_spans_list_from_trace_by_name(trace, "Digitiser Event List"))
#    frame_span = get_one_span_from_trace_by_name(trace, "Frame")
#    timestamp_raw = get_one_value_from_span_tags_by_key(frame_span, "metadata_timestamp")
#    
#    spans = find_spans_in_traces_by_span_metadata_timestamp(10, "trace-to-events", "pipeline_1", "process_digitiser_trace_message", timestamp_raw)
#    d_id = [get_one_value_from_span_tags_by_key(span, "digitiser_id") for span in spans]

#    print(f"{timestamp_raw}, {num_digitisers}")

In [7]:
# Tool to find incomplete frames, and track down traces with equal metadata_timestamps.
# Search the last 48 hours for Frame traces tagged frame_is_expired=true.
lookback_minutes = 48 * 60
time_range = time_range_in_mins_ago(lookback_minutes, 0)
MAX_INCOMPLETE_FRAMES = 3000
incomplete_frame_traces = get_traces(MAX_INCOMPLETE_FRAMES, "digitiser-aggregator", "hifi_1", "Frame", time_range, lookback_minutes, {"frame_is_expired": "true"})
if len(incomplete_frame_traces) >= MAX_INCOMPLETE_FRAMES:
    # A full page means the query was capped by `limit`; some matching frames may be missing.
    print(f"warning: hit the limit of {MAX_INCOMPLETE_FRAMES} traces; results are probably truncated")
print(len(incomplete_frame_traces))

3000


In [8]:
# For each incomplete frame, find the digitiser event-list messages that share its
# metadata_timestamp and print their Kafka timestamps and span logs.
MESSAGES_PER_FRAME_LIMIT = 20
# Set to True to also print the matching trace-to-events Kafka timestamps (one extra query per frame).
COMPARE_WITH_TRACE_TO_EVENTS = False

for trace in incomplete_frame_traces:
    frame_span = get_one_span_from_trace_by_name(trace, "Frame")
    metadata_timestamp_value = get_one_value_from_span_tags_by_key(frame_span, "metadata_timestamp")
    tags = {"metadata_timestamp": make_timestamp_jaeger_suitable(metadata_timestamp_value)}

    # Skip frames whose event-list message spans recorded no logs at all.
    digitiser_eventlist_messages_traces = get_traces(MESSAGES_PER_FRAME_LIMIT, "digitiser-aggregator", "hifi_1", "process_digitiser_event_list_message", time_range, lookback_minutes, tags)
    if all(get_one_span_from_trace_by_name(t, "process_digitiser_event_list_message")['logs'] == [] for t in digitiser_eventlist_messages_traces):
        continue

    for t in digitiser_eventlist_messages_traces:
        # Only the process_kafka_message spans tagged code.namespace == "digitiser_aggregator".
        process_kafka_message_spans = [
            span
            for span in get_spans_list_from_trace_by_name(t, "process_kafka_message")
            if any(tag["key"] == "code.namespace" and tag["value"] == "digitiser_aggregator" for tag in span["tags"])
        ]
        process_digitiser_event_list_message_span = get_one_span_from_trace_by_name(t, "process_digitiser_event_list_message")
        digitiser_id = get_one_value_from_span_tags_by_key(process_digitiser_event_list_message_span, "digitiser_id")

        kafka_message_timestamp_ms = [get_one_value_from_span_tags_by_key(span, "kafka_message_timestamp_ms") for span in process_kafka_message_spans]
        logs = process_digitiser_event_list_message_span['logs']
        print("kafka eventlist time for", digitiser_id, kafka_message_timestamp_ms, logs)

    if COMPARE_WITH_TRACE_TO_EVENTS:
        digitiser_trace_messages_traces = get_traces(MESSAGES_PER_FRAME_LIMIT, "trace-to-events", "hifi_1", "process_digitiser_trace_message", time_range, lookback_minutes, tags)
        for t in digitiser_trace_messages_traces:
            process_kafka_message_span_tte = get_one_span_from_trace_by_name_and_code_namespace(t, "process_kafka_message", "trace_to_events")
            process_digitiser_trace_message_span = get_one_span_from_trace_by_name(t, "process_digitiser_trace_message")
            digitiser_id = get_one_value_from_span_tags_by_key(process_digitiser_trace_message_span, "digitiser_id")

            kafka_message_timestamp_ms = get_one_value_from_span_tags_by_key(process_kafka_message_span_tte, "kafka_message_timestamp_ms")
            print("kafka trace     time for", digitiser_id, kafka_message_timestamp_ms)