# Inspecting datastreams

The goal is to characterise the data and to inspect the timestamps and sampling rates of the incoming data streams.

In [None]:
import matplotlib.pyplot as plt
import datetime
import tilemapbase as tmb
import numpy as np
import pandas as pd
from pluma.stream.georeference import Georeference
from pluma.stream.ubx import _UBX_MSGIDS
from pluma.schema import Dataset
from IPython.display import clear_output
from FmulSchema import custom_schema


## Notebook plotting
# NOTE(review): two backend magics are issued back to back; presumably only the
# last one (`widget`) stays in effect — confirm and drop the unused one.
%matplotlib inline
%matplotlib widget

# Apply the 'ggplot' style sheet to the figures created below.
plt.style.use('ggplot')

## Figure export parameters
# rcParams overrides: 'text.usetex' is switched off and 'svg.fonttype' is set
# to 'none'.
new_rc_params = {'text.usetex': False,
"svg.fonttype": 'none'
}
import matplotlib as mpl
mpl.rcParams.update(new_rc_params)

## Check the errors in the code below when `LOAD_FROM_REMOTE` is True
Another point is that `environment.yaml` should pin specific versions of the libraries that we are loading.  
We also need to update `readme.md` to explain how to get the data from AWS.

# Set the paths to the dataset and build the dataset

In [None]:
LOAD_FROM_REMOTE = True
# Set to True to load only the streams flagged below instead of the whole dataset.
LOAD_SELECTED_STREAMS_ONLY = False
root = r"C:\Users\YourName\Desktop"  # Folder where the pickled dataset is written to / read from.
# Raw f-string: "\d" is not a valid escape sequence, so the backslash is spelled out explicitly.
dataset_pickle_path = rf"{root}\dataset.pickle"

stream_root_folder = 's3://emotional-cities/data/nrg/poc-v1/FMUL/FMUL2022_10_21_11_20_53/'  # Path to the dataset. Can be local or remote.

# Label the dataset with the last component of the path. Both "/" and "\" are
# accepted as separators and a trailing separator is ignored; splitting on "\"
# alone returns the whole URL for an S3 path.
dataset_name = stream_root_folder.replace("\\", "/").rstrip("/").split("/")[-1]

# Create a Dataset object that will contain the ingested data.
dataset = Dataset(
    stream_root_folder,
    datasetlabel="FMUL_" + dataset_name,
    georeference=Georeference(),
    schema=custom_schema)
# Add the "schema" that we want to load to our Dataset. If we want to load the
# whole dataset automatically, set autoload to True.
dataset.populate_streams(autoload=False)

if LOAD_FROM_REMOTE:
    if LOAD_SELECTED_STREAMS_ONLY:
        # To load single streams, set their autoload property to True and use
        # the Dataset.reload_streams method.
        dataset.streams.EEG.autoload = True
        dataset.streams.UBX.autoload = True
        dataset.reload_streams(force_load=False)
    else:
        # Build the whole dataset: every stream is loaded at the same time.
        # This might take a while if loading from AWS.
        dataset.reload_streams(force_load=True)
        dataset.add_georeference_and_calibrate()
        # Export the dataset as a .pickle file so the ingestion does not have
        # to be run again.
        dataset.export_dataset(filename=dataset_pickle_path)

    # Some warnings will be printed if some sensors were not acquired during
    # the experiment. These are normal and can usually be ignored.
else:
    # Re-import the output of a previous ingestion instead of running it again.
    dataset = Dataset.import_dataset(dataset_pickle_path)

print(f"Dataset: {dataset} loaded successfully, and {'not' if not dataset.has_calibration else 'sucessfully'} calibrated." )



Define functions that compute basic statistics of, and plot, the difference between adjacent timestamps.

