In [1]:
# Mask raster values > 360
# Build this so we can loop over a collection of rasters
# Do the aggregation operations per admin unit in Dask.DataFrames

import dask
import coiled
from dask.distributed import Client, LocalCluster, Lock
from dask.utils import SerializableLock
import dask.dataframe as dd
import pandas as pd

import rioxarray as rx
import xarray as xr

import os


def config_coiled_cluster(env_name="riox", account="mrmaksimize"):
    """
    Create (or reuse) the Coiled software environment and cluster configuration.

    Parameters
    ----------
    env_name : str
        Name of the software environment. The cluster configuration is named
        ``f"{env_name}-dev"``.
    account : str
        Coiled account that owns the software environment; the configuration
        references it as ``f"{account}/{env_name}"``.

    Returns
    -------
    bool
        Always ``True``.
    """
    coiled.create_software_environment(
        name=env_name,
        #container="mrmaksimize/prefect-coiled-env:latest",
        pip=[
            "Fiona==1.8.19",
            "rasterio==1.2.3",
            "s3fs==2021.5.0",
            "xarray==0.18.2",
            "xarray-spatial==0.2.2",
            "rioxarray==0.4.0",
            "dask==2021.4.1",
            "distributed==2021.4.1"
        ],
        backend_options={"region": "us-east-1"})

    # Cluster configuration named "<env_name>-dev"; pass that name to
    # coiled.Cluster(configuration=...) to use it.
    coiled.create_cluster_configuration(
        name=f"{env_name}-dev",
        scheduler_cpu=4,
        scheduler_memory="30 GiB",
        worker_cpu=4,
        worker_memory="30 GiB",
        software=f"{account}/{env_name}",
    )

    return True


def _cog_filenames():
    """Map "<season>_<target>_<year>" to its WGS84 COG filename (build date embedded)."""
    seasons = ("dry", "msn")
    targets = ("allroads", "highways", "urban_centers")
    years = (1993, 2001, 2011, 2021)
    # Most COGs were built on 2021-05-17; these (season, year) pairs a day earlier.
    built_20210516 = {("dry", 1993), ("msn", 1993), ("dry", 2011)}
    names = {}
    for season in seasons:
        for target in targets:
            for year in years:
                stem = f"{season}_{target}_{year}"
                build_date = "20210516" if (season, year) in built_20210516 else "20210517"
                names[stem] = f"{stem}_{build_date}_WGS84_COG.tif"
    return names


rasters = _cog_filenames()

FRICTION_THRESHOLD = 150
DENSITY_THRESHOLD = 0.02

PATH_BASE = "s3://data-fully-public-bucket/nepal"
PATH_BASE_LOCAL = "../data/nepal"

# Credentials are read from the environment (export AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY before starting Jupyter). Never hard-code keys or
# tokens in the notebook: anything in the source or its history is exposed.
# When a variable is unset the value is None; s3fs should then fall back to its
# default credential chain (~/.aws, instance role) — confirm for your setup.
STORAGE_OPTS = {
    "secret": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    "key": os.environ.get("AWS_ACCESS_KEY_ID"),
}

CLUSTER = 'remote'
#PATH_BASE = PATH_BASE_LOCAL # uncomment to read from local disk instead of S3
COG_BASE = f"{PATH_BASE}/cog"
#COG_BASE = f"https://data-fully-public-bucket.s3.amazonaws.com/nepal/cog"


points_large = f"{PATH_BASE}/NPL_WP_2020_19m_Pts_XY.csv"
points_small = f"{PATH_BASE}/NPL_WP_2020_500k_Example_Pts_XY.csv"


out_path_mod = ""

POINTS_URL = points_large

In [48]:

# Start either a local Dask cluster or a Coiled cluster, depending on CLUSTER.
if CLUSTER == 'local':
    cluster = LocalCluster(n_workers=8,
                           processes=True,
                           threads_per_worker=8,
                           scheduler_port=8786)
elif CLUSTER == 'remote':
    coiled_env = "riox"
    config_coiled_cluster(coiled_env)
    cluster = coiled.Cluster(
        name=f"{coiled_env}-cluster",
        # config_coiled_cluster names the configuration "<env>-dev".
        configuration=f"{coiled_env}-dev",
        n_workers=4,
    )
else:
    raise ValueError(f"CLUSTER must be 'local' or 'remote', got {CLUSTER!r}")

