In [3]:
import asyncio
import random
import time
import numpy as np
import os
import sys
import pandas as pd

from exampleDevice import Device

#### Producer: 

In [5]:
device = Device('LINAC:BCM1', 'brianna_redis')
await device.initialize()

Successfully connected to Redis at socket brianna_redis synchronously.


In [6]:
# "stream A" - just noise
data_file_path1 = 'data/tek0002CH2_added_streamA.csv'
    
# "stream B" - signal and noise
data_file_path2 = 'data/tek0002CH2_added_streamB.csv'

In [4]:
numpy_data1 = np.genfromtxt(data_file_path1, dtype=np.int16)

In [5]:
numpy_data2 = np.genfromtxt(data_file_path2, dtype=np.int16)

In [6]:
stream_key_raw = f"{device.base_key}:RAW_DATA" 
stream_key_noise = f"{device.base_key}:RAW_DATA_NOISE"
binary_field = '_'
data_type_field = 'TYPE'
data_type = 'INT16'

##### Raw data (from numpy_data1) - stream A

In [7]:
# raw data, aka "stream A"
data_point_raw = {binary_field: numpy_data1, data_type_field: data_type}

In [8]:
await device.redis_adapter.streamAdd(stream_key_raw, data_point_raw, maxlen=10)

In [9]:
messagesA = await device.redis_adapter.streamReadRange( stream_key_raw, start='+', end='-', count=1, dtype=np.int16)

In [10]:
print(messagesA)


[('LINAC:BCM1:RAW_DATA', b'1721752582918-0', {'_': array([-1, 70, 43, ..., -1, -1, -1], dtype=int16), b'TYPE': b'INT16'})]


In [11]:
messagesA = await device.redis_adapter.streamReadRange( stream_key_raw, start='+', end='-', count=3, dtype=np.int16)

In [12]:
print(messagesA)

[('LINAC:BCM1:RAW_DATA', b'1721752582918-0', {'_': array([-1, 70, 43, ..., -1, -1, -1], dtype=int16), b'TYPE': b'INT16'}), ('LINAC:BCM1:RAW_DATA', b'1721668060848-0', {'_': array([-1, 70, 43, ..., -1, -1, -1], dtype=int16), b'TYPE': b'INT16'}), ('LINAC:BCM1:RAW_DATA', b'1721667882340-0', {'_': array([-1, 70, 43, ..., -1, -1, -1], dtype=int16), b'TYPE': b'INT16'})]


In [13]:
messageA = messagesA[0]

In [14]:
streamA, timeA, dataA = messageA


In [15]:
print(dataA)

{'_': array([-1, 70, 43, ..., -1, -1, -1], dtype=int16), b'TYPE': b'INT16'}


##### Noise data (from numpy_data2) - stream B

In [16]:
# noise data, aka "stream B"
data_point_noise = {binary_field: numpy_data2, data_type_field: data_type}

In [17]:
await device.redis_adapter.streamAdd(stream_key_noise, data_point_noise, maxlen=10)

In [18]:
messagesB = await device.redis_adapter.streamReadRange( stream_key_noise, start='+', end='-', count=1, dtype=np.int16)

In [19]:
print(messagesB)

[('LINAC:BCM1:RAW_DATA_NOISE', b'1721752587288-0', {'_': array([ -1, 131, 164, ...,  -1,  -1,  -1], dtype=int16), b'TYPE': b'INT16'})]


In [20]:
messagesB = await device.redis_adapter.streamReadRange( stream_key_noise, start='+', end='-', count=3, dtype=np.int16)

In [21]:
print(messagesB)

[('LINAC:BCM1:RAW_DATA_NOISE', b'1721752587288-0', {'_': array([ -1, 131, 164, ...,  -1,  -1,  -1], dtype=int16), b'TYPE': b'INT16'}), ('LINAC:BCM1:RAW_DATA_NOISE', b'1721667889243-0', {'_': array([ -1, 131, 164, ...,  -1,  -1,  -1], dtype=int16), b'TYPE': b'INT16'}), ('LINAC:BCM1:RAW_DATA_NOISE', b'1721667782856-0', {'_': array([ -1, 131, 164, ...,  -1,  -1,  -1], dtype=int16), b'TYPE': b'INT16'})]


In [22]:
messageB = messagesB[0]

In [23]:
streamB, timeB, dataB = messageB


#### Consumer: 

#### Process Data (function)

In [24]:
df_streamA = pd.DataFrame(dataA['_'])
print(df_streamA)
df_streamB = pd.DataFrame(dataB['_'])
print(df_streamB)

         0
