In [1]:
import geoengine as ge
import math
from datetime import datetime


## Connect to the Geo Engine

In [2]:
ge.initialize("http://pc12730.mathematik.uni-marburg.de:3031/api", credentials=("admin@localhost", "adminadmin"))

In [3]:
session = ge.get_session()
user_id = session.user_id
session

Server:              http://pc12730.mathematik.uni-marburg.de:3031/api
User Id:             d5328854-6190-4af9-ad69-4e74b0961ac9
Session Id:          6626ddb1-6bd5-44cc-9063-1113c00f6ac4
Session valid until: 2024-04-24T20:25:58.595Z

Some information about the bands we want to use:

In [4]:
band_resolutions = {
    "10": ["B02", "B03", "B04", "B08"],    
    "20": ["B11", "B12", "SCL"]
}

band_names = ["B02", "B03", "B04", "B08", "B11", "B12"]
scl_name = "SCL"

def get_band_resolution(band_name):
    if band_name =="NDVI":
        return 10

    for (res, bands) in band_resolutions.items():
        if band_name in bands:
            return float(res)
    return None

def get_resoluton_bands(res):
    return band_resolutions[res]

get_band_resolution("B02")

10.0

The tiles we want to use and way to modify the bounds to match the pixel resolution of the band (also use a power of 2 because it is nice)

In [5]:
def next_power_of_2(x):
    return 1 if x == 0 else 2**math.ceil(math.log2(x))

def better_tile_bounds(xmin, ymin, xmax, ymax, res):
    size_x = xmax - xmin
    size_y = ymax - ymin
    x_start = int(xmin/res)*res
    y_start = int(ymin/res)*res
    x_end = x_start + next_power_of_2(size_x/res)*res
    y_end = y_start + next_power_of_2(size_y/res)*res
    return [x_start, y_start, x_end, y_end]


tiles = {
    "1": [677434.81148609332740307, 5845298.07400647643953562, 717747.09661422134377062, 5885618.66241065505892038], # 1
    "2": [683873.44084652571473271, 5368245.35890970937907696, 724127.46885261742863804, 5408501.99355369247496128], # 2
    "3": [441200.25685675046406686, 5683048.10977123398333788, 481481.32107784802792594 , 5723328.07085583545267582], # 3
    "4": [364967.8289228081703186,  5443493.64452091883867979, 405243.77002240129513666,  5483803.21444271318614483], # 4
    "5": [478746.69248974783113226, 5842533.08955569006502628, 519031.87294624710921198, 5882816.76165715791285038],  # 5
    "6": [555410.59772756404709071, 6042341.78814582619816065, 595702.63884835026692599, 6082661.56917595490813255], # 6
    "7": [560953.47924317768774927, 5644941.74779673106968403, 601231.64683432993479073, 5685220.59041254408657551], # 7
    "8": [717747.09661422134377062, 5806104.44838424492627382, 758067.72125238052103668, 5846427.60958131216466427], # 8
    "9": [403628.38119195553008467, 5523546.04429935291409492, 443903.65706163534196094, 5563838.3149972390383482], # 9
    "10": [475968.06303980643860996, 6041234.39164876379072666, 516251.6795115998829715, 6081531.74282156955450773] # 10
}

max_pixel_size = int(get_band_resolution("B12"))

better_tiles = {b: better_tile_bounds(*tiles[b], max_pixel_size) for b in tiles}

for (tile, tile_bounds) in better_tiles.items():
    print(tile, tile_bounds)
    [xmin, ymin, xmax, ymax] = tile_bounds
    size_x = xmax - xmin
    size_y = ymax - ymin
    print(tile, size_x, size_y)

1 [677420, 5845280, 718380, 5886240]
1 40960 40960
2 [683860, 5368240, 724820, 5409200]
2 40960 40960
3 [441200, 5683040, 482160, 5724000]
3 40960 40960
4 [364960, 5443480, 405920, 5484440]
4 40960 40960
5 [478740, 5842520, 519700, 5883480]
5 40960 40960
6 [555400, 6042340, 596360, 6083300]
6 40960 40960
7 [560940, 5644940, 601900, 5685900]
7 40960 40960
8 [717740, 5806100, 758700, 5847060]
8 40960 40960
9 [403620, 5523540, 444580, 5564500]
9 40960 40960
10 [475960, 6041220, 516920, 6082180]
10 40960 40960


For convenience, the tiles are stored as different datasets. Here is a simple way to resolve there names:

In [6]:
def get_dataset_name(user_id, tile, band):
    band_resolution = int(get_band_resolution(band))
    return f"{user_id}:lena_sentinel2_{band_resolution}m_tile_{tile}_band_{band}_2022_2023"

