# Libs

In [None]:
from xml.dom.minidom import parse
import xml.etree.ElementTree as ET
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

# Read File

In [None]:
def read_prog(rootNode):

    # generate record(list)
    rec = []
    for progs in rootNode.getElementsByTagName('progs'):
        for prog_idx, program in enumerate(progs.getElementsByTagName('prog')):
            cycle_len = program.getAttribute("cycletime")
            for sig_group in program.getElementsByTagName('sg'):
                sig_group_id = sig_group.getAttribute("sg_id")
                gr = sig_group.getElementsByTagName('cmds')[0].getElementsByTagName('cmd')
                for color in gr:
                    c = color.getAttribute('display')
                    t = color.getAttribute('begin')
                    rec.append([prog_idx + 1, cycle_len, sig_group_id, c, t])
    
    # turn into pd.DataFrame format
    prog = pd.DataFrame(rec, columns=['progId', 'cycleLen',
                                      'sigGroup', 'display', 'beginTime'])
    
    # post-process
    for col in prog.columns:
        prog[col] = pd.to_numeric(prog[col])
    
    prog['cycleLen'] /= 1000
    prog['beginTime'] /= 1000
    prog = prog.astype(int)
    prog['display'] = prog['display'].replace({3:'green', 1:'red'})
    
    return prog

In [None]:
def viz_prog(filepath, progId=1, amber=3, ar=2):
    
    # read raw file
    domTree = parse(filepath)
    rootNode = domTree.documentElement
    
    # read programs from raw file
    prog = read_prog(rootNode)
    
    # param init
    ig = amber + ar # inter-green
    fig = plt.figure(figsize=(7,5))
    cycle = prog[prog.progId == progId]['cycleLen'].unique()[0]
    do = 0.018 * cycle # display offset
    
    # main loop (visualize a signal group every time)
    for idx, sg_id in enumerate(prog['sigGroup'].unique()):
        sig_group = prog[prog.sigGroup == sg_id]
        g_begin = sig_group[sig_group.display == 'green']['beginTime'].iloc[0]
        g_end = sig_group[sig_group.display == 'red']['beginTime'].iloc[0]
        
        # print("[Info] signal group {:d}: from {:d} to {:d}".format(sg_id, g_begin, g_end))
        if g_end > g_begin:
            plt.plot(range(0, g_begin + 1), np.tile(idx / 2, g_begin + 1), 'r', linewidth = 1)
            plt.plot(range(g_end - ar, cycle + 1), np.tile(idx / 2, cycle - g_end + ar + 1), 'r', linewidth = 1)
            plt.plot(range(g_begin, g_end - ig + 1), np.tile(idx / 2, g_end - g_begin - ig + 1), 'g', linewidth = 3)
            plt.plot(range(g_end - ig, g_end - ar + 1), np.tile(idx / 2, amber + 1), 'gold', linewidth = 3)
            plt.text(g_begin - do, idx / 2 + 0.1, str(g_begin))
            plt.text(g_end - ar - do, idx / 2 + 0.1, str(g_end - ar))
        elif g_end == 0:
            plt.plot(range(0, g_begin + 1), np.tile(idx / 2, g_begin + 1), 'r', linewidth = 1)
            plt.plot(range(cycle - ar, cycle + 1), np.tile(idx / 2, ar + 1), 'r', linewidth = 1)
            plt.plot(range(g_begin, cycle - ar + 1), np.tile(idx / 2, cycle - g_begin - ar + 1), 'g', linewidth = 3)
            plt.plot(range(cycle - ig, cycle - ar + 1), np.tile(idx / 2, amber + 1), 'gold', linewidth = 3)
            plt.text(g_begin - do, idx / 2 + 0.1, str(g_begin))
            plt.text(cycle - ar - do, idx / 2 + 0.1, str(cycle - ar))

    plt.xlabel('Cycle Time (s)')
    plt.ylabel('Signal Group (#)')
    plt.xlim([-0.05 * cycle, 1.05 * cycle])
    plt.ylim([-0.5,idx / 2 + 0.5])
    plt.yticks(np.arange(0, 4, 0.5), np.arange(1,9))
    plt.title("Cycle Length: {:d}s [Amber: {:d}s, All-Red: {:d}s]".format(cycle, amber, ar))
    
    return fig

In [None]:
# filepath
filepath = '../data/晋陵北路-河海路.sig'
domTree = parse(filepath)
rootNode = domTree.documentElement

# read programs from raw file
progs = read_prog(rootNode)

# visualize a program
fig = viz_prog(filepath, progId=1)
# plt.savefig('program.png',dpi=500)

In [None]:
def get_target_prog(sc, progId):
    return sc.find('progs').getchildren()[progId - 1]


def update_offset(sc, progId, new_offset):
    target_prog = get_target_prog(sc, progId)
    cycle_len = target_prog.attrib['cycletime']
    assert new_offset < int(cycle_len) / 1000
    target_prog.attrib['offset'] = str(int(new_offset * 1000))
    xmldoc.write(filepath, xml_declaration=True)
    print("[Info] New offset for program {:d}: {:d}".format(progId, new_offset))


def update_green(sc, progId, new_green, viz=False):
    
    target_prog = get_target_prog(sc, progId)
    cycle_len = target_prog.attrib['cycletime']
    assert max(max(new_green)) <= int(cycle_len) / 1000
    
    # main loop (update a signal group every time)
    prog_sgs = target_prog.find('sgs').getchildren()
    for sg_idx, sg in enumerate(prog_sgs):
        for color in sg.find('cmds'):
            if color.attrib['display'] == '3':
                color.attrib['begin'] = str(int(1000 * new_green[0][sg_idx]))
            elif color.attrib['display'] == '1':
                color.attrib['begin'] = str(int(1000 * new_green[1][sg_idx]))
    xmldoc.write(filepath, xml_declaration=True)
    
    if viz:
        viz_prog(filepath, progId=progId)


# def update_cycle(sc, progId, new_cycle, new_green, viz=False):

In [None]:
xmldoc = ET.parse(filepath)
sc = xmldoc.getroot()
num_sgs = len(sc.find('sgs').getchildren())
new_green = [[24,24,0,0,79,79,60,60], # g_begin
             [60,60,24,24,0,0,79,79]] # g_end


update_offset(sc, 1, new_offset=0)
update_green(sc, 1, new_green, viz=True)