client = Client(cluster)
client
    

Updating software environment...
Found existing software environment build, returning


Output()

Found software environment build


0,1
Client  Scheduler: tls://ec2-44-192-70-140.compute-1.amazonaws.com:8786  Dashboard: http://ec2-44-192-70-140.compute-1.amazonaws.com:8787,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [49]:
# Restart the workers so nothing from a previous run of this notebook stays in their memory.
client.restart()

0,1
Client  Scheduler: tls://ec2-44-192-70-140.compute-1.amazonaws.com:8786  Dashboard: http://ec2-44-192-70-140.compute-1.amazonaws.com:8787,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [50]:
"""
Extract pixel values from a dask-backed DataArray/Dataset at x, y coordinates given in a dask DataFrame.

See the ``pixel_values_at_points`` docstring for usage.

Example
-------
>>> import dask
>>> import xarray as xr
>>> data = xr.DataArray(
...     da.random.random((256, 512), chunks=100),
...     coords=[("y", np.linspace(0, 1, 256)), ("x", np.linspace(0, 1, 512))],
... )
>>> df = dask.datasets.timeseries()
>>> df_with_values = pixel_values_at_points(data, df)
>>> df_with_values
Dask DataFrame Structure:
                   id    name        x        y pixel_values
npartitions=17
                int64  object  float64  float64      float64
                  ...     ...      ...      ...          ...
...               ...     ...      ...      ...          ...
                  ...     ...      ...      ...          ...
                  ...     ...      ...      ...          ...
Dask Name: pixel-values, 103 tasks
>>> df_with_values.head()
                       id    name         x         y  pixel_values
timestamp
2000-01-01 00:00:35   980     Tim  0.054010  0.108094      0.704963
2000-01-01 00:02:16  1045     Dan  0.009220  0.231619      0.904162
2000-01-01 00:05:04  1020  Ursula  0.116082  0.048629      0.463596
2000-01-01 00:06:31  1075  Ingrid  0.175014  0.383851      0.991749
2000-01-01 00:07:35  1017   Kevin  0.067010  0.149642      0.661196
"""

from typing import Union
import dask
from dask.highlevelgraph import HighLevelGraph
import dask.dataframe as dd
import dask.array as da
import xarray as xr
import pandas as pd
import numpy as np


def _chunk_footprint(centers: np.ndarray):
    """Return the half-open extent ``[lo, hi)`` covered by pixels whose centers are ``centers``."""
    lo, hi = centers.min(), centers.max()
    if centers.size > 1:
        # Average spacing between neighbouring centers == pixel size on a regular grid.
        half_pixel = abs(centers[-1] - centers[0]) / (centers.size - 1) / 2
    else:
        # A one-pixel-wide chunk has no spacing to measure, so its extent is empty
        # and it catches no points (known limitation; the old code had the same).
        half_pixel = 0.0
    return lo - half_pixel, hi + half_pixel


def _add_pixel_values_to_df(
    pixels: np.ndarray,
    y_coords: np.ndarray,
    x_coords: np.ndarray,
    partition: pd.DataFrame,
    colname: str = "value",
) -> pd.DataFrame:
    """
    Attach the raster value under each point of ``partition`` that lies in this chunk.

    Called once per raster chunk via ``map_blocks`` in ``_pixel_values_at_points``.

    Parameters
    ----------
    pixels:
        One 2-D chunk of the raster, indexed ``[y, x]``.
    y_coords, x_coords:
        Pixel-center coordinates for this chunk, delivered in broadcast shape
        (``(ny, 1)`` and ``(1, nx)``); they are flattened here.
    partition:
        Points DataFrame with ``x`` and ``y`` columns.
    colname:
        Name of the added column holding the pixel values.

    Returns
    -------
    pd.DataFrame
        The rows of ``partition`` inside this chunk's footprint plus a ``colname`` column.
    """
    # NOTE: coords must have already been chunked to align with `pixels`.
    # ``ravel`` (rather than ``squeeze``) keeps a 1-pixel chunk 1-D instead of
    # collapsing it to a 0-d scalar that can't serve as a coordinate.
    ys = np.asarray(y_coords).ravel()
    xs = np.asarray(x_coords).ravel()
    arr = xr.DataArray(pixels, coords=[("y", ys), ("x", xs)])

    # Keep only points inside this chunk's footprint: outermost pixel centers widened
    # by half a pixel. Using the bare centers with strict `<` dropped points on/near
    # the edge rows/columns and every point in the one-pixel gap between two chunks.
    # Half-open [lo, hi) intervals let neighbouring footprints tile without overlap,
    # so each point still lands in at most one chunk and no merge step is needed.
    x_lo, x_hi = _chunk_footprint(xs)
    y_lo, y_hi = _chunk_footprint(ys)
    in_x = (partition["x"] >= x_lo) & (partition["x"] < x_hi)
    in_y = (partition["y"] >= y_lo) & (partition["y"] < y_hi)
    inbounds_df = partition[in_x & in_y]
    points_xr = xr.Dataset.from_dataframe(inbounds_df[["x", "y"]])

    # Select pixel values at points
    pixel_values = arr.sel(x=points_xr.x, y=points_xr.y, method="nearest")
    pixel_values_df = pixel_values.reset_coords(drop=True).to_dataframe(name=colname)
    return pd.concat([inbounds_df, pixel_values_df], axis="columns")


