In [1]:

import bisect
import json
import pandas as pd


from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampNTZType, ArrayType, DataType
from pyspark.sql.functions import from_json, to_json, udf, collect_list
from typing import Dict, Iterator, List, Union

In [2]:
span_log_schema = StructType(
        [
            StructField("trace_id", StringType(), False),
            StructField("span_id", StringType(), False),
            StructField("parent_id", StringType(), False),
            StructField("user_id", StringType(), True),
            StructField("session_id", StringType(), True),
            StructField("span_type", StringType(), False),
            StructField("start_time", TimestampNTZType(), False),
            StructField("end_time", TimestampNTZType(), False),
            StructField("name", StringType(), False),
            StructField("status", StringType(), False),
            StructField("framework", StringType(), False),
            StructField("input", StringType(), False),
            StructField("output", StringType(), False),
            StructField("attributes", StringType(), False),
            StructField("events", ArrayType(DataType()), False),
            StructField("links", ArrayType(DataType()), False),
        ]
    )

In [3]:
def init_spark() -> SparkSession:
    """Get or create spark session."""
    spark = SparkSession.builder.appName("AccessParquetFiles").getOrCreate()
    return spark

spark = init_spark()

In [4]:
df = spark.read.json("./sample_preprocessed_span.jsonl")
df.show()

