# Example Analysis

In [45]:
import xarray as xr
import pandas as pd
import dask.dataframe as dd
import hvplot.pandas  # noqa
import hvplot.dask  # noqa
import numpy as np

In [46]:
import warnings
warnings.filterwarnings('ignore')

# Build Dask Cluster
1. Use gui interface to create a new cluster with ~10 workers
2. Use < > to insert an "import Client" statement. This is critical because it is how your script knows to use yoru cluster. 
3. Execute client cell
4. Execute your Dask Cell. Note that once you have a cluster running you do not need to re-import the client. 
5. When finished, always shut down your cluster. 

In [47]:
from dask.distributed import Client

client = Client("tcp://10.0.129.222:37813")
client

0,1
Client  Scheduler: tcp://10.0.129.222:37813  Dashboard: /user/daxsoule/proxy/8787/status,Cluster  Workers: 10  Cores: 10  Memory: 12.50 GB


## Read in RS03ECAL-MJ03E-06-BOTPTA302 Data

In [48]:
#!head /home/jovyan/data/botpt/RS03ECAL-MJ03E-06-BOTPTA302/deployment0001_RS03ECAL-MJ03E-06-BOTPTA302-streamed-botpt_nano_sample_20140904T000000-20141128T060000.100000.nc\#fillmisma_resampled.nc

In [49]:
# ds =xr.open_dataset('/home/jovyan/data/botpt/RS03ECAL-MJ03E-06-BOTPTA302/deployment0001_RS03ECAL-MJ03E-06-BOTPTA302-streamed-botpt_nano_sample_20191012T060000-20191014T235959.950000_resampled.nc')
# ds

In [50]:
ds1 = xr.open_mfdataset('/home/jovyan/data/botpt/RS03ECAL-MJ03E-06-BOTPTA302/*_resampled.nc', parallel=True).chunk(10080)
ds1['bottom_pressure_eastern'] = ds1['bottom_pressure']
del ds1['bottom_pressure']
ds1

<xarray.Dataset>
Dimensions:                  (index: 2368081)
Coordinates:
  * index                    (index) datetime64[ns] 2015-02-09T05:59:00 ... 2019-10-14T23:59:00
Data variables:
    bottom_pressure_eastern  (index) float32 dask.array<chunksize=(10080,), meta=np.ndarray>

## Read in Central Caldera

In [51]:
# ds =xr.open_dataset('/home/jovyan/data/botpt/RS03CCAL-MJ03F-05-BOTPTA301/deployment0001_RS03CCAL-MJ03F-05-BOTPTA301-streamed-botpt_nano_sample_20140904T000000-20141128T115959.950000_resampled.nc')
# ds

In [52]:
ds2 = xr.open_mfdataset('/home/jovyan/data/botpt/RS03CCAL-MJ03F-05-BOTPTA301/*_resampled.nc', parallel=True).chunk(10080)
ds2['bottom_pressure_central'] = ds2['bottom_pressure']
del ds2['bottom_pressure']
ds2

<xarray.Dataset>
Dimensions:                  (index: 2606374)
Coordinates:
  * index                    (index) datetime64[ns] 2014-09-04 ... 2019-10-14T23:59:00
Data variables:
    bottom_pressure_central  (index) float32 dask.array<chunksize=(10080,), meta=np.ndarray>

## Create DataFrame

In [53]:
#df = ds1.to_dask_dataframe()
df1 = ds1.to_dataframe()
df2= ds2.to_dataframe()
#df = df.rename(columns={"index": "time"});
# df = df.set_index('time')

In [54]:
df1['timestring']=np.datetime_as_string(df1.index)
df2['timestring']=np.datetime_as_string(df2.index)

In [55]:
df1['timestring']=df1['timestring'].str[:19]
df2['timestring']=df2['timestring'].str[:19]

In [56]:
df_botpt=pd.merge(df1,df2, on = 'timestring')
df_botpt.head()

