# Data Acquisition

Goal: Extract images from DAS raw data

# 0. Setup and Imports

In [1]:
from azure.storage.blob import BlobServiceClient
import zarr
import xarray as xr
from pathlib import Path
import os

import numpy as np
from datetime import datetime
from scipy import signal as sp
from PIL import Image

from dask.distributed import Client, progress

Setup azure blob and container

In [2]:
account_url = "https://dasdata.blob.core.windows.net/"

blob_service_client = BlobServiceClient(account_url)
container_client = blob_service_client.get_container_client("zarr")

container_client.get_container_properties()

{'name': 'zarr', 'last_modified': datetime.datetime(2023, 2, 2, 14, 24, 54, tzinfo=datetime.timezone.utc), 'etag': '"0x8DB052940E27EBB"', 'lease': {'status': 'unlocked', 'state': 'available', 'duration': None}, 'public_access': 'container', 'has_immutability_policy': False, 'deleted': None, 'version': None, 'has_legal_hold': False, 'metadata': {}, 'encryption_scope': <azure.storage.blob._models.ContainerEncryptionScope object at 0x7f3672002770>, 'immutable_storage_with_versioning_enabled': False}

Access South cable data and load it into xarray

In [3]:
store = zarr.ABSStore(client=container_client, prefix='ooi_South_Tx.zarr/ooi_South_Tx.zarr')
root = zarr.group(store=store)  
ds = xr.open_zarr(store)

# 1. Configuring variables to interact with data

Variables that will be useful to process data

In [4]:
# Constant parameters of raw data
fs = 200
channel_spacing = 2.0419

# Channels skip in time and distance axis. 1 means no skip, 2 means skip every other channel, etc.
dt = 2
dx = 5

# Parameters for bandpass filter
fs_new = fs//dt
low = 14
high = 35

# Duration of each image in seconds
time_dur_seconds = 15

Setting up Dask client to observe progress

In [5]:
client = Client()
client

2023-03-06 04:10:37,527 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-ejp3rgh0', purging
2023-03-06 04:10:37,527 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-efpjntnw', purging
2023-03-06 04:10:37,527 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-eun022s2', purging
2023-03-06 04:10:37,528 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-vn8cpwcq', purging
2023-03-06 04:10:37,528 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-em1fpfcu', purging
2023-03-06 04:10:37,528 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-otjd1k8o', purging
2023-03-06 04:10:37,528 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-s3mpdkre', purging

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 64,Total memory: 251.70 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:32879,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 64
Started: Just now,Total memory: 251.70 GiB

0,1
Comm: tcp://127.0.0.1:32823,Total threads: 8
Dashboard: http://127.0.0.1:34103/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:45577,
Local directory: /tmp/dask-worker-space/worker-symjo5y9,Local directory: /tmp/dask-worker-space/worker-symjo5y9

0,1
Comm: tcp://127.0.0.1:40405,Total threads: 8
Dashboard: http://127.0.0.1:39589/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:43201,
Local directory: /tmp/dask-worker-space/worker-uump6zrz,Local directory: /tmp/dask-worker-space/worker-uump6zrz

0,1
Comm: tcp://127.0.0.1:35399,Total threads: 8
Dashboard: http://127.0.0.1:46081/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:40461,
Local directory: /tmp/dask-worker-space/worker-rct7m62u,Local directory: /tmp/dask-worker-space/worker-rct7m62u

0,1
Comm: tcp://127.0.0.1:37511,Total threads: 8
Dashboard: http://127.0.0.1:36395/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:35999,
Local directory: /tmp/dask-worker-space/worker-ckj4ydyr,Local directory: /tmp/dask-worker-space/worker-ckj4ydyr

0,1
Comm: tcp://127.0.0.1:46551,Total threads: 8
Dashboard: http://127.0.0.1:42067/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:45363,
Local directory: /tmp/dask-worker-space/worker-54nbz7r7,Local directory: /tmp/dask-worker-space/worker-54nbz7r7

0,1
Comm: tcp://127.0.0.1:34407,Total threads: 8
Dashboard: http://127.0.0.1:33877/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:34039,
Local directory: /tmp/dask-worker-space/worker-l4rut4h1,Local directory: /tmp/dask-worker-space/worker-l4rut4h1