In [None]:
def diff_stats(stream, start = 100, end = 200, xlabel = 'Time', ylabel = ''):
    """Summarise and plot the spacing between consecutive timestamps of a stream.

    The index of ``stream.data`` is differenced, converted to ``int64`` and
    divided by 1e6. This assumes a datetime-like index, so that the
    differences are nanosecond counts and the result is in milliseconds
    (matching the y-axis label used below).

    Prints the minimum, maximum, mean and median interval and the frame rate
    derived from the median, plots the intervals and the stream data, and
    prints the intervals at positions ``start`` to ``end``.

    Args:
        stream: Object exposing ``data`` (a pandas object with an index and a
            ``plot`` method) and ``streamlabel`` (used in the plot titles).
        start: First position of the interval slice that is printed.
        end: Position at which the printed interval slice stops (exclusive).
        xlabel: Unused; kept so existing calls keep working.
        ylabel: Unused; kept so existing calls keep working.

    Returns:
        The series of intervals between consecutive index entries.
    """
    # Computed outside the ``try`` so that a stream without a usable index
    # fails with its real error rather than an UnboundLocalError at ``return``.
    diff_series = stream.data.index.to_series().diff().dropna().astype(np.int64) / int(1e6)

    try:
        # The first remaining interval is left out of the statistics.
        # ``.iloc`` keeps the slice positional whatever the index dtype is.
        intervals = diff_series.iloc[1:]
        mini = intervals.min()
        maxi = intervals.max()
        meani = intervals.mean()
        medi = intervals.median()
        print(f'Minimum = {mini}; Maximum = {maxi}; Mean = {meani}; Median = {medi}; FrameRate = {1000/medi}')

        plt.figure()
        plt.plot(diff_series)
        plt.title(stream.streamlabel + ' Difference')
        plt.ylabel('Time Difference (ms)')
        plt.xlabel(stream.data.index.name)

        stream.data.plot(title = stream.streamlabel)

        print(diff_series.iloc[start:end])
    except Exception as err:
        # Best effort: a failure while summarising or plotting must not stop
        # the notebook, but it is reported instead of being silently dropped.
        clear_output(wait=False)
        print(f'diff_stats: could not summarise or plot the stream ({err!r})')
    return diff_series

def diff_stats_data_frame(stream, start = 100, end = 200, xlabel = 'Time', ylabel = '', label=' '):
    """Summarise and plot the spacing between consecutive index entries of a DataFrame.

    Only the numeric columns of ``stream`` are kept. The index is differenced,
    converted to ``int64`` and divided by 1e6. This assumes a datetime-like
    index, so that the differences are nanosecond counts and the result is in
    milliseconds (matching the y-axis label used below).

    Prints the minimum, maximum, mean and median interval and the frame rate
    derived from the median, plots the intervals and the numeric columns, and
    prints the intervals at positions ``start`` to ``end``.

    Args:
        stream: pandas DataFrame to inspect.
        start: First position of the interval slice that is printed.
        end: Position at which the printed interval slice stops (exclusive).
        xlabel: Unused; kept so existing calls keep working.
        ylabel: Unused; kept so existing calls keep working.
        label: Text used in the titles of both plots.

    Returns:
        The series of intervals between consecutive index entries.
    """
    numeric_frame = stream.select_dtypes(include=np.number)
    diff_series = numeric_frame.index.to_series().diff().dropna().astype(np.int64) / int(1e6)

    # The first remaining interval is left out of the statistics. ``.iloc``
    # keeps the slice positional: a plain ``[1:]`` is label-based on a numeric
    # index.
    intervals = diff_series.iloc[1:]
    mini = intervals.min()
    maxi = intervals.max()
    meani = intervals.mean()
    medi = intervals.median()
    print(f'Minimum = {mini}; Maximum = {maxi}; Mean = {meani}; Median = {medi}; FrameRate = {1000/medi}')

    plt.figure()
    plt.plot(diff_series)
    plt.title(label + ' Difference')
    plt.ylabel('Time Difference (ms)')
    plt.xlabel(numeric_frame.index.name)

    numeric_frame.plot(title = label)
    print(diff_series.iloc[start:end])

    return diff_series

