# CDSE Tiled Data Access

This notebook shows how to access data from CDSE programatically, converting it to a possible wanted format (such as Cloud Optimized Geotiff - COG) and lastly registers it into an eoAPI instance.

Following steps are addressed:
* **Searches** for scenes (skipping those already in eoAPI).
* **Downloads** tiles in parallel.
* **Converts** to COG (skipping if file exists).
* **Registers** in eoAPI.

In [None]:
# First we make sure we import all used dependencies
import os
import time
import json
import math
import threading
import concurrent.futures
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass

import requests
from oauthlib.oauth2 import BackendApplicationClient
from oauthlib.oauth2.rfc6749.errors import TokenExpiredError
from requests_oauthlib import OAuth2Session

import rasterio
from rasterio.io import MemoryFile
from rasterio.merge import merge
from rasterio.enums import Resampling
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
import pystac

from sentinelhub import SHConfig, BBox, bbox_to_dimensions, CRS

In [None]:
# General configuration

# Output directory
output_dir = Path("./bucket/samples/sentinel2_cogs")
output_dir.mkdir(exist_ok=True)

# eoAPI Configuration
EOAPI_URL = "http://eoapi-rw-stac:8080"
STAC_COLLECTION_ID = "sentinel-2-vienna-cdse"
S3_BUCKET = f"s3://{os.getenv("workspace_BUCKET")}"
S3_PREFIX = "sentinel2_cogs"

ALL_BANDS = ["B01", "B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B09", "B11", "B12"]

# Area of Interest
bbox = BBox(bbox=[16.15, 47.95, 16.55, 48.3], crs=CRS.WGS84)
end_date = datetime.now()
# Trying to take most recent scenes
start_date = end_date - timedelta(days=360)

# Please make sure you the cdse secret is available in
# the Credentials Manager and injection is enabled
# or replace this with you own CDSE tokens
CLIENT_ID = os.getenv("cdse_CLIENT_ID")
CLIENT_SECRET = os.getenv("cdse_CLIENT_SECRET")
TOKEN_URL = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token"
SH_BASE_URL = "https://sh.dataspace.copernicus.eu"

### Authentication Manager
This class wraps the OAuth session to handle token refreshing automatically as it is possible to loose the session while accessing the various tiles

In [None]:
class CDSESessionManager:
    """
    Thread-safe session manager that automatically refreshes the token 
    when a TokenExpiredError occurs.
    """
    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = TOKEN_URL
        self.lock = threading.Lock()
        self.session = None
        self.authenticate()

    def authenticate(self):
        """Create a new authenticated session"""
        print("Authenticating with CDSE...")
        client = BackendApplicationClient(client_id=self.client_id)
        self.session = OAuth2Session(client=client)
        token = self.session.fetch_token(
            token_url=self.token_url,
            client_secret=self.client_secret,
            include_client_id=True
        )
        print(f"   Token obtained. Expires in {token.get('expires_in', 'unknown')}s")
        return self.session

    def refresh_token(self):
        """Thread-safe token refresh"""
        with self.lock:
            # Check if token was just refreshed by another thread to avoid double-refresh
            # (Simple heuristic: just re-auth)
            print("\nToken expired. Refreshing...", end=" ")
            try:
                self.authenticate()
                print("Done.")
            except Exception as e:
                print(f"FAILED: {e}")
                raise

    def post(self, url, json_data, retries=1):
        """Wrapper for session.post with auto-refresh logic"""
        try:
            response = self.session.post(url, json=json_data)
            # Sometimes the server returns 401 instead of the library raising TokenExpiredError
            if response.status_code == 401:
                raise TokenExpiredError("401 Unauthorized")
            response.raise_for_status()
            return response
            
        except TokenExpiredError:
            if retries > 0:
                self.refresh_token()
                # Retry the request with new session
                return self.post(url, json_data, retries=retries-1)
            else:
                raise

# Initialize the Manager
auth_manager = CDSESessionManager(CLIENT_ID, CLIENT_SECRET)


