In [0]:
#!pip install pm4py

In [0]:
import pm4py
from pm4py.objects.conversion.log import converter as log_converter
import pandas as pd
from pyspark.sql.functions import col, struct, explode, collect_list,lit,array,split, expr
from pyspark.sql.types import StructType, StructField, StringType, ArrayType,FloatType,DoubleType,IntegerType,TimestampType
from time import sleep
from pyspark.sql.functions import udf
from pyspark.sql import DataFrame,SparkSession
import pyspark.sql.functions as F 
from pyspark.sql.functions import current_timestamp
import datetime



In [0]:
# Copy the sample XES log from DBFS onto the driver's local disk; the log is
# read back from the local /tmp path further down in this notebook.
xes_source = 'dbfs:/FileStore/BPI_2012_1k_sample.xes'
xes_target = 'file:/tmp/BPI_2012_1k_sample.xes'
dbutils.fs.cp(xes_source, xes_target)

Out[3]: True

Create model

In [0]:
class AlphabetService:
    """Bidirectional mapping between activity labels and one-character codes.

    Codes are handed out in order of first appearance, starting at ``'A'``.
    Only alphanumeric characters are used as codes: node ids built from the
    codes are joined with ``'-'`` and later matched with SQL ``LIKE``, where
    ``'_'`` and ``'%'`` are wildcards and ``'\\'`` is the escape character.
    Walking plain consecutive code points would hand out ``'\\'`` and ``'_'``
    as soon as more than 26 activities exist.

    Attributes are plain dicts so callers can read them directly:
    ``activity_to_alphabet`` maps label -> code and ``alphabet_to_activity``
    maps code -> label.
    """

    def __init__(self):
        self.activity_to_alphabet = {}
        self.alphabet_to_activity = {}
        # 64 is the code point just before 'A'; the counter is advanced
        # before every use, so the first code handed out is 'A'.
        self.char_counter = 64

    def alphabetize(self, label):
        """Return the one-character code for ``label``, allocating one if new.

        The same label always yields the same code until ``clear`` is called.
        """
        if label not in self.activity_to_alphabet:
            self.char_counter += 1
            # Skip punctuation, separators and control characters so a code
            # can never collide with the '-' separator or a LIKE wildcard.
            while not chr(self.char_counter).isalnum():
                self.char_counter += 1
            code = chr(self.char_counter)
            self.activity_to_alphabet[label] = code
            self.alphabet_to_activity[code] = label
        return self.activity_to_alphabet[label]

    def clear(self):
        """Forget every mapping and restart code allocation at ``'A'``."""
        self.activity_to_alphabet.clear()
        self.alphabet_to_activity.clear()
        self.char_counter = 64


def process_trace(trace, nodes,depth, current_node='root'):
    """Insert one trace into the prefix tree stored in ``nodes``.

    Node ids are the path from the root joined with ``'-'`` (for example
    ``'root-A-B'``). Each node is a dict with the keys ``label``, ``parent``,
    ``level``, ``direct_children``, ``direct_children_labels`` and
    ``nth_children``.

    Args:
        trace: iterable of event dicts; only ``event['alphabetized_label']``
            is read. The level arithmetic below assumes that label is a
            single character.
        nodes: dict of node id -> node dict. It is updated in place and may
            already hold nodes from earlier traces.
        depth: maximum number of preceding nodes on the current path that
            receive the new node in their ``nth_children``.
        current_node: node id the trace is attached to (default ``'root'``).

    Returns:
        The same ``nodes`` dict, for convenience.
    """
    # Create the root only once. Re-creating it on every call would throw
    # away the nth_children it collected from previously processed traces.
    if 'root' not in nodes:
        nodes['root'] = {'label': 'root', 'parent': None, 'level':0, 'direct_children': set(),'direct_children_labels':set(),'nth_children':set()}

    last_node = current_node
    previous_n_nodes = ['root']

    for event in trace:
        event_name = event['alphabetized_label']
        node_id = f"{last_node}-{event_name}"
        # 'root' is 4 characters and every step adds '-<char>' (2 characters).
        level = int((len(node_id)-4)/2)
        if node_id not in nodes:
            nodes[node_id] = {'label': event_name, 'parent': last_node, 'level': level, 'direct_children': set(),'direct_children_labels':set(),'nth_children':set()}

        # Keep at most `depth` preceding nodes in the sliding window.
        if len(previous_n_nodes)>depth:
            previous_n_nodes = previous_n_nodes[1:]
        for node in previous_n_nodes:
            start = len(node)
            end = len(node_id)-1
            # Labels strictly between the ancestor and the new node; this is
            # ('',) when the new node is a direct child of the ancestor.
            events_between = tuple(node_id[start+1:end-1].split("-"))
            sub_array = (node_id,event_name,level,events_between)
            nodes[node]['nth_children'].add(sub_array)
        # NOTE(review): direct children are deliberately not recorded on the
        # root here, matching the original behaviour - confirm this is wanted.
        if last_node != 'root':
            nodes[last_node]['direct_children'].add(node_id)
            nodes[last_node]['direct_children_labels'].add(event_name)
        previous_n_nodes.append(node_id)
        last_node = node_id

    return nodes

