# Exploring moorings timeseries data in Parquet format with Dask

In [1]:
import s3fs
s3 = s3fs.S3FileSystem(anon=False)
s3_path = 's3://imos-data-lab-optimised/parquet/timeseries/'
s3.glob(s3_path)

['imos-data-lab-optimised/parquet/timeseries/site_code=PH100']

## Using single-machine scheduler

In [2]:
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
dask.config.set(scheduler='threads')

<dask.config.set at 0x7f372194cc40>

In [31]:
%%time
df = dd.read_parquet('s3://imos-data-lab-optimised/parquet/timeseries/site_code=PH100/deployment_code=PH100-2112/',
                     columns=['nominal_depth', 'temp', 'temp_quality_control', 'depth', 'depth_quality_control'],
                     index='time'
                     )
df

CPU times: user 232 ms, sys: 8.13 ms, total: 240 ms
Wall time: 828 ms


Unnamed: 0_level_0,nominal_depth,temp,temp_quality_control,depth,depth_quality_control
npartitions=15,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,float32,float32,float32,float32,float32
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [32]:
%%time
print(len(df), df.npartitions)

436153 15
CPU times: user 287 ms, sys: 10.2 ms, total: 297 ms
Wall time: 457 ms


In [33]:
%%time
df.head(100_000, npartitions=-1)

CPU times: user 380 ms, sys: 41.3 ms, total: 421 ms
Wall time: 683 ms


Unnamed: 0_level_0,nominal_depth,temp,temp_quality_control,depth,depth_quality_control
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2021-12-14 19:00:01.000001280,16.0,17.043800,4.0,0.138000,4.0
2021-12-14 19:05:01.000003584,16.0,16.990700,4.0,0.140000,4.0
2021-12-14 19:10:01.000005632,16.0,16.969101,4.0,0.141000,4.0
2021-12-14 19:15:00.999997952,16.0,16.978201,4.0,0.143000,4.0
2021-12-14 19:20:01.000000256,16.0,17.001699,4.0,0.142000,4.0
...,...,...,...,...,...
2022-01-20 11:55:00.000007936,32.0,22.022560,1.0,32.999931,1.0
2022-01-20 12:00:00.000000000,32.0,21.992298,1.0,32.992203,1.0
2022-01-20 12:05:00.000002304,32.0,21.975014,1.0,32.989025,1.0
2022-01-20 12:10:00.000004352,32.0,21.987978,1.0,32.970432,1.0


In [38]:
%%time
df.loc['2022-01-01'].compute()

CPU times: user 603 ms, sys: 63.6 ms, total: 666 ms
Wall time: 1.32 s


Unnamed: 0_level_0,nominal_depth,temp,temp_quality_control,depth,depth_quality_control
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2022-01-01 00:00:01.000004608,16.0,21.090900,1.0,17.671000,1.0
2022-01-01 00:05:00.999996928,16.0,20.712200,1.0,17.754999,1.0
2022-01-01 00:10:00.999998976,16.0,20.772800,1.0,17.629000,1.0
2022-01-01 00:15:01.000001280,16.0,21.026600,1.0,17.704000,1.0
2022-01-01 00:20:01.000003584,16.0,20.889200,1.0,17.480000,1.0
...,...,...,...,...,...
2022-01-01 23:34:59.999998976,88.0,14.302550,1.0,88.392899,1.0
2022-01-01 23:40:00.000001024,88.0,14.348369,1.0,88.372681,1.0
2022-01-01 23:45:00.000003328,88.0,14.459716,1.0,88.341156,1.0
2022-01-01 23:50:00.000005632,88.0,14.330039,1.0,88.302521,1.0


In [40]:
%%time
df.loc['2022-01'].compute()

CPU times: user 299 ms, sys: 42 ms, total: 341 ms
Wall time: 445 ms


Unnamed: 0_level_0,nominal_depth,temp,temp_quality_control,depth,depth_quality_control
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2022-01-01 00:00:01.000004608,16.0,21.090900,1.0,17.671000,1.0
2022-01-01 00:05:00.999996928,16.0,20.712200,1.0,17.754999,1.0
2022-01-01 00:10:00.999998976,16.0,20.772800,1.0,17.629000,1.0
2022-01-01 00:15:01.000001280,16.0,21.026600,1.0,17.704000,1.0
2022-01-01 00:20:01.000003584,16.0,20.889200,1.0,17.480000,1.0
...,...,...,...,...,...
2022-01-31 23:34:59.999998976,88.0,12.603698,1.0,88.541756,1.0
2022-01-31 23:40:00.000001024,88.0,12.601111,1.0,88.523026,1.0
2022-01-31 23:45:00.000003328,88.0,12.579122,1.0,88.496277,1.0
2022-01-31 23:50:00.000005632,88.0,12.575242,1.0,88.487083,1.0


