# Seafloor Bacterial Floc Blob Analysis

This notebook shows an example of doing an analysis of water column "floc" using Pangeo and the regions data from Aaron's [CamHD_motion_metadata](https://github.com/CamHD-Analysis/CamHD_motion_metadata) repo. In this version we use information from blob size and number using connencted components analysis. The goal of this work is to understand changes in the concentration of floc, which is bacterial material that has been flushed from the hydrothermal system into the ocean. Changes in floc are a potential indicator of changes in the hydrothermal system, possibly resulting from a magmatic event or seismic swarm.

#### Start a Dask cluster
We do this first because it can take a while for the Kubernetes cluster to scale up to accomodate workers.

In [None]:
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=15)
cluster

#### Get a list of CamHD files to process

In [None]:
import json

In [None]:
url = []
filename = []
scene_tag = []
deployment = []
start_frame = []
end_frame = []

with open('region_list.txt') as f:
    for line in f:
        with open(line.strip()) as l:
            data = json.load(l)
            for region in data['regions']:
                if region['type'] == 'static':
                    try:
                        if '_p2_z0' in region['sceneTag']:
                            url.append('https://rawdata.oceanobservatories.org/files' + data['movie']['URL'])
                            filename.append((data['movie']['URL'].split('/')[-1]))
                            scene_tag.append(region['sceneTag'])
                            deployment.append(int(scene_tag[-1].split('_')[0][1]))                            
                            start_frame.append(int(region['startFrame']))
                            end_frame.append(int(region['endFrame']))
                    except:
                        continue

In [None]:
import pandas as pd

In [None]:
scene_windows = pd.DataFrame({'filename': filename, 'url': url, 'scene_tag': scene_tag, 'deployment': deployment, 'start_frame': start_frame, 'end_frame': end_frame})
scene_windows.tail()

#### Create list of frames to process for each scene window

In [None]:
frame_interval = 10
window_buffer = 10
frame_lists = []
for i, start_frame in enumerate(scene_windows.start_frame):
    frame_lists.append(list(range(start_frame+window_buffer, scene_windows.end_frame[i]-window_buffer, frame_interval)))
scene_windows['frame_list'] = frame_lists
scene_windows.head()

#### Wrappers to deal with potential get_moov_atom and get_frame timeouts

In [None]:
import pycamhd as camhd
import numpy as np

In [None]:
def get_moov_atom_timeout(filename):
    try:
        return camhd.get_moov_atom(filename)
    except:
        return False

In [None]:
def get_frame_timeout(filename, frame_number, pix_fmt, moov_atom):
    if moov_atom:
        try:
            return camhd.get_frame(filename, frame_number, pix_fmt, moov_atom)
        except:
            return np.zeros((1080, 1920), dtype=np.uint16)
    else:
        return np.zeros((1080, 1920), dtype=np.uint16)

#### Set up a Dask array of delayed images
In this notebook we don't actually use this Dask array of delayed objects for the analysis. Below we refactor into using a list of delayed functions which I think works better here. Or at least it is easier.

In [None]:
from dask import delayed
import dask.array as dsa

In [None]:
delayed_frame_list = []
for i, row in scene_windows[scene_windows.deployment == 4].iterrows():
    filename = row.url
    delayed_moov_atom = delayed(get_moov_atom_timeout)(filename)
    for frame_number in row.frame_list:
        delayed_frame = delayed(get_frame_timeout)(filename, frame_number, 'gray16le', delayed_moov_atom)
        delayed_frame_list.append(dsa.from_delayed(delayed_frame, (1080, 1920), np.uint16))
delayed_frame_array = dsa.stack(delayed_frame_list)
delayed_frame_array

