In [1]:
# Extract the relevant information from the BPMN XML (this part is done).
def get_default_namespace(line):
    """Return the default XML namespace URI declared in ``line``.

    Looks for an un-prefixed ``xmlns="..."`` (or ``xmlns='...'``) attribute
    and returns the text between its quotes. Prefixed declarations such as
    ``xmlns:bpmn="..."`` are not considered.

    Args:
        line: One line of XML text, typically the line holding the root tag.

    Returns:
        The namespace URI as a string, or ``None`` if the line contains no
        default namespace declaration.
    """
    import re  # local import: keeps this cell independent of the notebook's import order

    # The back-reference \1 makes the closing quote match the opening one, so
    # the value is extracted correctly even when the attribute is the last
    # token before ">" or "/>" (where slicing off one trailing character
    # would leave the quote inside the result).
    match = re.search(r"""(?:^|\s)xmlns\s*=\s*(["'])(.*?)\1""", line)
    if match is None:
        return None
    return match.group(2)

def get_flow_info(root, ns):
    """Collect every ``sequenceFlow`` element below ``root``.

    Args:
        root: ElementTree element to search (all descendants are visited).
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.

    Returns:
        A list of dicts ``{"id", "source", "target"}`` taken from the
        ``id``, ``sourceRef`` and ``targetRef`` attributes, in document order.
    """
    return [
        {
            "id": element.attrib["id"],
            "source": element.attrib["sourceRef"],
            "target": element.attrib["targetRef"],
        }
        for element in root.iter(ns + "sequenceFlow")
    ]

def get_start_event_info(root, ns):
    """Describe the first ``startEvent`` found below ``root``.

    Only the first start event and its first ``outgoing`` child are used.
    (Open question from the original author: can a start event have several
    outgoing flows? If so, the remaining ones are ignored here.)

    Args:
        root: ElementTree element to search.
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.

    Returns:
        A dict ``{"id", "outgoing_id"}``. Both values are ``""`` when there is
        no start event; ``"outgoing_id"`` is ``""`` when the start event has
        no ``outgoing`` child.

    Raises:
        KeyError: if the start event has no ``id`` attribute.
    """
    start_event = next(root.iter(ns + "startEvent"), None)
    if start_event is None:
        return {"id": "", "outgoing_id": ""}

    start_event_id = start_event.attrib["id"]

    # find() returns None when the child is missing; fall back to the same
    # empty default that is used when no start event exists at all instead of
    # failing with an AttributeError on ``None.text``.
    outgoing = start_event.find(ns + "outgoing")
    outgoing_id = outgoing.text if outgoing is not None else ""

    return {"id": start_event_id, "outgoing_id": outgoing_id}

def get_end_event_info(root, ns):
    """Describe the first ``endEvent`` found below ``root``.

    Args:
        root: ElementTree element to search.
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.

    Returns:
        A dict ``{"id", "incoming_ids"}`` holding the end event's id and the
        text of each of its ``incoming`` elements (there may be several).
        When no end event exists the result is ``{"id": "", "incoming_ids": []}``.
    """
    first_end_event = next(root.iter(ns + "endEvent"), None)
    if first_end_event is None:
        return {"id": "", "incoming_ids": []}

    return {
        "id": first_end_event.attrib["id"],
        "incoming_ids": [node.text for node in first_end_event.iter(ns + "incoming")],
    }


def get_task_info(root, ns, task_types):
    """Collect all task elements of the given types.

    Args:
        root: ElementTree element to search.
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.
        task_types: Local tag names to look for, e.g. ``["task", "userTask"]``.

    Returns:
        A list of dicts ``{"id", "name", "incoming", "outgoing"}``. Only the
        first ``incoming`` and the first ``outgoing`` flow id of each task are
        kept. Results are grouped by task type in the order of ``task_types``.

    Raises:
        KeyError: if a task lacks an ``id`` or ``name`` attribute.
        IndexError: if a task has no ``incoming`` or no ``outgoing`` element.
    """
    collected = []

    for kind in task_types:
        for node in root.iter(ns + kind):
            # gather every incoming / outgoing flow id of this task
            incoming_ids = [element.text for element in node.iter(ns + "incoming")]
            outgoing_ids = [element.text for element in node.iter(ns + "outgoing")]

            collected.append({
                "id": node.attrib["id"],
                "name": node.attrib["name"],
                "incoming": incoming_ids[0],
                "outgoing": outgoing_ids[0],
            })

    return collected


