In [None]:
import pandas as pd
import datetime as dt
import swifter
from collections import namedtuple
import matplotlib.pyplot as plt
import numpy as np
import copy
import pandas as pd
import swifter

In [None]:
def parse_mi_ho(df):
    
    nr_pci = ''

    def NR_OTA():
        if df["type_id"].iloc[i] == "5G_NR_RRC_OTA_Packet":
            return True
        else:
            return False

    def find_1st_after(target, look_after=1):
        for j in range(i, len(df)):
            t_ = df["time"].iloc[j]
            if (t_ - t).total_seconds() > look_after:
                return None, None
            if df[target].iloc[j] not in [0,'0'] and not np.isnan(df[target].iloc[j]):
                return t_, j

    def find_1st_before(target, look_before=1):
        for j in range(i, -1, -1):
            t_ = df["time"].iloc[j]
            if (t - t_).total_seconds() > look_before:
                return None, None
            if df[target].iloc[j] not in [0,'0'] and not np.isnan(df[target].iloc[j]):
                return t_, j

    HO = namedtuple('HO','start, end, others', defaults=(None,None))

    D = {
        'Conn_Rel':[], 
        'Conn_Req':[], # Setup
        'LTE_HO': [], # LTE -> newLTE
        'MN_HO': [], # LTE + NR -> newLTE + NR
        'MN_HO_to_eNB': [], # LTE + NR -> newLTE
        'SN_setup': [], # LTE -> LTE + NR => NR setup
        'SN_Rel': [], # LTE + NR -> LTE
        'SN_HO': [], # LTE + NR -> LTE + newNR
        'RLF_II': [],
        'RLF_III': [],
        'SCG_RLF': [],
        }

    for i in range(len(df)):
        if NR_OTA():
            continue

        t = df["time"].iloc[i]
        
        others = ''
        if df["rrcConnectionRelease"].iloc[i] == 1:
            D['Conn_Rel'].append(HO(start=t))

        if df["rrcConnectionRequest"].iloc[i] == 1:
            a = find_1st_after('rrcConnectionReconfigurationComplete',look_after=2)[0]
            b = find_1st_after('securityModeComplete',look_after=2)[0]
            end = a if a > b else b
            D['Conn_Req'].append(HO(start=t,end=end))
            nr_pci = ''
        
        if df["lte-rrc.t304"].iloc[i] == 1:
            end, _ = find_1st_after('rrcConnectionReconfigurationComplete')
            serv_cell, target_cell = df["PCI"].iloc[i], df['lte_targetPhysCellId'].iloc[i]
            serv_freq, target_freq = df["Freq"].iloc[i], df['dl-CarrierFreq'].iloc[i]

            if df["SCellToAddMod-r10"].iloc[i] == 1:
                n =len(str(df["SCellIndex-r10.1"].iloc[i]).split('@'))
                others=f'Set up {n} SCell.'
            
            if serv_freq != target_freq:
                a,b = find_1st_before("rrcConnectionReestablishmentRequest", 1)
                others += " Inter freq. HO."
                if a is not None:
                    others += " Near after RLF."
                
            if df["nr-rrc.t304"].iloc[i] == 1 and df["dualConnectivityPHR: setup (1)"].iloc[i] == 1:
                
                if serv_cell == target_cell and serv_freq == target_freq:
                    D['SN_setup'].append(HO(start=t, end=end, others=others))
                    nr_pci = df['nr_physCellId'].iloc[i]
                else:    
                    D['MN_HO'].append(HO(start=t, end=end, others=others))
                    nr_pci = df['nr_physCellId'].iloc[i]
            else:
                
                if serv_cell == target_cell and serv_freq == target_freq:
                    a, b = find_1st_before("scgFailureInformationNR-r15")
                    if a is not None:
                        others += " Caused by scg-failure."
                    D['SN_Rel'].append(HO(start=t, end=end, others=others))
                    nr_pci = ''
                else:
                    a, b = find_1st_before("rrcConnectionSetup",3)
                    if a is not None:
                        others += 'Near After connection setup'
                    if nr_pci == '':
                        D['LTE_HO'].append(HO(start=t, end=end, others=others))
                    else:
                        D['MN_HO_to_eNB'].append(HO(start=t, end=end, others=others))

        if df["nr-rrc.t304"].iloc[i] == 1 and not df["dualConnectivityPHR: setup (1)"].iloc[i] == 1:
            end, _ = find_1st_after('rrcConnectionReconfigurationComplete')
            D['SN_HO'].append(HO(start=t,end=end))
            nr_pci = df['nr_physCellId'].iloc[i]

        if df["rrcConnectionReestablishmentRequest"].iloc[i] == 1:
            end, _ = find_1st_after('rrcConnectionReestablishmentComplete', look_after=1)
            b, _ = find_1st_after('rrcConnectionReestablishmentReject', look_after=1)
            others = df["reestablishmentCause"].iloc[i]
            c, d = find_1st_before('scgFailureInformationNR-r15', 1)
            if c != None:
                others  += ' caused by scgfailure.'
            if end is not None: 
                # Type II
                D['RLF_II'].append(HO(start=t,end=end,others=others))
            else: 
                # Type III
                D['RLF_III'].append(HO(start=t,end=b,others=others)) # End for Type III?
            
        if df["scgFailureInformationNR-r15"].iloc[i] == 1:
            others = df["failureType-r15"].iloc[i]
            D['SCG_RLF'].append(HO(start=t,others=others))
    
    return D

