In [38]:
# import timesfm
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pyod.models.iforest as iforest
import pickle
from google import genai
from IPython.display import Markdown, display
from apache_beam.ml.inference.base import ModelHandler
from faker import Faker
import json
import os

In [2]:
from dotenv import load_dotenv
load_dotenv()

True

In [3]:
# Initialize Faker
fake = Faker()

# Simulated Data

Here we define the patter for simulating normal and anomalous behavior of a cloud activity of a particular user.

* **Data Composition:** The dataset consists of simulated user activity logs, where each entry includes:
    * `access_type`: The defined access level of the user account (e.g., `no_access`, `reader`, `editor`) or its anomalous counterpart (e.g., `anomaly_no_access`).
    * `num_read`: The count of read operations performed.
    * `num_edits`: The count of edit operations performed.
    * `num_downloads`: The count of download operations performed.
    * `num_encryption`: The count of encryption operations performed.

* **Defining Normal Behavior:** Normal activity patterns are specific to each `access_type`:
    * **For `no_access` accounts:**
        * All activity metrics (`num_read`, `num_edits`, `num_downloads`, `num_encryption`) are consistently zero, as these accounts should have no system interaction.
    * **For `reader` accounts:**
        * `num_read`: Typically moderate (e.g., around 100).
        * `num_edits`: Consistently zero (readers do not edit).
        * `num_downloads`: Typically low (e.g., around 10).
        * `num_encryption`: Consistently zero (readers do not encrypt).
    * **For `editor` accounts:**
        * `num_read`: Typically low (e.g., around 10).
        * `num_edits`: Typically high (e.g., around 200), reflecting their primary function.
        * `num_downloads`: Typically low (e.g., around 5).
        * `num_encryption`: Typically low (e.g., around 10).


* **Defining Anomalous Behavior:** Anomalous data represents significant deviations from these established normal patterns, often indicated by an `access_type` prefixed with "anomaly\_":
    * **For `anomaly_no_access` accounts (originally `no_access`):**
        * Any recorded activity (e.g., even small numbers of `num_read` around 5 or `num_edits` around 3) is anomalous, as no activity is expected.
    * **For `anomaly_reader` accounts (originally `reader`):**
        * `num_read`: Unusually high (e.g., around 350).
        * `num_edits`: Any positive value is anomalous (e.g., around 20), as readers should not edit.
        * `num_downloads`: Unusually high (e.g., around 75).
        * `num_encryption`: Any positive value is anomalous (e.g., around 10), as readers should not encrypt.
    * **For `anomaly_editor` accounts (originally `editor`):**
        * `num_read`: Unusually high for an editor (e.g., around 150).
        * `num_edits`: Unusually low compared to normal editor activity (e.g., around 25).
        * `num_downloads`: Unusually high (e.g., around 50).
        * `num_encryption`: Unusually high (e.g., around 70).

## Code for generating normal and anomalous data point

In [4]:
import random

def generate_simulation_data(access_type: str) -> dict:
    """
    Generates simulated data based on the access type.

    The function simulates user activity metrics (reads, edits, downloads, encryptions)
    based on predefined access levels: "no_access", "reader", or "editor".
    Each metric is generated from a normal distribution with specified means and
    standard deviations, ensuring results are non-negative integers.

    Args:
        access_type: A string indicating the type of access.
                     Expected values: "no_access", "reader", "editor".

    Returns:
        A dictionary containing the simulated data:
        {
            "access_type": str,    # The input access type
            "num_read": int,       # Simulated number of reads
            "num_edits": int,      # Simulated number of edits
            "num_downloads": int,  # Simulated number of downloads
            "num_encryption": int  # Simulated number of encryptions
        }

    Raises:
        ValueError: If an unknown access_type is provided.
    """
    # Initialize all metrics to 0
    num_read = 0
    num_edits = 0
    num_downloads = 0
    num_encryption = 0

    # Rule 1: No access
    if access_type == "no_access":
        # All metrics remain 0, so no action needed here.
        pass
    # Rule 2: Reader access
    elif access_type == "reader":
        # num_read is N(100, 50)
        num_read = max(0, round(random.normalvariate(100, 50)))
        # num_downloads is N(10, 5)
        num_downloads = max(0, round(random.normalvariate(10, 5)))
        # num_edits and num_encryption remain 0 for "reader"
    # Rule 3: Editor access
    elif access_type == "editor":
        # num_read is N(10, 10)
        num_read = max(0, round(random.normalvariate(10, 10)))
        # num_edits is N(200, 100)
        num_edits = max(0, round(random.normalvariate(200, 100)))
        # num_downloads is N(5, 2)
        num_downloads = max(0, round(random.normalvariate(5, 2)))
        # num_encryption is N(10, 10)
        num_encryption = max(0, round(random.normalvariate(10, 10)))
    # Handle unknown access types
    else:
        raise ValueError(
            f"Unknown access_type: '{access_type}'. "
            "Expected 'no_access', 'reader', or 'editor'."
        )

    # Return the generated data as a dictionary
    return {
        "user_name": fake.user_name(),
        "access_type": access_type,
        "num_read": num_read,
        "num_edits": num_edits,
        "num_downloads": num_downloads,
        "num_encryption": num_encryption
    }