def diff_stats_empatica(stream, start = 100, end = 200, xlabel = 'Time', ylabel = '', label=' '):
    """Re-index an Empatica DataFrame on ``E4_Seconds`` and summarise its timing.

    The frame is re-indexed **in place**: its current index is moved back into
    the columns and the ``E4_Seconds`` column becomes the index, so the caller
    sees the re-indexed frame after the call. The statistics, plots and
    printed slice are then produced by ``diff_stats_data_frame``.

    Args:
        stream: pandas DataFrame with an ``E4_Seconds`` column. Modified in place.
        start: First position of the interval slice that is printed.
        end: Position at which the printed interval slice stops (exclusive).
        xlabel: Forwarded to ``diff_stats_data_frame``.
        ylabel: Forwarded to ``diff_stats_data_frame``.
        label: Text used in the titles of both plots.

    Returns:
        The series of intervals between consecutive ``E4_Seconds`` entries.
    """
    stream.reset_index(inplace=True)
    stream.set_index('E4_Seconds', inplace=True)
    # Everything after the re-indexing is identical to the generic DataFrame
    # helper, so delegate to it rather than keeping a second copy.
    return diff_stats_data_frame(
        stream, start=start, end=end, xlabel=xlabel, ylabel=ylabel, label=label)


### Check basic statistics and plot timestamp difference

