# Streaming

It's time to get hands-on with LSL. In this part, we'll be creating an LSL stream and capturing it with a server.

## Stream Outlet

Let's create a stream outlet that can be used to push single-channel data to the LSL network. The outlet needs to be constructed with information detailing:

- The name of the stream
- The type of the stream
- The number of channels the stream uses
- The sampling rate the stream is updated at
- The type of value the stream outputs
- A (should-be) unique identifier for the stream

In [1]:
from pylsl import StreamInfo, StreamOutlet

def create_outlet():
    name = 'workshop_outlet'                                                                # The name of the outlet
    type = 'single_stream'                                                                  # Stream type. This can be anything, but should probably be descriptive of what data is going through (e.g. EDA, EEG, Markers)
    n_of_channels = 1                                                                       # Single-channel data
    samping_rate = 1                                                                        # 1Hz = 1 sample per second
    value_type = 'float32'                                                                  # The type that all stream values should be.
    outlet_id = 'workshop_outlet_1234'                                                      # An unique identifier used to resolve streams when pulling data from the LSL network

    stream = StreamInfo(name, type, n_of_channels, samping_rate, value_type, outlet_id)     # Establish the details of the stream
    outlet = StreamOutlet(stream)                                                           # Create the outlet

    return outlet

Next, let's define a function that generates random floating point numbers with max. 2 decimals and pushes them through the outlet. Note that since outlets can be used to push samples of various numbers of channels, we need to wrap our sample in an array `[]` when sending it, even if we defined the stream to be one channel earlier.



In [2]:
import time
import random

def output_random_sample(target_outlet):
    random_sample = round(random.random(), 2)                                               # Generate a random floating point value between 0 and 1 and round it to 2 decimals.
    print('Outputting random sample:', random_sample)                                       # Print out the sample we just generated

    target_outlet.push_sample([random_sample])                                              # Push the sample through the outlet

Finally, let's try to run the function. You should see a new printout every second. 

Note three things about the code below:
- To keep the notebook clean, we are using the `clear_output` function from IPython to remove previous outputs.
- We defined our sampling rate to be 1Hz, so we're using Python's `time.sleep` to artificially delay each loop execution by 1 second
- As it has been hard-coded to run indefinitely, you can stop the output by pressing the stop button next to the cell.

In [4]:
from IPython.display import clear_output

outlet = create_outlet()

while True:
    clear_output()                                                                          # Clear the output below the cell so that only one value is displayed at atime

    output_random_sample(outlet)
    time.sleep(1)                                                                           # Wait for 1 second until the next loop

Outputting random sample: 0.98


KeyboardInterrupt: 

## Stream Inlet

Now we know how to push data to the LSL network, but how do we make use of it? 

As mentioned in part 1, we now need to create an inlet. To create an inlet, we need to resolve available streams in the network. 