A dask array is in many ways like a numpy array, except in this case it holds a set of instructions for how to acquire each chunk of the array, which makes it easy to farm this array out to workers in the cloud using the [distributed](http://distributed.readthedocs.io/en/latest/#) scheduler.

#### Show one of the images

In [None]:
frame = delayed_frame_array[0].compute()
frame.shape

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
import matplotlib.pyplot as plt
import matplotlib.patches as patches
plt.rc('figure', figsize=(11, 11))
fig, ax = plt.subplots()
im1 = ax.imshow(frame)
im1.set_cmap('gray')
plt.yticks(np.arange(0,1081,270))
plt.xticks(np.arange(0,1921,480))
rect = patches.Rectangle((10,10),1024,1024,linewidth=1.5,edgecolor='w',facecolor='none')
ax.add_patch(rect)
plt.show();

#### Show the filter that will be used to filter images in the frequency domain
To deal with variations in lighting and high-frequency noise, we filter each subimage using a Butterworth bandpass filter.

In [None]:
def butterworth(d1, d2, n):
    x = np.arange(-1024/2+0.5,1024/2+1-0.5)
    xx, yy = np.meshgrid(x, x)
    d = np.sqrt(xx**2+yy**2)
    bff = (1 - (1./(1 + (d/d1)**(2*n))))*(1/(1 + (d/d2)**(2*n)))
    return bff

In [None]:
d1 = 20 # low cut wavenumber
d2 = 400 # high cut wavenumber
n = 4
bff = butterworth(d1, d2, n)
plt.rc('figure', figsize=(6, 6))
imgplot = plt.imshow(bff, cmap='gray')

#### Setup filtering and thresholding functions

In [None]:
def frame_filter(frame, d1, d2, n):
    if frame.ndim == 3 and frame.shape[0] == 1:
        I = np.squeeze(frame[0, 0:1024, 0:1024])
    else:
        I = frame[0:1024, 0:1024]
    bff = butterworth(d1, d2, n)
    I_fft = np.fft.fft2(I)
    I_fft_shift = np.fft.fftshift(I_fft)
    I_fft_shift_filt = I_fft_shift*bff # filter with the Butterworth filter
    I_fft_filt = np.fft.ifftshift(I_fft_shift_filt)
    I_filt = np.fft.ifft2(I_fft_filt)
    return I_filt

In [None]:
threshold = 4000; # this is an arbitrary threshold that seems to work
def frame_thresh(frame, d1, d2, n, threshold):
    I_filt = frame_filter(frame, d1, d2, n)
    I_thresh = np.array(np.absolute(I_filt)>threshold)
    return I_thresh

#### Show example of a thresholded subimage

In [None]:
I_thresh = frame_thresh(frame, d1, d2, n, threshold)

In [None]:
plt.rc('figure', figsize=(6, 6))
imgplot = plt.imshow(I_thresh, cmap='gray')
plt.title('floc_proxy value = %i' % I_thresh.sum());

#### Show an example of image labeling using a subimage

In [None]:
from skimage.measure import label

In [None]:
I_thresh_sub = I_thresh[735:836, 595:696] # arbitrary area with a few floc particles
plt.rc('figure', figsize=(6, 6))
imgplot = plt.imshow(I_thresh_sub, cmap='gray')
plt.title('Subimage of Thresholded Image to Label');

In [None]:
import matplotlib as mpl
cmap = mpl.colors.ListedColormap(['white', 'blue', 'red', 'green', 'black'])
I_labeled = label(I_thresh_sub, connectivity=2)
plt.rc('figure', figsize=(6, 6))
imgplot = plt.imshow(I_labeled, cmap=cmap)
plt.title('Labeled Image');

This plot shows us that skimage.measure.label returns an Numpy array with the same dimensions as the input, with each area labeled with an different integer. The background is all zero, blue=1, red=2, green=3, black=4. This gives us a very easy way to count the number of labeled areas, and determine the size of each.

In [None]:
print('Pixels per Labeled Area\nblue: %i\nred: %i\ngreen: %i\nblack: %i' % 
      ((I_labeled==1).sum(), (I_labeled==2).sum(), 
       (I_labeled==3).sum(),(I_labeled==4).sum()))

#### Setup labeling stats function

In [None]:
# this function takes a thresholded binary image as input
def frame_label_stats(I_thresh):
    I_labeled = label(I_thresh)
    label_stats = []
    for i in range(1, I_labeled.max()+1):
        label_stats.append((I_labeled==i).sum())
    return label_stats

#### Assemble a list of Dask delayed functions using our labeling function
Here we abandon the delayed Dask array because I am still not sure of what it buys us. It might be worth returning to, but we will use a simple list of nested delayed objects for now.

In [None]:
delayed_label_stats = []
for i, row in scene_windows[scene_windows.deployment == 4].iterrows():
    filename = row.url
    delayed_moov_atom = delayed(get_moov_atom_timeout)(filename)
    for frame_number in row.frame_list:
        delayed_frame = delayed(get_frame_timeout)(filename, frame_number, 'gray16le', delayed_moov_atom)
        delayed_frame_thresh = delayed(frame_thresh)(delayed_frame, d1, d2, n, threshold)
        delayed_label_stats.append(delayed(frame_label_stats)(delayed_frame_thresh))
delayed_label_stats[0]

#### Attach the distributed client to the cluster
Make sure all of the workers are ready to go before attaching and running compute.

In [None]:
from dask.distributed import Client
client = Client(cluster)
client

#### Calculate label stats for each frame

In [None]:
from dask import compute

In [None]:
%%time
label_stats = compute(*delayed_label_stats)

#### Count floc particles for each image

In [None]:
nflocs = [len(i) for i in label_stats]

In [None]:
len(nflocs)

#### Get a timestamp for each frame

In [None]:
import datetime, math
import matplotlib.dates as dates

# new code warning! datetime index is what we are doing!!

In [None]:
url = []
frame_datenum = []
frame_datetime = []
# nflocs = [len(i) for i in label_stats]
frame_numbers = []

dbcamhd = pd.read_json('/home/jovyan/rte-camhd/tim/dbcamhd.json', orient="records", lines=True)
for i, row in scene_windows[scene_windows.deployment == 4].iterrows():
    for frame_number in row.frame_list:
        url.append(row.url)
        frame_epoch_seconds = dbcamhd['timestamp'][dbcamhd.filename == row.url].iloc[0] + frame_number/29.97
        frame_datetime.append(datetime.datetime.fromtimestamp(frame_epoch_seconds))
        frame_datenum.append(dates.date2num(frame_datetime[-1]))
        frame_numbers.append(frame_number)

In [None]:
dbcamhd = pd.read_json('/home/jovyan/rte-camhd/tim/dbcamhd.json', orient="records", lines=True)

In [None]:
dbcamhd.tail()

In [None]:
regions_results.set_index('datetime', inplace=True)

In [None]:
test.set_index('Time', inplace =True)
#DATAFRAME WITH RESULTS AND A DIFF df WITH DATETIME INDEX, MERGE AND CONVERT TO WHAT i NEED

In [None]:
regions_results.tail()

In [None]:
nflocs = [len(i) for i in label_stats]

In [None]:
regions_results = pd.DataFrame({'url': url, 'frame_number': frame_numbers, 'timestamp': frame_datenum,
                        'datetime': frame_datetime, 'nflocs': nflocs, 'label_stats': label_stats})
# results.head()

In [None]:
regions_results.to_json('regions_results_for_Dep4.json', orient="records", lines=True)

In [None]:
# dbcamhd = pd.read_json('/home/jovyan/rte-camhd/tim/dbcamhd.json', orient="records", lines=True)
# frame_timestamp = []
# for i, row in scene_windows[scene_windows.deployment == 5].iterrows():
# #     filename = row.url
#     for frame_number in row.frame_list:
#         url.append(row.url)
#         frame_epoch_seconds = dbcamhd['timestamp'][dbcamhd.filename == row.url].iloc[0] + frame_number/29.97
#         frame_datetime.append(datetime.datetime.fromtimestamp(frame_epoch_seconds))
#         frame_num.append(dates.date2num(frame_datetime[-1]))
#         frame_numbers.append(frame_number)
# #         dt = datetime.datetime.fromtimestamp(timestamp)
# #         frame_timestamp.append(dates.date2num(dt))
            

In [None]:
# np.shape(nflocs) 

In [None]:
# test = pd.DataFrame(['A': [1], 'time'; pd.to_datetime(datetime.datetime.frometimestamp())])

In [None]:
# plot_timestamps_floc_for_Dep6_Region = pd.DataFrame(
#     {'timestamps': frame_timestamp, 'floc_volume': nflocs})

In [None]:
# plot_timestamps_floc_for_Dep6_Region

#### Plot a two-dimensional multivariate histogram of the results

In [None]:
plt.rc('font', size=11)
fig, ax = plt.subplots()
fig.set_size_inches(18, 6)
fig.frameon = False
hb1 = ax.hexbin(frame_timestamp, nflocs, vmin=0, vmax=1.75, bins='log', linewidths=0.1,
  gridsize=(80,100), mincnt=1, cmap=plt.cm.BuPu)
fig.colorbar(hb1)
ax.set_ylim([0, 1000])
ax.set_xlim([frame_timestamp[0],frame_timestamp[-1]])
ax.yaxis.grid(True)
ax.xaxis.grid(True)
months = dates.DayLocator()  # every other day
monthsFmt = dates.DateFormatter('%d %m')
ax.xaxis.set_major_locator(months)
ax.xaxis.set_major_formatter(monthsFmt)
plt.ylabel('Number of Floc Particles');
plt.savefig('Number_of_Floc_Particles.png')

#### Plot a two-dimensional multivariate histogram of the results using hvplot


In [None]:
import hvplot.pandas;

In [None]:
results.hvplot.hexbin(x='timestamp', y='nflocs', width=800, height=500)

In [None]:
regions_results.hvplot.scatter(x='datetime', y='nflocs', datashade=True, width=720, height=500)

### References

 - [Pangeo](http://pangeo-data.org/)
 - [PyCamHD](https://github.com/tjcrone/pycamhd)
 - [CamHD Raw Data Archive](https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301)
 - [AGU Abstract](https://agu.confex.com/agu/fm16/meetingapp.cgi/Paper/192670)
 - [AGU Poster](https://drive.google.com/open?id=0B-dWW4GM434obGpTM0FZME10Nkk)
 - [Dask](http://dask.pydata.org/en/latest/)