def generate_anomaly_data(access_type: str) -> dict:
    """
    Generates anomalous simulated data based on the access type.

    This function creates data points that deviate from the normal patterns
    defined in `generate_simulation_data`, using different fixed distributions.

    Args:
        access_type: A string indicating the type of access for which to generate an anomaly.
                     Expected values: "no_access", "reader", "editor".

    Returns:
        A dictionary containing the anomalous simulated data.
        Structure is the same as `generate_simulation_data`.

    Raises:
        ValueError: If an unknown access_type is provided.
    """
    num_read = 0
    num_edits = 0
    num_downloads = 0
    num_encryption = 0

    if access_type == "no_access":
        # Anomaly: Some unexpected activity
        num_read = max(0, round(random.normalvariate(5, 2)))  # Small number of reads
        num_edits = max(0, round(random.normalvariate(3, 1))) # Small number of edits
        # num_downloads and num_encryption remain 0 or very low
        num_downloads = 0
        num_encryption = 0
    elif access_type == "reader":
        # Anomaly: Unusually high activity or unexpected actions
        num_read = max(0, round(random.normalvariate(350, 50))) # Very high reads
        num_edits = max(0, round(random.normalvariate(20, 5)))   # Edits, normally 0
        num_downloads = max(0, round(random.normalvariate(75, 15)))# Very high downloads
        num_encryption = max(0, round(random.normalvariate(10, 3)))# Encryptions, normally 0
    elif access_type == "editor":
        # Anomaly: Unusual pattern of activity for an editor
        num_read = max(0, round(random.normalvariate(150, 30))) # High reads (normally low for editor)
        num_edits = max(0, round(random.normalvariate(25, 10)))  # Low edits (normally high)
        num_downloads = max(0, round(random.normalvariate(50, 10)))# High downloads
        num_encryption = max(0, round(random.normalvariate(70, 20)))# Very high encryptions
    else:
        raise ValueError(
            f"Unknown access_type for anomaly: '{access_type}'. "
            "Expected 'no_access', 'reader', or 'editor'."
        )

    return {
        "user_name": fake.user_name(),
        "access_type": access_type, # Mark as anomaly
        "num_read": num_read,
        "num_edits": num_edits,
        "num_downloads": num_downloads,
        "num_encryption": num_encryption
    }


# Main block for example usage (this will run when the script is executed directly)
dataset = []
# Define the number of samples for each access type
access_types_to_simulate_for_normal = (["no_access"] * 200 +
                            ["reader"] * 200 +
                            ["editor"] * 200)
# Define the number of samples for each access type
access_types_to_simulate_for_anomaly = (["no_access"] * 20 +
                            ["reader"] * 20 +
                            ["editor"] * 20)

access_types_to_simulate = access_types_to_simulate_for_normal + access_types_to_simulate_for_anomaly

random.shuffle(access_types_to_simulate) # Shuffle to mix the types

for acc_type in access_types_to_simulate:
    dataset.append(generate_simulation_data(acc_type))

## Train an isolation forest model

In [5]:
df_train = pd.DataFrame(dataset)

In [6]:
model_iforest = iforest.IForest(random_state=1234, contamination=0.1)

