In [13]:
import pm4py 
import pm4py.stats
import graphviz #visulaization

#correlation
import pandas as pd 
from pm4py.objects.log.util import dataframe_utils
from pm4py.algo.discovery.correlation_mining import algorithm as correlation_miner


#process discovery
from pm4py.algo.discovery.alpha import algorithm as alpha_miner  
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from pm4py.visualization.heuristics_net import visualizer as hn_visualizer
from pm4py.visualization.dfg import visualizer as dfg_visualization

#process tree to petri net
from pm4py.objects.conversion.process_tree import converter as pt_converter
from pm4py.objects.petri_net.exporter import exporter as pnml_exporter
from pm4py.objects.petri_net import semantics
from pm4py.objects.petri_net.utils import reachability_graph
from pm4py.visualization.transition_system import visualizer as ts_visualizer


#Conformance Checking
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
from pm4py.algo.filtering.log.auto_filter.auto_filter import apply_auto_filter
from pm4py.algo.conformance.tokenreplay import algorithm as token_based_replay
from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics

#statistics
from pm4py.statistics.traces.log import case_statistics
from pm4py.statistics.traces.log import case_arrival
from pm4py.util.business_hours import BusinessHours
from datetime import datetime
from pm4py.objects.log.util import interval_lifecycle
from pm4py.statistics.eventually_follows.log import get as efg_get
from svglib.svglib import svg2rlg
from reportlab.graphics import renderPM


#performance
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness_evaluator
from pm4py.algo.evaluation.precision import algorithm as precision_evaluator
from pm4py.algo.evaluation.generalization import algorithm as generalization_evaluator
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_evaluator

#analyses
from pm4py.visualization.sna import visualizer as sna_visualizer
from pm4py.algo.organizational_mining.sna import algorithm as sna

def import_xes(file_path):
    """Load an XES log and print a short overview of it.

    Prints the start and end activities that pm4py reports for the log,
    followed by the first trace and the first event of that trace.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    log = pm4py.read_xes(file_path)

    overview = "Start activities: {}\nEnd activities: {}".format(
        pm4py.get_start_activities(log),
        pm4py.get_end_activities(log),
    )
    print(overview)

    # Index 0 of the log is its first trace; index 0 of a trace is its
    # first event.
    first_trace = log[0]
    print("\nthe first trace of the log", first_trace)
    print("\nthe first event of the log", first_trace[0])

    
    
def process_model(file_path):
    """Discover and display several process models for one XES log.

    The log is read once and then shown, in this order, as:

    1. a process tree found with inductive discovery,
    2. the BPMN model converted from that process tree,
    3. a directly-follows graph (process map),
    4. a heuristics net (process map).

    Each model is opened through the matching ``pm4py.view_*`` call;
    nothing is returned.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    event_log = pm4py.read_xes(file_path)

    process_tree = pm4py.discovery.discover_process_tree_inductive(event_log)
    pm4py.view_process_tree(process_tree)  # view process tree

    bpmn_model = pm4py.convert_to_bpmn(process_tree)
    pm4py.view_bpmn(bpmn_model)  # view BPMN

    dfg, start_activities, end_activities = pm4py.discover_dfg(event_log)
    # view process map (Directly-Follows Graph)
    pm4py.view_dfg(dfg, start_activities, end_activities)

    # Named ``heu_net`` rather than ``map`` so the builtin ``map`` is not
    # shadowed inside this function.
    heu_net = pm4py.discover_heuristics_net(event_log)
    pm4py.view_heuristics_net(heu_net)  # view process map (Heuristics Miner)



