In [1]:
import xarray as xr
import pandas as pd
import numpy as np
import os
from tqdm.auto import tqdm

import fsspec
import dask.dataframe as ddf
from zarr.errors import GroupNotFoundError

import matplotlib.pyplot as plt
import matplotlib

import rhg_compute_tools.kubernetes as rhgk

  from distributed.utils import LoopRunner, format_bytes


In [2]:
# Google Cloud Storage filesystem handle (fsspec 'gs' protocol). Used further down
# for directory checks (`fs.isdir`) and to build zarr store mappers (`fs.get_mapper`).
fs = fsspec.filesystem('gs')

In [3]:
# Bucket name substituted into every `gs://` path in this notebook. Read from the
# environment so it is not hard-coded; raises KeyError if the variable is unset.
CRS_SUPPORT_BUCKET = os.environ['CRS_SUPPORT_BUCKET']

In [4]:
# Full GCS path of the README text file stored next to the 30-arc-second
# GPWv4 population-count GeoTiff (bucket / directory / file name, concatenated).
readme_fp = ''.join((
    f'gs://{CRS_SUPPORT_BUCKET}/public_datasets/spatial/exposure/GLOBAL/population/',
    'gpw-v4-population-count-adjusted-to-2015-unwpp-country-totals-rev11_2020_30_sec_tif/',
    'gpw_v4_population_count_adjusted_to_2015_unwpp_country_totals_rev11_2020_30_sec_tif_readme.txt',
))

In [5]:
# Template for the path of a source GeoTiff. Placeholders are filled with
# `str.format`: the bucket, the dataset category directory, the dataset name
# (hyphenated for the directory, underscored for the file name) and the
# native-resolution tag.
source_patt = (
    'gs://{CRS_SUPPORT_BUCKET}/public_datasets/spatial/exposure/GLOBAL/{category}/'
    '{dataset}_{native_res}_tif/'
    '{dataset_underscore}_{native_res}.tif'
)

# Template for every derived output. `kind` selects the sub-directory
# ('reformatted' or 'coarsened' in this notebook), `name` is the output's base
# name and `ext` its format extension ('zarr' or 'parquet').
output_data = (
    'gs://{CRS_SUPPORT_BUCKET}/public_datasets/spatial/exposure/GLOBAL/'
    '{category}/{dataset}_{native_res}_tif/derived_datasets/{kind}/{name}.{ext}'
)

# Resolution tag used in the source file and directory names.
NATIVE_RES = '30_sec'

# Citation strings copied into the attributes of every derived dataset.
POPULATION_CITATION = (
    'Center for International Earth Science Information Network - CIESIN - '
    'Columbia University. 2018. Gridded Population of the World, Version 4 '
    '(GPWv4): Population Count Adjusted to Match 2015 Revision of UN WPP '
    'Country Totals, Revision 11. Palisades, NY: NASA Socioeconomic Data and '
    'Applications Center (SEDAC). https://doi.org/10.7927/H4PN93PB. '
    'Accessed 18 Feb 2022.'
)

LANDWATER_CITATION = (
    'Center for International Earth Science Information Network - CIESIN - '
    'Columbia University. 2018. Gridded Population of the World, Version 4 '
    '(GPWv4): Land and Water Area, Revision 11. Palisades, NY: NASA '
    'Socioeconomic Data and Applications Center (SEDAC). '
    'https://doi.org/10.7927/H4Z60M4Z. Accessed 24 Feb 2022.'
)

# Per-dataset settings. Each group provides the six values consumed by the
# conversion loop: category directory, dataset name, landing-page URL,
# citation, output variable name and unit string.

# Population count.
POP_CAT = 'population'
POP_VAR_NAME = 'population'
POP_UNIT = 'count'
POP_DATASET = 'gpw-v4-population-count-adjusted-to-2015-unwpp-country-totals-rev11_2020'
POP_URL = 'https://sedac.ciesin.columbia.edu/data/set/gpw-v4-population-count-adjusted-to-2015-unwpp-country-totals-rev11'
POP_CITATION = POPULATION_CITATION

