In [12]:
import pandas as pd
import json
import re
import logging
import numpy as np
from scipy.stats import gmean

### - Reading kafka logged messages

In [2]:
# read the entire file into a python array
#input_log_file= 'consumerrecord.log'
input_log_file= '20241017T100924-hall_k-kafka-messages-T131959.log'
with open(input_log_file, 'r') as f:
    data = f.readlines()
print(len(data))
data[0:2]

31374


['ConsumerRecord(topic=\'ProductionResultEvent\', partition=1, offset=105643292, timestamp=1729152564708, timestamp_type=0, key=b\'ca4f940bc2eb4138a5ca68b9e7df1af8\', value=b\'{"station":"MachineTotal","results":[],"globalResult":"GOOD","isGlobal":true,"isTraceable":false,"isStation":false,"isDetailed":true,"timestamp":1729152564457,"machineId":"ca4f940bc2eb4138a5ca68b9e7df1af8"}\', headers=[], checksum=None, serialized_key_size=32, serialized_value_size=206, serialized_header_size=-1)\n',
 'ConsumerRecord(topic=\'ProductionResultEvent\', partition=1, offset=105643293, timestamp=1729152565011, timestamp_type=0, key=b\'ca4f940bc2eb4138a5ca68b9e7df1af8\', value=b\'{"station":"VisionAPrint","results":[],"globalResult":"GOOD","isGlobal":false,"isTraceable":false,"isStation":true,"isDetailed":true,"timestamp":1729152564757,"machineId":"ca4f940bc2eb4138a5ca68b9e7df1af8"}\', headers=[], checksum=None, serialized_key_size=32, serialized_value_size=206, serialized_header_size=-1)\n']

### Cleansing
- Removing ConsumerRecord and parantesis from start and end
- Splitting string by comma(,) without touching the qouted parts to find out key/value pairs
- Filtering Comma/None/Emplty cells
- Handling broken key/values
- Decoding bytes string included

In [3]:
def fix_broken_and_bytes_keyvalues(keyvalues):
    reverse_keyvalues = list(reversed(keyvalues))
    #fixed_keyvalues = []
    fixed_keyvalues = {}
    hold=""
    for i, item in enumerate(reverse_keyvalues):
        if isinstance(item, (bytes, bytearray)) :
            item = item
        if i<len(reverse_keyvalues)-1 and (reverse_keyvalues[i+1][-1] == "=" or reverse_keyvalues[i+1][-2:] == "=b") :
            hold=item
        else:
            keyvalue_str = re.sub(r"(.*)([b]\'.*')(.*)", lambda x: x.group(1)+eval(x.group(2)).decode('UTF-8')+x.group(3), item+hold)
            # Remove first and last single qoute if exists
            keyvalue_str = keyvalue_str.replace("'", "", 1).replace("'", "", -1)
            # split key value by first occurance of =
            keyvalue = keyvalue_str.strip().split("=",1)
            #fixed_keyvalues.insert(0,keyvalue.strip().split("="))
            if len(keyvalue)==2 :
                fixed_keyvalues[keyvalue[0]] = keyvalue[1]
            else:
                logging.exception("This is not valid key=value : "+keyvalue_str+" inside :"+str(keyvalues))
            hold=""
    return fixed_keyvalues

In [4]:
def cleanse_ConsumerRecord(consumerRecord):
    # Splitting string by comma(,) without touching the qouted parts
    raw_keyvalues = [p for p in re.split("(,|\\\".*?\\\"|'.*?')", consumerRecord.strip()[15:-1])]
    filter_empty_keyvalues = list(filter(lambda item: item if item!=',' else None , raw_keyvalues))
    fixed_keyvalues = fix_broken_and_bytes_keyvalues(filter_empty_keyvalues)
    return fixed_keyvalues

In [5]:
cleanse_ConsumerRecord(data[0])
# pd.DataFrame.from_dict(data, orient='index',                       columns=['A', 'B', 'C', 'D'])

