# DC1 Data Access with Dask

This notebook is an introduction to data access with [Dask](https://dask.pydata.org/en/latest/), a flexible parallel computing library for analytic computing. To run this notebook, follow the instructions for setting up [Jupyter-dev](https://github.com/LSSTDESC/Monitor/blob/master/doc/jupyter-dev.md). We provide a modern (as of 2017-12-04) Python 3 version of [`lsst_kernel.sh`](https://github.com/LSSTDESC/SSim_DC1/blob/master/scripts/lsst-kernel.sh).

In addition to the generic LSST stack setup, you may need to install several additional Python packages. This can be done with `pip`, either from a separate shell or inside this notebook by prefixing the commands with `!`:
```
pip install --user dask
pip install --user toolz
pip install --user cloudpickle
pip install --user tables
```
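
Before installing anything, it can help to check which of these packages are already importable. The following is a minimal sketch using only the standard library; the helper name `missing_packages` is hypothetical and not part of any of the packages listed above.

```python
import importlib.util

# Import names of the packages from the pip commands above.
# "tables" is the import name of PyTables.
REQUIRED = ["dask", "toolz", "cloudpickle", "tables"]


def missing_packages(names):
    """Return the names that cannot be found on the current import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_packages(REQUIRED)
print("missing:", missing if missing else "none")
```

Any name reported as missing can then be installed with the corresponding `pip install --user` command.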

Load the required modules, including `dask`.

In [1]:
%pylab inline

import os
import numpy as np
import pandas as pd
import dask.dataframe as dd

print(np.__version__, pd.__version__)

Populating the interactive namespace from numpy and matplotlib
1.13.1 0.20.3


Read the 15 GB dataframe stored in an HDF5 file into a dask dataframe. This allows out-of-core operations, that is, operations on data that does not fit in memory.

In [2]:
file_path = '/global/projecta/projectdirs/lsst/groups/SSim/DC1'
df = dd.read_hdf(os.path.join(file_path,'coadd-DC1-imsim-dithered.hdf'), key='*')

In [3]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 298 entries, id to psfMag
dtypes: bool(136), float32(8), float64(147), int32(5), int64(2)

Now run dask operations and collect the results, which fit in memory, in ordinary pandas dataframes. Start by choosing the subset of columns to keep in the result.

In [4]:
# The columns we actually want to use
requested_columns = ['base_PsfFlux_flux','base_PsfFlux_fluxSigma']

This returns all of the entries (no selection), restricted to the requested columns.

In [5]:
%%time
selected_columns = df[requested_columns].compute()
selected_columns.shape

CPU times: user 27 s, sys: 13.5 s, total: 40.5 s
Wall time: 1min 2s


This applies a selection and returns only the requested columns; in this case, objects detected with a signal-to-noise ratio S/N >= 5.

In [6]:
%%time
selected_flux = df.query("base_PsfFlux_flux / base_PsfFlux_fluxSigma >= 5")[requested_columns].compute()
selected_flux.shape

CPU times: user 32.7 s, sys: 15.8 s, total: 48.5 s
Wall time: 1min 4s


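This selects the first 100 entries with all of the columns. It is much faster than the previous operations because `head` by default only reads from the first partition.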

In [7]:
%%time
first_100 = df.head(100)
first_100.shape

CPU times: user 209 ms, sys: 78.8 ms, total: 288 ms
Wall time: 402 ms


In [8]:
first_100.head(5)

| | id | coord_ra | coord_dec | parent | flags_negative | merge_footprint_i | merge_footprint_r | merge_footprint_z | merge_footprint_g | merge_footprint_y | ... | base_GaussianFlux_apCorr | base_GaussianFlux_apCorrSigma | base_GaussianFlux_flag_apCorr | modelfit_CModel_dev_apCorr | modelfit_CModel_dev_apCorrSigma | modelfit_CModel_dev_flag_apCorr | base_ClassificationExtendedness_value | base_ClassificationExtendedness_flag | cmodelMag | psfMag |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 87971667640321 | 1.682373 | -0.53545 | 0 | False | False | True | False | False | False | ... | 1.045746 | 0.0 | False | 0.985232 | 0.0 | False | | True | | 24.071767 |
| 1 | 87971667640322 | 1.682283 | -0.535452 | 0 | False | False | True | False | False | False | ... | 1.045243 | 0.0 | False | 0.98498 | 0.0 | False | | True | | 26.157276 |
| 2 | 87971667640323 | 1.682122 | -0.535456 | 0 | False | False | True | False | False | False | ... | 1.045074 | 0.0 | False | 0.984928 | 0.0 | False | | True | | 23.902366 |
| 3 | 87971667640324 | 1.681998 | -0.535457 | 0 | False | False | True | False | False | False | ... | 1.045593 | 0.0 | False | 0.985221 | 0.0 | False | | True | | 26.263831 |
| 4 | 87971667640325 | 1.681962 | -0.535433 | 0 | False | False | True | False | False | False | ... | 1.045501 | 0.0 | False | 0.985194 | 0.0 | False | 1.0 | False | 22.666801 | 23.098674 |


Let's look at the sizes of the resulting dataframes.

In [9]:
selected_columns.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 10650992 entries, 0 to 76110
Data columns (total 2 columns):
base_PsfFlux_flux         float64
base_PsfFlux_fluxSigma    float64
dtypes: float64(2)
memory usage: 243.8 MB


In [10]:
selected_flux.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 10502668 entries, 0 to 76110
Data columns (total 2 columns):
base_PsfFlux_flux         float64
base_PsfFlux_fluxSigma    float64
dtypes: float64(2)
memory usage: 240.4 MB


In [11]:
first_100.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 100 entries, 0 to 99
Columns: 298 entries, id to psfMag
dtypes: bool(136), float32(8), float64(147), int32(5), int64(2)
memory usage: 135.5 KB
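
The reported memory figures can be reproduced by simple arithmetic from the row counts and dtypes shown above. The sketch below assumes that each row costs the sum of its column widths plus 8 bytes for the `Int64Index`, and that the units printed by `info()` are 1024-based; the helper `frame_bytes` is hypothetical.

```python
# Bytes per value for each dtype reported by info().
BYTES = {"bool": 1, "float32": 4, "float64": 8, "int32": 4, "int64": 8}
INDEX_BYTES = 8  # assumption: Int64Index stores one int64 per row


def frame_bytes(n_rows, dtype_counts):
    """Estimate memory as rows * (index width + sum of column widths)."""
    row_bytes = INDEX_BYTES + sum(BYTES[d] * n for d, n in dtype_counts.items())
    return n_rows * row_bytes


two_floats = {"float64": 2}
all_columns = {"bool": 136, "float32": 8, "float64": 147, "int32": 5, "int64": 2}

selected_columns_mb = frame_bytes(10650992, two_floats) / 1024**2
selected_flux_mb = frame_bytes(10502668, two_floats) / 1024**2
first_100_kb = frame_bytes(100, all_columns) / 1024

print(f"{selected_columns_mb:.1f} MB, {selected_flux_mb:.1f} MB, {first_100_kb:.1f} KB")
# prints "243.8 MB, 240.4 MB, 135.5 KB"
```

Under these assumptions the estimates match the three `memory usage` lines above, which shows that the two-column results cost 24 bytes per row while a full 298-column row costs 1388 bytes.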
