In [1]:
import dask.dataframe as dd
from dask.dataframe import from_pandas
from dask.dataframe.utils import make_meta
from neo4j import GraphDatabase
from neo4j.exceptions import ClientError
from dask.distributed import Client, LocalCluster, get_worker
import dask

import os
import time
import timeit
from tqdm import tqdm
import pandas as pd
import re
import gc
import numpy as np
import dill

# Miners
from pm4py import serialize, deserialize
from pm4py import discover_dfg as dfg_discovery
from pm4py.discovery import DFG

from pm4py.algo.discovery.alpha import algorithm as alpha_miner
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
from pm4py import discover_petri_net_inductive as inductive_miner


# Evaluators
from contribution import fitness_alignment, precision_alignment, generalization
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_evaluator #simplicity
from pm4py.objects.petri_net.utils.check_soundness import check_easy_soundness_net_in_fin_marking

In [2]:
# dask.config.set({'distributed.scheduler.active-memory-manager.start': True, 'distributed.comm.timeouts.tcp': '7200s'})
# Ask Dask to serialize with dill (imported in the first cell).
# NOTE(review): confirm that "distributed.serializer" is a config key Dask actually reads;
# the computations below use scheduler='processes' rather than a distributed Client.
dask.config.set({"distributed.serializer": "dill"})
# dask.config.set(scheduler='processes')

<dask.config.set at 0xffffa476dd20>

In [3]:
import sys

# Raise the interpreter recursion limit for the current process.
# NOTE(review): presumably needed by deeply recursive pm4py code run in-process — confirm.
RECURSION_LIMIT = 30000
sys.setrecursionlimit(RECURSION_LIMIT)

In [4]:
import ctypes

def trim_memory() -> int:
    """Ask glibc to hand freed heap memory back to the operating system.

    Loads ``libc.so.6`` through ctypes and returns the integer result of
    ``malloc_trim(0)``. Loading the library fails (``OSError``) on systems
    without that glibc shared object.
    """
    glibc = ctypes.CDLL("libc.so.6")
    trim_status = glibc.malloc_trim(0)
    return trim_status

In [5]:
class graph_driver():
    """Small convenience wrapper around a Neo4j ``GraphDatabase`` driver.

    Each query runs in its own session, and its records are collected into a
    Python list before the session closes. The driver can be closed
    explicitly (``close()``), by using the instance as a context manager, or
    implicitly when the object is garbage-collected.
    """

    def __init__(self, uri_scheme='bolt', host='localhost', port='7687', username='neo4j', password='123456'):
        # NOTE(review): the default password is a hard-coded credential; callers
        # should pass one explicitly (e.g. read from an environment variable).
        self.uri_scheme = uri_scheme
        self.host = host
        self.port = port

        self.username = username
        self.password = password

        self.connection_uri = "{uri_scheme}://{host}:{port}".format(uri_scheme=self.uri_scheme, host=self.host, port=self.port)
        self.auth = (self.username, self.password)
        # Initialise first so __del__/_close_driver stay safe if driver creation raises.
        self.driver = None
        self.driver = GraphDatabase.driver(self.connection_uri, auth=self.auth)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._close_driver()
        # Never suppress exceptions raised inside the `with` body.
        return False

    def __del__(self):
        self._close_driver()

    def close(self):
        """Close the underlying driver (public alias of ``_close_driver``)."""
        self._close_driver()

    def _close_driver(self):
        """Close the driver at most once.

        Safe on a partially initialised instance (no ``driver`` attribute yet)
        and on repeated calls, because the reference is cleared before
        ``close()`` is invoked.
        """
        driver = getattr(self, 'driver', None)
        if driver is not None:
            self.driver = None
            driver.close()

    def run_single_query(self, query):
        """Run one Cypher ``query`` and return its records as a list."""
        with self.driver.session() as session:
            # Records are materialised before the session is closed.
            return self.format_raw_res(session.run(query))

    def run_bulk_query(self, query_list):
        """Run every query of ``query_list`` inside a single session.

        Returns a list of ``{'query': query, 'result': [records]}`` dicts in
        input order; progress is shown with ``tqdm``.
        """
        results = []
        with self.driver.session() as session:
            for query in tqdm(query_list):
                raw_res = session.run(query)
                res = self.format_raw_res(raw_res)
                results.append({'query':query, 'result':res})
        return results

    def reset_graph(self, db=None):
        """Delete every node together with its relationships.

        ``db`` is accepted for backward compatibility but is not used.
        """
        return self.run_single_query("MATCH (n) DETACH DELETE n")

    def test_connection(self):
        """Count all nodes; a cheap query to check that the connection works."""
        return self.run_single_query("MATCH (n) RETURN COUNT(n) as nodes")

    @staticmethod
    def format_raw_res(raw_res):
        """Materialise an iterable of result records into a list."""
        return list(raw_res)