Unnamed: 0,bottom_pressure_eastern,timestring,bottom_pressure_central
0,2239.624512,2015-02-09T05:59:00,2252.405518
1,2239.624512,2015-02-09T06:00:00,2252.413574
2,2239.625488,2015-02-09T06:01:00,2252.41333
3,2239.632568,2015-02-09T06:02:00,2252.420166
4,2239.635742,2015-02-09T06:03:00,2252.424316


In [59]:
df_botpt= df_botpt.set_index(pd.to_datetime(df_botpt['timestring']))
del df_botpt['timestring']
df_botpt.head()

Unnamed: 0_level_0,bottom_pressure_eastern,bottom_pressure_central
timestring,Unnamed: 1_level_1,Unnamed: 2_level_1
2015-02-09 05:59:00,2239.624512,2252.405518
2015-02-09 06:00:00,2239.624512,2252.413574
2015-02-09 06:01:00,2239.625488,2252.41333
2015-02-09 06:02:00,2239.632568,2252.420166
2015-02-09 06:03:00,2239.635742,2252.424316


## Plot RS03ECAL-MJ03E-06-BOTPTA302 using Dask

In [61]:
df_botpt.hvplot(y='bottom_pressure_eastern', datashade =True, height=200,
                       flip_yaxis=True)

## This throws an warning... why?

In [25]:
ds2.index

<xarray.DataArray 'index' (index: 2606374)>
array(['2014-09-04T00:00:00.000000000', '2014-09-04T00:01:00.000000000',
       '2014-09-04T00:02:00.000000000', ..., '2019-10-14T23:57:00.000000000',
       '2019-10-14T23:58:00.000000000', '2019-10-14T23:59:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * index    (index) datetime64[ns] 2014-09-04 ... 2019-10-14T23:59:00

In [32]:
type(ds1.index[0].values)

numpy.datetime64

merge xarray

In [24]:
xr.combine_by_coords([ds1, ds2])

InvalidIndexError: Reindexing only valid with uniquely valued Index objects

## Create Dask DataFrame for F

In [21]:
dff = ds.to_dask_dataframe()
dff = dff.rename(columns={"index": "time"});
dff = dff.set_index('time')

## Plot RS03CCAL-MJ03F-05-BOTPTA301 using Dask

In [22]:
dff.hvplot(y='bottom_pressure', datashade =True, height=200,
                       flip_yaxis=True)

## Merge E and F

In [26]:
test = dd.merge(df, dff,how='outer', indicator=True, left_index=True, right_index=True, suffixes=('_E', '_F'))

In [27]:
df_botptMerge = test[test['_merge'] == 'both']
del df_botptMerge['_merge']

In [28]:
df_botptMerge.head()

Unnamed: 0_level_0,bottom_pressure_E,bottom_pressure_F
time,Unnamed: 1_level_1,Unnamed: 2_level_1


In [35]:
merge = dd.merge(df, dff, left_index=True, right_index=True)

MergeError: Must pass left_on or left_index=True

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py", line 1284, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py", line 1013, in _reconnect
    await self._close()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py", line 1284, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures

In [33]:
merge.head(10)

Unnamed: 0_level_0,bottom_pressure_x,bottom_pressure_y
time,Unnamed: 1_level_1,Unnamed: 2_level_1


### Read in tide data E

In [None]:
df_grav=dd.read_csv('/home/jovyan/data/bravoseis_data/SADO/jan_2019/gravimetro_bruto.proc/*.proc', 
               parse_dates=['fecha'], date_parser=dateparse, 
                    dtype = {'fecha': object,'status': np.float64,
                                'gravimetria_bruta': np.float64, 'spring_tension': np.float64,
                                'longitud': np.float64, 'latitud': np.float64,
                                'velocidad': np.float64,'rumbo': np.float64 })
#df.partitions[5].compute()
df_grav=df_grav.set_index("fecha")
del df_grav['fecha_telegrama']
del df_grav['rumbo']
del df_grav['velocidad']
del df_grav['spring_tension']
del df_grav['status']
df_grav.head()