In [2]:
from pystac_client import Client
from pystac import Catalog
import pystac
import pandas as pd
import matplotlib.pyplot as plt
import leafmap.foliumap as leafmap
from shapely import Polygon
from shapely.geometry import box
import geopandas as gpd
import numpy as np
from pydantic import ValidationError, BaseModel

from typing import Dict, List, Optional, Union, Any
import concurrent.futures
import re
import socket
from tqdm import tqdm
from urllib.error import URLError
from urllib.request import urlopen
from datetime import datetime
import pickle
import requests
from requests import Session
from threading import local
import pprint as pp
import os
import logging

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

### Objectif 

* Reproduire le dataframe `Morocco-Earthquake-Sept-2023.tsv` de manière plus efficient : les objets STAC sont tres lent à manipuler/naviguer; on va donc au plus extraire les éléments en json puis implémenter les requêtes en multithreading
* Plusieurs produits sont disponibles : pan, rgb, multispectral. On permet le choix du produit désiré (un à la fois).
* Implementer Downloader pour les tiles en fonction de la zone, de la date (non implémenté)
* Voir remarques en fin de notebook pour reflexions futures et limites

### Data path

In [3]:
# please create - change output directory before run
out_dir_path = "../Data/processed"

### Utils functions

In [3]:
def make_path(file_name, dir_path=None):
    if dir_path is None:
        dir_path = out_dir_path
    return os.path.join(dir_path, file_name)

In [4]:
def save_pickle(data, output_path):
    with open(output_path, 'wb') as f:
        pickle.dump(data, f)
        
def load_pickle(path):
    with open(path, 'rb') as f:
        data = pickle.load(f)
    return data

#### Stac items processing

In [5]:
class TypeAssetError(BaseException):
    """Error on img asset selection"""

class STACOpenerError(Exception):
    """An error indicating that a STAC file could not be opened"""
    
    
_root_url_id_ = "ard/29/" # change for other collection maxar - see STAC catalog
    
def id_to_href(hrefs, collection):
    """
    build dictionnary to map id product - threads with .submits() doesn't keep order
    hope it's consistent with stac standards
    id_item : quadkey/date/catalog_id
    """

    hrefs_mapping = {}
    for h in hrefs:
        id_item = h.split("/".join([collection, _root_url_id_]))[1:][0][:-5]
        hrefs_mapping[id_item] = '/'.join(h.split("/")[:-1])
    return hrefs_mapping

def isotime(date_string):
    return date_string.replace("T", " ").replace("Z", "").rstrip()
    
def build_link_product(item, hrefs_map):
    """faster than use STAC Item 
    not very readable
    """
    
    tmp_id = (
        "/".join([
            item["quadkey"],
            datetime.strftime(
                datetime.strptime(isotime(item["datetime"]), "%Y-%m-%d %H:%M:%S")
                , "%Y-%m-%d"),
            item["catalog_id"]
        ])
    )
    item["href"] = '/'.join([hrefs_map[tmp_id], item['href'].split('/')[-1]])
    return item

def extract_assets(content, type_item="visual"):
    """
    Parse json - selection of interesting properties
    type_item :  'data-mask', 'ms_analytic', 'pan_analytic', 'visual'
    """
    assets_selection = {
        "id":"id",
        "assets": {type_item : ["title", "type", "href", ]},
        "properties" : ["datetime", "platform", "gsd", "catalog_id", "utm_zone", "quadkey", "grid:code", "proj:epsg", "proj:bbox", "proj:shape"] \
            + [_ for _ in content["properties"] if _.startswith("view")]\
            + [_ for _ in content["properties"] if _.startswith("tile")]\

    }
    
    def process_key(k):
        return k.replace(':', '_')

    new_content = {}
    for group, asset_keys in assets_selection.items(): 
        if isinstance(asset_keys, dict):
            (sub_group, sub_asset_keys), = asset_keys.items()
            try:
                new_content = new_content | {k:v for k,v in content[group][sub_group].items() if k in sub_asset_keys}
            except KeyError as e:
                logger.error(f"Authorized types : {content[group].keys()}")
                raise TypeAssetError

    
        elif isinstance(asset_keys, str):
            new_content = new_content | {asset_keys: content[group]}
        else:
            # list
            #asset_keys = [process_key(_) for _ in asset_keys]
            new_content = new_content | {k:v for k,v in content[group].items() if k in asset_keys}
    # np array to prevent error of index out of bounds
    geom = np.array(content["properties"]["proj:geometry"]["coordinates"]).squeeze().tolist()
    new_content = new_content | {"proj_geometry":geom}
    new_content = {process_key(k):v for k,v in new_content.items()}
    
    return new_content