In [None]:
# Collection Setup
coll_payload = {
    "type": "Collection", "stac_version": "1.0.0", "id": STAC_COLLECTION_ID,
    "title": "Vienna Sentinel-2", "description": "Imported from CDSE",
    "eodash:rasterform": "https://workspace-ui-public.gtif-austria.hub-otc.eox.at/api/public/share/public-4wazei3y-02/test/aducat_s2_form.json",
    "license": "CC-BY-4.0",
    "extent": {"spatial": {"bbox": [[16.2, 48.1, 16.6, 48.3]]}, "temporal": {"interval": [["2020-01-01T00:00:00Z", None]]}},
    "links": []
}
try: 
    requests.post(f"{EOAPI_URL}/collections", json=coll_payload)
except: pass

## Search in CDSE STAC

In [None]:
# --------------------------------------------------------------------------------
# Search for Latest Sentinel-2 Scenes
# --------------------------------------------------------------------------------

def get_existing_dates(eoapi_url, collection_id):
    """
    Fetch all unique dates (YYYY-MM-DD) currently registered in the STAC collection.
    """
    existing_dates = set()
    url = f"{eoapi_url}/collections/{collection_id}/items"
    
    # Simple pagination handling to get all items
    params = {"limit": 100} 
    
    try:
        print(f" Checking existing dates in collection '{collection_id}'...")
        while url:
            response = requests.get(url, params=params)
            if response.status_code == 404:
                return set() # Collection likely doesn't exist yet
            
            response.raise_for_status()
            data = response.json()
            
            for feature in data.get('features', []):
                # Extract YYYY-MM-DD
                date_str = feature['properties']['datetime'][:10]
                existing_dates.add(date_str)
                
            # Check for next page link
            url = None
            links = data.get('links', [])
            for link in links:
                if link['rel'] == 'next':
                    url = link['href']
                    params = {} # Params usually included in next link
                    break
    except Exception as e:
        print(f" Could not fetch existing items (starting fresh?): {e}")
        
    return existing_dates

def search_sentinel2_scenes(auth_manager, bbox, start_date, end_date, max_results=2):
    """
    Search for Sentinel-2 scenes using CDSE OpenSearch API,
    filtering out days that are already in the STAC collection.
    """
    base_url = "https://catalogue.dataspace.copernicus.eu/resto/api/collections/Sentinel2/search.json"
    
    params = {
        "box": f"{bbox.min_x},{bbox.min_y},{bbox.max_x},{bbox.max_y}",
        "startDate": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "completionDate": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "processingLevel": "S2MSI2A", 
        "cloudCover": "[0,5]",
        "maxRecords": 200,
        "sortParam": "startDate",
        "sortOrder": "descending"
    }
    
    # 1. Fetch existing dates from your catalog
    existing_dates = get_existing_dates(EOAPI_URL, STAC_COLLECTION_ID)
    if existing_dates:
        print(f" Found {len(existing_dates)} days already registered. Skipping these dates.")
    
    # 2. Search CDSE
    print(" Searching CDSE catalog...")
    response = requests.get(base_url, params=params)
    response.raise_for_status()
    
    results = response.json()
    features = results.get('features', [])
    
    # 3. Filter results
    # Initialize seen_dates with what is already in the DB so we don't pick them again
    seen_dates = existing_dates.copy()
    converted_features = []
    
    for feature in features:
        scene_id = feature['id']
        # Get YYYY-MM-DD
        scene_date = feature['properties']['startDate'][:10]
        
        # SKIP IF DATE ALREADY EXISTS (in DB or earlier in this loop)
        if scene_date in seen_dates:
            continue
            
        # Mark this date as seen so we don't pick another scene from the same day
        seen_dates.add(scene_date)
        
        converted_features.append({
            'id': scene_id,
            'properties': {
                'datetime': feature['properties']['startDate'],
                'eo:cloud_cover': feature['properties'].get('cloudCover', 0)
            },
            'geometry': feature.get('geometry'),
            'bbox': feature.get('bbox')
        })
        
        if len(converted_features) >= max_results:
            break
    
    return converted_features

