In [1]:
import rasterio
import numpy
import dask
import sys
import starepandas
import pyproj
import time
import xarray
import pystare
import dask
import dask.distributed
import dask.diagnostics
import pickle

In [2]:
# Dask settings for this run: temporary files go to /tablespace/dask and the
# distributed TCP and connect timeouts are both raised to 600 s.
dask.config.set({
    'temporary-directory': '/tablespace/dask',
    'distributed.comm.timeouts.tcp': '600s',
    'distributed.comm.timeouts.connect': '600s',
})
# Display the effective timeout settings as the cell output.
dask.config.get('distributed.comm.timeouts')

{'connect': '600s', 'tcp': '600s'}

In [3]:
# Path of the binary snow-map GeoTIFF that is opened with rasterio when loading data.
file_path = '/tablespace/spires/binary_snowmaps/17DEC11190506binarySnow.tif'

# Loading Data

In [4]:
%%time
bands = None
with rasterio.open(file_path) as src:
    src_crs = src.crs
    values = {}
    if bands is None:
        bands = range(1, src.count + 1)
    for band in bands:
        values[f'band_{band}'] = src.read(band)
    height = values['band_1'].shape[0]
    width = values['band_1'].shape[1]
    transform = src.transform

CPU times: user 4.43 s, sys: 5.03 s, total: 9.46 s
Wall time: 9.65 s


# Dask-based transform and SIDs lookup

In [None]:
# Pixel index vectors along each raster axis.
col_idx = numpy.arange(width, dtype='int32')
row_idx = numpy.arange(height, dtype='int32')
# Stack the two index grids into one DataArray (element 0: column indices,
# element 1: row indices) and split the two grid axes into 1000-element dask
# chunks so later steps can run lazily, block by block.
colrow = xarray.DataArray(
    numpy.meshgrid(col_idx, row_idx, copy=False)
).chunk({'dim_1': 1000, 'dim_2': 1000})
colrow

## Getting the native coordinates

In [None]:
# Flatten the six affine coefficients (a..f) of the raster transform into a
# plain float64 array so the pixel->coordinate helper only depends on numpy:
#   x = a*col + b*row + c
#   y = d*col + e*row + f
trans = numpy.array(
    [getattr(transform, coeff) for coeff in ('a', 'b', 'c', 'd', 'e', 'f')],
    dtype='float64',
)

def wrap_xy(colrow):
    """Map pixel indices to pixel-centre coordinates in the raster's native CRS.

    Parameters
    ----------
    colrow : array-like
        ``colrow[0]`` holds column indices and ``colrow[1]`` row indices.

    Returns
    -------
    numpy.ndarray
        ``[xs, ys]`` stacked along a new leading axis.

    Notes
    -----
    Reads the notebook-level ``trans`` array ``[a, b, c, d, e, f]``.
    The centre of pixel (col, row) is the affine image of
    (col + 0.5, row + 0.5), so the half-pixel shift is ``(a + b) / 2`` in x
    and ``(d + e) / 2`` in y.  For an unrotated raster (b == d == 0) this
    reduces to ``a / 2`` and ``e / 2``.
    """
    a, b, c, d, e, f = trans
    cols, rows = colrow[0], colrow[1]
    xs = cols * a + rows * b + c + (a + b) / 2
    ys = cols * d + rows * e + f + (d + e) / 2
    return numpy.array([xs, ys])

# Lazily apply wrap_xy to every dask chunk of the index grid; the result is
# declared as float64 and is not computed here.
xy = xarray.apply_ufunc(wrap_xy, colrow, dask="parallelized", output_dtypes=['float64'])

## Transform to WGS84

In [None]:
# Target CRS for the reprojection.
epsg_4326 = 'EPSG:4326'
# NOTE(review): always_xy is not set, so pyproj follows each CRS's declared
# axis order; for EPSG:4326 that should yield (lat, lon) — confirm, because the
# outputs are passed positionally to pystare.from_latlon_2d further down.
transformer = pyproj.Transformer.from_crs(src_crs, epsg_4326)

