In [4]:
import re
import os
import math
import statistics

import pandas as pd

from collections import defaultdict

In [8]:
# Location of the sample GUI log inspected by this scratch cell.
log_file_path = r'C:\Users\lyin0\Desktop\LogManager\logs\8.0m.log'

# Read the log, strip surrounding whitespace from every line and drop
# the lines that end up empty.
with open(log_file_path, 'r', encoding='utf-8') as f:
    content = list(filter(None, map(str.strip, f)))

# Peek at one line and at its 10th comma-separated field (index 9),
# which the parsing code below treats as the security status code.
print(type(content))
print(content[37])
print(content[37].split(',')[9])
print(type(content[37].split(',')[9]))

<class 'list'>
(PORT 1) : 6a,5,0,18,21,43,65,87,0,3,1,2e,3,71,0,0,0,0,0,c9,c7,7f,55,0,0,0,0,0   TimeStamp  :  3532       Distance  :  814
3
<class 'str'>


In [None]:
import re

# NOTE: despite the variable names, this pattern captures the PORT number
# (the device identifier) of the line, not a distance.
distance_pattern = r"PORT\s*(\d+)"
curr_dist = re.compile(distance_pattern).search(content[37]).group(1)

# The captured group is a string, not an int.
print(curr_dist)
print(type(curr_dist))

1
<class 'str'>


In [None]:
from collections import defaultdict

def extract_distance_from_gui_log(log_content: list, pattern:str, device_info_pattern=r"PORT\s*(\d+)") -> defaultdict:
    """Collect per-device ranging results from the lines of a GUI log.

    A line is treated as a result line when ``device_info_pattern`` matches
    it and it has at least 10 comma-separated fields; every other line
    (banners, truncated lines, ...) is skipped instead of raising.

    Args:
        log_content: log lines (strings) to scan.
        pattern: regex whose group 1 captures the distance value.
        device_info_pattern: regex whose group 1 captures the device/port id.

    Returns:
        defaultdict(list) mapping the captured device id (a string) to its
        list of distances as floats. A failed ranging is stored as NaN:
        the status field is non-zero or unparsable, the distance is
        missing, or the distance equals 65535.
    """
    status_field_index = 9  # 10th comma-separated field holds the security status code
    distances = defaultdict(list)

    for line in log_content:
        device_match = re.search(device_info_pattern, line)
        fields = line.split(',')
        if device_match is None or len(fields) <= status_field_index:
            # not a ranging result line
            continue

        # The payload fields look hexadecimal in the logs seen so far
        # (e.g. '6a', '2e'); base 16 yields the same zero / non-zero answer
        # as base 10 for purely decimal text, so only the parse is widened.
        try:
            secured = int(fields[status_field_index], 16) == 0
        except ValueError:
            # status cannot be read -> cannot be trusted as secured
            secured = False

        dist = float('NaN')
        if secured:
            distance_match = re.search(pattern, line)
            if distance_match is not None:
                value = float(distance_match.group(1))
                # regard 65535 as failed ranging result
                if value != 65535:
                    dist = value
        # un-secured results stay NaN, i.e. are regarded as ranging failed
        distances[device_match.group(1)].append(dist)

    return distances

In [15]:
# Run the extractor over the loaded log lines and check what comes back:
# the container type and the device ids that were found.
tmp = extract_distance_from_gui_log(log_content=content, pattern=r"Distance\s*:\s*(\d+)")
print(type(tmp))
print(tmp.keys())

<class 'collections.defaultdict'>
dict_keys(['1'])


In [26]:
import re
import os

import pandas as pd