def process_discovery(file_path):
    """Run several discovery algorithms on an XES log and view each result.

    In order, the function displays: the alpha-miner Petri net, the
    inductive-miner Petri net, the heuristics net, the Petri net produced
    by the heuristics miner, and two directly-follows graphs (one using
    the FREQUENCY variants, one using the PERFORMANCE variants).

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    log = pm4py.read_xes(file_path)

    def show_petri_net(petri, start_marking, end_marking):
        # Build the graphviz object for a Petri net and open it.
        pn_visualizer.view(pn_visualizer.apply(petri, start_marking, end_marking))

    # Alpha miner
    net, initial_marking, final_marking = alpha_miner.apply(log)
    show_petri_net(net, initial_marking, final_marking)

    # Inductive miner
    net, initial_marking, final_marking = inductive_miner.apply(log)
    show_petri_net(net, initial_marking, final_marking)

    # Heuristics miner: first the heuristics net itself ...
    threshold_key = heuristics_miner.Variants.CLASSIC.value.Parameters.DEPENDENCY_THRESH
    heuristics_net = heuristics_miner.apply_heu(log, parameters={threshold_key: 0.99})
    hn_visualizer.view(hn_visualizer.apply(heuristics_net))

    # ... then the Petri net obtained with the same dependency threshold.
    net, im, fm = heuristics_miner.apply(log, parameters={threshold_key: 0.99})
    show_petri_net(net, im, fm)

    # Directly-follows graph with the FREQUENCY visualization variant.
    frequency_dfg = dfg_discovery.apply(log)
    frequency_view = dfg_visualization.apply(
        frequency_dfg,
        log=log,
        variant=dfg_visualization.Variants.FREQUENCY,
    )
    dfg_visualization.view(frequency_view)

    # Same graph decorated with performance: both the discovery variant
    # and the visualization variant are switched to PERFORMANCE.
    performance_dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
    performance_view = dfg_visualization.apply(
        performance_dfg,
        log=log,
        variant=dfg_visualization.Variants.PERFORMANCE,
    )
    dfg_visualization.view(performance_view)
    
    
    
    
    
def correlation(file_path):
    """Apply the correlation miner to an XES log and print both results.

    The log is converted to a dataframe, its timestamp columns are
    converted, and only the ``concept:name`` and ``time:timestamp``
    columns are handed to the correlation miner. The two values it
    returns (frequency DFG and performance DFG) are printed.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    events = pm4py.convert_to_dataframe(pm4py.read_xes(file_path))
    events = dataframe_utils.convert_timestamp_columns_in_df(events)
    events = events[["concept:name", "time:timestamp"]]

    # Tell the miner which columns hold the activity and the timestamp.
    miner_settings = {
        correlation_miner.Variants.CLASSIC.value.Parameters.ACTIVITY_KEY: "concept:name",
        correlation_miner.Variants.CLASSIC.value.Parameters.TIMESTAMP_KEY: "time:timestamp",
    }
    frequency_dfg, performance_dfg = correlation_miner.apply(events, parameters=miner_settings)

    print("frequencey:", frequency_dfg)
    print("\nperformance", performance_dfg)
    
    
    
    
def petri_net(file_path):
    """Build a Petri net from an XES log, export it and inspect it.

    Steps performed:

    1. discover a process tree with inductive discovery and convert it to
       a Petri net with initial and final marking,
    2. view the net and export it to ``petri.pnml``,
    3. print the transitions enabled in the initial marking,
    4. print the name of every place of the net,
    5. build the reachability graph from the initial marking and render
       it with graphviz.

    Nothing is returned.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    event_log = pm4py.read_xes(file_path)
    process_tree = pm4py.discovery.discover_process_tree_inductive(event_log)
    net, im, fm = pt_converter.apply(process_tree)

    gviz = pn_visualizer.apply(net, im, fm)
    pn_visualizer.view(gviz)
    pnml_exporter.apply(net, im, "petri.pnml")

    # Transitions enabled in a particular marking (here the initial
    # marking). Kept under its own name: previously this result was
    # immediately overwritten by ``net.transitions`` and never used.
    enabled_in_initial_marking = semantics.enabled_transitions(net, im)
    print("\nenabled transitions in the initial marking:", enabled_in_initial_marking)

    # Places are read straight from the net; ``net.transitions`` and
    # ``net.arcs`` are available on the net in the same way.
    for place in net.places:
        print("\nPLACE: "+place.name)

    # Reachability graph
    ts = reachability_graph.construct_reachability_graph(net, im)
    gviz = ts_visualizer.apply(ts, parameters={ts_visualizer.Variants.VIEW_BASED.value.Parameters.FORMAT:"svg"})
    # NOTE(review): graphviz's render() appends the output format to the
    # given file name, so this presumably writes
    # 'reachability_graph.svg.svg' next to the source file - confirm
    # whether the '.svg' suffix in the name is intended.
    gviz.render('reachability_graph.svg', view=True)
    

    
    
    
def token_based(file_path):
    """Run token-based replay on an XES log and print duration diagnostics.

    First the full log is replayed on an alpha-miner net (the result is
    not used further). Then an inductive-miner net is discovered from an
    auto-filtered log, the full log is replayed on it with variants
    disabled and place/transition fitness enabled, and the diagnostics
    for unfit transitions and for activities missing from the model are
    printed entry by entry.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    log = pm4py.read_xes(file_path)

    # Plain replay against the alpha-miner model; its outcome is discarded.
    alpha_net, alpha_im, alpha_fm = alpha_miner.apply(log)
    token_replay.apply(log, alpha_net, alpha_im, alpha_fm)

    # Discovering from a filtered log keeps only part of the behaviour,
    # which yields a model the full log does not fit.
    reduced_log = apply_auto_filter(log)
    net, initial_marking, final_marking = inductive_miner.apply(reduced_log)

    replay_options = token_based_replay.Variants.TOKEN_REPLAY.value.Parameters
    parameters_tbr = {
        replay_options.DISABLE_VARIANTS: True,
        replay_options.ENABLE_PLTR_FITNESS: True,
    }
    # With these settings the replay returns four values instead of one.
    replay_result = token_based_replay.apply(
        log, net, initial_marking, final_marking, parameters=parameters_tbr
    )
    replayed_traces, place_fitness, trans_fitness, unwanted_activities = replay_result

    # Throughput analysis of the transitions that were executed unfit.
    trans_diagnostics = duration_diagnostics.diagnose_from_trans_fitness(log, trans_fitness)
    for trans in trans_diagnostics:
        details = trans_diagnostics[trans]
        print(trans, details)

    # Throughput analysis of the process executions containing activities
    # that are not in the model.
    act_diagnostics = duration_diagnostics.diagnose_from_notexisting_activities(
        log, unwanted_activities
    )
    for act in act_diagnostics:
        details = act_diagnostics[act]
        print(act, details)

        
        
        
