In [1]:
from confluent_kafka import Producer
from pylsl import StreamInlet, resolve_stream
from time import sleep
import json
import uuid

In [2]:
topic = 'lsl-test-2'
max_messages = 10
pause_time = 10 #sec
callback_timeout = 5 #sec

conf = {
    'bootstrap.servers': "localhost:9092",
    'client.id': 'jupyter-kafka-producer'
}

In [4]:
def callback(error, msg):
    if error is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(error)))
    else:
        print("Message produced: %s" % (str(msg)))

In [None]:
# LSL consumer

In [3]:
streams = resolve_stream('type', 'Test')
print(streams)

[<pylsl.pylsl.StreamInfo object at 0x105f94b20>]


In [4]:
streams = resolve_stream('name', 'lsl-test-1')
print(streams)

[<pylsl.pylsl.StreamInfo object at 0x105f94ca0>]


In [12]:
streams[0].source_id()

'lalala123'

In [None]:
inlet = StreamInlet(streams[0])

In [3]:
print("looking for an EEG stream...")
streams = resolve_stream('type', 'Test')
print(streams)
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])

looking for an EEG stream...
[<pylsl.pylsl.StreamInfo object at 0x11109ff70>]


In [5]:
streams[0].__dict__

{'obj': c_void_p(140406094904912)}

In [4]:
inlet.__dict__

{'obj': c_void_p(140406156113920),
 'channel_format': 1,
 'channel_count': 8,
 'do_pull_sample': <_FuncPtr object at 0x1110a9400>,
 'do_pull_chunk': <_FuncPtr object at 0x1110aac40>,
 'value_type': ctypes.c_float,
 'sample_type': pylsl.pylsl.c_float_Array_8,
 'sample': <pylsl.pylsl.c_float_Array_8 at 0x11104e8c0>,
 'buffers': {}}

In [54]:
inlet.info().name()

'lsl-test-2'

In [55]:
inlet.info().type()

'Test'

In [57]:
inlet.info().source_id()

'lalala123'

In [51]:
inlet.time_correction()

-1.8418999388813972e-05

In [None]:
while True:
    # get a new sample (you can also omit the timestamp part if you're not
    # interested in it)
    raw_sample, raw_timestamp = inlet.pull_sample(5.0)
    if raw_sample:
        sample=raw_sample
        timestamp=raw_timestamp
        time_correction = inlet.time_correction()
        print(timestamp, time_correction, sample)

In [None]:
sample

In [None]:
timestamp

In [9]:
def sample_encoder(timestamp, sample):
    sample_dict = dict([ (i,s) for i,s in enumerate(sample)])
    sample_dict['timestamp'] = timestamp
    return json.dumps(sample_dict)

In [None]:
se = sample_encoder(timestamp,sample)
se

In [10]:
def sample_decoder(encoded_sample: str):
    encoded_sample_dict = json.loads(encoded_sample)

    timestamp = None
    if "timestamp" in encoded_sample_dict.keys():
        timestamp = encoded_sample_dict.pop('timestamp')

    sample_keys = list(encoded_sample_dict.keys())
    sample_keys.sort(reverse=False)

    sample = []
    for k in sample_keys:
        sample.append(encoded_sample_dict[k])

    return timestamp, sample

In [None]:
sd = sample_decoder(se)
sd

In [None]:
type(sd[0]),type(sd[1][2])

In [7]:
producer = Producer(conf)

In [11]:
print("looking for an EEG stream...")
streams = resolve_stream('type', 'EEG')
print(streams)
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])

looking for an EEG stream...
[<pylsl.pylsl.StreamInfo object at 0x10ecce3a0>]


In [13]:
while True:
    print("\nPolling...")
    raw_sample, raw_timestamp = inlet.pull_sample(5.0)
    
    if raw_sample:
        sample=raw_sample
        timestamp=raw_timestamp
        time_correction = inlet.time_correction()
        print(timestamp, time_correction, sample)
    
        key = str(uuid.uuid4())
        print("Sending message with key: {}".format(key))
        producer.produce(
            topic=topic, 
            key=key,
            value=sample_encoder(timestamp, sample),
            timestamp=int(timestamp),
            callback=callback
        )

        # Wait up to <callback_timeout> second for events. Callbacks will be invoked during
        # this method call if the message is acknowledged.
        print("Waiting for the callback")
        producer.poll(callback_timeout)

        print("Making it sync by flushing")
        producer.flush()

#     print("Sleeping for {} sec".format(pause_time))
#     sleep(pause_time)



Polling...
1614211473.4301162 -6.173248402774334e-05 [152.0, 0.941787838935852, 0.9245811700820923, 0.4275631606578827, 0.9112218618392944, 0.6753268241882324, 0.09600523114204407, 0.8444649577140808]
Sending message with key: 0a4ae09e-895d-4ddd-9963-05dadfdd76e2
Waiting for the callback
Message produced: <cimpl.Message object at 0x1134442c0>
Making it sync by flushing

Polling...
1614211473.430201 -6.173248402774334e-05 [153.0, 0.27271273732185364, 0.3871886432170868, 0.9964645504951477, 0.584395706653595, 0.808883786201477, 0.5599417090415955, 0.27649471163749695]
Sending message with key: 68b10515-53ea-4aa1-a009-32de10e8f24b
Waiting for the callback
Message produced: <cimpl.Message object at 0x10ec7fb40>
Making it sync by flushing

Polling...

Polling...
1614211483.430964 -2.44865077547729e-05 [154.0, 0.09962121397256851, 0.05792103707790375, 0.5017016530036926, 0.6539220213890076, 0.4513823688030243, 0.9464772343635559, 0.12815485894680023]
Sending message with key: b74d3d2a-eafe-

KeyboardInterrupt: 