In [26]:
subset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 0 entries
Data columns (total 6 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   nominal_depth          0 non-null      float32       
 1   time                   0 non-null      datetime64[ns]
 2   temp                   0 non-null      float32       
 3   temp_quality_control   0 non-null      float32       
 4   depth                  0 non-null      float32       
 5   depth_quality_control  0 non-null      float32       
dtypes: datetime64[ns](1), float32(5)
memory usage: 124.0 bytes


## Using distributed scheduler

In [2]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:34859")

In [3]:
%%time
import dask.dataframe as dd

df = dd.read_parquet('s3://imos-data-lab-optimised/parquet/timeseries/site_code=PH100/',
                     columns=['nominal_depth', 'time', 'temp', 'temp_quality_control', 'depth', 'depth_quality_control']
                    )
df

CPU times: user 637 ms, sys: 35.2 ms, total: 672 ms
Wall time: 3.01 s


Unnamed: 0_level_0,nominal_depth,time,temp,temp_quality_control,depth,depth_quality_control
npartitions=832,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,float32,datetime64[ns],float32,float32,float32,float32
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [4]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 6 entries, nominal_depth to depth_quality_control
dtypes: datetime64[ns](1), float32(5)

In [5]:
df.head()

Unnamed: 0,nominal_depth,time,temp,temp_quality_control,depth,depth_quality_control
0,32.0,2009-10-28 13:00:00.000006656,18.740826,4.0,0.226669,4.0
1,32.0,2009-10-28 13:04:59.999998976,18.731161,4.0,0.232219,4.0
2,32.0,2009-10-28 13:10:00.000001024,18.720119,4.0,0.232219,4.0
3,32.0,2009-10-28 13:15:00.000003328,18.71184,4.0,0.232219,4.0
4,32.0,2009-10-28 13:20:00.000005632,18.704939,4.0,0.226669,4.0


In [12]:
%%time
import numpy as np
good_rows = np.logical_and(df.temp_quality_control==1,
                           df.depth_quality_control==1)
df_temp = df.loc[good_rows, ['nominal_depth', 'time', 'temp', 'depth']] 
df_temp

CPU times: user 3.45 ms, sys: 4.23 ms, total: 7.68 ms
Wall time: 6.93 ms


Unnamed: 0_level_0,nominal_depth,time,temp,depth
npartitions=832,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,float32,datetime64[ns],float32,float32
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [13]:
df_temp.head()

Unnamed: 0,nominal_depth,time,temp,depth
169,32.0,2009-10-29 03:05:00.000002304,18.892847,33.270786
170,32.0,2009-10-29 03:10:00.000004352,18.90115,33.320877
171,32.0,2009-10-29 03:15:00.000006656,18.906685,33.343143
172,32.0,2009-10-29 03:19:59.999998976,18.876247,33.343143
173,32.0,2009-10-29 03:25:00.000001024,18.858269,33.320877


In [9]:
hourly_temp = df_temp.groupby('nominal_depth', group_keys=False) #.resample('1H').mean()
hourly_temp

<dask.dataframe.groupby.DataFrameGroupBy at 0x7f7e23c74850>

In [14]:
df_temp.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 4 entries, nominal_depth to depth
dtypes: datetime64[ns](1), float32(3)

In [35]:
%%time
def hourly_mean(df):
    return df.resample('1H', on='time').mean()

hourly_temp = df_temp.map_partitions(hourly_mean,
                                     meta=df_temp.loc[0, ['nominal_depth', 'temp', 'depth']]
                                    )
hourly_temp

CPU times: user 6.24 ms, sys: 213 µs, total: 6.46 ms
Wall time: 6.21 ms


Unnamed: 0_level_0,nominal_depth,temp,depth
npartitions=832,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,float32,float32,float32
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [41]:
hourly_temp.compute()

KeyError: 'depth'

In [38]:
import hvplot
import hvplot.dask

In [39]:
%%time
hourly_temp.hvplot.scatter(x='time', y='depth', c='temp', cmap='plasma', flip_yaxis=True)

DataError: Supplied data does not contain specified dimensions, the following dimensions were not found: ['time']

DaskInterface expects tabular data, for more information on supported datatypes see http://holoviews.org/user_guide/Tabular_Datasets.html