#### HTTP Requests

In [6]:
thread_local = local()

def get_session() -> Session:
    if not hasattr(thread_local,'session'):
        thread_local.session = requests.Session() # Create a new Session if not exists
    return thread_local.session

def read_http_stac_file(url, timeout=None):
    """requests with session object"""
    try:
        session = get_session()
        # cannot set timeout to request session
        with session.get(url) as response:
            return response.json()
        
    except URLError as e:
        if isinstance(e.reason, socket.timeout):
            logger.error("%s: %s", url, e)
            raise socket.timeout(
                f"{url} with a timeout of {self.timeout} seconds"
            ) from None
        else:
            logger.debug("read_http_remote_json is not the right STAC opener")
            raise STACOpenerError

### Save tmp files (optional)

In [None]:
%%time
root_catalog = Client.open('https://maxar-opendata.s3.amazonaws.com/events/catalog.json')
cat = root_catalog.get_child('Morocco-Earthquake-Sept-2023')
hrefs = []
for parent_catalog, _, _ in cat.walk():
    hrefs += [
        link.get_absolute_href() for link in parent_catalog.get_item_links()
    ]

In [None]:
# stac items href (.json)
#save_pickle(hrefs, make_path("maxar_href_stac_items.pkl"))

In [None]:
%%time
N_EX=None
N=100
timeout=100
items = []
if N_EX is None:
    N_EX = len(hrefs)
    
def download_threaded(hrefs, max_connections, timeout=None):
    with tqdm(total=len(hrefs[:N_EX])) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_connections) as executor:
            future_to_href = [executor.submit(read_http_stac_file, href, timeout) for href in hrefs[:N_EX]]
            for future in concurrent.futures.as_completed(future_to_href):
                pbar.update(1)
                yield future.result()
                
for res in download_threaded(hrefs, max_connections=N, timeout=timeout):
    items.append(res)

In [None]:
# stac items raw (json)
#save_pickle(items, make_path("maxar_stac_items.pkl"))

### Data Model validation

In [11]:
# implemented but not tested
class MaxarOpenData(BaseModel):
    id: str
    type: str
    title: str
    href: str
    proj_bbox: list[float]
    shape: list[int]
    platform: str
    gsd: float
    catalog_id: str
    utm_zone: int
    quadkey: str
    view_off_nadir: float
    view_azimuth: float
    view_incidence_angle: float
    view_sun_azimuth: float
    view_sun_elevation: float
    proj_epsg: int
    proj_geometry: list
    grid_code: str
    tile_data_area: float
    tile_clouds_area: float
    tile_clouds_percent: float

### Stac reader items