0,1
Comm: tcp://127.0.0.1:33959,Total threads: 8
Dashboard: http://127.0.0.1:46373/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:39515,
Local directory: /tmp/dask-worker-space/worker-m5tso_8w,Local directory: /tmp/dask-worker-space/worker-m5tso_8w

0,1
Comm: tcp://127.0.0.1:34735,Total threads: 8
Dashboard: http://127.0.0.1:33453/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:45287,
Local directory: /tmp/dask-worker-space/worker-7xbul0h4,Local directory: /tmp/dask-worker-space/worker-7xbul0h4


In [6]:
#Skipping the first 10,000 channels
dist = range(10000,ds.dims["distance"],dx)
time = range(0,ds.dims["time"],dt)

In [7]:
 # Slicing based on previously defined distance range
ds = ds.loc[{"distance": dist, "time": time}]

In [8]:
ds

Unnamed: 0,Array,Chunk
Bytes,18.68 MiB,1.46 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,uint8,numpy.ndarray
"Array Chunk Bytes 18.68 MiB 1.46 kiB Shape (19584000,) (1500,) Count 3 Graph Layers 13056 Chunks Type uint8 numpy.ndarray",19584000  1,

Unnamed: 0,Array,Chunk
Bytes,18.68 MiB,1.46 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,uint8,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,18.68 MiB,1.46 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,uint8,numpy.ndarray
"Array Chunk Bytes 18.68 MiB 1.46 kiB Shape (19584000,) (1500,) Count 3 Graph Layers 13056 Chunks Type uint8 numpy.ndarray",19584000  1,

Unnamed: 0,Array,Chunk
Bytes,18.68 MiB,1.46 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,uint8,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,74.71 MiB,5.86 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,uint32,numpy.ndarray
"Array Chunk Bytes 74.71 MiB 5.86 kiB Shape (19584000,) (1500,) Count 3 Graph Layers 13056 Chunks Type uint32 numpy.ndarray",19584000  1,

Unnamed: 0,Array,Chunk
Bytes,74.71 MiB,5.86 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,uint32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,547.17 GiB,3.43 MiB
Shape,"(7500, 19584000)","(600, 1500)"
Count,4 Graph Layers,169728 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 547.17 GiB 3.43 MiB Shape (7500, 19584000) (600, 1500) Count 4 Graph Layers 169728 Chunks Type int32 numpy.ndarray",19584000  7500,

Unnamed: 0,Array,Chunk
Bytes,547.17 GiB,3.43 MiB
Shape,"(7500, 19584000)","(600, 1500)"
Count,4 Graph Layers,169728 Chunks
Type,int32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,149.41 MiB,11.72 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 149.41 MiB 11.72 kiB Shape (19584000,) (1500,) Count 3 Graph Layers 13056 Chunks Type int64 numpy.ndarray",19584000  1,

Unnamed: 0,Array,Chunk
Bytes,149.41 MiB,11.72 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,int64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,149.41 MiB,11.72 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 149.41 MiB 11.72 kiB Shape (19584000,) (1500,) Count 3 Graph Layers 13056 Chunks Type int64 numpy.ndarray",19584000  1,

Unnamed: 0,Array,Chunk
Bytes,149.41 MiB,11.72 kiB
Shape,"(19584000,)","(1500,)"
Count,3 Graph Layers,13056 Chunks
Type,int64,numpy.ndarray


Helper functions to process DAS data chunks

In [9]:
def median_subtract(data):
    """
    Subtract median of each timepoint from all traces
    """
    # x =  np.tile(np.median(data,axis=1),(data.shape[1],1)).T
    x = np.median(data,axis=1)
    
    return (data.T-x).T


def bandpass_filter(data, fs, low, high):
    """
    Apply a bandpass filter to the data.
    """
    assert fs > 2*high, "High frequency must be less than half the sampling frequency."

    nyq = fs/2
    low = low/nyq
    high = high/nyq
    
    b, a = sp.butter(8, [low, high], 'bandpass')
    
    return sp.filtfilt(b, a, data, axis=1)

