# Using Dask to process DAS data: "Seven Trees" Aftershock (10/25/2022) 
[//]: <> (Notebook Author: Thomas Cullison, Stanford University, Feb. 2023)


In this lab we will be using [Dask](https://docs.dask.org/en/stable/) to accelerate/parallelize the processing of distributed acoustic sensing (DAS) data. In particular, we will be processing data that was recorded during the aftershock of an earthquake that took place near the Seven Trees, CA area in October of 2022.  These data were recorded by a DAS interrogator that was connected to fiber-optic cables that traverse the area around the Stanford campus.


## External Resources
If you have any questions regarding specific Python functionality, you can consult the official [Python documentation](http://docs.python.org/3/).

### Dask

* [Main Page](https://docs.dask.org/en/stable/)
* [General Tutorials \& Talks](https://docs.dask.org/en/stable/presentations.html)
* [Dask-Futures (more specific to this lab)](https://docs.dask.org/en/stable/futures.html)
* [Futures Tutorial](https://tutorial.dask.org/05_futures.html)
* [Github with Tutorial Jupyter Notebooks](https://github.com/dask/dask-tutorial)
* [High-level, but Extensive Overview Tutorial](https://youtu.be/EybGGLbLipI)


### DAS

* [Distributed Fiber-Optic Sensing](https://youtu.be/LAcQ44YRMuM): Overview of DAS and related technologies


## Required Preparation

Take a look at the API documentation for the following *dask.distributed* objects and functions.

* [Dask-Futures API](https://docs.dask.org/en/stable/futures.html#api)
* [Client](https://docs.dask.org/en/stable/futures.html#distributed.Client)
* [LocalCluster](https://distributed.dask.org/en/latest/api.html#distributed.LocalCluster)
* [Client.map()](https://docs.dask.org/en/stable/futures.html#distributed.Client.map)
* [wait()](https://docs.dask.org/en/stable/futures.html#distributed.wait)
* [Client.who_has()](https://docs.dask.org/en/stable/futures.html#distributed.Client.who_has)
* [Client.gather()](https://docs.dask.org/en/stable/futures.html#distributed.Client.gather)
* [Client.cancel()](https://docs.dask.org/en/stable/futures.html#distributed.Client.cancel)
* [Client.scatter()](https://docs.dask.org/en/stable/futures.html#distributed.Client.scatter)

Also, please take a look at the following examples.

* [Dask-Futures Examples](https://examples.dask.org/futures.html)
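
For orientation, here is a minimal sketch of the futures lifecycle this lab relies on: start a client, map a function over inputs, wait, inspect where the results live, gather them into this process, and release the workers' copies. It assumes a small in-process cluster (`processes=False`) and a toy `square` function so it runs anywhere; the lab itself uses worker processes and real file names.

```python
from dask.distributed import Client, wait


def square(x):
    """Toy stand-in for a per-file task such as read_h5_buffer_to_array."""
    return x * x


# In-process cluster with 2 single-threaded workers (the lab uses processes=True)
with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    print(client.dashboard_link)                         # URL of the Dashboard ("" if unavailable)
    futures = client.map(square, range(8), pure=False)   # one future per input
    wait(futures)                                        # block until every task has finished
    print(client.who_has(futures))                       # which worker holds each result
    results = client.gather(futures)                     # copy the results into this process
    client.cancel(futures)                               # release the copies held by the workers

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Until `cancel` runs, the gathered values exist twice: once in this process and once on the workers.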

<br><br>

## Exercise 0

Please answer the following questions (short answers are fine).

1. If one wishes to start a *Local Cluster*, what parameter, if any, needs to be passed to the *Client* constructor?

<br>

2. What is the difference between the following two *Cluster* parameters: *n_workers* and *threads_per_worker*?

<br>

3. How can one get a link to the *Dashboard* from a *Client*?

<br>

4. How can a *LocalCluster* that is associated with a *Client* be rescaled?

<br>

5. What does the *Client.map()* function return?

<br>

6. What does the *wait()* function do?

<br>

7. What does the *Client.who_has()* function do?

<br>

8. What does the *Client.gather()* function return, and where does the memory exist for the thing or things that are returned (e.g. in the memory of the worker(s), or in the memory of the "host-python-thread" (e.g. Jupyter-Notebook thread) that is interacting with the *Client* and *LocalCluster*)?

<br>

9. Can copies of the same memory exist both in the workers and on the "host-python-thread"?

<br>

10. What function can be used to "clean-up/release" the memory held by a worker or workers?

<br>

11. How can one rescale the number of workers in a cluster?

<br>

12. How can data that is stored in the "host-python-thread" be distributed to workers in a *Cluster*/*LocalCluster*?

<br><br>

## Exercise 1: Spin-up Dask-Cluster and Get Raw DAS Data

**Tasks for this exercise:**
* Import all the python modules that we need
* Define the functions we need to get the DAS data from the Cloud and to store the data into *numpy* arrays. 
* Spin-up a local dask-cluster
* Look at the *Dashboard* for the cluster
* Pull the data from the Cloud in parallel (one worker per file)
* Collect the data to the "host-python-thread" that is running this notebook
* Clean-up data that resides in the workers
* Rescale the cluster
* Concatenate the gathered data along the time axis to get it ready for parallel, per-channel processing along the time axis.

### Imports

In [None]:
import io

import datetime
import h5py
import numba

import numpy as np

from scipy import signal
from google.cloud import storage
from dask.distributed import Client, wait
from time import time
from os import cpu_count

### Function Defs for Getting Data from the Cloud

These have been defined for you because the knowledge of how to get these data is beyond the scope of this lab. However, it will probably be of some benefit to understand what these functions are doing.

In [None]:
def gcs_h5_to_buffer(bname,bucket=None):
    """ Pull data from Cloud storage (data is in hdf5 format) """
    client = storage.Client()
    bucket = client.get_bucket(bucket)
    blob = bucket.get_blob(bname)
    buffer = io.BytesIO()
    blob.download_to_file(buffer)
    
    return buffer


def h5_to_array(tbuffer):
    """ Get specific data from hdf5 and store to ndarrays """
    f = h5py.File(tbuffer, 'r')
    data = np.array(f["Acquisition"]["Raw[0]"]["RawData"],dtype=np.float32)
    tata = np.array(f["Acquisition"]["Raw[0]"]["RawDataTime"],dtype=np.dtype('<i8'))
    f.close()
    
    return data,tata


def read_h5_buffer_to_thing(func,bname,bucket=None):
    """ 
        Interface function for getting hdf5 data from
        the cloud and storing it into  different formats
    """
    buffer = gcs_h5_to_buffer(bname,bucket=bucket)
    item = func(buffer)
    del buffer
    return item
    
    
def read_h5_buffer_to_array(bname,bucket=None):
    """ Get ndarrays from the Cloud storage """
    return read_h5_buffer_to_thing(h5_to_array,bname,bucket=bucket)
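
To see what `h5_to_array` expects without touching the Cloud, the sketch below builds a tiny HDF5 file entirely in memory with the same group layout (`Acquisition/Raw[0]/RawData` and `RawDataTime`) and reads it back the same way. The 2 x 3 array contents and the microsecond time stamps are made-up toy values, not the real file contents. Opening the file with `with` also guarantees it is closed if a read fails.

```python
import io

import h5py
import numpy as np

# Write a toy HDF5 file into an in-memory buffer (no disk involved)
buf = io.BytesIO()
with h5py.File(buf, "w") as f:
    grp = f.create_group("Acquisition/Raw[0]")            # intermediate groups are created too
    grp.create_dataset("RawData", data=np.arange(6, dtype=np.int16).reshape(2, 3))
    grp.create_dataset("RawDataTime", data=np.array([0, 5000, 10000], dtype="<i8"))

# Read it back the way h5_to_array does, but with a context manager
buf.seek(0)
with h5py.File(buf, "r") as f:
    data = np.array(f["Acquisition"]["Raw[0]"]["RawData"], dtype=np.float32)
    times = np.array(f["Acquisition"]["Raw[0]"]["RawDataTime"], dtype=np.dtype("<i8"))

print(data.shape, data.dtype, times)
```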

### Setup List of Files to Read: (a priori Knowledge Req.)

Here we will generate a list of DAS data files that we need to process. Each file stores **one minute's** worth of data, and we will be getting **10** consecutive **minutes** of data for **48,000 channels**. The **channel spacing** is **one meter**. The **time-sampling**, $\mathbf \Delta t$, is **0.005 s**, and the aftershock occurs within the 10-minute window that we are retrieving.

Knowing where these data are stored and which files to get is beyond the scope of this lab, but note that the data come straight from the Cloud and at no point are they stored to disk, which I think is pretty cool.
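
The numbers above fix the size of what we are about to download. A quick back-of-the-envelope check, assuming every file holds exactly 60 s of samples and that the data are kept as `float32` (as `h5_to_array` casts them):

```python
n_channels = 48_000          # channels, 1 m spacing
dt = 0.005                   # s, time sampling
n_files = 10                 # one file per minute
seconds_per_file = 60        # assumption: each file is exactly one minute long

fs = 1 / dt                                        # sampling rate in Hz
samples_per_file = round(seconds_per_file * fs)    # time samples per file
n_samples = n_files * samples_per_file             # after concatenating along time
gigabytes = n_channels * n_samples * 4 / 1e9       # float32 = 4 bytes per sample

print(fs, samples_per_file, n_samples, gigabytes)  # 200.0 12000 120000 23.04
```

Under these assumptions the concatenated raw array is 48,000 x 120,000 and occupies roughly 23 GB in the notebook, which is worth keeping in mind before gathering everything to the "host-python-thread."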

In [None]:
buckname = 'das-stanford' #kind of like the head/main directory
bdir = 'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-09-29T14_44_55-0700/'


lblob_e0 = ['Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T183756Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T183856Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T183956Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184056Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184156Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184256Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184356Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184456Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184556Z.h5',
            'Stanford-P2kHz-GL20m-FS200Hz-1m-48000_2022-10-25T184656Z.h5']

bloblist = [bdir+fname for fname in lblob_e0]
bucklist = [buckname for fname in lblob_e0]
nfiles = len(bloblist)
print(f'nfiles: {nfiles}')

### Start Dask Distributed Cluster: (At Most 10 Workers: One per File, but No More Than ncore)

In [None]:
ncore = #use a function call. See imports above
nwork = min('?','?') 

client = Client(n_workers=nwork,processes=True,threads_per_worker=1)
#Garbage collection problems if not set ----^   and --------------^

### Show the Dashboard link below

Then click the link. It should open another browser tab. Keep this tab open for the duration of this lab. Feel free to explore the *Dashboard*.  One thing of particular interest is the *CPU* tab in the bottom left quadrant.

In [None]:
#your code


### Show the Memory for Each Worker in the Cluster

**Note of Caution:** when each worker holds a "ton" of different memory chunks, this function can crash a notebook. It is ok to use it for this exercise, but I *strongly* suggest that you don't use it after this exercise unless you are explicitly asked to do so.

In [None]:
#your code


### Read All Files to Arrays -- Map to Threads : (Memory in Cluster)

Take notice of the output. This information can be useful when testing, debugging, or learning, but otherwise it is mostly not useful. Read the comments!

**Watch** the *Dashboard* after executing the cell below

In [None]:
%%time

arrs = client.map('which-function-goes-here?',bloblist,bucket=buckname,pure=False)
# Must set this flag to FALSE ---------------------------------------------^

# wait for the workers to finish above
wait('what goes here?')
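
Why `pure=False`? By default Dask treats submitted functions as pure and derives each task's key by hashing the function and its arguments, so two identical calls share one key and the work is done only once. Passing `pure=False` gives every call its own unique key. A function that reads from the network is not pure in that sense (its result depends on external state), which is one reason to opt out. The sketch below uses a hypothetical `read_stub` in place of `read_h5_buffer_to_array` and an in-process cluster:

```python
from dask.distributed import Client


def read_stub(name):
    """Hypothetical stand-in for read_h5_buffer_to_array."""
    return len(name)


with Client(processes=False, n_workers=1, threads_per_worker=1) as client:
    a = client.submit(read_stub, "file.h5")              # pure=True is the default
    b = client.submit(read_stub, "file.h5")
    c = client.submit(read_stub, "file.h5", pure=False)
    d = client.submit(read_stub, "file.h5", pure=False)
    same_key = a.key == b.key        # identical pure calls are deduplicated
    distinct_keys = c.key != d.key   # impure calls always get fresh keys

print(same_key, distinct_keys)
```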

### Show the Memory for Each Worker

In [None]:
# your code


### Gather Arrays to Notebook

In [None]:
%%time

gathered_data = client.gather('what-goes-here?',direct=True)

In [None]:
# show where the memory is (your code)


### Clean-up Cluster Memory

Be careful when cleaning up memory on the workers. The data they store may be a dependency of other data that has yet to be processed. There may also be multiple variables that point to the same data before and after some processing has been applied; therefore, one might accidentally delete a dependency or unintentionally try to "double-delete/release" the same memory.

In [None]:
%%time

for t in arrs:
    client.cancel('what-goes-here?')
client.cancel(arrs)

In [None]:
# show where the memory is (your code)


### Scale-down Cluster to One Worker

Take a look at the *Dashboard* after you do this, especially the *CPU* tab.

In [None]:
%%time

#your code

### Concatenate Arrays Over Time Axis: (Notebook Thread)

This part has been done for you, but be sure you understand what is happening.

In [None]:
%%time

tup_list = list(map(list, zip(*gathered_data)))
rdlist = tup_list[0]
rtlist = tup_list[1]

In [None]:
%%time

# np.hstack() should also work
rdata = np.concatenate(rdlist,axis=1)
tdata = np.concatenate(rtlist)

# The deletes below are for the "host-python-thread" (this notebook), only, 
# not the workers in the cluster
del rdlist[:]
del rdlist
del rtlist[:]
del rtlist
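
The `zip(*...)` idiom above transposes a list of `(data, times)` tuples into one list of data arrays and one list of time arrays, which are then joined along the time axis. A toy version with two hypothetical one-file results of 3 channels x 4 samples each:

```python
import numpy as np

# Two hypothetical (data, times) results, 3 channels x 4 samples each
gathered = [
    (np.zeros((3, 4), dtype=np.float32), np.arange(0, 4)),
    (np.ones((3, 4), dtype=np.float32), np.arange(4, 8)),
]

data_list, time_list = map(list, zip(*gathered))  # transpose: list of tuples -> two lists
rd = np.concatenate(data_list, axis=1)            # join along the time axis (axis 1)
td = np.concatenate(time_list)                    # time stamps stay in order

print(rd.shape, td.shape)  # (3, 8) (8,)
```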

### Show the Shape of the Concatenated Array

Is it what you expect? If not, you may have to fix something above.

In [None]:
#show the shape of the concatenated array as a Q.C. (your code)


<br>

## Exercise 2

Now that we have collected the data and concatenated it along the time axis, we can preprocess the data so that we can analyze it. Most of the preprocessing will be done by the workers in our local dask-cluster. However, the last preprocessing step will be done in parallel, using numba, by the "host-python-thread." This last processing step accesses the data at a single point in time but across all channels. Think about why this might be a good idea.

**Tasks for this exercise:**
* Plot the raw-data via the "host-python-thread", so that we can eventually compare with the final preprocessed result.
* Define the functions we will use for preprocessing.
* Scale-up the cluster so that there are as many workers as there are cores
* Scatter the concatenated raw-data array from the "host-python-thread" to the workers (splitting it up along the channel axis, not the time axis).
* Run our preprocessing functions for each channel
* Gather the data back to the "host-python-thread" for the last preprocessing step.
* Shutdown the local dask-cluster
* Run the last preprocessing function
* Plot the final results of the preprocessing.

<br>

### Define function for plotting: (host-python-thread only)

**Note:** This function has been defined for you. Furthermore, we will only be plotting a subrange of channels because, given the limited screen space of the notebook, it is somewhat cumbersome to look at all 48k channels at once. See the code lines for *start_c* and *end_c*, below.

In [None]:
def plot_seven_trees_data(data,times,pclip=.95,fig_size=(9,10)):

    import matplotlib.pyplot as plt

    eqdate = datetime.datetime.utcfromtimestamp(times[0]//1000000)
    start_c = 23000
    end_c = 35000 
    bounds = (0,nfiles*60,end_c,start_c)


    vclip = (1-pclip)*np.abs(data[start_c:end_c+1,:]).max()


    plt.figure(figsize=fig_size)
    plt.imshow(data[start_c:end_c+1,:], aspect='auto', interpolation='none', cmap='gray', vmin=-vclip, vmax=vclip, extent=bounds)
    plt.title('DAS for Seven Trees 1st-Aftershock, 3.1 EQ @' + str(eqdate) )
    plt.xlabel('seconds from: ' + str(eqdate.time()))
    plt.ylabel('channel')
    
    return plt
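
Two details of `plot_seven_trees_data` are easy to misread. Despite its name, `pclip` is not a percentile: the color limit is `(1 - pclip)` times the largest absolute amplitude, so `pclip=0` shows the full range and `pclip=0.99` saturates everything above 1% of the peak. Also, `times` appears to hold microseconds since the Unix epoch (hence the `// 1000000`). `datetime.datetime.utcfromtimestamp` is deprecated as of Python 3.12; a timezone-aware alternative is `datetime.datetime.fromtimestamp(..., tz=datetime.timezone.utc)`. The toy values below are illustrative only:

```python
import datetime

import numpy as np

toy = np.array([[-2.0, 0.5], [1.0, 4.0]])
# Same clip formula as in plot_seven_trees_data
vclips = {p: (1 - p) * np.abs(toy).max() for p in (0.0, 0.99)}

# Round trip: UTC instant -> microseconds since epoch -> timezone-aware datetime
start = datetime.datetime(2022, 10, 25, 18, 37, 56, tzinfo=datetime.timezone.utc)
t0_us = int(start.timestamp()) * 1_000_000
eqdate = datetime.datetime.fromtimestamp(t0_us // 1_000_000, tz=datetime.timezone.utc)

print(vclips, eqdate)
```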

### 2D Plot of the Raw DAS Data

This part has been done for you. It will take a while...

In [None]:
%%time

# We will use a different clip after preprocessing because
# the dynamic range will be so different
pclip = 0. 
print(f'pclip: {pclip}')

plt = plot_seven_trees_data(rdata,tdata,pclip=pclip)
plt.show()

<br>

### Doesn't the plot above look magnificent!?
[//]: <> (as magnificent as Trogdor? https://youtu.be/90X5NJleYJQ)

Run the cell below to clean up the memory related to the plot. It can help with making the notebook run more "smoothly."

In [None]:
# Clean the gunk
plt.close('all')
del plt

### Function Defs for Processing

In [None]:
# Notice that this function will be run on the "host-python-thread"
@numba.njit(cache=True, fastmath=True, nogil=True, parallel=True)
def remove_median_xchannel(orig_tr):
    """ remove the median cross-channel value for every time sample"""
    rmed_traces = orig_tr.copy()
    for it in numba.prange(orig_tr.shape[-1]):
        rmed_traces[:, it] -= np.median(orig_tr[:, it])
    return rmed_traces


def detrend_single_trace(orig_tr):
    """ 
        Remove the mean and any linear trend in the data along 
        the time-axis. Runs on the workers.
        (operates per channel)
    """
    det_const = signal.detrend(orig_tr,type='constant')
    det_trace = signal.detrend(det_const,type='linear')
    del det_const
    return det_trace


def bandpass_butter_single_trace(trace, fs=None, b0=None, bN=None, order=5):
    """ 
        Band pass the data along the time-axis. Runs on the workers.
        (operates per channel)
    """
    sos = signal.butter(order, (b0,bN), 'bandpass', fs=fs, output='sos')
    bp_trace = signal.sosfiltfilt(sos, trace)
    return bp_trace


def silly_decimate_single_trace(orig_tr,q=2):
    """
        Decimate the data after bandpassing. Runs on the Workers. 
        (operates per channel)
    """
    return orig_tr[::q]

### Scale-up Cluster to All Cores for Data Processing

Look at the *CPU* tab in the *Dashboard* after launching the cell below.

In [None]:
%%time

#your code

### Scatter Concatenated, Raw Data to All Workers (from Notebook to Cluster)

Read the comments!
<br>
**Watch** the *Dashboard* as soon as you execute the cell below.
<br>
**Do NOT** try to display the memory across all the workers. It will likely crash and mangle your notebook.

In [None]:
%%time

future = client.scatter(list('what goes here?'))
junk = wait('what goes here?')
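
The scatter, map, and gather pattern used in the next few cells can be tried on a toy array first. Scattering a *list* creates one future per element, so `list(array)` yields one future per row (channel); `client.map` over those futures then runs one task per channel, and the scheduler generally places each task on the worker that already holds its row. The 6 x 1000 random array and the in-process cluster below are stand-ins for `rdata` and the lab's cluster:

```python
import numpy as np
from dask.distributed import Client, wait
from scipy import signal

# Toy stand-in for rdata: 6 channels x 1000 samples
toy = np.random.default_rng(0).standard_normal((6, 1000)).astype(np.float32)

with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    rows = client.scatter(list(toy))                        # one future per channel
    wait(rows)
    det = client.map(signal.detrend, rows, pure=False)      # one detrend task per channel
    wait(det)
    out = np.asarray(client.gather(det), dtype=toy.dtype)   # restack to (channels, samples)
    client.cancel(rows + det)                               # free worker memory

print(out.shape)  # (6, 1000)
```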

### Detrend Data Per Channel: Multiple Channels Per Worker

The scheduler decides which worker runs each channel's task, based on where the data was scattered above.
<br>
**Watch** the *Dashboard* as soon as you execute the cell below.

In [None]:
%%time

det_data = client.map('what-function-goes-here?','what-thing-goes-here?',pure=False)
junk = 'what goes here?' 

rdata_dtype = rdata.dtype #save for gathering
del rdata #clean-up Notebook Memory
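
A note on `detrend_single_trace`: `type='linear'` subtracts a least-squares straight line, and that fit already includes a constant term, so the preceding `type='constant'` pass is redundant (though harmless). The check below, on a random toy trace with an offset and a trend, compares the two-step version with a single linear detrend:

```python
import numpy as np
from scipy import signal

rng = np.random.default_rng(1)
# Toy trace: constant offset + linear trend + noise
trace = 3.0 + 0.02 * np.arange(2000) + rng.standard_normal(2000)

two_step = signal.detrend(signal.detrend(trace, type="constant"), type="linear")
one_step = signal.detrend(trace, type="linear")

print(np.allclose(two_step, one_step))  # True
print(abs(one_step.mean()) < 1e-9)      # True: the mean is already removed
```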

### Bandpass Filter Per Channel (Nearly the same as for Detrend)

**Watch** the *Dashboard*

In [None]:
%%time

bl = 0.025
br = 5.0
fs = 200

bp_data = client.map('what-function-goes-here?','what-data-goes-here?',fs=fs,b0=bl,bN=br,pure=False)
# note how the parameters to the function are passed --------------------^  ---^  ---^
junk = 'what goes here?'

### Decimate Per Channel

This is slightly faster than doing the decimation on the "host-python-thread" after gathering.
<br>
**Watch** the *Dashboard*

In [None]:
%%time 

ss = 4 #decimation factor -- same as parameter 'q=' in the function

bp_data = client.map('what-function?','what-data?',q='what-goes-here?',pure=False)
junk = 'what goes here?'
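
`silly_decimate_single_trace` is plain subsampling with no anti-alias filter of its own. That is acceptable here only because the bandpass has already removed most of the energy above the new Nyquist frequency; a Butterworth rolloff is not a brick wall, so a small, heavily attenuated residue above it can still alias. `scipy.signal.decimate` is the filter-then-downsample alternative. The arithmetic for `ss = 4`, assuming the 120,000-sample trace length worked out earlier:

```python
import numpy as np

fs = 200.0          # Hz, original sampling rate
q = 4               # decimation factor (ss in the lab)
b_high = 5.0        # Hz, upper bandpass corner (br in the lab)

fs_new = fs / q                 # sampling rate after decimation
nyquist_new = fs_new / 2        # highest frequency representable afterwards
print(fs_new, nyquist_new, b_high < nyquist_new)   # 50.0 25.0 True

trace = np.zeros(120_000)       # assumption: full 10-minute trace length
print(trace[::q].shape)          # (30000,)
```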

### Gather Preprocessed Data: (From cluster to Notebook)

It's pretty boring to watch the *Dashboard* for this step.

In [None]:
%%time

dec_bp_data = np.asarray(client.gather('what-data?',direct=True),dtype=rdata_dtype)
#                          ^                       #NOTE: ------------^
#                          |
# --- LOOK ----------------  # for some reason this is slightly faster than two lines of code


# NOTE: np.vstack() has ~same RUNTIME as np.asarray()

### Show the Shape of the Processed Array

Is it what you expect? If not, you may have to fix something above. Reminder: you have decimated the array along the time axis.

In [None]:
#show the shape of the processed array as a Q.C. (your code)


### Release Cluster and Scheduler: (and All Related Resources, i.e. Memory, Cores, etc.)

This part has been done for you.
<br>
Look at the *Dashboard* **after** you run the cell below.

In [None]:
%%time
client.shutdown()
client.close()

### X-Channel Median Removal Per Time-Sample

This part has been done for you, and it happens on the "host-python-thread." It will be quite fast compared to the other processing steps. Consider why it is so much faster, and share your thoughts in the *markdown* cell that follows the code.

In [None]:
%%time

proc_data = remove_median_xchannel('what-goes-here?')

del dec_bp_data # clean up notebook memory
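
`remove_median_xchannel` subtracts, for each time sample (column), the median taken across all channels (rows), removing the part of each time sample that is shared across channels. A vectorized NumPy version, `remove_median_xchannel_np` (a hypothetical helper, not part of the lab), is handy for checking the numba output on a small array:

```python
import numpy as np


def remove_median_xchannel_np(orig_tr):
    """Hypothetical NumPy check: subtract each column's median over the rows (channels)."""
    return orig_tr - np.median(orig_tr, axis=0, keepdims=True)


toy = np.array([[1.0, 10.0],
                [2.0, 20.0],
                [6.0, 30.0]])     # 3 channels x 2 time samples
res = remove_median_xchannel_np(toy)
# column medians are 2.0 and 20.0 -> rows [-1, -10], [0, 0], [4, 10]
print(res)
```

With numba available, `np.allclose(remove_median_xchannel(x), remove_median_xchannel_np(x))` should hold for a small floating-point array `x`.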

<br>

### Provide your thoughts on the X-Channel Parallel Speed-up over the Dask-Cluster Processing Steps

"Write here"
<br><br>

### 2D Plot of the Processed DAS Data (Yay! Finally)

This part has been done for you.

In [None]:
%%time

pclip = .99
print(f'pclip: {pclip}')

plt = plot_seven_trees_data(proc_data,tdata[::ss],pclip=pclip)
plt.show()

<br>

### Roughly how many seconds into the data does the aftershock arrive?

"Write here"

<br>