This notebook converts VLOG data into a dataset that can be used by DIRECTOR, based on a DIRECTOR configuration file.

In [1]:
import os
import numpy as np
import csv
import json
import gzip
import shutil

from vlog_blocks import Tijd_referentie

In [2]:
#functions used:
def load_file(filepath):
    """Read a text file and return its content as a list of lines.

    The content is split on the newline character only, so a file that ends
    with a newline yields an empty string as its last element.

    :param str filepath: path of the file to read
    :return: list of lines without their newline terminator
    """
    with open(filepath, 'r') as handle:
        return handle.read().split('\n')

def get_delta_time(timeHexString):
    """
    Return the delta time in deciseconds encoded in a vlog hexadecimal string.

    Only the first three hexadecimal characters are converted; the fourth
    character is ignored. A string that is not exactly four characters long
    is reported on stdout and yields None.

    :param str timeHexString: hexadecimal string containing delta time in vlog format
    :return: the delta time as an int, or None when the string has the wrong length
    """
    if len(timeHexString) == 4:
        return int(timeHexString[:3], 16)

    print("Got invalid timeHexString.")
    return None
        
def process_detection(message,detector_messages,time_basis):
    """Process a detector status message and record the detector states.

    Layout as read by this function: characters 2-5 hold the delta time,
    characters 6-7 the number of detectors (hexadecimal) and, from character
    8 onwards, one hexadecimal character per detector holding its state.
    An entry is created for every detector listed, but only states below 2
    are appended.

    A malformed message (too short, non-hexadecimal, or without a usable
    delta time) is reported with 'detection init error'; whatever was
    appended before the error is kept.

    :param str message: vlog message as a hexadecimal string
    :param dict detector_messages: detector index -> {"times": [...], "states": [...]},
        updated in place
    :param int time_basis: time offset added to the message's delta time
    :return: the updated detector_messages dict
    """
    try:
        # Parsed inside the try block so that a malformed delta time is
        # handled the same way as in update_detection.
        delta_time = get_delta_time(message[2:6])
        number_of_detectors = int(message[6:8], 16)
        # The timestamp is identical for every detector in this message.
        # delta_time is None for a too-short message, which raises TypeError here.
        timestamp = time_basis + delta_time
        for i in range(number_of_detectors):
            state = int(message[8 + i], 16)
            if i not in detector_messages:
                detector_messages[i] = {"times": [], "states": []}
            if state < 2:
                detector_messages[i]['times'].append(timestamp)
                detector_messages[i]['states'].append(state)
    except (ValueError, IndexError, TypeError):
        # Only parsing problems are swallowed; anything else (for example
        # KeyboardInterrupt) now propagates instead of being hidden.
        print('detection init error')
    return detector_messages

def update_detection(message,detector_messages,time_basis):
    """Process a detector change message and append the changed states.

    Layout as read by this function: characters 2-4 hold the delta time,
    character 5 the number of changed detectors and, from character 6
    onwards, one entry per changed detector. Each entry is four hexadecimal
    characters: two for the detector index followed by two for the state.
    Only states below 2 are appended.

    Entries are therefore read with a stride of four characters. The
    previous stride of two made every entry after the first overlap its
    predecessor, so the state of one detector was read as the index of the
    next.

    A malformed message, or a change for a detector that has no entry in
    detector_messages yet, is reported with 'detection update error'; the
    remainder of that message is dropped.

    :param str message: vlog message as a hexadecimal string
    :param dict detector_messages: detector index -> {"times": [...], "states": [...]},
        updated in place
    :param int time_basis: time offset added to the message's delta time
    :return: the updated detector_messages dict
    """
    entry_width = 4  # 2 characters index + 2 characters state
    try:
        # The delta time is three characters here; pad it to the four
        # characters get_delta_time expects.
        delta_time = get_delta_time(message[2:5] + "0")
        number_of_detectors = int(("0" + message[5]), 16)
        # delta_time is None for a too-short message, which raises TypeError here.
        timestamp = time_basis + delta_time

        for i in range(number_of_detectors):
            offset = 6 + i * entry_width
            detector_index = int(message[offset:offset + 2], 16)
            state = int(message[offset + 2:offset + 4], 16)
            if state < 2:
                detector_messages[detector_index]['times'].append(timestamp)
                detector_messages[detector_index]['states'].append(state)
    except (ValueError, IndexError, TypeError, KeyError):
        # Only parsing / lookup problems are swallowed; anything else now
        # propagates instead of being hidden by a bare except.
        print('detection update error')
    return detector_messages
