In [1]:
import geopandas as gpd 
import numpy as np 
import os
import matplotlib.pyplot as plt 
import rasterio as rio 
import rioxarray as rioxr
import glob 
import xarray as xr
import folium
from shapely.geometry import box, Polygon
import pandas as pd 
from shapely import wkt
import otbApplication
import pyotb

from pathlib import Path
from typing import Tuple, Dict, Any, List, Optional
import fsspec
from xml.etree import ElementTree
import re

2024-01-21 16:54:16 (INFO) [pyotb] Successfully loaded 117 OTB applications


In [2]:
import src.config as cfg

In [3]:
CRS = 32622

root_data_path = "/home/rustt/Documents/Projects/S1_S2_classification/Data"

raster_path_zip = str(Path(root_data_path) / "S1/zip" / "S1B_IW_GRDH_1SDV_20170425T214234_20170425T214302_005323_00953E_F02D.zip")

s1_dir = str(Path(root_data_path) / "S1/zip")

raster_path = str(Path(root_data_path) / "Tests/zip" / "S1B_IW_GRDH_1SDV_20170425T214234_20170425T214302_005323_00953E_F02D.SAFE")
raster_path_tif = str(Path(root_data_path) / "Tests" / "s1b-iw-grd-vh-20170425t214234-20170425t214302-005323-00953e-002.tiff")
raster_file_tif = "s1b-iw-grd-vh-20170425t214234-20170425t214302-005323-00953e-002.tiff"

study_area = str(Path(root_data_path) / "StudyArea" / "StudyArea.shp")

Etapes : 
* 1 - parsing coordinates bbox
* 2 - check product which intersect a least 98% study area => filtered
* 3 - calibrate product + ortho + clip

### Extract valid sentinel 1 products
* overlay of at least th_intersect (0.98 default) over roi area
* same orbit as specify in roi_area

In [4]:
roi_area = gpd.read_file(cfg.study_area)

In [5]:
def extract_footprint(manifest_path): 
    manifest = ElementTree.parse(manifest_path).getroot()
    bbox = manifest.findtext(".//safe:footPrint/", namespaces=cfg.ConfigS1.SENTINEL1_NAMESPACES)
    bbox = [float(_) for _  in re.findall("-*\d+\.\d+", bbox)]
    return Polygon([tuple([x, y]) for x,y in zip(bbox[1::2], bbox[0::2])])

In [6]:
# from xarray-sentinel : https://github.com/bopen/xarray-sentinel/blob/main/xarray_sentinel/esa_safe.py
def findtext(
    tree: ElementTree.Element,
    query: str,
    namespaces: Dict[str, str] = cfg.ConfigS1.SENTINEL1_NAMESPACES,
) -> str:
    value = tree.findtext(query, namespaces=namespaces)
    if value is None:
        raise ValueError(f"{query=} returned None")
    return value


def findall(
    tree: ElementTree.Element,
    query: str,
    namespaces: Dict[str, str] = cfg.ConfigS1.SENTINEL1_NAMESPACES,
) -> List[str]:
    tags = tree.findall(query, namespaces=namespaces)
    values: List[str] = []
    for tag in tags:
        if tag.text is None:
            raise ValueError(f"{query=} returned None")
        values.append(tag.text)
    return values

def parse_annotation_filename(name: str) -> Tuple[str, str, str, str]:
    match = re.match(
        r"([a-z-]*)s1[ab]-([^-]*)-[^-]*-([^-]*)-([\dt]*)-", os.path.basename(name)
    )
    if match is None:
        raise ValueError(f"cannot parse name {name!r}")
    return tuple(match.groups())  # type: ignore