def get_parallel_info(root, ns):
    """Collect every ``parallelGateway`` (AND gateway) below ``root``.

    Args:
        root: ElementTree element to search.
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.

    Returns:
        A list of dicts ``{"id", "name", "direction", "incomings", "outgoings"}``
        where ``direction`` is the ``gatewayDirection`` attribute and the last
        two entries are lists of flow ids.

    Raises:
        KeyError: if a gateway lacks ``id``, ``name`` or ``gatewayDirection``.
    """
    gateways = []
    for gateway in root.iter(ns + "parallelGateway"):
        # all incoming and outgoing flow ids of this AND gateway
        incoming_ids = [element.text for element in gateway.iter(ns + "incoming")]
        outgoing_ids = [element.text for element in gateway.iter(ns + "outgoing")]

        gateways.append({
            "id": gateway.attrib["id"],
            "name": gateway.attrib["name"],
            "direction": gateway.attrib["gatewayDirection"],
            "incomings": incoming_ids,
            "outgoings": outgoing_ids,
        })
    return gateways
    
    
def get_xor_info(root, ns):
    """Collect every ``exclusiveGateway`` (XOR gateway) below ``root``.

    Args:
        root: ElementTree element to search.
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.

    Returns:
        A list of dicts ``{"id", "name", "direction", "incomings", "outgoings"}``
        where ``direction`` is the ``gatewayDirection`` attribute and the last
        two entries are lists of flow ids.

    Raises:
        KeyError: if a gateway lacks ``id``, ``name`` or ``gatewayDirection``.
    """
    xors = []
    for xor in root.iter(ns + "exclusiveGateway"):
        # all incoming and outgoing flow ids of this XOR gateway
        incomings = [incoming.text for incoming in xor.iter(ns + "incoming")]
        outgoings = [outgoing.text for outgoing in xor.iter(ns + "outgoing")]

        # Attributes must be read from the XOR element itself (the loop
        # variable), not from a variable named ``parallel``.
        current_xor = {
            "id": xor.attrib["id"],
            "name": xor.attrib["name"],
            "direction": xor.attrib["gatewayDirection"],
            "incomings": incomings,
            "outgoings": outgoings,
        }
        xors.append(current_xor)
    return xors


def get_task_types(root, ns):
    """Return the distinct task tag names used in the process.

    Looks only at the direct children of the first ``process`` element
    (tasks, flows, gateways, ...), strips the ``{namespace}`` part of each
    tag, and keeps the names containing ``"task"`` or ``"Task"``.

    Args:
        root: ElementTree element whose direct ``process`` child is inspected.
        ns: Namespace prefix in ElementTree form, e.g. ``"{uri}"``.

    Returns:
        A duplicate-free list such as ``["task", "manualTask", "serviceTask"]``
        in order of first appearance.
    """
    process_element = root.find(ns + "process")

    # local tag names of everything directly inside <process>
    local_names = [child.tag.split("}")[1] for child in process_element]

    # a dict keeps insertion order, which gives de-duplication for free
    unique_task_names = {}
    for local_name in local_names:
        if "task" in local_name or "Task" in local_name:
            unique_task_names.setdefault(local_name, None)

    return list(unique_task_names)
    

In [2]:
# Assemble the TPN text from the transition data.
def get_places_for_text(trans_info):
    """Return the sorted, duplicate-free place names used by all transitions.

    Args:
        trans_info: List of dicts, each with iterable ``"ingoings"`` and
            ``"outgoings"`` entries holding place names.

    Returns:
        A sorted list containing every place name exactly once.
    """
    unique_places = set()
    for transition in trans_info:
        unique_places.update(transition["ingoings"])
        unique_places.update(transition["outgoings"])
    return sorted(unique_places)

