# Data Streams

Data streams allow client applications to publish named streams of arbitrary structured data which are added to recordings and are available for live visualisation.

Each entry in a single data stream must have a unique, strictly ascending timestamp. The entry data itself can be a Python type such as a `dict`, `tuple`, `list`, `int`, `float`, or a numpy array. Data stream entries are useful for storing changes to data over time; for example, you might store the (x, y) position of a ball as it moves within a pong simulation.

Data streams also support named attributes. Updating an attribute replaces its earlier value, so attributes are best suited to metadata such as the current, and eventually final, score of a gameplay simulation.
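
The difference between entries (an append-only history) and attributes (only the latest value is kept) can be illustrated with a small in-memory model. This is a minimal sketch, not the `cl` API: `ToyDataStream` is a hypothetical class, and raising `ValueError` for a non-ascending timestamp is an assumption made for illustration only.

```python
class ToyDataStream:
    """Hypothetical in-memory stand-in for a data stream (not the cl API)."""

    def __init__(self, name, attributes=None):
        self.name = name
        self.attributes = dict(attributes or {})
        self.entries = []  # (timestamp, data) pairs, in append order

    def append(self, timestamp, data):
        # Assumption: a timestamp that is not strictly greater than the
        # previous one is rejected with ValueError.
        if self.entries and timestamp <= self.entries[-1][0]:
            raise ValueError(f"timestamp {timestamp} is not ascending")
        self.entries.append((timestamp, data))

    def set_attribute(self, key, value):
        # The new value replaces any earlier value for this key.
        self.attributes[key] = value

    def update_attributes(self, updates):
        # Replace or add several attributes at once.
        self.attributes.update(updates)


stream = ToyDataStream('pong', attributes={'score': 0})

# Entries accumulate: every ball position is kept.
stream.append(100, (0.10, 0.50))
stream.append(101, (0.12, 0.48))

# Attributes are overwritten: only the latest score survives.
stream.set_attribute('score', 1)
stream.update_attributes({'score': 2, 'winner': 'left'})

print(len(stream.entries), 'entries')
print(stream.attributes)
```

After running this, the model holds two entries but a single `score` value, which is the behaviour that makes attributes a good fit for summary metadata.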

In [1]:
import cl
import numpy

with cl.open() as neurons:
    # Create a named data stream - by default, it will be added to any active or future recordings.
    data_stream = neurons.create_data_stream(
        name='example_data_stream',
        attributes={'score': 0, 'another_attrbute': [0, 1, 2, 3]},
    )
    
    # Start a recording
    recording = neurons.record(stop_after_seconds=1)
    
    timestamp = neurons.timestamp()
    
    # Add some data stream entries with unique, ascending timestamps:
    data_stream.append(timestamp + 0, { 'arbitrary': 'data' })
    data_stream.append(timestamp + 1, ['of', 'arbitrary', 'size'])
    data_stream.append(timestamp + 2, 'and type.')
    data_stream.append(timestamp + 3, numpy.array([2**64 - 1, 2**64 - 2, 2**64 - 3], dtype=numpy.uint64))
    
    # Update a single attribute
    data_stream.set_attribute('score', 1)
    
    # Update multiple attributes at once
    data_stream.update_attributes({ 'score': 2, 'new_attribute': 9.9 })
    
    recording.wait_until_stopped()
    
recording_view = recording.open()

print("Top level RecordingView:\n")
print(recording_view, '\n')

print("Data Streams in recording:\n")
print(recording_view.data_streams, '\n')

print("Example Data Stream attributes:\n")
print(recording_view.data_streams.example_data_stream.attributes, '\n')

Top level RecordingView:

RecordingView of file: /data/recordings/2024-12-19_15-20-34.893+10-00_recording.h5
    file:         Direct access to the underlying PyTables object
    attributes:   A view of the recording attributes
    spikes:       Access spikes stored in the recording
    stims:        Access stims stored in the recording
    samples:      Access raw frames of samples stored in the recording
    data_streams: A colletion of recorded data streams 

Data Streams in recording:

Data Streams:
    example_data_stream 

Example Data Stream attributes:

/data_stream/example_data_stream._v_attrs (AttributeSet), 5 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    application := {'score': 2, 'another_attrbute': [0, 1, 2, 3], 'new_attribute': 9.9},
    name := 'example_data_stream'] 



## DataStreamView

The HDF5 format does not natively provide an intuitive way to efficiently store an indexed list of items where any item can be a different size. For this reason, data stream data is stored end-to-end in a simple array of bytes, and another HDF5 dataset is used to associate timestamps with indexes into this data for retrieval.
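
The layout described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's implementation: `pack_entries` and `read_entry` are hypothetical helpers, and JSON is used purely as a stand-in serialiser so the example runs with the standard library alone.

```python
import json


def pack_entries(entries):
    """Store variable-size items end-to-end and build a timestamp index.

    Returns (data, index), where index rows are (timestamp, start, end)
    and data[start:end] holds the serialised item for that timestamp.
    """
    data = bytearray()
    index = []
    for timestamp, item in entries:
        encoded = json.dumps(item).encode('utf-8')  # stand-in serialiser
        start = len(data)
        data.extend(encoded)
        index.append((timestamp, start, len(data)))
    return bytes(data), index


def read_entry(data, index, timestamp):
    """Look up one item by timestamp using the index rows."""
    for row_timestamp, start, end in index:
        if row_timestamp == timestamp:
            return json.loads(data[start:end].decode('utf-8'))
    raise KeyError(timestamp)


entries = [
    (2664, {'arbitrary': 'data'}),
    (2665, ['of', 'arbitrary', 'size']),
    (2666, 'and type.'),
]
data, index = pack_entries(entries)

for row in index:
    print(row)
print(read_entry(data, index, 2665))
```

Each index row's end offset is the next row's start offset, so items of any size can share one flat byte array without padding.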

The `DataStreamView` class makes working with data streams more convenient:

In [2]:
data_stream = recording_view.data_streams.example_data_stream

print(f"There are {len(data_stream)} entries in example_data_stream.")

There are 4 entries in example_data_stream.


Entries in the data stream can be iterated over in a few ways.

Iterating over all (timestamp, data):

In [3]:
for timestamp, data in data_stream.items():
    print('\n', timestamp, data)


 2664 {'arbitrary': 'data'}

 2665 ['of', 'arbitrary', 'size']

 2666 and type.

 2667 [18446744073709551615 18446744073709551614 18446744073709551613]


Iterating over timestamps:

In [4]:
for timestamp in data_stream.keys():
    print('\n', timestamp)


 2664

 2665

 2666

 2667


Iterating over data directly:

In [5]:
for data in data_stream.values():
    print('\n', data)


 {'arbitrary': 'data'}

 ['of', 'arbitrary', 'size']

 and type.

 [18446744073709551615 18446744073709551614 18446744073709551613]


Data can also be retrieved by using a timestamp as an index:

In [6]:
for timestamp in data_stream.keys():
    data = data_stream[timestamp]
    print('\n', timestamp, data)


 2664 {'arbitrary': 'data'}

 2665 ['of', 'arbitrary', 'size']

 2666 and type.

 2667 [18446744073709551615 18446744073709551614 18446744073709551613]


You can also retrieve entries for a range of timestamps.

**Note:** the end of the range is exclusive, so an entry is returned only if its timestamp is strictly less than the end timestamp:

In [7]:
# keys() cannot yet be indexed by position directly, so collect the timestamps into a list first.
timestamps       = list(data_stream.keys())
second_timestamp = timestamps[1]
last_timestamp   = timestamps[-1]

print('Range ending at last timestamp:')

for timestamp, data in data_stream.items_for_range(second_timestamp, last_timestamp):
    print('\n', timestamp, data)
    
print('\n\nRange ending at last timestamp plus one:')

for timestamp, data in data_stream.items_for_range(second_timestamp, last_timestamp + 1):
    print('\n', timestamp, data)

Range ending at last timestamp:

 2665 ['of', 'arbitrary', 'size']

 2666 and type.


Range ending at last timestamp plus one:

 2665 ['of', 'arbitrary', 'size']

 2666 and type.

 2667 [18446744073709551615 18446744073709551614 18446744073709551613]
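
The range behaviour shown above (start included, end excluded) is the usual half-open interval convention. A minimal sketch of the same selection over a sorted list of timestamps, using the standard `bisect` module, is given below. `keys_in_range` is a hypothetical helper for illustration and says nothing about how `DataStreamView` is implemented internally.

```python
from bisect import bisect_left


def keys_in_range(sorted_timestamps, start, end):
    """Return the timestamps t with start <= t < end (half-open range)."""
    first = bisect_left(sorted_timestamps, start)  # first index with t >= start
    stop = bisect_left(sorted_timestamps, end)     # first index with t >= end
    return sorted_timestamps[first:stop]


timestamps = [2664, 2665, 2666, 2667]

print(keys_in_range(timestamps, 2665, 2667))      # [2665, 2666]
print(keys_in_range(timestamps, 2665, 2667 + 1))  # [2665, 2666, 2667]
```

A half-open range means consecutive ranges such as `(a, b)` and `(b, c)` never return the same entry twice.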


Similarly, you can retrieve just the timestamps or just the data within a range of timestamps:

In [8]:
for timestamp in data_stream.keys_for_range(second_timestamp, last_timestamp + 1):
    print('\n', timestamp)


 2665

 2666

 2667


In [9]:
for data in data_stream.values_for_range(second_timestamp, last_timestamp + 1):
    print('\n', data)


 ['of', 'arbitrary', 'size']

 and type.

 [18446744073709551615 18446744073709551614 18446744073709551613]


## Data Stream Attributes

Just as the root group can have attributes, each data stream has its own attributes. Any custom attributes that the client application adds to the data stream are stored in the `application` dict attribute:

In [10]:
print(data_stream.attributes)
print()
print(f"The name of the data stream is '{data_stream.attributes.name}'.")
print(f"The value of the data stream attribute 'score' is '{data_stream.attributes.application['score']}'.")

/data_stream/example_data_stream._v_attrs (AttributeSet), 5 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    application := {'score': 2, 'another_attrbute': [0, 1, 2, 3], 'new_attribute': 9.9},
    name := 'example_data_stream']

The name of the data stream is 'example_data_stream'.
The value of the data stream attribute 'score' is '2'.


## Working Without DataStreamView

If you prefer, you can work directly with data stream data within the HDF5 file. Each data stream has an `index` table dataset that contains rows with a `timestamp`, `start_index`, and `end_index`:

In [11]:
raw_data_stream = recording_view.file.root.data_stream.example_data_stream
for row in raw_data_stream.index:
    timestamp, start_index, end_index = row['timestamp'], row['start_index'], row['end_index']
    print('\n', timestamp, start_index, end_index)


 2664 0 16

 2665 16 35

 2666 35 45

 2667 45 110


For efficiency, `row[0]`, `row[1]`, and `row[2]` can be used instead of `row['timestamp']`, `row['start_index']`, `row['end_index']`.

The `start_index` and `end_index` values specify the range of bytes in the `data` dataset that holds the entry for `timestamp`:

In [12]:
for row in raw_data_stream.index:
    timestamp, start_index, end_index = row[0], row[1], row[2]
    data_bytes = raw_data_stream.data[start_index:end_index]
    print('\n', timestamp, data_bytes)


 2664 [129 169  97 114  98 105 116 114  97 114 121 164 100  97 116  97]

 2665 [147 162 111 102 169  97 114  98 105 116 114  97 114 121 164 115 105 122
 101]

 2666 [169  97 110 100  32 116 121 112 101  46]

 2667 [133 196   2 110 100 195 196   4 116 121 112 101 163  60 117  56 196   4
 107 105 110 100 196   0 196   5 115 104  97 112 101 145   3 196   4 100
  97 116  97 196  24 255 255 255 255 255 255 255 255 254 255 255 255 255
 255 255 255 253 255 255 255 255 255 255 255]


The data is stored in msgpack-serialised form, so it must be deserialised with msgpack. **Note:** If numpy arrays were stored directly in the data stream, use `msgpack_numpy` to assist the deserialisation:

In [13]:
import msgpack
import msgpack_numpy

print("Without msgpack_numpy:")

for row in raw_data_stream.index:
    timestamp, start_index, end_index = row[0], row[1], row[2]
    data_bytes = raw_data_stream.data[start_index:end_index]
    data = msgpack.unpackb(data_bytes)
    print('\n', timestamp, data, f"\n  ({type(data)}")
    
print("\nWith msgpack_numpy:")

for row in raw_data_stream.index:
    timestamp, start_index, end_index = row[0], row[1], row[2]
    data_bytes = raw_data_stream.data[start_index:end_index]
    data = msgpack.unpackb(data_bytes, object_hook=msgpack_numpy.decode)
    print('\n', timestamp, data, f"\n  ({type(data)}")

Without msgpack_numpy:

 2664 {'arbitrary': 'data'} 
  (<class 'dict'>

 2665 ['of', 'arbitrary', 'size'] 
  (<class 'list'>

 2666 and type. 
  (<class 'str'>

 2667 {b'nd': True, b'type': '<u8', b'kind': b'', b'shape': [3], b'data': b'\xff\xff\xff\xff\xff\xff\xff\xff\xfe\xff\xff\xff\xff\xff\xff\xff\xfd\xff\xff\xff\xff\xff\xff\xff'} 
  (<class 'dict'>

With msgpack_numpy:

 2664 {'arbitrary': 'data'} 
  (<class 'dict'>

 2665 ['of', 'arbitrary', 'size'] 
  (<class 'list'>

 2666 and type. 
  (<class 'str'>

 2667 [18446744073709551615 18446744073709551614 18446744073709551613] 
  (<class 'numpy.ndarray'>
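
To see how the raw bytes printed earlier map onto Python values, here is a minimal sketch of a decoder for just three msgpack formats: fixmap (`0x80`-`0x8f`), fixarray (`0x90`-`0x9f`) and fixstr (`0xa0`-`0xbf`). `decode_fix` is a hypothetical helper written for illustration; it handles only these three formats, and real code should use `msgpack.unpackb` as shown above.

```python
def decode_fix(buf, pos=0):
    """Decode one msgpack value starting at buf[pos].

    Returns (value, next_pos). Only fixmap, fixarray and fixstr are
    supported; any other format byte raises ValueError.
    """
    tag = buf[pos]
    pos += 1
    if 0x80 <= tag <= 0x8F:  # fixmap: low 4 bits = number of key/value pairs
        result = {}
        for _ in range(tag & 0x0F):
            key, pos = decode_fix(buf, pos)
            value, pos = decode_fix(buf, pos)
            result[key] = value
        return result, pos
    if 0x90 <= tag <= 0x9F:  # fixarray: low 4 bits = number of elements
        items = []
        for _ in range(tag & 0x0F):
            item, pos = decode_fix(buf, pos)
            items.append(item)
        return items, pos
    if 0xA0 <= tag <= 0xBF:  # fixstr: low 5 bits = length in bytes
        length = tag & 0x1F
        return bytes(buf[pos:pos + length]).decode('utf-8'), pos + length
    raise ValueError(f"unsupported msgpack format byte 0x{tag:02x}")


# The first three entries, copied from the raw byte output above.
first = bytes([129, 169, 97, 114, 98, 105, 116, 114, 97, 114, 121,
               164, 100, 97, 116, 97])
second = bytes([147, 162, 111, 102, 169, 97, 114, 98, 105, 116, 114,
                97, 114, 121, 164, 115, 105, 122, 101])
third = bytes([169, 97, 110, 100, 32, 116, 121, 112, 101, 46])

for raw in (first, second, third):
    value, consumed = decode_fix(raw)
    print(repr(value), consumed == len(raw))
```

For example, the leading `129` (`0x81`) of the first entry marks a map with one pair, and `169` (`0xa9`) marks a 9-byte string, which is `arbitrary`.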


## Next

[Real-Time Visualisation](CL-04.%20Real-Time%20Visualisation.ipynb)