In [1]:

import sys
import os

# Locate the 'schema' folder under the current working directory and put it
# on the import path (once), so modules inside it can be imported by bare name.
schema_dir = os.path.join(os.getcwd(), 'schema')

already_registered = any(entry == schema_dir for entry in sys.path)
if not already_registered:
    sys.path.append(schema_dir)


In [2]:
 

import json
import base64

from schema import Tracing_pb2
import Common_pb2

import pandas as pd

In [3]:

def convert_protobuf_to_dict(pb_obj):
    """Recursively convert a protobuf message into plain Python containers.

    Every field listed in ``pb_obj.DESCRIPTOR.fields`` becomes a key of the
    returned dict:

    * message fields are converted recursively (a list of dicts when repeated);
    * enum fields are replaced by the enum value's name (a list of names when
      repeated); a number that has no entry in the enum descriptor is kept as
      the raw number instead of raising;
    * repeated scalar fields are copied into a plain ``list``;
    * all other fields are stored unchanged.

    Args:
        pb_obj: a protobuf message instance (anything exposing
            ``DESCRIPTOR.fields`` and one attribute per field name).

    Returns:
        dict mapping field name to the converted value.
    """
    def _enum_name(field, number):
        # `.get()` returns None for a number the descriptor does not know;
        # fall back to the raw number rather than failing on `None.name`.
        enum_value = field.enum_type.values_by_number.get(number)
        return enum_value.name if enum_value is not None else number

    result = {}
    for field in pb_obj.DESCRIPTOR.fields:
        value = getattr(pb_obj, field.name)
        is_repeated = field.label == field.LABEL_REPEATED
        if field.type == field.TYPE_MESSAGE:  # Nested protobuf
            if is_repeated:
                result[field.name] = [convert_protobuf_to_dict(item) for item in value]
            else:
                result[field.name] = convert_protobuf_to_dict(value)
        elif field.type == field.TYPE_ENUM:
            if is_repeated:
                # A repeated enum is a sequence of numbers; translate each one.
                result[field.name] = [_enum_name(field, number) for number in value]
            else:
                result[field.name] = _enum_name(field, value)
        elif is_repeated:
            # Copy out of the protobuf container so the result holds only
            # plain Python objects.
            result[field.name] = list(value)
        else:
            result[field.name] = value
    return result


In [4]:


def parse_binary_data(binary_data):
    """Decode serialized bytes into a ``Tracing_pb2.SegmentObject``.

    Args:
        binary_data: the serialized message bytes.

    Returns:
        The populated ``SegmentObject``, or ``None`` when parsing raises
        (the error is printed, not re-raised).
    """
    segment = Tracing_pb2.SegmentObject()
    try:
        segment.ParseFromString(binary_data)
    except Exception as parse_error:
        # Best-effort: report the failure and signal it to the caller via None.
        print("Error parsing binary data:", parse_error)
        return None
    else:
        return segment



In [5]:

def parse_trace_data_to_csv_row(document):
    """Turn one trace document into a list of flat per-span rows.

    The base64 payload at ``document['_source']['data_binary']`` is decoded,
    parsed with ``parse_binary_data`` and converted with
    ``convert_protobuf_to_dict``; one row dict is produced for each entry of
    the segment's ``spans``, carrying the span's own fields plus the
    segment-level ``traceId``, ``traceSegmentId`` and ``service``.

    Args:
        document: mapping holding the base64-encoded segment under
            ``['_source']['data_binary']``.

    Returns:
        list of dicts (one per span). An empty list is returned when the
        payload cannot be parsed, so callers can simply skip the document.
    """
    binary_data = base64.b64decode(document['_source']['data_binary'])
    segment = parse_binary_data(binary_data)
    if segment is None:
        # parse_binary_data already reported the error; converting None would
        # raise AttributeError and abort the whole export.
        return []
    parsed_data = convert_protobuf_to_dict(segment)

    # Key order is kept as-is: it determines the CSV column order downstream.
    return [
        {
            'traceId': parsed_data['traceId'],
            'traceSegmentId': parsed_data['traceSegmentId'],
            'startTime': span['startTime'],
            'parentSpanId': span['parentSpanId'],
            'spanId': span['spanId'],
            'endTime': span['endTime'],
            'operationName': span['operationName'],
            'peer': span['peer'],
            'spanType': span['spanType'],
            'spanLayer': span['spanLayer'],
            'componentId': span['componentId'],
            'isError': span['isError'],
            'service': parsed_data['service']
        }
        for span in parsed_data['spans']
    ]



In [6]:

# ###
# # This method contains the logic to fetch documents from ES.
# # It runs the query (match_all by default) and keeps scrolling, with each
# # scroll context kept alive for `scroll_time` (1m by default), until all
# # documents are fetched, beyond the default page size of 10000.
# ##
# def get_data_from_index(index_name, query=None, size=10000, scroll_time='1m'):
#     if query is None:
#         query = {"query": {"match_all": {}}}

#     # Initialize the scroll
#     page = ELASTIC_SEARCH_HOST.search(index=index_name, body=query, scroll=scroll_time, size=size)
#     scroll_id = page['_scroll_id']
#     hits = page['hits']['hits']

#     # Start scrolling
#     while len(page['hits']['hits']):
#         page = ELASTIC_SEARCH_HOST.scroll(scroll_id=scroll_id, scroll=scroll_time)
#         scroll_id = page['_scroll_id']
#         hits.extend(page['hits']['hits'])

#     # Clear the scroll when done
#     ELASTIC_SEARCH_HOST.clear_scroll(scroll_id=scroll_id)

#     return hits


###
# This method aggregates each line of the log data received from ES
# and writes the lines to a file.
##
def save_log_data_in_file(data, filename, output_dir="parsed_data"):
    """Write the log content of every document to ``<output_dir>/<filename>``.

    Args:
        data: iterable of documents; falsy entries are skipped, every other
            entry must provide its text under ``['_source']['content']``.
        filename: name of the file to create inside ``output_dir``.
        output_dir: destination directory (created if missing). Defaults to
            ``parsed_data`` relative to the current working directory.

    The entries are written back-to-back exactly as stored; no separator is
    added between them.
    """
    log_rows = [doc['_source']['content'] for doc in data if doc]

    # Build the path with os.path.join instead of a hard-coded backslash so
    # the file lands inside the directory on every platform.
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as file:
        file.writelines(log_rows)


###
# This method aggregates each line of the trace data received from ES
# and writes the rows to a CSV file.
##
def save_trace_data_in_file(data, filename, output_dir="parsed_data"):
    """Flatten trace documents into span rows and write them as CSV.

    Each document is passed to ``parse_trace_data_to_csv_row``; the rows it
    returns are collected into a single DataFrame that is written, without
    the index column, to ``<output_dir>/<filename>``.

    Args:
        data: iterable of trace documents.
        filename: name of the CSV file to create inside ``output_dir``.
        output_dir: destination directory (created if missing). Defaults to
            ``parsed_data`` relative to the current working directory.
    """
    csv_rows = []
    for doc in data:
        row = parse_trace_data_to_csv_row(doc)
        if row:  # documents that yield no rows are skipped
            csv_rows.extend(row)

    # Build the path with os.path.join instead of a hard-coded backslash so
    # the file lands inside the directory on every platform.
    os.makedirs(output_dir, exist_ok=True)
    df = pd.DataFrame(csv_rows)
    df.to_csv(os.path.join(output_dir, filename), index=False)

In [7]:


def read_file(filename, data_dir=None):
    """Load a JSON-lines dump file into a list of parsed objects.

    Args:
        filename: name of the dump file inside ``data_dir``.
        data_dir: directory holding the dump. Defaults to
            ``../Analysis/data/RawData/dump`` relative to the current working
            directory.

    Returns:
        list with one parsed object per valid line, in file order. Blank
        lines are skipped; lines that are not valid JSON are reported on
        stdout and skipped. The number of loaded objects is printed.
    """
    if data_dir is None:
        # Joined per-platform instead of a hard-coded backslash path, which
        # only resolves on Windows.
        data_dir = os.path.join("..", "Analysis", "data", "RawData", "dump")

    hits = []

    with open(os.path.join(data_dir, filename), 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                # An empty line is not a malformed record; don't report it.
                continue
            try:
                hits.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON line: {e}")

    print(f"Total length of data {len(hits)}")

    return hits
    


In [8]:
# Convert each raw log dump into its plain-text log file.
for dump_name, log_name in (
    ("logdata.json", "logdata_no_fault.log"),
    ("logdata_f1.json", "logdata_f1_fault.log"),
):
    save_log_data_in_file(read_file(dump_name), log_name)


Total length of data 349289
Total length of data 41148


In [9]:
# Convert each raw trace dump into its per-span CSV file.
for dump_name, csv_name in (
    ("tracedata.json", "tracedata_no_fault.csv"),
    ("tracedata_f1.json", "tracedata_f1_fault.csv"),
):
    save_trace_data_in_file(read_file(dump_name), csv_name)


Total length of data 142822
Total length of data 16039