def get_text_places(places):
    """Render the place declarations of the TPN text.

    The first place in ``places`` is emitted with ``init 1``; every other
    place is emitted without an initial marking.

    Args:
        places: Iterable of place names (strings).

    Returns:
        One ``place "...";`` line per place, each terminated by a newline,
        or ``""`` for an empty input.
    """
    lines = []
    for position, place in enumerate(places):
        ending = '" init 1;\n' if position == 0 else '";\n'
        lines.append('place "' + place + ending)
    return "".join(lines)


def get_text_transitions(trans_info):
    """Render the transition declarations of the TPN text.

    Each transition becomes::

        trans "<number>"~"<name>"
          in "<place>", "<place>"
          out "<place>", "<place>";

    followed by a blank line.

    Args:
        trans_info: List of dicts with the keys ``"number"``, ``"name"``,
            ``"ingoings"`` and ``"outgoings"``; the last two are iterables of
            place names and are written in their iteration order.

    Returns:
        The concatenated text for all transitions (``""`` for an empty list).
    """
    def quoted_list(places):
        # join() yields "" for an empty collection, so the "in"/"out" keyword
        # stays intact; unconditionally cutting a trailing ", " would eat the
        # end of the keyword when there is nothing to cut.
        return ', '.join('"' + place + '"' for place in places)

    parts = []
    for trans in trans_info:
        parts.append('trans "' + trans["number"] + '"~"' + trans["name"] + '"\n')
        parts.append('  in ' + quoted_list(trans["ingoings"]) + '\n')
        parts.append('  out ' + quoted_list(trans['outgoings']) + ';\n\n')
    return "".join(parts)
        

def build_tpn(trans_info):
    """Build the complete TPN text for the given transitions.

    Args:
        trans_info: List of dicts with the keys ``"number"``, ``"name"``,
            ``"ingoings"`` and ``"outgoings"``.

    Returns:
        The place declarations, an empty line, then the transition
        declarations, as one string.
    """
    place_section = get_text_places(get_places_for_text(trans_info))
    transition_section = get_text_transitions(trans_info)
    return "\n".join([place_section, transition_section])

In [3]:
def get_flow_id_to_number_mapping(flow_info):
    """Assign a running number to every flow id.

    Idea: each flow is supposed to become a place, and each place needs a
    number. Flow ids are expected to be unique.

    Args:
        flow_info: List of dicts that each contain an ``"id"`` entry.

    Returns:
        A dict mapping flow id -> position of that flow in ``flow_info``
        (starting at 0).
    """
    return {flow["id"]: number for number, flow in enumerate(flow_info)}

In [4]:
# BPMN file to convert; uncomment one of the alternatives to try another model.
current_xml = "read_book.bpmn"
#current_xml = "read_book_xor.bpmn"
#current_xml = "read_book_and.bpmn"
#current_xml = "BPMN_V2.bpmn"

import xml.etree.ElementTree as ET

# Data extracted from the BPMN XML (filled in below).
start_event_info = {} # dict {"id", "outgoing_id"}
end_event_info = {} # dict {"id", "incoming_ids"}
task_info = [] # list of dicts {"id", "name", "incoming", "outgoing"}
flow_info = [] # list of dicts {"id", "source", "target"}
parallel_info = [] # list of dicts {"id", "name", "direction", "incomings", "outgoings"}
xor_info = [] # list of dicts {"id", "name", "direction", "incomings", "outgoings"}

# Id lookups used by get_type() to classify an element.
start_event_id = ""
end_event_id = ""
task_ids = []
flow_ids = []
div_parallel_ids = []
conv_parallel_ids = []
xor_ids = [] # must exist before the loop below appends to it; also read by get_type()
div_xor_ids = [] # not filled yet
conv_xor_ids = [] # not filled yet

# Only the first line of the file is inspected for a default xmlns declaration.
with open(current_xml, "r") as f:
    declared_namespace = get_default_namespace(f.readline())

