In [51]:
from dotenv import load_dotenv
import os
import numpy as np
import html


In [52]:
def prepare_xml(data):
    start_tag = "<diagram"
    end_tag = "</diagram>"
    start_index = data.find(start_tag)
    end_index = data.find(end_tag) + len(end_tag)
    xml_data = data[start_index:end_index]
    return xml_data

# Extract data from XML
def get_xml_cell_data(mx_cell):
    vertex = mx_cell.get("vertex", '')
    edge = mx_cell.get("edge", '')
    id = mx_cell.get("id", '')
    parent = html.unescape(mx_cell.get("parent", ''))
    source = html.unescape(mx_cell.get("source", ''))
    target = html.unescape(mx_cell.get("target", ''))
    value = html.unescape(mx_cell.get("value", '')).replace(" ", " ")
    styles = html.unescape(mx_cell.get("style", '')).split(";")
    return vertex, edge, id, parent, source, target, value, styles

# compile block
def abstract_block_compiler(shape, id, value):
    return {
        'mode': shape,
        'ins': [],
        'outs': [],
        'id': id,
        'value': value,
        'msg': ''
    }

def search_in_list(list, keyword):
    for item in list:
        if keyword in item:
            return item
    return False


In [53]:
# compile all shapes
def compile_shapes(root):
    blocks = {}
    edges = {}
    edges_msg = {}
    the_start_block_id = None
    for mx_cell in root.findall(".//mxCell"):
        vertex, edge, id, parent, source, target, value, styles = get_xml_cell_data(mx_cell)
        # data = {}

        # edgelabel_check = search_in_list(styles, "edgeLabel")
        # edgestyle_check = search_in_list(styles, "edgeStyle")
        shape_check = search_in_list(styles, "shape")
        style_check = search_in_list(styles, "absoluteArcSize")

        # It's shape
        if shape_check != False or style_check != False:

            shape = shape_check if shape_check != False else style_check
            # data
            if shape == 'shape=parallelogram': shape = 'input'
            # display
            elif shape == "shape=mxgraph.flowchart.display": shape = 'display'
            # start
            elif shape == "shape=mxgraph.flowchart.start_2":
                shape = 'start'
                the_start_block_id = id
            # stored_data
            elif shape == "shape=mxgraph.flowchart.stored_data": shape = 'stored_data'
            # decision
            elif shape == "shape=mxgraph.flowchart.decision": shape = 'decision'
            # process
            elif shape == "absoluteArcSize=1": shape = 'process'

            blocks[id] = abstract_block_compiler(shape, id, value)

        # It's edge or sth
        else:
            msg = search_in_list(styles, "edgeLabel")
            if msg != False: edges_msg[id] = {'parent':parent, 'id':id, 'value':value}
            elif edge == '1': edges[id] = {'id':id, 'source':source, 'target':target, 'value':''}
            
        continue

    # fuse edges with their messages
    for msg in edges_msg:
        target_edge_id = edges_msg[msg]['parent']
        edge_msg = edges_msg[msg]['value']
        edges[target_edge_id]['value'] = edge_msg

    # fuse blocks with their edges
    for edge in edges:
        this_edge = edges[edge]
        the_source_block = blocks[this_edge['source']]
        the_target_block = blocks[this_edge['target']]

        # Handle Decision Edges (outs)
        if the_source_block["mode"] == "decision": the_source_block['outs'].append(the_target_block)
        
        # For All (ins)
        the_target_block['ins'].append(this_edge)

    return blocks, the_start_block_id

In [67]:
# follow the tail from the starting block
def generate_flow(init_block_id, blocks):
    block_flow = []
    block_flow.append(blocks[init_block_id])
    for i in blocks:
        for block in blocks:
            this_block = blocks[block]
            last_block_flow = block_flow[-1]
            sources_of_ins = np.array([thein['source'] for thein in this_block['ins']])
            is_ther_any_match = np.where(sources_of_ins == last_block_flow['id'])[0]
            if is_ther_any_match.size > 0: block_flow.append(this_block)
    return block_flow

def detect_loop(init_block_id, blocks):
    the_flow = generate_flow(init_block_id, blocks)
    the_flow.pop(0)
    loop_found = False
    for block in the_flow:
        false_option = block.get('false', False)
        if init_block_id == block['id']: loop_found = True
        elif false_option != False and false_option['id'] == init_block_id: loop_found = True
    return loop_found