In [7]:
df_train_with_dummies = pd.get_dummies(df_train.drop("user_name", axis=1))

In [8]:
model_iforest.fit(df_train_with_dummies)

IForest(behaviour='old', bootstrap=False, contamination=0.1, max_features=1.0,
    max_samples='auto', n_estimators=100, n_jobs=1, random_state=1234,
    verbose=0)

## Saving the Model

In [9]:
model_save_path = './iforest.pkl'
with open(model_save_path, 'wb') as fp:
  pickle.dump(model_iforest, fp)

## Test

In [10]:
# Main block for example usage (this will run when the script is executed directly)
anomaly_dataset = []
# Define the number of samples for each access type
access_types_to_simulate = (
    ["no_access"] * 2 
                            +
                            ["reader"] * 200 +
                            ["editor"] * 200
                           )
# random.shuffle(access_types_to_simulate) # Shuffle to mix the types

for acc_type in access_types_to_simulate:
    anomaly_dataset.append(generate_anomaly_data(acc_type))

In [11]:
df_anomaly = pd.DataFrame(anomaly_dataset)

In [12]:
df_anomaly_test_without_user_name = pd.get_dummies(df_anomaly.drop("user_name", axis=1))

In [14]:
model_iforest.predict(df_anomaly_test_without_user_name).mean()



np.float64(0.9950248756218906)

Model was able to detection 99.5 anomalies.

In [15]:
df_anomaly_test_with_username = df_anomaly_test_without_user_name.copy()
df_anomaly_test_with_username["user_name"] = df_anomaly.user_name

## Making anomaly actionable by asking Gemini to find the attack associated with the anomaly

## System Prompt for Alert Detection

In [16]:
system_prompt = """
You are an AI Security Analyst. Your primary function is to analyze logs of user activity that have already been flagged as anomalous and categorize them into specific cybersecurity threat types. You must use the provided context about normal behavior to make your determination.

**Your Task:**

Given a JSON object representing a single anomalous activity record, you must classify this activity into ONE of the following four categories:

1.  Privilege Escalation  
2.  Data Exfiltration  
3.  Ransomware Attack  
4.  Unauthorized Access

**Input Data Structure:**

You will receive a JSON object with the following fields:

```json  
{  
    "access_type": "anomaly_no_access" | "anomaly_reader" | "anomaly_editor", // Indicates the original role whose behavior is now anomalous  
    "num_read": int,       // Number of read operations  
    "num_edits": int,      // Number of edit operations  
    "num_downloads": int,  // Number of download operations  
    "num_encryption": int  // Number of encryption operations  
}
```

**Context on Normal (Non-Anomalous) Behavior Patterns:**

Understanding normal behavior is key to identifying the nature of an anomaly. Here's a summary:

* **Normal no_access Profile:**  
  * num_read: 0  
  * num_edits: 0  
  * num_downloads: 0  
  * num_encryption: 0  
  * *Any activity is anomalous.*  
* **Normal reader Profile:**  
  * num_read: Typically around 100 (e.g., from a distribution like N(100, 50)).  
  * num_edits: 0 (Readers do not edit).  
  * num_downloads: Typically around 10 (e.g., from N(10, 5)).  
  * num_encryption: 0 (Readers do not encrypt).  
  * *Anomalies might include any edits, any encryption, or vastly different read/download volumes.*  
* **Normal editor Profile:**  
  * num_read: Typically low, around 10 (e.g., from N(10, 10)).  
  * num_edits: Typically high, around 200 (e.g., from N(200, 100)).  
  * num_downloads: Typically low, around 5 (e.g., from N(5, 2)).  
  * num_encryption: Typically low, around 10 (e.g., from N(10, 10)).  
  * *Anomalies might include unusually high reads, very low edits, very high downloads, or very high encryption relative to their normal baseline.*

**Anomaly Categories & Key Indicators:**

Carefully consider the anomalous data in light of the normal patterns and the access_type prefix (e.g., anomaly_reader was originally a reader).

1. **Privilege Escalation:**  
   * **Definition:** An account performing actions typically reserved for a higher access level or explicitly forbidden for its designated role.  
   * **Key Indicators:**  
     * An anomaly_no_access record showing *any* num_read > 0 or num_edits > 0.  
     * An anomaly_reader record showing num_edits > 0 or num_encryption > 0 (since normal readers cannot perform these actions).  
     * The focus is on *gaining new capabilities* not just volume changes of existing ones.  
2. **Data Exfiltration:**  
   * **Definition:** The unauthorized copying, transfer, or retrieval of large volumes of data from the system.  
   * **Key Indicators:**  
     * An anomaly_reader or anomaly_editor record showing a num_downloads count that is drastically and significantly higher than its normal baseline (e.g., anomaly_reader with num_downloads of 50+ when normal is ~10; anomaly_editor with num_downloads of 30+ when normal is ~5).  
     * May be accompanied by an unusually high num_read count, but the primary indicator is the excessive num_downloads.  
3. **Ransomware Attack:**  
   * **Definition:** Malicious activity involving the unauthorized encryption of data, rendering it inaccessible.  
   * **Key Indicators:**  
     * An anomaly_reader record showing *any* num_encryption > 0 (since normal readers do not encrypt).  
     * An anomaly_editor record showing a num_encryption count that is drastically and significantly higher than its normal baseline (e.g., num_encryption of 50+ when normal is ~10).  
     * May be accompanied by unusual num_edits if files are modified/encrypted in place.  
4. **Unauthorized Access:**  
   * **Definition:** Access to systems, applications, or data without proper authorization, or use of an authorized account in an unauthorized manner that doesn't clearly fit the other categories. This can be an initial breach or misuse of credentials.  
   * **Key Indicators:**  
     * This is the most direct category if an anomaly_no_access record shows *any activity at all*, as it signifies the account was used when it shouldn't have been.  
     * An anomaly_reader or anomaly_editor performing actions that are unusual for their role but don't strongly align with the specific high-impact patterns of Privilege Escalation, Data Exfiltration, or Ransomware. For example:  
       * An anomaly_editor with unusually high num_read (e.g., 100+) and/or unusually low num_edits (e.g., < 50), but without extreme num_downloads or num_encryption spikes. This might indicate an attacker using an editor account for reconnaissance rather than typical editing tasks.  
     * Use this as a category when the activity is clearly suspicious and outside normal bounds for the given role, but doesn't meet the more specific criteria of the other three categories.

Output Format:

Respond with the name of the category and then the possible explanation for the input to belong to that category. If an input belong to multiple categories, then describle all the them one by one.
"""

