- Collect rules from BPMN File
- Collect rules from Discovered BPMN / Event Log
- Create Metrics
- Modify the BPMN File
- Put in Streamlit App

In [1]:
import xmltodict
import pandas as pd
import uuid

In [2]:
def get_bpmn_element_by_value(d : dict, value, path=(), k="@name"):
    found_element = ({},None)  
    for key, val in d.items():
        if key == k:
            if val == value:
                found_element = d, path
        if type(val) == list:
            for c,i in enumerate(val):
                if type(i)!=dict:
                    continue
                found_element = get_bpmn_element_by_value(i,value,path=path+(key,c,),k=k)
                if found_element != ({},None):
                    break
        elif type(val) == dict:
            found_element = get_bpmn_element_by_value(d[key],value,path=path+(key,),k=k)
        if found_element != ({},None):
            break
    return found_element

In [3]:
def get_list_of_elements_containing_key(d,k,l=[]):
    if k in d.keys():
        l = l +[d]
    else:
        for key, val in d.items():
            if type(val) == list:
                for c,i in enumerate(val):
                    if type(i)!=dict:
                        continue
                    l = get_list_of_elements_containing_key(i,k,l)
            elif type(val) == dict:
                l = get_list_of_elements_containing_key(val,k,l)
    return l

In [4]:
def filter_list_for_element(l=[],key="bpmn:extensionElements", nested_key="tilt:dataDisclosed"):
    new_l = []
    for i in l:
        present_keys = i[key].keys()
        if "@name" not in i.keys(): continue
        for j in present_keys:
            if nested_key in j:
                new_l.append(i)
                break
    return new_l

In [5]:
with open('./data/discovered_process.bpmn', 'r', encoding='utf-8') as file:
    test_xml = file.read()
with open('./data/diagram_1.bpmn', 'r', encoding='utf-8') as file:
    normative_xml = file.read()

test_bpmn = xmltodict.parse(test_xml)
normative_bpmn = xmltodict.parse(normative_xml)

In [6]:
test_tilt_elements = filter_list_for_element(
    get_list_of_elements_containing_key(test_bpmn,"bpmn:extensionElements",[]),
    key="bpmn:extensionElements",
    nested_key="tilt:dataDisclosed"
    )
normative_tilt_elements = filter_list_for_element(
    get_list_of_elements_containing_key(normative_bpmn,"bpmn:extensionElements",[]),
    key="bpmn:extensionElements",
    nested_key="tilt:dataDisclosed"
    )

In [7]:
test_data_disclosed_elements = []
for i in test_tilt_elements:
    if type(i["bpmn:extensionElements"]["tilt:dataDisclosed"]) != list:
        i["bpmn:extensionElements"]["tilt:dataDisclosed"] = [i["bpmn:extensionElements"]["tilt:dataDisclosed"]]
    for j in i["bpmn:extensionElements"]["tilt:dataDisclosed"]:
        test_data_disclosed_elements.append((i["@name"],j["@_id"],))

normative_data_disclosed_elements = []
for i in normative_tilt_elements:
    if "tilt:dataDisclosed" not in i["bpmn:extensionElements"].keys() or "@name" not in i.keys():
        continue
    if type(i["bpmn:extensionElements"]["tilt:dataDisclosed"]) != list:
        i["bpmn:extensionElements"]["tilt:dataDisclosed"] = [i["bpmn:extensionElements"]["tilt:dataDisclosed"]]
    for j in i["bpmn:extensionElements"]["tilt:dataDisclosed"]:
        normative_data_disclosed_elements.append((i["@name"],j["@_id"],))

In [8]:
def append_values_to_list(d, key, value_id):
    l = []
    if key in d.keys():
        if type(d[key]) == list:
            for i in d[key]:
                l.append(i[value_id])
        else:
            l.append(d[key][value_id])
    return l

def build_data_disclosed_element(d):
    if type(d) != dict: return
    c = d["@category"] if "@category" in d.keys() else ""
    p = append_values_to_list(d,"tilt:purposes","@purpose")
    l = append_values_to_list(d,"tilt:legalBases","@reference")
    element = {"@category": c,
               "tilt:purposes": p,
               "tilt:legalBases":l}
    return str(hash(str(element))),element,d["@_id"]

In [9]:
test_new_ids = []
for i in test_tilt_elements:
    for j in i["bpmn:extensionElements"]["tilt:dataDisclosed"]:
        test_new_ids = test_new_ids + [build_data_disclosed_element(j)]
normative_new_ids = []
for i in normative_tilt_elements:
    for j in i["bpmn:extensionElements"]["tilt:dataDisclosed"]:
        normative_new_ids = normative_new_ids + [build_data_disclosed_element(j)]

In [10]:
df_test_new = pd.DataFrame(test_new_ids,columns=["id_test","dict","old_id"]).dropna()
df_test_old = pd.DataFrame(test_data_disclosed_elements,columns=["name","old_id"]).dropna()
df_data_disclosed_test = pd.merge(df_test_new,df_test_old,how="outer",on="old_id").drop_duplicates(["id_test","name","old_id"])

In [11]:
df_normative_new = pd.DataFrame(normative_new_ids,columns=["id_norm","dict","old_id"]).dropna()
df_normative_old = pd.DataFrame(normative_data_disclosed_elements,columns=["name","old_id"]).dropna()
df_data_disclosed_normative = pd.merge(df_normative_new,df_normative_old,how="outer",on="old_id").drop_duplicates(["id_norm","name","old_id"])

In [12]:
combined_df = pd.merge(df_data_disclosed_test[["id_test","name"]],df_data_disclosed_normative[["id_norm","name"]],how="outer",left_on=["id_test","name"],right_on=["id_norm","name"])#.drop_duplicates()

