In [2]:
import sys
sys.path.append('FLTLf')
from FLTLf.parser import LTLfParser
import time
import torch
import pandas as pd
import pdb
import os
from FLTLf.parser import LTLfParser
from FLTLf.converter import Converter
from FLTLf import core
import input
import traceback
import argparse

# Remove unrecognized arguments if running in Jupyter/IPython
if '__file__' not in globals():  # If not running as a standalone script
    sys.argv = sys.argv[:1]  # Retain only the script name

parser = argparse.ArgumentParser()
parser.add_argument('-d', "--device", default="cpu")

args = parser.parse_args()
core.device = args.device
core.debug = False

torch.manual_seed(0)

num_runs = 10
results = {"Run" : [], "Num. Cases" : [], "Case Length" : []}
batch_sizes_column = []
maxlengths_column = []
run_column = []

i = 0 
maxlengths = [i for i in range(500, 2500, 500)]
batch_sizes = [100, 1000, 10000,100000]
num_AP = 2
predicate_names = [f"p{i}" for i in range(num_AP)]

simple_formulas = ["(X(p0) >= 0.5)", "(WX(p0) >= 0.5)", "(G(p0) >= 0.5)", "(F(p0) >= 0.5)", "((p0Up1) >= 0.5)", "((p0Wp1) >= 0.5)", "((p0Rp1) >= 0.5)", "((p0Mp1) >= 0.5)"]

results_per_formula = {formula : [] for formula in simple_formulas}

#verbose printouts
verbose = False
#save results on a csv
savefile = True
#padding needed?
skippadding = True

#for every combination of maxlength, batchsize
for batch_size in batch_sizes:
    for maxlength in maxlengths:
        
        run_column.extend([i for i in range(num_runs)])
        batch_sizes_column.extend([batch_size]*num_runs)
        maxlengths_column.extend([maxlength]*num_runs)

        print("------times-------")
        start = time.time()

        # Generate a random tensor log
        tensor_log = torch.rand(batch_size, maxlength, len(predicate_names), dtype=torch.half)
        rand = time.time()
        print(f"RANDOM GEN time {rand - start}")

        # Padding
        converter = Converter(predicate_names)    
        tensor_log = converter.log2tensor(tensor_log, verbose, skippadding) 

        if not(skippadding):
            print(f"PADDING time {time.time() - rand}")
        else:
            print(f"PADDING skipped")        

        # Parsing into a formula
        parser = LTLfParser()
        
        # For each formula
        for formula in simple_formulas:
            start = time.time()
                
            # Slicing of predicates not in formula
            # The core routine reads the predicate_names from input, so we override them
            core.tensor_log, input.predicate_names = converter.slice_tensor_log(tensor_log, formula, verbose)
            core.tensor_log = core.tensor_log.to(core.device)
                
            # Setting dimensions for practical access
            core.batch_size = converter.batch_size
            core.maxlength = converter.maxlength

            ready = time.time()
            print(f"SLICING TIME {ready - start}")

            try:
                pyformula = parser(formula)     
            except Exception as e:
                print(traceback.format_exc())

            print(f"PARSING TIME {time.time() - ready}")
                            
            for run in range(num_runs):
                start = time.time()
                visitor = core.Visitor()

                print("------------------")
                print(f"Evaluation of {pyformula.print()} at instant {i} :")
                try:
                    visitor.visit(pyformula, i)
                except Exception as e:
                    print(traceback.format_exc())

                end = time.time()
                exec_time = end - start

                if savefile:
                    results_per_formula[formula].append(exec_time)

                print(f"({run}, {core.batch_size}, {core.maxlength}) -> {exec_time}")


results["Run"] = run_column
results["Num. Cases"] = batch_sizes_column
results["Case Length"] = maxlengths_column

results_df = pd.DataFrame.from_dict(results_per_formula)
settings_df = pd.DataFrame.from_dict(results)
results_df = pd.concat([settings_df, results_df], axis=1)

if savefile:
    os.makedirs('results', exist_ok=True)
    results_df.to_csv(os.path.join('results', f'simple_conformance_check_times_{core.device.upper()}.csv'))
    print("Results saved successfully.")


------times-------
RANDOM GEN time 0.0
PADDING skipped
SLICING TIME 0.0
PARSING TIME 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(0, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(1, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(2, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(3, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(4, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(5, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(6, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(7, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(8, 100, 500) -> 0.0
------------------
Evaluation of NEXT((p0 >= 0.5)) at instant 0 :
(9, 100, 500) -> 0.0
SLICING TIME 0.0
PARSING TIME 0.0
-------