In this notebook, we will discuss streaming large datasets into our Python environment. Traditionally, if you wanted to work with raster or shapefiles, you needed to download them beforehand and then import them directly from your local disk. There are two issues with this traditional approach. The first issue is the time spent downloading the file, and the second is the size of the file, which may end up being enormous. You may not want to work with the entire shapefile or raster file. Instead, you might only want to focus on a specific bounding box within that file. So, why spend time and memory on parts of the data that you don't need? This is why we would like to discuss streaming data directly into your script.

Now, let's focus on HRDEM collections in Datacube. These collections are in very fine resolutions, which means they contain valuable information for your projects and can improve the accuracy of geomatics calculations. However, they are huge! You need to spend a lot of time downloading them, and when you import them into your Python script, right off the bat, you may run into memory issues. How can we efficiently solve this issue?

The first step is to define the address of the Datacube STAC API endpoint. Then, we need to establish a connection to the API endpoint in order to access the collections within that endpoint. We use pystac_client, which is a Python package for working with STAC Catalogs and APIs that conform to the STAC and STAC API specs. PySTAC Client builds upon PySTAC through higher-level functionality and ability to leverage STAC API search endpoints.

In [3]:
import pystac_client
DC_URL = 'https://datacube.services.geo.ca/stac/api/'
# We establish a connection
catalog = pystac_client.Client.open(DC_URL)  

In [66]:
# List collections
collections = list(catalog.get_collections())

# Print sorted collection IDs
for collection in sorted(collections, key=lambda col: col.id):
    print(collection.id)

canadian-wetland-inventory
cancvi
cdem
cdsm
dynamic-surface-water-annual
dynamic-surface-water-compilation
et-month
et-year
flood-susceptibility
hrdem-arcticdem
hrdem-lidar
hrdem-mosaic-1m
hrdem-mosaic-2m
inland-water
landcover
monthly-vegetation-parameters-20m-v1
mrdem-30
msi
napl-halifax
napl-markham
napl-ottawa
napl-regina
napl-ring-of-fire
napl-salish
napl-tuktoyaktuk
napl-victoria
pet-month
pet-year
rcm-ard-mosaic
river-ice-canada-archive
surface-water-frequency
vegetation


Now, with the catalog object, we are able to search the endpoint for any collection, bbox, date, etc., to find the data that we want to stream. You can think of this as manually using the STAC browser. 

Now we need to define our filters. How should all the data in the STAC API be filtered so that we only get the one(s) we want? First, we start with the data collection or collections that we want to search.

In [67]:
collections = ['hrdem-lidar']

Then, we can define a filter for temporal search and a filter for spatial search.

In [68]:
# Get collection metadata
for collection in collections:
    
    cltn = catalog.get_collection(collection)
    
    # Check bbox and CRS
    print("Collection ID:", cltn.id)
    print("Bounding Box:", cltn.extent.spatial.bboxes)  # Check bbox values


Collection ID: hrdem-lidar
Bounding Box: [[-136.66278783043433, 43.078890520216284, -56.47686150258771, 69.48173760623511]]


For the temporal search, we define the start date and an end date (inclusive) between which we want to retrieve data.

In [69]:
datetime_range = '2019-01-01/2024-12-31'

For the spatial search, we can either define a bbox or an AOI polygon for the area of interest. They should be in **lat/lon**.

In [70]:
bbox = [-114.00, 51.24, -113.94, 51.33]

In [71]:
aoi_polygon = {
    "type": "Polygon",
    "coordinates": [
        [
            [bbox[0], bbox[1]],  
            [bbox[2], bbox[1]],  
            [bbox[2], bbox[3]],
            [bbox[0], bbox[3]],
            [bbox[0], bbox[1]]   
        ]
    ]
}

Now, with these three basic filters, we can search the STAC API endpoint to retrieve the STAC items that may be available.

In [72]:
stac_items     = catalog.search(collections = collections, 
                                bbox = bbox, 
                                datetime = datetime_range
                               )

One STAC item has been found, and now it's time to read or stream the data into our script.

To stream the data into our script, we have three options:

-  Using the **odc-stac** library, which reads data and indirectly uses **GDAL** for handling geospatial data formats like GeoTIFFs or other raster-based formats.
-  Using the **stackstac.stack** library, which reads data and indirectly uses **GDAL** for handling geospatial data formats like GeoTIFFs or other raster-based formats.
-  Using the **rioxarray** library, which reads data and indirectly uses **rasterio** for handling geospatial data formats like GeoTIFFs or other raster-based formats.

But before diving into their differences and the requirements that should be met, what is the common feature of all these libraries? The common feature is the fact that these libraries read data directly into **Xarrays**, not NumPy arrays.