#process wus:
def process_wus(message,wus_info,time_basis):
    """Process a signal group status message and record the signal states.

    Layout as read by this function: characters 2-5 hold the delta time,
    characters 6-7 the number of signal groups (hexadecimal) and, from
    character 8 onwards, one hexadecimal character per signal group holding
    its status code. Only status codes below 3 ('R', 'G', 'Y') are recorded;
    a signal group gets an entry in wus_info the first time such a code is
    seen for it.

    A malformed message is reported with 'wus init error' instead of
    raising, matching the behaviour of the other message handlers in this
    file; whatever was appended before the error is kept.

    :param str message: vlog message as a hexadecimal string
    :param dict wus_info: signal group index -> {'states': [...], "times": [...]},
        updated in place
    :param int time_basis: time offset added to the message's delta time
    :return: the updated wus_info dict
    """
    # Status codes and their labels; codes 3 and up are skipped below.
    signaal_groep_status_type = {
        0: 'R',
        1: 'G',
        2: 'Y',
        3: 'WitKnipperen',
        4: 'Gedoofd',
        5: 'GeelKnipperen'}
    try:
        delta_time = get_delta_time(message[2:6])
        number_of_signal_groups = int(message[6:8], 16)
        # The timestamp is identical for every signal group in this message.
        # delta_time is None for a too-short message, which raises TypeError here.
        timestamp = time_basis + delta_time
        for i in range(number_of_signal_groups):
            status_code = int(message[8 + i], 16)
            if status_code < 3:
                if i not in wus_info:
                    wus_info[i] = {'states': [], "times": []}
                wus_info[i]['states'].append(signaal_groep_status_type[status_code])
                wus_info[i]['times'].append(timestamp)
    except (ValueError, IndexError, TypeError):
        # A single truncated or corrupt line must not abort the whole run.
        print('wus init error')
    return wus_info
def update_wus(message,wus_info,time_basis):
    """Process a signal group change message and append the changed states.

    Layout as read by this function: characters 2-4 hold the delta time,
    character 5 the number of changed signal groups and, from character 6
    onwards, one four-character entry per changed group: two characters for
    the signal group index followed by two for the status code. Only status
    codes below 3 ('R', 'G', 'Y') are recorded.

    A malformed message, or a change for a signal group that has no entry in
    wus_info yet, is reported with 'wus update error'; the remainder of that
    message is dropped.

    :param str message: vlog message as a hexadecimal string
    :param dict wus_info: signal group index -> {'states': [...], "times": [...]},
        updated in place
    :param int time_basis: time offset added to the message's delta time
    :return: the updated wus_info dict
    """
    # Status codes and their labels; codes 3 and up are skipped below.
    signaal_groep_status_type = {
        0: 'R',
        1: 'G',
        2: 'Y',
        3: 'WitKnipperen',
        4: 'Gedoofd',
        5: 'GeelKnipperen'}
    entry_width = 4  # 2 characters index + 2 characters status code
    try:
        # The delta time is three characters here; pad it to the four
        # characters get_delta_time expects. Parsed inside the try block so
        # a non-hexadecimal value is reported rather than raised.
        delta_time = get_delta_time(message[2:5] + "0")
        number_of_signal_groups = int(("0" + message[5]), 16)
        # delta_time is None for a too-short message, which raises TypeError here.
        timestamp = time_basis + delta_time

        for i in range(number_of_signal_groups):
            offset = 6 + i * entry_width
            signal_group_index = int(message[offset:offset + 2], 16)
            status_code = int(message[offset + 2:offset + 4], 16)
            if status_code < 3:
                status = signaal_groep_status_type[status_code]
                wus_info[signal_group_index]['states'].append(status)
                wus_info[signal_group_index]['times'].append(timestamp)
    except (ValueError, IndexError, TypeError, KeyError):
        # Only parsing / lookup problems are swallowed; anything else now
        # propagates instead of being hidden by a bare except.
        print('wus update error')
    return wus_info