def wrap_transform(coords):
    """Reproject a stacked pair of coordinate arrays with ``transformer``.

    ``coords[0]`` and ``coords[1]`` are handed to the notebook-level pyproj
    ``transformer`` in that order; its two outputs are returned stacked along
    a new leading axis as a float64 array.
    """
    first, second = transformer.transform(coords[0], coords[1])
    return numpy.array((first, second), dtype='float64')

# Lazily reproject every dask chunk of native coordinates; the result is
# declared as float64 and is not computed here.
coords = xarray.apply_ufunc(wrap_transform, xy, dask="parallelized", output_dtypes=['float64'])

## Making Sids

In [None]:
# Lazily convert the two reprojected coordinate layers into STARE index values.
# NOTE(review): coords[0] / coords[1] are assumed to be latitude / longitude,
# the order from_latlon_2d's name implies — confirm against the transformer.
# output_dtypes is declared explicitly, as in the other apply_ufunc calls, so
# dask does not have to infer the dtype by calling pystare on dummy input.
sids = xarray.apply_ufunc(
    pystare.from_latlon_2d,
    coords[0],
    coords[1],
    dask="parallelized",
    kwargs={'adapt_level': True},
    output_dtypes=['int64'],
)

## Computing

In [None]:
# Run the lazy pipeline on a temporary local cluster (60 single-threaded
# worker processes, 10GB memory limit each); the cluster is shut down when
# the `with` block exits.
with dask.distributed.Client(
    n_workers=60,
    threads_per_worker=1,
    memory_limit='10GB',
    processes=True,
) as client:
    sids = sids.compute()
sids

In [None]:
# Drop the pixel-index grid; the remaining cells shown here do not use it.
del colrow

In [None]:
# Replace the computed DataArray with its underlying numpy array before saving.
sids = sids.to_numpy()

In [None]:
# Persist the SID array. The path must match the one the "Make Dataframe"
# section reads back (binary_snowmaps/), otherwise a fresh run cannot find it.
with open('/tablespace/spires/binary_snowmaps/2017-12-11_sids.pickle', 'wb') as f:
    pickle.dump(sids, f)

# Make Dataframe

In [5]:
import starepandas
import geopandas
import pickle

In [6]:
# Load the pickled region-of-interest object.
# NOTE(review): pickle.load can execute arbitrary code — only use trusted files.
with open('data/roi_4326_buffer_huge.pickle', 'rb') as f:
    roi = pickle.load(f)
# SIDs of the first ROI row; used below as the intersection target.
roi_sids = roi.sids.iloc[0]

In [7]:
# Reload the previously saved SID array from disk.
# NOTE(review): pickle.load can execute arbitrary code — only use trusted files.
with open('/tablespace/spires/binary_snowmaps/2017-12-11_sids.pickle', 'rb') as f:
    sids = pickle.load(f)

In [8]:
# Build a two-column frame from the flattened SID array and the flattened
# band 1 values ('snow').
# NOTE(review): assumes sids and values['band_1'] have the same shape and
# element order so rows line up — confirm.
sdf = starepandas.STAREDataFrame({'sids': sids.flatten(), 'snow': values['band_1'].flatten()})

In [9]:
# Release the source arrays now that their flattened copies live in sdf.
del sids, values

In [10]:
%%time
# Test every row's SID against the ROI SIDs, split into 1000 partitions.
# NOTE(review): the result is presumably a boolean mask — it is used to index sdf.
intersects = sdf.stare_intersects(roi_sids, n_partitions=1000)
subset = sdf[intersects]
# Renumber the kept rows from 0, discarding their original index values.
subset.reset_index(drop=True, inplace=True)

CPU times: user 3min 25s, sys: 4min 17s, total: 7min 43s
Wall time: 16min 35s


In [11]:
# Summarise the clipped frame (row count, column dtypes, memory use).
subset.info()

<class 'starepandas.staredataframe.STAREDataFrame'>
RangeIndex: 2972307454 entries, 0 to 2972307453
Data columns (total 2 columns):
 #   Column  Dtype
---  ------  -----
 0   sids    int64
 1   snow    uint8
dtypes: int64(1), uint8(1)
memory usage: 24.9 GB


In [12]:
# Persist the clipped frame next to the source snow maps.
with open('/tablespace/spires/binary_snowmaps/2017-12-11_clippedDF_huge.pickle', 'wb') as f:
    pickle.dump(subset, f)