# DLinear model validation using offline batch inference

<div align="left">
<a target="_blank" href="https://console.anyscale.com/"><img src="https://img.shields.io/badge/🚀 Run_on-Anyscale-9hf"></a>&nbsp;
<a href="https://github.com/anyscale/e2e-timeseries" role="button"><img src="https://img.shields.io/static/v1?label=&amp;message=View%20On%20GitHub&amp;color=586069&amp;logo=github&amp;labelColor=2f363d"></a>&nbsp;
</div>

This tutorial demonstrates how to perform batch inference using the DLinear model and Ray Data.
We load the model checkpoint, prepare the test data, run inference in batches, and evaluate the performance.

Note that you must run `notebook/01-Distributed_Training.ipynb` before this notebook so that the pre-trained model artifacts are available.

<img src="https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/batch_inference.png" width=800>


The above figure illustrates how different blocks of data can be processed concurrently at various stages of the pipeline. This parallel execution maximizes resource utilization and throughput.

Note that this diagram simplifies real execution in several ways:

* It shows only one worker processing each data pipeline stage
* Backpressure mechanisms may throttle upstream operators to prevent overwhelming downstream stages
* Dynamic repartitioning often occurs as data moves through the pipeline, changing block counts and sizes
* Available resources change as the cluster autoscales
* System failures may disrupt the clean sequential flow shown in the diagram


<div class="alert alert-block alert"> <b> Ray Data Streaming Execution</b> 

❌ **Traditional batch execution** (non-streaming, like Spark without pipelining or SageMaker Batch):
- Reads the entire dataset into memory or a persistent intermediate format
- Only then starts applying transformations, such as `.map`, `.filter`, etc.
- Higher memory pressure and startup latency

✅ **Streaming execution** (Ray Data):
- Starts processing blocks as they load, without waiting for the entire dataset
- Reduces memory footprint (preventing OOMs) and speeds up time to first output
- Increases resource utilization by reducing idle time
- Enables online-style inference pipelines with minimal latency

<img src="https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/streaming.gif" width=700>

**Note**: Ray Data isn't a real-time stream processing engine like Flink or Kafka Streams. Instead, it's batch processing with streaming execution, which is especially useful for iterative ML workloads, ETL pipelines, and preprocessing before training or inference. Ray typically has a [**2-17x throughput improvement**](https://www.anyscale.com/blog/offline-batch-inference-comparing-ray-apache-spark-and-sagemaker#-results-of-throughput-from-experiments) over solutions like Spark and SageMaker Batch Transform.

</div>
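
The difference between the two execution styles can be sketched with plain Python generators. This is only an illustration of the ordering of work, not of how Ray Data is implemented: the helper names (`read_blocks`, `transform`, `run_batch`, `run_streaming`) are hypothetical, and everything runs in a single thread, whereas Ray Data runs stages concurrently across workers.

```python
from typing import Iterator


def read_blocks(num_blocks: int, log: list) -> Iterator[list]:
    """Yield small blocks one at a time, logging when each one is read."""
    for i in range(num_blocks):
        log.append(f"read {i}")
        yield [i * 10 + j for j in range(3)]


def transform(block: list, log: list) -> list:
    """Double every value in a block, logging which block is transformed."""
    log.append(f"transform {block[0] // 10}")
    return [value * 2 for value in block]


def run_batch(num_blocks: int):
    """Batch style: read every block first, then start transforming."""
    log = []
    blocks = list(read_blocks(num_blocks, log))  # Entire dataset held at once.
    return [transform(block, log) for block in blocks], log


def run_streaming(num_blocks: int):
    """Streaming style: transform each block as soon as it has been read."""
    log = []
    return [transform(block, log) for block in read_blocks(num_blocks, log)], log


batch_out, batch_log = run_batch(3)
stream_out, stream_log = run_streaming(3)
print(batch_log)
print(stream_log)
```

Both runs produce the same output, but in the streaming run the first transformed block is available after a single read rather than after all reads have finished.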



Let's start by setting up the environment and imports:

In [None]:
import os

import numpy as np
import ray
import torch

os.environ["RAY_TRAIN_V2_ENABLED"] = "1"

from data_factory import data_provider
from metrics import metric
from model import DLinear

Now, let's set up the DLinear model configuration and the job configuration.


In [None]:
config = {
    "checkpoint_path": None,  # FIXME: REQUIRED: Update this path
    "num_data_workers": 1,
    "features": "S",
    "target": "OT",
    "smoke_test": False,
    "seq_len": 96,
    "label_len": 48,
    "pred_len": 96,
    "individual": False,
    "batch_size": 64,
    "num_predictor_replicas": 4,
}


def _process_config(config: dict) -> dict:
    """Helper function to process and update configuration."""
    # Configure encoder input size based on task type.
    if config["features"] in ("M", "MS"):
        config["enc_in"] = 7  # ETTh1 has 7 features when multi-dimensional prediction is enabled
    else:
        config["enc_in"] = 1

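    # Ensure the checkpoint path is set, then make it absolute.
    if config["checkpoint_path"] is None:
        raise ValueError("Set config['checkpoint_path'] to the trained model checkpoint before running.")
    config["checkpoint_path"] = os.path.abspath(config["checkpoint_path"])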

    config["num_gpus_per_worker"] = 1.0

    config["train_only"] = False  # Load test subset
    return config


# Set derived values.
config = _process_config(config)
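
To make `seq_len`, `label_len`, and `pred_len` more concrete, here is a minimal sketch of how a series might be cut into input and target windows. It assumes the windowing convention commonly used by DLinear-style data loaders, where the target window starts `label_len` steps before the end of the input window. The actual `data_provider` implementation isn't shown in this notebook, so `make_windows` is a hypothetical helper, not the project's code.

```python
import numpy as np


def make_windows(series: np.ndarray, seq_len: int, label_len: int, pred_len: int):
    """Slice a 1-D series into (x, y) windows (hypothetical helper).

    x holds seq_len past steps. y holds the last label_len steps of x
    followed by pred_len future steps (an assumed convention).
    """
    xs, ys = [], []
    num_windows = len(series) - seq_len - pred_len + 1
    for start in range(num_windows):
        x_end = start + seq_len
        y_start = x_end - label_len
        y_end = x_end + pred_len
        xs.append(series[start:x_end])
        ys.append(series[y_start:y_end])
    # Add a trailing feature dimension of size 1 (univariate case, "S").
    return np.stack(xs)[..., None], np.stack(ys)[..., None]


series = np.arange(300, dtype=np.float32)
x, y = make_windows(series, seq_len=96, label_len=48, pred_len=96)
print(x.shape, y.shape)
```

Under this assumption, `y` has `label_len + pred_len` time steps, which would explain why the predictor later keeps only the last `pred_len` steps of `batch["y"]`.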

# Data ingest

First, let's load the test dataset as a Ray Data Dataset. We will use `.show(1)` to trigger the execution for a single row,
since Ray Data is lazily evaluated.

In [None]:
ray.init(ignore_reinit_error=True)

print("Loading test data...")
ds = data_provider(config, flag="test")
ds.show(1)

This cell defines the `Predictor` class. It loads the trained DLinear model from a checkpoint and
processes input batches to produce predictions. The `__call__` method performs inference
on a given batch of NumPy arrays.

Ray Data's actor-based processing lets you load the model weights and transfer them to the GPU only once, then reuse the model across batches.

In [None]:
class Predictor:
    """Actor class for performing inference with the DLinear model."""

    def __init__(self, checkpoint_path: str, config: dict):
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Load model from checkpoint.
        self.model = DLinear(config).float()
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.model.to(self.device)
        self.model.eval()

    def __call__(self, batch: dict[str, np.ndarray]) -> dict:
        """Process a batch of data for inference (numpy batch format)."""
        # Convert input batch to tensor.
        batch_x = torch.from_numpy(batch["x"]).float().to(self.device)

        with torch.no_grad():
            outputs = self.model(batch_x)  # Shape (N, pred_len, features_out)

        # Determine feature dimension based on config.
        f_dim = -1 if self.config["features"] == "MS" else 0
        outputs = outputs[:, -self.config["pred_len"] :, f_dim:]
        outputs_np = outputs.cpu().numpy()

        # Extract the target part from the batch.
        batch_y = batch["y"]
        batch_y_target = batch_y[:, -self.config["pred_len"] :]

        return {"predictions": outputs_np, "targets": batch_y_target}
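
The slicing step in `__call__` can be checked in isolation with NumPy arrays. The sketch below mirrors the `f_dim` logic above on dummy arrays. The shapes are assumptions chosen to match the configuration (7 features for `"M"`/`"MS"`, 1 for `"S"`), and `select_outputs` is a hypothetical helper.

```python
import numpy as np

pred_len = 96


def select_outputs(outputs: np.ndarray, features: str) -> np.ndarray:
    """Keep the last pred_len steps; for "MS", keep only the last feature."""
    f_dim = -1 if features == "MS" else 0
    return outputs[:, -pred_len:, f_dim:]


# Dummy model outputs with shape (batch, pred_len, features_out).
multi = np.arange(4 * 96 * 7, dtype=np.float32).reshape(4, 96, 7)
uni = np.zeros((4, 96, 1), dtype=np.float32)

ms_out = select_outputs(multi, "MS")  # Last feature only.
m_out = select_outputs(multi, "M")    # All features.
s_out = select_outputs(uni, "S")      # Single feature.
print(ms_out.shape, m_out.shape, s_out.shape)
```

With `f_dim = -1` the slice `f_dim:` keeps only the final feature column, while `f_dim = 0` keeps every column.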

In [None]:
ds = ds.map_batches(
    Predictor,
    fn_constructor_kwargs={"checkpoint_path": config["checkpoint_path"], "config": config},
    batch_size=config["batch_size"],
    concurrency=config["num_predictor_replicas"],
    num_gpus=config["num_gpus_per_worker"],
    batch_format="numpy",
)
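
The load-once, reuse-per-batch pattern behind this call can be sketched without Ray or PyTorch. The simulation below is a simplification under stated assumptions: `CountingPredictor`, `iter_batches`, and `simulate_map_batches` are hypothetical names, and a single worker handles every batch, whereas Ray Data would distribute batches across the actors requested with `concurrency`.

```python
from typing import Iterator


class CountingPredictor:
    """Hypothetical stand-in for Predictor that counts expensive setups."""

    loads = 0

    def __init__(self, scale: float):
        CountingPredictor.loads += 1  # Stands in for loading model weights.
        self.scale = scale

    def __call__(self, batch: dict) -> dict:
        return {"predictions": [value * self.scale for value in batch["x"]]}


def iter_batches(rows: list, batch_size: int) -> Iterator[dict]:
    """Group rows into dict batches of at most batch_size items."""
    for start in range(0, len(rows), batch_size):
        yield {"x": rows[start:start + batch_size]}


def simulate_map_batches(rows, fn_cls, fn_constructor_kwargs, batch_size):
    """Construct the callable once, then apply it to every batch."""
    worker = fn_cls(**fn_constructor_kwargs)
    results = []
    for batch in iter_batches(rows, batch_size):
        results.extend(worker(batch)["predictions"])
    return results


rows = list(range(10))
results = simulate_map_batches(rows, CountingPredictor, {"scale": 2.0}, batch_size=4)
print(results, CountingPredictor.loads)
```

The constructor runs once even though three batches are processed, which is the property that makes class-based `map_batches` attractive for models that are expensive to load.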

Next, we do some minor post-processing to get the results in the dimensions we want.

In [None]:
def postprocess_items(item: dict) -> dict:
    # Squeeze the trailing singleton dimension of predictions and targets, if present.
    if item["predictions"].shape[-1] == 1:
        item["predictions"] = item["predictions"].squeeze(-1)
    if item["targets"].shape[-1] == 1:
        item["targets"] = item["targets"].squeeze(-1)
    return item


ds = ds.map(postprocess_items)
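
As a quick shape check, the same squeeze logic can be applied to dummy per-row arrays. The shapes below are assumptions based on the configuration (`pred_len=96`, one feature for `"S"`, seven for `"M"`), and `squeeze_last_if_singleton` is a hypothetical helper.

```python
import numpy as np


def squeeze_last_if_singleton(arr: np.ndarray) -> np.ndarray:
    """Drop the last axis only when it has length 1."""
    return arr.squeeze(-1) if arr.shape[-1] == 1 else arr


univariate = np.zeros((96, 1))    # One row in the "S" setting (assumed shape).
multivariate = np.zeros((96, 7))  # One row in the "M" setting (assumed shape).

print(squeeze_last_if_singleton(univariate).shape)
print(squeeze_last_if_singleton(multivariate).shape)
```

Only the univariate row changes shape. Multivariate rows pass through untouched.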

Finally, we are ready to execute all of these lazy steps and materialize the results in memory using `take_all()`:

In [None]:
# Trigger the lazy execution of the entire Ray pipeline.
all_results = ds.take_all()

Now that we have the results in memory, we can calculate some validation metrics for our trained DLinear model.

In [None]:
# Concatenate the per-row predictions and targets returned by take_all().
all_predictions = np.concatenate([item["predictions"] for item in all_results], axis=0)
all_targets = np.concatenate([item["targets"] for item in all_results], axis=0)

# Compute evaluation metrics.
mae, mse, rmse, mape, mspe, rse = metric(all_predictions, all_targets)

print("\n--- Test Results ---")
print(f"MSE: {mse:.3f}")
print(f"MAE: {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
print(f"MAPE: {mape:.3f}")
print(f"MSPE: {mspe:.3f}")
print(f"RSE: {rse:.3f}")

print("\nOffline inference finished!")
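
For reference, the six reported metrics can be sketched in NumPy as follows. The project's `metric` function isn't shown in this notebook, so these are the commonly used definitions and are an assumption about what it computes. `sketch_metrics` is a hypothetical name.

```python
import numpy as np


def sketch_metrics(pred: np.ndarray, true: np.ndarray):
    """Return (mae, mse, rmse, mape, mspe, rse) using common definitions."""
    err = pred - true
    mae = np.mean(np.abs(err))
    mse = np.mean(err ** 2)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs(err / true))  # Undefined if true contains zeros.
    mspe = np.mean((err / true) ** 2)   # Undefined if true contains zeros.
    # Root relative squared error: error norm relative to the target's spread.
    rse = np.sqrt(np.sum(err ** 2)) / np.sqrt(np.sum((true - true.mean()) ** 2))
    return mae, mse, rmse, mape, mspe, rse


true = np.array([1.0, 2.0, 4.0])
pred = np.array([1.0, 3.0, 2.0])
mae, mse, rmse, mape, mspe, rse = sketch_metrics(pred, true)
print(f"MAE: {mae:.3f}  MSE: {mse:.3f}  RMSE: {rmse:.3f}")
```

Because MAPE and MSPE divide by the target values, they can become very large or undefined when targets are at or near zero, so MAE and MSE are usually the more stable numbers to compare.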