tree = ET.parse(current_xml)
root = tree.getroot()

if declared_namespace is not None:
    ns = "{" + declared_namespace + "}"
elif root.tag.startswith("{"):
    # The first line carried no default namespace (e.g. it is the
    # <?xml ...?> declaration). ElementTree stores the root tag as
    # "{uri}localname", so reuse that "{uri}" part as the lookup prefix.
    ns = root.tag[: root.tag.index("}") + 1]
else:
    ns = ""

# pull the information out of the BPMN XML
start_event_info = get_start_event_info(root, ns)
end_event_info = get_end_event_info(root, ns)
task_info = get_task_info(root, ns, get_task_types(root, ns))
flow_info = get_flow_info(root, ns)
parallel_info = get_parallel_info(root, ns)
xor_info = get_xor_info(root, ns)
# XML processing is finished

# fill the id lists
end_event_id = end_event_info["id"]
start_event_id = start_event_info["id"]
for task in task_info:
    task_ids.append(task["id"])
for flow in flow_info:
    flow_ids.append(flow["id"])

for parallel in parallel_info:
    # every direction other than "Diverging" is treated as converging
    if parallel["direction"] == "Diverging":
        div_parallel_ids.append(parallel["id"])
    else:
        conv_parallel_ids.append(parallel["id"])
for xor in xor_info:
    xor_ids.append(xor["id"])

# running number for the next place name ("P0", "P1", ...)
place_counter = 0

In [5]:
# TODO (Friday): continue here
def get_flow(flow_id):
    """Look up a flow by id in the module-level ``flow_info`` list.

    Returns:
        The first flow dict whose ``"id"`` equals ``flow_id``, or ``None``
        if there is no such flow.
    """
    return next((candidate for candidate in flow_info if candidate["id"] == flow_id), None)
        
def get_type(elem_id):
    """Classify an element id using the module-level id lookups.

    The checks run in a fixed order and the first match wins.

    Returns:
        One of ``"start"``, ``"end"``, ``"task"``, ``"conv_parallel"``,
        ``"div_parallel"``, ``"xor"``, or ``None`` if the id is unknown.
    """
    # Each test is wrapped in a lambda so that it is only evaluated when all
    # earlier tests have failed.
    ordered_checks = (
        ("start", lambda: elem_id == start_event_id),
        ("end", lambda: elem_id == end_event_id),
        ("task", lambda: elem_id in task_ids),
        ("conv_parallel", lambda: elem_id in conv_parallel_ids),
        ("div_parallel", lambda: elem_id in div_parallel_ids),
        ("xor", lambda: elem_id in xor_ids),
    )
    for label, matches in ordered_checks:
        if matches():
            return label
    return None
    
def get_vorg_task(vorg_id):
    """Look up a (predecessor) task by id in the module-level ``task_info``.

    Returns:
        The first task dict whose ``"id"`` equals ``vorg_id``, or ``None``
        if there is no such task.
    """
    return next((candidate for candidate in task_info if candidate["id"] == vorg_id), None)

        
def get_vorg_trans(vorg_task, net):
    """Find the net transition that was created for ``vorg_task``.

    Transitions are matched by name: the transition's ``"transname"`` must
    equal the task's ``"name"``.

    Args:
        vorg_task: Task dict with a ``"name"`` entry.
        net: List of transition dicts with a ``"transname"`` entry.

    Returns:
        The first matching transition dict, or ``None`` if none matches.
    """
    matching = (entry for entry in net if entry["transname"] == vorg_task["name"])
    return next(matching, None)
        
