# Langfuse Data Analysis

In [None]:
import json
import os
from typing import Optional

from dotenv import load_dotenv
from langfuse import Langfuse

### Utility Functions

In [None]:
def init_langfuse_client(public_key: str, secret_key: str, host: str):
    """Construct a ``Langfuse`` client from explicit connection settings.

    Args:
        public_key: Forwarded to ``Langfuse`` as ``public_key``.
        secret_key: Forwarded to ``Langfuse`` as ``secret_key``.
        host: Forwarded to ``Langfuse`` as ``host``.

    Returns:
        The newly created ``Langfuse`` instance.
    """
    settings = {
        "public_key": public_key,
        "secret_key": secret_key,
        "host": host,
    }
    return Langfuse(**settings)

In [None]:
def get_all_traces(client, name: Optional[str]=None):
    """Collect traces from every page the client returns.

    Pages are requested starting at page 1 via
    ``client.fetch_traces(name=name, page=...)`` and their ``.data`` items are
    accumulated until a page comes back empty.

    Args:
        client: Object exposing ``fetch_traces(name=..., page=...)`` whose
            result has a ``data`` sequence.
        name: Optional trace name passed through to ``fetch_traces``.

    Returns:
        A single list holding the items of all non-empty pages, in page order.
    """
    collected = []
    page_number = 1

    # An empty page signals that the previous page was the last one.
    while len(batch := client.fetch_traces(name=name, page=page_number).data) != 0:
        collected += batch
        page_number += 1

    return collected

In [None]:
def pprint_json(data):
    """Pretty-print a JSON document to stdout.

    Args:
        data: JSON text (anything accepted by ``json.loads``).

    The document is re-serialized with a two-space indent and with non-ASCII
    characters kept as-is rather than escaped.
    """
    parsed = json.loads(data)
    formatted = json.dumps(parsed, indent=2, ensure_ascii=False)
    print(formatted)

In [None]:
def get_error_results_by_query(error_results, query):
    """Return every grouped error result whose first input argument has the given query.

    Args:
        error_results: Mapping of error type -> list of result objects. Each
            result must provide ``dict()`` returning a structure where
            ``['input']['args'][0]['query']`` is available.
        query: Query string to compare against with ``==``.

    Returns:
        A flat list of matching results, ordered by group and then by position
        within the group.
    """
    matches = []
    for group in error_results.values():
        for candidate in group:
            payload = candidate.dict()
            if payload['input']['args'][0]['query'] == query:
                matches.append(candidate)
    return matches

In [None]:
def get_error_result_details(error_results):
    """Bucket error results by the error type recorded in their output.

    Args:
        error_results: Iterable of result objects whose ``output`` supports
            ``output['metadata']['error_type']``.

    Returns:
        A dict mapping each error type to the list of results carrying it.
        Keys appear in first-seen order and each list keeps input order.
    """
    buckets = {}
    for item in error_results:
        kind = item.output['metadata']['error_type']
        buckets.setdefault(kind, []).append(item)
    return buckets

In [None]:
def get_traces_by_conditions(traces, conditions):
    """Filter traces by ``conditions`` and split the matches by error status.

    A trace matches when every entry of ``conditions`` holds:

    * the special key ``"metadata"`` maps to a dict; each of its key/value
      pairs must equal ``trace.metadata.get(key)``;
    * any other key is compared against ``getattr(trace, key, None)``.

    A matched trace counts as an error when its metadata has a truthy
    ``'error_type'`` entry. A trace whose ``metadata`` is ``None`` is treated
    as having empty metadata instead of raising ``AttributeError``.

    Summary counts are printed. The failure ratio is reported as ``n/a`` when
    nothing matched, instead of raising ``ZeroDivisionError``.

    Args:
        traces: Iterable of trace objects.
        conditions: Mapping of attribute name (or ``"metadata"``) to the
            required value.

    Returns:
        A ``(error_results, no_error_results)`` tuple of lists, each keeping
        the input order of the matched traces.
    """
    def _metadata_of(trace):
        # Missing metadata behaves like an empty mapping.
        return trace.metadata or {}

    def _matches(trace):
        for key, expected in conditions.items():
            if key == "metadata":
                metadata = _metadata_of(trace)
                if any(metadata.get(k) != v for k, v in expected.items()):
                    return False
            elif getattr(trace, key, None) != expected:
                return False
        return True

    matched = [trace for trace in traces if _matches(trace)]
    print(f'number of traces: {len(matched)}')

    error_results = []
    no_error_results = []
    for trace in matched:
        if _metadata_of(trace).get('error_type', ''):
            error_results.append(trace)
        else:
            no_error_results.append(trace)

    print(f'# of error results: {len(error_results)}')
    print(f'# of no error results: {len(no_error_results)}')
    if matched:
        print(f'ratio of failed traces: {len(error_results) / len(matched)}')
    else:
        # No matches: a ratio is undefined, so report that rather than divide by zero.
        print('ratio of failed traces: n/a (no traces matched)')

    return error_results, no_error_results