Xarray is built on top of Numpy and Pandas and adds extra capabilities for working with labeled, multi-dimensional arrays (such as climate data, satellite images, etc.).
It extends Numpy arrays by allowing metadata (coordinates, labels, etc.) to be attached to arrays. This makes it easier to work with data where dimensions have meaningful names, such as time, latitude, longitude, and variables.
In addition, Xarrays can be backed by Dask arrays, which support lazy evaluation. This means you can work with large datasets that don't fit in memory, performing computations while deferring the actual data reading until the end of the computation. Dask optimizes the workflow by parallelizing tasks, allowing efficient memory usage. It reads data in chunks rather than loading the entire dataset into memory at once, helping to improve both performance and memory efficiency.

- odc-stac:
  This library is primarily built for integrating STAC (SpatioTemporal Asset Catalog) items with the Open Data Cube (ODC) ecosystem. It is part of the ODC framework, which is used for managing and analyzing large-scale geospatial data.

- stackstac:
   stackstac is focused on efficiently stacking geospatial data from STAC (like satellite images or other temporal data) into a time-series or multi-temporal stack. It is lightweight compared to odc-stac and more specialized for handling STAC items and data for large-scale, multi-temporal analysis.

-  rioxarray:
  rioxarray is a general-purpose library for working with raster data in the xarray format. It integrates rasterio for reading and writing geospatial formats (like GeoTIFF) and leverages xarray for handling multi-dimensional arrays of data, which is useful for operations on raster datasets that involve multiple bands or time series.


Before diving into each of them, there is a requirement that should be met when we read HRDEM collections from the DataCube STAC API using **odc-stac** and **stackstac**, which are based on GDAL. GetGeoTransform and rasterio use different formats for reporting transform information. Order expected in proj:transform is the same as reported by rasterio. When using GDAL method you need to re-order in the following way:

In [73]:
def reorder_transform(gdal_transform):
    """
    Reorders the GDAL GeoTransform (6-element tuple) into the 9-element format
    that is compatible with proj:transform.
    """
    return [gdal_transform[1], gdal_transform[2], gdal_transform[0],  # x scale (a), rotation about y (b), x-coordinate of upper left (c)
            gdal_transform[4], gdal_transform[5], gdal_transform[3],  # rotation about x (d), y scale (e), y-coordinate of upper left (f)
            0, 0, 1]  # The last part is fixed for affine transformation matrix (0, 0, 1)

In [79]:
stac_found_items = [item for item in stac_items.items()]
for item in stac_found_items:
    if "proj:transform" in item.properties:
        proj_transform = item.properties["proj:transform"]
        if isinstance(proj_transform, list):
            
            reordered_transform = reorder_transform(proj_transform)
            item.properties["proj:transform"] = reordered_transform
            print(f"Updated proj:transform for item {item.id}")
    else:
        print(f"Item {item.id} does not have proj:transform in properties.")

Updated proj:transform for item NRCAN-Calgary_East_utm12_2020-1m
Updated proj:transform for item NRCAN-Calgary_West_utm11_2020-1m


Now, we begin by reading the data using odc-stac. Beforehand, we may want to review the available bands for the item(s) and then select only the bands that we need.

In [77]:
for item in stac_found_items:
    print(item.assets.keys())

dict_keys(['dsm', 'dtm', 'extent', 'dsm-vrt', 'dtm-vrt', 'coverage', 'thumbnail'])
dict_keys(['dsm', 'dtm', 'extent', 'dsm-vrt', 'dtm-vrt', 'coverage', 'thumbnail'])


In [78]:
import odc.stac

xrDS = odc.stac.load(stac_found_items,
                     chunks     = {"x": 1000, "y": 1000},
                     bands      = ["dsm", "dtm"],
                     bbox       = bbox,
                     resolution = 1.0)

In [12]:
xrDS

Unnamed: 0,Array,Chunk
Bytes,281.45 MiB,390.62 kiB
Shape,"(1, 10713, 6887)","(1, 1000, 100)"
Dask graph,759 chunks in 3 graph layers,759 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 281.45 MiB 390.62 kiB Shape (1, 10713, 6887) (1, 1000, 100) Dask graph 759 chunks in 3 graph layers Data type float32 numpy.ndarray",6887  10713  1,

Unnamed: 0,Array,Chunk
Bytes,281.45 MiB,390.62 kiB
Shape,"(1, 10713, 6887)","(1, 1000, 100)"
Dask graph,759 chunks in 3 graph layers,759 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,281.45 MiB,390.62 kiB
Shape,"(1, 10713, 6887)","(1, 1000, 100)"
Dask graph,759 chunks in 3 graph layers,759 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 281.45 MiB 390.62 kiB Shape (1, 10713, 6887) (1, 1000, 100) Dask graph 759 chunks in 3 graph layers Data type float32 numpy.ndarray",6887  10713  1,