def convert_sets_to_lists(obj):
    """Return ``obj`` with sets replaced by lists, descending through dict values.

    A set becomes a list of its elements (the elements themselves are not
    visited). A dict is rebuilt with the same keys and converted values.
    Anything else, including lists and tuples, is returned unchanged.
    """
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = convert_sets_to_lists(value)
        return converted
    if isinstance(obj, set):
        return [element for element in obj]
    return obj

# Load the sample log and keep only the activity name and the case id.
event_log = pm4py.read_xes("/tmp/BPI_2012_1k_sample.xes")
dataframe = log_converter.apply(event_log, variant=log_converter.Variants.TO_DATA_FRAME)
labels_trace = dataframe[["concept:name", "case:concept:name"]]
# sort=False keeps the cases in their order of first appearance; rows inside
# a case are used in dataframe order (no explicit timestamp sort is applied).
grouped_traces = labels_trace.groupby("case:concept:name", sort=False)

# Build the prefix tree over all cases, sharing one label -> code mapping.
alphabet_service = AlphabetService()
nodes = {}
for trace, group in grouped_traces:
    # assign() returns a new frame; writing a column onto the groupby slice
    # itself raises pandas' SettingWithCopyWarning.
    labelled_group = group.assign(
        alphabetized_label=group["concept:name"].apply(alphabet_service.alphabetize)
    )
    nodes = process_trace(labelled_group.to_dict('records'), nodes, 3)

# Publish the activity -> code mapping as a two-column Spark DataFrame:
# "event" holds the original activity name, "label" its one-character code.
labels = alphabet_service.activity_to_alphabet
schema = (
    StructType()
    .add("event", StringType(), True)
    .add("label", StringType(), True)
)
df_labels = spark.createDataFrame(
    [(activity, code) for activity, code in labels.items()],
    schema=schema,
)

# One record per tree node: its id plus its attributes, with every set turned
# into a list so Spark can store it as an array.
data = [
    {"node_id": key, **convert_sets_to_lists(attributes)}
    for key, attributes in nodes.items()
]

# Element type of the nth_children array. The field order mirrors the tuple
# built in process_trace: (node_id, label, level, events_between).
nth_child_struct = StructType([
    StructField("node_id", StringType(), True),
    StructField("label", StringType(), True),
    StructField("level", IntegerType(), True),
    StructField("events_between", ArrayType(StringType()), True),
])

schema = StructType([
    StructField("node_id", StringType(), True),
    StructField("label", StringType(), True),
    StructField("level", IntegerType(), True),
    StructField("direct_children", ArrayType(StringType()), True),
    StructField("direct_children_labels", ArrayType(StringType()), True),
    StructField("nth_children", ArrayType(nth_child_struct), True),
    StructField("parent", StringType(), True),
])
df_nodes = spark.createDataFrame(data, schema=schema)



parsing log, completed traces ::   0%|          | 0/1000 [00:00<?, ?it/s]

In [0]:
# Register the model and the label mapping as temp views so that SQL can
# refer to them as iws_model and iws_labels.
for view_name, frame in (("iws_model", df_nodes), ("iws_labels", df_labels)):
    frame.createOrReplaceTempView(view_name)

In [0]:
%sql
-- Incoming events: one row per event of a trace.
CREATE OR REPLACE TABLE iws_event (
  event STRING,
  time_stamp TIMESTAMP,
  trace_id STRING
);

