## Query Parallelization with Dask

#### Import Libraries & Set Up the Connection

In [1]:
import numpy as np
import pandas as pd
import dask.dataframe as dd

# SQLAlchemy-style connection URI: mssql dialect via the pyodbc driver, host
# part "localhost", with no username or password embedded in the string.
# NOTE(review): the URI names neither a database nor an ODBC driver --
# presumably "localhost" resolves through a DSN or server defaults; confirm.
sql_uri = 'mssql+pyodbc://@localhost'

#### Read the table using Dask
Dask partitions the table on the index column (`obj_ID`) and issues one query per partition, which lets the reads run in parallel.

In [2]:
# Name of the SQL table that the Dask reads in the following cells target.
table = 'STAR_CLASSIFICATION'

In [3]:
ddf = dd.read_sql_table(table, sql_uri, index_col='obj_ID')
%time ddf = ddf.compute()
ddf.head()

CPU times: total: 1.5 s
Wall time: 1.5 s


Unnamed: 0_level_0,alpha,delta,u,g,r,i,z,run_ID,rerun_ID,cam_col,field_ID,spec_obj_ID,class,redshift,plate,MJD,fiber_ID
obj_ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
1.237661e+18,135.689107,32.494632,23.87882,22.2753,20.39501,19.16573,18.79371,3606,301,2,79,6.543777e+18,GALAXY,0.634794,5812,56354,171
1.237665e+18,144.826101,31.274185,24.77759,22.83188,22.58444,21.16812,21.61427,4518,301,5,119,1.176014e+19,GALAXY,0.779136,10445,58158,427
1.237661e+18,142.18879,35.582444,25.26307,22.66389,20.60976,19.34857,18.94827,3606,301,2,120,5.1522e+18,GALAXY,0.644195,4576,55592,299
1.237663e+18,338.741038,-0.402828,22.13682,23.77656,21.61162,20.50454,19.2501,4192,301,3,214,1.030107e+19,GALAXY,0.932346,9149,58039,775
1.23768e+18,345.282593,21.183866,19.43718,17.58028,16.49747,15.97711,15.54461,8102,301,3,137,6.891865e+18,GALAXY,0.116123,6121,56187,842


#### Using Pandas syntax in a Dask execution plan
We can chain operations such as filters and group-bys before any data is pulled into memory; Dask evaluates them lazily, only when `.compute()` is called.

In [4]:
ddf1 = dd.read_sql_table(table, sql_uri, index_col='obj_ID')

ddf1 = ddf1[(ddf1['delta'] >= 0) & (ddf1['redshift'] < 0.9)]
ddf1 = ddf1.groupby('cam_col').sum()

%time ddf1 = ddf1.compute()
ddf1.head(10)

CPU times: total: 1.5 s
Wall time: 1.52 s


Unnamed: 0_level_0,alpha,delta,u,g,r,i,z,run_ID,rerun_ID,field_ID,spec_obj_ID,redshift,plate,MJD,fiber_ID
cam_col,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
1,1661899.0,254803.658105,202440.24983,187602.76629,177199.95032,171479.30427,168393.34941,42598960,2766190,1707256,4.855822e+22,2751.582667,43127416,508818276,3874903
2,2105171.0,334356.790126,251274.22103,232506.26965,219528.9291,212133.6355,208071.10213,53206596,3382939,1964217,6.412969e+22,3530.20816,56957465,624926187,4671147
3,2575440.0,386398.836185,296026.93885,274575.14957,259474.00753,251069.60433,246431.50847,60251791,4049955,2413418,7.219306e+22,4123.934495,64118962,745663550,5552290
4,2879258.0,367365.052408,327381.6372,302521.99284,285530.66554,275972.961533,270914.294603,65093871,4420787,2726988,7.80759e+22,4506.632607,69343589,812848527,7127991
5,2490769.0,359906.855479,302667.67032,281141.59488,265414.75985,256655.9898,251689.18298,60993465,4119787,2655767,7.083933e+22,4111.676663,62916311,756539052,6759529
6,1696724.0,283298.002362,214027.02455,197919.41253,187172.93188,181163.22224,177880.08352,39772271,2924817,1846419,4.970232e+22,2726.474377,44143452,536477681,4390941


This comes in handy when the source data is too large to fit in memory but the result set fits in RAM.