def parse_manifest_sentinel1(
    manifest_path,
) -> Tuple[Dict[str, Any], Dict[str, Tuple[str, str, str, str, str]]]:
    # We use ElementTree because we didn't find a XSD definition for the manifest
    manifest = ElementTree.parse(manifest_path).getroot()

    family_name = findtext(manifest, ".//safe:platform/safe:familyName")
    if family_name != "SENTINEL-1":
        raise ValueError(f"{family_name=} not supported")

    number = findtext(manifest, ".//safe:platform/safe:number")
    mode = findtext(manifest, ".//s1sarl1:instrumentMode/s1sarl1:mode")
    swaths = findall(manifest, ".//s1sarl1:instrumentMode/s1sarl1:swath")

    orbit_number = findall(manifest, ".//safe:orbitNumber")
    if len(orbit_number) != 2 or orbit_number[0] != orbit_number[1]:
        raise ValueError(f"{orbit_number=} not supported")

    relative_orbit_number = findall(manifest, ".//safe:relativeOrbitNumber")
    if (
        len(relative_orbit_number) != 2
        or relative_orbit_number[0] != relative_orbit_number[1]
    ):
        raise ValueError(f"{relative_orbit_number=} not supported")

    orbit_pass = findtext(manifest, ".//s1:pass")
    if orbit_pass not in {"ASCENDING", "DESCENDING"}:
        raise ValueError(f"pass={orbit_pass} not supported")

    ascending_node_time = findtext(manifest, ".//s1:ascendingNodeTime")

    transmitter_receiver_polarisations = findall(
        manifest, ".//s1sarl1:transmitterReceiverPolarisation"
    )
    product_type = findtext(manifest, ".//s1sarl1:productType")

    start_time = findtext(manifest, ".//safe:startTime")
    stop_time = findtext(manifest, ".//safe:stopTime")

    attributes = {
        "family_name": family_name,
        "number": number,
        "mode": mode,
        "swaths": swaths,
        "orbit_number": int(orbit_number[0]),
        "relative_orbit_number": int(relative_orbit_number[0]),
        "pass": orbit_pass,
        "ascending_node_time": ascending_node_time,
        "transmitter_receiver_polarisations": transmitter_receiver_polarisations,
        "product_type": product_type,
        "start_time": start_time,
        "stop_time": stop_time,
    }

    files = {}

    for file_tag in manifest.findall(".//dataObjectSection/dataObject"):
        location_tag = file_tag.find(".//fileLocation")
        if location_tag is not None:
            file_href = location_tag.attrib["href"]
            try:
                description = parse_annotation_filename(os.path.basename(file_href))
            except ValueError:
                continue
            file_type = file_tag.attrib["repID"]
            files[file_href] = (file_type,) + description

    return attributes, files