def statistics(file_path):
    """Print a set of statistics computed on an XES log.

    Printed, in order: all case durations, the case arrival average, the
    case dispersion average, the seconds returned by a ``BusinessHours``
    object for a fixed time interval, the log enriched with lead and
    cycle time, and the eventually-follows graph.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    log = pm4py.read_xes(file_path)

    # Duration of every case.
    duration_params = {case_statistics.Parameters.TIMESTAMP_KEY: "time:timestamp"}
    durations = case_statistics.get_all_casedurations(log, parameters=duration_params)
    print("all case durations is ", durations)

    # Case arrival average.
    arrival_params = {case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"}
    arrival_avg = case_arrival.get_case_arrival_avg(log, parameters=arrival_params)
    print("case arrival ratio is ", arrival_avg)

    # Case dispersion average.
    dispersion_params = {case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"}
    dispersion_avg = case_arrival.get_case_dispersion_avg(log, parameters=dispersion_params)
    print("case dispersion ratio is ", dispersion_avg)

    # Working time, in seconds, between two fixed instants given as unix
    # timestamps (100000000 and 200000000).
    interval_start = datetime.fromtimestamp(100000000)
    interval_end = datetime.fromtimestamp(200000000)
    business_hours = BusinessHours(interval_start, interval_end)
    print("\nseconds of working ", business_hours.getseconds())

    # Lead and cycle time.
    log_with_times = interval_lifecycle.assign_lead_cycle_time(log)
    print("\nevent log enriched with cycle and lead time ", log_with_times)

    # Eventually-follows graph (partial order of the events inside the process).
    eventually_follows = efg_get.apply(log)
    print("\npartial order of the events inside the process ", eventually_follows)

    # A dotted-chart view was left disabled here:
    # pm4py.view_dotted_chart(event_log, format="svg")

def performacne(file_path):
    """Print four quality measures of a model discovered from an XES log.

    A process tree is discovered with inductive discovery and converted
    to a Petri net; the log is then evaluated against that net. Printed,
    in order: token-based replay fitness, ETConformance-token precision,
    generalization and simplicity.

    The function name keeps its original spelling so existing callers
    continue to work.

    Args:
        file_path: Path of the XES file passed to ``pm4py.read_xes``.
    """
    # Fitness value between event log and Petri net
    event_log = pm4py.read_xes(file_path)
    process_tree = pm4py.discovery.discover_process_tree_inductive(event_log)
    net, im, fm = pt_converter.apply(process_tree)
    fitness = replay_fitness_evaluator.apply(event_log, net, im, fm, variant=replay_fitness_evaluator.Variants.TOKEN_BASED)
    print("Replay Fitness value",fitness)

    # Precision
    prec = precision_evaluator.apply(event_log, net, im, fm, variant=precision_evaluator.Variants.ETCONFORMANCE_TOKEN)
    print("precision value",prec)

    # Generalization
    gen = generalization_evaluator.apply(event_log , net, im, fm)
    print("generalization value",gen)

    # Simplicity (depends on the net only). Print ``simp``: the previous
    # code printed the generalization value under this label.
    simp = simplicity_evaluator.apply(net)
    print("simplicity value",simp)


In [14]:
# NOTE(review): `analyse` is not defined in the cells visible here, so its
# definition must live elsewhere in the notebook. The recorded output of this
# cell ends with KeyError: 'org:resource'; presumably a step inside `analyse`
# needs the 'org:resource' attribute on every event - confirm against the
# definition of `analyse` and the contents of the log.
analyse('BPI_Challenge_2012.xes');

parsing log, completed traces ::   0%|          | 0/13087 [00:00<?, ?it/s]

KeyError: 'org:resource'