# Data Access on the Planetary Computer

In this notebook, we'll take a whirlwind tour of accessing geospatial data in many flavors. A few things to note as we go through it:

1. We'll be using cloud-friendly formats
    - We'll stream the data directly from Blob Storage into memory. No downloading to local disk!
2. We'll always start with the STAC API
    - No need to remember URLs / paths in blob storage

In [None]:
import urllib.request
import operator
import functools
import warnings

import dask.distributed
import pystac_client
import planetary_computer
import stackstac
import numpy as np
import geopandas
import dask.dataframe
import dask_geopandas
import fsspec
import xarray as xr
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
import PIL
import seaborn as sns
import pyproj
import pdal
import shapely.geometry
import shapely.ops

warnings.filterwarnings("ignore", message="pandas.Float64Index")

We'll make a `catalog` client to interact with the Planetary Computer's STAC API.

In [None]:
catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1"
)

And we'll make a local Dask "cluster" to do some computations in parallel.

In [None]:
client = dask.distributed.Client()
client

## Raster Data

Raster data is typically stored as Cloud Optimized GeoTIFF. Some examples include

* Satellite imagery / aerial photography
    - [Landsat C2-L2](https://planetarycomputer.microsoft.com/dataset/landsat-8-c2-l2)
    - [Sentinel 2 L2A](https://planetarycomputer.microsoft.com/dataset/sentinel-2-l2a)
    - [NAIP](https://planetarycomputer.microsoft.com/dataset/naip)
* Land use / land cover
    - [Esri / IO 10-Meter Land Cover](https://planetarycomputer.microsoft.com/dataset/io-lulc-9-class)
    - [Land Cover of Canada](https://planetarycomputer.microsoft.com/dataset/nrcan-landcover)
* Elevation
    - [COP DEM](https://planetarycomputer.microsoft.com/dataset/cop-dem-glo-30)
    - [NASADEM](https://planetarycomputer.microsoft.com/dataset/nasadem)
* "Derived variables"
    - [Chloris Biomass](https://planetarycomputer.microsoft.com/dataset/chloris-biomass)
    - [HGB](https://planetarycomputer.microsoft.com/dataset/hgb)
    - [HREA](https://planetarycomputer.microsoft.com/dataset/hrea)



Here we use the STAC API to search for Sentinel-2 scenes matching some spatio-temporal query. We're even able to query on additional properties in the STAC metadata, like the `cloud_cover`.

In [None]:
search = catalog.search(
    bbox=[-122.28, 47.55, -121.96, 47.75],
    datetime="2020-01-01/2020-12-31",
    collections=["sentinel-2-l2a"],
    query={"eo:cloud_cover": {"lt": 25}},
)

items = search.get_all_items()
print(len(items))

Whenever you're working with the Planetary Computer, you need to "sign" your items / assets. This appends a read-only SAS token to the data, so that we can fetch the data from Blob Storage.

In [None]:
signed_items = planetary_computer.sign(items)

STAC is all about *metadata*. So these STAC items are just some objects with links to the actual data (COGs in this case). There are lots of ways to go from a URL to an image, or from a list of STAC items to an image. In this case, we'll use `stackstac` which lets you stack your STAC items into an `xarray.DataArray`.

In [None]:
data = stackstac.stack(
    signed_items,
    assets=["B04", "B08"],  # red, nir
    resolution=100,
).where(
    lambda x: x > 0, other=np.nan
)  # sentinel-2 uses 0 as nodata
data

Let's do a little computation: we'll compute NDVI:

In [None]:
red = data.sel(band="B04")
nir = data.sel(band="B08")

ndvi = (red - nir) / (red + nir)

And we'll plot it for the first time slice:

In [None]:
x = ndvi.isel(time=0).persist()
m = stackstac.show(x, range=(-0.9, 0.9))
m.scroll_wheel_zoom = True
m

## Earth systems data

These datasets are typically stored as Zarr or NetCDF.

* Climate model output
    - Terraclimate, gridMet, Daymet, NEX-GDDP-CMIP6
* Reanalysis
    - ERA5
* Observations
    - GPM IMERG

In this example, we'll load up some data from Terraclimate.

In [None]:
terraclimate = catalog.get_collection("terraclimate")
asset = terraclimate.assets["zarr-https"]


store = fsspec.get_mapper(asset.href)
ds = xr.open_zarr(store, **asset.extra_fields["xarray:open_kwargs"])
ds

Again, we have a DataArray. We can select the last time slice and plot `tmax` for the globe.

In [None]:
average_max_temp = ds.isel(time=-1)["tmax"].coarsen(lat=8, lon=8).mean().load()

fig, ax = plt.subplots(figsize=(20, 10), subplot_kw=dict(projection=ccrs.Robinson()))

average_max_temp.plot(ax=ax, transform=ccrs.PlateCarree())
ax.coastlines();

## Operational forecast data

The Planetary Computer also includes some operational weather forecast data. These are typically stored as Zarr or GRIB2. In this example we'll load some data from the ECMWF's Open Data program (using the staging API. It'll be available through the production API soon)

In [None]:
staging_catalog = pystac_client.Client.open(
    "https://planetarycomputer-staging.microsoft.com/api/stac/v1"
)
search = staging_catalog.search(
    collections=["ecmwf-forecast"],
    query={
        "ecmwf:stream": {"eq": "wave"},
        "ecmwf:type": {"eq": "fc"},
        "ecmwf:step": {"eq": "0h"},
    },
)
items = search.get_all_items()
item = items[0]
item

The GRIB2 file format isn't really cloud-friendly. We're working on that, but in the meantime we'll download the file to disk and load it from there. Again into an `xarray.DataArray`.

In [None]:
url = item.assets["data"].href
filename, _ = urllib.request.urlretrieve(url)

ds = xr.open_dataset(filename, engine="cfgrib")
ds

Let's make another plot, thise time for "significant wave height".

In [None]:
projection = projection = ccrs.Robinson()
fig, ax = plt.subplots(figsize=(16, 9), subplot_kw=dict(projection=projection))

ds.swh.plot(ax=ax, transform=ccrs.PlateCarree());

Or we can plot the join distribution of the "mean wave period" and "significant wave height".

In [None]:
grid = sns.jointplot(
    x=ds.mwp.data.ravel(), y=ds.swh.data.ravel(), alpha=0.25, marker=".", height=12
)
grid.ax_joint.set(xlabel="Mean wave period", ylabel="Significant wave height");

## Tabular data

These are typically stored in Apache Parquet, using the geoparquet standard where appropriate.

- US Census
- Forest Inventory and Analysis
- gNATSGO tables

In this example, we'll load up the Census 2020 Congressional District boundaries.

In [None]:
search = catalog.search(collections=["us-census"])
items = planetary_computer.sign(search.get_all_items())
items = {x.id: x for x in items}
item = items["2020-cb_2020_us_cd116_500k"]
item

That STAC item as a link a Parquet file in Blob Storage. We'll load it up with geopandas.

In [None]:
asset = item.assets["data"]
df = geopandas.read_parquet(
    asset.href, storage_options=asset.extra_fields["table:storage_options"]
)
df

And plot the districts for Maryland (with a state FIPS code of `24`).

In [None]:
maryland = df[df.STATEFP == "24"].astype({"GEOID": "category"})
maryland.explore(column="GEOID")

The largest datasets from this collection are at the census-block level. This datasets would be too large to load with pandas or geopandas, which wants all of the data to fit in RAM. So we'll use `dask_geopandas` to load in the geometries data.

In [None]:
asset = items["2020-census-blocks-geo"].assets["data"]

geo = dask_geopandas.read_parquet(
    asset.href,
    storage_options=asset.extra_fields["table:storage_options"],
)
geo

And `dask.dataframe` to load in the population data.

In [None]:
asset = items["2020-census-blocks-population"].assets["data"]

pop = dask.dataframe.read_parquet(
    asset.href,
    storage_options=asset.extra_fields["table:storage_options"],
)
pop

We can join those together.

In [None]:
df = geo.join(pop)
df

Notice that all those operations were instant. Dask is (mostly) lazy, so it only evaluates when you ask it to.

Theses census-block levels are actually parquet datasets (a folder of files) partitioned by state. So we can do things at a state-level without having to look at the rest of the data.

In [None]:
start = [x for x in geo.divisions if x.startswith("44")][0]
stop = "4499"

ri = geo.loc[start:stop].compute()
ri.head()

## Point-cloud data

Typically stored as COPC.

In [None]:
bean = {"type": "Point", "coordinates": [-87.623358, 41.8826812]}

geom = shapely.geometry.shape(bean)

utm = pyproj.crs.CRS.from_epsg(32616)  # UTM zone for Chicago
wgs84 = pyproj.CRS("EPSG:4326")

project_dd_to_utm = pyproj.Transformer.from_crs(wgs84, utm, always_xy=True).transform
project_utm_to_dd = pyproj.Transformer.from_crs(utm, wgs84, always_xy=True).transform

utm_point = shapely.ops.transform(project_dd_to_utm, geom)
window = utm_point.buffer(400)

window_dd = shapely.ops.transform(project_utm_to_dd, window)

df = geopandas.GeoDataFrame(geometry=[window_dd], crs="EPSG:4326")

df.explore()

In [None]:
# The Bean
bean = shapely.geometry.shape(
    {"type": "Point", "coordinates": [-87.623358, 41.8826812]}
).buffer(0.005)

# The *test* API. Really bleeding-edge now.

test_catalog = pystac_client.Client.open(
    "https://pct-apis-staging.westeurope.cloudapp.azure.com/stac/"
)

search = test_catalog.search(collections=["3dep-lidar-copc"], intersects=bean)
ic = search.get_all_items()

# Filter out for only the Cook County LiDAR collections
cook = []
for item in ic:
    if "Cook" in item.id:
        cook.append(item)

signed = [planetary_computer.sign(i) for i in cook]

In [None]:
OUTPUT_RESOLUTION = 2.0
READ_RESOLUTION = 2.0
polygon = window.wkt + f" / EPSG:{utm.to_epsg()}"

readers = []
for tile in signed:
    url = tile.assets["data"].href
    reader = pdal.Reader.copc(
        url, requests=3, resolution=READ_RESOLUTION, polygon=polygon
    )
    readers.append(reader)


assign = pdal.Filter.assign(value=["Intensity = Intensity / 256"])

writer = pdal.Writer.gdal(
    "intensity.tif",
    resolution=OUTPUT_RESOLUTION,
    dimension="Intensity",
    data_type="uint8",
    output_type="mean",
)

In [None]:
pipeline = None

pipeline = functools.reduce(operator.or_, readers)
pipeline |= assign | writer

In [None]:
%%time

# Use streaming mode at 1e6 points at a time. This
# helps us conserve memory for pipelines that are streamable
# check that with the pipeline.streamable property
results = pipeline.execute_streaming(chunk_size=1000000)
print(pipeline.log)

# the last stage of our pipeline is the writer, and the 'dimension'
# option on the writer is what we want to print
dimension = pipeline.stages[-1].options["dimension"]
print(f"Number of points returned for dimension {dimension}: {results}")

In [None]:
PIL.Image.open("intensity.tif")