{'serialized_header_size': '-1',
 'serialized_value_size': '206',
 'serialized_key_size': '32',
 'checksum': 'None',
 'headers': '[]',
 'value': '{"station":"MachineTotal","results":[],"globalResult":"GOOD","isGlobal":true,"isTraceable":false,"isStation":false,"isDetailed":true,"timestamp":1729152564457,"machineId":"ca4f940bc2eb4138a5ca68b9e7df1af8"}',
 'key': 'ca4f940bc2eb4138a5ca68b9e7df1af8',
 'timestamp_type': '0',
 'timestamp': '1729152564708',
 'offset': '105643292',
 'partition': '1',
 'topic': 'ProductionResultEvent'}

In [6]:
list_dict_records = list(map(lambda x:  cleanse_ConsumerRecord(x) , data))
#print(next(list_dict_records))
print(len(list_dict_records))
list_dict_records[0]

31374


{'serialized_header_size': '-1',
 'serialized_value_size': '206',
 'serialized_key_size': '32',
 'checksum': 'None',
 'headers': '[]',
 'value': '{"station":"MachineTotal","results":[],"globalResult":"GOOD","isGlobal":true,"isTraceable":false,"isStation":false,"isDetailed":true,"timestamp":1729152564457,"machineId":"ca4f940bc2eb4138a5ca68b9e7df1af8"}',
 'key': 'ca4f940bc2eb4138a5ca68b9e7df1af8',
 'timestamp_type': '0',
 'timestamp': '1729152564708',
 'offset': '105643292',
 'partition': '1',
 'topic': 'ProductionResultEvent'}

In [42]:
df = pd.DataFrame.from_dict(list_dict_records)
df.columns = ['machine' if x=='key' else x for x in df.columns]
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
df.head()

  df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')


Unnamed: 0,serialized_header_size,serialized_value_size,serialized_key_size,checksum,headers,value,machine,timestamp_type,timestamp,offset,partition,topic,datetime
0,-1,206,32,,[],"{""station"":""MachineTotal"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152564708,105643292,1,ProductionResultEvent,2024-10-17 08:09:08.864
1,-1,206,32,,[],"{""station"":""VisionAPrint"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152565011,105643293,1,ProductionResultEvent,2024-10-17 08:09:08.864
2,-1,206,32,,[],"{""station"":""MachineTotal"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152565012,105643294,1,ProductionResultEvent,2024-10-17 08:09:08.864
3,-1,187,18,,[],"{""station"":""Machine"",""results"":[],""globalResul...",CMI500110000240089,0,1729152565291,63065753,4,ProductionResultEvent,2024-10-17 08:09:08.864
4,-1,206,32,,[],"{""station"":""VisionAPrint"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152565315,105643295,1,ProductionResultEvent,2024-10-17 08:09:08.864


### Describe

In [41]:
def describex(data):
    stats = data.describe(include='all')
    skewness = data.skew()
    kurtosis = data.kurtosis()
    skewness_df = pd.DataFrame({'skewness':skewness}).T
    kurtosis_df = pd.DataFrame({'kurtosis':kurtosis}).T
    gmean_df = pd.DataFrame(df.apply(gmean, axis=0),columns=['gmean']).T
    return stats.append([kurtosis_df,skewness_df,gmean_df])
#describex(df)
df.describe(include='all')

Unnamed: 0,serialized_header_size,serialized_value_size,serialized_key_size,checksum,headers,value,machine,timestamp_type,timestamp,offset,partition,topic,datetime
count,31374.0,31374.0,31374.0,31374.0,31374,31374,31374,31374.0,31374.0,31374.0,31374.0,31374,31374
unique,1.0,48.0,4.0,1.0,1,31371,4,1.0,29577.0,31374.0,3.0,6,
top,-1.0,206.0,16.0,,[],"{""station"":""VisionAPrint"",""results"":[],""global...",CCT6K10000239396,0.0,1729153872225.0,105643292.0,1.0,ProductionResultEvent,
freq,31374.0,9755.0,13705.0,31374.0,31374,2,13705,31374.0,6.0,1.0,23622.0,31239,
mean,,,,,,,,,,,,,2024-10-17 08:34:47.554194944
min,,,,,,,,,,,,,2024-10-17 08:09:08.864000
25%,,,,,,,,,,,,,2024-10-17 08:20:04.224000
50%,,,,,,,,,,,,,2024-10-17 08:30:59.584000
75%,,,,,,,,,,,,,2024-10-17 08:44:06.016000
max,,,,,,,,,,,,,2024-10-17 09:12:29.952000