In [6]:
def useExecutionTime(func):
    """Decorator that times each call of ``func``.

    The decorated function returns ``{"result": <func's return value>,
    "execution_time": <elapsed seconds as a float>}`` instead of the raw
    value. The wrapper keeps ``func``'s name and docstring (``functools.wraps``),
    so tools that label work by function name (e.g. dask task keys) see the
    original name.
    """
    import functools

    @functools.wraps(func)
    def compute(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, so the measured
        # duration is not distorted by system clock adjustments.
        begin = time.perf_counter()

        result = func(*args, **kwargs)

        end = time.perf_counter()

        return {"result": result, "execution_time": end - begin}

    return compute

@useExecutionTime
def getComputeTime(*args, **kwargs):
    """Run ``dask.compute`` on the given collections and time the call."""
    return dask.compute(*args, **kwargs)

In [7]:
# cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit=None)

In [8]:
# client = Client(cluster)
# client

In [9]:
def run_gc(dask_worker, **_ignored):
    """Run a full garbage-collection pass and report success.

    ``dask_worker`` and any keyword arguments are accepted but never used;
    presumably they exist so the function matches Dask's worker-callback
    calling convention. Always returns ``True``.
    """
    gc.collect()
    return True

# Register the GC function as a plugin
# NOTE(review): `register_worker_plugin` expects a WorkerPlugin object rather than a
# plain function; to run these helpers on every worker, `client.run(...)` is the usual API.
# client.register_worker_plugin(run_gc, "my_gc_plugin")
# client.register_worker_plugin(trim_memory, "my_trim_plugin")

In [10]:
# cluster.adapt(minimum=1, maximum=8)

In [11]:
# Optional dtype overrides passed to `dd.read_csv`; the commented entries are kept
# for reference and are currently disabled.
columnTypes = {
#     'case:IDofConceptCase': 'string',
#     'case:Includes_subCases': 'string',
#     'case:Responsible_actor': 'string',
#     'case:caseProcedure': 'string',
#     'case:concept:name': 'int64',
#     'dueDate': 'object',
#     'case:termName': 'string',
#     'dateStop': 'object',
#     'case:endDate': 'object',
#     'case:endDatePlanned': 'object',
#     'case:parts': 'object',
#     'msgCode': 'string',
#     'msgType': 'string',
#     'case:landRegisterID': 'object'
}

# Base names (without the `.csv` extension) of the event-log files to load;
# all files are concatenated row-wise into one Dask dataframe.
file_paths = ['BPI_2020_InternationalDeclarations']

# Text encoding applied to every input file so all files are decoded the same way.
CSV_ENCODING = "ISO-8859-1"

# Load the first file as a Dask dataframe.
df = dd.read_csv('{}.csv'.format(file_paths[0]), dtype=columnTypes, encoding=CSV_ENCODING)

# Append the remaining files (if any) row-wise.
for file_path in file_paths[1:]:
    df_temp = dd.read_csv('{}.csv'.format(file_path), dtype=columnTypes, encoding=CSV_ENCODING)
    df = dd.concat([df, df_temp], interleave_partitions=True)

# columnTypes = {
#     'OfferID': 'string'
# }


# BPI 2014 sep=';'
# df = df.rename(columns={"Incident ID": "case:concept:name", "IncidentActivity_Type": "concept:name", "DateStamp": "time:timestamp"})

# df = df.rename(columns={"case concept:name": "case:concept:name", "event concept:name": "concept:name", "event time:timestamp": "time:timestamp"})

# Parse every column whose name contains "date"/"Date" or "time" (e.g. `time:timestamp`)
# as a UTC datetime; `re.search` matches anywhere in the column name.
for column in df.columns:
    if re.search("[Dd]ate.*|time.*", column):
        df[column] = dd.to_datetime(df[column], utc=True)

# Turn case ids into integers by stripping every ASCII letter and casting.
# NOTE(review): ids that differ only in their letters (e.g. different prefixes)
# would collapse to the same integer — confirm this cannot happen for the input logs.
df['case:concept:name'] = df['case:concept:name'].replace(to_replace="[a-zA-Z]", value='', regex=True)
df['case:concept:name'] = df['case:concept:name'].astype('int')

# Collapse into a single partition before indexing by case id in the next cells.
df = df.repartition(npartitions=1)

In [12]:
def transformToDFG(dfgResult):
    """Convert Neo4j DFG records into a ``{(parent, child): frequency}`` mapping.

    Every record must provide ``"parent"``, ``"child"`` and ``"frequency"``.
    If the same edge appears more than once, the last record wins.
    """
    return {(rec["parent"], rec["child"]): rec["frequency"] for rec in dfgResult}

def transformToStartEndActivity(activities):
    """Convert activity records into a ``{name: frequency}`` mapping (last duplicate wins)."""
    return {rec['name']: rec["frequency"] for rec in activities}

In [13]:
def getDFG():
    """Load the directly-follows graph and the start/end activities from Neo4j.

    Returns
    -------
    list
        ``[dfg, start_activities, end_activities]``. ``dfg`` maps
        ``(parent, child)`` activity names to the ``PRODUCES`` edge frequency;
        the other two map ``StartActivity`` / ``EndActivity`` names to their
        frequency.

    The Neo4j host and password come from the ``NEO4J_HOST`` and
    ``NEO4J_PASSWORD`` environment variables, falling back to ``"neo4j"``
    and ``"123456"``.
    """
    queries = {
        "dfgQuery": """MATCH result=(p:Activity)-[r:PRODUCES]->(c:Activity) RETURN p.name as parent, c.name as child, r.frequency as frequency""",
        "startEndActivitiesQuery": ["MATCH (a:StartActivity) RETURN a.name as name , a.frequency as frequency", "MATCH (a:EndActivity) RETURN a.name as name , a.frequency as frequency"],
    }

    # NOTE(review): the fallback password is a hard-coded credential; set
    # NEO4J_PASSWORD instead of relying on it.
    neo4jConnection = graph_driver(
        uri_scheme="neo4j",
        host=os.environ.get("NEO4J_HOST", "neo4j"),
        password=os.environ.get("NEO4J_PASSWORD", "123456"),
    )
    try:
        dfgResult = neo4jConnection.run_single_query(queries['dfgQuery'])
        startEndActivitiesResult = neo4jConnection.run_bulk_query(queries['startEndActivitiesQuery'])
    finally:
        # Close the connection deterministically instead of waiting for __del__.
        neo4jConnection._close_driver()

    return [
        transformToDFG(dfgResult),
        transformToStartEndActivity(startEndActivitiesResult[0]["result"]),
        transformToStartEndActivity(startEndActivitiesResult[1]["result"]),
    ]
    

In [14]:
# Index by the (integer) case id while keeping it as a column. `sorted=True` tells
# Dask the column is already sorted, so no shuffle is performed.
# NOTE(review): that is only valid if rows arrive ordered by case id; otherwise the
# partition divisions (and the per-partition metrics computed later) are wrong.
indexed_df = df.set_index('case:concept:name', drop=False, sorted=True)
# The column copy becomes a string; the index keeps the integer case id.
indexed_df['case:concept:name'] = indexed_df['case:concept:name'].astype('string')

In [15]:
# Release the notebook's reference to the raw frame; `indexed_df` is used from here on.
# Re-running the set_index cell above now requires re-running the loading cell first.
del df

In [16]:
# Name the index and spread the cases over several partitions; the metric step
# below calls `setLazyMetrics` once per partition via `map_partitions`.
N_METRIC_PARTITIONS = 6
indexed_df.index = indexed_df.index.rename('caseId')
indexed_df = indexed_df.repartition(npartitions=N_METRIC_PARTITIONS)

In [17]:
# Fetch the DFG (edge -> frequency) plus start/end activities from Neo4j, and wrap
# them in a pm4py DFG object; the inductive miner consumes the object form.
dfg, start, end = getDFG()
dfgObj = DFG(dfg, start_activities=start, end_activities=end)

100% 2/2 [00:00<00:00, 271.29it/s]


In [18]:
@useExecutionTime
def getMinerResult(dfg, miner):
    """Discover a Petri net from a DFG with the named miner and serialize it.

    Parameters
    ----------
    dfg : dict
        ``{"dfg": <edge -> frequency mapping>, "dfgObj": <pm4py DFG object>}``.
        The heuristic and alpha miners read ``"dfg"``; the inductive miner
        reads ``"dfgObj"``.
    miner : str
        One of ``'heuristic_miner'``, ``'inductive_miner'`` or ``'alpha_miner'``.

    Returns
    -------
    dict
        ``{miner: serialize(net, im, fm)}``, wrapped by ``useExecutionTime``
        into ``{"result": ..., "execution_time": ...}``.

    Raises
    ------
    ValueError
        If ``miner`` is not one of the supported names.
    """
    if miner == 'heuristic_miner':
        net, im, fm = heuristics_miner.apply_dfg(dfg['dfg'])
    elif miner == 'inductive_miner':
        net, im, fm = inductive_miner(dfg['dfgObj'])
    elif miner == 'alpha_miner':
        net, im, fm = alpha_miner.apply_dfg(dfg['dfg'])
    else:
        raise ValueError("Unknown miner: {!r}".format(miner))

    # Serialized form is presumably used so the net can travel back from a
    # worker process (results are deserialized in reformatMinersResults).
    return {miner: serialize(net, im, fm)}
    
def setLazyMiners(dfg):
    """Build one unevaluated ``dask.delayed`` mining task per supported miner.

    ``dfg`` is forwarded unchanged to ``getMinerResult``. Tasks are returned
    in the order heuristic, inductive, alpha.
    """
    miner_names = ('heuristic_miner', 'inductive_miner', 'alpha_miner')
    return [dask.delayed(getMinerResult)(dfg, name) for name in miner_names]

def reformatMinersResults(lazyMinersResults):
    """Deserialize computed miner outputs into a per-miner dictionary.

    Each item is a ``useExecutionTime`` payload whose ``"result"`` maps a
    miner name to its serialized Petri net; only the first key is used.
    Returns ``{miner: {"net", "im", "fm", "execution_time"}}``.
    """
    minersResults = {}
    for payload in lazyMinersResults:
        serialized_by_miner = payload['result']
        miner_name = list(serialized_by_miner)[0]
        net, im, fm = deserialize(serialized_by_miner[miner_name])
        minersResults[miner_name] = {
            'net': net,
            'im': im,
            'fm': fm,
            'execution_time': payload['execution_time'],
        }
    return minersResults

In [19]:
@useExecutionTime
def getMetrics(log, miner, metric, net, im, fm):
    """Compute one (partial) quality metric of one mined net on one log slice.

    Parameters
    ----------
    log :
        The event-log slice to evaluate (one partition when called through
        ``setLazyMetrics`` / ``map_partitions``).
    miner : str
        Miner name; used only as the output key.
    metric : str
        ``'fitness'``, ``'simplicity'``, ``'precision'`` or ``'generalization'``.
    net, im, fm :
        Petri net with its initial and final markings.

    Returns
    -------
    dict
        ``{miner: {metric: value}}``. Unknown metric names yield ``0``. Any
        exception is caught and returned as ``{miner: {metric: {"error": exc}}}``
        instead of being raised. All of this is wrapped by ``useExecutionTime``.
    """
    import sys
    # Only ever raise the limit: lowering it could undo a higher limit set
    # earlier in the same process.
    sys.setrecursionlimit(max(sys.getrecursionlimit(), 3000))
    try:
        if metric == 'fitness':
            value = fitness_alignment.apply(log, net, im, fm)
        elif metric == 'simplicity':
            value = simplicity_evaluator.apply(net)
        elif metric == 'precision':
            value = precision_alignment.apply(log, net, im, fm)
        elif metric == 'generalization':
            value = generalization.apply(log, net, im, fm)
        else:
            value = 0
        return {miner: {metric: value}}
    except Exception as e:
        return {miner: {metric: {"error": e}}}

def setLazyMetrics(log, miners):
    """Evaluate every enabled metric for every mined model on one log slice.

    Despite the name, the work is eager: ``getMetrics`` is called directly,
    not through ``dask.delayed``. The notebook applies this function to each
    partition via ``map_partitions``. ``miners`` maps a miner name to a dict
    holding ``'net'``, ``'im'`` and ``'fm'``. The output is a list of
    ``getMetrics`` payloads, metric-major then miner order.
    """
    enabled_metrics = (
        # 'fitness',
        # 'simplicity',
        'precision',
        # 'generalization',
    )
    return [
        getMetrics(log, miner_name, metric_name, model['net'], model['im'], model['fm'])
        for metric_name in enabled_metrics
        for miner_name, model in miners.items()
    ]

In [20]:
# Build (but do not run) one delayed mining task per miner. The DFG is passed both
# as a plain edge -> frequency dict ("dfg") and as a pm4py DFG object ("dfgObj").
lazyMiners = setLazyMiners({"dfgObj": dfgObj, "dfg": dfg})

In [21]:
# Run the mining tasks with the multiprocessing scheduler and deserialize the nets.
# Despite its name, `lazyMinersResults` holds concrete results: {miner: {net, im, fm, execution_time}}.
lazyMinersResults = reformatMinersResults(dask.compute(*lazyMiners, scheduler='processes'))

In [22]:
@dask.delayed
def aggregate(partitions):
    """Merge per-partition metric outputs into one partial result per (miner, metric).

    Parameters
    ----------
    partitions : iterable of lists
        One list per partition. Each item is a ``useExecutionTime`` payload
        ``{"result": {miner: {metric: value}}, "execution_time": seconds}`` as
        produced by ``setLazyMetrics``; only the first miner/metric key is read.

    Returns
    -------
    dict
        ``{miner: {metric: {"result": merged, "execution_time": seconds}}}``.
        ``execution_time`` is the total over all partitions. Fitness, precision
        and generalization partials are merged pairwise with the metric
        module's ``aggregate``; other metrics keep the first value seen. An
        ``{"error": exc}`` value from any partition becomes the merged result
        instead of being passed to an aggregator.
    """
    import sys
    # Only raise the limit; never lower one that is already higher.
    sys.setrecursionlimit(max(sys.getrecursionlimit(), 10**5))

    def _is_error(value):
        # Error marker returned by getMetrics when a computation raised.
        return isinstance(value, dict) and set(value) == {'error'}

    result = {}

    for partition in partitions:
        for output in partition:
            miner = list(output['result'].keys())[0]
            metric = list(output['result'][miner].keys())[0]
            value = output['result'][miner][metric]

            entry = result.setdefault(miner, {}).setdefault(
                metric, {'result': None, 'execution_time': 0})
            # Total time across all partitions, not just the last one processed.
            entry['execution_time'] += output['execution_time']

            if entry['result'] is None:
                entry['result'] = value
                continue
            if _is_error(entry['result']):
                # Keep the first error; there is nothing meaningful to merge into it.
                continue
            if _is_error(value):
                entry['result'] = value
                continue

            if metric == 'fitness':
                entry['result'] = fitness_alignment.aggregate(value, entry['result'])
            elif metric == 'precision':
                entry['result'] = precision_alignment.aggregate(value, entry['result'])
            elif metric == 'generalization':
                entry['result'] = generalization.aggregate([value, entry['result']])

    return result

In [23]:
@dask.delayed
def compute_metrics(aggregatedMetrics, minersResults):
    """Turn aggregated partial metrics into final metric values per miner.

    Parameters
    ----------
    aggregatedMetrics : dict
        ``{miner: {metric: {"result": partial, "execution_time": seconds}}}``
        as returned by ``aggregate``.
    minersResults : dict
        ``{miner: {"net", "im", "fm", ...}}`` as returned by ``reformatMinersResults``.

    Returns
    -------
    dict
        ``{miner: {metric: {"result": value, "execution_time": seconds}}}``.
        ``execution_time`` is the aggregated partition time plus the time spent
        finalising here. Precision is reported as ``'NA'`` when the net fails
        ``check_easy_soundness_net_in_fin_marking``. Error markers
        (``{"error": exc}``) are passed through as the result.
    """
    import sys
    # Only raise the limit; never lower one that is already higher.
    sys.setrecursionlimit(max(sys.getrecursionlimit(), 10**5))

    results = {}
    for miner, metrics in aggregatedMetrics.items():
        model = minersResults[miner]
        net, im, fm = model['net'], model['im'], model['fm']
        for metricKey, metricValue in metrics.items():
            entry = results.setdefault(miner, {}).setdefault(
                metricKey, {'result': None, 'execution_time': 0})
            entry['execution_time'] = metricValue['execution_time']
            partial = metricValue['result']

            # An upstream failure cannot be finalised; report it as-is.
            if isinstance(partial, dict) and set(partial) == {'error'}:
                entry['result'] = partial
                continue

            start_time = timeit.default_timer()
            # The soundness check only matters for precision, so evaluate it
            # (short-circuit) only when precision is requested.
            if metricKey == 'precision' and not check_easy_soundness_net_in_fin_marking(net, im, fm):
                entry['result'] = 'NA'
                continue
            if metricKey == 'fitness':
                entry['result'] = fitness_alignment.compute(partial, net=net, im=im, fm=fm)
            elif metricKey == 'precision':
                entry['result'] = precision_alignment.compute(**partial, net=net, im=im, fm=fm)
            elif metricKey == 'generalization':
                entry['result'] = generalization.compute(partial['trans_occ_map'], net)
            elif metricKey == 'simplicity':
                entry['result'] = simplicity_evaluator.apply(net)
            end_time = timeit.default_timer()
            entry['execution_time'] = (end_time - start_time) + metricValue['execution_time']
    return results

In [24]:
# For every partition of cases, evaluate the enabled metrics of every mined model.
# `meta=[]` declares the per-partition output up front so Dask does not infer it.
mapped_data = indexed_df.map_partitions(setLazyMetrics, lazyMinersResults, meta=[])

In [25]:
# Lazily merge the per-partition outputs (`aggregate` is a dask.delayed function).
aggregated_results = aggregate(mapped_data)

In [26]:
# Lazily finalise the merged metrics using the mined nets.
r = compute_metrics(aggregated_results, lazyMinersResults)

In [27]:
# Execute the whole graph with the multiprocessing scheduler; dask.compute returns a 1-tuple here.
results = dask.compute(r, scheduler='processes')

To preserve the previous behavior, use

	>>> .groupby(..., group_keys=False)


	>>> .groupby(..., group_keys=True)
  traces = list(log.groupby(case_id_key)[activity_key].apply(tuple))


[########################################] | 100% Completed | 9.61 ss


In [28]:
# `results` is a 1-tuple holding {miner: {metric: ...}}, so this yields a single
# row (index 0) with one column per miner.
savedResult = pd.DataFrame(results)

In [29]:
# Show full cell contents so the nested metric dicts are not truncated.
pd.set_option('display.max_colwidth', None)

In [30]:
# Display the metric results per miner.
savedResult

Unnamed: 0,heuristic_miner,inductive_miner,alpha_miner
0,"{'precision': {'result': 'NA', 'execution_time': 0.0012431144714355469}}","{'precision': {'result': 0.10602897118456533, 'execution_time': 9.917851378209889}}","{'precision': {'result': 'NA', 'execution_time': 0.000667572021484375}}"


In [31]:
def getStatisticalDataFrames(minersResults, data_set=None):
    """Tabulate each miner's discovery time as a one-row DataFrame.

    Parameters
    ----------
    minersResults : dict
        ``{miner: {..., "execution_time": seconds}}``, e.g. the output of
        ``reformatMinersResults``.
    data_set : str, optional
        Label stored in the ``data_set`` column. Defaults to the notebook-level
        ``file_paths`` joined with ``'-'``.

    Returns
    -------
    pandas.DataFrame
        A single row indexed ``'execution_time'`` with one column per miner
        followed by ``data_set``.
    """
    if data_set is None:
        data_set = '-'.join(file_paths)

    row = {miner: info['execution_time'] for miner, info in minersResults.items()}
    row['data_set'] = data_set

    return pd.DataFrame(row, index=['execution_time'])

In [32]:
# Tabulate how long each miner took to discover its model.
miner_execution_time = getStatisticalDataFrames(lazyMinersResults)

In [33]:
# Display miner discovery times.
miner_execution_time

Unnamed: 0,heuristic_miner,inductive_miner,alpha_miner,data_set
execution_time,0.034365,0.026889,0.015153,BPI_2020_InternationalDeclarations


In [34]:
# savedResult.to_csv('./results/3 - distributed setup/{}_results.csv'.format('-'.join(file_paths)))
# miner_execution_time.to_csv('./results/3 - distributed setup/{}_miner_execution_time.csv'.format('-'.join(file_paths)))