In [1]:
import pm4py
import random
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
import torch
from evaluation.utils import generate_prediction_list

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class xSemAD:
    def __init__(self, path_to_model, random_seed=4, max_new_tokens=100) -> None:
        self.path_to_model = path_to_model
        self.random_seed = random_seed
        self.max_new_tokens = max_new_tokens
        try:
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.path_to_model)
            self.tokenizer = AutoTokenizer.from_pretrained(self.path_to_model)
            #to device
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.model = self.model.to(self.device)
            print('model loaded from: ', self.path_to_model)
        except:
            print('Error: Check model path!')
    
    ##################
    # LOAD EVENT LOG
    ##################
    def load_event_log_as_xes(self,path_to_file, activity_column_name = 'concept:name', case_id_column_name='case:concept:name'):
        self.activity_column_name = activity_column_name
        self.case_id_column_name = case_id_column_name
        self.log = pm4py.read_xes(path_to_file)
        self.log[activity_column_name] = self.log[activity_column_name].str.replace('  ',' ').str.lower()
        # generate context
        random.seed(self.random_seed)
        self.events = self.log[activity_column_name].unique()
        #random.shuffle(events)
        self.context = " <event> " + " <event> ".join(self.events)

    ##################
    # GENERATE CONSTRAINTS
    ##################
    def _filter_items(self, prediction_list, events):
        # Create a new list to hold the filtered items
        filtered_list = []
        # Iterate over each tuple in the first list
        for item in prediction_list:
            # Extract the description part from the item, finding the part between the square brackets
            start_idx = item[0].find('[') + 1
            end_idx = item[0].find(']')
            description = item[0][start_idx:end_idx]
            # Split the description by comma to handle multiple elements
            description_elements = [elem.strip() for elem in description.split(',')]
            
            # Check for duplicates in description elements
            if len(description_elements) != len(set(description_elements)):
                continue
            
            # Check if any of the description elements are in the second list
            if any(elem in events for elem in description_elements):
                # If yes, add the item to the filtered list
                filtered_list.append(item)
        return filtered_list
    
    def _filter_list_by_threshold(self, prediction_list, threshold):
        filtered_items = [item for item in prediction_list if item[1] > threshold]
        return filtered_items

    def generate_constraint(self, constraint_type, threshold=.8):
        print('Get prediction for constraint type: ', constraint_type)
        prompt = f'{constraint_type}: {self.context}'
        prediction = generate_prediction_list(prompt,self.tokenizer,self.model,30, max_new_tokens=self.max_new_tokens, device=self.device)
        prediction = self._filter_items(prediction, self.events)
        prediction = self._filter_list_by_threshold(prediction, threshold)
        return prediction
    
    ##################
    # CHECK FOR ANOMALIES
    ##################
    # Define functions for each constraint type
    def check_init(self, trace, a):
        return trace[0] == a
    def check_end(self, trace, a):
        return trace[-1] == a
    def check_prec(self, trace, aj, ak):
        return ak not in trace or aj in trace[:trace.index(ak)]
    def check_alt_prec(self, trace, aj, ak):
        for i in range(1, len(trace)):
            if trace[i] == ak and trace[i-1] != aj:
                return False
        return True
    def check_co_ex(self, trace, aj, ak):
        return aj in trace and ak in trace
    def check_resp(self, trace, aj, ak):
        if aj in trace:
            return ak in trace[trace.index(aj):]
        return True
    def check_alt_resp(self, trace, aj, ak):
        last_aj_index = -1
        for i in range(len(trace)):
            if trace[i] == aj:
                if last_aj_index != -1 and ak not in trace[last_aj_index+1:i]:
                    return False
                last_aj_index = i
        return True
    def check_succ(self, trace, aj, ak):
        return aj not in trace or (ak in trace and trace.index(ak) == trace.index(aj) + 1)
    def check_alt_succ(self, trace, aj, ak):
        expected_next = None
        for a in trace:
            if a == aj:
                if expected_next and expected_next != ak:
                    return False
                expected_next = ak
            elif a == ak:
                if expected_next and expected_next != aj:
                    return False
                expected_next = aj
        return True
    def check_choice(self, trace, aj, ak):
        return aj in trace or ak in trace
    def check_ex_ch(self, trace, aj, ak):
        return (aj in trace) != (ak in trace)
    def apply_constraint(self,trace, constraint):
        if constraint.startswith('Init'):
            a = constraint[len('Init['):-1]
            return self.check_init(trace, a)
        elif constraint.startswith('End'):
            a = constraint[len('End['):-1]
            return self.check_end(trace, a)
        elif constraint.startswith('Precedence'):
            aj, ak = constraint[len('Precedence['):-1].split(', ')
            return self.check_prec(trace, aj, ak)
        elif constraint.startswith('Alternate Precedence'):
            aj, ak = constraint[len('Alternate Precedence['):-1].split(', ')
            return self.check_alt_prec(trace, aj, ak)
        elif constraint.startswith('Co-Existence'):
            aj, ak = constraint[len('Co-Existence['):-1].split(', ')
            return self.check_co_ex(trace, aj, ak)
        elif constraint.startswith('Response'):
            aj, ak = constraint[len('Response['):-1].split(', ')
            return self.check_resp(trace, aj, ak)
        elif constraint.startswith('Alternate Response'):
            aj, ak = constraint[len('Alternate Response['):-1].split(', ')
            return self.check_alt_resp(trace, aj, ak)
        elif constraint.startswith('Succession'):
            aj, ak = constraint[len('Succession['):-1].split(', ')
            return self.check_succ(trace, aj, ak)
        elif constraint.startswith('Alternate Succession'):
            aj, ak = constraint[len('Alternate Succession['):-1].split(', ')
            return self.check_alt_succ(trace, aj, ak)
        elif constraint.startswith('Choice'):
            aj, ak = constraint[len('Choice['):-1].split(', ')
            return self.check_choice(trace, aj, ak)
        elif constraint.startswith('Exclusive Choice'):
            aj, ak = constraint[len('Exclusive Choice['):-1].split(', ')
            return self.check_ex_ch(trace, aj, ak)
        return False

    # function to count violations
    def count_violations(self, prediction):
        violations = {constraint: 0 for constraint, _ in prediction}
        for case_id, group in self.log.groupby(self.case_id_column_name):
            trace = group[self.activity_column_name].tolist()
            for constraint, _ in prediction:
                if not self.apply_constraint(trace, constraint):
                    violations[constraint] += 1
        return violations

    