[ECG](https://neurogears.sharepoint.com/:b:/s/EmotionalCities/EYOX02N88hRHnUCdREf_kq0BEoxvZY92nHfPOPZmq7Ua3Q?e=xWQPvN) 1 kHz


In [None]:
ecg = dataset.streams.BioData.ECG
# ecg.streamlabel
diff_stats(ecg)


Microphone 

The microphone currently has two streams. Data is captured at 44100 Hz with buffers of 0.1 ms, which means that every buffer has 5 samples:
 1. The timestamps for each buffer sample 
 2. Each buffer data 

In [None]:
audio_timestamps = dataset.streams.Microphone.BufferIndex
diff_stats(audio_timestamps)

In [None]:
# Audio samples of the microphone stream.
audio_data = dataset.streams.Microphone.Audio.data
# Every buffer is taken to hold 5 samples, so the number of buffers is the
# number of sample rows divided by 5.
sample_count = audio_data.shape[0]
number_of_audio_buffers = sample_count / 5

# Distinct buffer timestamps in ascending order.
timestamp_frame = pd.DataFrame(audio_timestamps.data)
unique_set = sorted(set(timestamp_frame.index))

In [None]:
print('Difference between number of audio timestamps and number of audio buffers')
print(number_of_audio_buffers - audio_timestamps.data.shape[0])
print(f'audio_timestamps = {audio_timestamps.data.shape[0]}')
print(f'number_of_audio_buffers = {number_of_audio_buffers}')
print('Unique Timestamps :')
unique_set

## Tinkerforge GPS V2
From [tinkerforge webpage](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/GPS_V2.html)
Supports GPS and GLONASS simultaneously
Receives movement-, position-, altitude, time data and PPS signal
Elevation, azimuth and SNR for each GPS/GLONASS satellite accessible  
99 channels  
The rate is 10Hz  

 - Latitude 
 - Longitude 
 - Altitude
 - Date 
 - Time
 - Has Fix


### Latitude 
Is only measured when GPS receives satellite signals

In [None]:
tk_latitude = dataset.streams.TK.GPS.Latitude 
diff_stats(tk_latitude)

### Longitude  
Is only measured when GPS receives satellite signals

In [None]:
tk_longitude = dataset.streams.TK.GPS.Longitude 
diff_stats(tk_longitude)

### Altitude  
Is only measured when GPS receives satellite signals

In [None]:
# Inspect the altitude stream; reading `Longitude` here only repeats the
# longitude cell.
# NOTE(review): assumes the schema exposes the stream as `Altitude` — confirm.
tk_altitude = dataset.streams.TK.GPS.Altitude
diff_stats(tk_altitude)

### Date of the day  
Currently named `Data` (should be renamed to `Date`)

In [None]:
tk_date = dataset.streams.TK.GPS.Data 
diff_stats(tk_date)

### Time 
hours minutes and seconds of the day

In [None]:
tk_time = dataset.streams.TK.GPS.Time 
diff_stats(tk_time)

### HasFix  
Means that GPS has a position info from the satellites

In [None]:
tk_has_fix = dataset.streams.TK.GPS.HasFix 
diff_stats(tk_has_fix)

## Air quality 
From [tinkerforge webpage](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Air_Quality.html)
Measures IAQ (indoor air quality) index, air pressure, humidity and temperature
IAQ index and humidity values are temperature compensated  
Configurable temperature compensation for use cases in enclosures
 - IAQ (indoor air quality) index
 - Temperature in °C
 - Humidity in %RH
 - Air pressure in hPa  



### IAQ (indoor air quality) index  
The IAQ index is dependent on the other measures in this sensor.  
It has a range of 0-500  
The rate is 10 Hz

In [None]:
tk_air_quality = dataset.streams.TK.AirQuality.IAQIndex
diff_stats(tk_air_quality)

### Temperature 
Measured in °C

In [None]:
tk_temperature = dataset.streams.TK.AirQuality.Temperature
diff_stats(tk_temperature)

### Humidity  
Measured in %RH (relative humidity)

In [None]:
tk_humidity = dataset.streams.TK.AirQuality.Humidity
diff_stats(tk_humidity)

### Air pressure  
Measured in hPa (hectoPascals == 100 Pa)

In [None]:
tk_air_pressure = dataset.streams.TK.AirQuality.AirPressure
diff_stats(tk_air_pressure)

## Tinkerforge Sound pressure 
From [tinkerforge webpage](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Sound_Pressure_Level.html)
The stored values are in dB × 10  
The rate is 100Hz


In [None]:
tk_sound_pressure = dataset.streams.TK.SoundPressureLevel.SPL
diff_stats(tk_sound_pressure)

In [None]:
ax = (tk_sound_pressure.data/10).plot(title = 'dB')
ax.set_ylabel("Sound Pressure Level (dB)")


## Tinkerforge Humidity 
From [tinkerforge webpage](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Humidity_V2.html)
Values stored are in relative humidity x 100.0

In [None]:
tk_humidity = dataset.streams.TK.Humidity.Humidity
diff_stats(tk_humidity)

In [None]:
ax = (tk_humidity.data/100).plot(title = 'RH%')
ax.set_ylabel("Relative Humidity (%)")

### Analog In Voltage
This is connected to the sync pulse that we are sending from the pluma board  
Measured in volts; the sync signal we are sending is 3.5 V  
The rate is 100Hz

In [None]:
tk_analog_in = dataset.streams.TK.AnalogIn.Voltage
diff_stats(tk_analog_in)
dataset.streams.BioData.Set.data.reset_index().plot(kind='scatter', x='Seconds', y = 'Value')

## Tinkerforge Particulate matter 
From [tinkerforge webpage](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Particulate_Matter.html)  
Values are stored in µg/m³  
  
Three streams:  
- PM1.0
- PM2.5
- PM10

### PM1.0

In [None]:
tk_particulate_1 = dataset.streams.TK.ParticulateMatter.PM1_0
diff_stats(tk_particulate_1)

### PM2.5

In [None]:
tk_particulate_25 = dataset.streams.TK.ParticulateMatter.PM2_5
diff_stats(tk_particulate_25)

### PM10

In [None]:
tk_particulate_100 = dataset.streams.TK.ParticulateMatter.PM10_0
diff_stats(tk_particulate_100)

## Solar light
From [tinkerforge website](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Industrial_Dual_020mA_V2.html#industrial-dual-0-20ma-v2-bricklet)  
Currently do not have access to the datasheet for the sensor itself. TK only because data is routed through TK device  
Units from the device are milliAmps x 1000000  
The rate is 100Hz

In [None]:
tk_solar_light = dataset.streams.TK.Dual0_20mA.SolarLight
diff_stats(tk_solar_light)

In [None]:
tx = (tk_solar_light.data/1000000).plot(title = 'milliAmps')
tx.set_ylabel("mA")

## Tinkerforge Radiant Temperature (Thermocouple)
From [tinkerforge webpage](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/Thermocouple_V2.html)  
Values are stored in degrees Celsius x 100

In [None]:
tk_thermocouple = dataset.streams.TK.Thermocouple.Temperature
diff_stats(tk_thermocouple)

In [None]:
tx = (tk_thermocouple.data/100).plot(title = 'º Celsius')
tx.set_ylabel("Degrees Celsius")

## Tinkerforge AirTemp

From [tinkerforge website](https://www.tinkerforge.com/en/doc/Hardware/Bricklets/PTC_V2.html)  
Values are stored as degrees Celsius x 100  
The rate is 100Hz

In [None]:
tk_air_temp = dataset.streams.TK.PTC.AirTemp
diff_stats(tk_air_temp)

In [None]:
tx = (tk_air_temp.data/100).plot(title = 'º Celsius')
tx.set_ylabel("Degrees Celsius")

new (
it.Item1 as NorthWindSpeed,
it.Item2 as EastWindSpeed, 
it.Item3 as GustWindSpeeed,
it.Item4 as AirTemperature,
it.Item5 as XOrientation,
it.Item6 as YOrientation,
it.Item7 as NullValue)

In [None]:
Atmos_data_north = dataset.streams.Atmos.NorthWind
diff_stats(Atmos_data_north)

### Atmos Temperature 

In [None]:
tx = (dataset.streams.Atmos.AirTemperature.data).plot(title = 'º Celsius')
tx.set_ylabel("Degrees Celsius")
tx = (dataset.streams.Atmos.XOrientation.data).plot(title = 'X Angle (degrees)')
tx.set_ylabel("Angle (º)")
tx = (dataset.streams.Atmos.YOrientation.data).plot(title = 'Y Angle (degrees)')
tx.set_ylabel("Angle (º)")

### Accelerometer
The rate is 50Hz  

From this sensor we have several datastreams, each with 3 axes (X,Y,Z):  

- Orientation            - measured as an angle (degrees)
- Gyroscope              - measured as angular velocity (degrees per second)
- Magnetometer           - measured as magnetic field strength (presumably µT; confirm against the sensor datasheet)
- Linear acceleration    - measured as acceleration (m/s²)
- Accl                   - measured as acceleration (m/s²)
- Gravity                - measured as acceleration (m/s²)

In [None]:
tx = pd.DataFrame(dataset.streams.Accelerometer.data.Temperature).plot(title = 'Temperature (degrees Celcius)')
tx.set_ylabel("Temperature (ºC)")

In [None]:

# One DataFrame per accelerometer signal, each holding its X, Y and Z columns.
accelerometer_data = dataset.streams.Accelerometer.data
ori, gyro, magnet, linear, acc, gravity = (
    pd.DataFrame(accelerometer_data[axis_columns])
    for axis_columns in (
        ['Orientation_X', 'Orientation_Y', 'Orientation_Z'],
        ['Gyroscope_X', 'Gyroscope_Y', 'Gyroscope_Z'],
        ['Magnetometer_X', 'Magnetometer_Y', 'Magnetometer_Z'],
        ['LinearAccl_X', 'LinearAccl_Y', 'LinearAccl_Z'],
        ['Accl_X', 'Accl_Y', 'Accl_Z'],
        ['Gravity_X', 'Gravity_Y', 'Gravity_Z'],
    )
)

In [None]:
# 2 x 3 grid: one panel per accelerometer signal.
fig, axs = plt.subplots(2, 3, figsize = (16,12))

# `DataFrame.plot(layout=...)` describes the (rows, columns) grid used with
# `subplots=True`; it is not a figure-layout option, so `layout='tight'` is
# not passed here. Tight layout is requested on the figure itself below.
ori.plot(title = 'Orientation', ax = axs[0,0], ylabel = 'Angle (º)')
gyro.plot(title = 'Gyroscope', ax = axs[0,1], ylabel = 'Angular velocity (º/s)')
# NOTE(review): the magnetometer axis is labelled as an angle — confirm the
# unit this stream is actually stored in.
magnet.plot(title = 'Magnetometer', ax = axs[1,0], ylabel = 'Angle (º)')
linear.plot(title = 'Linear Acceleration', ax = axs[0,2], ylabel = 'm/s²')
acc.plot(title = 'Accl', ax = axs[1,1], ylabel = 'm/s²')
gravity.plot(title = 'Gravity', ax = axs[1,2], ylabel = 'm/s²')

fig.tight_layout()

## Empatica [E4](https://box.empatica.com/documentation/20141119_E4_TechSpecs.pdf)

Empatica Streaming server is used to collect [data](https://developer.empatica.com/windows-streaming-server-data.html) from the device 

Used mainly for GSR

### Data keys:
 - 'E4_Acc' - AccX(facing usb slot)  AccY(facing shorter strap) AccZ (facing the bottom) - measured in 	m/s2
 - 'E4_Battery' - % of battery between 0 and 1
 - 'E4_Bvp' - Measures the reflectance of the light emitted by the sensor [reference1](https://support.empatica.com/hc/en-us/articles/360029719792-E4-data-BVP-expected-signal) [reference2](https://support.empatica.com/hc/en-us/articles/204954639-Utilizing-the-PPG-BVP-signal) 
 - 'E4_Gsr' - measured in microsiemens
 - 'E4_Hr' - The value of the detected heartbeat, returned together with the interbeat interval data.
 - 'E4_Ibi' - The value is the distance from the previous detected heartbeat in seconds
 - 'E4_Temperature' - The value of the temperature sample in Celsius degrees. The value is derived from the optical temperature sensor placed on the wrist.
 - 'E4_tag' - The button on the wristband was pressed
 - 'R' -  String with responses from requests sent to the wristband

In [None]:
# Show which keys the Empatica stream provides.
print(list(dataset.streams.Empatica.data.keys()))
# Attribute access is case-sensitive: the stream is reached as `Empatica`
# everywhere else in this notebook.
dataset.streams.Empatica.data['R']

## E4_Acc 
31.5 Hz

In [None]:
e4_acc = pd.DataFrame(dataset.streams.Empatica.data['E4_Acc'])
diff_stats_empatica(e4_acc, label='Acc')
e4_acc

### E4_Battery  
% of battery between 0 and 1  
The rate is 0.05 Hz

In [None]:
e4_bat = pd.DataFrame(dataset.streams.Empatica.data['E4_Battery'])
diff_stats_empatica(e4_bat)
e4_bat

### Strange Data 
While looking at the data we found many repeated samples with the same timestamp. Our guess is that this sensor only reports when the value changes, and that the repetitions come from the back-and-forth chatter around each change, which is reported in batches over Bluetooth.

In [None]:
# Compare the battery samples on two time bases: 'E4_Seconds' (top row) and
# 'Seconds' (bottom row). Each row shows a scatter plot and a histogram.
fig, axs = plt.subplots(2, 2)
# Index the frame on 'E4_Seconds' (the current index goes back to the columns first).
e4_bat.reset_index(inplace=True)
e4_bat.set_index('E4_Seconds', inplace=True)
axs[0, 0].scatter(e4_bat.index, e4_bat['Value'])
axs[0, 0].set_title('Battery E4_Seconds Scatter')
axs[0, 0].set_xlabel('Time')

e4_bat.plot(kind='hist', ax=axs[0, 1], title='Distribution E4_Seconds', legend=False)

# Swap the index to 'Seconds' and repeat both plots on the bottom row.
e4_bat.reset_index(inplace=True)
e4_bat.set_index('Seconds', inplace=True)

axs[1, 0].scatter(e4_bat.index, e4_bat['Value'])
axs[1, 0].set_title('Battery Seconds Scatter')
axs[1, 0].set_xlabel('Time')

e4_bat.plot(kind='hist', ax=axs[1, 1], title='Distribution Seconds', legend=False)

plt.tight_layout()
plt.show()

## E_4 BVP

It is a photodiode that measures the reflection of green and red light in the blood under the skin  
The BVP signal is obtained from the PPG sensor by a proprietary algorithm which combines the two types of signal  
The rate is 64 Hz

In [None]:
e4_bvp = pd.DataFrame(dataset.streams.Empatica.data['E4_Bvp'])
diff_stats_empatica(e4_bvp)

e4_bvp

### E4 GSR
Measured in microsiemens.  
The rate is 4 Hz


In [None]:
e4_gsr = pd.DataFrame(dataset.streams.Empatica.data['E4_Gsr'])
series = diff_stats_empatica(e4_gsr)
e4_gsr

### E4_Hr

Measures the heart rate in beats per minute  
The rate is 1.56 Hz

In [None]:
e4_hr = pd.DataFrame(dataset.streams.Empatica.data['E4_Hr'])
series = diff_stats_empatica(e4_hr) 
e4_hr

### E4_Ibi 
The value is the time from the previous detected heartbeat in seconds  
The rate is 1.56 Hz 

In [None]:
e4_ibi = pd.DataFrame(dataset.streams.Empatica.data['E4_Ibi'])
series = diff_stats_empatica(e4_ibi) 
e4_ibi

### E4_Temperature
The value of the temperature sample in degrees Celsius.  
The value is derived from the optical temperature sensor placed on the wrist.  
The rate is 4Hz

In [None]:
e4_temp = pd.DataFrame(dataset.streams.Empatica.data['E4_Temperature'])
series = diff_stats_empatica(e4_temp) 
e4_temp

## PupilLabs

### Decoded Frames Counter  
There is the need to account for the invalid frames received when calculating the timestamp for each frame  
The rate is 32Hz

In [None]:
pupil_decoded_frames = dataset.streams.PupilLabs.Counter.DecodedFrames
diff_stats(pupil_decoded_frames)

import cv2

# Build the video path with the separator the dataset path already uses and
# ignore a trailing separator, instead of always appending "\pupil_video.avi".
path_separator = "\\" if "\\" in stream_root_folder else "/"
pupil_video_path = stream_root_folder.rstrip("/\\") + path_separator + "pupil_video.avi"

pupilVideo = cv2.VideoCapture(pupil_video_path)
if not pupilVideo.isOpened():
    # NOTE(review): presumably OpenCV cannot read an s3:// location directly;
    # point stream_root_folder at a local copy to get a real frame count.
    print(f"Could not open {pupil_video_path}; the video length reported below is not reliable.")
avi_length = int(pupilVideo.get(cv2.CAP_PROP_FRAME_COUNT))
pupilVideo.release()  # Only the frame count is needed, so free the capture handle.

# Valid Frame Counters
print(f"Video lenght = {avi_length}" )
# Rows whose 'Value' equals 1 are counted as valid decoded frames.
count_decoded_frames_count = pupil_decoded_frames.data.loc[pupil_decoded_frames.data['Value'] == 1].shape[0]
print(f"Valid decoded frames counter lenght {count_decoded_frames_count}")
print(f"All decoded frames counter lenght {pupil_decoded_frames.data.shape[0]}")

#print(f'Minimum = {mini}; Maximum = {maxi}; Mean = {meani}; Median = {medi}') 

## Pupil raw frames 
The rate is 32 Hz

In [None]:
pupil_raw_frames = dataset.streams.PupilLabs.Counter.RawFrames
diff_stats(pupil_raw_frames)


In [None]:
dataset.streams.PupilLabs.Counter.IMU.data

## Pupil Gaze
The rate is 250Hz

In [None]:
pupil_gaze = dataset.streams.PupilLabs.Counter.Gaze
diff_stats(pupil_gaze)

In [None]:
dataset.streams.PupilLabs.Counter.Audio.data

In [None]:
dataset.streams.PupilLabs.Counter.Key