In [None]:
f = '/home/wmnlab/diag_log_sm07_2023-05-07_12-42-00_rrc.csv'
df = pd.read_csv(f)
df["time"] = df["time"].swifter.apply(lambda x: pd.to_datetime(x) + dt.timedelta(hours=8))

A = parse_mi_ho(df)

In [None]:
A

# Measurement Report

In [None]:
class REPORTCONFIG:
    def __init__(self, name, parameter):
        self.name = name
        self.parameters = self.parse_parameter(parameter)
    
    def parse_parameter(self, parameter):
        L = []
        start = False
        for i in range(len(parameter)):
            if parameter[i] == "'" and start == False:
                s = ''
                start = True
                continue
            
            if start:
                if parameter[i] == "'":
                    L.append(s)
                    start = False
                s += parameter[i]
        
        P = dict()
        filter = '+-0123456789[]()&'
        for i in range(0,len(L),2):
            x = ''
            for c in L[i+1]:
                if c in filter:
                    x += c
            try:
                P[L[i]] = int(x)
            except:
                P[L[i]] = x
        return P

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name + ' ' + str(self.parameters)  

class MEASOBJ:

    def __init__(self, obj, freq):
        self.name = obj
        self.freq = freq

    def __str__(self):
        return f'({self.name}, {self.freq})'

    def __repr__(self):
        return f'({self.name}, {self.freq})'

def parse_measIdToAddMod(s):
    a = s.replace('(','')
    a = a.replace(')','')
    a = a.split('&')
    return (a[0], a[1], a[2])


In [None]:
## Mobileinsight rrc csv file.
file = '/home/wmnlab/diag_log_qc00_2023-04-17_15-34-15_rrc.csv'
mi_rrc_df = pd.read_csv(file)
mi_rrc_df["Timestamp"] = mi_rrc_df["Timestamp"].swifter.apply(lambda x: pd.to_datetime(x) + dt.timedelta(hours=8))

In [None]:
unused = ['DL frequency','UL frequency', 'DL bandwidth', 'UL bandwidth', 'Cell Identity', 'TAC','Band ID', 'MCC', 'MNC']
mi_rrc_df = mi_rrc_df.drop(columns=unused)
mi_rrc_df = mi_rrc_df.dropna()
# mi_rrc_df['measId'] = mi_rrc_df['measId'].astype('int32')

In [None]:
# mi_rrc_df

In [None]:
measobj_dict = {}
report_config_dict = {}
measId_dict = {}

nr_measobj_dict = {}
nr_report_config_dict = {}
nr_measId_dict = {}

def reset():

    measobj_dict = {}
    report_config_dict = {}
    measId_dict = {}

    nr_measobj_dict = {}
    nr_report_config_dict = {}
    nr_measId_dict = {}

MR = namedtuple('MR','time, event, others', defaults=(None,None))
L = []

event_list = [
    "lte-measurementReport", "lte-MeasObjectToAddMod", "lte-ReportConfigToAddMod", "lte-measIdToRemoveList","lte-MeasIdToAddMod", 
    "lte-rrc.t304","rrcConnectionRelease", "rrcConnectionSetup","rrcConnectionReestablishmentRequest","rrcConnectionReestablishmentReject",
    "SCellToAddMod-r10", "sCellToReleaseList-r10", "nr-rrc.t304",
    "nr-MeasObjectToAddMod", "nr-ReportConfigToAddMod", "nr-MeasIdToAddMod", "nr-measurementReport","scgFailureInformationNR-r15",
    "nr-Config-r15: release (0)", 
    ]

RRC_connected = True