In [17]:
client = genai.Client(http_options=genai.types.HttpOptions(api_version="v1"))

## Testing of the Gemini with an anomalous input

In [18]:
test_alert_input = {
    'num_read': 345,
    'num_edits': 15,
    'num_downloads': 57,
    'num_encryption': 8,
    'access_type_editor': False,
    'access_type_no_access': False,
    'access_type_reader': True
}

In [19]:
response = client.models.generate_content(
    model="gemini-2.0-flash",
    contents=f'''
    # Data
    
    {test_alert_input}
    ''',
     config=genai.types.GenerateContentConfig(
                system_instruction=system_prompt
            ),
)

In [20]:
Markdown(response.text)

**Data Exfiltration:** The `num_downloads` value is 57. Since this is an anomaly_reader, and readers typically download around 10 files, this significant increase suggests a potential data exfiltration attempt.


In [21]:
def find_attack_associated_with_anomaly(client, alert_data):
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=f'''
        # Data

        {alert_data}
        ''',
         config=genai.types.GenerateContentConfig(
                    system_instruction=system_prompt
                ),
    )
    return response.text

# Beam Pipeline

In [22]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.ml.anomaly.transforms import AnomalyDetection

In [24]:
from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory

# Create detector for PyOd model pickled file
detector = PyODFactory.create_detector(model_save_path, features=df_anomaly_test_without_user_name.columns)

## Test in batch mode with limited data 

In [25]:
df_test_sample = list(df_anomaly_test_with_username.transpose().to_dict().values())[:10]

In [26]:
df_test_sample