+--------------------+--------------------+-------------------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------+--------------------+--------------------+--------------------+
|               agent|     collectdatatype|       contentrange|       correlationid|                data| datacontenttype|                  id|modelversion|              source|specversion|                time|                type|          xrequestid|
+--------------------+--------------------+-------------------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------+--------------------+--------------------+--------------------+
|azureml-ai-monito...|pandas.core.frame...|    bytes 0-867/868|ee3c0e82-7edf-480...|[{2024-02-05T15:0...|application/json|6fee5e87-e702-40f...|     default|/subscriptions/96...|        1.0|2024-02-05T15:00:22Z|azureml.inference...|ee3c0e82-7

In [5]:
logs_data = df.select('data').collect()
preprocessed_log_data = []
for log in logs_data:
    preprocessed_log_data.append(log.data[0])
print(preprocessed_log_data[0])

Row(end_time='2024-02-05T15:00:18.791564Z', events=[], input='{\n  "question": "what\'s Attention?",\n  "chat_history": []\n}', links=[], name='promptflow.flow', output='{\n  "output": "Attention is a mechanism used in machine learning models to focus on specific parts of input data when making predictions or generating output. It allows the model to selectively weigh different parts of the input, giving more importance to certain aspects and ignoring others. (Source: 1706.03762.pdf)"\n}', parent_id=None, session_id='', span_id='0x7fd179134fb9e709', span_type='SpanKind.INTERNAL', start_time='2024-02-05T15:00:13.789782Z', status='OK', trace_id='0xccf497116a5a912b29e55229bbac0ba8', user_id='')


In [6]:
span_schema = StructType([
        StructField('end_time', StringType(), False),
        StructField('events', ArrayType(StringType(), True), False), 
        StructField('input', StringType(), False), 
        StructField('links', ArrayType(StringType(), True), False), 
        StructField('name', StringType(), False), 
        StructField('output', StringType(), True), 
        StructField('parent_id', StringType(), True), 
        StructField('session_id', StringType(), True), 
        StructField('span_id', StringType(), False), 
        StructField('span_type', StringType(), False), 
        StructField('start_time', StringType(), True), 
        StructField('status', StringType(), True), 
        StructField('trace_id', StringType(), True),
        StructField('user_id', StringType(), True)
])

span_logs = spark.createDataFrame(data=preprocessed_log_data, schema=span_schema)
span_logs.show()

+--------------------+------+--------------------+-----+--------------------+--------------------+------------------+----------+------------------+-----------------+--------------------+------+--------------------+-------+
|            end_time|events|               input|links|                name|              output|         parent_id|session_id|           span_id|        span_type|          start_time|status|            trace_id|user_id|
+--------------------+------+--------------------+-----+--------------------+--------------------+------------------+----------+------------------+-----------------+--------------------+------+--------------------+-------+
|2024-02-05T15:00:...|    []|{\n  "question": ...|   []|     promptflow.flow|{\n  "output": "A...|              NULL|          |0x7fd179134fb9e709|SpanKind.INTERNAL|2024-02-05T15:00:...|    OK|0xccf497116a5a912...|       |
|2024-02-05T15:00:...|    []|{\n  "model": "gp...|   []|openai.resources....|{\n  "id": "chatc...|0x21d2ebaa

In [7]:
distinct_trace_ids = span_logs.select("trace_id").distinct()
distinct_trace_ids.show()

+--------------------+
|            trace_id|
+--------------------+
|0xccf497116a5a912...|
+--------------------+



In [8]:
traces = distinct_trace_ids.collect()

In [9]:
trace_spans = span_logs.where(span_logs.trace_id == traces[0].trace_id)
trace_spans.show()
trace_spans.schema

+--------------------+------+--------------------+-----+--------------------+--------------------+------------------+----------+------------------+-----------------+--------------------+------+--------------------+-------+
|            end_time|events|               input|links|                name|              output|         parent_id|session_id|           span_id|        span_type|          start_time|status|            trace_id|user_id|
+--------------------+------+--------------------+-----+--------------------+--------------------+------------------+----------+------------------+-----------------+--------------------+------+--------------------+-------+
|2024-02-05T15:00:...|    []|{\n  "question": ...|   []|     promptflow.flow|{\n  "output": "A...|              NULL|          |0x7fd179134fb9e709|SpanKind.INTERNAL|2024-02-05T15:00:...|    OK|0xccf497116a5a912...|       |
|2024-02-05T15:00:...|    []|{\n  "model": "gp...|   []|openai.resources....|{\n  "id": "chatc...|0x21d2ebaa

StructType([StructField('end_time', StringType(), False), StructField('events', ArrayType(StringType(), True), False), StructField('input', StringType(), False), StructField('links', ArrayType(StringType(), True), False), StructField('name', StringType(), False), StructField('output', StringType(), True), StructField('parent_id', StringType(), True), StructField('session_id', StringType(), True), StructField('span_id', StringType(), False), StructField('span_type', StringType(), False), StructField('start_time', StringType(), True), StructField('status', StringType(), True), StructField('trace_id', StringType(), True), StructField('user_id', StringType(), True)])

In [10]:
from trace_aggregator import SpanTree, Span

span_rows = trace_spans.collect()
span_list = [Span(row) for row in span_rows]
tree_builder = SpanTree(span_list)

In [11]:
trace_schema = StructType(
        [
            StructField("trace_id", StringType(), False),
            StructField("user_id", StringType(), True),
            StructField("session_id", StringType(), True),
            StructField("start_time", StringType(), False),
            StructField("end_time", StringType(), False),
            StructField("input", StringType(), False),
            StructField("output", StringType(), False),
            StructField("root_span", StringType(), True),
        ]
    )

agg_trace_schema_names = trace_schema.fieldNames()
agg_trace_schema_names

['trace_id',
 'user_id',
 'session_id',
 'start_time',
 'end_time',
 'input',
 'output',
 'root_span']

In [15]:
span_dict = tree_builder.root_span.span_row.asDict()
data = {key_name: span_dict.get(key_name, None) for key_name in agg_trace_schema_names}
data['root_span'] = str(tree_builder.root_span)
print(data)

{'trace_id': '0xccf497116a5a912b29e55229bbac0ba8', 'user_id': '', 'session_id': '', 'start_time': '2024-02-05T15:00:13.789782Z', 'end_time': '2024-02-05T15:00:18.791564Z', 'input': '{\n  "question": "what\'s Attention?",\n  "chat_history": []\n}', 'output': '{\n  "output": "Attention is a mechanism used in machine learning models to focus on specific parts of input data when making predictions or generating output. It allows the model to selectively weigh different parts of the input, giving more importance to certain aspects and ignoring others. (Source: 1706.03762.pdf)"\n}', 'root_span': '<trace_aggregator.Span object at 0x0000020D5867A4F0>'}


In [19]:
new_entry = spark.createDataFrame([data], trace_schema)
new_entry.show()

+--------------------+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            trace_id|user_id|session_id|          start_time|            end_time|               input|              output|           root_span|
+--------------------+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|0xccf497116a5a912...|       |          |2024-02-05T15:00:...|2024-02-05T15:00:...|{\n  "question": ...|{\n  "output": "A...|<trace_aggregator...|
+--------------------+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+



In [14]:
json.dumps(tree_builder.root_span.span_row.asDict())

'{"end_time": "2024-02-05T15:00:18.791564Z", "events": [], "input": "{\\n  \\"question\\": \\"what\'s Attention?\\",\\n  \\"chat_history\\": []\\n}", "links": [], "name": "promptflow.flow", "output": "{\\n  \\"output\\": \\"Attention is a mechanism used in machine learning models to focus on specific parts of input data when making predictions or generating output. It allows the model to selectively weigh different parts of the input, giving more importance to certain aspects and ignoring others. (Source: 1706.03762.pdf)\\"\\n}", "parent_id": null, "session_id": "", "span_id": "0x7fd179134fb9e709", "span_type": "SpanKind.INTERNAL", "start_time": "2024-02-05T15:00:13.789782Z", "status": "OK", "trace_id": "0xccf497116a5a912b29e55229bbac0ba8", "user_id": ""}'