In [5]:
path_to_model="data/model/sap_sam_2022/filtered/new/google/flan-t5-small/checkpoint-127200/checkpoint-42400/"
path_to_log = 'data/realworld/InternationalDeclarations.xes'
constraint_types = [('Precedence',.75),('Alternate Precedence',.75),('Co-Existence',.75),('Response',.75),('Alternate Response',.75),('Succession',.75),('Alternate Succession',.75),('Init',.85),('End',.85),('Choice',.75), ('Exclusive Choice',.75)]

In [6]:
model = xSemAD(path_to_model=path_to_model)
model.load_event_log_as_xes(path_to_log, activity_column_name = 'concept:name', case_id_column_name='case:concept:name')

for constraint_type in constraint_types:
    predictions = model.generate_constraint(constraint_type=constraint_type[0], threshold=constraint_type[1])
    violations = model.count_violations(predictions)
    print(violations)

model loaded from:  data/model/sap_sam_2022/filtered/new/google/flan-t5-small/checkpoint-127200/checkpoint-42400/


parsing log, completed traces :: 100%|██████████| 6449/6449 [00:08<00:00, 755.01it/s] 


Get prediction for constraint type:  Precedence
{'Precedence[send reminder, declaration rejected by supervisor]': 116, 'Precedence[declaration approved by supervisor, permit rejected by supervisor]': 92, 'Precedence[declaration approved by supervisor, declaration rejected by supervisor]': 121, 'Precedence[send reminder, declaration approved by supervisor]': 234, 'Precedence[declaration approved by supervisor, permit approved by supervisor]': 637, 'Precedence[start trip, declaration rejected by supervisor]': 10, 'Precedence[request payment, declaration rejected by supervisor]': 122, 'Precedence[start trip, declaration approved by supervisor]': 32, 'Precedence[request payment, permit rejected by supervisor]': 92}
Get prediction for constraint type:  Alternate Precedence
{'Alternate Precedence[send reminder, declaration rejected by supervisor]': 122, 'Alternate Precedence[declaration approved by supervisor, permit rejected by supervisor]': 92, 'Alternate Precedence[send reminder, declarat