In [None]:
from deephaven_server import Server
s = Server(port=10000, jvm_args=["-Xmx40g", "-Dprocess.info.system-info.enabled=false", "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler"])
s.start()

### Simulate streaming Amazon data

First, we want to stream the Amazon review dataset in real time. The Amazon dataset is static, so we will use [`TableReplayer`](https://deephaven.io/core/docs/how-to-guides/replay-data/) to simulate a real-time review stream. If you already have a real-time review stream, such as a Kafka topic, you can use it directly without simulating it.

Start by importing the necessary libraries.

In [None]:
from deephaven import parquet, dtypes
from deephaven.table import TableDefinition
from deephaven.replay import TableReplayer
from deephaven.time import to_j_instant

from deephaven_ipywidgets import DeephavenWidget

Now, read the Amazon reviews from Parquet files into a Deephaven table with [Deephaven's Parquet module](https://deephaven.io/core/docs/how-to-guides/data-import-export/parquet-import/).

In [None]:
# create table definition for review datasets
reviews_def = TableDefinition({
    "rating": dtypes.double,
    "title": dtypes.string,
    "text": dtypes.string,
    "parent_asin": dtypes.string,
    "user_id": dtypes.string,
    "timestamp": dtypes.long
})

# read reviews into a single table
reviews = parquet.read(
    "../amazon-data/reviews/",
    file_layout=parquet.ParquetFileLayout.FLAT_PARTITIONED,
    table_definition=reviews_def
)

# convert epoch-millisecond timestamps to Instants and sort chronologically
reviews = (
    reviews
    .update("timestamp = epochMillisToInstant(timestamp)")
    .sort("timestamp")
)
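The `epochMillisToInstant` call interprets each raw `timestamp` value as milliseconds since the Unix epoch (1970-01-01T00:00:00Z). As a rough plain-Python analogue, for illustration only and not how Deephaven implements it, the same conversion with the standard library looks like this (`epoch_millis_to_utc` is a hypothetical helper name):

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as a timezone-aware UTC datetime
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def epoch_millis_to_utc(ms: int) -> datetime:
    """Interpret `ms` as milliseconds since the Unix epoch and return a UTC datetime."""
    # timedelta arithmetic keeps integer millisecond inputs exact (no float rounding)
    return EPOCH + timedelta(milliseconds=ms)

print(epoch_millis_to_utc(1_672_531_200_000).isoformat())  # 2023-01-01T00:00:00+00:00
```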

In [None]:
display(DeephavenWidget(reviews))

The `reviews` table has 25.6 million observations spanning 9 months. Streaming through all of those observations in real time would take... 9 months. Instead, we randomly sample 1 in 10,000 reviews and replay that data at 10,000x speed. Because the sampling rate and the speedup cancel out, the replayed stream produces reviews at about the same rate per second as Amazon's real-world review frequency, while compressing months of history into well under an hour so we can visualize long-term trends.
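To check the arithmetic behind that claim, the short sketch below (plain Python, with a hypothetical `replay_plan` helper) computes the sampled count, the replay duration, and the review rate before and after. For illustration only, it assumes the 25.6 million reviews are spread over the 250 days covered by the 36-minute replay window defined in the next cell (36 minutes times 10,000 is 250 days):

```python
from datetime import timedelta

def replay_plan(total_reviews: int, span: timedelta, factor: int):
    """Sample 1 in `factor` rows and replay `factor` times faster; return the resulting stats."""
    sampled = total_reviews / factor                  # expected number of sampled reviews
    wall_clock = span / factor                        # how long the replay takes
    original_rate = total_reviews / span.total_seconds()
    replay_rate = sampled / wall_clock.total_seconds()
    return sampled, wall_clock, original_rate, replay_rate

sampled, wall_clock, original_rate, replay_rate = replay_plan(25_600_000, timedelta(days=250), 10_000)
print(sampled)                                          # 2560.0
print(wall_clock)                                       # 0:36:00
print(round(original_rate, 3), round(replay_rate, 3))   # 1.185 1.185 (reviews per second)
```

The factor appears once in the numerator (fewer reviews) and once in the denominator (less time), which is why the two rates match.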

In [None]:
# earliest review timestamp in the data; reading it off the UI is faster than computing it with a query
min_time = to_j_instant("2023-01-01T00:00:00.000Z")

# create replay start time and end time
replay_start_time = to_j_instant("2024-01-01T00:00:00Z")
replay_end_time = to_j_instant("2024-01-01T00:36:00Z")

# replay data at 10,000x speed
data_speed = 10_000

# randomly sample data and create a timestamp that increments at 10,000x original speed
reviews = (
    reviews
    .where("random() < 1 / data_speed")
    .update([
        "dist = (long)floor((timestamp - min_time) / data_speed)",
        "replay_timestamp = replay_start_time + dist"
    ])
    .drop_columns("dist")
)
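To see what the two `update` formulas do to a single review, here is a plain-Python sketch of the same mapping using `datetime`. It is an illustration only: `to_replay_time` is a hypothetical name, and `datetime` works at microsecond resolution, whereas Deephaven `Instant` arithmetic floors to nanoseconds. Reviews keep their relative spacing, compressed by a factor of 10,000, and a review 250 days after `min_time` lands exactly on `replay_end_time`.

```python
from datetime import datetime, timedelta, timezone

# Values mirror min_time, replay_start_time, and data_speed from the cell above
MIN_TIME = datetime(2023, 1, 1, tzinfo=timezone.utc)
REPLAY_START = datetime(2024, 1, 1, tzinfo=timezone.utc)
DATA_SPEED = 10_000

def to_replay_time(ts: datetime) -> datetime:
    """Map an original review time onto the compressed replay timeline."""
    # dist: offset from the earliest review, shrunk by DATA_SPEED and floored
    # (timedelta // int floors to whole microseconds here)
    dist = (ts - MIN_TIME) // DATA_SPEED
    return REPLAY_START + dist

print(to_replay_time(datetime(2023, 1, 2, tzinfo=timezone.utc)))
# 2024-01-01 00:00:08.640000+00:00
```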

Now, replay the data with Deephaven's [`TableReplayer`](https://deephaven.io/core/docs/how-to-guides/replay-data/).

In [None]:
# create table replayer and start replay
reviews_replayer = TableReplayer(replay_start_time, replay_end_time)
reviews_ticking = reviews_replayer.add_table(reviews, "replay_timestamp").drop_columns("replay_timestamp")
reviews_replayer.start()
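Conceptually, the replayer holds back each row until the clock reaches that row's `replay_timestamp`. The toy class below is hypothetical (it is not Deephaven's implementation, and it uses a simulated clock instead of wall time), but it shows the behavior that makes `reviews_ticking` grow over time: each tick releases only the rows whose timestamps have passed since the previous tick.

```python
from bisect import bisect_right
from datetime import datetime, timedelta, timezone

class ToyReplayer:
    """Hypothetical stand-in for a table replayer, driven by a simulated clock."""

    def __init__(self, rows, key):
        self.rows = sorted(rows, key=key)             # replay order follows the timestamp column
        self.times = [key(row) for row in self.rows]
        self.released = 0                             # number of rows emitted so far

    def tick(self, now):
        """Return rows that became visible since the previous tick (timestamp <= now)."""
        end = bisect_right(self.times, now)           # index past the last row with time <= now
        new_rows = self.rows[self.released:end]
        self.released = end
        return new_rows

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
rows = [
    {"id": i, "replay_timestamp": start + timedelta(milliseconds=ms)}
    for i, ms in enumerate([500, 1200, 1900, 3000])
]
replayer = ToyReplayer(rows, key=lambda row: row["replay_timestamp"])

batches = []
for step in range(1, 4):
    batch = replayer.tick(start + timedelta(seconds=step))
    batches.append([row["id"] for row in batch])
print(batches)  # [[0], [1, 2], [3]]
```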

In [None]:
display(DeephavenWidget(reviews_ticking))

### Real-time bot detection

With simulated data streaming in, we can turn to detecting AI bots in real time. It's easier than you'd expect.

First, load the necessary libraries.

In [None]:
import concurrent.futures
import logging
import torch
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from deephaven.table_listener import listen, TableUpdate
from deephaven.stream.table_publisher import table_publisher
from deephaven.stream import blink_to_append_only
from deephaven import new_table
import deephaven.column as dhcol
import deephaven.dtypes as dtypes

Next, import the trained model's parameters into a new model object and load the tokenizer needed to transform the input data.

In [None]:
# suppress transformer parameter name warnings
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
for logger in loggers:
    if "transformers" in logger.name.lower():
        logger.setLevel(logging.ERROR)

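# pick the GPU if one is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# instantiate model and load trained parameters (map_location lets GPU-saved weights load on CPU)
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
model.load_state_dict(torch.load("../detector/detector.pt", map_location=device, weights_only=False))

# move the model to the same device as the inputs and put it in inference mode
model.to(device)
model.eval()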

# instantiate tokenizer; model_max_length sets the length that truncation=True cuts inputs to
tokenizer = BertTokenizer.from_pretrained(
    'bert-base-uncased',
    do_lower_case=True,
    padding=True,
    truncation=True,
    model_max_length=128,
    clean_up_tokenization_spaces=True
)
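When `detect_bot` calls the tokenizer with `padding=True` and `truncation=True`, every review in a batch becomes a sequence of the same length: long reviews are cut at the maximum length, short ones are padded to the longest sequence in the batch, and an `attention_mask` tells BERT which positions hold real tokens. Here is a toy sketch of that behavior on already-tokenized ids. The `pad_and_truncate` helper and the ids are made up, and the sketch ignores the `[CLS]`/`[SEP]` special tokens that the real tokenizer preserves.

```python
def pad_and_truncate(batch, max_length, pad_id=0):
    """Toy analogue of padding=True + truncation=True on already-tokenized id lists."""
    truncated = [ids[:max_length] for ids in batch]           # cut sequences longer than max_length
    longest = max(len(ids) for ids in truncated)              # pad to the longest sequence in the batch
    input_ids = [ids + [pad_id] * (longest - len(ids)) for ids in truncated]
    # 1 marks a real token, 0 marks padding the model should ignore
    attention_mask = [[1] * len(ids) + [0] * (longest - len(ids)) for ids in truncated]
    return input_ids, attention_mask

ids, mask = pad_and_truncate([[5, 6, 7, 8, 9, 10], [11, 12]], max_length=4)
print(ids)   # [[5, 6, 7, 8], [11, 12, 0, 0]]
print(mask)  # [[1, 1, 1, 1], [1, 1, 0, 0]]
```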

Now, we'll walk through a real-time AI workflow step by step. The workflow looks like this:

1. Create an object called a [`TablePublisher`](https://deephaven.io/core/docs/how-to-guides/table-publisher/) to publish new data to a ticking table. This table, `preds_blink`, will contain the new predictions.
2. Define a function to perform inference and publish the results to `preds_blink`.
3. Create a [`TableListener`](https://deephaven.io/core/docs/how-to-guides/table-listeners-python/) that will listen to the ticking data source and call the inference/publisher function as new data rolls in.
4. Tie it all together by listening to the ticking source, performing inference on new inputs, and publishing the results to a new table.
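The four steps above can be sketched in plain Python to show how the pieces hand data to each other. Everything here is hypothetical: `ToyPublisher`, `toy_model`, and the simulated update cycles stand in for the Deephaven table publisher, the BERT detector, and the ticking source, and none of it is Deephaven's API.

```python
class ToyPublisher:
    """Step 1 stand-in: receives every batch of predictions that is published."""

    def __init__(self):
        self.batches = []

    def add(self, batch):
        self.batches.append(batch)

publisher = ToyPublisher()

def toy_model(texts):
    """Hypothetical scorer standing in for the BERT detector (longer text -> higher score)."""
    return [min(len(text) / 100, 1.0) for text in texts]

def compute_and_publish(texts, features):
    """Step 2: score one batch and publish the scores alongside their features."""
    scores = toy_model(texts)
    publisher.add([{**feature, "gen_prob": score} for feature, score in zip(features, scores)])

def on_update(added_rows):
    """Step 3: listener callback, called once per update cycle with only the new rows."""
    if not added_rows:
        return  # nothing new this cycle
    compute_and_publish(
        [row["text"] for row in added_rows],
        [{"user_id": row["user_id"]} for row in added_rows],
    )

# Step 4: a simulated source delivers rows in update cycles and calls the listener for each
cycles = [
    [{"text": "Great blender.", "user_id": "a"}],
    [],
    [{"text": "x" * 150, "user_id": "b"}, {"text": "Fine.", "user_id": "c"}],
]
for added in cycles:
    on_update(added)

print(publisher.batches)
```

The empty second cycle publishes nothing, so only two batches reach the publisher.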

First, create the [`TablePublisher`](https://deephaven.io/core/docs/how-to-guides/table-publisher/) using the [`table_publisher`](https://deephaven.io/core/docs/reference/table-operations/create/TablePublisher/) function. This function returns an empty table to capture the published data, which we'll call `preds_blink`, and an object that publishes data to that table, which we'll call `preds_publish`. `preds_blink` is a [blink table](https://deephaven.io/core/docs/conceptual/table-types/#blink), meaning that it only holds the rows added during the current update cycle; rows from earlier cycles are dropped. Check out the [guide on table publishers](https://deephaven.io/core/docs/how-to-guides/table-publisher/) to learn more.

In [None]:
# create table publisher, and blink table that data will be published to
preds_blink, preds_publish = table_publisher(
    "DetectorOutput", {
        "rating": dtypes.double,
        "parent_asin": dtypes.string,
        "user_id": dtypes.string,
        "timestamp": dtypes.Instant,
        "gen_prob": dtypes.float32
    },
)
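A blink table is easiest to picture as a buffer that is emptied at the start of every update cycle. The toy classes below are hypothetical (Deephaven's tables are not implemented this way), but they contrast that behavior with the full-history table that `blink_to_append_only` produces later in this notebook:

```python
class ToyBlinkTable:
    """Hypothetical model of a blink table: holds only the current cycle's rows."""

    def __init__(self):
        self.rows = []

    def run_cycle(self, new_rows):
        self.rows = list(new_rows)  # rows from the previous cycle are discarded

class ToyAppendOnlyTable:
    """Hypothetical model of blink_to_append_only: keeps every row the blink table held."""

    def __init__(self, source):
        self.source = source
        self.rows = []

    def on_cycle(self):
        self.rows.extend(self.source.rows)

blink = ToyBlinkTable()
history = ToyAppendOnlyTable(blink)
snapshots = []
for batch in (["r1", "r2"], ["r3"], []):
    blink.run_cycle(batch)
    history.on_cycle()
    snapshots.append((list(blink.rows), list(history.rows)))
    print(blink.rows, history.rows)
# ['r1', 'r2'] ['r1', 'r2']
# ['r3'] ['r1', 'r2', 'r3']
# [] ['r1', 'r2', 'r3']
```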

Next, define a function to perform the inference and publish the results to `preds_blink` using the table publisher defined previously. This function will be called every time more data rolls in, enabling Deephaven to perform real-time inference on only the most recent data. For simplicity, we've broken this into two functions: one that performs inference on a given set of inputs, and one that calls it and publishes the results.

In [None]:
# function that determines if a review was generated by a bot
def detect_bot(text):
    # tokenize text
    tokenized_text = tokenizer(text.tolist(), padding=True, truncation=True, return_tensors='pt')

    # move input tensor to the same device as the model
    tokenized_text = {key: value.to(device) for key, value in tokenized_text.items()}

    # generate predictions using trained model
    with torch.no_grad():
        outputs = model(**tokenized_text)
        logits = outputs.logits

    # the first column of logits corresponds to the negative class (non-AI-generated)
    # and the second column corresponds to the positive class (AI-generated)
    predictions = torch.softmax(logits, dim=1)[:, 1].cpu().numpy()

    return predictions

# function to perform inference and publish the results to preds_blink
def compute_and_publish_inference(inputs, features):

    # get outputs from AI model
    outputs = detect_bot(inputs)

    # create new table with relevant features and outputs
    output_table = new_table(
        [
            dhcol.double_col("rating", features["rating"]),
            dhcol.string_col("parent_asin", features["parent_asin"]),
            dhcol.string_col("user_id", features["user_id"]),
            dhcol.datetime_col("timestamp", features["timestamp"]),
            dhcol.float_col("gen_prob", outputs)
        ]
    )

    # publish inference to preds_blink
    preds_publish.add(output_table)

    return None
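The last step of `detect_bot` turns each pair of raw logits into a probability with a softmax across the two classes. The plain-Python sketch below mirrors `torch.softmax(logits, dim=1)[:, 1]` for illustration (`prob_ai_generated` is a hypothetical name). With only two classes, the result reduces to a sigmoid of the logit difference, `1 / (1 + exp(-(l1 - l0)))`.

```python
import math

def prob_ai_generated(logits):
    """Row-wise softmax over [non-AI, AI] logits; return the AI-class (column 1) probability."""
    probs = []
    for row in logits:
        m = max(row)                                  # subtract the max for numerical stability
        exps = [math.exp(x - m) for x in row]
        probs.append(exps[1] / sum(exps))
    return probs

probs = prob_ai_generated([[0.0, 0.0], [-2.0, 2.0], [3.0, -1.0]])
print(probs[0])  # 0.5
```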

Next, we create a `TableListener` that listens to the ticking source and calls `compute_and_publish_inference` on new data. To do this, define a function called `on_update` that takes two arguments, `update` and `is_replay`. Extract the added and modified data from the `update` argument using [`update.added()`](https://deephaven.io/core/pydoc/code/deephaven.table_listener.html#deephaven.table_listener.TableUpdate.added) and [`update.modified()`](https://deephaven.io/core/pydoc/code/deephaven.table_listener.html#deephaven.table_listener.TableUpdate.modified). See the [guide on table listeners](https://deephaven.io/core/docs/how-to-guides/table-listeners-python/) to learn more.

Finally, calling `compute_and_publish_inference` is expensive, since it runs a BERT forward pass on every batch of new reviews. Rather than blocking the listener, and with it Deephaven's update cycle, with these calculations, offload them to separate threads using a [`ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor). The executor queues each submitted batch and runs it on the next available worker thread.

In [None]:
# use a ThreadPoolExecutor to multi-thread inference calculations
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# function that the table listener will call as new reviews roll in
def on_update(update: TableUpdate, is_replay: bool) -> None:
    input_col = "text"
    feature_cols = ["rating", "parent_asin", "user_id", "timestamp"]

    # get table entries that were added or modified
    adds = update.added(cols=[input_col, *feature_cols])
    modifies = update.modified(cols=[input_col, *feature_cols])

    # collect data from this cycle into objects to feed to inference and output
    if adds and modifies:
        inputs = np.hstack([adds[input_col], modifies[input_col]])
        features = {feature_col: np.hstack([adds[feature_col], modifies[feature_col]]) for feature_col in feature_cols}
    elif adds:
        inputs = adds[input_col]
        features = {feature_col: adds[feature_col] for feature_col in feature_cols}
    elif modifies:
        inputs = modifies[input_col]
        features = {feature_col: modifies[feature_col] for feature_col in feature_cols}
    else:
        return

    # submit inference work to ThreadPoolExecutor
    executor.submit(compute_and_publish_inference, inputs, features)
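The key property of `executor.submit` is that it returns a `Future` right away, so `on_update` hands the batch off and returns without waiting for inference. One caveat worth knowing: an exception raised inside the submitted function is stored on that `Future` and is never printed unless something inspects it. The stdlib-only sketch below (`slow_inference` and `log_failure` are hypothetical names) shows both behaviors; attaching a callback like `log_failure` to the futures returned by `executor.submit` in the cell above would be one way to surface failed batches.

```python
import concurrent.futures
import logging

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def slow_inference(batch):
    """Hypothetical stand-in for model inference on one batch."""
    if not batch:
        raise ValueError("empty batch")
    return [len(text) for text in batch]

def log_failure(future):
    """Report an exception stored on the Future; otherwise it would go unnoticed."""
    exc = future.exception()
    if exc is not None:
        logging.error("inference failed: %r", exc)

def on_update(batch):
    """Hand the batch to a worker thread and return without waiting for the result."""
    future = executor.submit(slow_inference, batch)
    future.add_done_callback(log_failure)
    return future

futures = [on_update(batch) for batch in (["ab", "c"], ["def"], [])]
concurrent.futures.wait(futures)   # only for this demo; a real listener would not block here
executor.shutdown()
```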

Now, tie it all together. The [`listen`](https://deephaven.io/core/pydoc/code/deephaven.table_listener.html#deephaven.table_listener.listen) function below calls `on_update` every time new reviews tick into `reviews_ticking`. `on_update` submits inference on the new rows to the thread pool, and each finished batch of predictions is published to `preds_blink`. Finally, [`blink_to_append_only`](https://deephaven.io/core/docs/reference/table-operations/create/blink-to-append-only/) converts `preds_blink` to an append-only table that stores the full history of the reviews and predictions.

In [None]:
# listen to ticking source and publish inference
handle = listen(reviews_ticking, on_update)
# convert preds_blink to a full-history table
preds = blink_to_append_only(preds_blink)

In [None]:
display(DeephavenWidget(preds))

The AI model output is captured in `preds` _in real time_ as data rolls into `reviews_ticking`.