Unused Elements
Not Modeled Elements

Mark not used elements in normative
add not modeled elements to normative

mark not used in test
additional activities that are not compliant

In [13]:
#All additional Processing that is not defined in the model 
faulty_processing = combined_df[pd.isna(combined_df["id_norm"])]

In [14]:
get_bpmn_element_by_value(normative_bpmn,"Send Data to User")

({}, None)

In [15]:
def get_bpmn_element_by_type(d:dict,t)->list:
    if t in d.keys():
        return[d[t]]
    found_elements = []
    for key, val in d.items():
        if type(val) == dict:
            rec = get_bpmn_element_by_type(val,t)
            if rec:
                found_elements = found_elements + rec
            continue
        if val is None:
            continue
        for v in val:
            if v == t:
                found_elements = found_elements + found_elements + d[key][t]
            elif type(v) == list:
                for i in v:
                    if type(i) == dict:
                        rec = get_bpmn_element_by_type(i,t)
                        if rec:
                            found_elements = found_elements + rec
            elif type(v)==dict:
                rec = get_bpmn_element_by_type(v,t)
                if rec:
                    found_elements = found_elements + rec
    return found_elements if len(found_elements) > 0 else None

def append_to_element_by_type(d:dict,t,key,new_element)->dict:
    found_elements = get_bpmn_element_by_type(d,t)
    found_element = found_elements[0]#[0]
    if t != "bpmndi:BPMNPlane":
        found_shapes = get_bpmn_element_by_type(d,"bpmndi:BPMNPlane")
        #found_shapes[0]["bpmndi:BPMNShape"].append({'@id': new_element["@id"]+"_di",
        #        '@bpmnElement': new_element["@id"],
        #        'dc:Bounds': {'@x': '100', '@y': '100', '@width': '100', '@height': '80'},
        #        'bpmndi:BPMNLabel': None})
        print(found_element)
    #found_element[key] = new_element
    return d

In [16]:
def modify_by_activity(activity,old_id,bpmn,other_bpmn=None):
    error_element, *n = get_bpmn_element_by_value(bpmn,activity)
    if error_element == {}:
        return bpmn
        append_to_element_by_type(bpmn,"bpmn:process","bpmn:task",{"@id":"Activity_"+activity.replace(" ","_"),"@name":activity})
        error_element, *n = get_bpmn_element_by_value(bpmn,activity)
    error_data_disclosed, *n = get_bpmn_element_by_value(error_element,old_id,k="@_id")
    element_id = error_element["@id"]
    shape, *n= get_bpmn_element_by_value(bpmn,element_id,k="@bpmnElement")
    #shape["@bioc:stroke"] ="#000000"
    shape["@bioc:fill"]="#BD523E"
    shape["@color:background-color"]="#BD523E"
    #shape["@color:border-color"]="#BD523E"
    if error_data_disclosed == {}:
        #TODO Falls das Data Disclosed Feld nicht im Diagramm existiert, dann muss es hinzugefügt werden....
        #error_data_disclosed, *n = get_bpmn_element_by_value()
        #print(activity,error_element,old_id)
        return bpmn
    if "UNCOMPLIANT:" not in error_data_disclosed["@category"]:
        error_data_disclosed["@category"] = "UNCOMPLIANT:"+error_data_disclosed["@category"]
    return bpmn

In [17]:
modified_bpmn = test_bpmn
for index,series in faulty_processing.iterrows():
    old_id = df_data_disclosed_test[df_data_disclosed_test["id_test"] == series["id_test"]]["old_id"].values[0]
    modified_bpmn = modify_by_activity(series["name"],old_id,modified_bpmn)
with open('./data/test_coloured_without_unused.bpmn', 'w', encoding='utf-8') as file:
    file.writelines(xmltodict.unparse(modified_bpmn))

In [18]:
def modify_by_unused(activity,old_id,bpmn):
    error_element, *n = get_bpmn_element_by_value(bpmn,activity)
    error_data_disclosed, *n = get_bpmn_element_by_value(error_element,old_id,k="@_id")
    if error_data_disclosed =={}:
        return bpmn
    if "UNUSED:" not in error_data_disclosed["@category"]:
        error_data_disclosed["@category"] = "UNUSED:"+error_data_disclosed["@category"]
    element_id = error_element["@id"]
    shape, *n= get_bpmn_element_by_value(bpmn,element_id,k="@bpmnElement")
    shape["@bioc:stroke"] ="EFE9CE"
    #shape["@bioc:fill"]="#8728E3"
    #shape["@color:background-color"]="#ffe0b2"
    shape["@color:border-color"]="#EFE9CE"
    return bpmn

In [19]:
modified_bpmn = normative_bpmn
unused_processing = combined_df[pd.isna(combined_df["id_test"])]
notmodeled_processing = combined_df[pd.isna(combined_df["id_norm"])]
for index,series in unused_processing.iterrows():
    old_id = df_data_disclosed_normative[df_data_disclosed_normative["id_norm"] == series["id_norm"]]["old_id"].values[0]
    modified_bpmn = modify_by_unused(series["name"],old_id,modified_bpmn)
for index,series in faulty_processing.iterrows():
    old_id = df_data_disclosed_test[df_data_disclosed_test["id_test"] == series["id_test"]]["old_id"].values[0]
    modified_bpmn = modify_by_activity(series["name"],old_id,modified_bpmn)

with open('./data/norm_coloured_without_discovered.bpmn', 'w', encoding='utf-8') as file:
    file.writelines(xmltodict.unparse(modified_bpmn))