In [28]:
%matplotlib inline
import matplotlib.pyplot as plt

from pathlib import Path

from promg.modules.db_management import DBManagement
from tabulate import tabulate
import yaml

from promg import Configuration, DatabaseConnection, Performance, SemanticHeader, DatasetDescriptions, OcedPg, Query

from IPython.core.interactiveshell import InteractiveShell

# Display the value of every top-level expression statement in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

import pandas as pd

# Allow very wide text output before pandas wraps a frame's columns.
pd.set_option('display.width', 2000)

### Define the project that you want to do analysis on

In [29]:
# Name of the dataset to analyse; it is also used as the folder that holds its config.yaml.
# The cells below know the values 'bpic14' and 'bpic17'.
case_study = 'bpic14'
# case_study = 'bpic17'
# NOTE(review): use_sample is not referenced anywhere in the visible part of this notebook.
use_sample = False

In [30]:
# Retrieve the configuration for the selected case_study.
conf_path = Path(case_study, 'config.yaml')
# A context manager closes the file handle deterministically instead of leaking it.
with open(conf_path) as conf_file:
    config = yaml.safe_load(conf_file)

# Show the connection settings this notebook expects, so they can be compared with the
# running database. Single quotes are used inside the f-strings because reusing the
# enclosing quote type inside a replacement field is a SyntaxError before Python 3.12.
# NOTE(review): this writes the database password in clear text into the saved notebook output.
print("These are the credentials that I expect to be set for the database.")
print(f"db_name: {config['db_name']}")
print(f"user: {config['user']}")
print(f"uri: {config['uri']}")
print(f"password: {config['password']}")
print("----------------------")
print(f"If you have other credentials, please change them at: {conf_path}")

These are the credentials that I expect to be set for the database.
db_name: neo4j
user: neo4j
uri: bolt://localhost:7687
password: bpic2014
----------------------
If you have other credentials, please change them at: bpic14\config.yaml


### Set up PromG so that we can execute queries

In [31]:
# NOTE: `config` is rebound here, from the raw YAML dict loaded earlier to the object
# returned by promg's Configuration.init_conf_with_config_file for the same file.
config = Configuration.init_conf_with_config_file(conf_path)
# Connection object through which every exec_query call in this notebook is issued.
db_connection = DatabaseConnection.set_up_connection(config=config)

# Start Analysis
Make sure the data has been imported using `import_data.ipynb`. The imported data follows the schema defined in the paper.

# Analysis Goals
We want to repeat the analysis performed in

`Klijn, E.L., Mannhardt, F., Fahland, D. (2023). Aggregating Event Knowledge Graphs for Task Analysis. In: Montali, M., Senderovich, A., Weidlich, M. (eds) Process Mining Workshops. ICPM 2022. Lecture Notes in Business Information Processing, vol 468. Springer, Cham. https://doi.org/10.1007/978-3-031-27815-0_36`

We want to infer tasks between resources and cases. To do so, we first indicate the type of the resource and the type of the case.

In [32]:
# This needs domain knowledge. All queries below will use these types as parameters.
# The names are reset first so that a failed lookup never leaves values from an earlier
# run of this cell behind in the kernel.
resource_type = None
case_type = None
batch_sizes = None
if case_study == "bpic17":
    resource_type = "Resource"
    case_type = "CaseAWO"
    batch_sizes = {
        "CaseAWO": 5000,
        "Resource": 10,
    }
elif case_study == "bpic14":
    resource_type = "Resource"
    case_type = "Interaction"
    batch_sizes = {
        "Interaction": 10_000,
        "Resource": 10,
    }
else:
    # Fail here with a clear message instead of continuing with [None, None] and
    # hitting a less obvious error in a later cell.
    raise ValueError(
        f"Unknown case_study {case_study!r}: no resource/case types are configured for it "
        "(expected 'bpic14' or 'bpic17')"
    )
# Object types for which DF relations are discovered; the trailing expression displays the list.
object_types = [resource_type, case_type]
object_types

['Resource', 'Interaction']


First, we have to discover the DF relations for the resources and the case. For this we can use a generic, parameterized query.

In [33]:
# Discover the directly-follows (DF) relations for every object type in our data model.
#
# The query is executed once per object type so the database is not overloaded.
# Resources are few but have many events each, whereas cases are many but have few events
# each, so every object type gets its own batch size (see batch_sizes).
#
# First statement: select the objects that are an instance of the requested object type.
# Second statement: per object, order its events by timestamp (ties broken by internal id),
# and MERGE a DF relationship between each pair of consecutive events.