get_dataset_name(user_id, "10", "B02")

'd5328854-6190-4af9-ad69-4e74b0961ac9:lena_sentinel2_10m_tile_10_band_B02_2022_2023'

The Geo Engine uses a `QueryRectangle` to request the data. It specifies the bounding box of the area of interest and the time range. (The resolution is also specified here, but this will change in the near future)


In [7]:
def create_query(bounds, resolution, time_start, time_end):
    [xmin, ymin, xmax, ymax] = bounds
    return ge.QueryRectangle(
        spatial_bounds=ge.BoundingBox2D(xmin, ymin, xmax, ymax),
        time_interval=ge.TimeInterval(time_start, time_end),
        resolution=ge.SpatialResolution(resolution, resolution),
        srs="EPSG:32632"
    )

time_start = datetime(2022, 1, 1)
time_end = datetime(2023, 1, 1)
    
create_query(better_tiles["10"], 10.0, time_start, time_end)

QueryRectangle( 
    BoundingBox2D(xmin=475960, ymin=6041220, xmax=516920, ymax=6082180)
    TimeInterval(start=2022-01-01T00:00:00.000000, end=2023-01-01T00:00:00.000000)
    SpatialResolution(x=10.0, y=10.0)
    srs=EPSG:32632 
)

In [None]:
from typing import cast