-- Alignment state per trace.
CREATE OR REPLACE TABLE iws_state (
  trace_id STRING,
  ts TIMESTAMP,
  current_node STRING,
  current_id STRING,
  cost_of_alignment INTEGER,
  previous_events STRING,
  trace STRING,
  execution_sequence STRING,
  event_level INTEGER,  -- event level to filter out the latest alignments later
  current_event_level INTEGER,
  current_node_level INTEGER
);

In [0]:
# Read the iws_event table as a stream with a 1-minute watermark on
# time_stamp, and make it available to SQL under the name "events".
event_df = (
    spark.readStream
    .table("iws_event")
    .withWatermark("time_stamp", "1 minute")
)
event_df.createOrReplaceTempView("events")

In [0]:
%scala
import scala.collection.mutable.ArrayBuffer

/**
 * Aligns a sequence of observed events with a path through the model.
 *
 * Uses an insert/delete-only edit distance: an event that equals the model
 * label is a free synchronous move, every other step costs 1.
 *
 * @param modelEvents model path as single-character labels separated by "-"
 *                    (for example "-A-B"); the separators are ignored
 * @param eventArray  observed event labels in order
 * @return one tuple per move, in order: (label, move type, cost, index).
 *         Move type is "sync" (cost 0), "log" (event only, cost 1) or
 *         "model" (model label only, cost 1). The index is 1-based and
 *         refers to the event position for "sync"/"log" moves and to the
 *         model position for "model" moves.
 */
def calculateAlignmentCost(modelEvents: String, eventArray: Array[String]): Array[(String, String, Int, Int)] = {
  val modelLabels = modelEvents.replace("-", "")

  if (modelLabels.isEmpty) {
    // No model labels to align against: every event is a log move. Checking
    // the stripped string also covers a separator-only input, which would
    // otherwise split into a single empty "label".
    eventArray.zipWithIndex.map { case (event, index) =>
      (event, "log", 1, index + 1)
    }
  } else {
    val newEvents = modelLabels.split("")
    val n = eventArray.length
    val m = newEvents.length
    // dp(i)(j) = cost of aligning the first i events with the first j model
    // labels; the first row and column are the all-model / all-log costs.
    val dp = Array.tabulate(n + 1, m + 1)((i, j) => if (i == 0) j else if (j == 0) i else 0)

    // Fill the matrix
    for (i <- 1 to n) {
      for (j <- 1 to m) {
        if (eventArray(i - 1) == newEvents(j - 1)) {
          dp(i)(j) = dp(i - 1)(j - 1)
        } else {
          dp(i)(j) = math.min(dp(i - 1)(j) + 1, dp(i)(j - 1) + 1)
        }
      }
    }

    // Track back from the bottom-right corner to build the alignment
    val alignment = ArrayBuffer[(String, String, Int, Int)]()
    var i = n
    var j = m

    while (i > 0 && j > 0) {
      if (eventArray(i - 1) == newEvents(j - 1)) {
        alignment.prepend((eventArray(i - 1), "sync", 0, i))
        i -= 1
        j -= 1
      } else if (dp(i)(j) == dp(i - 1)(j) + 1) {
        alignment.prepend((eventArray(i - 1), "log", 1, i))
        i -= 1
      } else {
        alignment.prepend((newEvents(j - 1), "model", 1, j))
        j -= 1
      }
    }
    // One side may still have leading elements left once the other is
    // exhausted; they can only be log moves or model moves respectively.
    while (i > 0) {
      alignment.prepend((eventArray(i - 1), "log", 1, i))
      i -= 1
    }
    while (j > 0) {
      alignment.prepend((newEvents(j - 1), "model", 1, j))
      j -= 1
    }

    alignment.toArray
  }
}

// Make the alignment function callable from Spark SQL as calculateAlignmentCost.
spark.udf.register("calculateAlignmentCost", calculateAlignmentCost _)

### foreachBatch method

In [0]:
%sql
-- Table that the first process_batch function appends its results to.
CREATE OR REPLACE TABLE state_test_batch_test_1 (
  trace_id STRING,
  ts TIMESTAMP,
  current_id STRING,
  previous_events STRING,
  event_level INTEGER,
  current_node_level INTEGER,
  label STRING,
  alignment ARRAY<STRUCT<event STRING, move_type STRING>>,
  cost_of_alignment INTEGER,
  event_array ARRAY<STRING>,
  event_index INTEGER,
  batch_id INTEGER
);

In [0]:
%sql
-- For every trace, show the state rows that have the lowest alignment cost.
-- slice(..., 2, ...) leaves out the first element of the alignment array.
SELECT
  trace_id,
  current_id,
  previous_events,
  slice(alignment, 2, size(alignment)),
  cost_of_alignment