# Run the search, here we configure the area and time of interest as well as how many scenes to fetch
scenes = search_sentinel2_scenes(auth_manager, bbox, start_date, end_date, 5)

print(f"\nSelected {len(scenes)} NEW scenes for processing:")
for i, scene in enumerate(scenes, 1):
    print(f"{i}. ID: {scene['id']}")
    print(f"   Date: {scene['properties']['datetime']}")
    print(f"   Cloud Cover: {scene['properties'].get('eo:cloud_cover', 'N/A')}%\n")

## Data conversion

In [None]:

# --- Configuration ---
TARGET_RES = 10 
MAX_PIXELS = 2000 
MAX_WORKERS = 2 
MAX_RETRIES = 3
RETRY_DELAY = 2

@dataclass
class SimpleBBox:
    min_x: float; min_y: float; max_x: float; max_y: float

def split_bbox_into_tiles(bbox):
    w_deg, h_deg = bbox.max_x - bbox.min_x, bbox.max_y - bbox.min_y
    center_lat = (bbox.min_y + bbox.max_y) / 2
    m_lat, m_lon = 111320, 111320 * math.cos(math.radians(center_lat))
    tot_w_px, tot_h_px = int((w_deg * m_lon) / TARGET_RES), int((h_deg * m_lat) / TARGET_RES)
    cols, rows = math.ceil(tot_w_px / MAX_PIXELS), math.ceil(tot_h_px / MAX_PIXELS)
    
    tiles = []
    print(f"    Tiling: {cols}x{rows} grid")
    for r in range(rows):
        for c in range(cols):
            tiles.append({
                "bbox": SimpleBBox(
                    bbox.min_x + (c * (w_deg/cols)), 
                    bbox.min_y + (r * (h_deg/rows)), 
                    bbox.min_x + ((c+1) * (w_deg/cols)), 
                    bbox.min_y + ((r+1) * (h_deg/rows))
                ),
                "size": (int(tot_w_px/cols), int(tot_h_px/rows)),
                "index": (r, c)
            })
    return tiles

def convert_to_cog(path, arr, transform, crs):
    prof = cog_profiles.get("deflate").copy()
    prof.update({
        "BIGTIFF": "IF_NEEDED",
        "blockxsize": 512,
        "blockysize": 512,
        "photometric": "MINISBLACK"
    })
    with MemoryFile() as mem:
        with mem.open(driver="GTiff", dtype=str(arr.dtype), count=arr.shape[0], 
                      height=arr.shape[1], width=arr.shape[2], crs=crs, 
                      transform=transform, photometric="MINISBLACK") as dst:
            dst.write(arr)
            dst.descriptions = tuple(ALL_BANDS)
            dst.build_overviews([2,4,8,16], Resampling.nearest)
            dst.update_tags(ns='rio_overview', resampling='nearest')
        with mem.open() as src:
            cog_translate(src, path, prof, in_memory=True, quiet=True)
    print(f"   COG Saved: {path}")