def get_fs_path(
    urlpath_or_path,
    fs: Optional[fsspec.AbstractFileSystem] = None,
    storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[fsspec.AbstractFileSystem, str]:
    if fs is not None and storage_options is not None:
        raise TypeError("only one of 'fs' and 'storage_options' can be not None")

    if fs is None:
        fs, _, paths = fsspec.get_fs_token_paths(
            urlpath_or_path, storage_options=storage_options
        )
        if len(paths) == 0:
            raise ValueError(f"file or object not found {urlpath_or_path!r}")
        elif len(paths) > 1:
            raise ValueError(f"multiple files or objects found {urlpath_or_path!r}")
        path = paths[0]
    else:
        path = str(urlpath_or_path)

    if fs.isdir(path):
        path = os.path.join(path, "manifest.safe")

    return fs, path

In [17]:
def compute_intersection(shp_rs: gpd.GeoDataFrame, shp_roi: gpd. GeoDataFrame) -> gpd.GeoDataFrame:
    assert shp_rs.crs == shp_roi.crs
    shp_rs["intersec_roi_perc"] = gpd.overlay(shp_rs, shp_roi, how='intersection').area / shp_roi.area.item()
    return shp_rs

def get_product_rasters_path(product_url):
    tif_path = [str(p) for p in Path(product_url).rglob("*.tiff")] 
    return tif_path

def extract_rs_meta(prod_name: str, product_url: str) -> Dict[str, Any]:
    """
    Extract all meta data of S1 product
    - footprint
    - sensor
    - scene
    - raster paths
    
    """
    _, manifest_path = get_fs_path(product_url)
    geom_rs = extract_footprint(manifest_path)
    common_attrs, products = parse_manifest_sentinel1(manifest_path)
    tif_path = get_product_rasters_path(product_url)
    
    meta = {
        **dict(prod_name=prod_name, 
               geometry=geom_rs, 
               raster_path=tif_path), 
        **common_attrs
    }

    return meta


def get_valid_products(s1_dir: str, 
                       shp_path: str, 
                       col_orbit_shp: str="RelOrb",
                       crs_shp: int=4326,
                       th_intersec: float=0.98, 
                       save: bool=True) -> gpd.GeoDataFrame:
    """ 
    Extract meta data from the S1 products which are compliant : 
    - to orbit number
    - cover at list th_intersec of the study area

    Return : geoDataFrame of metadata S1 products
    """
    

    valid_prod = []
    prod_meta = []

    roi_area = gpd.read_file(shp_path).to_crs(CRS)

    
    for prod_name in os.listdir(s1_dir):
        if prod_name.endswith(".SAFE"):

            product_url = os.path.join(s1_dir, prod_name)
            
            meta = extract_rs_meta(prod_name, product_url)
            
            prod_meta.append(meta)

    prod_meta = gpd.GeoDataFrame(prod_meta, geometry="geometry", crs=crs_shp).to_crs(CRS)

    prod_meta = compute_intersection(prod_meta, roi_area)

    prod_meta = (
        prod_meta[
        (prod_meta.intersec_roi_perc >= th_intersec) & 
        (prod_meta.relative_orbit_number == roi_area[col_orbit_shp].item())
        ]
    )
    # cast to str
    prod_meta["geometry"] = prod_meta.geometry.apply(lambda x: wkt.dumps(x))
    prod_meta = pd.DataFrame(prod_meta)

    if save:
        prod_meta.to_csv(cfg.ConfigS1.meta_product_path)

    return prod_meta

In [18]:
res = get_valid_products(cfg.ConfigS1.s1_dir, cfg.study_area)



In [9]:
res["prod_name"].iloc[0]

'S1B_IW_GRDH_1SDV_20180114T214240_20180114T214308_009173_0106C1_7F29.SAFE'

In [None]:
ax = res.plot(color="palegreen", edgecolor="green", figsize=(10, 5))
roi_area.to_crs(CRS).plot(ax=ax, color="red")

### S1 Calibration

In [None]:

s1_path_test = "/home/rustt/Documents/Projects/S1_S2_classification/Data/S1/zip/S1B_IW_GRDH_1SDV_20170425T214234_20170425T214302_005323_00953E_F02D.SAFE/measurement/s1b-iw-grd-vv-20170425t214234-20170425t214302-005323-00953e-001.tiff"


In [None]:
dem_path = str(Path(root_data_path) / "Tests" / "DEM" / "srtm_1secarc_Cayenne.tif")
out_path = str(Path(root_data_path) / "Tests" / "out_calib.tif")

In [None]:
from src.data_processing.s1_otb_processing import run_s1_processing_otb

In [None]:
Path(s1_path_test).stem

In [19]:
def load_meta_products(parse_list:bool=True): 
    meta = pd.read_csv(cfg.ConfigS1.meta_product_path)
    if parse_list:
        meta["raster_path"] = meta["raster_path"].apply(lambda x: literal_eval(x))
    return meta

In [20]:
res =  pd.read_csv(cfg.ConfigS1.meta_product_path)


In [21]:
res.raster_path

0    ['/home/rustt/Documents/Projects/S1_S2_classif...
1    ['/home/rustt/Documents/Projects/S1_S2_classif...
2    ['/home/rustt/Documents/Projects/S1_S2_classif...
3    ['/home/rustt/Documents/Projects/S1_S2_classif...
Name: raster_path, dtype: object

In [22]:
from ast import literal_eval 

res = load_meta_products()

In [None]:
# batch calib + ortho 
for i, row in res.iterrows():
    paths, prod_name = row["raster_path"], row["prod_name"]
    out_dir = os.path.join(cfg.data_dir_processed, "S1")
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    for tif_path in paths:
        out_file = os.path.join(out_dir, f"calib_ortho_{Path(tif_path).stem}.tif")
        run_s1_processing_otb(tif_path, out_file)

In [None]:
cfg.data_dir_processed

In [None]:
import folium
from shapely.geometry import box
from shapely.geometry import Polygon
from pyproj import Transformer

In [None]:
m = folium.Map()
clip = folium.GeoJson(Polygon(prod_geom), style_function= lambda x: {"fillColor":"ff0000", 
                                                                             'color': 'black',
                                                                             'fillOpacity' : 0.7}).add_to(m)
#area = folium.GeoJson(Polygon(bbox), style_function= lambda x: {"color":"green", "weight":3, "fillOpacity":0}).add_to(m)
area = folium.GeoJson(Polygon(bbox), style_function= lambda x: {"color":"green", "weight":3, "fillOpacity":0}).add_to(m)


m