> You could resolve all streams at once if you wanted to, but here we'll filter out everything that doesn't possess our unique stream identifier. Take a look at [the liblsl documentation for the `stream_info` class](https://labstreaminglayer.readthedocs.io/projects/liblsl/ref/streaminfo.html?highlight=streaminfo) to see the parameter names that can be used to filter streams (here, `source_id` refers to the value we gave our `outlet_id` variable above).

In [3]:
from pylsl import StreamInlet, resolve_stream

def create_inlet():
    stream = resolve_stream('source_id', 'workshop_outlet_1234')[0]                     # Find streams with our desired ID and grab the first one (the function returns an array)
    inlet = StreamInlet(stream)                                                         # Establish an inlet that data can be pulled through
    return inlet

Next, let's write a function that pulls a sample from an inlet. Notable here is that when pulling the sample we can also access its timestamp. The ease of this (no configuration required on our part) is one of the core features that make LSL such an attractive solution for research.

The `pull_sample` function returns a tuple, which we then destructure (unpack) to two variables: `samples`, which is an array containing our sample, and `ts`, which is the timestamp for when the sample was pushed through the outlet.

In [4]:
def pull_random_sample(source_inlet):
    samples, ts = source_inlet.pull_sample()
    print('Received sample:', samples[0], 'with timestamp', ts)

Now, let's demonstrate how we can combine our outlet and inlet and run them in parallel to send and capture data in real time. 

In a realistic environment, you would probably have two separate scripts: one for gathering samples and pushing them through an outlet, and another for pulling samples from the LSL network and logging them. Due to the limitations of the notebook format, we cannot run two cells at once. That's why in the script below, we use `Thread` to run our random sampler & outlet functionality asynchronously in a separate CPU thread from the one running our inlet.

This time, the script runs for 20 seconds.

In [5]:
from threading import Thread
from IPython.display import clear_output

clear_output()

run_time = 20                                                       # Samples will be generated and collected for 20 seconds

# Demo: Run the sampler & outlet in a separate CPU thread to circumvent Jupyter limitations
# You probably don't need/want to do this in any real-world application (not threading, just running the outlet & inlet in a single script)

def run_sampling(run_time):                              
    outlet = create_outlet()

    while run_time > 0:
        output_random_sample(outlet)
        time.sleep(1)                                               # Artifically sleep for 1s between samples (1Hz sampling rate)
        run_time -= 1                                               # n-1 seconds left for sampling
    
    print('Sample output complete!')

sampler = Thread(target = run_sampling, args = [run_time])          # Assign our outlet to a separate CPU thread
sampler.start()

# Pull samples from the network

inlet = create_inlet()

while run_time > 0:
    pull_random_sample(inlet)
    run_time -= 1                                                   # the run_time variable has separate instances for this main thread and the sampling thread so we have to deduct this here as well

inlet.close_stream()                                                # Close the inlet! This allows our program to finish. Otherwise, LSL will hang indefinitely waiting to reconnect. This is a great safety feature but for the demo a bit annoying...

2024-05-22 16:34:46.171 (   7.577s) [           4B98C]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-05-22 16:34:46.171 (   7.577s) [           4B98C]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-05-22 16:34:46.171 (   7.577s) [           4B98C]      netinterfaces.cpp:102   INFO| 	IPv4 addr: 7f000001
2024-05-22 16:34:46.172 (   7.577s) [           4B98C]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-05-22 16:34:46.172 (   7.577s) [           4B98C]      netinterfaces.cpp:105   INFO| 	IPv6 addr: ::1
2024-05-22 16:34:46.172 (   7.577s) [           4B98C]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-05-22 16:34:46.172 (   7.577s) [           4B98C]      netinterfaces.cpp:105   INFO| 	IPv6 addr: fe80::1%lo0
2024-05-22 16:34:46.172 (   7.577s) [           4B98C]      netinterfaces.cpp:91    I

Outputting random sample: 0.48
20
20
True
Outputting random sample: 0.28
Received sample: 0.2800000011920929 with timestamp 26035.881976166
20
19
True
Outputting random sample: 0.71
Received sample: 0.7099999785423279 with timestamp 26036.882736666
20
18
True
Outputting random sample: 0.08
Received sample: 0.07999999821186066 with timestamp 26037.888398208
20
17
True
Outputting random sample: 0.41
Received sample: 0.4099999964237213 with timestamp 26038.89229275
20
16
True
Outputting random sample: 0.68
Received sample: 0.6800000071525574 with timestamp 26039.895497416
20
15
True
Outputting random sample: 0.37
Received sample: 0.3700000047683716 with timestamp 26040.896275958
20
14
True
Outputting random sample: 0.11
Received sample: 0.10999999940395355 with timestamp 26041.902312666
20
13
True
Outputting random sample: 0.5
Received sample: 0.5 with timestamp 26042.907834958
20
12
True
Outputting random sample: 0.01
Received sample: 0.009999999776482582 with timestamp 26043.91238575
20

Exception in thread Thread-4 (run_sampling):
Traceback (most recent call last):
  File "/Users/haaviso1/miniconda3/envs/lsl-ws/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/Users/haaviso1/miniconda3/envs/lsl-ws/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 761, in run_closure
    _threading_Thread_run(self)
  File "/Users/haaviso1/miniconda3/envs/lsl-ws/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/_s/rhl3j4bj01j0ktphlt6b80l40000gp/T/ipykernel_13198/2707818713.py", line 20, in run_sampling
  File "/Users/haaviso1/miniconda3/envs/lsl-ws/lib/python3.12/threading.py", line 1144, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread


Sample output complete!
Received sample: 0.9100000262260437 with timestamp 26054.975622958


2024-05-22 16:35:06.274 (  27.679s) [R_workshop_out  ]      data_receiver.cpp:344    ERR| Stream transmission broke off (Input stream error.); re-connecting...


You might have noticed a couple of things above. First of all, the first sample that was pushed to the outlet was not captured by the inlet. This is because the inlet takes a bit to be initialized, making it recommended that you start your logger script slightly ahead of your sampler script. In the example, we could change this by e.g. adding a `sleep` of 1-2 seconds at the beginning of `run_sampling`.

Second, you might have noticed that while the received samples round out to the ones we push to the outlet, they are not exact. This is a floating point error that is inherent to Python's implementation of decimal values (see [this documentation page](https://docs.python.org/3/tutorial/floatingpoint.html) for more information). If you recall, our stream's type was defined as `float32`. If, somehow, the values we output to the outlet should be exactly two decimals long, we could **a)** multiply the rounded values by 10 and convert them to integers, **b)** convert them to [integer ratios](https://docs.python.org/3/library/stdtypes.html#float.as_integer_ratio) before pushing them to the network or **c)** render them as strings before pushing them to the network. 

**HOWEVER**, you probably will be working with devices that output floats with *n* decimals anyway, and the better practice is to **log everything raw** and do the processing during data analysis.