In [19]:
import neurokit2 as nk

In [30]:
import numpy as np
import pandas as pd
import asyncio
import nest_asyncio
import logging
from python_phoenix_client import PhoenixChannelClient
import requests
import time

logging.basicConfig(level=logging.INFO) # Show INFO-level records (connection status, received channel messages)
save_path = "/home/jovyan"  # directory the per-signal CSV files are written to

# Simulation parameters handed to the NeuroKit *_simulate calls.
# Units are presumably seconds / Hz / beats per minute / breaths per minute,
# as in NeuroKit's simulate API -- TODO confirm.
duration = 10
# Alternative rate kept for reference: sampling_rate = 100
sampling_rate = 20
heart_rate = 200
respiratory_rate = 15
# EDA: forwarded as scr_number to nk.eda_simulate
scr_number = 10
# EMG: forwarded as burst_number to nk.emg_simulate
burst_number = 10

# Simulate one synthetic trace per modality; all share the same duration and
# sampling rate. The call order is kept fixed so random draws happen in the
# same sequence.
ecg = nk.ecg_simulate(
    duration=duration,
    sampling_rate=sampling_rate,
    heart_rate=heart_rate,
)
ppg = nk.ppg_simulate(
    duration=duration,
    sampling_rate=sampling_rate,
    heart_rate=heart_rate,
)
rsp = nk.rsp_simulate(
    duration=duration,
    sampling_rate=sampling_rate,
    respiratory_rate=respiratory_rate,
)
# The EDA trace is clipped to the ECG length (presumably so that it lines up
# with the other columns of the frame below).
eda = nk.eda_simulate(
    duration=duration,
    sampling_rate=sampling_rate,
    scr_number=scr_number,
)[:len(ecg)]
emg = nk.emg_simulate(
    duration=duration,
    sampling_rate=sampling_rate,
    burst_number=burst_number,
)

# Collect the traces column-wise, one column per biosignal.
data = pd.DataFrame(dict(ECG=ecg, PPG=ppg, RSP=rsp, EDA=eda, EMG=emg))
# Optional visual check: nk.signal_plot(data, subplots=True)

Skipping frequency modulation, since the modulation_strength 0.2 leads to physiologically implausible wave durations of 240.0 milliseconds.
Skipping random IBI modulation, since the offset_weight 0.1 leads to physiologically implausible wave durations of 269.99999999999983 milliseconds.


In [31]:
# create a time delta list for each signal
def time_delta(samples, delta, noise_range_percentage = 1, rng=None):
    """Build a constant and a jittered list of per-sample time deltas.

    Args:
        samples: Number of deltas to generate.
        delta: Nominal spacing between consecutive samples (any unit; the
            returned values use the same unit).
        noise_range_percentage: Half-width of the uniform jitter, expressed
            as a percentage of ``delta``.
        rng: Optional random source exposing ``uniform(low, high)``, e.g.
            ``np.random.default_rng(seed)``, for reproducible jitter.
            When omitted, NumPy's global ``np.random.uniform`` is used.

    Returns:
        tuple: ``(dt_list, dt_list_noisy)`` -- two lists of length
        ``samples``. The first repeats ``delta``; the second holds ``delta``
        plus an independent draw from
        ``[-delta * pct / 100, +delta * pct / 100)`` for each sample.
    """
    # Fall back to the global generator so existing callers behave as before.
    draw = np.random.uniform if rng is None else rng.uniform
    noise_number = (delta) / 100 * noise_range_percentage
    dt_list = [delta] * samples
    dt_list_noisy = [delta + draw(-noise_number, noise_number) for _ in range(samples)]
    return dt_list, dt_list_noisy

In [32]:
# Nominal sample period in milliseconds: 1000 ms per second divided by the
# sampling rate. The streaming loop sleeps time_deltas[i] / 1000 seconds, so
# the earlier 100 / sampling_rate made the stream run ten times too fast.
delta = 1000 / sampling_rate # milliseconds
dt_list, dt_list_noisy = time_delta(len(ecg), delta, noise_range_percentage=1)

In [None]:
#nk.signal_plot(dt_list_noisy)

In [33]:
# Deltas used both as the second CSV column and as the pause between streamed
# samples; swap in dt_list_noisy to use the jittered variant instead.
time_deltas = np.array(dt_list) # or dt_list_noisy

In [34]:
# saves the signals and the time deltas to csv files
# One CSV per signal: column 0 holds the sample value, column 1 its time delta.
for key, series in data.items():
    csv_path = f"{save_path}/{key}.csv"
    combined = np.column_stack((series.to_numpy(), time_deltas))
    np.savetxt(csv_path, combined, delimiter=",")
    print(f"Saved {key} signal to {csv_path}")