def calc_envelope(data):
    """
    Calculate the envelope of the data and return its log transform.
    """
    
    return 20*np.log10(np.abs(sp.hilbert(data, axis=1))/1e-6)

def get_timestamp(times):
    """
    Convert the time to a timestamp.
    """

    return xr.DataArray([datetime.utcfromtimestamp(time*1e-6) for time in times.values], dims=times.dims, coords=times.coords)

def save_to_image(data, filename):
    """
    Save the data to an image file.
    """

    d = data
    filename = str(filename)
    d = (d - np.min(d))/(np.max(d) - np.min(d))*255 #Normalize
    
    im = Image.fromarray(np.uint8(d.T))
    im.resize((256, 256)).save(filename+".png")
    im.close()

    return data


In [10]:
# Assigning timestamp to coordinates for easy querying of data
times = ds.RawDataTime.map_blocks(get_timestamp)
ds = ds.assign_coords({"time": times.compute()})

distances = [channel_spacing*i for i in dist]
ds = ds.assign_coords({"distance": distances})

# 2. Setting up list of functions to execute and running them

In [11]:
# Setting up computation graph for Dask. Operations are performed in the order they are defined.
rawData = ds.RawData
out = xr.apply_ufunc(median_subtract, rawData, dask="parallelized", output_dtypes=[rawData.dtype])
out = xr.apply_ufunc(bandpass_filter, out, fs_new, low, high, dask="parallelized", output_dtypes=[out.dtype])
out = xr.apply_ufunc(calc_envelope, out, dask="parallelized", output_dtypes=[out.dtype])
out = out.chunk({"time": fs_new*time_dur_seconds, "distance": out.shape[0]//10})
#Slicing and processing only 24hr of data
out = out.sel(time=slice("2021-11-01T23:11:14.834000000", "2021-11-04T05:35:14.824000000"))

In [12]:
file_times = out.coords["time"].data
file_times = file_times[::fs_new*time_dur_seconds][:-1]

file_distances = out.coords["distance"].data
file_distances = file_distances[::out.shape[0]//10]

filenames = np.asarray([(t,d) for t in file_times for d in file_distances])

In [13]:
time_chunks = out.coords["time"].data
# Processing 15-minute time chunks at a time
time_chunks = time_chunks[::fs_new*60*15]

In [None]:
import dask.array as da
import dask
import dask.bag as db

# If the processing gets stuck (which sometimes seems to happen on dask for no reason), look for the output to see which time chunk it is stuck on and set continue_point to that value-1
continue_point = 0

# Setting up the folder structure to save the images
root_folder = f"south_dist_chunked_{dist[0]}to{dist[-1]}_dx{dx}_fs{fs_new}_bpf{low}to{high}Hz"
Path(root_folder).mkdir(parents=True, exist_ok=True)

# We process 15 minutes of data at a time to not overload the memory
for i,time in enumerate(time_chunks[continue_point:]):
    print(f"Processing {i+1+continue_point} of {len(time_chunks)}")
    
    # Getting the filenames for the images to be saved in this slice of data
    start = np.where(time == filenames[:,0].astype("datetime64[ms]"))[0][0]

    # Slicing the data to 15 minutes or till the end of the data if it is the last slice
    if i == len(time_chunks[continue_point:])-1:
        dat = out.sel(time=slice(time, out.coords["time"].data[-1]))
        files = filenames[start:,:]
    else:
        dat = out.sel(time=slice(time, time+np.timedelta64(15, 'm')))
        end = np.where(time+np.timedelta64(15, 'm') == filenames[:,0].astype("datetime64[ms]"))[0][0]
        files = filenames[start:end, :]
    
    # Converting to dask objects to parallelize the computation
    dat = dat.data.to_delayed().T.flatten()
    
    # Saving the images
    save_imgs = [dask.delayed(save_to_image)(d, root_folder+'/'+str(tm)+'_start'+str(dst)) for d, (tm, dst) in zip(dat, files)]
    bag = db.from_delayed(save_imgs)
    bag.compute()
    
    # Clearing the memory
    del bag
    del save_imgs
    del dat