def _pixel_values_at_points(
    arr: da.Array,
    ddf: dd.DataFrame,
    y_coords: da.Array,
    x_coords: da.Array,
    colname: str = "value",
) -> dd.DataFrame:
    """
    Build a dask DataFrame with one partition per chunk of the 2-D dask array ``arr``.

    Parameters
    ----------
    arr:
        2-D (y, x) dask array of pixel values.
    ddf:
        Points with ``x``/``y`` columns; passed whole to every chunk task.
    y_coords, x_coords:
        1-D dask arrays of pixel-center coordinates, chunked like ``arr``'s axes.
    colname:
        Name of the output column holding pixel values.

    Returns
    -------
    dd.DataFrame
        ``ddf``'s columns plus ``colname``; divisions are unknown.

    Raises
    ------
    NotImplementedError
        If ``arr`` is not 2-D.
    """
    # TODO this would be easy
    if arr.ndim != 2:
        raise NotImplementedError(
            f"Multi-band not supported yet (expected a 2-D array, got {arr.ndim}-D)"
        )

    # Turn each chunk of the raster into a DataFrame of pixel values extracted from that chunk.
    px_values_df_chunks = arr.map_blocks(
        _add_pixel_values_to_df,
        # Expand coords arrays so they broadcast correctly
        y_coords[:, None],
        x_coords[None],
        ddf,
        colname=colname,
        # give a fake meta since `_add_pixel_values_to_df` would fail on dummy data
        meta=np.empty((0, 0)),
    )
    # NOTE: The cake is a lie! All of `px_values_df_chunks`'s metadata is wrong.
    # Its chunks are DataFrames, not ndarrays; its shape and dtype are also
    # not what `_meta` says.

    # To turn the "array" of DataFrames into one dask DataFrame, we have to flatten it
    # so its keys match the pattern of a DataFrame.
    # Sadly we can't just `.ravel()`, because that will add actual `np.reshape` tasks to the graph,
    # which will try to reshape the DataFrames (!!) to shapes that make no sense (!!!).

    # So we throw up our hands and write a dask graph by hand that just renames from one key to another.
    # TODO make this non-materialized (don't think it's blockwise-able though)

    df_name = f"pixel-values-{dask.base.tokenize(px_values_df_chunks)}"
    dsk = {
        (df_name, i): key
        for i, key in enumerate(dask.core.flatten(px_values_df_chunks.__dask_keys__()))
    }
    hlg = HighLevelGraph.from_collections(
        df_name, dsk, dependencies=[px_values_df_chunks]
    )

    meta = pd.concat(
        [ddf._meta, dd.utils.make_meta({colname: arr.dtype})], axis="columns"
    )
    # A DataFrame with N partitions has N + 1 division boundaries. Passing only N
    # (``(None,) * len(dsk)``) made dask see N - 1 partitions and silently drop the
    # points extracted from the last raster chunk.
    divisions = (None,) * (len(dsk) + 1)
    return dd.DataFrame(hlg, df_name, meta, divisions)


