In [1]:
!pip install river

Collecting river
  Downloading river-0.22.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting pandas<3.0.0,>=2.2.3 (from river)
  Downloading pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.9/89.9 kB[0m [31m883.2 kB/s[0m eta [36m0:00:00[0m
Downloading river-0.22.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m31.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.1/13.1 MB[0m [31m43.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pandas, river
  Attempting uninstall: pandas
    Found existing installation: pandas 2.2.2
    Uninstalling pandas-2.2.2:
      Successfully uni

In [None]:
from copy import deepcopy
from river.datasets import synth

def _arff_token(value):
    """Render ``value`` as one ARFF token, quoting it when required.

    Whitespace, commas, braces, quotes, ``%`` and backslashes are ARFF syntax,
    so a name or value containing any of them (or an empty string) is wrapped
    in single quotes with embedded backslashes and single quotes escaped.
    """
    text = str(value)
    needs_quoting = text == "" or any(ch.isspace() or ch in ",{}'\"%\\" for ch in text)
    if not needs_quoting:
        return text
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def save_stream_to_arff(stream, relation_name="agrawal_stream", output_file="agrawal_stream.arff"):
    """Write a finite stream of ``(x, y)`` pairs to an ARFF file.

    Parameters
    ----------
    stream
        Iterable of ``(features_dict, label)`` pairs. It is fully materialised
        because the ARFF header needs every label and every nominal value.
    relation_name
        Name written on the ``@RELATION`` line.
    output_file
        Destination path; an existing file is overwritten.

    Raises
    ------
    ValueError
        If the stream yields no samples.

    Notes
    -----
    * A feature is declared ``NUMERIC`` only when all of its observed values
      are real numbers (bools excluded); otherwise it is declared nominal with
      the sorted set of its string values.
    * Feature names are the union over all samples, in first-seen order.
      A feature absent from a sample, or equal to ``None``, is written as the
      ARFF missing marker ``?``; the same applies to a ``None`` label.
    """
    from numbers import Real  # local import keeps this cell self-contained

    samples = list(stream)
    if not samples:
        raise ValueError("Stream is empty")

    # dict.fromkeys de-duplicates while preserving first-seen order.
    feature_names = list(dict.fromkeys(name for x, _ in samples for name in x))

    def is_numeric(value):
        # bool is a subclass of int, but str(True) is not a valid ARFF number.
        return isinstance(value, Real) and not isinstance(value, bool)

    def column_type(name):
        observed = [v for x, _ in samples if (v := x.get(name)) is not None]
        if all(is_numeric(v) for v in observed):
            return "NUMERIC"
        domain = sorted({str(v) for v in observed})
        return "{" + ", ".join(_arff_token(v) for v in domain) + "}"

    def cell(value):
        return "?" if value is None else _arff_token(value)

    labels = {y for _, y in samples if y is not None}
    try:
        class_labels = sorted(labels)
    except TypeError:
        # Labels of mixed, mutually non-comparable types: order by their text.
        class_labels = sorted(labels, key=str)

    with open(output_file, "w", encoding="utf-8") as f:
        # Header
        f.write(f"@RELATION {_arff_token(relation_name)}\n\n")

        for name in feature_names:
            f.write(f"@ATTRIBUTE {_arff_token(name)} {column_type(name)}\n")

        f.write(f"@ATTRIBUTE class {{{', '.join(_arff_token(c) for c in class_labels)}}}\n\n")
        f.write("@DATA\n")

        # Data rows
        for x, y in samples:
            row = [cell(x.get(name)) for name in feature_names]
            row.append(cell(y))
            f.write(",".join(row) + "\n")

    print(f"Stream saved to {output_file} in ARFF format.")

# Usage: export the first 1000 samples of a seeded Agrawal generator so the
# written file is the same on every run. A freshly constructed generator is
# not shared with anything else, so it needs no deepcopy before `.take()`.
stream = synth.Agrawal(seed=42).take(1000)
save_stream_to_arff(stream)


In [None]:
from river import evaluate, metrics, tree, ensemble, forest
from river.datasets import synth
from copy import deepcopy

# -------------------------------
# Define Stream Generators
# -------------------------------
# Only the Agrawal generator is evaluated in this cell.
binary_classification_datasets = [synth.Agrawal()]

# Index each generator by its class name so results can be labelled.
all_datasets = dict(
    (type(generator).__name__, generator)
    for generator in binary_classification_datasets
)

# -------------------------------
# Evaluation Loop
# -------------------------------
# Hand-rolled test-then-train loop: for every sample the model predicts first,
# the metrics are updated, and only then does the model learn from the sample.
for dataset_name, stream in all_datasets.items():
    print(f"\n🔍 Dataset: {dataset_name}")

    # Define base model
    base_model = tree.HoeffdingTreeClassifier(grace_period=50, delta=0.01)

    # Define models
    models = {
        "Static SRPClassifier": ensemble.SRPClassifier(
            model=base_model.clone(), n_models=10, seed=42
        ),
        "ARFClassifier": forest.ARFClassifier(
            seed=8, leaf_prediction="mc"
        )
    }

    for model_name, model in models.items():
        # Build the 1999-sample slice from a copy so the generator object held
        # in `all_datasets` is left untouched for the next model.
        dataset = deepcopy(stream).take(1999)

        # Define metrics
        acc = metrics.Accuracy()
        prec = metrics.Precision()
        rec = metrics.Recall()
        kappa = metrics.CohenKappa()

        for count, (x, y) in enumerate(dataset, start=1):
            y_pred = model.predict_one(x)

            # Only update metrics if we got a prediction
            if y_pred is not None:
                for metric in (acc, prec, rec, kappa):
                    metric.update(y, y_pred)

            model.learn_one(x, y)

            if count == 10:  # Show one early prediction as a sanity check
                print(f"🧪 Sample {count}: True={y}, Pred={y_pred}")

        # Output results
        print(f"{model_name:>25}:")
        for m in [acc, prec, rec, kappa]:
            print(f"  {m.__class__.__name__:>15}: {m.get():.4f}")


In [None]:
# arff_dataset.py

import pathlib
from typing import Any, Dict, Iterator, List, Tuple, Callable, Optional
from scipy.io import arff
from river import datasets

class ARFFDataset(datasets.base.FileDataset):
    """An in-memory view over a local ARFF file, iterated as ``(x, y)`` pairs.

    The file is parsed once, in ``__init__``, with ``scipy.io.arff.loadarff``;
    every iteration then walks the already-loaded records from the start.

    Parameters
    ----------
    filename
        Name of the .arff file (must be in `directory` or as given by a full path).
    directory
        Directory where to find `filename`. If None, `filename` may be an absolute path.
    target
        Name of the attribute to use as the target.
    task
        One of River’s task constants (e.g. BINARY_CLF, MULTI_CLF, REG).

    Raises
    ------
    ValueError
        If `target` is not declared in the ARFF header.
    """

    def __init__(
        self,
        filename: str,
        directory: str | None = None,
        *,
        target: str,
        task: str,
    ):
        # Resolve the full path
        path = pathlib.Path(directory or "") / filename

        # Load once to get both data array and metadata
        with open(path, "r") as f:
            data_arr, meta = arff.loadarff(f)

        # Build attribute metadata list: (name, type_name, domain).
        # `meta[name]` is the public accessor returning (type_name, range);
        # for nominal attributes the range is the tuple of allowed values.
        attrs_meta: List[Tuple[str, str, Optional[List[str]]]] = []
        for name in meta.names():
            type_name, value_range = meta[name]
            if type_name == "nominal":
                domain = [v.decode("utf-8") if isinstance(v, bytes) else str(v)
                          for v in value_range]
                attrs_meta.append((name, "nominal", domain))
            else:
                attrs_meta.append((name, type_name, None))

        # Identify feature names and target meta
        feature_names = [n for n, _, _ in attrs_meta if n != target]
        target_meta = next((m for m in attrs_meta if m[0] == target), None)
        if target_meta is None:
            raise ValueError(f"Target attribute '{target}' not found in ARFF header.")

        # Pick the target converter:
        #   nominal whose labels all parse as integers -> int
        #   nominal otherwise                          -> unchanged (str)
        #   numeric                                    -> float
        #   anything else                              -> unchanged
        _, t_type, t_domain = target_meta
        self._target_converter: Callable[[Any], Any] = self._identity
        if t_type == "nominal" and t_domain is not None:
            try:
                for label in t_domain:
                    int(label)
            except ValueError:
                pass  # at least one label is not an integer: keep strings
            else:
                self._target_converter = int
        elif t_type == "numeric":
            self._target_converter = float

        # Infer counts
        n_features = len(feature_names)
        n_samples = data_arr.shape[0]
        n_classes = None
        if task in (datasets.base.BINARY_CLF, datasets.base.MULTI_CLF) and t_domain is not None:
            n_classes = len(t_domain)

        # Initialize base FileDataset
        super().__init__(
            filename=filename,
            directory=directory,
            task=task,
            n_features=n_features,
            n_classes=n_classes,
            n_samples=n_samples,
        )

        # Persist for iteration
        self._attrs_meta = attrs_meta
        self._data_arr = data_arr
        self._target = target

    @staticmethod
    def _identity(value: Any) -> Any:
        """Return ``value`` unchanged (converter for targets needing no cast)."""
        return value

    def _iter(self) -> Iterator[Tuple[Dict[str, Any], Any]]:
        """Yield one ``(features, target)`` pair per loaded record."""
        for rec in self._data_arr:
            x: Dict[str, Any] = {}
            y: Any = None
            for name, typ, domain in self._attrs_meta:
                val = rec[name]
                # Bytes → str
                if isinstance(val, bytes):
                    val = val.decode("utf-8")
                if name == self._target:
                    try:
                        y = self._target_converter(val)
                    except (TypeError, ValueError):
                        # Best effort: a value the converter cannot parse is
                        # passed through as-is instead of aborting iteration.
                        y = val
                elif typ == "numeric":
                    # Feature: keep numeric types as float, others as str
                    x[name] = float(val)
                else:
                    x[name] = val
            yield x, y

    def __iter__(self) -> Iterator[Tuple[Dict[str, Any], Any]]:
        """Return a fresh iterator over the loaded records.

        Raises
        ------
        FileNotFoundError
            If the ARFF file no longer exists at the path given at construction.
        """
        path = pathlib.Path(self.directory or "") / self.filename
        if not path.exists():
            raise FileNotFoundError(f"ARFF file not found: {path}")
        return self._iter()


In [None]:
from river import evaluate, metrics, tree, ensemble, forest
from river.datasets import synth
from copy import deepcopy

# -------------------------------
# Define Stream Generators
# -------------------------------
# Each generator is built with its default arguments; the two lists mirror
# the task grouping used when the streams are benchmarked below.
binary_classification_datasets = [
    make_stream()
    for make_stream in (
        synth.Agrawal,
        synth.AnomalySine,
        synth.ConceptDriftStream,
        synth.Hyperplane,
        synth.Mixed,
        synth.SEA,
        synth.STAGGER,
        synth.Sine,
    )
]

multi_class_classification_datasets = [
    make_stream()
    for make_stream in (
        synth.LED,
        synth.LEDDrift,
        synth.RandomRBF,
        synth.RandomRBFDrift,
        synth.RandomTree,
        synth.Waveform,
    )
]

# Index every generator by its class name so results can be labelled.
all_datasets = {
    type(generator).__name__: generator
    for generator in (*binary_classification_datasets, *multi_class_classification_datasets)
}

# -------------------------------
# Evaluation Loop
# -------------------------------
# Progressive validation of every model on the first 1999 samples of each stream.
for dataset_name, stream in all_datasets.items():
    print(f"\n🔍 Dataset: {dataset_name}")

    # Define base model
    base_model = tree.HoeffdingTreeClassifier(grace_period=50, delta=0.01)

    # Define models
    models = {
        "Static SRPClassifier": ensemble.SRPClassifier(
            model=base_model.clone(), n_models=10, seed=42
        ),
        "ARFClassifier": forest.ARFClassifier(
            seed=8, leaf_prediction="mc"
        )
    }

    # Evaluate each model
    for model_name, model in models.items():
        # Build the slice from a copy so the generator object held in
        # `all_datasets` is left untouched for the next model.
        dataset = deepcopy(stream).take(1999)

        # Define metrics individually
        acc = metrics.Accuracy()
        prec = metrics.Precision()
        rec = metrics.Recall()
        kappa = metrics.CohenKappa()
        # NOTE(review): Precision/Recall are the binary variants; for the
        # multi-class streams MacroPrecision/MacroRecall may be what is
        # wanted — confirm before interpreting those two numbers.

        # Combine for evaluation. Adding metrics yields a composite that
        # forwards each update to these same four objects, so they can be read
        # individually afterwards. (ClassificationReport does not accept a
        # list of metrics, which left all four at their initial 0.0.)
        metric_set = acc + prec + rec + kappa

        # Perform progressive validation
        result = evaluate.progressive_val_score(
            dataset=dataset,
            model=model,
            metric=metric_set,
            print_every=0
        )

        # Output each metric manually
        print(f"{model_name:>25}:")
        for m in [acc, prec, rec, kappa]:
            print(f"  {m.__class__.__name__:>15}: {m.get():.4f}")