0       -1
1       70
2       43
3       37
4      155
...    ...
87360   -1
87361   -1
87362   -1
87363   -1
87364   -1

[87365 rows x 1 columns]
         0
0       -1
1      131
2      164
3      153
4      208
...    ...
87360   -1
87361   -1
87362   -1
87363   -1
87364   -1

[87365 rows x 1 columns]


In [25]:
# step 1 - subtraction
df_subtraction = df_streamA - df_streamB
df_subtraction = df_subtraction.iloc[2400:3401]

#def process_signal(df_streamA, df_streamB, sampling_rate=100000, f_corner=70, window_size=3):
def process_signal(df_subtraction, sampling_rate=100000, f_corner=70, window_size=3):
    
    # step 2 - droop correction
    time_step = 1 / sampling_rate 
    droopCorr = 2 * np.pi * f_corner
    num_samples = len(df_subtraction)
    integrated = 0.0
    droopCorr_signal = pd.Series(index=df_subtraction.index, dtype=float)

    for i in range(num_samples - 1): # loop up to num_samples - 1 to avoid accessing out of bounds
        integrated += df_subtraction.iloc[i].iloc[0] * time_step
        droopCorr_signal[i] = df_subtraction.iloc[i].iloc[0]  + integrated * droopCorr

    # if not along origin on y axis, shift signal downward
    min_value = droopCorr_signal.min()
    droopCorr_shifted_signal = droopCorr_signal - min_value

    # step 4 - mode filter
    mode_filtered = droopCorr_signal.rolling(window=window_size, min_periods=1).apply(lambda x: x.mode()[0])

    # step 5 - integrate
    integral_mode = mode_filtered.sum()

    """ plots weird so nvm
    # Plotting droopCorr_signal
    plt.figure(figsize=(10, 6))
    plt.plot(df_subtraction.index, df_subtraction, marker='.', linestyle='-', color='b', label='Subtracted Signal')
    plt.plot(droopCorr_signal.index, droopCorr_signal, marker='.', linestyle='-', color='k', label='Droop Corrected Signal')
    plt.plot(mode_filtered.index, mode_filtered, marker='.', linestyle='-', color='r', label='Mode Filtered Signal')
    plt.title('Droop Correction and Mode Filtering')
    plt.xlabel('Sample Number')
    plt.ylabel('Signal Value')
    plt.xlim(2400, 2800)
    plt.ylim(-1000, 8000)
    plt.legend()
    plt.grid(True)
    plt.show()
    """

    # will need to stream this value back to redis
    return integral_mode
    return df_streamA, df_streamB, df_subtraction, droopCorr_signal, droopCorr_shifted_signal, mode_filtered, integral_mode

In [26]:
integral_mode = process_signal(df_subtraction)
print("Integral of mode_filtered:", integral_mode)

Integral of mode_filtered: 299480.66849806893


Return value of integral_mode back to redis: 

In [27]:
# return value to redis
stream_key_processed = f"{device.base_key}:PROCESSED_DATA"
data_point_processed = {
    binary_field: np.array([integral_mode], dtype=np.int16),
    data_type_field: data_type  # Assuming data_type is defined as 'INT16'
}

In [28]:
await device.redis_adapter.streamAdd(stream_key_processed, data_point_processed, maxlen=10)

In [29]:
messagesC = await device.redis_adapter.streamReadRange( stream_key_processed, start='+', end='-', count=1, dtype=np.int16)
print(messagesC)

[('LINAC:BCM1:PROCESSED_DATA', b'1721752608845-0', {'_': array([-28200], dtype=int16), b'TYPE': b'INT16'})]


In [30]:
messagesC = await device.redis_adapter.streamReadRange( stream_key_processed, start='+', end='-', count=3, dtype=np.int16)
print(messagesC)

[('LINAC:BCM1:PROCESSED_DATA', b'1721752608845-0', {'_': array([-28200], dtype=int16), b'TYPE': b'INT16'}), ('LINAC:BCM1:PROCESSED_DATA', b'1721667907654-0', {'_': array([-28200], dtype=int16), b'TYPE': b'INT16'}), ('LINAC:BCM1:PROCESSED_DATA', b'1721406285694-0', {'_': array([-28200], dtype=int16), b'TYPE': b'INT16'})]


In [31]:
messageC = messagesC[0]
streamC, timeC, dataC = messageC
print(dataC)

{'_': array([-28200], dtype=int16), b'TYPE': b'INT16'}