def pixel_values_at_points(
    raster: Union[xr.DataArray, xr.Dataset], points: dd.DataFrame
) -> dd.DataFrame:
    """
    Extract pixel values from a DataArray or Dataset at coordinates given by a dask DataFrame

    The ``points`` DataFrame should have columns ``x`` and ``y``, which should be in
    the same coordinate reference system as the ``x`` and ``y`` coordinates on the
    ``raster`` DataArray/Dataset.

    The output DataFrame will contain the same data as ``points``, with one column added
    per data variable in the Dataset (or just one column in the case of a DataArray).
    The column(s) will have the same names as the data variables.

    The output DataFrame will have one partition per chunk of the raster.
    Each partition will contain only the points overlapping that chunk.
    It will have the same index, but the divisions will be unknown.

    Note that the entire ``points`` DataFrame will be loaded into memory; it cannot
    be processed in parallel.

    Raises
    ------
    ValueError
        If ``raster`` is not dask-backed, or has no ``x`` or ``y`` coordinate.
    NotImplementedError
        If the Dataset's variables differ in shape or chunk pattern.
    """
    # An in-memory raster has ``chunks`` of None (DataArray) or an empty mapping
    # (Dataset). Checking up front avoids misreporting the latter as a missing
    # coordinate (the KeyError from ``raster.chunks[coord]``).
    if not raster.chunks:
        raise ValueError(
            "`raster` must be backed by dask arrays; open it with `chunks=` "
            "or call `.chunk()` on it first."
        )
    missing = [coord for coord in ("x", "y") if coord not in raster.coords]
    if missing:
        raise ValueError(
            "`raster` is missing the coordinate(s): "
            + ", ".join(repr(coord) for coord in missing)
        )

    # Turn x and y coordinates into dask arrays, so we can re-create the xarray
    # object within our blockwise function. By using dask arrays instead of NumPy
    # array literals, dask will split up the coordinates into chunks that align with the pixels.
    # Ideally we could just use `DataArray.map_blocks` for all this, but 1) xarray's `map_blocks`
    # doesn't support auxiliary non-xarray collections yet (i.e. ``points```), and 2) it generates
    # a low-level graph instead of using a Blockwise layer.
    xs, ys = [
        da.from_array(
            raster.coords[coord].values,
            chunks=(
                raster.chunks[raster.get_axis_num(coord)]
                if isinstance(raster, xr.DataArray)
                else raster.chunks[coord]
                # ^ NOTE: raises error if chunks are inconsistent
            ),
            inline_array=True,
        )
        for coord in ["x", "y"]
    ]

    if isinstance(raster, xr.DataArray):
        return _pixel_values_at_points(
            raster.data, points, ys, xs, colname=raster.name or "value"
        )

    # TODO how do we merge all the DataFrames for non-overlapping arrays?
    # They might not even be the same length, partitions won't correspond, etc.
    # Ideally we'd have spatial partitioning to make the merge easy.
    # But even without, I guess we need the index on each DataFrame to actually
    # be identifying back to the original row so we can at least do a shuffle.
    if len(set(arr.shape for arr in raster.data_vars.values())) != 1:
        raise NotImplementedError(
            "Can't handle Datasets with variables of different shapes yet."
        )
    if len(set(arr.chunks for arr in raster.data_vars.values())) != 1:
        raise NotImplementedError(
            "Can't handle Datasets with variables of different chunk patterns yet."
        )

    # NOTE: we use the full points dataframe for the first raster, and just the xy-coords
    # for the subsequent ones, so we can concatenate all them column-wise without
    # duplicating data (x/y still repeat once per variable).
    points_justxy = points[["x", "y"]]
    dfs = [
        _pixel_values_at_points(
            arr.data, points if i == 0 else points_justxy, ys, xs, colname=name
        )
        for i, (name, arr) in enumerate(raster.data_vars.items())
    ]

    # NOTE: this is safe, since all the rasters have the same chunk pattern
    # (and so we assume are spatially aligned; should actually check this),
    # so the resulting DataFrames will have matching partitions.
    return dd.concat(dfs, axis="columns", ignore_unknown_divisions=True)

In [51]:
# CSV column position -> (name given to it here, dtype to parse it as).
# Skipped source columns: 1 OBJECTID, 3 PALIKA, 4 DISTRICT, 5 GAPA_NAPA,
# 6 GN_TYPE, 7 PROVINCE, 9 HLCIT_ID, 10 HLCIT_Ward.
POINT_CSV_COLUMNS = {
    0: ("ix", int),            # fid
    2: ("VALUE_PTS", float),   # VALUE
    8: ("CBS_ID", str),
    11: ("CBS_Ward", str),
    12: ("x", float),          # wgs84_x
    13: ("y", float),          # wgs84_y
}