Unnamed: 0,Array,Chunk
Bytes,281.45 MiB,390.62 kiB
Shape,"(1, 10713, 6887)","(1, 1000, 100)"
Dask graph,759 chunks in 3 graph layers,759 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


So, we streamed our data using odc.stac.load. There are a couple of arguments that helped us streamline the data stream according to our filters and preferences.

1. **bands**: As you can see, each STAC item has 9 bands: ['dsm', 'dtm', 'extent', 'dsm-vrt', 'dtm-vrt', 'coverage', 'thumbnail', 'hillshade-dsm', 'hillshade-dtm']. Importing unnecessary bands may cause memory and speed issues. We can benefit from the bands argument and only import the bands that we need, disregarding the rest.
2. **bbox**: Instead of importing the large HRDEM dataset, we can import only the region of interest. This helps us manage the speed and memory usage of our program.
3. **resolution** : This argument should be defined whenever we have the bbox argument set.
4. **chunks** : This is an interesting argument, and you need to set this whenever you want your xarray to be backed by a dask array. This means whenever you want to enable lazy loading and benefit from dask's parallel computing under the hood. The chunk size is essentially the definition for dask on how to read data. When we set 1000 for both x and y, dask reads a box of 100 by 100 of the data, instead of reading the entire array. And with parallel computing, it reads multiple chunks simultaneously. So, you're increasing speed and controlling memory usage.

When you look at xrDS, you do not see the data in the dask arrays. At this point, the data has not been read **yet**. We've defined a lens to look at the data. The metadata, the array shape, and everything are set, except the data itself. You can run calculations on these datasets and get new xarrays (dask arrays), but the data is not yet loaded. When do we get the data?

Imagine you have xrDS, and then you run Calculations A, B, and C to get xrDS_C. Now you actually want to see the data. What you need to do is run **.compute()** on xrDS_C, and as soon as you do that, data starts flowing into the workflow you've created. Each of those intermediate arrays gets their data and modifies the data until you get xrDS_C.

This is cool because when you create a dask workflow and then run .compute() on the last array, dask optimizes everything for you. It evaluates the workflow and runs it in the most efficient way computationally. So, there is a lot of optimization regarding resource usage!

In [None]:
xrDS.compute()

Now the data is there!

We continue by reading the data using stackstac. 

In [None]:
import stackstac
xrDS_ss = stackstac.stack(stac_found_items, 
                          assets        = ["dsm", "dtm"], 
                          resolution    = 1.0,
                          bounds_latlon = bbox, 
                          chunksize     = (1000, 1000))

In [14]:
xrDS_ss

Unnamed: 0,Array,Chunk
Bytes,2.20 GiB,7.63 MiB
Shape,"(2, 2, 10713, 6887)","(1, 1, 1000, 1000)"
Dask graph,308 chunks in 3 graph layers,308 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 2.20 GiB 7.63 MiB Shape (2, 2, 10713, 6887) (1, 1, 1000, 1000) Dask graph 308 chunks in 3 graph layers Data type float64 numpy.ndarray",2  1  6887  10713  2,

Unnamed: 0,Array,Chunk
Bytes,2.20 GiB,7.63 MiB
Shape,"(2, 2, 10713, 6887)","(1, 1, 1000, 1000)"
Dask graph,308 chunks in 3 graph layers,308 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


The arguments seem pretty much alike. But with stackstac, you'll get many more attributes saved in your xarray.

In [None]:
xrDS_ss.compute()

The last option is streaming data using rioxarray. This library is based on rasterio, and it can read data in xarray format as well. For streaming the data using rioxarray, we do not have as many options for defining the bbox or resolution. What we work with is the href of our STAC items, and after streaming the whole dataset, we can clip it to the bounding box of interest.

In addition, we cannot stack the items. We need to read each item into a separate xarray dataset. So, we start with the first STAC item in our STAC items list.

In [None]:
dtm_url = None
for asset_key, asset in stac_items[0].assets.items():
    print(f"URL for {asset_key} is {asset.href}")
    if asset_key == "dtm":
        dtm_url = asset.href

In [None]:
import rioxarray
bbox_pr = [-1296660.0, 443357.0, -1289951.0, 452669.35559263]
xds_rio = rioxarray.open_rasterio(
        dtm_url,
        chunks={'x': 1000, 'y': 1000},
).rio.clip_box(*bbox_pr)

xds_rio

Along with the mentioned points for when we work with rioxarray, another important point is that the bbox used for clipping data should be in the projected coordinate system, not in lat/lon coordinates.

So we have introduced three different ways and libraries of streaming Datacube collection datasets into your Python scripts instead of downloading them beforehand. In the next notebook, we will show some examples of basic classifications on the Dask array that we just created.