In [None]:
def get_traces_group_by_value(traces, name, value):
    """Group traces of a given name by one of their metadata values.

    Args:
        traces: Iterable of trace objects with ``name`` and a ``metadata``
            mapping.
        name: Only traces whose ``name`` equals this are considered.
        value: Metadata key whose value becomes the group key.

    Returns:
        A dict mapping each truthy metadata value to the list of traces that
        carry it. Traces where the key is missing or falsy are skipped.
    """
    grouped = {}
    for trace in traces:
        if trace.name != name:
            continue
        group_key = trace.metadata.get(value, '')
        if not group_key:
            continue
        grouped.setdefault(group_key, []).append(trace)

    return grouped

## Analysis Code

In [None]:
# Read settings from the local .env file; override=True is passed to load_dotenv.
load_dotenv(".env", override=True)

# Build the shared Langfuse client from the LANGFUSE_* environment variables.
client = init_langfuse_client(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST"),
)

Get all traces

In [None]:
# Fetch every trace (no name filter) and keep the full list for the cells below.
traces = get_all_traces(client)
# Bare expression: evaluates to the total number of fetched traces.
len(traces)

Trace names

In [None]:
# Distinct trace names present in the fetched data.
{trace.name for trace in traces}

### Traces: Prepare Semantics

In [None]:
# Select all "Prepare Semantics" traces; the empty metadata filter adds no constraint.
conditions = {"metadata": {}, "name": "Prepare Semantics"}
error_results, no_error_results = get_traces_by_conditions(traces, conditions)

In [None]:
# Group the failed traces by the error_type found in each trace's output metadata.
error_results_details = get_error_result_details(error_results)

In [None]:
# For each error type, print its name and then the number of traces in that group.
for key, value in error_results_details.items():
    print(key, len(value), sep='\n')

### Traces: Ask Question

In [None]:
# Select all "Ask Question" traces; the empty metadata filter adds no constraint.
conditions = {"metadata": {}, "name": "Ask Question"}
error_results, no_error_results = get_traces_by_conditions(traces, conditions)

In [None]:
# Group the failed "Ask Question" traces by mdl_hash, largest group first.
_traces_by_mdl_hash = get_traces_group_by_value(error_results, "Ask Question", "mdl_hash")
sorted_traces_by_mdl_hash = sorted(_traces_by_mdl_hash.items(), key=lambda x: len(x[1]), reverse=True)

print(f'number of mdl_hash: {len(sorted_traces_by_mdl_hash)}')
# Use a dedicated loop variable: binding `traces` here would overwrite the
# notebook-wide list of all fetched traces that other cells filter.
for mdl_hash, grouped_traces in sorted_traces_by_mdl_hash:
    print(f'mdl_hash: {mdl_hash}')
    print(f'size of traces: {len(grouped_traces)}')

In [None]:
# Group the failed "Ask Question" traces by project_id, largest group first.
_traces_by_project_id = get_traces_group_by_value(error_results, "Ask Question", "project_id")
sorted_traces_by_project_id = sorted(_traces_by_project_id.items(), key=lambda x: len(x[1]), reverse=True)

print(f'number of project_id: {len(sorted_traces_by_project_id)}')
# Use a dedicated loop variable: binding `traces` here would overwrite the
# notebook-wide list of all fetched traces that other cells filter.
for project_id, grouped_traces in sorted_traces_by_project_id:
    print(f'project_id: {project_id}')
    print(f'size of traces: {len(grouped_traces)}')

In [None]:
# Regroup the current error traces by error type, then print each type and its count.
error_results_details = get_error_result_details(error_results)
for key, value in error_results_details.items():
    print(key, len(value), sep='\n')

In [None]:
# Dump every failed trace as indented JSON, one section per error type.
for key, value in error_results_details.items():
    print(f'Error Type: {key}')
    for error_result in value:
        pprint_json(error_result.json())

In [None]:
# Look up failed traces for one specific user query
# (roughly: "I'm at Taichung Park; which routes can I take?").
_error_results = get_error_results_by_query(error_results_details, '我在台中公園，有哪些路線我可以搭乘？')
# Bare expression: evaluates to the number of matching error results.
len(_error_results)

In [None]:
# Pretty-print the JSON form of each error result found for the query.
for _error_result in _error_results:
    pprint_json(_error_result.json())

### Traces: Ask Details(Breakdown SQL)

In [None]:
# Select all "Ask Details(Breakdown SQL)" traces; the empty metadata filter adds no constraint.
conditions = {"metadata": {}, "name": "Ask Details(Breakdown SQL)"}
error_results, no_error_results = get_traces_by_conditions(traces, conditions)