In [1]:
import pandas as pd
import dask.dataframe as dd
import rioxarray as riox

from dask import compute
from pyproj import Transformer
from shapely.geometry import mapping, Point

In [2]:
from dask.distributed import LocalCluster, Client

In [3]:
# cluster = LocalCluster(n_workers=12)
# cluster

In [4]:
# cluster.close()

In [5]:
# client = Client(cluster)
client = Client("tcp://127.0.0.1:44791")

## Run the analysis in a scalable way

load the raster and fill NaNs with 0

In [6]:
myraster = (
    riox.open_rasterio('test-data/Impervious_Surface_NOAA_Satellite_2010/Impervious_Surface_NOAA_Satellite_2010/data_provided/impsa_2010_20210519.tif')
    .sel(band=1)
)

In [7]:
%%time
myraster = myraster.where(myraster != myraster.rio.nodata, 0)

CPU times: user 1.34 s, sys: 253 ms, total: 1.59 s
Wall time: 1.6 s


Load points for data extraction and create buffers

In [8]:
transformer = Transformer.from_crs("EPSG:3577", myraster.rio.crs, always_xy=True)

In [9]:
# points = (
#     dd.read_csv('test-data/one-million.csv')
#     .repartition(12)
#     .rename(columns={'X': 'x', 'Y': 'y'})
# )
# points.head()

In [10]:
# points['lat_lon_tuple'] = points[['x', 'y']].apply(
#     lambda row: transformer.transform(row['x'], row['y']), axis=1,
#     meta=object
# )
# points['lat'] = points['lat_lon_tuple'].apply(lambda el: el[0], meta=float)
# points['lon'] = points['lat_lon_tuple'].apply(lambda el: el[1], meta=float)
# points['points'] = points['lat_lon_tuple'].apply(Point, meta=object)
# points['points_buffer'] = points['points'].apply(lambda x: x.buffer(10), meta=object)

In [11]:
%%time
points = (
    pd.read_csv('test-data/one-million.csv')
    .rename(columns={'X': 'x', 'Y': 'y'})
    .assign(
        lat_lon_tuple = lambda columns: columns[['x', 'y']].apply(
            lambda row: transformer.transform(row['x'], row['y']), axis=1
        ),
        lat = lambda columns: columns['lat_lon_tuple'].apply(lambda el: el[0]),
        lon = lambda columns: columns['lat_lon_tuple'].apply(lambda el: el[1]),
        points = lambda columns: columns['lat_lon_tuple'].apply(Point),
        points_buffer = lambda columns: columns['points'].apply(lambda x: x.buffer(10))
    )
)

CPU times: user 49.3 s, sys: 603 ms, total: 49.9 s
Wall time: 50 s


create dask dataframe, scatter to dictribute computing cluster and run extraction

In [12]:
points_dask = dd.from_pandas(points, npartitions=4).persist()

In [13]:
def extract_mean_from_buffer(raster, geom):
    data_points = pd.Series(geom.exterior.coords)
    values_from_raster = data_points.apply(lambda row: raster.sel(x=row[0], y=row[1], method="nearest").item()).values
    return values_from_raster.mean()

In [14]:
points_dask['extracted_mean'] = points_dask['points_buffer'].apply(
    lambda x: extract_mean_from_buffer(myraster, x), meta=float
) * myraster.attrs['scale_factor'] + myraster.attrs['add_offset']

In [None]:
%%time
points_dask = points_dask.compute()

In [16]:
client.close()

In [None]:
points_dask.sample(20)