points = dd.read_csv(
    POINTS_URL,
    storage_options=STORAGE_OPTS,
    usecols=list(POINT_CSV_COLUMNS),
    header=0,
    names=[col_name for col_name, _ in POINT_CSV_COLUMNS.values()],
    dtype={col_name: col_type for col_name, col_type in POINT_CSV_COLUMNS.values()},
    na_values=' ',
).set_index('ix', drop=True, sorted=True)

# Admin codes are parsed as text; blanks become code 0, then text -> float -> int
# (presumably so codes written with a decimal point still convert).
for admin_col in ("CBS_Ward", "CBS_ID"):
    points[admin_col] = points[admin_col].fillna('0').astype(float).astype(int)
points.head()

Unnamed: 0_level_0,VALUE_PTS,CBS_ID,CBS_Ward,x,y
ix,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,0.001882,60304,6030406,81.620833,30.445833
2,0.001878,60304,6030406,81.621666,30.445833
3,0.001938,60304,6030406,81.6225,30.445833
4,0.001996,60304,6030406,81.623333,30.445833
5,0.001992,60304,6030406,81.624166,30.445833


In [52]:
# Temp subset points
#points_keep = [
#    10609,
#    31101,
#    60703,
#    40303,
#    40301
#]

#points = points[points['CBS_ID'].isin(points_keep)]
#out_path_mod = "_subset"

In [53]:
# Drop sparsely populated points: keep only densities above DENSITY_THRESHOLD.
is_dense = points['VALUE_PTS'] > DENSITY_THRESHOLD
points = points[is_dense]
#points = points.repartition(npartitions=40)

In [54]:
# Open every friction raster as a dask-backed DataArray. Pixel data is read
# lazily by later computations; nothing is persisted here (the old log said
# "Persist", which was misleading).
loaded_rasters = {}
for key, filename in rasters.items():
    raster_url = f"{COG_BASE}/{filename}"
    print(f"Open raster (lazy): {key} at {raster_url}")
    loaded_rasters[key] = xr.open_rasterio(
        raster_url,
        chunks=(4, "auto", -1),
        parse_coordinates=True,
    )
    

Persist raster: dry_allroads_1993 at s3://data-fully-public-bucket/nepal/cog/dry_allroads_1993_20210516_WGS84_COG.tif
Persist raster: dry_allroads_2001 at s3://data-fully-public-bucket/nepal/cog/dry_allroads_2001_20210517_WGS84_COG.tif
Persist raster: dry_allroads_2011 at s3://data-fully-public-bucket/nepal/cog/dry_allroads_2011_20210516_WGS84_COG.tif
Persist raster: dry_allroads_2021 at s3://data-fully-public-bucket/nepal/cog/dry_allroads_2021_20210517_WGS84_COG.tif
Persist raster: dry_highways_1993 at s3://data-fully-public-bucket/nepal/cog/dry_highways_1993_20210516_WGS84_COG.tif
Persist raster: dry_highways_2001 at s3://data-fully-public-bucket/nepal/cog/dry_highways_2001_20210517_WGS84_COG.tif
Persist raster: dry_highways_2011 at s3://data-fully-public-bucket/nepal/cog/dry_highways_2011_20210516_WGS84_COG.tif
Persist raster: dry_highways_2021 at s3://data-fully-public-bucket/nepal/cog/dry_highways_2021_20210517_WGS84_COG.tif
Persist raster: dry_urban_centers_1993 at s3://data-full

In [55]:
def _mask_nodata(band: xr.DataArray) -> xr.DataArray:
    """Replace the raster's nodata sentinel (first entry of ``nodatavals``) with NaN."""
    nodata = band.attrs["nodatavals"][0]
    return band.where(band != nodata)


# One Dataset variable per raster, first band only, nodata masked to NaN.
stacked = xr.Dataset(loaded_rasters)
first_band = stacked.sel(band=1)
rasters_ds = first_band.map(_mask_nodata)

In [56]:
# Lazily sample every raster variable at every point (one column per raster).
df_pixels = pixel_values_at_points(raster=rasters_ds, points=points)


In [57]:
# Keep points whose dry_allroads_2021 value is below FRICTION_THRESHOLD
# (NaN/nodata compares False, so those points are dropped too).
is_reachable = df_pixels["dry_allroads_2021"] < FRICTION_THRESHOLD
df_pixels = df_pixels[is_reachable]
# Dataset extraction repeats x/y once per raster; keep only the first copy of each column.
keep_cols = ~df_pixels.columns.duplicated()
df_pixels = df_pixels.loc[:, keep_cols]
df_pixels = df_pixels.persist()

