In [None]:
# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the "License")

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License

# Anomaly Detection with Ensemble Models using Apache Beam (Isolation Forest)

<table align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_iforest.ipynb"><img src="https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_iforest.ipynb"><img src="https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png" />View source on GitHub</a>
  </td>
</table>

This notebook demonstrates how to perform anomaly detection on streaming data using the AnomalyDetection PTransform, a feature introduced in Apache Beam 2.64.0, with further improvements to offline model support in 2.65.0.

We first fetch the Statlog (Shuttle) data set from the UCI Machine Learning Repository (cached in gs://apache-beam-samples/anomaly_detection/shuttle/, original link: https://archive.ics.uci.edu/dataset/148/statlog+shuttle). This data is streamed into the pipeline, one record per tick of a periodic impulse. The Beam pipeline then applies the AnomalyDetection PTransform with a pre-trained isolation forest model and computes model metrics.

We demonstrate running the pipeline with Prism (our new local runner) and with the Dataflow runner.

## Preparation

### Setting Environment Variables

In [13]:
# GCP project id
import random

PROJECT_ID = 'apache-beam-testing'  # @param {type:'string'}

# Temporary root path, used to store generated files (and temp and staging files if running on Dataflow)
TEMP_ROOT = 'gs://apache-beam-testing-temp'  # @param {type:'string'}

# Required if running on Dataflow
REGION = 'us-central1'  # @param {type:'string'}

# TODO: Change this to an official release once 2.65.0 is available
BEAM_VERSION = '2.65.0rc2'

# Random per-run suffix (an integer in [0, 10000]) appended to the GCS paths
# used below, so repeated runs of this notebook write to separate directories.
SUFFIX = f"{random.randint(0, 10000)}"

In [2]:
# Authenticate the Colab user so later cells can access Google Cloud
# resources on behalf of PROJECT_ID.
from google.colab import auth as colab_auth

colab_auth.authenticate_user(project_id=PROJECT_ID)

### Installing Beam and Other Dependencies

In [3]:
# For running with local prism runner
!pip install 'apache_beam[interactive]=={BEAM_VERSION}' --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.4/17.4 MB[0m [31m69.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m87.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.7/5.7 MB[0m [31m76.9 MB/s[0m eta [36m0:00:00[0m
[2

In [4]:
# Download the latest prism
# TODO: We don't need this step once 2.65.0 is available.
! wget https://dist.apache.org/repos/dist/dev/beam/2.65.0/prism/linux/amd64/apache_beam-v2.65.0-prism-linux-amd64.zip

--2025-05-08 00:53:47--  https://dist.apache.org/repos/dist/dev/beam/2.65.0/prism/linux/amd64/apache_beam-v2.65.0-prism-linux-amd64.zip
Resolving dist.apache.org (dist.apache.org)... 13.90.137.153
Connecting to dist.apache.org (dist.apache.org)|13.90.137.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24922389 (24M) [application/octet-stream]
Saving to: ‘apache_beam-v2.65.0-prism-linux-amd64.zip’


2025-05-08 00:53:48 (16.2 MB/s) - ‘apache_beam-v2.65.0-prism-linux-amd64.zip’ saved [24922389/24922389]



In [5]:
# Install pyod for offline anomaly detectors
!pip install pyod==2.0.3

Collecting pyod==2.0.3
  Downloading pyod-2.0.3.tar.gz (169 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/169.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m169.6/169.6 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyod
  Building wheel for pyod (setup.py) ... [?25l[?25hdone
  Created wheel for pyod: filename=pyod-2.0.3-py3-none-any.whl size=200466 sha256=8986d8fbc9aed9ae64b9cce885ceac4a00be9ce294d60a9bcbf78301fa6a20d5
  Stored in directory: /root/.cache/pip/wheels/2d/60/5b/f74eccd2c9c892a2c298202ca510f10995f9940647fcc2d97f
Successfully built pyod
Installing collected packages: pyod
Successfully installed pyod-2.0.3


## Part 1: Training and Scoring with an Offline Isolation Forest Model

### Model Training

In [6]:
# Download the sample data from GCS
train_data_fn = "./shuttle.trn"
! gcloud storage cp "gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn" {train_data_fn}

Copying gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn to file://./shuttle.trn

Average throughput: 61.0MiB/s


In [8]:
import pickle

import pandas as pd
import pyod.models.iforest as iforest

FIELD_NAMES = ["time", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "target"]
SEP = " "

# The file has no header row (column names come from FIELD_NAMES) and its
# columns are separated by SEP.
df = pd.read_csv(train_data_fn, sep=SEP, names=FIELD_NAMES)

# We don't need the time and target fields for training.
train_data = df.drop(columns=["time", "target"])
train_data_np = train_data.to_numpy()

# Train an isolation forest; a fixed random_state makes the fit reproducible.
my_iforest = iforest.IForest(random_state=1234)
my_iforest.fit(train_data_np)

# Save the fitted model to a local file so it can be uploaded in the next cell.
pickled_fn = './iforest_pickled'
with open(pickled_fn, 'wb') as fp:
  pickle.dump(my_iforest, fp)

In [14]:
# Upload the pickled model file to GCS
PICKLED_PATH = TEMP_ROOT + '/anomaly/iforest-notebook-' + SUFFIX + '/pickled'
pickled_fn_gcs = PICKLED_PATH + '/iforest.pickled'

! gcloud storage cp {pickled_fn} {pickled_fn_gcs}

Copying file://./iforest_pickled to gs://apache-beam-testing-temp/anomaly/iforest-notebook-3489/pickled/iforest.pickled


### Defining a Streaming Source and a DoFn for Model Metrics

In [15]:
import math
from typing import Any
from collections.abc import Sequence

import sklearn

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.transforms.window import FixedWindows
from apache_beam.ml.anomaly.base import AnomalyResult

class SequenceToPeriodicStream(beam.PTransform):
  """A streaming source that emits the items of a sequence one at a time.

  A PeriodicImpulse fires every `delay` seconds, and each firing emits the
  next item of `data`. The emitted items are placed into fixed windows of
  `delay` seconds.

  Args:
    data: The items to emit, in order.
    delay: Seconds between two consecutive items; also the size of the fixed
      output windows.
    repeat: If True, start over from the first item after the last one has
      been emitted. If False, emit nothing further once the sequence is
      exhausted (this transform does not stop the underlying impulse).
  """
  def __init__(self, data: Sequence[Any], delay: float = 0.1, repeat: bool = True):
    super().__init__()
    self._data = data
    self._delay = delay
    self._repeat = repeat

  class EmitOne(beam.DoFn):
    """Stateful DoFn that emits one item of `data` per input element.

    The position of the next item to emit is kept in per-key state, so with a
    single constant key one cursor walks through the whole sequence.
    """
    INDEX_SPEC = ReadModifyWriteStateSpec('index', VarIntCoder())

    def __init__(self, data, repeat):
      self._data = data
      self._repeat = repeat
      self._max_index = len(self._data)

    def process(self, element, index_state=beam.DoFn.StateParam(INDEX_SPEC)):
      index = index_state.read() or 0
      # Reached only for a non-repeating stream that is exhausted, or empty data.
      if index >= self._max_index:
        return

      yield self._data[index]

      index += 1
      if self._repeat:
        index %= self._max_index
      index_state.write(index)

  def expand(self, input):
    return (input
            | PeriodicImpulse(fire_interval=self._delay)
            # Stateful DoFns need keyed input; a constant key sends every
            # impulse to the same state cell (a single shared cursor).
            | beam.Map(lambda x: (0, x))
            | beam.ParDo(SequenceToPeriodicStream.EmitOne(self._data, self._repeat))
            | beam.WindowInto(FixedWindows(self._delay)))


class ComputeMetrics(beam.DoFn):
  """A stateful DoFn that computes running classification metrics.

  Every scored element is buffered in per-key (and per-window) state. After
  each element, the accuracy, recall, precision, F1 score and ROC AUC over
  everything buffered so far are recomputed and emitted with that element.

  Args:
    get_target: Maps an example (the `example` of an AnomalyResult) to its
      ground-truth label on the same 0/1 scale as the predicted labels. The
      positive class for recall/precision/F1 is 1.
  """
  METRIC_STATE_INDEX = ReadModifyWriteStateSpec('saved_tracker', PickleCoder())

  def __init__(self, get_target):
    self._get_target = get_target

  def process(self,
              element: tuple[Any, AnomalyResult],
              metric_state=beam.DoFn.StateParam(METRIC_STATE_INDEX),
              **kwargs):
    # Import the submodule explicitly wherever this DoFn runs (e.g. on remote
    # workers); `import sklearn` alone does not guarantee `sklearn.metrics` is loaded.
    from sklearn import metrics

    key, result = element

    # Buffered history: (scores, predicted labels, ground-truth targets).
    saved = metric_state.read()
    scores, labels, targets = saved if saved is not None else ([], [], [])

    prediction = next(iter(result.predictions))
    if prediction.score is None or math.isnan(prediction.score):
      # No usable score: emit an empty row and leave the history unchanged.
      yield key, beam.Row()
    else:
      # Buffer the scores and targets for the metric computation.
      scores.append(prediction.score)
      labels.append(prediction.label)
      targets.append(self._get_target(result.example))

      # zero_division=0 returns the same value sklearn's default produces in
      # the undefined case, without an UndefinedMetricWarning on every element.
      accuracy = metrics.accuracy_score(targets, labels)
      recall = metrics.recall_score(targets, labels, zero_division=0)
      precision = metrics.precision_score(targets, labels, zero_division=0)
      f1 = metrics.f1_score(targets, labels, zero_division=0)

      # ROC AUC is undefined until both classes have been observed.
      if len(set(targets)) < 2:
        auc = float('nan')
      else:
        fpr, tpr, _ = metrics.roc_curve(targets, scores)
        auc = metrics.auc(fpr, tpr)

      yield key, beam.Row(id=result.example.id,
                          target=result.example.target,
                          predicted_label=prediction.label,
                          predicted_score=prediction.score,
                          accuracy=float(accuracy),
                          recall=float(recall),
                          precision=float(precision),
                          f1=float(f1),
                          auc=float(auc))

    metric_state.write((scores, labels, targets))

### Preparing Test Data for Streaming

In [16]:
# Download the data from GCS
test_data_fn = "./test.trn"
! gcloud storage cp "gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.tst" {test_data_fn}

Copying gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.tst to file://./test.trn


In [17]:
from apache_beam.io.filesystems import FileSystems
import pandas as pd

FIELD_NAMES = ["time", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "target"]
SEP = " "
# Only the first NUM_TEST_ROWS instances are streamed, to keep the demo short.
NUM_TEST_ROWS = 500

# FileSystems.open also accepts gs:// URIs, so test_data_fn may point to GCS.
with FileSystems.open(test_data_fn) as f:
  df = pd.read_csv(f, sep=SEP, names=FIELD_NAMES).head(NUM_TEST_ROWS)

# One dict per instance, tagged with its position in the file as "id" so that
# pipeline output can be traced back to the input row.
rows = [dict(record, id=i) for i, record in enumerate(df.to_dict("records"))]

# The model was trained without `time` and `target`, so only the remaining
# columns are used as features.
test_data_cols = [c for c in FIELD_NAMES if c not in ("time", "target")]

### Constructing the Pipeline and Running with Prism

In [18]:
from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory

# Wrap the pickled PyOD model stored in GCS as a detector for the
# AnomalyDetection transform; `features` names the Row fields given to the model.
detector = PyODFactory.create_detector(
    pickled_fn_gcs,
    features=test_data_cols,
)

In [19]:
import logging

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.ml.anomaly.transforms import AnomalyDetection
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import GlobalWindows

logging.getLogger().setLevel(logging.INFO)

# Streaming options for the local Prism runner; LOOPBACK executes the SDK
# worker in this notebook process.
options = PipelineOptions([
    "--streaming",
    "--job_server_timeout=600",
    "--environment_type=LOOPBACK",
    # TODO: remove --prism_location once 2.65 is released
    "--runner=PrismRunner",
    "--prism_location=./apache_beam-v2.65.0-prism-linux-amd64.zip",
])


def is_anomaly_target(example):
  """Ground truth label: target class 1 maps to 0, every other class maps to 1."""
  return 1 if example.target != 1 else 0


# With repeat=True the source never ends, so this cell runs until interrupted.
with beam.Pipeline(options=options) as p:
  scored = (
      p
      | SequenceToPeriodicStream(rows, delay=1, repeat=True)
      | beam.Map(lambda x: beam.Row(**x))
      | beam.WithKeys(0)
      | AnomalyDetection(detector=detector))
  _ = (
      scored
      # Put everything into the global window to compute the overall AUC.
      | beam.WindowInto(GlobalWindows())
      | beam.ParDo(ComputeMetrics(is_anomaly_target))
      | beam.LogElements())

INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:44645
INFO:apache_beam.runners.portability.prism_runner:Using local prism binary/zip from ./apache_beam-v2.65.0-prism-linux-amd64.zip
INFO:apache_beam.runners.portability.prism_runner:Unzipping prism from ./apache_beam-v2.65.0-prism-linux-amd64.zip to /root/.apache_beam/cache/prism/bin/apache_beam-v2.65.0-prism-linux-amd64
INFO:apache_beam.runners.portability.prism_runner:Prism binary path resolved to: /root/.apache_beam/cache/prism/bin/apache_beam-v2.65.0-prism-linux-amd64
INFO:apache_beam.utils.subprocess_server:Starting service with ('/root/.apache_beam/cache/prism/bin/apache_beam-v2.65.0-prism-linux-amd64' '--job_port' '38741' '--serve_http' 'False')
INFO:apache_beam.utils.subprocess_server:[2m[37m[2025-05-08T01:05:34.948932456Z][0m [42m[30m INFO [0m [32mServing JobManagement[0m
INFO:apache_beam.utils.subprocess_server:[34m*[0m [35mendpoint[0m: [4m[34mlocalhost:38741[0m[0m
INFO:apach

(0, Row(id=0, target=4, predicted_label=0, predicted_score=-0.043273046417952266, accuracy=0.0, recall=0.0, precision=0.0, f1=0.0, auc=nan))
(0, Row(id=1, target=4, predicted_label=0, predicted_score=-0.1475705627129747, accuracy=0.0, recall=0.0, precision=0.0, f1=0.0, auc=nan))
(0, Row(id=2, target=1, predicted_label=0, predicted_score=-0.11675834074391167, accuracy=0.3333333333333333, recall=0.0, precision=0.0, f1=0.0, auc=0.5))
(0, Row(id=3, target=4, predicted_label=0, predicted_score=-0.08698219095013143, accuracy=0.25, recall=0.0, precision=0.0, f1=0.0, auc=0.6666666666666666))
(0, Row(id=4, target=1, predicted_label=0, predicted_score=-0.12194074770933055, accuracy=0.4, recall=0.0, precision=0.0, f1=0.0, auc=0.6666666666666666))
(0, Row(id=5, target=1, predicted_label=0, predicted_score=-0.15440580096479756, accuracy=0.5, recall=0.0, precision=0.0, f1=0.0, auc=0.7777777777777778))


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


(0, Row(id=6, target=1, predicted_label=0, predicted_score=-0.05099042972651385, accuracy=0.5714285714285714, recall=0.0, precision=0.0, f1=0.0, auc=0.6666666666666666))
(0, Row(id=7, target=1, predicted_label=0, predicted_score=-0.17796149714613052, accuracy=0.625, recall=0.0, precision=0.0, f1=0.0, auc=0.7333333333333333))
(0, Row(id=8, target=1, predicted_label=0, predicted_score=-0.15675169491269175, accuracy=0.6666666666666666, recall=0.0, precision=0.0, f1=0.0, auc=0.7777777777777778))
(0, Row(id=9, target=1, predicted_label=0, predicted_score=-0.16808018675634556, accuracy=0.7, recall=0.0, precision=0.0, f1=0.0, auc=0.8095238095238095))
(0, Row(id=10, target=1, predicted_label=0, predicted_score=-0.12107176709195522, accuracy=0.7272727272727273, recall=0.0, precision=0.0, f1=0.0, auc=0.7916666666666667))
(0, Row(id=11, target=4, predicted_label=0, predicted_score=-0.1493176313563489, accuracy=0.6666666666666666, recall=0.0, precision=0.0, f1=0.0, auc=0.71875))
(0, Row(id=12, tar

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


(0, Row(id=15, target=1, predicted_label=0, predicted_score=-0.17606284623641083, accuracy=0.75, recall=0.0, precision=0.0, f1=0.0, auc=0.6666666666666667))
(0, Row(id=16, target=1, predicted_label=0, predicted_score=-0.1374089909516345, accuracy=0.7647058823529411, recall=0.0, precision=0.0, f1=0.0, auc=0.6538461538461539))
(0, Row(id=17, target=4, predicted_label=0, predicted_score=-0.11195671475496366, accuracy=0.7222222222222222, recall=0.0, precision=0.0, f1=0.0, auc=0.6923076923076923))
(0, Row(id=18, target=4, predicted_label=0, predicted_score=-0.052353477210396016, accuracy=0.6842105263157895, recall=0.0, precision=0.0, f1=0.0, auc=0.7307692307692307))
(0, Row(id=19, target=1, predicted_label=0, predicted_score=-0.13749997611032588, accuracy=0.7, recall=0.0, precision=0.0, f1=0.0, auc=0.7261904761904762))
(0, Row(id=20, target=1, predicted_label=1, predicted_score=0.0209395012825373, accuracy=0.6666666666666666, recall=0.0, precision=0.0, f1=0.0, auc=0.6777777777777778))
(0, R

Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
Exception in thread run_worker_job-001[job]_ref_Environment_default_environment_1:
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/dist-packages/apache_beam/runners/portability/portable_runner.py", line 534, in read_messages
    for message in self._message_stream:
  File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE

KeyboardInterrupt: 

### Alternative: Running the Pipeline with the Dataflow Runner

In [None]:
# Per-run GCS locations used by the Dataflow runner for temp and staging files.
_RUN_ROOT = f"{TEMP_ROOT}/anomaly/iforest-notebook-{SUFFIX}"
TEMP_LOCATION = f"{_RUN_ROOT}/temp"
STAGING_LOCATION = f"{_RUN_ROOT}/staging"

In [None]:
# For running with dataflow runner
!pip install 'apache_beam[gcp, interactive]=={BEAM_VERSION}' --quiet

In [None]:
# Source distribution of pyod staged to the Dataflow workers, which need pyod
# to load the pickled model.
# NOTE(review): this path is a bucket used while authoring the notebook. If you
# cannot read it, upload your own copy of pyod-2.0.3.tar.gz (for example from
# `pip download pyod==2.0.3 --no-deps --no-binary :all:`) and update the path.
PYOD_EXTRA_PACKAGE = "gs://shunping-test/anomaly-temp/pyod-2.0.3.tar.gz"

# Running the pipeline on Dataflow. The periodic source is unbounded, so the
# job is submitted in streaming mode, matching the Prism run above.
options = PipelineOptions([
  "--runner=DataflowRunner",
  "--streaming",
  "--temp_location=" + TEMP_LOCATION,
  "--staging_location=" + STAGING_LOCATION,
  "--project=" + PROJECT_ID,
  "--region=" + REGION,
  "--extra_package=" + PYOD_EXTRA_PACKAGE,
])

p = beam.Pipeline(options=options)
_ = (p
     | SequenceToPeriodicStream(rows, delay=1, repeat=True)
     | beam.Map(lambda x: beam.Row(**x))
     | beam.WithKeys(0)
     | AnomalyDetection(detector=detector)
     | beam.WindowInto(GlobalWindows()) # put everything into global window to compute overall auc
     | beam.ParDo(ComputeMetrics(lambda x: 1 if x.target != 1 else 0))
     | beam.LogElements()
)

# Submit without waiting. A `with beam.Pipeline(...)` block would wait on this
# never-ending streaming job, and interrupting that wait only stops the
# notebook from waiting; the Dataflow job itself keeps running.
# Call `result.cancel()` when you are done to stop the job.
result = p.run()