FROM stream_test_alignm_batch_test_1
WHERE (trace_id, cost_of_alignment) IN (
  SELECT trace_id, MIN(cost_of_alignment)
  FROM stream_test_alignm_batch_test_1
  GROUP BY trace_id
);


trace_id,current_id,previous_events,"slice(alignment, 2, size(alignment))",cost_of_alignment
trace_0,root-A-B,AB,"List(List(A, sync), List(B, sync))",0
trace_1,root-A-B,AB,"List(List(A, sync), List(B, sync))",0
trace_10,root-A-B-G,ABG,"List(List(A, sync), List(B, sync), List(G, sync))",0
trace_100,root-A-B,ABW,"List(List(A, sync), List(B, sync), List(W, log))",1
trace_1000,root-A-B-C,ABC,"List(List(A, sync), List(B, sync), List(C, sync))",0
trace_1001,root-A-B-D-E,ABDE,"List(List(A, sync), List(B, sync), List(D, sync), List(E, sync))",0
trace_1002,root-A-B-D-E,ABDE,"List(List(A, sync), List(B, sync), List(D, sync), List(E, sync))",0
trace_1003,root-A-B,ABNTRSUO,"List(List(A, sync), List(B, sync), List(N, log), List(T, log), List(R, log), List(S, log), List(U, log), List(O, log))",6
trace_1004,root-A-B,ABW,"List(List(A, sync), List(B, sync), List(W, log))",1
trace_1005,root-A-B-D-E-E-H-I-J-K-L-M,ABHIJKLM,"List(List(A, sync), List(B, sync), List(D, model), List(E, model), List(E, model), List(H, sync), List(I, sync), List(J, sync), List(K, sync), List(L, sync), List(M, sync))",3


In [0]:
%sql
-- For each (trace_id, current_id) keep the row ranked first by highest
-- event_level, then lowest cost, then longest previous_events, and attach
-- the highest event_level seen for the trace as current_event_level.
CREATE OR REPLACE TEMP VIEW stream_test_alignm_batch_test_1 AS
SELECT DISTINCT
  trace_id,
  current_id,
  previous_events,
  alignment,
  cost_of_alignment,
  event_level,
  max_event_level AS current_event_level,
  current_node_level,
  rn,
  event_array,
  event_index,
  batch_id
FROM (
  SELECT
    *,
    MAX(event_level) OVER (PARTITION BY trace_id) AS max_event_level,
    ROW_NUMBER() OVER (
      PARTITION BY trace_id, current_id
      ORDER BY event_level DESC, cost_of_alignment ASC, LEN(previous_events) DESC
    ) rn
  FROM state_test_batch_test_1
)
WHERE rn = 1