In [119]:
class StaticStacReader:
        
    def __init__(self, 
                 model_validation: BaseModel, 
                 url_catalog: str, 
                 collection: str, 
                 local_items:Dict[str, str]=None,
                 max_connections: int=20, 
                 timeout: int=20):
        
        self.model_validation = model_validation
        self.timeout = timeout
        self.max_connections = max_connections
        self.url_catalog = url_catalog
        self.collection = collection
        self.catalog = None
        self.local_items = local_items

    def open_catalog(self, url_catalog: str, collection: str) -> pystac.Catalog:
        root_catalog = Client.open(url_catalog)
        return root_catalog.get_child(collection)
        
    def read(self, type_item: str, validate:bool=False) -> gpd.GeoDataFrame:
        """
        add validation json retun with pydantic (currently implemented but not tested)
        """
        items = self.get_items(type_item)
        if validate:
            self.validate(items)
        
        geom = [Polygon(_["proj_geometry"]) for _ in items]
        gdf = gpd.GeoDataFrame(items, crs=int(items[0]["proj_epsg"]), geometry=geom)
        
        return gdf
    
    def validate(self, items: List[Dict]) -> None:
        # to check
        for item in items:
            try:
                self.model_validation(**item)  
            except ValidationError as e:
                print(e.errors())
                
    def get_items(self, type_item: str) -> List[Dict[Any, Any]]:
        
        items: List[Dict[Any, Any]] = []
        logger.info("Build absolute path for stac items")
            
        hrefs = self.get_hrefs()
        items = self._get_items(hrefs)
        items = self.process_items(items, hrefs, type_item)
        
        return items
    
    def get_hrefs(self) -> List[str]:
        hrefs = []
        if self.local_items is None:
            self.catalog = self.open_catalog(self.url_catalog, self.collection) 
            # build absolute path for file stac items
            for parent_catalog, _, _ in self.catalog.walk():
                hrefs += [
                    link.get_absolute_href() for link in parent_catalog.get_item_links()
                ]
        else:
            hrefs = load_pickle(self.local_items["hrefs"])
        return hrefs
        
    
    def process_items(self, items: List[Dict[Any, Any]], hrefs: List[str], type_item: str) -> List[Dict[Any, Any]]:
        """items: Generator"""
        items_clean = []
        hrefs_mapping = id_to_href(hrefs, self.collection)
        
        for item in items:
            n_item = extract_assets(item, type_item=type_item)
            n_item = build_link_product(n_item, hrefs_mapping)
            items_clean.append(n_item)
            
        return items_clean
    
    def _get_items(self, hrefs: List[str]) -> Any: # add Generator type
        if self.local_items is not None:
            # return generator
            logger.info("Load stac items from local file..")

            return (x for x in load_pickle(self.local_items["items"]))
        else:
            logger.info("Start download stac items..")
            return self.download_items_threaded(hrefs)
        
    def download_items_threaded(self, hrefs: List[str]) -> Any: # add Generator type
  
        with tqdm(total=len(hrefs)) as pbar:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_connections) as executor:
                future_to_href = [executor.submit(read_http_stac_file, href, self.timeout) for href in hrefs]
                for future in concurrent.futures.as_completed(future_to_href):
                    pbar.update(1)
                    yield future.result()

### Runner

local items are tmp files (see above) but if not provided; the reader will download them (not save locally).

Important to put high timeout to avoid timeout issue with remote server.

From scratch : ~ 1min40 vs 45min with leafmap.

In [77]:
reader = StaticStacReader(model_validation=MaxarOpenData, 
                 url_catalog='https://maxar-opendata.s3.amazonaws.com/events/catalog.json', 
                 collection='Morocco-Earthquake-Sept-2023', 
                 local_items=None,
                 max_connections=100, 
                 timeout=30)

With local items (2s) :

```python
reader = StaticStacReader(model_validation=MaxarOpenData, 
                 url_catalog='https://maxar-opendata.s3.amazonaws.com/events/catalog.json', 
                 collection='Morocco-Earthquake-Sept-2023', 
                 local_items={
                     "hrefs": make_path("maxar_href_stac_items.pkl"),
                     "items":make_path("maxar_stac_items.pkl")
                 },
                 max_connections=100, 
                 timeout=30)
```

In [121]:
%%time
gdf = reader.read(type_item="visual")

INFO:__main__:Build absolute path for stac items
INFO:__main__:Start download stac items..
100%|██████████████████████████████████████████████| 8724/8724 [00:24<00:00, 358.34it/s]


CPU times: user 21 s, sys: 2.93 s, total: 23.9 s
Wall time: 1min 40s


In [122]:
# change to shp
gdf.head()