class LogAnalyst(object):
    """Parse ranging logs and compute per-device distance statistics.

    Three log flavours are recognised from their content: 'gui',
    'teraterm' and 'mobis'.

    Intended call order: ``decide_log_file_type`` (loads the file) ->
    ``extract_distance`` -> ``analysis`` per device -> ``show_result`` /
    ``save_result``.

    Failed rangings are stored in ``distances`` as ``float('inf')``.
    """

    # index of the comma-separated field holding the security status code in gui logs
    _GUI_STATUS_FIELD = 9

    def __init__(self, warmup_samples: int, analysis_samples: int) -> None:
        """Store the sampling window and set up the per-flavour patterns.

        Args:
            warmup_samples: number of leading samples per device to drop.
            analysis_samples: maximum number of samples per device kept
                after the warm-up samples.
        """
        self._warmup_samples = warmup_samples
        self._analysis_samples = analysis_samples
        self._content = None  # non-empty stripped log lines, set by decide_log_file_type

        # regex per log flavour; group 1 is the distance or the failure marker
        self._distance_pattern = {
            'gui': r"Distance\s*:\s*(\d+)",
            'teraterm': r"Distance\[cm\]: (\d+|-)",
            'mobis': r">> RAD RESULT:( Time Out|([\d.]+))"
        }

        # value of group 1 that marks a failed ranging, per log flavour
        self._ranging_failed_flag = {
            'gui': r"65535",
            'teraterm': r"-",
            'mobis': r" Time Out"
        }

        # all ranging results, including valid and invalid
        # with unit cm
        self.distances = defaultdict(list)
        self.analysis_results = {}  # analysis results

    def decide_log_file_type(self, log_file_path: str) -> str:
        """Load the log file and guess its type from its content.

        The non-empty, stripped lines are kept in ``self._content`` for
        ``extract_distance``.

        Returns:
            'gui', 'teraterm' or 'mobis' according to the first line that
            carries one of the known markers; ``None`` when the file has no
            non-empty line or no line carries a marker.
        """
        with open(log_file_path, 'r', encoding='utf-8') as f:
            # read log file line by line and remove empty line
            self._content = [line.strip() for line in f.readlines() if line.strip()]

        for line in self._content:
            if "PORT" in line and "TimeStamp" in line:
                return 'gui'
            if "Status" in line and "BlockIndex" in line:
                return 'teraterm'
            if "RAD RESULT" in line:
                return 'mobis'
        # empty log file or unknown format
        return None

    @classmethod
    def _parse_gui_line(cls, line, device_info_pattern):
        """Return ``(device_id, secured)`` for a gui result line, else ``None``."""
        port_match = re.search(device_info_pattern, line)
        fields = line.split(',')
        if port_match is None or len(fields) <= cls._GUI_STATUS_FIELD:
            return None
        # The payload fields look hexadecimal in the logs seen so far
        # (e.g. '6a', '2e'); base 16 yields the same zero / non-zero answer
        # as base 10 for purely decimal text, so only the parse is widened.
        try:
            secured = int(fields[cls._GUI_STATUS_FIELD], 16) == 0
        except ValueError:
            # unreadable status -> cannot be trusted as secured
            secured = False
        return port_match.group(1), secured

    def extract_distance(self, log_file_type, device_info_pattern=r"PORT\s*(\d+)"):
        """Fill ``self.distances`` from the lines loaded by ``decide_log_file_type``.

        Only lines matching the distance pattern of ``log_file_type`` are
        used. For gui logs the line must also contain the device id and the
        status field; lines lacking them are skipped. Non-gui logs use the
        single device id '0'.

        Each call rebuilds ``distances`` from scratch and clears
        ``analysis_results``, so calling it twice gives the same result
        instead of appending to, and re-trimming, the previous one.

        Args:
            log_file_type: 'gui', 'teraterm' or 'mobis'.
            device_info_pattern: regex whose group 1 captures the device id
                in gui logs.

        Raises:
            KeyError: if ``log_file_type`` is not a known type.
        """
        distance_pattern = self._distance_pattern[log_file_type]
        ranging_failed_flag = self._ranging_failed_flag[log_file_type]

        # results of an earlier call would otherwise be extended and trimmed again
        self.distances.clear()
        self.analysis_results.clear()

        for line in self._content:
            # only deal lines contains distance information
            match = re.search(distance_pattern, line)
            if not match:
                continue

            device_id = '0'
            secured = True
            if log_file_type == 'gui':
                parsed = self._parse_gui_line(line, device_info_pattern)
                if parsed is None:
                    # no port id / status field: not a ranging result line
                    continue
                device_id, secured = parsed

            raw_value = match.group(1)
            curr_dist = float('inf')  # failed or un-secured ranging
            if secured and raw_value != ranging_failed_flag:
                try:
                    curr_dist = float(raw_value)
                except ValueError:
                    # e.g. a malformed number such as '1.2.3': count as failed
                    curr_dist = float('inf')

            if log_file_type == 'mobis' and not math.isinf(curr_dist):
                # change the unit to cm
                curr_dist = round(curr_dist * 100)
            self.distances[device_id].append(curr_dist)

        # intercept useful distances: drop the warm-up samples, then keep at
        # most `analysis_samples`; a device with no more samples than the
        # warm-up count keeps everything it has
        start = self._warmup_samples
        stop = start + self._analysis_samples
        for device_id, samples in self.distances.items():
            if len(samples) > start:
                self.distances[device_id] = samples[start:stop]

    def analysis(self, physical_distance: float, device_id: str):
        """Compute the statistics of one device into ``analysis_results``.

        Args:
            physical_distance: reference distance in the same unit as the
                stored distances; ``None`` or infinity means "not provided".
            device_id: key of ``self.distances`` to analyse.

        Raises:
            ValueError: if the device has no successful ranging result.
        """
        # .get avoids creating an empty entry for an unknown device
        valid_dists = self.distances.get(device_id, [])  # all ranging results used to analysis

        success_dists = [d for d in valid_dists if not math.isinf(d)]  # successful ranging results
        if not success_dists:
            raise ValueError('All ranging failed. No valied ranging results to analysis.')

        min_dist = min(success_dists)
        max_dist = max(success_dists)
        ave_dist = statistics.mean(success_dists)
        median_dist = statistics.median(success_dists)
        # statistics.stdev needs at least two data points; a single sample
        # has no measurable spread, reported as 0.0
        stdev = statistics.stdev(success_dists) if len(success_dists) > 1 else 0.0

        if physical_distance is None or math.isinf(physical_distance):
            # kept as text, so it must not go through round()
            offset = 'None (True distance is not provided)'
        else:
            offset = round(physical_distance - ave_dist, 2)

        successed_cnt = len(success_dists)
        failed_cnt = len(valid_dists) - successed_cnt
        ranging_success_rate = successed_cnt / len(valid_dists)

        self.analysis_results[device_id] = {
            'min distance (cm)': min_dist,
            'max distance (cm)': max_dist,
            'average distance (cm)': round(ave_dist, 2),
            'median distance (cm)': round(median_dist, 2),
            'offset (real - ave.) (cm)': offset,
            'std. deviation': round(stdev, 2),
            'success count': successed_cnt,
            'fail count': failed_cnt,
            'success rate': round(ranging_success_rate, 2)
        }

    def show_result(self, device_id):
        """Print the analysis results of ``device_id`` as a two-column table."""
        print(f"Ranging results from device @ port {device_id}:")
        print(f"{' Metric':<30}{' Value':<10}\n" + '-' * 41)
        for key, value in self.analysis_results[device_id].items():
            print(f" {key:<30}{value:<10}")
        print('-' * 41, end='\n\n')

    def save_result(self, save_file_path):
        """Write one Excel sheet per analysed device to ``save_file_path``.

        Each sheet has the columns 'Ranging result' (the distances used),
        'Metric' and 'Value' (the analysis results). The shorter columns are
        padded with ``None``.
        """
        with pd.ExcelWriter(save_file_path, engine='openpyxl') as writer:
            for device_id, results in self.analysis_results.items():
                dists = self.distances[device_id]

                max_len = max(len(dists), len(results))
                data = {
                    'Ranging result': dists + [None] * (max_len - len(dists)),
                    'Metric': list(results.keys()) + [None] * (max_len - len(results)),
                    'Value': list(results.values()) + [None] * (max_len - len(results))
                }

                # The former `df.fillna('nan')` call discarded its return
                # value and therefore changed nothing; it is dropped so the
                # written file stays exactly as before.
                df = pd.DataFrame(data)
                df.to_excel(writer, sheet_name=f"device@port{device_id}", index=False)
        