In [58]:
def _population_by(frame, admin_col, out_name):
    """Sum VALUE_PTS of ``frame`` per ``admin_col`` as a one-column frame named ``out_name``."""
    return frame.groupby(admin_col)['VALUE_PTS'].sum().to_frame(out_name)


adm3_pop = _population_by(df_pixels, 'CBS_ID', "adm3_pop")     # per LGU (CBS_ID)
adm4_pop = _population_by(df_pixels, 'CBS_Ward', "adm4_pop")   # per ward (CBS_Ward)

In [59]:
# Attach each point's admin-unit population totals (left joins on the unit code).
for pop_frame, admin_col in ((adm3_pop, "CBS_ID"), (adm4_pop, "CBS_Ward")):
    df_pixels = dd.merge(df_pixels, pop_frame, how='left', left_on=admin_col, right_index=True)

df_pixels = df_pixels.persist()


In [60]:
# Each point's share of its admin unit's population (shares sum to 1 within a unit).
df_pixels = df_pixels.assign(
    wt_adm_3=df_pixels['VALUE_PTS'] / df_pixels['adm3_pop'],
    wt_adm_4=df_pixels['VALUE_PTS'] / df_pixels['adm4_pop'],
)

In [61]:
# For each raster: point value x population weight. Summing these per admin unit
# (groupby cells below) yields the unit's population-weighted mean.
# NOTE(review): groupby-sum skips NaN by default, so a nodata pixel's weight is
# lost and that unit's mean is biased low — confirm this is acceptable.
adm3_avg_cols = [f"{rkey}_avt_adm3" for rkey in rasters]
adm4_avg_cols = [f"{rkey}_avt_adm4" for rkey in rasters]

weighted_cols = {}
for rkey, col_adm3, col_adm4 in zip(rasters, adm3_avg_cols, adm4_avg_cols):
    weighted_cols[col_adm3] = df_pixels[rkey] * df_pixels['wt_adm_3']
    weighted_cols[col_adm4] = df_pixels[rkey] * df_pixels['wt_adm_4']
df_pixels = df_pixels.assign(**weighted_cols)

In [62]:
#df_pixels = df_pixels.persist()
#df_pixels.head()

In [63]:
for avg_cols in (adm3_avg_cols, adm4_avg_cols):
    print(",".join(avg_cols))

dry_allroads_1993_avt_adm3,dry_allroads_2001_avt_adm3,dry_allroads_2011_avt_adm3,dry_allroads_2021_avt_adm3,dry_highways_1993_avt_adm3,dry_highways_2001_avt_adm3,dry_highways_2011_avt_adm3,dry_highways_2021_avt_adm3,dry_urban_centers_1993_avt_adm3,dry_urban_centers_2001_avt_adm3,dry_urban_centers_2011_avt_adm3,dry_urban_centers_2021_avt_adm3,msn_allroads_1993_avt_adm3,msn_allroads_2001_avt_adm3,msn_allroads_2011_avt_adm3,msn_allroads_2021_avt_adm3,msn_highways_1993_avt_adm3,msn_highways_2001_avt_adm3,msn_highways_2011_avt_adm3,msn_highways_2021_avt_adm3,msn_urban_centers_2011_avt_adm3,msn_urban_centers_1993_avt_adm3,msn_urban_centers_2001_avt_adm3,msn_urban_centers_2021_avt_adm3
dry_allroads_1993_avt_adm4,dry_allroads_2001_avt_adm4,dry_allroads_2011_avt_adm4,dry_allroads_2021_avt_adm4,dry_highways_1993_avt_adm4,dry_highways_2001_avt_adm4,dry_highways_2011_avt_adm4,dry_highways_2021_avt_adm4,dry_urban_centers_1993_avt_adm4,dry_urban_centers_2001_avt_adm4,dry_urban_centers_2011_avt_adm4,

In [64]:
#%time df_pixels.to_csv(f"s3://data-fully-public-bucket/nepal/output/19m/df_pixels_raw{out_path_mod}.csv", single_file = True, storage_options=STORAGE_OPTS)

In [65]:
# Population-weighted mean of every raster per LGU (CBS_ID).
lgu = (
    df_pixels
    .groupby(['CBS_ID'])[adm3_avg_cols]
    .sum()
    .reset_index()
)