def get_ingoing_places(task, net):
    """Determine the input places of the transition created for ``task``.

    The task's incoming flow is followed back to its source element (the
    predecessor), and the result depends on the predecessor's type:

    * ``"start"``: a fresh place ``"P<place_counter>"`` is created and the
      module-level ``place_counter`` is incremented.
    * ``"task"``: the output places of the predecessor's transition in
      ``net`` are reused.
    * anything else (including ``"div_parallel"``): not implemented yet,
      the returned set stays empty.

    Args:
        task: Task dict with an ``"incoming"`` flow id.
        net: Transitions built so far (list of dicts with ``"transname"``
            and ``"outgoing_places"``).

    Returns:
        A set of place names.
    """
    global place_counter
    ingoing_places = set()

    ing_flow_id = task["incoming"]
    ing_flow = get_flow(ing_flow_id)
    vorg_id = ing_flow["source"]  # id of the predecessor element
    vorg_type = get_type(vorg_id)

    if vorg_type == "start":
        ingoing_places.add("P"+ str(place_counter))
        place_counter += 1

    if vorg_type == "task":
        vorg_task = get_vorg_task(vorg_id)
        vorg_trans = get_vorg_trans(vorg_task, net)
        in_places = vorg_trans["outgoing_places"]
        ingoing_places.update(in_places)

    if vorg_type == "div_parallel":
        # TODO: handling of diverging parallel gateways is not implemented.
        # The branch needs a body to be valid Python; until it is written the
        # task gets no input places, like the other unhandled predecessor types.
        pass

    return ingoing_places

def get_outgoing_places(task):
    """Determine the output places of the transition created for ``task``.

    The task's outgoing flow is followed to its target element (the
    successor). If the successor is the end event or another task, a fresh
    place ``"P<place_counter>"`` is created and the module-level
    ``place_counter`` is incremented; for any other successor type the
    returned set is empty.

    Args:
        task: Task dict with an ``"outgoing"`` flow id.

    Returns:
        A set of place names.
    """
    global place_counter

    successor_id = get_flow(task["outgoing"])["target"]
    successor_type = get_type(successor_id)

    new_places = set()
    if successor_type in ("end", "task"):
        new_places.add("P" + str(place_counter))
        place_counter += 1
    return new_places
     

In [6]:
# Testing the functions above
#print(task_info[2])
#my_task = task_info[2]
#x = get_ingoing_places(my_task)
#print("cccc")
#print(x)

In [7]:
def make_petri(task_info, flow_info, task_ids):
    """Create one net transition per task.

    Args:
        task_info: List of task dicts (``"name"``, ``"incoming"``,
            ``"outgoing"``, ...), processed in order.
        flow_info: Not used inside this function.
        task_ids: Not used inside this function.

    Returns:
        A list of dicts ``{"transnum", "transname", "ingoing_places",
        "outgoing_places"}``. ``transnum`` counts from 1; the two place
        entries are sets, not lists.
    """
    net = []
    for position, task in enumerate(task_info, start=1):
        # Input places are resolved against the transitions built so far,
        # so ``net`` is passed before the new element is appended.
        net.append({
            "transnum": position,
            "transname": task["name"],
            "ingoing_places": get_ingoing_places(task, net),
            "outgoing_places": get_outgoing_places(task),
        })
    return net

In [8]:
# This is what the output of the make_petri function is supposed to look like.
t1, t2, t3 = (
    {"number": number, "name": name, "ingoings": {source}, "outgoings": {target}}
    for number, name, source, target in (
        ("T1", "open Book", "P0", "P1"),
        ("T2", "read Book", "P1", "P2"),
        ("T3", "close Book", "P2", "P3"),
    )
)
trans_info_test = [t1, t2, t3]
#print(build_tpn(trans_info_test))

In [9]:
# Build the net from the parsed model and print one transition dict per line,
# framed by a start and an end marker.
x = make_petri(task_info, flow_info, task_ids)
print("finale Datenstruktur Anfnag")
for y in x:
    print(y)
print("finale Datenstruktur Ende")

finale Datenstruktur Anfnag
{'transnum': 1, 'transname': 'open book', 'ingoing_places': {'P0'}, 'outgoing_places': {'P1'}}
{'transnum': 2, 'transname': 'read book', 'ingoing_places': {'P1'}, 'outgoing_places': {'P2'}}
{'transnum': 3, 'transname': 'close book', 'ingoing_places': {'P2'}, 'outgoing_places': {'P3'}}
finale Datenstruktur Ende