df_instance_query_str = '''
    CALL apoc.periodic.iterate('
        MATCH (o) - [:INSTANCE_OF] -> (ot:ObjectType {objectType: $object_type})
        RETURN o, ot
    ','
        MATCH (e:Event) - [e2o_instance] -> (o) // we remove the check on the event type level for performance increase, and we have no roles so check is redundant
        WITH o.id as oId, ot.objectType as oType, e order by e.timestamp, id(e)
        WITH oId, oType, collect(e) as events
        WITH oId, oType, events
        UNWIND range(0, size(events)-2) AS i
            WITH oId, oType, events[i] AS e1, events[i+1] AS e2
            MERGE (e1) - [df:DF {objectType:oType, id:oId}] -> (e2)',
        {batchSize:$batch_size, parallel: true, params:{object_type:$object_type}})
'''

for object_type in object_types:
    print(f"Discovering df for object_type: {object_type}")

    # parameters for this run: which object type to process and in what batch size
    df_instance_parameters = dict(object_type=object_type, batch_size=batch_sizes[object_type])
    df_instance_query = Query(query_str=df_instance_query_str, parameters=df_instance_parameters)

    results = db_connection.exec_query(df_instance_query)
    # check that 'failedBatches' is 0; if batches failed, you should probably increase the storage of your db
    print(results)