def process_messages(vlog_messages):
    """Process a sequence of vlog messages into detector and signal group histories.

    Messages are dispatched on their first two characters:

    * "01" starting with "0120": time reference, sets the time basis
    * "05" / "06": detector status / detector change
    * "0D" / "0E": signal group status / signal group change

    Everything else is ignored. The time basis is the number of deciseconds
    between the fixed start time and the most recent time reference; the
    handlers add each message's own delta time to it.

    Detector and signal group messages that arrive before the first time
    reference cannot be timestamped and are skipped (the number skipped is
    printed once at the end).

    :param vlog_messages: iterable of vlog messages as hexadecimal strings
    :return: tuple (detector_messages, wus_info)
    """
    # NOTE(review): an earlier comment here said 14-09-2019 while the value
    # is 2018-09-14 -- confirm which start date is intended.
    starttijd = "2018-09-14 00:00:00"
    start_reference = np.datetime64(starttijd)
    detector_messages = {}
    wus_info = {}
    time_basis = None  # unknown until the first time reference is seen
    skipped = 0
    for message in vlog_messages:
        message_id = message[:2]
        if message_id == "01":
            # time messages start with ID 01 and start of year prefix 20..(18/19 etc)
            if message[:4] == "0120":
                time_processor = Tijd_referentie()
                time_processor.process_data(message)
                time_stamp = time_processor.tijd
                # `type(x) != None` was always True, which made the fallback
                # below unreachable; test the value itself.
                if time_stamp is not None:
                    delta = (np.datetime64(time_stamp) - start_reference) / np.timedelta64(1, 's')
                    # always relate time basis to starting time and most recent time_stamp
                    time_basis = int(delta) * 10
                else:
                    print('manual timeupgrade')
                    # Fallback guess of 3000 deciseconds (5 minutes); there is
                    # no way to be sure this matches what actually happened.
                    if time_basis is not None:
                        time_basis += 3000
        elif message_id in ("05", "06", "0D", "0E"):
            if time_basis is None:
                skipped += 1
                continue
            if message_id == "05":
                detector_messages = process_detection(message, detector_messages, time_basis)
            elif message_id == "06":
                detector_messages = update_detection(message, detector_messages, time_basis)
            elif message_id == "0D":
                wus_info = process_wus(message, wus_info, time_basis)
            else:
                wus_info = update_wus(message, wus_info, time_basis)
        # any other message id is unimportant and ignored
    if skipped:
        print('skipped {} messages received before the first time reference'.format(skipped))
    return detector_messages, wus_info