# Land area (shares category, URL and citation with the water-area dataset).
LAND_CAT = 'land_water_area'
LAND_VAR_NAME = 'land_area'
LAND_UNIT = 'km^2'
LAND_DATASET = 'gpw-v4-land-water-area-rev11_landareakm'
LAND_URL = 'https://sedac.ciesin.columbia.edu/data/set/gpw-v4-land-water-area-rev11'
LAND_CITATION = LANDWATER_CITATION

# Water area.
WATER_CAT = 'land_water_area'
WATER_VAR_NAME = 'water_area'
WATER_UNIT = 'km^2'
WATER_DATASET = 'gpw-v4-land-water-area-rev11_waterareakm'
WATER_URL = 'https://sedac.ciesin.columbia.edu/data/set/gpw-v4-land-water-area-rev11'
WATER_CITATION = LANDWATER_CITATION

In [6]:
# Start a Dask cluster through the project helper and keep both the client and
# the cluster handle; they are shut down explicitly in the last cell.
client, cluster = rhgk.get_giant_cluster()
# Scale the cluster to 30 — presumably the number of workers; confirm against
# the cluster type returned by the helper.
cluster.scale(30)
# Bare last expression so the notebook renders the cluster widget.
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

# Convert to different formats

* The zarr version is an exact replica of the GeoTiff, just chunked & in a cloud-optimized format
* The parquet version stores the same data as a columnar table, keeping only cells whose value is greater than zero (zeros and any negative values are dropped)