In [66]:
# Population-weighted mean of every raster per ward (CBS_Ward).
ward = (
    df_pixels
    .groupby(['CBS_Ward'])[adm4_avg_cols]
    .sum()
    .reset_index()
)


In [67]:
# Outputs live under PATH_BASE, so switching to PATH_BASE_LOCAL redirects them too.
# The sub-folder is named after the points file in use, so runs on the example
# points file go to 500k/ instead of overwriting the 19m/ results.
OUTPUT_RUN = {points_large: "19m", points_small: "500k"}.get(POINTS_URL, "custom")
lgu_output = f"{PATH_BASE}/output/{OUTPUT_RUN}/lgu{out_path_mod}.csv"
ward_output = f"{PATH_BASE}/output/{OUTPUT_RUN}/ward{out_path_mod}.csv"


In [68]:
# Show where the LGU table will be written.
print(lgu_output)

s3://data-fully-public-bucket/nepal/output/19m/lgu.csv


In [70]:
# Write the per-LGU (CBS_ID) table as a single CSV at lgu_output; this triggers
# the full computation, and %time reports its cost.
%time lgu.to_csv(lgu_output, single_file = True, storage_options=STORAGE_OPTS)

  warn("Appending data to a network storage system may not work.")


CPU times: user 520 ms, sys: 34.3 ms, total: 555 ms
Wall time: 3min 47s


['data-fully-public-bucket/nepal/output/19m/lgu.csv']

In [71]:
# Write the per-ward (CBS_Ward) table as a single CSV at ward_output.
%time ward.to_csv(ward_output, single_file = True, storage_options=STORAGE_OPTS)

CPU times: user 137 ms, sys: 0 ns, total: 137 ms
Wall time: 4.18 s


['data-fully-public-bucket/nepal/output/19m/ward.csv']

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError


In [None]:
# Small synthetic raster for smoke-testing pixel_values_at_points.
# Imports are explicit so this cell does not rely on names from earlier cells.
import dask
import dask.array as da
import numpy as np
import xarray as xr

y_centers = np.linspace(0, 1, 256)
x_centers = np.linspace(0, 1, 512)
data = xr.DataArray(
    da.random.random((256, 512), chunks=100),
    coords=[("y", y_centers), ("x", x_centers)],
)


In [None]:
# Synthetic dask DataFrame (has x and y columns) to sample the example raster at.
df = dask.datasets.timeseries()
df.head()

In [None]:
# Run the extraction on the synthetic data and peek at the first rows.
df_with_values = pixel_values_at_points(data, df)
df_with_values.head()

In [None]:
#points.head()
# Eager, partition-by-partition extraction (exploratory alternative).
# Iterating a dask DataFrame directly (`for chunk_df in points`) yields column
# names, not partitions, so each partition is computed explicitly here.
chunk_list = []

for part_idx in range(points.npartitions):
    chunk_df = points.get_partition(part_idx).compute()
    ind_x = xr.DataArray(chunk_df.x.values, dims="unique_index")
    ind_y = xr.DataArray(chunk_df.y.values, dims="unique_index")

    # TODO - this should be parallel somehow
    for rkey, rds in loaded_rasters.items():
        print(f"Extracting from {rkey}")
        hrs_col = f"{rkey}_hrs"
        # Same nodata attribute the rasters_ds cell uses.
        fill_val = rds.attrs['nodatavals'][0]
        chunk_df[hrs_col] = rds.sel(band=1, x=ind_x, y=ind_y, method='nearest').values
        # Blank only this raster's nodata cells; `chunk_df[row_mask] = None`
        # would wipe every column of those rows.
        chunk_df.loc[chunk_df[hrs_col] == fill_val, hrs_col] = np.nan

    chunk_list.append(chunk_df)

p2 = pd.concat(chunk_list)
p2.head()

In [None]:
# One dict per points partition: raster key -> lazy (dask-backed) pixel selection.
# Computed later via `proc` / `dask.compute`.
results = []
for part_idx in range(points.npartitions):
    # Iterating `points` directly yields column names; materialize each partition.
    chunk_df = points.get_partition(part_idx).compute()
    ind_x = xr.DataArray(chunk_df.x.values, dims="unique_index")
    ind_y = xr.DataArray(chunk_df.y.values, dims="unique_index")

    # TODO - this should be parallel somehow
    res = {
        rkey: rds.sel(band=1, x=ind_x, y=ind_y, method='nearest')
        for rkey, rds in loaded_rasters.items()
    }
    results.append(res)
        

