# Working with dask and xarray

In [1]:
from pyhdf.SD import SD
import numpy
import pystare
import xarray
import dask.distributed
import dask.dataframe
import datetime

In [2]:
client = dask.distributed.Client(n_workers=4)

In [3]:
file_path = '../tests/data/granules/MOD05_L2.A2019336.0000.061.2019336211522.hdf'
hdf = SD(file_path)
lon = hdf.select('Longitude').get().astype(numpy.double)
lat = hdf.select('Latitude').get().astype(numpy.double)

In [4]:
start = datetime.datetime.now()
stare = pystare.from_latlon_2d(lat=lat, 
                              lon=lon, 
                              adapt_level=True)
print(datetime.datetime.now()-start)

0:00:00.987186


## Dask

In [5]:
coords = numpy.array([lat, lon])
coords_d = dask.array.from_array(coords, chunks=(2,500,1354))
coords_d

Unnamed: 0,Array,Chunk
Bytes,1.67 MiB,1.67 MiB
Shape,"(2, 406, 270)","(2, 406, 270)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.67 MiB 1.67 MiB Shape (2, 406, 270) (2, 406, 270) Count 1 Graph Layer 1 Chunks Type float64 numpy.ndarray",270  406  2,

Unnamed: 0,Array,Chunk
Bytes,1.67 MiB,1.67 MiB
Shape,"(2, 406, 270)","(2, 406, 270)"
Count,1 Graph Layer,1 Chunks
Type,float64,numpy.ndarray


In [6]:
def stare(coords):
    return pystare.from_latlon_2d(coords[0], 
                                 coords[1], adapt_level=True)

In [7]:
s_d = coords_d.map_blocks(stare, drop_axis=[0], chunks=(100, 1354), dtype='int64')
s_d = s_d.compute()

## Xarray Ufunc

In [8]:
lat_x = xarray.DataArray(lat, dims=['x', 'y']).chunk({'x': 500})
lon_x = xarray.DataArray(lon, dims=['x', 'y']).chunk({'x': 500})

In [9]:
start = datetime.datetime.now()
s_d = xarray.apply_ufunc(pystare.from_latlon_2d, 
                         lat_x,
                         lon_x,
                         dask='parallelized',
                         output_dtypes=[numpy.int64])

sids = numpy.array(s_d)
print(datetime.datetime.now()-start)

0:00:01.039442


In [10]:
sids

array([[4298473764500464809, 4298458168380511209, 4297394569014717897,
        ..., 3604325910693007273, 3604468594879342953,
        3604495833162833193],
       [4298462872969244297, 4298459225563237225, 4297297422977447753,
        ..., 3604330264741384009, 3604471380516185641,
        3604465738696115433],
       [4298462873435275369, 4298459227962358473, 4297297429637206121,
        ..., 3604322952727773225, 3604471381825883401,
        3604465733841987657],
       ...,
       [3652144132972193481, 3650323462937407913, 3650325177740030185,
        ..., 3727730728598789545, 3727841631302055049,
        3727831398613792009],
       [3652144129926505097, 3650323400334252041, 3650325178786309321,
        ..., 3727730732960989609, 3727841627078009577,
        3727831398032615625],
       [3652167198498770729, 3652159322973158121, 3650318911383240361,
        ..., 3727838256925064969, 3727843063731949801,
        3727853163225616425]])

## Write Sidecar

In [11]:
import netCDF4
rootgrp = netCDF4.Dataset('test.nc', "w", format="NETCDF4")

rootgrp.close()

## Dask DataFrame

In [12]:
band1 = hdf.select('Water_Vapor_Infrared').get().astype(numpy.double)
lat = hdf.select('Latitude').get().astype(numpy.double)

In [13]:
import pandas
df = pandas.DataFrame({'stare': sids.flatten(), 'band1': band1.flatten()})
ddf = dask.dataframe.from_pandas(df, npartitions=4)
ddf.set_index('stare')

Unnamed: 0_level_0,band1
npartitions=4,Unnamed: 1_level_1
3604081108103418377,float64
3618019075911588075,...
3661533202196794217,...
3736859491218877322,...
4298544093115426153,...