def download_single_tile(auth_mgr, tile_data, scene_id, scene_datetime):
    """Worker function with RETRY logic"""
    dt = datetime.fromisoformat(scene_datetime.replace('Z', '+00:00'))
    
    # ... (Evalscript needed for data extraction) ...
    evalscript = """
    //VERSION=3
    function setup() {
        return {
            input: [{ bands: """ + json.dumps(ALL_BANDS) + """, units: "DN" }],
            output: { bands: """ + str(len(ALL_BANDS)) + """, sampleType: "UINT16" }
        };
    }
    function evaluatePixel(sample) {
        return [""" + ", ".join([f"sample.{b}" for b in ALL_BANDS]) + """];
    }
    """
    
    t_bbox = tile_data['bbox']
    payload = {
        "input": {
            "bounds": {
                "bbox": [t_bbox.min_x, t_bbox.min_y, t_bbox.max_x, t_bbox.max_y],
                "properties": {"crs": "http://www.opengis.net/def/crs/EPSG/0/4326"}
            },
            "data": [{
                "type": "sentinel-2-l2a",
                "dataFilter": {
                    "timeRange": {
                        "from": (dt - timedelta(minutes=30)).strftime("%Y-%m-%dT%H:%M:%SZ"),
                        "to": (dt + timedelta(minutes=30)).strftime("%Y-%m-%dT%H:%M:%SZ")
                    }
                }
            }]
        },
        "output": {
            "width": tile_data['size'][0], "height": tile_data['size'][1],
            "responses": [{"identifier": "default", "format": {"type": "image/tiff"}}]
        },
        "evalscript": evalscript
    }
    
    # --- Retry Loop ---
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = auth_mgr.post(f"{SH_BASE_URL}/api/v1/process", payload)
            # Assuming auth_mgr raises an exception on non-200. 
            # If not, check response.status_code here.
            
            # Simple validation to ensure we actually got data
            if response.content and len(response.content) > 100:
                return tile_data['index'], response.content
            else:
                raise Exception("Empty response received")

        except Exception as e:
            # If this was the last attempt, fail
            if attempt == MAX_RETRIES:
                print(f"      Tile {tile_data['index']} failed permanently: {e}")
                return tile_data['index'], None
            
            # Otherwise, wait and retry
            sleep_time = RETRY_DELAY * attempt
            print(f"      Tile {tile_data['index']} failed (Attempt {attempt}/{MAX_RETRIES}). Retrying in {sleep_time}s...")
            time.sleep(sleep_time)

    return tile_data['index'], None

# --- Main Loop ---
processed_scenes = []
tiles = split_bbox_into_tiles(bbox) 

for scene in scenes:
    s_id, s_dt = scene['id'], scene['properties']['datetime']
    cog_path = output_dir / f"{s_dt[:10]}_{s_id}_cog.tif"
    print(f"\nProcessing {s_id}...")

    if cog_path.exists():
        print(f"   File exists. Skipping download.")
        processed_scenes.append({"id": s_id, "path": cog_path, "props": scene['properties'], "geom": scene['geometry']})
        continue

    print(f"   Downloading {len(tiles)} tiles (Parallel)...")
    tile_results = {}
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(download_single_tile, auth_manager, t, s_id, s_dt): t for t in tiles}
        
        for f in concurrent.futures.as_completed(futures):
            idx, content = f.result()
            # Only store if content is valid
            if content: 
                tile_results[idx] = content
    
    # --- Completeness Check ---
    # We compare the number of valid results to the number of expected tiles.
    if len(tile_results) != len(tiles):
        missing_count = len(tiles) - len(tile_results)
        print(f"   Scene failed. Missing {missing_count} tiles. Discarding scene.")
        # We 'continue' here to move to the next scene without stitching
        continue

    print("   Stitching & Converting...")
    m_files, m_dsets = [], []
    try:
        for k in sorted(tile_results.keys()):
            mf = MemoryFile(tile_results[k])
            m_files.append(mf); m_dsets.append(mf.open())
        
        mosaic, trans = merge(m_dsets)
        convert_to_cog(str(cog_path), mosaic, trans, m_dsets[0].crs)
        
        if cog_path.exists():
            processed_scenes.append({"id": s_id, "path": cog_path, "props": scene['properties'], "geom": scene['geometry']})
    except Exception as e:
        print(f"   Stitching failed: {e}")
    finally:
        for d in m_dsets: d.close()
        for m in m_files: m.close()

## Register in eoAPI

If wanted this example continues explaining how the retrieved data can be registered into an eoAPI instance