# Syntaxes

In [55]:
def input_block(suffix, block):
    lines = ''
    lines += suffix + block['value']
    lines += " = "
    lines += "input('"+block['msg']+"')"
    lines += "\n"
    return lines

def process_block(suffix, block):
    lines = ''
    lines += suffix+block['value']
    lines += "\n"
    return lines

def display_block(suffix, block):
    lines = ''
    lines += suffix+"print("
    lines += block['value']
    lines += ")\n"
    return lines

def stored_data_block(suffix, block):
    lines = ''
    lines += suffix+block['msg']
    lines += " = np.array(["
    for item in block['value'].split(','): lines += '"'+item.strip()+'", '
    lines += "])\n"
    return lines

def decision_block(suffix, block, dealing_decisions):
    global blocks
    lines = ''
    dealing_decisions.append(block['id'])
    # loop_check = detect_loop(block['false']['id'], blocks)
    # if loop_check == False:
    #     lines += suffix+"if "
    #     lines += block['value']
    #     lines += ":\n"
    # else:
    #     lines += suffix+"while ("
    #     lines += block['value']
    #     lines += "):\n"
    # suffix+="\t"
    return lines, dealing_decisions

In [56]:
def compile_block(flow, dealing_decisions, suffix=''):
    lines = ''
    for block in flow:
        if block['mode'] == 'input': newlines = input_block(suffix, block)
        elif block['mode'] == 'process': newlines = process_block(suffix, block)
        elif block['mode'] == 'display': newlines = display_block(suffix, block)
        elif block['mode'] == 'stored_data': newlines = stored_data_block(suffix, block)
        elif block['mode'] == 'decision': newlines, dealing_decisions = decision_block(suffix, block, dealing_decisions)
    lines += newlines
    return lines, dealing_decisions

In [59]:
def flow_compiler(blocks, the_start_block_id):
    lines = ''
    lines += blocks[the_start_block_id]['value']
    lines += "\n"
            
    dealing_decisions = []
    
    newlines, dealing_decisions = compile_block(
        generate_flow(the_start_block_id, blocks),
        dealing_decisions
    )
    lines += newlines

    # for this_else in reversed(dealing_decisions):
    #     thesuffix = ""
    #     for i in range(len(dealing_decisions)): thesuffix += "\t"
    #     lines += "else:\n"
    #     dealing_decisions.pop()
    #     newlines, dealing_decisions = compile_block(
    #         generate_flow(blocks[this_else]['false']['id'], blocks),
    #         dealing_decisions,
    #         suffix=thesuffix,
    #     )
    #     lines += newlines
    return lines

In [68]:
import xml.etree.ElementTree as ET
# Load variables from .env file into the environment
load_dotenv()

def convert():
    print("convert")
    # 1. Read the Draw.io file
    with open(os.getenv("SRC_FILE"), "r") as file:
        drawio_data = file.read()

    # 2. Extract the XML data
    xml_data = prepare_xml(drawio_data)
    
    if len(os.getenv("DEST_XML_FILE")) >= 0:
        with open(os.getenv("DEST_XML_FILE"), "w") as file: file.writelines(xml_data)
    
    # 3. Process the XML data
    root = ET.fromstring(xml_data)
    blocks, the_start_block_id = compile_shapes(root)
    lines = flow_compiler(blocks, the_start_block_id)
    
    with open(os.getenv("DEST_FILE"), "w") as file:
        file.writelines(lines)

convert()

convert


  is_ther_any_match = np.where(sources_of_ins == last_block_flow['id'])[0]
  is_ther_any_match = np.where(sources_of_ins == last_block_flow['id'])[0]
  is_ther_any_match = np.where(sources_of_ins == last_block_flow['id'])[0]
  is_ther_any_match = np.where(sources_of_ins == last_block_flow['id'])[0]


In [None]:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Define the function to be executed when a change occurs
def on_file_change():
    print("File changed!")

# Define the event handler class
class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        # Call the function when a modification event occurs
        convert()

# Create an observer and attach the event handler
observer = Observer()
observer.schedule(FileChangeHandler(), path=os.getenv("SRC_FILE"), recursive=False)

# Start the observer
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Stop the observer if interrupted
    observer.stop()

# Wait until the observer thread completes its execution
observer.join()