# OTLP Embeddings

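This notebook:
- Reads OpenTelemetry traces in OTLP protobuf format and converts each one into a custom JSON model (a tree of spans)
- Generates an embedding for each trace's JSON via the Azure OpenAI API (`text-embedding-ada-002`)
- Stores the embeddings, together with a small metadata payload per trace, in a locally running `qdrant` vector database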

In [22]:
# Dataset to process. The commented-out names are alternative datasets.
ds_name = 'baseline'
#ds_name = 'live_add_brazil'
#ds_name = 'live_high_uk_latency'
#ds_name = 'live_unseen_exception_type'

# Raw f-string: in a normal string '\s', '\O', '\d' and '\{' are invalid escape
# sequences (SyntaxWarning on Python 3.12+). The resulting value is unchanged.
# NOTE: `dir` shadows the builtin dir(); the name is kept because later cells use it.
dir = rf'D:\source\OpenAiAlerting-Python\data\{ds_name}'
# One qdrant collection per dataset.
col_name = ds_name

In [23]:
# Define logic to convert proto traces to our format.

import json
from opentelemetry.proto.trace.v1 import trace_pb2

class Trace:
    """A single trace: its id plus the root of its span tree."""

    def __init__(self, trace_id, root_span):
        # convert_trace passes the hex-encoded OTLP trace id here.
        self.trace_id = trace_id
        # Root Span of the tree; nested spans hang off its child_spans.
        self.root_span = root_span

    def to_json(self):
        """Serialise the span tree (starting at root_span) to a JSON string.

        The trace id itself is not part of the output.
        """
        def encode_fallback(obj):
            # json.dumps calls this for values it can't serialise natively:
            # nested Spans use their own stripped-down dict, anything else its __dict__.
            if isinstance(obj, Span):
                return obj.to_json_dict()
            return obj.__dict__

        root_dict = self.root_span.to_json_dict()
        return json.dumps(root_dict, default=encode_fallback)

class Span:
    """A single span, plus the child spans nested underneath it.

    span_id / parent_span_id are only needed to assemble the tree; to_json_dict()
    strips them so they do not end up in the JSON that gets embedded.
    """

    def __init__(self, span_id, parent_span_id, operation_name, client_or_server, duration_ms, service_name, service_version, attributes, events):
        # Assignment order fixes the key order of __dict__, and therefore of the serialised JSON.
        self.operation_name = operation_name
        self.span_id = span_id
        self.parent_span_id = parent_span_id
        self.client_or_server = client_or_server
        self.duration_ms = duration_ms
        self.service_name = service_name
        self.service_version = service_version
        self.attributes = attributes
        self.events = events
        self.child_spans = []

    def add_child(self, span):
        """Insert `span` into the tree rooted at self.

        Returns the root of the tree after insertion:
          - self, if `span` was attached at or below self;
          - `span`, if `span` is self's parent (self is attached beneath it);
          - None, if `span` is neither self's parent nor a descendant of any
            span in self's subtree. The tree is left unchanged in that case.
        """
        if span.parent_span_id == self.span_id:
            self.child_spans.append(span)
            return self
        if span.span_id == self.parent_span_id:
            # `span` is our parent: hang ourselves under it and report it as the new root.
            span.add_child(self)
            return span
        for index, child in enumerate(self.child_spans):
            subtree_root = child.add_child(span)
            if subtree_root is not None:
                # Replace in place rather than remove + append, so sibling order stays
                # stable instead of moving this child to the end on every insertion.
                self.child_spans[index] = subtree_root
                return self
        return None

    def to_json_dict(self):
        """Return a shallow copy of this span's fields without span_id / parent_span_id."""
        clone = self.__dict__.copy()
        clone.pop('span_id', None)
        clone.pop('parent_span_id', None)
        return clone
                
def _any_value_to_python(value):
    """Convert an OTLP AnyValue message into a JSON-serialisable Python value.

    Reads whichever member of the `value` oneof is actually set, so bool and
    double attributes keep their real value instead of collapsing to 0.
    Arrays and key/value lists are converted recursively; bytes become a hex
    string. Returns None when no member is set.
    """
    kind = value.WhichOneof('value')
    if kind is None:
        return None
    if kind == 'array_value':
        return [_any_value_to_python(v) for v in value.array_value.values]
    if kind == 'kvlist_value':
        return {kv.key: _any_value_to_python(kv.value) for kv in value.kvlist_value.values}
    if kind == 'bytes_value':
        return value.bytes_value.hex()
    return getattr(value, kind)


def _attributes_to_dict(key_values):
    """Map a repeated OTLP KeyValue field to a plain {key: value} dict."""
    return {kv.key: _any_value_to_python(kv.value) for kv in key_values}


def _link_spans(all_spans):
    """Attach every span to its parent and return the root of the resulting tree.

    Linking goes through a span_id -> Span map, so the result does not depend on
    the order in which spans appear in the file. The root is the first span with
    an empty parent id. If there is none, it is the first span whose parent is
    missing from the trace. Any other span whose parent is missing is attached
    under the root rather than being dropped.

    Returns None when there are no spans, or when every span's parent is
    another span in the list (i.e. no root can be found).
    """
    spans_by_id = {s.span_id: s for s in all_spans}
    top_level = [s for s in all_spans if s.parent_span_id not in spans_by_id]
    if not top_level:
        return None
    root_span = next((s for s in top_level if not s.parent_span_id), top_level[0])

    for s in all_spans:
        parent = spans_by_id.get(s.parent_span_id)
        if parent is not None:
            parent.child_spans.append(s)
        elif s is not root_span:
            # Orphan: its parent was not exported with this trace. Keep it under the root.
            root_span.child_spans.append(s)
    return root_span


def convert_trace(trace):
    """Convert a parsed OTLP TracesData message into a Trace of nested Span objects.

    All spans from every resource/scope are collected first and linked by
    parent_span_id afterwards, so the tree is correct regardless of span order.

    Args:
        trace: a parsed trace_pb2.TracesData message. Presumably it holds a single
            trace; the trace id of the last span read is used.

    Returns:
        A Trace. Its root_span is None when the message contains no spans.

    Raises:
        KeyError: if a resource has no 'service.name' attribute.
    """
    all_spans = []
    trace_id = ''

    for resource_span in trace.resource_spans:
        resource_attrs = {x.key: x.value.string_value for x in resource_span.resource.attributes}
        service_name = resource_attrs['service.name']
        # Not every SDK sets service.version; record None instead of failing.
        service_version = resource_attrs.get('service.version')

        for scope_span in resource_span.scope_spans:
            for span in scope_span.spans:
                trace_id = span.trace_id.hex()
                # Any kind other than CLIENT (server, internal, producer, consumer, unspecified) is labelled "server".
                client_or_server = "client" if span.kind == trace_pb2.Span.SPAN_KIND_CLIENT else "server"
                events = [
                    {'type': e.name, 'attributes': _attributes_to_dict(e.attributes)}
                    for e in span.events
                ]
                all_spans.append(Span(
                    span_id=span.span_id.hex(),
                    parent_span_id=span.parent_span_id.hex(),
                    operation_name=span.name,
                    client_or_server=client_or_server,
                    duration_ms=(span.end_time_unix_nano - span.start_time_unix_nano) / 1e6,
                    service_name=service_name,
                    service_version=service_version,
                    attributes=_attributes_to_dict(span.attributes),
                    events=events,
                ))

    return Trace(trace_id, _link_spans(all_spans))

In [24]:
# Load protobuf files, convert to our schema, add to traces array.
import os

# Sorted because os.listdir returns entries in arbitrary order; this keeps trace order
# (and the qdrant point ids assigned from it later) reproducible between runs.
files = sorted(os.listdir(dir))

traces = []

for f_name in files:
    path = os.path.join(dir, f_name)
    if not os.path.isfile(path):
        continue  # skip sub-directories and other non-file entries

    proto_trace = trace_pb2.TracesData()
    with open(path, "rb") as f:
        proto_trace.ParseFromString(f.read())

    traces.append(convert_trace(proto_trace))

# Embeddings are stored in a dict keyed by trace_id, so duplicate ids would silently share one vector.
assert len({t.trace_id for t in traces}) == len(traces), 'duplicate trace ids across files'

In [25]:
# Get token sizes and total to calculate pricing

import tiktoken
import statistics

# Assumed embedding price in GBP per 1,000 tokens, used for the cost estimate below.
PRICE_GBP_PER_1K_TOKENS = 0.000079
# Maximum input length accepted by text-embedding-ada-002.
MAX_EMBEDDING_INPUT_TOKENS = 8191

tokenizer = tiktoken.get_encoding("cl100k_base")

tokens = [len(tokenizer.encode(t.to_json())) for t in traces]

if not tokens:
    # min()/max() would raise on an empty list.
    print('no traces loaded - nothing to embed')
else:
    total_tokens = sum(tokens)
    print(f'total: {total_tokens}, min: {min(tokens)}, max: {max(tokens)}, median: {statistics.median(tokens)}, total_cost: £{round(total_tokens / 1000 * PRICE_GBP_PER_1K_TOKENS, 2)}')

    oversized = sum(1 for n in tokens if n > MAX_EMBEDDING_INPUT_TOKENS)
    if oversized:
        print(f'warning: {oversized} trace(s) exceed the {MAX_EMBEDDING_INPUT_TOKENS}-token embedding input limit')

total: 323085, min: 567, max: 1890, median: 570.0, total_cost: £0.03


In [26]:
# Generate embeddings for each trace, then add to map[trace_id] -> embedding vector
import os
from openai import AzureOpenAI

# Credentials come from the environment so they are never committed with the notebook.
client = AzureOpenAI(
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version="2023-05-15",
    azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", "https://blake.openai.azure.com/"),
)

trace_embeddings = {}

for done, trace in enumerate(traces, start=1):
    response = client.embeddings.create(
        input=trace.to_json(),
        model="text-embedding-ada-002",
    )
    trace_embeddings[trace.trace_id] = response.data[0].embedding

    if done % 50 == 0:
        print(f'completed {done}/{len(traces)} traces')

completed 50/500 traces
completed 100/500 traces
completed 150/500 traces
completed 200/500 traces
completed 250/500 traces
completed 300/500 traces
completed 350/500 traces
completed 400/500 traces
completed 450/500 traces
completed 500/500 traces


In [27]:
# Create a collection in qdrant for storing the historic embeddings
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

# Vector length of text-embedding-ada-002 embeddings.
EMBEDDING_DIM = 1536
assert all(len(v) == EMBEDDING_DIM for v in trace_embeddings.values()), 'unexpected embedding size'

q_client = QdrantClient("localhost", port=6333)

# create_collection errors if the collection already exists, which breaks
# Restart & Run All; drop any collection left over from a previous run first.
existing_collections = {c.name for c in q_client.get_collections().collections}
if col_name in existing_collections:
    q_client.delete_collection(collection_name=col_name)

q_client.create_collection(
    collection_name=col_name,
    vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
)

True

In [28]:

# Insert the historic traces into the collection
from qdrant_client.http.models import PointStruct

def dfs_find_span(op_name, service_name, root=None):
    """Return the first span in a span tree whose operation and service names both match.

    Uses an iterative depth-first search starting at `root`.

    Args:
        op_name: value to compare against Span.operation_name.
        service_name: value to compare against Span.service_name.
        root: span to start the search from. When omitted, falls back to
            `t.root_span` of the notebook-global `t`. That was the original
            behaviour and is kept only for backward compatibility; pass `root`
            explicitly.

    Returns:
        The matching span, or None if nothing matches or the tree is empty.
    """
    if root is None:
        root = t.root_span
    stack = [root] if root is not None else []
    visited = set()

    while stack:
        span = stack.pop()
        if span in visited:
            continue
        if span.operation_name == op_name and span.service_name == service_name:
            return span
        visited.add(span)
        stack.extend(span.child_spans)

    return None

# Span whose error type / forecast attributes are copied into each point's payload.
FORECAST_OP_NAME = 'GET WeatherForecast/GetForecastForUser'
FORECAST_SERVICE_NAME = 'ForecastService'

points = []
for point_id, t in enumerate(traces, start=1):
    root_attrs = t.root_span.attributes
    payload = {
        'duration': t.root_span.duration_ms,
        'http.request.method': root_attrs['http.request.method'],
        'http.response.status_code': root_attrs['http.response.status_code'],
    }

    if 'user.country' in root_attrs:
        payload['country'] = root_attrs['user.country']

    forecast_svc_span = dfs_find_span(FORECAST_OP_NAME, FORECAST_SERVICE_NAME, t.root_span)

    # A trace that never reached ForecastService has no such span; guard against None.
    if forecast_svc_span is not None:
        for key in ('error.type', 'forecast.temperatureC'):
            if key in forecast_svc_span.attributes:
                payload[key] = forecast_svc_span.attributes[key]

    points.append(PointStruct(id=point_id, vector=trace_embeddings[t.trace_id], payload=payload))

operation_info = q_client.upsert(
    collection_name=col_name,
    wait=True,
    points=points,
)

print(operation_info)

operation_id=0 status=<UpdateStatus.COMPLETED: 'completed'>
