## Assignment 1 - Alpha algorithm

##### Input - list of tuples of str's: [(str1, ..., strL), (strJ,...strK), ..., (strM,...,strN)]
##### Output - set of tuples of tuples str's {((str1,...,strI), (strL,...,strK)),...}

In [2]:
# Downloading petri_net.py
import requests

# File url on GitHub
URL = "https://raw.githubusercontent.com/Evgeny11111/Process_Mining/main/petri_net.py"

response = requests.get(URL, timeout=10)
response.raise_for_status()
with open('petri_net.py', 'wb') as f:
    f.write(response.content)

In [3]:
from petri_net import PetriNet
from subprocess import check_call
from pprint import pprint

In [4]:
log = [('a', 'b', 'c', 'd'), ('a', 'c', 'b', 'd'), ('a', 'e', 'd')]

#### Step 1. Make set of events

In [5]:
#expected output: set of str's
def all_events(log):
    tl = set()
    for item in log:
        for i in item:
            tl.add(i)
    return tl

events = all_events(log)

events

{'a', 'b', 'c', 'd', 'e'}

#### Step 2. Get all start events

In [6]:
#expected output: set of str's
def get_start_nodes(log):
    tl = set()
    for item in log:
        tl.add(item[0])
    return tl

start_nodes = get_start_nodes(log)

start_nodes

{'a'}

#### Step 3. Get all end events

In [7]:
#expected output: set of str's
def get_end_nodes(log):
    tl = set()
    for item in log:
        tl.add(item[-1])
    return tl

end_nodes = get_end_nodes(log)

end_nodes

{'d'}

#### Step 4. Make dataset

1. Define functions for causality, parallel and unrelated pairs<br>
2. Create footprint matrix<br>
3. Create pairs of events<br>
4. Iterate until no non-maximal pairs available

In [8]:
def footprint_matrix(log):
    """
    Return dictionary

    dict_[event_1][event_2] = ind, where
    ind =:
        0, event_1 # event_2
        1, event_1 -> event_2
        2, event_1 <- event_2
        3, event_1 || event_2
    """
    events = all_events(log)

    events_dict = {}
    for event_1 in events:
        events_dict[event_1] = {event_2: 0 for event_2 in events}
    
    for item in log:
        for ind in range(len(item) - 1):
            events_dict[item[ind]][item[ind + 1]] = 1

    matrix = {}
    for event_1 in events:
        matrix[event_1] = {}
        for event_2 in events:
            val = events_dict[event_1][event_2] + \
                  2 * events_dict[event_2][event_1]

            matrix[event_1][event_2] = val

    return matrix

def pairs_of_events(matrix):
    pairs = []
    for event_1 in matrix:
        for event_2 in matrix[event_1]:
            if (not matrix[event_1][event_1]) and \
               (not matrix[event_2][event_2]) and \
               matrix[event_1][event_2] == 1:
                pairs.append(((event_1,), (event_2,)))

    return pairs

def try_maximize(pair_1, pair_2, matrix):
    """
    Return new pair, if can, else return None
    """
    if pair_1[0] == pair_2[0]:
        for ev_1 in pair_1[1]:
            for ev_2 in pair_2[1]:
                if matrix[ev_1][ev_2]:
                    return None
        return (pair_1[0], tuple(sorted(set(pair_1[1] + pair_2[1]))))

    if pair_1[1] == pair_2[1]:
        for ev_1 in pair_1[0]:
            for ev_2 in pair_2[0]:
                if matrix[ev_1][ev_2]:
                    return None
        return (tuple(sorted(set(pair_1[0] + pair_2[0]))), pair_1[1])

    return None
    
def maximize(pairs, matrix):
    flag = False
    pairs_for_delete = set()
    added_pairs = set()
    for ind_1, pair_1 in enumerate(pairs):
        for ind_2 in range(ind_1 + 1, len(pairs)):
            pair_2 = pairs[ind_2]

            pair = try_maximize(pair_1, pair_2, matrix)
            if pair:
                flag = True
                pairs_for_delete.add(pair_1)
                pairs_for_delete.add(pair_2)
                added_pairs.add(pair)

    pairs = set(pairs)
    pairs = pairs - pairs_for_delete
    pairs = pairs | added_pairs
    pairs = list(pairs)

    if flag:
        return maximize(pairs, matrix)
    else:
        return pairs

def make_dataset(log):
    matrix = footprint_matrix(log)
    pairs = pairs_of_events(matrix)
    return set(maximize(pairs, matrix))

dataset = make_dataset(log)
dataset

{(('a',), ('b', 'e')),
 (('a',), ('c', 'e')),
 (('b', 'e'), ('d',)),
 (('c', 'e'), ('d',))}

Expected output (order irrelevant): {(('a',), ('c', 'e')),
 (('a',), ('e', 'b')),
 (('c', 'e'), ('d',)),
 (('e', 'b'), ('d',))} 

#### Let's check the result with PetriNet drawer

In [9]:
pn = PetriNet()
filename = 'my_first_alpha_miner'
pn.generate_with_alpha(tl=events,
                       ti=start_nodes,
                       to=end_nodes,
                       yl=dataset,
                       dotfile="{}.dot".format(filename))
check_call(["dot", "-Tpng", "{}.dot".format(filename),"-o", "{}.png".format(filename)])

0

## Another log for testing

Input:

In [10]:
log = [('a', 'b', 'e', 'f'),
 ('a', 'b', 'e', 'c', 'd', 'b', 'f'),
 ('a', 'b', 'c', 'e', 'd', 'b', 'f'),
 ('a', 'b', 'c', 'd', 'e', 'b', 'f'),
 ('a', 'e', 'b', 'c', 'd', 'b', 'f')]
log

[('a', 'b', 'e', 'f'),
 ('a', 'b', 'e', 'c', 'd', 'b', 'f'),
 ('a', 'b', 'c', 'e', 'd', 'b', 'f'),
 ('a', 'b', 'c', 'd', 'e', 'b', 'f'),
 ('a', 'e', 'b', 'c', 'd', 'b', 'f')]

Expected output:

In [11]:
dataset = {(('a',), ('e',)), (('a', 'd'), ('b',)), (('b',), ('c', 'f')), (('c',), ('d',)), (('e',), ('f',))}
dataset

{(('a',), ('e',)),
 (('a', 'd'), ('b',)),
 (('b',), ('c', 'f')),
 (('c',), ('d',)),
 (('e',), ('f',))}

In [12]:
dataset = make_dataset(log)
dataset

{(('a',), ('e',)),
 (('a', 'd'), ('b',)),
 (('b',), ('c', 'f')),
 (('c',), ('d',)),
 (('e',), ('f',))}