for i in range(len(mi_rrc_df)):

    if mi_rrc_df['type_id'].iloc[i] == "5G_NR_RRC_OTA_Packet" or mi_rrc_df['type_id'].iloc[i] == "LTE_RRC_Serv_Cell_Info":
        continue

    time = mi_rrc_df['Timestamp'].iloc[i]
    others = ''
    
    if mi_rrc_df["rrcConnectionRelease"].iloc[i] == 1:      
        reset()

    if mi_rrc_df["lte-measurementReport"].iloc[i] == 1:
        
        others += 'E-UTRAN'
        id = mi_rrc_df['measId'].iloc[i]
        x = measId_dict[id]
        event = report_config_dict[x[1]].name
        try:
            x = measId_dict[id]
            event = report_config_dict[x[1]].name
            mr = MR(time = time, event = event, others = others)
        except:
            mr = MR(time = time, event = 'Unknown', others = others)
        
        L.append(mr)

    if mi_rrc_df["nr-measurementReport"].iloc[i] == 1:
        
        others = 'NR'
        id = str(int(mi_rrc_df['measId'].iloc[i]))

        try:
            x = nr_measId_dict[id]
            event = nr_report_config_dict[x[1]]
            mr = MR(time = time, event = event, others = others)
        except:
            mr = MR(time = time, event = 'Unknown', others = others)
        
        L.append(mr)

    if mi_rrc_df["lte-MeasObjectToAddMod"].iloc[i] == 1:
        print("MeasObjectToAddMod")
        Id_list = str(mi_rrc_df["measObjectId"].iloc[i]).split('@')
        measobj_list = mi_rrc_df["measObject"].iloc[i].split('@')
        carrierFreq_list = str(mi_rrc_df["carrierFreq"].iloc[i]).split('@')
        carrierFreq_r15_list = str(mi_rrc_df["carrierFreq-r15"].iloc[i]).split('@')
        for a in range(len(Id_list)):
            if measobj_list[a] == "measObjectEUTRA (0)":
                measobj_dict[Id_list[a]] = MEASOBJ(measobj_list[a], carrierFreq_list[0])
                carrierFreq_list.pop(0)
            elif measobj_list[a] == "measObjectNR-r15 (5)":
                measobj_dict[Id_list[a]] = MEASOBJ(measobj_list[a], carrierFreq_r15_list[0])
                carrierFreq_r15_list.pop(0)
        print(measobj_dict)   

    if mi_rrc_df["nr-MeasObjectToAddMod"].iloc[i] == 1:
        print("nr-MeasObjectToAddMod")
        Id_list = mi_rrc_df["measObjectId"].iloc[i].split('@')
        measobj_list = mi_rrc_df["measObject"].iloc[i].split('@')
        ssbFrequency_list = str(int(mi_rrc_df["ssbFrequency"].iloc[i])).split('@')

        for a in range(len(Id_list)):
            if measobj_list[a] == "measObjectNR (0)":
                nr_measobj_dict[Id_list[a]] = MEASOBJ(measobj_list[a], int(ssbFrequency_list[0]))
                ssbFrequency_list.pop(0)     
        print(nr_measobj_dict)
        
    if mi_rrc_df["lte-ReportConfigToAddMod"].iloc[i] == 1:
        print("ReportConfigToAddMod")
        reportConfigId_list = str(mi_rrc_df["lte-reportConfigId"].iloc[i]).split('@')
        eventId_list = mi_rrc_df["lte-eventId"].iloc[i].split('@')
        parameter_list = mi_rrc_df["lte-parameter"].iloc[i].split('@')
        for a in range(len(reportConfigId_list)):
            report_config_dict[reportConfigId_list[a]] = REPORTCONFIG(eventId_list[a], parameter_list[a])
        print(report_config_dict)

    if mi_rrc_df["nr-ReportConfigToAddMod"].iloc[i] == 1: #############
        print("nr-ReportConfigToAddMod")
        reportConfigId_list = mi_rrc_df["nr-reportConfigId"].iloc[i].split('@')
        eventId_list = mi_rrc_df["nr-eventId"].iloc[i].split('@')
        parameter_list = mi_rrc_df["nr-parameter"].iloc[i].split('@')
        for a in range(len(reportConfigId_list)):
            nr_report_config_dict[reportConfigId_list[a]] = REPORTCONFIG(eventId_list[a], parameter_list[a])
        print(nr_report_config_dict)

    if mi_rrc_df["lte-measIdToRemoveList"].iloc[i] != '0':
        print("measIdToRemoveList")
        measIdToRemove_list = str(mi_rrc_df["lte-measIdToRemoveList"].iloc[i]).split('@')
        if len(measIdToRemove_list) == 32:
            measId_dict = {}
        else:
            for a in range(len(measIdToRemove_list)):
                measId_dict.pop(measIdToRemove_list[a])
        print(measId_dict)

    if mi_rrc_df["lte-MeasIdToAddMod"].iloc[i] != '0':
        print("MeasIdToAddMod")
        MeasIdToAdd_list = mi_rrc_df["lte-MeasIdToAddMod"].iloc[i].split('@')
        for a in range(len(MeasIdToAdd_list)):
            x = parse_measIdToAddMod(MeasIdToAdd_list[a])
            measId_dict[x[0]] = (x[1],x[2])
        print(measId_dict)

    if mi_rrc_df["nr-MeasIdToAddMod"].iloc[i] != '0' and mi_rrc_df["nr-MeasIdToAddMod"].iloc[i] != 0:
        print("nr-MeasIdToAddMod")
        MeasIdToAdd_list = mi_rrc_df["nr-MeasIdToAddMod"].iloc[i].split('@')
        for a in range(len(MeasIdToAdd_list)):
            x = parse_measIdToAddMod(MeasIdToAdd_list[a])
            nr_measId_dict[x[0]] = (x[1],x[2])
        print(nr_measId_dict)
    

print(f'Finished')

In [None]:
L