In [0]:
def process_batch(df: DataFrame, batch_id: int) -> None:
    """Align one micro-batch of events with the model and append the result.

    Nothing happens for an empty batch. Otherwise ``df`` is registered as the
    temp view ``streaming_data`` and a single SQL statement is run that:

    1. joins the events to ``iws_labels`` and collects, per ``trace_id``, an
       array of ``(time_stamp, label)`` structs ordered with ``array_sort``;
    2. left-joins the view ``stream_test_alignm_batch_test_1`` to pick up the
       state stored for the trace, falling back to ``'root'``, cost 0 and an
       empty history when no state row matches;
    3. explodes the event array by position and joins ``iws_model`` for
       candidate nodes: nodes whose id starts with the previous node id,
       carry the incoming label and lie below a level bound, or the previous
       node itself (matched only where ``len = _event_index + 1``);
    4. keeps the rows whose ``event_index`` is at most 4 below the highest
       ``event_index`` of their trace;
    5. calls the SQL function ``calculateAlignmentCost`` with the part of the
       candidate node id that follows the previous node id and with the
       event labels, appends the returned moves to the previous alignment
       and adds their summed cost to the previous cost.

    The result gets a ``batch_id`` column and is appended to the Delta table
    ``state_test_batch_test_1``.

    NOTE(review): a second function with the same name is defined further
    down in this notebook and replaces this one once its cell has run.

    Args:
        df: the micro-batch DataFrame; the query reads its ``event``,
            ``time_stamp`` and ``trace_id`` columns.
        batch_id: micro-batch id, stored in the ``batch_id`` column.
    """

    if not df.isEmpty():
        df.createOrReplaceTempView("streaming_data")

        result_df = df.sparkSession.sql("""
            WITH FIRST_BD AS (
            SELECT *,substr(node_id FROM len(previous_id) + 1) model_sub, CASE WHEN len(substr(node_id FROM len(previous_id) + 1)) = 0 THEN 0 ELSE _event_index END as event_index  FROM (
            SELECT
                e.trace_id AS trace_id,
                idx AS _event_index,
                col.time_stamp AS time_stamp,
                col.label AS incoming_label,
                e.event_array.label AS event_array,
                size(e.event_array) AS len,
                COALESCE(r.current_id, 'root') AS previous_id,
                COALESCE(r.cost_of_alignment, 0) AS cost_of_alignment,
                COALESCE(r.previous_events, '') AS previous_events,
                COALESCE(r.alignment, ARRAY(struct("" AS event, "" AS move_type))) AS previous_alignment,
                COALESCE(r.event_level, 0) AS event_level,
                COALESCE(r.current_event_level, 0) AS current_event_level,
                COALESCE(r.current_node_level, 0) AS current_node_level
            FROM (
                SELECT 
                    trace_id,
                    array_sort(collect_list(struct(time_stamp, label))) AS event_array
                FROM 
                    streaming_data e 
                JOIN 
                    iws_labels l ON e.event = l.event
                GROUP BY trace_id
            ) e
            LEFT JOIN stream_test_alignm_batch_test_1 r ON e.trace_id = r.trace_id 
            LATERAL VIEW posexplode(e.event_array) AS idx, col
        ) f
        JOIN iws_model m ON (m.node_id LIKE CONCAT(f.previous_id, '%') 
            AND m.level < f.current_node_level  + _event_index + 3 + 2
            AND m.label = f.incoming_label) 
            OR (m.node_id = f.previous_id AND len = _event_index + 1)),
MaxEventIndexPerTrace AS (
    SELECT
        trace_id as _trace_id,
        MAX(event_index) AS max_event_index
    FROM FIRST_BD
    GROUP BY trace_id
),
BASE_DATA AS (SELECT * FROM FIRST_BD LEFT JOIN MaxEventIndexPerTrace on FIRST_BD.trace_id = MaxEventIndexPerTrace._trace_id WHERE event_index >= greatest(max_event_index-4,0))

SELECT 
      trace_id,
                time_stamp as ts,
                node_id as current_id,
                concat(previous_events,concat_ws("",event_array)) as previous_events,
                current_event_level + len as event_level,
                level as current_node_level,
                label,
                CONCAT(
                    previous_alignment,
                    TRANSFORM(
                        calc_alignment,
                            x -> named_struct('event', x._1, 'move_type', x._2)
                        )
                 ) AS alignment,
                cost_of_alignment + aggregate(calc_alignment._3, 0, (acc, x) -> acc + x) AS cost_of_alignment,
                event_array,
                event_index
    FROM (
        SELECT *,calculateAlignmentCost(substr(node_id FROM len(previous_id) + 1),event_array) as calc_alignment 
        FROM BASE_DATA
            ) 
        """)
        # Tag every output row with the micro-batch that produced it.
        result_df = result_df.withColumn("batch_id",F.lit(batch_id))
        # Write the results of the SQL query to a Delta table
        # NOTE(review): checkpointLocation is passed to this batch write,
        # while the writeStream call that starts the query sets none -
        # confirm the option has the intended effect here.
        result_df.write.format("delta").mode("append").option("checkpointLocation", "/tmp/delta/state_append_30059/").saveAsTable("state_test_batch_test_1")

# Start the streaming query: every micro-batch of streaming_df_7 is handed to
# process_batch.
# NOTE(review): streaming_df_7 is assigned in a later cell of this notebook,
# so that cell has to be run before this one.
batch_writer = streaming_df_7.writeStream.foreachBatch(process_batch)
query = batch_writer.start()
# Uncomment to block until the stream stops:
#query.awaitTermination()

Alternative solution that tries to reduce the number of UDF calculations; however, the cross join it requires is probably too slow.