Unnamed: 0,id,type,title,href,datetime,platform,gsd,catalog_id,utm_zone,quadkey,...,view_sun_azimuth,view_sun_elevation,proj_epsg,grid_code,proj_bbox,tile_data_area,tile_clouds_area,tile_clouds_percent,proj_geometry,geometry
0,29/031313133122/04eddade-a2dc-4d16-8710-fb764d...,image/tiff; application=geotiff; profile=cloud...,Visual Image,https://maxar-opendata.s3.amazonaws.com/events...,2018-08-31 11:25:53Z,WV04,0.37,04eddade-a2dc-4d16-8710-fb764d318766-inv,29,31313133122,...,139.1,62.6,32629,MXRA-Z29-031313133122,"[483029.47998046875, 3379843.75, 485156.25, 33...",10.7,0.0,0,"[[483208.0078125, 3385087.5854492188], [483840...","POLYGON ((483208.008 3385087.585, 483840.637 3..."
1,29/031313133132/04eddade-a2dc-4d16-8710-fb764d...,image/tiff; application=geotiff; profile=cloud...,Visual Image,https://maxar-opendata.s3.amazonaws.com/events...,2018-08-31 11:25:53Z,WV04,0.37,04eddade-a2dc-4d16-8710-fb764d318766-inv,29,31313133132,...,139.1,62.6,32629,MXRA-Z29-031313133132,"[489843.75, 3379843.75, 495156.25, 3384461.975...",23.0,0.0,0,"[[489843.75, 3384461.9750976562], [489843.75, ...","POLYGON ((489843.750 3384461.975, 489843.750 3..."
2,29/031313133120/04eddade-a2dc-4d16-8710-fb764d...,image/tiff; application=geotiff; profile=cloud...,Visual Image,https://maxar-opendata.s3.amazonaws.com/events...,2018-08-31 11:25:53Z,WV04,0.37,04eddade-a2dc-4d16-8710-fb764d318766-inv,29,31313133120,...,139.1,62.6,32629,MXRA-Z29-031313133120,"[483190.3076171875, 3384843.75, 485156.25, 338...",0.3,0.0,0,"[[483208.0078125, 3385087.5854492188], [483266...","POLYGON ((483208.008 3385087.585, 483266.296 3..."
3,29/031313133303/04eddade-a2dc-4d16-8710-fb764d...,image/tiff; application=geotiff; profile=cloud...,Visual Image,https://maxar-opendata.s3.amazonaws.com/events...,2018-08-31 11:25:52Z,WV04,0.37,04eddade-a2dc-4d16-8710-fb764d318766-inv,29,31313133303,...,139.1,62.7,32629,MXRA-Z29-031313133303,"[484843.75, 3369843.75, 490156.25, 3375156.25]",28.2,0.0,0,"[[484843.75, 3375156.25], [484843.75, 3369843....","POLYGON ((484843.750 3375156.250, 484843.750 3..."
4,29/031313133301/04eddade-a2dc-4d16-8710-fb764d...,image/tiff; application=geotiff; profile=cloud...,Visual Image,https://maxar-opendata.s3.amazonaws.com/events...,2018-08-31 11:25:52Z,WV04,0.37,04eddade-a2dc-4d16-8710-fb764d318766-inv,29,31313133301,...,139.1,62.6,32629,MXRA-Z29-031313133301,"[484843.75, 3374843.75, 490156.25, 3380156.25]",28.2,0.0,0,"[[484843.75, 3380156.25], [484843.75, 3374843....","POLYGON ((484843.750 3380156.250, 484843.750 3..."


In [88]:
stac_items = load_pickle(make_path("maxar_stac_items.pkl"))

### Check with référence dataset (opengeos)

In [102]:
opengeo = pd.read_csv(make_path("opengeos_Morocco-Earthquake-Sept-2023.tsv"), sep="\t")

In [104]:
print(opengeo.shape)
print(gdf.shape)

(8724, 19)
(8724, 22)


In [123]:
assert len(set(opengeo.visual).intersection(set(gdf.href))) == opengeo.shape[0] == gdf.shape[0]

Ok on retrouve bien les mêmes liens que opengeos

### Memory usage

In [80]:
gdf.memory_usage(deep=True).sum()

10193736

In [81]:
gdf.iloc[0]