### Pivot Machines * Topic  count

In [8]:
ptable = pd.pivot_table(df, values='timestamp', index=['machine'], columns='topic', aggfunc='count')
ptable

topic,LotChangedEvent,MachineMessageClearEvent,MachineMessageEvent,ProductionResultEvent,RecipeChangedEvent,StateChangedEvent
machine,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1000155001,,6.0,9.0,3992.0,,20.0
CCT6K10000239396,3.0,3.0,4.0,13689.0,2.0,4.0
CMI500110000240089,,17.0,17.0,3691.0,,
ca4f940bc2eb4138a5ca68b9e7df1af8,,10.0,10.0,9867.0,,30.0


### Extract json data in value field

In [9]:
df_extracted = df.join(df['value'].apply(json.loads).apply(pd.Series), lsuffix='_left', rsuffix='_right', how='outer')
pd.set_option('display.max_columns', None)
df_extracted.head()

Unnamed: 0,serialized_header_size,serialized_value_size,serialized_key_size,checksum,headers,value,machine,timestamp_type,timestamp_left,offset,partition,topic,station,results,globalResult,isGlobal,isTraceable,isStation,isDetailed,timestamp_right,machineId,instanceId,messageId,parameters,locationId,severity,state,name,changeType,lotType,size,recipeType,idealCycleTime
0,-1,206,32,,[],"{""station"":""MachineTotal"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152564708,105643292,1,ProductionResultEvent,MachineTotal,[],GOOD,True,False,False,True,1729152564457,ca4f940bc2eb4138a5ca68b9e7df1af8,,,,,,,,,,,,
1,-1,206,32,,[],"{""station"":""VisionAPrint"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152565011,105643293,1,ProductionResultEvent,VisionAPrint,[],GOOD,False,False,True,True,1729152564757,ca4f940bc2eb4138a5ca68b9e7df1af8,,,,,,,,,,,,
2,-1,206,32,,[],"{""station"":""MachineTotal"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152565012,105643294,1,ProductionResultEvent,MachineTotal,[],GOOD,True,False,False,True,1729152564857,ca4f940bc2eb4138a5ca68b9e7df1af8,,,,,,,,,,,,
3,-1,187,18,,[],"{""station"":""Machine"",""results"":[],""globalResul...",CMI500110000240089,0,1729152565291,63065753,4,ProductionResultEvent,Machine,[],GOOD,True,False,False,True,1729152565020,CMI500110000240089,,,,,,,,,,,,
4,-1,206,32,,[],"{""station"":""VisionAPrint"",""results"":[],""global...",ca4f940bc2eb4138a5ca68b9e7df1af8,0,1729152565315,105643295,1,ProductionResultEvent,VisionAPrint,[],GOOD,False,False,True,True,1729152565157,ca4f940bc2eb4138a5ca68b9e7df1af8,,,,,,,,,,,,


# Pivot ['machine','isGlobal'] * ['topic','globalResult']

In [14]:
ptable2 = pd.pivot_table(df_extracted, values='timestamp_left',
                         index=['machine','isGlobal'], columns=['topic','globalResult'], aggfunc='count')
ptable2

Unnamed: 0_level_0,topic,ProductionResultEvent,ProductionResultEvent
Unnamed: 0_level_1,globalResult,BAD,GOOD
machine,isGlobal,Unnamed: 2_level_2,Unnamed: 3_level_2
1000155001,False,21.0,3761.0
1000155001,True,9.0,201.0
CCT6K10000239396,False,1.0,11407.0
CCT6K10000239396,True,2.0,2279.0
CMI500110000240089,False,44.0,
CMI500110000240089,True,18.0,3629.0
ca4f940bc2eb4138a5ca68b9e7df1af8,False,56.0,4873.0
ca4f940bc2eb4138a5ca68b9e7df1af8,True,56.0,4882.0
