# Distributed Data Processing (Q5)

The executable parts use Apache Spark (the PySpark RDD API); the MapReduce design is described in the markdown cells.

## a) MapReduce DFG design

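We model each log row as `(line_no, raw_line) in N0 x S`, where `S` is the set of strings. Below, `C`, `A` and `T` denote the sets of case IDs, activities and timestamps.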

- Map 1: `m1 : N0 x S -> C x (T x A)`, `m1(i, line) = (case_id, (ts, act))`. Splits the CSV row into CaseID, Activity, Timestamp, Resource and keeps `(ts, act)` keyed by case.
- Reduce 1: `r1 : C x multiset(T x A) -> C x list(A)`, `r1(c, events) = (c, ordered_activities)`: sorts the events of case `c` by timestamp and returns the case's activities as a list in that order.
- Map 2: `m2 : C x list(A) -> multiset((A x A) x N0)`, `m2(c, [a1,...,an])` emits `((a_i, a_{i+1}), 1)` for `i = 1, ..., n-1`, i.e. for every pair of consecutive activities.
- Reduce 2: `r2 : (A x A) x multiset(N0) -> (A x A) x N0`, sums the counts per directly-follows pair to obtain DFG arc weights.

The output of `r2` is the arc set of the DFG: one arc `(a, b)` per activity pair, weighted by how often `b` directly follows `a` within a case.
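The two rounds can be simulated in plain Python to check the design on a toy input. This is a minimal sketch, not the Spark implementation used later: `TOY_LOG` is a hypothetical four-column log with made-up values, and `shuffle` is a hypothetical helper standing in for the MapReduce shuffle phase that groups values by key.

```python
from collections import defaultdict
from itertools import chain

# Hypothetical toy log: case_id, activity, timestamp, resource (made-up values)
TOY_LOG = [
    "c1,register,1.0,alice",
    "c1,check,2.0,bob",
    "c1,pay,3.0,alice",
    "c2,register,5.0,alice",
    "c2,pay,7.0,carol",
    "c2,check,6.0,bob",
]

def m1(i, line):
    """Map 1: CSV row -> (case_id, (ts, act)); the line number i is unused."""
    case_id, act, ts, _res = line.split(",")
    return case_id, (float(ts), act)

def r1(case_id, events):
    """Reduce 1: sort by timestamp (ties by activity name), keep activities only."""
    return case_id, [act for _ts, act in sorted(events)]

def m2(case_id, acts):
    """Map 2: emit ((a_i, a_{i+1}), 1) for every consecutive pair."""
    return [((a, b), 1) for a, b in zip(acts, acts[1:])]

def r2(pair, counts):
    """Reduce 2: sum the partial counts of one directly-follows pair."""
    return pair, sum(counts)

def shuffle(pairs):
    """Group values by key, as the shuffle does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups.items()

def discover_dfg(lines):
    # Round 1: m1 -> shuffle by case -> r1
    round1 = [r1(c, evs) for c, evs in shuffle(m1(i, l) for i, l in enumerate(lines))]
    # Round 2: m2 -> shuffle by activity pair -> r2
    emitted = chain.from_iterable(m2(c, acts) for c, acts in round1)
    return dict(r2(p, cs) for p, cs in shuffle(emitted))

dfg = discover_dfg(TOY_LOG)
print(dfg)  # {('register', 'check'): 2, ('check', 'pay'): 2}
```

Note that case `c2` is listed out of timestamp order in the toy log; `r1` restores the order before `m2` forms the pairs.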

In [7]:
from pathlib import Path
from pyspark import SparkContext
import matplotlib.pyplot as plt
import networkx as nx

sc = SparkContext.getOrCreate()

In [9]:
log_path_candidates = [
    Path("big-data/event_log.csv.gz"),
    Path("files/big-data/event_log.csv.gz"),
    Path("../files/big-data/event_log.csv.gz"),
    Path("data/event_log.csv.gz"),
]
for p in log_path_candidates:
    if p.exists():
        log_path = p
        break
else:
    raise FileNotFoundError(f"event_log.csv.gz not found in: {log_path_candidates}")

print(f"Using log file: {log_path}")

Using log file: data/event_log.csv.gz


In [10]:
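# Parse CSV lines into typed tuples: (case_id, activity, timestamp, resource).
# Assumes a plain comma-separated file without quoted fields containing commas.
log_rdd = sc.textFile(str(log_path))  # .gz input is decompressed transparently
raw = log_rdd.map(lambda line: line.split(","))

def clean_row(parts):
    # Drop malformed rows (wrong number of fields)
    if len(parts) != 4:
        return None
    case_id, activity, ts_raw, resource = (p.strip() for p in parts)
    try:
        ts_val = int(float(ts_raw))  # accepts '1153692000' and '1153692000.0'
    except ValueError:
        # Non-numeric timestamp, e.g. a CSV header row
        return None
    return (case_id, activity, ts_val, resource)

parsed_rdd = raw.map(clean_row).filter(lambda x: x is not None)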

# Show a few parsed rows
print(parsed_rdd.take(5))



## b) DFG discovery with PySpark

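Transformations used:
- `textFile` to load the log into an RDD of strings.
- `map` to parse rows into `(case, act, ts, res)` tuples and to key each event by case as `(case, (ts, act))`.
- `filter` to drop rows that could not be parsed (for example, a header line).
- `groupByKey` to collect all events per case.
- `flatMap` on each grouped case to sort its events by timestamp and emit consecutive activity pairs.
- `reduceByKey` to sum the counts per `(act_i, act_j)` directly-follows edge.

`collect` is the action that triggers the job and returns the edge counts to the driver.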
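The per-case logic that `flatMap` applies can be exercised locally, without a Spark cluster. The sketch below is a plain-Python function with the hypothetical name `consecutive_pairs`; it sorts one case's `(ts, act)` tuples, so events with equal timestamps are ordered by activity name regardless of the order in which `groupByKey` delivers them. Breaking ties by name is an assumption; the log itself does not define an order for simultaneous events.

```python
def consecutive_pairs(kv):
    """Hypothetical stand-in for the per-case flatMap step.

    kv is (case_id, iterable of (ts, act)). Sorting the tuples orders events by
    timestamp and, on equal timestamps, by activity name, so the output does
    not depend on the order in which the values arrive.
    """
    _case, events = kv
    acts = [act for _ts, act in sorted(events)]
    return [((a, b), 1) for a, b in zip(acts, acts[1:])]

# The same case delivered with its values in two different orders
run_a = consecutive_pairs(("c1", [(1, "register"), (2, "check"), (2, "approve")]))
run_b = consecutive_pairs(("c1", [(2, "check"), (2, "approve"), (1, "register")]))
print(run_a == run_b)  # True
print(run_a)           # [(('register', 'approve'), 1), (('approve', 'check'), 1)]
```

In the notebook the function could be passed straight to Spark, e.g. `case_events.groupByKey().flatMap(consecutive_pairs)`.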

In [None]:
# Group by case, order events by timestamp, and emit directly-follows pairs
case_events = parsed_rdd.map(lambda x: (x[0], (x[2], x[1])))  # (case, (ts, act))

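def case_to_pairs(kv):
    _case, events = kv
    # groupByKey gives no ordering guarantee, so sort explicitly; sorting the
    # (ts, act) tuples breaks timestamp ties by activity name (deterministic)
    ordered = sorted(events)
    return [((a[1], b[1]), 1) for a, b in zip(ordered, ordered[1:])]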

dfg_counts_rdd = (
    case_events
    .groupByKey()
    .flatMap(case_to_pairs)
    .reduceByKey(lambda a, b: a + b)
)

dfg_counts = dfg_counts_rdd.collect()
print(f"Number of distinct edges: {len(dfg_counts)}")
print("Top 10 edges:")
for (src, dst), cnt in sorted(dfg_counts, key=lambda x: x[1], reverse=True)[:10]:
    print(f"{src} -> {dst}: {cnt}")

In [None]:
# Helper to plot a DFG using networkx

def plot_dfg(counts, title="DFG", min_weight=0, top_n=None):
    edges = sorted(counts, key=lambda x: x[1], reverse=True)
    if top_n is not None:
        edges = edges[:top_n]
    edges = [e for e in edges if e[1] >= min_weight]

    G = nx.DiGraph()
    for (src, dst), w in edges:
        G.add_edge(src, dst, weight=w, label=str(w))

    plt.figure(figsize=(10, 7))
    pos = nx.spring_layout(G, seed=42, k=0.5)
    nx.draw_networkx_nodes(G, pos, node_size=800, node_color="#87ceeb")
    nx.draw_networkx_labels(G, pos, font_size=9)
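    # Pass the same node_size so arrowheads stop at the node border instead of being hidden
    nx.draw_networkx_edges(G, pos, node_size=800, arrowstyle="->", arrowsize=15, width=1.5)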
    edge_labels = {(u, v): d["label"] for u, v, d in G.edges(data=True)}
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=8)
    plt.title(title)
    plt.axis("off")
    plt.tight_layout()
    plt.show()

# Plot the unfiltered DFG (top 40 edges to keep the visualization readable)
plot_dfg(dfg_counts, title="Directly Follows Graph (unfiltered)", top_n=40)

## c) Filtering low-frequency arcs (< 100)

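- **MapReduce update:** apply the threshold only after the counts have been summed globally: let `r2` emit `((a, b), n)` only when `n >= 100`, or add a map-only step after `r2` that drops such pairs. A combiner must not filter, because it only sees partial counts.
- **Spark change:** insert a `filter(lambda kv: kv[1] >= 100)` on the `(edge, count)` RDD after `reduceByKey` and before collecting/plotting.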
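Why the threshold must come after the global sum is easy to see with numbers. The sketch below uses two hypothetical partitions with made-up partial counts; `filter_per_partition` and `filter_after_reduce` are illustrative names, not part of the notebook.

```python
from collections import Counter

THRESHOLD = 100

# Hypothetical partial counts of the same edges produced by two map partitions
partition_1 = Counter({("register", "check"): 60, ("check", "pay"): 5})
partition_2 = Counter({("register", "check"): 70, ("check", "pay"): 3})

def filter_per_partition(parts, threshold):
    """Wrong: drops edges before their partial counts are summed."""
    total = Counter()
    for part in parts:
        total.update({e: c for e, c in part.items() if c >= threshold})
    return dict(total)

def filter_after_reduce(parts, threshold):
    """Correct: sum first (r2 / reduceByKey), then keep edges with count >= threshold."""
    total = Counter()
    for part in parts:
        total.update(part)  # Counter.update adds counts
    return {e: c for e, c in total.items() if c >= threshold}

print(filter_per_partition([partition_1, partition_2], THRESHOLD))  # {}
print(filter_after_reduce([partition_1, partition_2], THRESHOLD))   # {('register', 'check'): 130}
```

The edge `register -> check` occurs 130 times in total but never reaches 100 within a single partition, so filtering early would wrongly remove it.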

In [None]:
threshold = 100
filtered_dfg_counts_rdd = dfg_counts_rdd.filter(lambda kv: kv[1] >= threshold)
filtered_counts = filtered_dfg_counts_rdd.collect()

print(f"Edges after filtering >= {threshold}: {len(filtered_counts)}")
print("Top 10 filtered edges:")
for (src, dst), cnt in sorted(filtered_counts, key=lambda x: x[1], reverse=True)[:10]:
    print(f"{src} -> {dst}: {cnt}")

In [None]:
plot_dfg(filtered_counts, title=f"DFG filtered (>= {threshold})", min_weight=threshold, top_n=40)

## d) Most common resource per activity

PySpark transformations:
- `map` to create `((activity, resource), 1)` pairs.
- `reduceByKey` to count occurrences per `(activity, resource)`.
- `map` to re-key by activity: `(activity, (resource, count))`.
- `reduceByKey` with a max on count to keep only the most common resource per activity.
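The two-stage reduction can be checked in plain Python. The sketch below uses hypothetical parsed rows and a hypothetical `pick` function that keeps the higher count and, on a tie, the lexicographically smaller resource name. That tie-breaking rule is an assumption, since the task does not say how ties should be resolved. Because `pick` selects the minimum under a fixed total order, it is associative and commutative, which is what `reduceByKey` requires.

```python
from collections import Counter

# Hypothetical parsed rows: (case_id, activity, timestamp, resource)
ROWS = [
    ("c1", "register", 1, "alice"),
    ("c2", "register", 2, "alice"),
    ("c3", "register", 3, "bob"),
    ("c1", "check", 4, "bob"),
    ("c2", "check", 5, "carol"),
]

def pick(a, b):
    """Keep the (resource, count) with the higher count; on a tie, the smaller name."""
    return min(a, b, key=lambda rc: (-rc[1], rc[0]))

# Stage 1: count occurrences per (activity, resource)
act_res = Counter((act, res) for _case, act, _ts, res in ROWS)

# Stage 2: re-key by activity and reduce the (resource, count) values with pick
by_act = {}
for (act, res), cnt in act_res.items():
    by_act[act] = pick(by_act[act], (res, cnt)) if act in by_act else (res, cnt)

print(sorted(by_act.items()))  # [('check', ('bob', 1)), ('register', ('alice', 2))]
```

The same `pick` could be used in the Spark cell as `.reduceByKey(pick)`.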

In [None]:
# Count resource usage per activity and pick the most frequent resource for each activity
act_res_counts = (
    parsed_rdd
    .map(lambda x: ((x[1], x[3]), 1))
    .reduceByKey(lambda a, b: a + b)
)

most_common_resource = (
    act_res_counts
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
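    # Keep the highest count; on ties, prefer the lexicographically smaller resource
    # so the result does not depend on partition order
    .reduceByKey(lambda a, b: min(a, b, key=lambda rc: (-rc[1], rc[0])))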
)

results = sorted(most_common_resource.collect(), key=lambda x: x[0])
print("Most common resource per activity:")
for act, (res, cnt) in results:
    print(f"{act}: {res} (count={cnt})")