In [None]:
# The lazy msn_allroads_2021 selection from every partition.
proc = [partition_res['msn_allroads_2021'] for partition_res in results]

In [None]:
# Evaluate all lazy selections in one pass; `res` is a tuple aligned with `proc`.
res = dask.compute(*proc)

In [None]:
#loaded_rasters = {}
#url = "https://data-fully-public-bucket.s3.amazonaws.com/nepal/cog/dry_allroads_2021_20210517_WGS84_COG.tif"
#url2 = "https://data-fully-public-bucket.s3.amazonaws.com/nepal/cog/msn_allroads_2021_20210517_WGS84_COG.tif"
#loaded_rasters['dry_uncog'] = rx.open_rasterio(url, 
#                                       chunks = (4, "auto", -1),
#                                       masked=False,
#                                       parse_coordinates=True, 
#                                       lock = True).persist()

In [None]:
def get_values_from_rasters(chunk_df, loaded_rasters, partition_info=None):
    """
    Add one ``<raster key>_hrs`` column per raster with the pixel value under each point.

    Intended for ``points.map_partitions``.

    Parameters
    ----------
    chunk_df : pandas.DataFrame
        One partition of the points table; needs ``x`` and ``y`` columns.
    loaded_rasters : dict
        Raster key -> DataArray with a ``band`` dimension and a ``nodatavals`` attribute.
    partition_info : dict or None
        Supplied by dask ``map_partitions``. Unused; kept for signature compatibility.
        (Divisions are None when the index is not sorted.)

    Returns
    -------
    pandas.DataFrame
        A copy of ``chunk_df`` with the extra columns; nodata pixels are NaN.
    """
    # Mutating the input partition in place is unsafe under dask; work on a copy.
    chunk_df = chunk_df.copy()
    ind_x = xr.DataArray(chunk_df.x.values, dims="unique_index")
    ind_y = xr.DataArray(chunk_df.y.values, dims="unique_index")

    # TODO - this should be parallel somehow
    for rkey, rds in loaded_rasters.items():
        hrs_col = f"{rkey}_hrs"
        # Same nodata attribute the rasters_ds cell uses.
        fill_val = rds.attrs['nodatavals'][0]
        chunk_df[hrs_col] = rds.sel(band=1, x=ind_x, y=ind_y, method='nearest').values
        # Null only this column; `chunk_df[row_mask] = None` blanked whole rows.
        chunk_df.loc[chunk_df[hrs_col] == fill_val, hrs_col] = np.nan
    return chunk_df

# Output meta must match what get_values_from_rasters returns, in order: the
# points columns (after set_index('ix'), the value column is VALUE_PTS, not VALUE)
# followed by one `<key>_hrs` column per raster.
meta = {'VALUE_PTS': float, 'CBS_ID': int, 'CBS_Ward': int, 'x': float, 'y': float}
meta.update({f"{rkey}_hrs": float for rkey in loaded_rasters})


# Promise the calculation
# Adding loaded rasters per coiled team suggestion
points = points.map_partitions(get_values_from_rasters, loaded_rasters=loaded_rasters,
                               meta=meta)
points.head()

In [None]:
# Peek at the points with their extracted raster values.
points.head()

In [None]:
# NOTE(review): the `*_avt_adm3` columns in adm3_avg_cols are only created on
# `df_pixels` (weighted-columns cell), never on `points`, so this line is
# expected to raise KeyError as written. Confirm the intended source frame.
lgu = points.groupby(['CBS_ID'])[adm3_avg_cols].sum()
lgu_output = "s3://data-fully-public-bucket/nepal/output/500k/lgu.csv"
#lgu_output = "../data/nepal/lgu_test.csv"
lgu.to_csv(lgu_output, single_file = True, storage_options=STORAGE_OPTS)

In [None]:
#ward = points.groupby(['CBS_Ward'])[adm4_avg_cols].sum()
#ward_output = "s3://data-fully-public-bucket/nepal/output/500k/lgu.csv"
#ward_output = "../data/nepal/ward_test.csv"
#ward.to_csv(ward_output, single_file = True, storage_options=STORAGE_OPTS)

In [None]:
#points.visualize()