id                      29/031313133311/04eddade-a2dc-4d16-8710-fb764d...
type                    image/tiff; application=geotiff; profile=cloud...
title                                                        Visual Image
href                    https://maxar-opendata.s3.amazonaws.com/events...
datetime                                             2018-08-31 11:25:53Z
platform                                                             WV04
gsd                                                                  0.36
catalog_id                       04eddade-a2dc-4d16-8710-fb764d318766-inv
utm_zone                                                               29
quadkey                                                      031313133311
view_off_nadir                                                       24.4
view_azimuth                                                        121.5
view_incidence_angle                                                 63.1
view_sun_azimuth                      

In [82]:
gdf = gdf.assign(
    #platform=gdf.platform.astype("category"),
    gsd=gdf.gsd.astype("float32"),
    utm_zone=gdf.utm_zone.astype("int16"),
    view_off_nadir=gdf.view_off_nadir.astype("float32"),
    view_azimuth=gdf.view_azimuth.astype("float32"),
    view_sun_elevation=gdf.view_sun_elevation.astype("float"),
    proj_epsg=gdf.proj_epsg.astype("int16"),
    data_area=gdf.tile_data_area.astype("float16"),
    clouds_area=gdf.tile_clouds_area.astype("float16"),
    clouds_percent=gdf.tile_clouds_percent.astype("float16"),
    geometry_bbox = [str(x) for x in gdf.proj_bbox]
).drop({"proj_geometry", "proj_bbox", "tile_data_area", "tile_clouds_area", "tile_clouds_percent"}, axis=1)

In [83]:
gdf.crs

<Projected CRS: EPSG:32629>
Name: WGS 84 / UTM zone 29N
Axis Info [cartesian]:
- E[east]: Easting (metre)
- N[north]: Northing (metre)
Area of Use:
- name: Between 12°W and 6°W, northern hemisphere between equator and 84°N, onshore and offshore. Algeria. Côte D'Ivoire (Ivory Coast). Faroe Islands. Guinea. Ireland. Jan Mayen. Mali. Mauritania. Morocco. Portugal. Sierra Leone. Spain. United Kingdom (UK). Western Sahara.
- bounds: (-12.0, 0.0, -6.0, 84.0)
Coordinate Operation:
- name: UTM zone 29N
- method: Transverse Mercator
Datum: World Geodetic System 1984 ensemble
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich

In [84]:
gdf.columns

Index(['id', 'type', 'title', 'href', 'datetime', 'platform', 'gsd',
       'catalog_id', 'utm_zone', 'quadkey', 'view_off_nadir', 'view_azimuth',
       'view_incidence_angle', 'view_sun_azimuth', 'view_sun_elevation',
       'proj_epsg', 'grid_code', 'geometry', 'data_area', 'clouds_area',
       'clouds_percent', 'geometry_bbox'],
      dtype='object')

### Save

Fiona limit 10 lenght characters for columns

In [85]:
mapping_columns = {n[:10]:n for n in gdf.columns}

In [86]:
save_pickle(mapping_columns, make_path("mapping_columns_stac_collection.pkl"))

In [87]:
gdf.to_file(make_path("maxar_stac_items.shp"))



## Downloader Cogs

In [118]:
# to do
"""
- based on href list
- or on bbox selection + date range
- check how to use COGs efficiency
"""
class DownloaderCogs:
    def __init__(self):
        pass

### Remarques

* add validation data with pydantic with MaxarOpenData
* nombre de threads apparement à définir par rapport aux nombres de cpu+4. Mais plus rapide en définissant une grande Pool de thread (100)

In [115]:
gdf.iloc[3].href

'https://maxar-opendata.s3.amazonaws.com/events/Morocco-Earthquake-Sept-2023/ard/29/120200233302/2010-08-26/1030010006CA6400-visual.tif'

In [116]:
gdf.iloc[2].href

'https://maxar-opendata.s3.amazonaws.com/events/Morocco-Earthquake-Sept-2023/ard/29/031313133301/2018-08-31/04eddade-a2dc-4d16-8710-fb764d318766-inv-visual.tif'

Attention l'id ne semble pas etre tres pertinent (non present dans opengeo)

### Références

* leafmap source code : https://github.com/opengeos/leafmap/blob/master/leafmap/stac.py
* eodag source code : https://github.com/CS-SI/eodag