In [27]:
# Sample log used to exercise the LogAnalyst class.
log_file_path = r'C:\Users\lyin0\Desktop\LogManager\logs\8.0m.log'

# Drop the first 10 samples per device and analyse at most the next 100.
test_loganalyst = LogAnalyst(10, 100)

# Loading the file also detects which kind of log it is.
file_type = test_loganalyst.decide_log_file_type(log_file_path=log_file_path)
print(file_type)

# Parse the distances for the detected log type and show them per device.
test_loganalyst.extract_distance(log_file_type=file_type)
print(test_loganalyst.distances)


gui
defaultdict(<class 'list'>, {'1': [838.0, 836.0, 837.0, 833.0, 832.0, 840.0, 834.0, 833.0, inf, 834.0, 835.0, 829.0, 837.0, 835.0, 836.0, 837.0, 837.0, 837.0, 836.0, 836.0, 841.0, 837.0, inf, 833.0, inf, inf, 840.0, inf, 833.0, 841.0, 840.0, 837.0, 835.0, inf, 833.0, 841.0, 836.0, 835.0, 838.0, 836.0, inf, inf, 832.0, 837.0, 837.0, 838.0, inf, 836.0, 836.0, inf, 833.0, 833.0, 836.0, inf, 835.0, 839.0, 839.0, inf, 835.0, 842.0, 836.0, 838.0, inf, inf, 838.0, 837.0, inf, 818.0, inf, 838.0, 837.0, 836.0, 836.0, 836.0, inf, inf, 838.0, 838.0, 826.0, 836.0, 841.0, 834.0, inf, 840.0, inf, inf, inf, 834.0, 839.0, 839.0, 834.0, 836.0, inf, inf, 834.0, 836.0, 836.0, 833.0, 836.0, 835.0]})


In [28]:
# Analyse and print every device found in the log, using 800 as the
# reference (physical) distance.
for device_id in test_loganalyst.distances:
    test_loganalyst.analysis(800, device_id)
    test_loganalyst.show_result(device_id)

Ranging results from device @ port 1:
 Metric                        Value    
-----------------------------------------
 min distance (cm)             818.0     
 max distance (cm)             842.0     
 average distance (cm)         835.93    
 median distance (cm)          836.0     
 offset (real - ave.) (cm)     -35.93    
 std. deviation                3.44      
 success count                 76        
 fail count                    24        
 success rate                  0.76      
-----------------------------------------



In [29]:
# Export the distances and the analysis results to an Excel workbook.
save_file_path = r"C:\Users\lyin0\Desktop\LogManager\analysis_results\test.xlsx"
test_loganalyst.save_result(save_file_path=save_file_path)

In [30]:
# Scratch check: splitting on the '.log' suffix leaves a trailing empty string.
test = '300.cm.log'
print(test.split(sep='.log'))

['300.cm', '']
