# Data Streams

A data stream is a façade that brings together an input sequence and an output processing
pipe which are not directly connected. Instead, you get samples from the input and pass
them to the output only when they are ready. Though unusual, this pattern is useful, e.g.,
when the user has to interact asynchronously with the dataset.
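To make the idea concrete before touching the real API, here is a minimal pure-Python sketch of such a façade. The `ToyStream` class and its `source`/`sink` arguments are hypothetical names made up for illustration; this is not how pipelime implements `DataStream`, only the general pattern of a decoupled input and output.

```python
from typing import Callable, Dict, Sequence


class ToyStream:
    """Hypothetical stand-in for a data stream (NOT pipelime's implementation)."""

    def __init__(self, source: Sequence[dict], sink: Callable[[int, dict], None]):
        self._source = source  # input side: random access to samples
        self._sink = sink  # output side: invoked only on explicit request

    def get_input(self, idx: int) -> dict:
        # Return a copy, so local edits never touch the source implicitly.
        return dict(self._source[idx])

    def set_output(self, idx: int, sample: dict) -> None:
        # Nothing reaches the output until the caller decides it is ready.
        self._sink(idx, sample)


source = [{"label": 0}, {"label": 1}]
written: Dict[int, dict] = {}
stream = ToyStream(source, written.__setitem__)

sample = stream.get_input(1)
sample["label"] = 42  # edit locally, e.g. after some user interaction
assert written == {}  # nothing has been written yet
stream.set_output(1, sample)  # explicit write-back
print(written)  # {1: {'label': 42}}
```

The input and the output never talk to each other: the caller sits in between and decides if and when a sample is written.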

First, we need a toy dataset to play with:

In [None]:
from pipelime.sequences import SamplesSequence
import shutil

shutil.rmtree("toy_dataset", ignore_errors=True)
for _ in SamplesSequence.toy_dataset(10).to_underfolder("toy_dataset"):  # type: ignore
    pass

Now create a DataStream object and get the first sample (NB: **cache is disabled** on
these items, so they are always up-to-date):

In [None]:
from pipelime.sequences import SamplesSequence, DataStream
from pipelime.items import JsonMetadataItem

# getting data
# NB: cache is disabled on these items
data_stream = DataStream.read_write_underfolder("toy_dataset")
s0 = data_stream[0]  # or data_stream.get_input(0)
print("original label:", s0["label"]())

If we change a value on the sample, this will not be automatically propagated to the
original dataset (of course!):

In [None]:
import numpy as np

new_s0 = s0.set_value("label", s0["label"]() + np.random.normal(0, 0.1))  # type: ignore
print("noisy label:", new_s0["label"]())
print("original label:", s0["label"]())
print(
    'new_s0["label"]() != s0["label"]():',
    not np.array_equal(new_s0["label"](), s0["label"]())  # type: ignore
)

We can also add a new item to the sample and, finally, write the changes back:

In [None]:
new_s0 = new_s0.set_item("new_meta", JsonMetadataItem([np.random.randint(100, 110)]))
print("new metadata:", new_s0["new_meta"]())

print("now saving...")
data_stream.set_output(0, new_s0, ["label", "new_meta"])  # save only the changed keys
print("now look at ./toy_dataset/data/0_new_meta.json")

Now the original sample `s0` will be updated as well, except for the new item:

In [None]:
print("original sample label:", s0["label"]())
print(
    'new_s0["label"]() == s0["label"]():',
    np.array_equal(new_s0["label"](), s0["label"]())  # type: ignore
)
print('"new_meta" not in s0:', "new_meta" not in s0)
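Why does `s0` see the new label? A plausible reading, given that the cache is disabled, is that each item reloads its file whenever it is called, while the set of keys in `s0` was fixed when the sample was fetched. The sketch below reproduces this behavior in plain Python; `UncachedJsonItem` is a hypothetical class written for this example and is not part of pipelime.

```python
import json
import tempfile
from pathlib import Path


class UncachedJsonItem:
    """Hypothetical item that re-reads its file on every call (no cache)."""

    def __init__(self, path: Path):
        self._path = path

    def __call__(self):
        # Always hit the disk, so the value is never stale.
        return json.loads(self._path.read_text())


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "0_label.json"
    path.write_text(json.dumps(1.0))

    item = UncachedJsonItem(path)
    before = item()  # value at first read

    path.write_text(json.dumps(1.5))  # someone writes a new value back
    after = item()  # the same item object now returns the new value

print(before, after)  # 1.0 1.5
```

An item for a file that did not exist when the sample was created cannot appear this way, which matches `"new_meta"` being absent from `s0`.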

To get the new item, just fetch the sample again:

In [None]:
print("now getting the sample again...")
s0 = data_stream.get_input(0)
print('"new_meta" in s0:', "new_meta" in s0)
print(
    'new_s0["new_meta"]() == s0["new_meta"]():', new_s0["new_meta"]() == s0["new_meta"]()
)

The same applies if we add a new sample at the end of the dataset (NB: the zero-padding
width of the file names, i.e., the zfill, is fixed, so that old samples can still be read
and written):

In [None]:
from pipelime.items import NumpyItem

print("adding new_s0 at the end...")
data_stream.set_output(10, new_s0)
last_s0 = data_stream.get_input(10)
print("new_s0 keys:", list(new_s0.keys()))
print("last_s0 keys:", list(last_s0.keys()))
print(
    "list(last_s0.keys()) == list(new_s0.keys()):",
    list(last_s0.keys()) == list(new_s0.keys()),
)
print(
    "all items equal: ",
    all(
        np.array_equal(i0(), i1(), equal_nan=True)  # type: ignore
        if isinstance(i0, NumpyItem)
        else i0() == i1()
        for i0, i1 in zip(last_s0.values(), new_s0.values())
    )
)
print("now look at ./toy_dataset/data/10*")
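The note on the fixed zfill can be checked with plain string formatting. The helper below is hypothetical and only mirrors the `<index>_<key>.<ext>` file names mentioned in this notebook; the way the width is derived from the dataset length is an assumption, not pipelime's documented behavior.

```python
def sample_filename(index: int, key: str, ext: str, zfill: int) -> str:
    # Hypothetical helper: zero-pad the index to a fixed width.
    return f"{str(index).zfill(zfill)}_{key}.{ext}"


# Assumed: the width was chosen when the 10-sample dataset was first written.
zfill = len(str(10 - 1))

old_name = sample_filename(0, "new_meta", "json", zfill)
new_name = sample_filename(10, "new_meta", "json", zfill)
print(old_name, new_name)  # 0_new_meta.json 10_new_meta.json
```

Since `str.zfill` never truncates, a larger index simply yields a longer name, while the names of the existing samples stay unchanged and remain readable.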

Finally, you can always iterate over all the samples as usual:

In [None]:
for i, s in enumerate(data_stream):
    print(f"#{i}:", list(s.keys()))