def write_txt_file(info_dict,intersection,sensor_id, sensor_type):
    """Write one sensor's history to a tab-separated .txt file and a .txt.gz copy.

    Files are written to ./output/<intersection>/<sensor_type>/ (created
    when missing). The file name is NH_<intersection>.SG<sensor_id> for
    sensor type 'signal_groups' and NH_<intersection>.<sensor_id padded to
    three characters with zeros> for every other type. The first row is the
    header ('starttijd', <name>); each following row is a (time, state) pair.

    :param dict info_dict: dict with parallel lists under 'times' and 'states'
    :param intersection: intersection id used in the directory and file name
    :param sensor_id: sensor name or number used in the file name
    :param str sensor_type: sub-directory name; 'signal_groups' selects the SG naming
    """
    if sensor_type == 'signal_groups':
        name = "NH_{}.SG{}".format(intersection, sensor_id)
    else:
        name = "NH_{}.{}".format(intersection, str(sensor_id).rjust(3, '0'))

    directory = './output/{}/{}'.format(intersection, sensor_type)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(directory, exist_ok=True)
    txt_path = '{}/{}.txt'.format(directory, name)

    # newline='' lets the csv module control the line endings itself.
    with open(txt_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter='\t')
        writer.writerow(['starttijd', name])
        writer.writerows(zip(info_dict['times'], info_dict['states']))

    # save also as .gz file
    with open(txt_path, 'rb') as f_in, gzip.open(txt_path + '.gz', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
            
def process_detection_dict(detections,intersection,sensor_info_path=None,director_config_path=None):
    """Write the detector histories of a preceding intersection to file.

    The intersection is mapped to an approach ("0", "1" or "2"). The
    director configuration for that approach lists, per preceding
    intersection, the names of the feeding stopline, queue and arrival
    detectors. The intersection's sensor file ('DP' lines) maps those names
    to detector indices, which are used to look up the histories in
    detections and write them with write_txt_file.

    Detector lists that cannot be found in the configuration are treated as
    empty, so nothing is written for them.

    :param dict detections: detector index -> history dict, as produced by process_messages()
    :param str intersection: intersection id
    :param str sensor_info_path: directory prefix of the '<intersection>.cfg' sensor
        files; defaults to the original hard-coded location
    :param str director_config_path: path of the director configuration JSON;
        defaults to the original hard-coded location
    :raises ValueError: if the intersection is not assigned to any approach
    """
    # define approach:
    approaches = {
        "0": ['201231', '201225', '201212'],
        "1": ['205195', '205191', '205250'],
        "2": ['201239', '201245', '201249', '201291', '201297', '201302', '201308', '201311'],
    }
    approach = next((key for key, ids in approaches.items() if intersection in ids), None)
    if approach is None:
        # Previously this surfaced later as an unbound-variable error.
        raise ValueError("intersection {} is not assigned to an approach".format(intersection))

    if sensor_info_path is None:
        sensor_info_path = r"D://uni_drive//Msc Datascience//Year 2//Thesis//Siemens//experimenten//NH_201234//PNH_Tom//"
    if director_config_path is None:
        director_config_path = r"D:\uni_drive\Msc Datascience\Year 2\Thesis\Siemens\experimenten\NH_201234\Outlier_Analysis\Framework\FPD-LOF\director_configs.json"

    # Context managers make sure both files are closed again.
    with open(sensor_info_path + intersection + '.cfg', 'r') as cfg_file:
        sensor_file = cfg_file.read().split('\n')
    with open(director_config_path, 'rb') as config_file:
        director_configs = json.load(config_file)

    sensor_types = {
        "arrival_detectors": {},
        "pedestrians_cyclists": {},
        "queue_detectors": {},
        "stopline_detectors": {}
    }
    # Detector names per sensor type; they stay empty when the configuration
    # has no matching entry (previously these names were left undefined).
    feeding = {
        'stopline_detectors': [],
        'queue_detectors': [],
        'arrival_detectors': [],
    }
    # (label used in the message, key in the configuration, sensor type)
    config_keys = (
        ('stopline', 'feeding_stopline_detectors', 'stopline_detectors'),
        ('queue', 'feeding_queue_detectors', 'queue_detectors'),
        ('arrival', 'feeding_arrival_detectors', 'arrival_detectors'),
    )
    approach_config = director_configs['approaches'][approach]
    if 'preceding_intersections' in approach_config:
        for itsc in approach_config['preceding_intersections']:
            if itsc['id'] != intersection:
                continue
            for label, key, sensor_type in config_keys:
                try:
                    feeding[sensor_type] = itsc[key]
                except KeyError:
                    print('problem with {} detectors {}'.format(label, intersection))
                    print(itsc)
    else:
        print("not in preceding intersections")

    for line in sensor_file:
        if not line.startswith('DP'):
            continue
        fields = line.split(',')
        # fields[2] is the quoted detector name, fields[1] its index.
        detector_name = fields[2].strip('"')
        for sensor_type, names in feeding.items():
            if detector_name in names:
                sensor_types[sensor_type][detector_name] = fields[1]
    print("Sensor type dict:")
    print(sensor_types)

    # Same write order as before: stopline, arrival, queue.
    for sensor_type in ('stopline_detectors', 'arrival_detectors', 'queue_detectors'):
        for detector_name, detector_index in sensor_types[sensor_type].items():
            write_txt_file(detections[int(detector_index)], intersection, detector_name, sensor_type)

    return #nothing for now

def process_wus_dict(wus,intersection,sensor_info_path=None,director_config_path=None):
    """Write the signal group histories of a preceding intersection to file.

    The intersection is mapped to an approach ("0", "1" or "2"). The
    director configuration for that approach lists, per preceding
    intersection, the names of the feeding signal groups. The intersection's
    sensor file ('FC' lines) maps those names to signal group indices, which
    are used to look up the histories in wus and write them with
    write_txt_file.

    :param dict wus: signal group index -> history dict, as produced by process_messages()
    :param str intersection: intersection id
    :param str sensor_info_path: directory prefix of the '<intersection>.cfg' sensor
        files; defaults to the original hard-coded location
    :param str director_config_path: path of the director configuration JSON;
        defaults to the original hard-coded location
    :raises ValueError: if the intersection is not assigned to any approach
    """
    # define approach:
    approaches = {
        "0": ['201231', '201225', '201212'],
        "1": ['205195', '205191', '205250'],
        "2": ['201239', '201245', '201249', '201291', '201297', '201302', '201308', '201311'],
    }
    approach = next((key for key, ids in approaches.items() if intersection in ids), None)
    if approach is None:
        # Previously this surfaced later as an unbound-variable error.
        raise ValueError("intersection {} is not assigned to an approach".format(intersection))

    # LOAD SENSOR DICT AND CONFIGS:
    if sensor_info_path is None:
        sensor_info_path = r"D://uni_drive//Msc Datascience//Year 2//Thesis//Siemens//experimenten//NH_201234//PNH_Tom//"
    if director_config_path is None:
        director_config_path = r"D:\uni_drive\Msc Datascience\Year 2\Thesis\Siemens\experimenten\NH_201234\Outlier_Analysis\Framework\FPD-LOF\director_configs.json"

    # Context managers make sure both files are closed again.
    with open(sensor_info_path + intersection + '.cfg', 'r') as cfg_file:
        sensor_file = cfg_file.read().split('\n')
    with open(director_config_path, 'rb') as config_file:
        director_configs = json.load(config_file)

    # SELECT SIGNAL GROUPS:
    signal_groups = []
    approach_config = director_configs['approaches'][approach]
    if 'preceding_intersections' in approach_config:
        for itsc in approach_config['preceding_intersections']:
            if itsc['id'] == intersection:
                signal_groups += itsc['feeding_signal_groups']
    else:
        print("not in preceding intersections")

    signals = {}
    for line in sensor_file:
        if line.startswith('FC'):
            fields = line.split(',')
            # fields[2] is the quoted signal group name, fields[1] its index.
            group_name = fields[2].strip('"')
            if group_name in signal_groups:
                signals[group_name] = fields[1]
    print("Signal_group dict:")
    print(signals)
    for signal_group, group_index in signals.items():
        write_txt_file(wus[int(group_index)], intersection, signal_group, 'signal_groups')

    return #return nothing for now
    
def process_201234(detections,wus,sensor_info_path=None,director_config_path=None):
    """Write all signal group and detector histories of intersection 201234.

    Signal groups are taken from the keys of 'signal_groups' in the director
    configuration (left-padded with zeros to two characters) and matched
    against the 'FC' lines of the sensor file. Detectors are taken from the
    'DP' lines whose name is three characters long:

    * type code 257 and numeric name below 200 -> stopline detector
    * type code 513 and numeric name below 200 -> queue detector
    * type code 1025 and numeric name below 200 -> arrival detector
    * name in a fixed list -> pedestrian / cyclist detector

    A detector whose name is not numeric can only match the fixed
    pedestrian / cyclist list; it no longer raises.

    :param dict detections: detector index -> history dict, as produced by process_messages()
    :param dict wus: signal group index -> history dict, as produced by process_messages()
    :param str sensor_info_path: directory prefix of the '201234.cfg' sensor file;
        defaults to the original hard-coded location
    :param str director_config_path: path of the director configuration JSON;
        defaults to the original hard-coded location
    """
    intersection = "201234"
    # LOAD SENSOR DICT AND CONFIGS:
    if sensor_info_path is None:
        sensor_info_path = r"X://drive_tom//Msc Datascience//Year 2//Thesis//Siemens//experimenten//NH_201234//PNH_Tom//"
    if director_config_path is None:
        director_config_path = r"X:\drive_tom\Msc Datascience\Year 2\Thesis\Siemens\experimenten\NH_201234\Outlier_Analysis\Framework\FPD-LOF\director_configs.json"

    # Context managers make sure both files are closed again.
    with open(sensor_info_path + intersection + '.cfg', 'r') as cfg_file:
        sensor_file = cfg_file.read().split('\n')
    with open(director_config_path, 'rb') as config_file:
        director_configs = json.load(config_file)

    # define sensor types as dict:
    sensor_types = {
        "arrival_detectors": {},
        "pedestrians_cyclists": {},
        "queue_detectors": {},
        "stopline_detectors": {},
    }
    # Type code in the fourth field of a 'DP' line -> sensor type.
    detector_type_codes = {
        257: 'stopline_detectors',
        513: 'queue_detectors',
        1025: 'arrival_detectors',
    }
    pedestrian_names = ['233', '234', '243', '244', '331', '341']

    # grab signal group names from config file
    # NOTE: per-detector selection from the configuration was disabled in
    # favour of adding all sensors found in the sensor file.
    signal_groups = [approach.rjust(2, '0') for approach in director_configs['signal_groups']]
    signals = {}

    # run through sensor file to get sensor indices:
    for line in sensor_file:
        if line.startswith('DP'):
            fields = line.split(',')
            detector_name = fields[2].strip('"')
            if len(detector_name) != 3:
                continue
            type_code = int(fields[3])
            try:
                numeric_name = int(detector_name)
            except ValueError:
                numeric_name = None
            if numeric_name is not None and numeric_name < 200 and type_code in detector_type_codes:
                sensor_types[detector_type_codes[type_code]][detector_name] = fields[1]
            if detector_name in pedestrian_names:
                sensor_types['pedestrians_cyclists'][detector_name] = fields[1]
        elif line.startswith('FC'):
            fields = line.split(',')
            group_name = fields[2].strip('"')
            if group_name in signal_groups:
                signals[group_name] = fields[1]
    print(signals)

    for signal_group, group_index in signals.items():
        write_txt_file(wus[int(group_index)], intersection, signal_group, 'signal_groups')

    # Same write order as before: stopline, arrival, queue, pedestrians/cyclists.
    for sensor_type in ('stopline_detectors', 'arrival_detectors', 'queue_detectors', 'pedestrians_cyclists'):
        for detector_name, detector_index in sensor_types[sensor_type].items():
            write_txt_file(detections[int(detector_index)], intersection, detector_name, sensor_type)
    return

In [3]:
def process_intersection(intersection, input_root=r"./input/Data Boris 16-09-2020 deel 1/"):
    """Load all .vlg files of an intersection, process them and write the output files.

    The files are read in sorted file-name order. os.listdir() returns
    entries in arbitrary order, while the time basis computed by
    process_messages() depends on the order of the messages.
    NOTE(review): this assumes file names sort chronologically -- confirm.

    :param str intersection: intersection id; also the name of the input sub-directory
    :param str input_root: directory prefix that contains one sub-directory per
        intersection; defaults to the original hard-coded location
    :return: always the tuple (0, 0)
    """
    folder = input_root + intersection
    vlog_messages = []
    for file_name in sorted(os.listdir(folder)):
        if file_name.endswith(".vlg"):
            vlog_messages += load_file("{}/{}".format(folder, file_name))
    detectors, wus = process_messages(vlog_messages)
    # The raw messages are no longer needed; release them before writing output.
    del vlog_messages
    if intersection == '201234':
        process_201234(detectors, wus)
    else:
        process_detection_dict(detectors, intersection)
        process_wus_dict(wus, intersection)
    return 0,0

In [4]:
# Earlier selections, kept for reference:
#intersections= ['201234','201225','205191','201239','201249','201291','201308','201225','201231']
#intersections=['201234','201225','205191','201239',
# Intersections to process in this run.
intersections = ["201234"]

In [5]:
# Run the full pipeline for every selected intersection, printing its id first.
for intersection in intersections:
    print(intersection)
    process_intersection(intersection)

201234
Got invalid timeHexString.
detection update error
{}