Saved ECG signal to /home/jovyan/ECG.csv
Saved PPG signal to /home/jovyan/PPG.csv
Saved RSP signal to /home/jovyan/RSP.csv
Saved EDA signal to /home/jovyan/EDA.csv
Saved EMG signal to /home/jovyan/EMG.csv


In [35]:
async def stream_sensor_data(client, sensor_type, sensor_id, data, time_deltas, sampling_rate):
    """Push each sample to the sensor's channel, pausing between pushes.

    Args:
        client: Object exposing an awaitable ``push(topic, event, payload)``.
        sensor_type: Sent as the ``uuid`` field of every payload.
        sensor_id: Appended to ``"sensor_data:"`` to form the topic.
        data: Iterable of sample values, sent one per message.
        time_deltas: Indexable of pauses in milliseconds; entry ``i`` is
            awaited after sample ``i`` has been pushed.
        sampling_rate: Unused; kept so existing callers keep working.

    Returns:
        int: Number of samples pushed successfully. A value smaller than the
        number of samples means a push failed (the error is logged) and
        streaming stopped early, so callers can tell an aborted stream from
        a complete one.
    """
    topic = f"sensor_data:{sensor_id}"  # loop-invariant, build once
    sent = 0
    for i, point in enumerate(data):
        payload = {
            'time': time.time(),
            'value': point,
            'uuid': sensor_type
           }
        print(payload)
        try:
            await client.push(topic, "measurement", payload)
        except Exception as e:
            # Stop at the first failed push; report how far we got.
            logging.error(f"Error pushing message: {e}")
            return sent
        sent += 1
        await asyncio.sleep(time_deltas[i]/1000)
    return sent


def handle_message(topic, event, payload):
    """Log, at INFO level, a message handed over by the channel client.

    Args:
        topic: Topic the message arrived on.
        event: Event name of the message.
        payload: Message payload, rendered with its default string form.
    """
    message = f"Received message on topic: {topic}, event: {event}, payload: {payload}"
    logging.info(message)


async def main():
    """Probe the server over HTTP, then stream the simulated ECG over a channel.

    Steps: issue a GET against the server (returning early, with an error
    log, if it is unreachable or answers with an error status), create the
    channel client, subscribe to the sensor topic, connect, and push the
    ``ECG`` column of ``data`` via ``stream_sensor_data``. The client is
    closed even when streaming raises, so the connection is not leaked.
    """
    host = "192.168.1.64:4000"  # Replace with your Phoenix server; shared by both URLs
    test_url = f"http://{host}"
    try:
        response = requests.get(test_url, timeout=5)
        response.raise_for_status()
        logging.info(f'Successfully connected to {test_url} Status code: {response.status_code}')
    except requests.exceptions.RequestException as e:
        logging.error(f'Failed to connect to {test_url}. Error: {e}')
        return # exit if server is not reachable

    sensor_type = "ecg"  # Set the sensor type to ECG
    socket_url = f"ws://{host}/socket/websocket"
    client = PhoenixChannelClient(socket_url, handle_message)
    logging.info(f"Connecting to: {socket_url}")

    sensor_id = f"sensor_1:{sensor_type}"
    await client.subscribe(f"sensor_data:{sensor_id}", {"device_name": sensor_id})

    await client.connect()

    try:
        logging.info(f"Start streaming {sensor_type} data")

        ecg_data = data['ECG'].tolist()
        await stream_sensor_data(client, sensor_type, sensor_id, ecg_data, time_deltas, sampling_rate)

        logging.info("Finished sending data")
    finally:
        # Always release the connection, including when streaming fails.
        await client.close()


In [36]:
# Patch asyncio so that asyncio.run() can be used from this cell; presumably
# required because the notebook kernel already runs an event loop -- TODO confirm.
nest_asyncio.apply()  # Apply nest_asyncio
# Run the connectivity check and the ECG stream to completion.
asyncio.run(main())

INFO:root:Successfully connected to http://192.168.1.64:4000 Status code: 200
INFO:root:Connecting to: ws://192.168.1.64:4000/socket/websocket
INFO:root:WebSocket connection opened
INFO:root:Joining channel: sensor_data:sensor_1:ecg
INFO:root:Start streaming ecg data
ERROR:root:Error pushing message: 'ClientConnection' object has no attribute 'closed'
INFO:root:Finished sending data
INFO:root:Received message on topic: sensor_data:sensor_1:ecg, event: phx_reply, payload: {'status': 'ok', 'response': {}}
INFO:root:Received message on topic: sensor_data:sensor_1:ecg, event: presence_state, payload: {}


{'time': 1737302414.270454, 'value': 1.1726788677240707, 'uuid': 'ecg'}