class RasterWorkflowGdalWriter:
    current_dataset: gdal.Dataset = None
    current_time: ge.TimeInterval = None
    dataset_geo_transform = None
    dataset_width = None
    dataset_height = None
    dataset_data_type = gdal.GDT_Float32
    print_info = False


    dataset_prefix = None
    workflow: ge.Workflow = None
    bands = None
    no_data_value = 0 # TODO: add type cast
    time_format = "%Y-%m-%d_%H-%M-%S"

    gdal_driver = "GTiff"
    options = ["TILED=YES", "COMPRESS=DEFLATE", "ZLEVEL=9"]
    tile_size = 512
    
    def __init__(self, dataset_prefix, workflow: ge.Workflow, no_data_value=0, data_type=gdal.GDT_Float32, print_info=False):
        self.dataset_prefix = dataset_prefix
        self.workflow = workflow
        self.no_data_value = no_data_value
        self.dataset_data_type = data_type
        self.print_info = print_info

        ras_res = cast(ge.RasterResultDescriptor, self.workflow.get_result_descriptor())
        self.bands = ras_res.bands

    def close_current_dataset(self):
        if self.current_dataset:
            self.current_dataset = None

    def create_tiling_geo_transform_width_height(self, query: ge.QueryRectangle):

        ul_x = query.spatial_bounds.xmin
        ul_y = query.spatial_bounds.ymax
        lr_x = query.spatial_bounds.xmax
        lr_y = query.spatial_bounds.ymin
        res_x = query.spatial_resolution.x_resolution
        res_y = query.spatial_resolution.y_resolution * -1 # honor the fact that the y axis is flipped

        assert res_y < 0, "The y resolution must be negative"

        assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate"
        assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate"

        ul_pixel_x = ul_x / res_x # we can assume that the global origin is 0,0
        ul_pixel_y = ul_y / res_y
        lr_pixel_x = lr_x / res_x
        lr_pixel_y = lr_y / res_y

        assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x"
        assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y"

        tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size
        if ul_pixel_x % self.tile_size != 0:
                tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) -1 ) * self.tile_size
        
        tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size
        if ul_pixel_y % self.tile_size != 0:            
                tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) -1 ) * self.tile_size

        assert tiling_ul_pixel_x <= ul_pixel_x, "The tiling upper left x pixel must be smaller than the upper left x coordinate"
        assert tiling_ul_pixel_y <= ul_pixel_y, "The tiling upper left y pixel must be smaller than the upper left y coordinate"
                
        width = int((lr_pixel_x - tiling_ul_pixel_x))
        if width % self.tile_size != 0:
            width = int((width // self.tile_size + 1) * self.tile_size)
        assert width > 0, "The width must be greater than 0"

        height = int((lr_pixel_y - tiling_ul_pixel_y ))
        if height % self.tile_size != 0:
            height = int((height // self.tile_size + 1) * self.tile_size)
        assert height > 0, "The height must be greater than 0"
        
        assert width % self.tile_size == 0, "The width must be a multiple of the tile size"
        assert height % self.tile_size == 0, "The height must be a multiple of the tile size"

        tiling_ul_x_coord = tiling_ul_pixel_x * res_x
        tiling_ul_y_coord = tiling_ul_pixel_y * res_y
        assert tiling_ul_x_coord <= ul_x, "The tiling upper left x coordinate must be smaller than the upper left x coordinate"
        assert tiling_ul_y_coord >= ul_y, "The tiling upper left y coordinate must be greater than the upper left y coordinate"

        geo_transform = (tiling_ul_x_coord, res_x, 0, tiling_ul_y_coord, 0, res_y)

        if self.dataset_geo_transform is None:
            self.dataset_geo_transform = geo_transform
        else:
            assert self.dataset_geo_transform == geo_transform, "The geo transform of the current dataset does not match the new one"

        if self.dataset_width is None:
            self.dataset_width = width
        else:
            assert self.dataset_width == width, "The width of the current dataset does not match the new one"

        if self.dataset_height is None:
            self.dataset_height = height
        else:
            assert self.dataset_height == height, "The height of the current dataset does not match the new one"        


    def create_new_dataset(self, query: ge.QueryRectangle):
        
        time_formated_start = self.current_time.start.astype(datetime).strftime(self.time_format)
        width = self.dataset_width
        height = self.dataset_height
        geo_transform = self.dataset_geo_transform
        if self.print_info:
            print(f"Creating dataset {self.dataset_prefix}{time_formated_start}.tif"
                f" with width {width}, height {height}, geo_transform {geo_transform}")
        
        
        gdal_driver = gdal.GetDriverByName(self.gdal_driver)     
        gdal_dataset = gdal_driver.Create(
            f"{self.dataset_prefix}{time_formated_start}.tif",
            width, height, len(self.bands),
            gdal.GDT_Float32,
            options=self.options
        )
        
        gdal_dataset.SetGeoTransform(geo_transform)
        gdal_dataset.SetProjection(query.srs)

        for i, band in enumerate(self.bands):
            gdal_band = gdal_dataset.GetRasterBand(i+1)
            gdal_band.SetNoDataValue(self.no_data_value)
            gdal_band.SetDescription(f"{band.name} [{band.measurement}]")


        self.current_dataset = gdal_dataset
        

    async def query_and_write(self, query: ge.QueryRectangle):
        self.create_tiling_geo_transform_width_height(query)
        try:
            async for tile in self.workflow.raster_stream(query):
                if self.current_time != tile.time:
                    self.close_current_dataset()
                    self.current_time = tile.time
                    self.create_new_dataset(query)
                
                assert self.current_time == tile.time, "The time of the current dataset does not match the tile"

                tile_ul_x = int((tile.geo_transform.x_min - self.dataset_geo_transform[0] ) / self.dataset_geo_transform[1])
                tile_ul_y = int((tile.geo_transform.y_max - self.dataset_geo_transform[3] )/ self.dataset_geo_transform[5])
 
                band_index = tile.band + 1
                data = tile.to_numpy_data_array(self.no_data_value)

                assert self.tile_size == tile.size_x == tile.size_y, "The tile size does not match the expected tile size"

                self.current_dataset.GetRasterBand(band_index).WriteArray(data, tile_ul_x, tile_ul_y)
        except Exception as e:
            raise Exception(f"Error while processing tile at {tile.spatial_partition()} with {tile.time}") from e

        finally:
            self.close_current_dataset()


            
        


## download raw data

In [9]:
import os
from osgeo import gdal

test_mode = True
download_dir = "./test/raw_data"

for (i, (tile, tb)) in enumerate(better_tiles.items()):
    if test_mode and i > 0:
        break
    for band in band_names + [scl_name]:
        if test_mode and i > 0:
            break
        dataset_name = get_dataset_name(user_id, tile, band)
        workflow = ge.workflow_builder.operators.GdalSource(dataset_name)
        reg_workflow = ge.register_workflow(workflow)

        query = create_query(tb, get_band_resolution(band), time_start, time_end)

        if not os.path.exists(f"{download_dir}{dataset_name[37:]}/"):
            os.makedirs(f"{download_dir}/{dataset_name[37:]}/", exist_ok=True)

        writer = ge.RasterWorkflowGdalWriter(f"{download_dir}/{dataset_name[37:]}/", reg_workflow, no_data_value=0, data_type=gdal.GDT_Int16)

        await writer.query_and_write(query)

CancelledError: 

## Download scaled, cloud free data

In [None]:
import os

test_mode = True
download_dir = "./test/scaled_cloud_free"

def create_cloud_free_scaled_workflow(user_id, tile, band):
    dataset_name = get_dataset_name(user_id, tile, band)
    scl_dataset_name = get_dataset_name(user_id, tile, scl_name)
    workflow =ge.workflow_builder.blueprints.sentinel2_cloud_free_band_custom_input(
        band_dataset=dataset_name,
        scl_dataset=scl_dataset_name,
    )
    workflow = ge.workflow_builder.operators.RasterTypeConversion(workflow, output_data_type="F32") # to float
    workflow = ge.workflow_builder.operators.RasterScaling(workflow, slope=0.00001, offset=0.0) # to reflectance    
    return workflow

for (i, (tile, tb)) in enumerate(better_tiles.items()):
    scl_dataset_name = get_dataset_name(user_id, tile, scl_name)

    if test_mode and i > 0:
        break
    for band in band_names:
        if test_mode and i > 0:
            break
        dataset_name = get_dataset_name(user_id, tile, band)

        workflow = create_cloud_free_scaled_workflow(user_id, tile, band)
        reg_workflow = ge.register_workflow(workflow)

        query = create_query(tb, get_band_resolution(band), time_start, time_end)

        if not os.path.exists(f"{download_dir}{dataset_name[37:]}/"):
            os.makedirs(f"{download_dir}/{dataset_name[37:]}/", exist_ok=True)

        writer = ge.RasterWorkflowGdalWriter(f"{download_dir}/{dataset_name[37:]}/", reg_workflow, no_data_value=0)

        await writer.query_and_write(query)

## Download weekly scaled data + NDVI

In [16]:
import os

test_mode = True
download_dir = "./test/scaled_cloud_free_7days"

def create_cloud_free_scaled_workflow(user_id, tile, band):
    dataset_name = get_dataset_name(user_id, tile, band)
    scl_dataset_name = get_dataset_name(user_id, tile, scl_name)
    workflow =ge.workflow_builder.blueprints.sentinel2_cloud_free_band_custom_input(
        band_dataset=dataset_name,
        scl_dataset=scl_dataset_name,
    )
    workflow = ge.workflow_builder.operators.RasterTypeConversion(workflow, output_data_type="F32") # to float
    workflow = ge.workflow_builder.operators.RasterScaling(workflow, slope=0.0001, offset=0.0) # to reflectance    
    return workflow

def create_cloud_free_scaled_workflow_7day_mean(user_id, tile, band):
    workflow = create_cloud_free_scaled_workflow(user_id, tile, band)
    workflow = ge.workflow_builder.operators.TemporalRasterAggregation(workflow, aggregation_type="mean", granularity='days', window_size=7, ignore_no_data=True)
    return workflow

def create_cloud_free_scaled_workflow_7day_mean_ndvi(user_id, tile):
    nir_workflow = create_cloud_free_scaled_workflow_7day_mean(user_id, tile, "B08")
    red_workflow = create_cloud_free_scaled_workflow_7day_mean(user_id, tile, "B04")
    stacked_workflow = ge.workflow_builder.operators.RasterStacker([nir_workflow, red_workflow])
    ndvi_workflow = ge.workflow_builder.operators.Expression("(A-B)/(A+B)", stacked_workflow, "F32", map_no_data=False)
    return ndvi_workflow


for (i, (tile, tb)) in enumerate(better_tiles.items()):
    scl_dataset_name = get_dataset_name(user_id, tile, scl_name)
    
    for band in band_names:        
        dataset_name = get_dataset_name(user_id, tile, band)

        workflow = create_cloud_free_scaled_workflow_7day_mean(user_id, tile, band)
        reg_workflow = ge.register_workflow(workflow)
        query = create_query(tb, get_band_resolution(band), time_start, time_end)

        if not os.path.exists(f"{download_dir}{dataset_name[37:]}/"):
            os.makedirs(f"{download_dir}/{dataset_name[37:]}/", exist_ok=True)

        writer = ge.RasterWorkflowGdalWriter(f"{download_dir}/{dataset_name[37:]}/", reg_workflow, no_data_value=0)
        await writer.query_and_write(query)

    # ndvi workflow    
    workflow = create_cloud_free_scaled_workflow_7day_mean_ndvi(user_id, tile)
    reg_workflow = ge.register_workflow(workflow)
    query = create_query(tb, 10, time_start, time_end)
    dataset_name = get_dataset_name(user_id, tile, "NDVI")

    if not os.path.exists(f"{download_dir}/ndvi/"):
        os.makedirs(f"{download_dir}/ndvi/", exist_ok=True)
        
    writer = ge.RasterWorkflowGdalWriter(f"{download_dir}/ndvi/", reg_workflow, no_data_value=-2)
    await writer.query_and_write(query)
        