In [None]:
def create_and_register(scene, eoapi_url, collection_id):
    s_id, props = scene['id'], scene['props']
    dt = datetime.fromisoformat(props['datetime'].replace('Z', '+00:00'))
    cog_url = f"{S3_BUCKET}/{S3_PREFIX}/{scene['path'].name}"
    
    # Geometry fallback
    geom = scene['geom']
    if not geom: 
        geom = {"type": "Polygon", "coordinates": [[ [bbox.min_x, bbox.min_y], [bbox.max_x, bbox.min_y], [bbox.max_x, bbox.max_y], [bbox.min_x, bbox.max_y], [bbox.min_x, bbox.min_y] ]]}
        
    # Get BBox from geometry for STAC Item
    coords = geom['coordinates'][0]
    xs, ys = [c[0] for c in coords], [c[1] for c in coords]
    ibbox = [min(xs), min(ys), max(xs), max(ys)]

    item = pystac.Item(id=s_id, geometry=geom, bbox=ibbox, datetime=dt, 
                       properties={"eo:cloud_cover": props.get('eo:cloud_cover', 0)}, collection=collection_id)
    
    item.add_asset("cog", pystac.Asset(href=cog_url, media_type="image/tiff; application=geotiff; profile=cloud-optimized", roles=["data", "visual"]))
    
    # Add XYZ Preview Link
    eoapi_endpoint = os.getenv("workspace_RASTER_ENDPOINT")
    xyz = f"{eoapi_endpoint}/collections/{collection_id}/items/{s_id}/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}@1x?assets=cog&asset_bidx=cog|4,3,2&rescale=0,3000"
    item.add_link(pystac.Link(rel="xyz", target=xyz, media_type="application/json"))

    res = requests.post(f"{eoapi_url}/collections/{collection_id}/items", json=item.to_dict())
    if res.status_code in [200, 201]: print(f" Registered {s_id}")
    else: print(f" Failed {s_id}: {res.text}")

print(f"Registration...")
for s in processed_scenes:
    create_and_register(s, EOAPI_URL, STAC_COLLECTION_ID)

In [None]:
# --------------------------------------------------------------------------------
# Finally we update Collection Temporal Extent
# --------------------------------------------------------------------------------
import requests

def update_collection_extent(eoapi_url, collection_id):
    print(f"Updating temporal extent for collection: '{collection_id}'...")

    # 1. Fetch all item datetimes to determine the full range
    all_datetimes = []
    items_url = f"{eoapi_url}/collections/{collection_id}/items"
    params = {"limit": 100}

    try:
        # Loop through pages to get every single item's date
        while items_url:
            r = requests.get(items_url, params=params)
            r.raise_for_status()
            data = r.json()
            
            for feature in data.get('features', []):
                all_datetimes.append(feature['properties']['datetime'])
            
            # Handle Pagination (Follow 'next' link)
            items_url = None
            for link in data.get('links', []):
                if link['rel'] == 'next':
                    items_url = link['href']
                    params = {} # Parameters are usually included in the 'next' href
                    break

        if not all_datetimes:
            print(" No items found in collection. Skipping extent update.")
            return

        # 2. Calculate new Min/Max
        min_dt = min(all_datetimes)
        max_dt = max(all_datetimes)
        
        print(f"   Found {len(all_datetimes)} items.")
        print(f"   Calculated Interval: {min_dt}  <-->  {max_dt}")

        # 3. Fetch the current Collection definition
        coll_url = f"{eoapi_url}/collections/{collection_id}"
        coll_resp = requests.get(coll_url)
        coll_resp.raise_for_status()
        coll_data = coll_resp.json()
        
        # 4. Update the extent in the JSON
        # STAC Spec: extent.temporal.interval is a list of lists [[start, end]]
        coll_data['extent']['temporal']['interval'] = [[min_dt, max_dt]]
        
        # 5. Send PUT request to save changes
        update_resp = requests.put(coll_url, json=coll_data)
        
        if update_resp.status_code in [200, 204]:
            print(f" Success! Collection '{collection_id}' temporal extent updated.")
        else:
            print(f" Failed to update collection. Server returned: {update_resp.status_code}")
            print(update_resp.text)

    except Exception as e:
        print(f" Error updating extent: {e}")

# Run the update
update_collection_extent(EOAPI_URL, STAC_COLLECTION_ID)