In [7]:
# For each GPWv4 source GeoTiff this loop produces, in order:
#   1. a zarr copy of the raster at native resolution ('reformatted'),
#   2. a parquet table holding only the cells with a value > 0 ('reformatted'),
#   3. for each target resolution, a parquet table and a zarr store with the values
#      summed into regular lat/lon bins ('coarsened').
# Each stage first tries to open its existing output and only recomputes when that
# fails, so the cell can be re-run after a partial failure.
for category, dataset, url, citation, varname, unit in tqdm([
    (POP_CAT, POP_DATASET, POP_URL, POP_CITATION, POP_VAR_NAME, POP_UNIT),
    (LAND_CAT, LAND_DATASET, LAND_URL, LAND_CITATION, LAND_VAR_NAME, LAND_UNIT),
    (WATER_CAT, WATER_DATASET, WATER_URL, WATER_CITATION, WATER_VAR_NAME, WATER_UNIT),
], desc='dataset'):

    # Path components shared by every output of this dataset.
    dataset_path_kwargs = dict(
        CRS_SUPPORT_BUCKET=CRS_SUPPORT_BUCKET,
        category=category,
        dataset=dataset,
        native_res=NATIVE_RES,
    )

    fmt_zarr = output_data.format(
        kind='reformatted',
        name=f'{dataset}_{NATIVE_RES}',
        ext='zarr',
        **dataset_path_kwargs,
    )

    fmt_parquet = output_data.format(
        kind='reformatted',
        name=f'{dataset}_{NATIVE_RES}',
        ext='parquet',
        **dataset_path_kwargs,
    )

    # --- Stage 1: native-resolution zarr -------------------------------------------
    try:
        mapper = fs.get_mapper(fmt_zarr)
        ds = xr.open_zarr(mapper, consolidated=True)
    except (FileNotFoundError, GroupNotFoundError, IOError, KeyError) as err:
        # A store that exists but cannot be opened is treated as an error rather
        # than being written over; chain the original exception so the root
        # cause stays visible in the traceback.
        if fs.isdir(fmt_zarr):
            raise IOError(f'Error reading {fmt_zarr}') from err

        source_fp = source_patt.format(
            CRS_SUPPORT_BUCKET=CRS_SUPPORT_BUCKET,
            category=category,
            dataset=dataset,
            dataset_underscore=dataset.replace('-', '_'),
            native_res=NATIVE_RES,
        )

        # NOTE(review): `xr.open_rasterio` is deprecated in newer xarray releases in
        # favour of rioxarray; kept here because switching would add a dependency.
        with xr.open_rasterio(source_fp, chunks={'x': 2160, 'y': 2160}) as da:
            ds = da.to_dataset(name=varname)
            ds.attrs.update({
                'method': 'Converted to zarr array from original GeoTiff for ease of access. No other modifications made.',
                'updated': pd.Timestamp.now(tz='US/Pacific').strftime('%c (%Z)'),
                'version': 'v4r11',
                'url': url,
                'citation': citation,
            })

            ds.to_zarr(fmt_zarr, consolidated=True)

        # Re-open from the store so downstream work reads the zarr copy rather than
        # the GeoTiff.
        mapper = fs.get_mapper(fmt_zarr)
        ds = xr.open_zarr(mapper, consolidated=True)

    # --- Stage 2: native-resolution parquet (positive cells only) ------------------
    try:
        df = ddf.read_parquet(fmt_parquet)
    except (FileNotFoundError, IOError):
        df = ds.to_dask_dataframe()
        df = df[df[varname] > 0]
        # Dask dataframes are immutable: `repartition` returns a new dataframe, so
        # the result must be assigned for the requested partition size to apply to
        # the parquet output.
        df = df.repartition(partition_size='200MB')
        df.to_parquet(fmt_parquet)

        df = ddf.read_parquet(fmt_parquet)

    # --- Stage 3: coarsened products ------------------------------------------------
    for RES, RES_NAME in tqdm([
        (0.1, '0.1degree'),
        (0.125, '0.125degree'),
        (0.25, '0.25degree'),
        (0.5, '0.5degree'),
        (1, '1degree'),
    ], desc='coarsen', leave=False):

        coarsened_path_kwargs = dict(
            kind='coarsened',
            name=f'{dataset}_{RES_NAME}',
            **dataset_path_kwargs,
        )

        fmt_coarsened_parquet = output_data.format(ext='parquet', **coarsened_path_kwargs)
        fmt_coarsened_zarr = output_data.format(ext='zarr', **coarsened_path_kwargs)

        # Skip this resolution when its zarr output can already be opened. The same
        # exception set as the native-resolution zarr check is caught so that both
        # "store missing" checks behave alike.
        try:
            xr.open_zarr(fmt_coarsened_zarr, consolidated=True)
            continue
        except (FileNotFoundError, GroupNotFoundError, IOError, KeyError):
            pass

        # Assign every cell to the centre of the RES-degree bin containing it
        # (floor-divide by RES, shift by half a bin, scale back), then sum the
        # variable within each (lat, lon) bin.
        grouped = (
            df[[varname]]
            .assign(
                lat=(((df.y // RES) + 0.5) * RES),
                lon=(((df.x // RES) + 0.5) * RES),
            )
            .groupby(['lat', 'lon'])
            .sum()
        )
        (
            grouped
            .reset_index(drop=False)
            .repartition(partition_size='200MB')
            .to_parquet(fmt_coarsened_parquet)
        )

        # Materialise the (much smaller) binned table and turn it into a lat/lon
        # grid; bins that received no rows become 0 instead of NaN.
        coarsened = grouped.compute().to_xarray().fillna(0)
        coarsened[varname].attrs.update({
            'long_name': varname,
            'units': unit,
            'crs': ds[varname].attrs['crs'],
        })

        # Start from the source dataset's attributes, then override the ones that
        # describe how this particular product was derived.
        coarsened.attrs.update(ds.attrs)
        coarsened.attrs.update({
            'method': f'{varname} summed from 30-as source data',
            'resolution_degrees': RES,
            'resolution_description': RES_NAME,
        })

        coarsened.to_zarr(fmt_coarsened_zarr, consolidated=True)

dataset:   0%|          | 0/3 [00:00<?, ?it/s]

coarsen:   0%|          | 0/5 [00:00<?, ?it/s]

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  xr.open_zarr(fmt_coarsened_zarr)


coarsen:   0%|          | 0/5 [00:00<?, ?it/s]

coarsen:   0%|          | 0/5 [00:00<?, ?it/s]

In [15]:
# Tear down the Dask resources created at the top of the notebook, in order:
# restart the client, scale the cluster to zero, then close the client and
# finally the cluster.
client.restart()
cluster.scale(0)
client.close()
# Trailing semicolon suppresses the cell's output.
cluster.close();