# Rolling Mean Processing in Real-time

This Jupyter notebook performs rolling-mean processing on a [spool](https://dascore.org/tutorial/concepts.html#:~:text=read%20the%20docs!-,Data%20structures,-DASCore%20has%20two) of distributed acoustic sensing (DAS) data in real time. It uses the [DASCore](https://dascore.org/) package and the ```lf_das.py``` script.


<svg width="100%" height="1">
  <line x1="0" y1="0" x2="100%" y2="0" style="stroke:rgb(0,0,0);stroke-width:2" />
</svg>


#### Notes: 
1. Before using this notebook, make sure the ```lf_das.py``` script is in the same directory as this notebook and that DASCore is installed using ```pip``` (recommended) or ```conda```:
    ```bash
    pip install dascore
    ```
    or
    ```bash
    conda install dascore -c conda-forge
    ```
2. The file formats supported for I/O are listed [here](https://dascore.org/#:~:text=specialized%20analysis/visualization.-,Supported%20file%20formats,-name).
3. You need to have at least 1 patch of data in the directory before you start real-time processing.
 

Current DASCore version: 0.0.13 (tested)

Date: 09/07/2023


Contact: [Ahmad Tourei](https://github.com/ahmadtourei/)

ahmadtourei@gmail.com

In [None]:
# import libraries
import warnings
warnings.simplefilter('ignore')

import dascore as dc
import matplotlib.pyplot as plt
import numpy as np
import time 

from dascore.units import s # s as seconds
from datetime import datetime
from lf_das import _get_filename


### Get a spool of data to work on

In [None]:
# input directory holding the DAS patches, and the output locations
data_path = '/mnt/h/data'
output_data_folder =  '/mnt/h/results'
output_figure_folder = '/mnt/h/figures'

# build the spool from the data path, order it by time, then update it
# (on first run, update() indexes the patches; on later runs it refreshes
# the existing index file)
raw_spool = dc.spool(data_path)
time_sorted_spool = raw_spool.sort("time")
sp = time_sorted_spool.update()

# show the first rows of the spool's contents table (one row per patch)
content_df = sp.get_contents()
content_df.head()

### Get some metadata and define a sub spool (if needed)

In [None]:
# read acquisition metadata from the attributes of the first patch
patch_0 = sp[0]
first_attrs = patch_0.attrs

gauge_length = first_attrs['gauge_length']
print("Gauge length = ", gauge_length)

channel_spacing = first_attrs['d_distance']
print("Channel spacing = ", channel_spacing)

sampling_interval = first_attrs['d_time']
print("Sampling interval = ", sampling_interval)

# express the sampling interval in seconds, then invert it to get the rate
interval_in_sec = sampling_interval / np.timedelta64(1, 's')
sampling_rate = 1/interval_in_sec
print("Sampling rate = ", sampling_rate)

# patch duration = number of time samples / sampling rate
time_coord = patch_0.coords["time"]
num_sec = len(time_coord)/sampling_rate
print("Number of seconds in each patch= ", num_sec)

# pick the distance range of the sub-spool from two channel indices
ch_start = 400
ch_end = 1400
distance_coord = patch_0.coords['distance']
d_1 = distance_coord[ch_start] # in meter
d_2 = distance_coord[ch_end] # in meter
# alternatively, give the distances directly:
# d_1 = -115 # in meter
# d_2 = 2000 # in meter
sub_sp = sp.select(distance=(d_1, d_2))


### Set real-time processing parameters

In [None]:
# target sampling interval of the processed data, in seconds
# (so, cutoff_freq = Nyq_new = 1/(2*d_t) = 0.05 Hz)
d_t = 10.0

# rolling window length and step size are both one target interval (in sec.)
window = step = d_t*s

# scale applied to the raw data
# NOTE(review): the constants 116 and 1e9 look like an interrogator-specific
# unit conversion - confirm against the instrument documentation
raw_scale = 116*sampling_rate/gauge_length
scale_iDAS = float(raw_scale/1e9)

# wait time after each run: one patch length
time_step_for_processing = num_sec # in sec.

# starting time from which low-frequency processing is meant to apply
# (it can be the time_min of the first patch of the spool); given in UTC,
# or in whatever time zone the original data are stored in
start_processing_time = np.datetime64('2023-03-22T06:00:00')

print("time_step_for_processing: ", time_step_for_processing, "\n")
print("patch length: ", num_sec, "\n")


### Do rolling mean processing in real-time

In [None]:
# Real-time processing loop.
# Each run refreshes the spool from data_path, processes only the patches that
# were not handled by an earlier run, and then waits time_step_for_processing
# seconds. The loop stops as soon as a run (other than the first) finds the
# same number of patches as the previous run.
import os

# make sure the results folder exists before the first patch is written
os.makedirs(output_data_folder, exist_ok=True)


def _process_and_save_patch(patch):
    """Apply the rolling mean to *patch*, scale it, and write the result.

    Relies on the notebook-level settings ``window``, ``step``,
    ``scale_iDAS`` and ``output_data_folder``. The output file name comes
    from ``_get_filename`` called with the processed patch's ``time_min``
    and ``time_max`` attributes; the file is written in "dasdae" format.
    """
    # apply rolling mean function
    rolling_mean_patch = patch.rolling(time=window, step=step, engine="numpy").mean()
    # scale data
    new_scaled_patch = rolling_mean_patch.new(data=rolling_mean_patch.data*scale_iDAS)
    # save the result to output folder
    filename = _get_filename(new_scaled_patch.attrs['time_min'],
                             new_scaled_patch.attrs['time_max'])
    new_scaled_patch.io.write(os.path.join(output_data_folder, filename), "dasdae")


i = 0            # run counter
len_last_sp = 0  # number of patches already processed by previous runs
while True:
    # select an updated sub-spool; the time bound applies the configured
    # start_processing_time, which was otherwise defined but never used
    sp = dc.spool(data_path).sort("time").update()
    sub_sp = sp.select(distance=(d_1, d_2), time=(start_processing_time, None))
    len_updated_sp = len(sub_sp)

    # after the first run, an unchanged patch count means no new data arrived
    if i > 0 and len_updated_sp == len_last_sp:
        print("No new data was detected. Real-time data processing ended successfully.")
        break

    i += 1
    print("\nrun number: ", i)
    # only patches appended since the previous run are processed
    for j in range(len_last_sp, len_updated_sp):
        print ("working on patch ", j)
        _process_and_save_patch(sub_sp[j])

    len_last_sp = len_updated_sp
    time.sleep(time_step_for_processing)