In [0]:
def process_batch(df: DataFrame, batch_id: int) -> None:
    """Align one micro-batch while calling the alignment UDF on fewer rows.

    Nothing happens for an empty batch. Otherwise ``df`` is registered as the
    temp view ``streaming_data`` and a single SQL statement is run. Its CTEs:

    * ``FIRST_BD`` / ``MaxEventIndexPerTrace`` / ``BASE_DATA``: collect the
      events per trace, left-join the state view ``stream_test_alignm_batch``,
      explode the events by position, join candidate nodes from ``iws_model``
      and keep the rows whose ``event_index`` is greater than
      ``greatest(max_event_index - 4, 0)``.
    * ``SUFFIXES``: self-joins ``BASE_DATA`` on the same trace and previous
      node and keeps the candidates whose ``model_sub`` is not extended (with
      a following ``-``) by the ``model_sub`` of another row.
    * ``INTERMEDIATE``: calls ``calculateAlignmentCost`` only for those
      candidates.
    * ``UPDATED_BASE_DATA``: left-joins those alignments back to every
      ``BASE_DATA`` row whose ``model_sub`` occurs in the aligned node id.
    * ``UPDATED_ALIGNMENTS``: rewrites each move of the joined alignment with
      a CASE expression so that it fits the individual row.

    The final SELECT sums the move costs onto the previous cost. The result
    gets a ``batch_id`` column and is appended to the Delta table
    ``state_test_batch``.

    NOTE(review): ``SUFFIXES`` uses an inner self-join on
    ``a.model_sub <> b.model_sub``, so a candidate that has no other row for
    the same trace and previous node does not appear in it - confirm.
    NOTE(review): in ``UPDATED_ALIGNMENTS`` the first WHEN mixes OR and AND
    without parentheses; AND binds tighter than OR - confirm the grouping.
    NOTE(review): the view ``stream_test_alignm_batch`` is not created in the
    visible part of this notebook.

    Args:
        df: the micro-batch DataFrame; the query reads its ``event``,
            ``time_stamp`` and ``trace_id`` columns.
        batch_id: micro-batch id, stored in the ``batch_id`` column.
    """

    if not df.isEmpty():
        df.createOrReplaceTempView("streaming_data")

        result_df = df.sparkSession.sql("""
            WITH FIRST_BD AS (
            SELECT *,substr(node_id FROM len(previous_id) + 1) model_sub, CASE WHEN len(substr(node_id FROM len(previous_id) + 1)) = 0 THEN 0 ELSE _event_index END as event_index  FROM (
            SELECT
                e.trace_id AS trace_id,
                idx AS _event_index,
                col.time_stamp AS time_stamp,
                col.label AS incoming_label,
                e.event_array.label AS event_array,
                size(e.event_array) AS len,
                COALESCE(r.current_id, 'root') AS previous_id,
                COALESCE(r.cost_of_alignment, 0) AS cost_of_alignment,
                COALESCE(r.previous_events, '') AS previous_events,
                substr(concat_ws("", e.event_array.label), greatest(idx-3,0), idx + 1) AS trace_suffix,
                COALESCE(r.alignment, ARRAY(struct("" AS event, "" AS move_type))) AS previous_alignment,
                COALESCE(r.event_level, 0) AS event_level,
                COALESCE(r.current_event_level, 0) AS current_event_level,
                COALESCE(r.current_node_level, 0) AS current_node_level
            FROM (
                SELECT 
                    trace_id,
                    array_sort(collect_list(struct(time_stamp, label))) AS event_array
                FROM 
                    streaming_data e 
                JOIN 
                    iws_labels l ON e.event = l.event
                GROUP BY trace_id
            ) e
            LEFT JOIN stream_test_alignm_batch r ON e.trace_id = r.trace_id 
            LATERAL VIEW posexplode(e.event_array) AS idx, col
        ) f
        JOIN iws_model m ON (m.node_id LIKE CONCAT(f.previous_id, '%') 
            AND m.level < f.current_node_level  + _event_index + 2
            AND m.label = f.incoming_label) 
            OR (m.node_id = f.previous_id AND len = _event_index + 1)
    ),
MaxEventIndexPerTrace AS (
    SELECT
        trace_id as _trace_id,
        MAX(event_index) AS max_event_index
    FROM FIRST_BD
    GROUP BY trace_id
),
BASE_DATA AS (SELECT * FROM FIRST_BD LEFT JOIN MaxEventIndexPerTrace on FIRST_BD.trace_id = MaxEventIndexPerTrace._trace_id WHERE event_index > greatest(max_event_index-4,0)),

    SUFFIXES AS (
        SELECT  * FROM (
            SELECT  
                a.previous_id, 
                a.trace_id, 
                a.node_id AS candidate,
                MAX(CASE WHEN b.model_sub LIKE CONCAT(a.model_sub, '-%') THEN 1 ELSE 0 END) AS is_covered
            FROM BASE_DATA a
            JOIN BASE_DATA b ON a.model_sub <> b.model_sub 
                AND a.trace_id = b.trace_id 
                AND a.previous_id = b.previous_id
            GROUP BY a.trace_id, a.previous_id,a.node_id
        ) WHERE IS_COVERED = 0
    ),
    INTERMEDIATE AS (
        SELECT 
            trace_id, 
            node_id AS current_id,
            TRANSFORM(
                calc_alignment,
                x -> named_struct('event', x._1, 'move_type', x._2,"_cost",x._3, "event_index", x._4)
            ) AS alignment,
            model_sub
        FROM (
            SELECT 
                b.trace_id, 
                b.node_id, 
                b.previous_id, 
                b.model_sub,
                calculateAlignmentCost(
                    b.model_sub, 
                    event_array
                ) AS calc_alignment  
            FROM BASE_DATA b 
            INNER JOIN SUFFIXES s ON b.trace_id = s.trace_id 
                AND b.previous_id = s.previous_id 
                AND b.node_id = s.candidate
        )
    ),
    UPDATED_BASE_DATA AS (
        SELECT 
            b.*,
            i.alignment 
        FROM BASE_DATA b
        LEFT JOIN INTERMEDIATE i ON b.trace_id = i.trace_id 
            AND i.current_id LIKE CONCAT("%",b.model_sub, '%') 
    ),
UPDATED_ALIGNMENTS AS(SELECT len(model_sub)/2,trace_id,time_stamp,previous_events,current_event_level,len,event_array,label,previous_alignment,cost_of_alignment,event_index,node_id,level,TRANSFORM(
                    alignment,
                        x -> CASE WHEN x.event_index > event_index+1 OR len(model_sub) = 0 and x.move_type in ("log","sync") THEN named_struct('event', x.event, 'move_type', "log","_cost",1,"index",x.event_index)
                        WHEN x.move_type in ("model","sync") and len(model_sub)/2 < x.event_index THEN named_struct('event',"",'move_type',"",'_cost',0,"index",x.event_index)
                        else named_struct('event', x.event, 'move_type', x.move_type,"_cost",x._cost,"index",x.event_index) END)
                         AS new_alignment FROM UPDATED_BASE_DATA)

      SELECT  trace_id,
                time_stamp as ts,
                node_id as current_id,
                concat(previous_events,concat_ws("",event_array)) as previous_events,
                current_event_level + len as event_level,
                level as current_node_level,
                label,
                TRANSFORM(
                        new_alignment,
                            x -> named_struct('event', x.event, 'move_type', x.move_type)
                        
                 ) AS alignment,
                cost_of_alignment + aggregate(new_alignment._cost, 0, (acc, x) -> acc + x) AS cost_of_alignment,
                event_array,
                event_index FROM UPDATED_ALIGNMENTS 
""")
        # Tag every output row with the micro-batch that produced it.
        result_df = result_df.withColumn("batch_id",F.lit(batch_id))

        # Append the batch result to the Delta table state_test_batch.
        # NOTE(review): checkpointLocation is passed to this batch write,
        # while the writeStream call that starts the query sets none -
        # confirm the option has the intended effect here.
        result_df.write.format("delta").mode("append").option("checkpointLocation", "/tmp/delta/state_append_30033/").saveAsTable("state_test_batch")

# Start the streaming query that feeds each micro-batch to process_batch.
# NOTE(review): streaming_df_batch is not assigned anywhere in the visible
# part of this notebook - confirm where it comes from.
batch_writer = streaming_df_batch.writeStream.foreachBatch(process_batch)
query = batch_writer.start()


In [0]:
%sql
-- Inspect everything the batch writer has appended so far.
SELECT *
FROM state_test_batch

Add events to iws_event

In [0]:
# Schema of the parquet event files: event name, event timestamp and trace id.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("event", StringType()),
        ("time_stamp", TimestampType()),
        ("trace_id", StringType()),
    )
])
# Stream the parquet files under the shared upload folder using that schema.
streaming_df_7 = (
    spark.readStream
    .format("parquet")
    .schema(schema)
    .load('dbfs:/FileStore/shared_uploads/full/*')
)