[{'num_read': 6,
  'num_edits': 1,
  'num_downloads': 0,
  'num_encryption': 0,
  'access_type_editor': False,
  'access_type_no_access': True,
  'access_type_reader': False,
  'user_name': 'fboyd'},
 {'num_read': 8,
  'num_edits': 4,
  'num_downloads': 0,
  'num_encryption': 0,
  'access_type_editor': False,
  'access_type_no_access': True,
  'access_type_reader': False,
  'user_name': 'kevinwilliams'},
 {'num_read': 375,
  'num_edits': 20,
  'num_downloads': 91,
  'num_encryption': 8,
  'access_type_editor': False,
  'access_type_no_access': False,
  'access_type_reader': True,
  'user_name': 'boothbrian'},
 {'num_read': 363,
  'num_edits': 20,
  'num_downloads': 75,
  'num_encryption': 15,
  'access_type_editor': False,
  'access_type_no_access': False,
  'access_type_reader': True,
  'user_name': 'karen28'},
 {'num_read': 348,
  'num_edits': 17,
  'num_downloads': 77,
  'num_encryption': 9,
  'access_type_editor': False,
  'access_type_no_access': False,
  'access_type_reader': Tru

In [27]:
def fix_access_type(alert_data):
    """
    Fix access type in alert data.
    
    Args:
        alert_data: Tuple of (user_name, data)
        
    Returns:
        Dictionary with fixed access type
    """
    new_alert_data = {}
    for k, v in alert_data[1].items():
        if not k.startswith("access_type_"):
            new_alert_data[k] = v
            continue
        if not v:
            continue
        new_alert_data["access_type"] = k.split("access_type_")[1]
    new_alert_data["user_name"] = alert_data[0]
    return new_alert_data

In [30]:
class GeminiModelHandler(ModelHandler):
    """Model handler for Gemini inference."""
    
    def load_model(self):
        """Load Gemini client."""
        client = genai.Client()
        return client

    def run_inference(self, batch, client, inference_args=None):
        """Run inference on a batch of examples.
        
        Args:
            batch: Batch of examples
            client: Gemini client
            inference_args: Additional arguments for inference
            
        Yields:
            Dictionary with original data and attack classification
        """
        for b in batch:
            yield {"data": b, "attack": find_attack_associated_with_anomaly(client, b)}


In [31]:
def convert_data_to_key_value(data):
    """
    Convert the data to a key-value pair.
    
    Args:
        data: Dictionary containing user data
        
    Returns:
        Tuple of (user_name, data)
    """
    data = data.copy()
    user_name = data["user_name"]
    del data["user_name"]
    return (user_name, data)

In [32]:
with beam.Pipeline() as p:
    _ = (p
         | beam.Create(df_test_sample)
         | beam.Map(convert_data_to_key_value)
         | beam.Map(lambda x: (x[0], beam.Row(**x[1])))
         | AnomalyDetection(detector=detector)
         | beam.Filter(lambda x: x[1].predictions[0].label)  # Filter only anomalies
         | beam.Map(lambda x: (x[0], x[1].example.as_dict()))
         | beam.Map(lambda x: fix_access_type(x))
         | beam.ml.inference.RunInference(model_handler=GeminiModelHandler())
         | "JsonifyInput" >> beam.Map(lambda x: json.dumps(x, indent=2))
         # | beam.Map(print)
         | beam.Map(lambda x: [
             display(Markdown(x)), 
             display(Markdown("---"))])
    )

{
  "data": {
    "num_read": 375,
    "num_edits": 20,
    "num_downloads": 91,
    "num_encryption": 8,
    "access_type": "reader",
    "user_name": "boothbrian"
  },
  "attack": "Data Exfiltration: The `num_downloads` value of 91 is drastically higher than the typical baseline of approximately 10 for a reader. This suggests a large volume of data being copied or transferred from the system, indicative of data exfiltration.\n\nUnauthorized Access: The `num_read` value of 375 is high, suggesting an unauthorized user may be exploring the system beyond normal reader activity.\n"
}

---

{
  "data": {
    "num_read": 363,
    "num_edits": 20,
    "num_downloads": 75,
    "num_encryption": 15,
    "access_type": "reader",
    "user_name": "karen28"
  },
  "attack": "**Data Exfiltration:**\n\nExplanation: The user 'karen28' was initially a 'reader'. The 'num_downloads' count of 75 is drastically higher than the normal baseline for a reader (typically around 10). This suggests a large amount of data has been copied or transferred from the system, which is a key indicator of data exfiltration. Even though number of reads and encryption is also high, download is the primary indicator.\n"
}

---

{
  "data": {
    "num_read": 348,
    "num_edits": 17,
    "num_downloads": 77,
    "num_encryption": 9,
    "access_type": "reader",
    "user_name": "donnagoodman"
  },
  "attack": "Data Exfiltration: The number of downloads (77) is significantly higher than the normal baseline for a reader (around 10). This suggests a large amount of data is being copied or transferred.\n\nUnauthorized Access: The number of reads (348) is unusually high for a reader. This along with the downloads suggests they are doing something they are not meant to, which could just be unauthorized data gathering.\n"
}

---

{
  "data": {
    "num_read": 414,
    "num_edits": 24,
    "num_downloads": 84,
    "num_encryption": 8,
    "access_type": "reader",
    "user_name": "porterandrea"
  },
  "attack": "Data Exfiltration: The num_downloads value of 84 is significantly higher than the normal baseline of approximately 10 for a reader. This suggests a large amount of data was downloaded, indicating potential data exfiltration.\nUnauthorized Access: The num_read value of 414 is significantly higher than the normal baseline of approximately 100 for a reader. While downloads are high, the read volume alone can also suggest unauthorized reconnaissance activities.\n\n"
}

---

{
  "data": {
    "num_read": 307,
    "num_edits": 27,
    "num_downloads": 87,
    "num_encryption": 15,
    "access_type": "reader",
    "user_name": "christopher49"
  },
  "attack": "Data Exfiltration: The num_downloads value of 87 is significantly higher than the reader's normal download baseline of approximately 10. This suggests a potential unauthorized extraction of data.\n\nRansomware Attack: The num_encryption value of 15 is non-zero, which is not a normal operation for a reader account. This suggests a potential ransomware attack.\n"
}

---

{
  "data": {
    "num_read": 290,
    "num_edits": 16,
    "num_downloads": 89,
    "num_encryption": 8,
    "access_type": "reader",
    "user_name": "timothycooke"
  },
  "attack": "Data Exfiltration: The `num_downloads` value (89) is drastically higher than the normal baseline for a reader (typically around 10). This suggests a large amount of data is being copied or transferred.\n\nUnauthorized Access: The `num_read` value (290) is also significantly higher than what is considered normal (typically around 100). This, coupled with the downloads, suggests that a lot of data that is uncharacteristic for reader activities.\n\nPrivilege Escalation: The `num_edits` value is 16. This violates the normal behavior of reader.\n"
}

---

{
  "data": {
    "num_read": 346,
    "num_edits": 18,
    "num_downloads": 85,
    "num_encryption": 15,
    "access_type": "reader",
    "user_name": "tina72"
  },
  "attack": "Data Exfiltration: The num_downloads value of 85 is significantly higher than the normal baseline for a reader (approximately 10), indicating a potential unauthorized copying of data.\n\nRansomware Attack: The num_encryption value of 15 is non-zero which is anomalous for the `reader` access_type.\n"
}

---

{
  "data": {
    "num_read": 378,
    "num_edits": 18,
    "num_downloads": 68,
    "num_encryption": 9,
    "access_type": "reader",
    "user_name": "brian81"
  },
  "attack": "Data Exfiltration: The 'reader' account 'brian81' shows a significantly elevated number of downloads (68) compared to the typical reader profile (around 10). This suggests a potential attempt to exfiltrate data.\n\nUnauthorized Access: The high number of reads (378) is also outside the typical range for a reader, suggesting the account may be used for unauthorized data discovery in addition to the downloads.\n"
}

---

# Streaming with impulse sequence

Here we will create a streaming sequence using impulse generator to run the pipeline

In [34]:
from collections.abc import Sequence
from typing import Any
from apache_beam.coders import VarIntCoder

from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.transforms.window import FixedWindows

In [35]:
class SequenceToPeriodicStream(beam.PTransform):
    """ A streaming source that generate periodic event based on a given sequence. """
    def __init__(self, data:Sequence[Any], delay: float = 0.1, repeat: bool = True):
        self._data = data
        self._delay = delay
        self._repeat = repeat

    class EmitOne(beam.DoFn):
        INDEX_SPEC = ReadModifyWriteStateSpec('index', VarIntCoder())

        def __init__(self, data, repeat):
            self._data = data
            self._repeat = repeat
            self._max_index = len(self._data)

        def process(self, element, model_state=beam.DoFn.StateParam(INDEX_SPEC)):
            index = model_state.read() or 0
            if index >= self._max_index:
                return

            yield self._data[index]

            index += 1
            if self._repeat:
                index %= self._max_index
            model_state.write(index)

    def expand(self, input):
        return (
            input | PeriodicImpulse(fire_interval=self._delay)
            | beam.Map(lambda x: (0, x))
            | beam.ParDo(SequenceToPeriodicStream.EmitOne(self._data, self._repeat))
            | beam.WindowInto(FixedWindows(self._delay)))

In [43]:
# Running the pipeline on prism
options = PipelineOptions([
    "--streaming",
    "--job_server_timeout=10",
    "--environment_type=LOOPBACK",
    "--runner=PrismRunner", 
])

In [46]:
model_save_path = "gs://" + os.environ["GCS_ROOT"] + '/anomaly/iforest.pkl'

features = ('num_read', 'num_edits', 'num_downloads', 'num_encryption', 
           'access_type_editor', 'access_type_no_access', 'access_type_reader')
detector = PyODFactory.create_detector(model_save_path, features=features)

In [47]:
with beam.Pipeline(options=options) as p:
    _ = (p
         | SequenceToPeriodicStream(df_test_sample, delay=1, repeat=False)
         | beam.Map(convert_data_to_key_value)
         | beam.Map(lambda x: (x[0], beam.Row(**x[1])))
         | AnomalyDetection(detector=detector)
         | beam.Filter(lambda x: x[1].predictions[0].label) # Filter only anomalies
         | beam.Map(lambda x: (x[0], x[1].example.as_dict()))
         | beam.Map(lambda x: fix_access_type(x))
         | beam.ml.inference.RunInference(model_handler=GeminiModelHandler())
         | "JsonifyInput" >> beam.Map(lambda x: json.dumps(x, indent=2))
         | beam.Map(lambda x: [
             display(Markdown(x)), 
             display(Markdown("---"))])
         # | "PublishAlerts" >> beam.io.gcp.pubsub.WriteToPubSub(topic=os.environ["PUBSUB_TOPIC_ALERT"])
    )

{
  "data": {
    "num_read": 375,
    "num_edits": 20,
    "num_downloads": 91,
    "num_encryption": 8,
    "access_type": "reader",
    "user_name": "boothbrian"
  },
  "attack": "Data Exfiltration: The number of downloads (91) is significantly higher than the normal baseline for a reader (around 10). This suggests a large amount of data is being copied or transferred.\n\nUnauthorized Access: The number of reads (375) is unusually high for the typical behavior of the reader.\n"
}

---

{
  "data": {
    "num_read": 363,
    "num_edits": 20,
    "num_downloads": 75,
    "num_encryption": 15,
    "access_type": "reader",
    "user_name": "karen28"
  },
  "attack": "**Data Exfiltration:**\n\nExplanation: The user 'karen28', whose access type is reader, has a significantly high number of downloads (75) compared to the normal baseline for readers (around 10). This suggests a potential data exfiltration attempt.\n\n**Unauthorized Access:**\nExplanation: Number of edits for a reader is anomalous behavior. Readers do not usually have the ability to edit.\n"
}

---

{
  "data": {
    "num_read": 348,
    "num_edits": 17,
    "num_downloads": 77,
    "num_encryption": 9,
    "access_type": "reader",
    "user_name": "donnagoodman"
  },
  "attack": "Data Exfiltration\n\nExplanation:\nThe user 'donnagoodman' has 'reader' access. The number of downloads is 77, which is significantly higher than the typical download volume of 10 for reader profiles. This suggests a potential data exfiltration attempt, where the user is downloading a large amount of data.\n"
}

---

{
  "data": {
    "num_read": 414,
    "num_edits": 24,
    "num_downloads": 84,
    "num_encryption": 8,
    "access_type": "reader",
    "user_name": "porterandrea"
  },
  "attack": "Data Exfiltration: The number of downloads (84) is significantly higher than the reader's normal baseline (around 10), suggesting a potential data exfiltration attempt. Also, the number of reads is also high.\n"
}

---

{
  "data": {
    "num_read": 307,
    "num_edits": 27,
    "num_downloads": 87,
    "num_encryption": 15,
    "access_type": "reader",
    "user_name": "christopher49"
  },
  "attack": "Data Exfiltration: The num_downloads is significantly higher than the typical baseline for a reader (87 vs ~10). This suggests a large volume of data was downloaded, indicating potential data exfiltration.\n"
}

---

{
  "data": {
    "num_read": 290,
    "num_edits": 16,
    "num_downloads": 89,
    "num_encryption": 8,
    "access_type": "reader",
    "user_name": "timothycooke"
  },
  "attack": "Data Exfiltration: The 'reader' account 'timothycooke' has an extremely high number of downloads (89) compared to the normal baseline of approximately 10. This significant deviation suggests a potential data exfiltration attempt.\n\nUnauthorized Access: The 'reader' account 'timothycooke' has a very large number of reads (290) compared to the normal baseline of approximately 100. Although the number of edits (16) and encryption (8) are zero according to the anomaly_reader profile context, the large number of reads indicates that this user might be doing unauthorized stuff.\n"
}

---

{
  "data": {
    "num_read": 346,
    "num_edits": 18,
    "num_downloads": 85,
    "num_encryption": 15,
    "access_type": "reader",
    "user_name": "tina72"
  },
  "attack": "Data Exfiltration: The num_downloads is 85, which is significantly higher than the normal download volume for a reader (typically around 10). This suggests that the user may be attempting to exfiltrate data from the system.\n\nRansomware Attack: Also, a normal reader account should not perform any encryption operation, but the num_encryption here is 15, which strongly indicates that the user is performing malicious encryption operation on the data.\n"
}

---

{
  "data": {
    "num_read": 378,
    "num_edits": 18,
    "num_downloads": 68,
    "num_encryption": 9,
    "access_type": "reader",
    "user_name": "brian81"
  },
  "attack": "Data Exfiltration: The `num_downloads` (68) is significantly higher than the normal baseline for a reader (typically around 10). This suggests a large volume of data is being copied or transferred, indicating potential data exfiltration.\n\nUnauthorized Access: The `num_read` is higher than the normal baseline. Also the `num_edits` and `num_encryption` is not 0.\n"
}

---

KeyboardInterrupt: 

# Streaming with pubsub and dataflow

1. In this particular part, we will create a script that populates the user events into a pubsub topic. 
2. The dataflow job that consume the pubsub event and write the alert to a pubsub topic. 
3. Then we create a streamlit app that shows this alert to a security admin for taking appropriate actions.

In [168]:
# timesfm_backend = "cpu"  # @param

# model = timesfm.TimesFm(
#       hparams=timesfm.TimesFmHparams(
#           backend=timesfm_backend,
#           per_core_batch_size=32,
#           horizon_len=128,
#           num_layers=50,
#           # use_positional_embedding=False,
#           context_len=2048,
#       ),
#       checkpoint=timesfm.TimesFmCheckpoint(
#           huggingface_repo_id="google/timesfm-2.0-500m-jax"),
#   )

# tfm = timesfm.TimesFm(
#     hparams=timesfm.TimesFmHparams(
#         context_len=320, # multiple of 32 (input_path_len) up to 512
#         horizon_len=14, # anything but < context_len recommended
#         input_patch_len=32,
#         output_patch_len=128,
#         num_layers=20,
#         model_dims=1280,
#         backend="cpu", # cpu or gpu or tpu
#     ),
#     checkpoint=timesfm.TimesFmCheckpoint(
#           huggingface_repo_id="google/timesfm-1.0-200m-pytorch"
#     )
# )

# forecast_input = [
#     np.sin(np.linspace(0, 20, 100)),
#     np.sin(np.linspace(0, 20, 200)),
#     np.sin(np.linspace(0, 20, 400)),
# ]
# frequency_input = [0, 1, 2]

# point_forecast, experimental_quantile_forecast = tfm.forecast(
#     forecast_input,
#     freq=frequency_input,
# )