Discovering df for object_type: Resource
[{'batches': 25, 'total': 242, 'timeTaken': 7, 'committedOperations': 242, 'failedOperations': 0, 'failedBatches': 0, 'retries': 0, 'errorMessages': {}, 'batch': {'total': 25, 'errors': {}, 'committed': 25, 'failed': 0}, 'operations': {'total': 242, 'errors': {}, 'committed': 242, 'failed': 0}, 'wasTerminated': False, 'failedParams': {}, 'updateStatistics': {'relationshipsDeleted': 0, 'relationshipsCreated': 466495, 'nodesDeleted': 0, 'nodesCreated': 0, 'labelsRemoved': 0, 'labelsAdded': 0, 'propertiesSet': 932990}}]
Discovering df for object_type: Interaction
[{'batches': 15, 'total': 147173, 'timeTaken': 1, 'committedOperations': 147173, 'failedOperations': 0, 'failedBatches': 0, 'retries': 0, 'errorMessages': {}, 'batch': {'total': 15, 'errors': {}, 'committed': 15, 'failed': 0}, 'operations': {'total': 147173, 'errors': {}, 'committed': 147173, 'failed': 0}, 'wasTerminated': False, 'failedParams': {}, 'updateStatistics': {'relationshipsDelet

# Build tasks

In [34]:
# Step 1: identify tasks.
# Select every pair of events that directly follow each other for the resource type AND
# for the case type, with both timestamps on the same calendar date, and MERGE a temporary
# DF_JOINT relationship between them.
# NOTE: the inline Cypher comment says "ensure to carry task instances over days", but the
# date equality means a DF_JOINT never links events that fall on different days.
# NOTE(review): $batch_size is referenced by the query but not passed in the parameters
# below; presumably the connection supplies a default. Confirm.

identify_tasks = '''
CALL apoc.periodic.iterate(
    "MATCH (e1:Event)-[r_df:DF {objectType: $resource_type}]->(e2:Event)
     WHERE (e1)-[:DF {objectType: $case_type}]->(e2)
     AND date(e1.timestamp) = date(e2.timestamp) // ensure to carry task instances over days
     RETURN e1,e2",
     "WITH e1,e2
     MERGE (e1)-[:DF_JOINT]->(e2)",
     {batchSize:$batch_size, parallel: true, params: {resource_type:$resource_type, case_type:$case_type}})
'''

identify_tasks_query = Query(
    query_str=identify_tasks,
    parameters=dict(resource_type=resource_type, case_type=case_type),
)

# bare expression so the batch report is displayed as the cell output
db_connection.exec_query(identify_tasks_query)


[{'batches': 23,
  'total': 228144,
  'timeTaken': 5,
  'committedOperations': 228144,
  'failedOperations': 0,
  'failedBatches': 0,
  'retries': 0,
  'errorMessages': {},
  'batch': {'total': 23, 'errors': {}, 'committed': 23, 'failed': 0},
  'operations': {'total': 228144,
   'errors': {},
   'committed': 228144,
   'failed': 0},
  'wasTerminated': False,
  'failedParams': {},
  'updateStatistics': {'relationshipsDeleted': 0,
   'relationshipsCreated': 228144,
   'nodesDeleted': 0,
   'nodesCreated': 0,
   'labelsRemoved': 0,
   'labelsAdded': 0,
   'propertiesSet': 0}}]

In [35]:
# Create the task instances and, at the same time, their types.
#
# First statement (read side) returns one row per task instance, found in two ways:
#   * a DF_JOINT path from an event without incoming DF_JOINT to an event without outgoing
#     DF_JOINT (task instances of at least two events);
#   * a single event without any DF_JOINT that is related to an object of $resource_type
#     whose ObjectType is also connected to the event's EventType.
# Each row carries the variant (the eventType of every event on the path, in order), the
# events themselves, and the timestamp/id of the first and last event.
#
# Second statement (write side), per row:
#   * MERGE a START and an END EventType (agg 'Task') whose eventType is 'START'/'END' + variant,
#     linked by BELONGS_TO;
#   * MERGE a start and an end Event node for the task instance, linked by BELONGS_TO;
#   * connect both task-instance nodes to every contained event, and both task types to the
#     event types of those events, via CONTAINS.
#
# NOTE: the instance-level CONTAINS relationships use CREATE, so running this cell again
# adds duplicates of them.
# NOTE(review): the id prefixes differ in casing ('Ti-Start-' vs 'TI-End-'); confirm whether
# that is intended before relying on the prefix.
# NOTE(review): $case_type is passed in params but not referenced by either statement, and
# $batch_size is referenced but not passed below; presumably the connection supplies a
# default. Confirm.
create_task_instances = '''
    CALL apoc.periodic.iterate(
    "
        CALL {
            // find complete path between e1 and e2 (full task instance) consisting of at least two events
            MATCH (e1:Event)-[df_joint:DF_JOINT]->()
                WHERE NOT ()-[:DF_JOINT]->(e1) // e1 is starting event as it has no preceding event
                        
            MATCH ()-[:DF_JOINT]->(e2:Event) 
                WHERE NOT (e2)-[:DF_JOINT]->() // e2 is ending event as it has no succeeding event
            
            MATCH p=(e1)-[:DF_JOINT*]->(e2) // find path between e1 and e2
            RETURN p, e1, e2
                      
        UNION
            // find single task instances consisting of one event
            MATCH (et:EventType) <- [:INSTANCE_OF] - (e:Event)
            WHERE (e)-[]->()-[:INSTANCE_OF] -> (:ObjectType {objectType:$resource_type}) <- [] - (et) //check e is performed by resource type
                AND NOT ()-[:DF_JOINT]->(e) // e has no preceding event
                AND NOT (e)-[:DF_JOINT]->() // e has no succeeding event
            MATCH p=(e) 
            RETURN p, e AS e1, e AS e2
        }
        RETURN 
            [event IN nodes(p) | head([(event)-[:INSTANCE_OF]->(et:EventType) | et.eventType])] AS variant,
            nodes(p) AS events, 
            e1.timestamp AS start_time, e1.id as start_id, 
            e2.timestamp AS end_time, e2.id as end_id
    ",
    "
        WITH variant, events, start_time, start_id, end_time, end_id
            MERGE (etStart: EventType {eventType: 'START' + variant  , agg: 'Task'})
            MERGE (etEnd: EventType {eventType: 'END' + variant , agg: 'Task'})
            MERGE (etStart) - [:BELONGS_TO] -> (etEnd)
            MERGE (tiStart:Event {timestamp: start_time, id:'Ti-Start-'+start_id+'-'+end_id}) - [:INSTANCE_OF] -> (etStart)
            MERGE (tiEnd:Event {timestamp: end_time, id:'TI-End-'+start_id+'-'+end_id}) - [:INSTANCE_OF] -> (etEnd)
            MERGE (tiStart) - [:BELONGS_TO] -> (tiEnd)
        WITH tiStart, tiEnd, etStart, etEnd, events
        UNWIND events AS e
            MATCH (e) - [:INSTANCE_OF]->(et)
            CREATE (tiStart) -[:CONTAINS]-> (e)
            CREATE (tiEnd) -[:CONTAINS]-> (e)
            
            MERGE (etStart) - [:CONTAINS] -> (et)
            MERGE (etEnd) - [:CONTAINS] -> (et)
    ",
    {batchSize:$batch_size, params:{resource_type:$resource_type, case_type:$case_type}})
'''

# Bind the domain-specific types to the query.
create_task_instances_query = Query(
    query_str=create_task_instances,
    parameters={
        "resource_type": resource_type,
        "case_type": case_type
    }
)

# Bare expression: the batch report returned by the query is shown as the cell output.
db_connection.exec_query(create_task_instances_query)

[{'batches': 24,
  'total': 238593,
  'timeTaken': 39,
  'committedOperations': 238593,
  'failedOperations': 0,
  'failedBatches': 0,
  'retries': 0,
  'errorMessages': {},
  'batch': {'total': 24, 'errors': {}, 'committed': 24, 'failed': 0},
  'operations': {'total': 238593,
   'errors': {},
   'committed': 238593,
   'failed': 0},
  'wasTerminated': False,
  'failedParams': {},
  'updateStatistics': {'relationshipsDeleted': 0,
   'relationshipsCreated': 1722985,
   'nodesDeleted': 0,
   'nodesCreated': 493542,
   'labelsRemoved': 0,
   'labelsAdded': 493542,
   'propertiesSet': 987084}}]

In [36]:
# Store on each EventType node, as property `count`, the number of Event nodes that are
# INSTANCE_OF it. The MATCH has no `agg` filter, so this covers every EventType that has
# at least one instance, not only the Task-level ones.
# The query has no RETURN clause.
add_count_to_event_types = '''
    MATCH (e:Event) - [:INSTANCE_OF]->(et:EventType)
    WITH et, count(e) as count
    SET et.count = count
'''

# NOTE: the query is passed as a plain string here, not wrapped in a Query object as in
# the cells above.
db_connection.exec_query(add_count_to_event_types)

[]

In [37]:
# Now we can remove the temporary synchronization relationships (DF_JOINT) created while
# identifying tasks: every DF_JOINT between two Event nodes is selected and deleted in batches.
# NOTE(review): $batch_size is referenced by the query, but this cell passes a plain string
# without parameters; presumably the connection supplies a default. Confirm.
delete_sync_df = '''
CALL apoc.periodic.iterate(
    "MATCH (:Event)-[df:DF_JOINT]->(:Event)
     RETURN df",
     "DELETE df",
     {batchSize:$batch_size, parallel: True})
'''

# Bare expression: the batch report returned by the query is shown as the cell output.
db_connection.exec_query(delete_sync_df)

[{'batches': 23,
  'total': 228144,
  'timeTaken': 0,
  'committedOperations': 228144,
  'failedOperations': 0,
  'failedBatches': 0,
  'retries': 0,
  'errorMessages': {},
  'batch': {'total': 23, 'errors': {}, 'committed': 23, 'failed': 0},
  'operations': {'total': 228144,
   'errors': {},
   'committed': 228144,
   'failed': 0},
  'wasTerminated': False,
  'failedParams': {},
  'updateStatistics': {'relationshipsDeleted': 228144,
   'relationshipsCreated': 0,
   'nodesDeleted': 0,
   'nodesCreated': 0,
   'labelsRemoved': 0,
   'labelsAdded': 0,
   'propertiesSet': 0}}]

In [38]:
# Connect every task instance to the objects that its contained events are related to.
#
# First statement: collect the distinct combinations of task instance, object and
# event-to-object relationship type, restricted to objects of the resource or case type.
# Second statement: create a relationship of that same type from the task instance to the object.
#
# NOTE(review): the original intent was also to lift this to the type level, but ti_et and
# ot are only returned, not used in the write part, so nothing is created between types here.
# NOTE(review): $batch_size is referenced by the query but not passed in the parameters
# below; presumably the connection supplies a default. Confirm.
# Original author's timing note: takes roughly 2 minutes on bpic14

observe_ti_to_objects = '''
 CALL apoc.periodic.iterate(
        " MATCH (ti:Event) - [:INSTANCE_OF] -> (ti_et:EventType {agg:'Task'}) 
          MATCH (ti)-[:CONTAINS]->(e:Event)-[e2o]-> (o) - [:INSTANCE_OF] -> (ot:ObjectType)
          WHERE ot.objectType IN [$resource_type, $case_type]
          RETURN DISTINCT ti, ti_et, o, ot, type(e2o) as e2o_type",
        "WITH ti, ti_et, o, ot, e2o_type
            CALL apoc.create.relationship(ti, e2o_type, {}, o)
            YIELD rel as new_e2o
            RETURN new_e2o
            ",
        {batchSize:$batch_size, params: {resource_type:$resource_type, case_type:$case_type}})
'''

observe_ti_to_objects_query = Query(
    query_str=observe_ti_to_objects,
    parameters=dict(resource_type=resource_type, case_type=case_type),
)

# bare expression so the batch report is displayed as the cell output
db_connection.exec_query(observe_ti_to_objects_query)


[{'batches': 96,
  'total': 954372,
  'timeTaken': 20,
  'committedOperations': 954372,
  'failedOperations': 0,
  'failedBatches': 0,
  'retries': 0,
  'errorMessages': {},
  'batch': {'total': 96, 'errors': {}, 'committed': 96, 'failed': 0},
  'operations': {'total': 954372,
   'errors': {},
   'committed': 954372,
   'failed': 0},
  'wasTerminated': False,
  'failedParams': {},
  'updateStatistics': {'relationshipsDeleted': 0,
   'relationshipsCreated': 0,
   'nodesDeleted': 0,
   'nodesCreated': 0,
   'labelsRemoved': 0,
   'labelsAdded': 0,
   'propertiesSet': 0}}]

In [39]:
# Turn the BELONGS_TO link between the start and end node of one task instance into DF relationships.
# For a pair (ti1 BELONGS_TO ti2) of Task-level events whose event types are linked the same
# way, every DF of the resource or case type that runs from an event contained in ti1 to an
# event contained in ti2 is mirrored as a DF (agg 'Task') from ti1 to ti2, carrying the same
# objectType and id.
# The query has no RETURN clause, so the table printed at the end has no rows.
convert_belongs_to_to_df = '''
    MATCH (ti1:Event) - [:INSTANCE_OF] -> (ti_et1:EventType {agg:'Task'})
    MATCH (ti2:Event) - [:INSTANCE_OF] -> (ti_et2:EventType {agg:'Task'}) 
    MATCH (ti1) - [:BELONGS_TO] -> (ti2)
    MATCH (ti_et1) - [:BELONGS_TO] -> (ti_et2)
    MATCH (ti1)-[:CONTAINS]->(e1:Event)-[e2e:DF]->(e2:Event)<-[:CONTAINS]-(ti2)
    WHERE e2e.objectType IN [$resource_type, $case_type] 
     
    MERGE (ti1) - [new_e2e:DF {objectType: e2e.objectType, id: e2e.id, agg:'Task'}] -> (ti2)
'''

convert_belongs_to_to_df_query = Query(
    query_str=convert_belongs_to_to_df,
    parameters=dict(resource_type=resource_type, case_type=case_type),
)

result = db_connection.exec_query(convert_belongs_to_to_df_query)
print(tabulate(result))




In [40]:
# Lift DF relationships from the event level to the task-instance level.
# ti1 must be an end node (its type has an incoming BELONGS_TO) and ti2 a start node (its
# type has an outgoing BELONGS_TO). Whenever a DF of the resource or case type runs from an
# event contained in ti1 to an event contained in ti2, a DF (agg "Task") with the same
# objectType and id is MERGEd from ti1 to ti2.
# NOTE(review): the start and the end node of a task instance both CONTAIN all of its
# events, so a DF inside one task instance also matches with ti1 = its end node and
# ti2 = its start node. Confirm whether that end -> start DF is intended.
lift_df_to_task_instances = '''
    CALL apoc.periodic.iterate('
        MATCH (ti1:Event) - [:INSTANCE_OF] -> (ti_et1:EventType {agg:"Task"}) 
        MATCH (ti2:Event) - [:INSTANCE_OF] -> (ti_et2:EventType {agg:"Task"}) 
        WHERE ti1 <> ti2 
            AND (ti_et1) <- [:BELONGS_TO] - (:EventType) //ti_et1 is a end of the task instance
            AND (ti_et2) - [:BELONGS_TO] -> (:EventType) //ti_et2 is a start of the task instance
        MATCH (ti1)-[:CONTAINS]->(:Event)-[e2e:DF]->(:Event)<-[:CONTAINS]-(ti2)
        WHERE e2e.objectType IN [$resource_type, $case_type] 
        RETURN ti1, e2e, ti2
    ','
        MERGE (ti1) - [new_e2e:DF {objectType: e2e.objectType, id: e2e.id, agg:"Task"}] -> (ti2)
    ',{batchSize:$batch_size, params: {resource_type:$resource_type, case_type:$case_type}})
    
'''

# this cell sets the batch size explicitly
lift_df_to_task_instances_query = Query(
    query_str=lift_df_to_task_instances,
    parameters=dict(resource_type=resource_type, case_type=case_type, batch_size=5000),
)

result = db_connection.exec_query(lift_df_to_task_instances_query)
# bare expression so the batch report is displayed as the cell output
result

[{'batches': 174,
  'total': 865537,
  'timeTaken': 17,
  'committedOperations': 865537,
  'failedOperations': 0,
  'failedBatches': 0,
  'retries': 0,
  'errorMessages': {},
  'batch': {'total': 174, 'errors': {}, 'committed': 174, 'failed': 0},
  'operations': {'total': 865537,
   'errors': {},
   'committed': 865537,
   'failed': 0},
  'wasTerminated': False,
  'failedParams': {},
  'updateStatistics': {'relationshipsDeleted': 0,
   'relationshipsCreated': 685233,
   'nodesDeleted': 0,
   'nodesCreated': 0,
   'labelsRemoved': 0,
   'labelsAdded': 0,
   'propertiesSet': 2055699}}]

In [41]:
# Derive a multi-entity DF graph on the task level by aggregating the instance-level DF
# relationships between Task events up to their event types.
# Per object type and pair of Task event types, the DF relationships between their
# instances are counted; pairs whose count exceeds $df_threshold get one DF (agg "Task")
# between the two event types. The count itself is not stored on the relationship.
# NOTE(review): $batch_size is referenced by the query but not passed in the parameters
# below; presumably the connection supplies a default. Confirm.
df_aggregation_query_str = '''\
    CALL apoc.periodic.iterate('
        // find all consecutive event types for specific object types
        MATCH (e1:Event) - [e2e:DF] -> (e2:Event)
        MATCH (e1) - [:INSTANCE_OF] -> (et1:EventType {agg: "Task"})
        MATCH (e2) - [:INSTANCE_OF] -> (et2:EventType {agg: "Task"})
    
        WITH e2e.objectType as oType, et1, et2, count(e2e) as df_freq // count for each oType, how often we have observed DF between events that are an instance of et1 and et2
        WHERE df_freq > $df_threshold
        RETURN oType, et1, et2
    ','
        WITH oType, et1, et2
        MERGE (et1) - [:DF {objectType:oType, agg:"Task"}] -> (et2)
    ', 
    {batchSize:$batch_size, params:{df_threshold:$df_threshold}})
'''

# a threshold of 0 keeps every pair that was observed at least once
df_aggregation_query = Query(
    query_str=df_aggregation_query_str,
    parameters=dict(df_threshold=0),
)

results = db_connection.exec_query(df_aggregation_query)
print(tabulate(results))

--  ------  -  ------  -  -  -  --  ---------------------------------------------------------  -----------------------------------------------------------------  -----  --  ----------------------------------------------------------------------------------------------------------------------------------------------------------------
13  122067  3  122067  0  0  0  {}  {'total': 13, 'errors': {}, 'committed': 13, 'failed': 0}  {'total': 122067, 'errors': {}, 'committed': 122067, 'failed': 0}  False  {}  {'relationshipsDeleted': 0, 'relationshipsCreated': 122067, 'nodesDeleted': 0, 'nodesCreated': 0, 'labelsRemoved': 0, 'labelsAdded': 0, 'propertiesSet': 244134}
--  ------  -  ------  -  -  -  --  ---------------------------------------------------------  -----------------------------------------------------------------  -----  --  ---------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
# Number of tasks: Task event types that have an outgoing BELONGS_TO relationship.
count_tasks = '''
    MATCH (ti_et:EventType {agg:'Task'}) - [:BELONGS_TO] -> (ti_end)
    RETURN count(ti_et) as count
'''
num_tasks = db_connection.exec_query(count_tasks)

# Number of task instances: events of a Task event type that have an outgoing BELONGS_TO relationship.
count_ti = '''
    MATCH (ti_et:EventType {agg:'Task'}) <- [:INSTANCE_OF] - (ti_start:Event) - [:BELONGS_TO] -> (ti_end)
    RETURN count(ti_start) as count
'''
num_tasks_instances = db_connection.exec_query(count_ti)

# both queries return a single row with a 'count' column
print(f"In total there are {num_tasks[0]['count']} tasks with {num_tasks_instances[0]['count']} instances.")

In total there are 8178 tasks with 238593 instances.
