# Workflow to create and maintain the validation table for CSLC Cal/Val

This notebook demonstrates how to query the public S3 bucket hosting granules associated with bursts identified for CSLC Cal/Val activities.

Specifically, the motivation here is to access and record the prerequisite information from each granule in order to perform Cal/Val analyses virtually.

For reference, the following resources were used to help create this notebook:
https://alexwlchan.net/2017/listing-s3-keys/
https://github.com/boto/boto3/issues/1200

## Load prerequisite modules

In [1]:
import boto3
from botocore import UNSIGNED
from botocore.config import Config

import fsspec

import geopandas as gpd

import h5py

import pandas as pd

from pathlib import Path

import shapely.wkt as wkt

## Static variables that identify S3 paths to data

<div class="alert alert-warning">
Only change IF you know what you are doing (i.e. intentional changes to reflect a hypothetical migration of validation data).
</div>

In [2]:
# set S3 path variables
bucket = 'opera-pst-rs-pop1'
prefix = 'products/CSLC_S1'
suffix = 'Z.h5'
s3_path = f's3://{bucket}'
DATA_ROOT = 'science/SENTINEL1'

# track specific version and corresponding static layer
version_num = '0.1'

# Per-version layout: (static-layer filename suffix, HDF5 group that holds
# the identification metadata).
_version_layout = {
    '0.0': ('_static_layers.h5', f'{DATA_ROOT}/identification'),
    '0.1': ('_Static.h5', 'identification'),
}
# Fail here, with a clear message, rather than later with a NameError on
# `static_suffix` / `id_path` when an unknown version is configured.
if version_num not in _version_layout:
    raise ValueError(f'Unsupported version_num {version_num!r}; '
                     f'expected one of {sorted(_version_layout)}')
static_suffix, id_path = _version_layout[version_num]

# For priority bursts, specify: validation_bursts_priority_v0.1.csv
# For all bursts, specify: validation_bursts_all_v0.1.csv
validation_bursts = Path(f'validation_bursts_priority_v{version_num}.csv')

# Set name of output containing all burst products for streaming
validation_csv = Path(f'table_{validation_bursts}')

## Load function to query S3 bucket

In [3]:
def get_matching_s3_keys(bucket, prefix='', suffix='', burstId='', version_num=''):
    """
    Generate the keys in an S3 bucket that match the given filters.

    Objects are listed with unsigned (anonymous) requests. Within each page
    of results, keys are yielded newest-first by ``LastModified``, so a
    caller that stops at the first match gets the most recently modified
    object of that page.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch keys that start with this prefix. A single
        string is filtered by the S3 API; any other iterable of strings is
        filtered after listing.
    :param suffix: Only fetch keys that end with this suffix.
    :param burstId: Only fetch keys whose path continues with this string
        directly after ``prefix + '/'``.
    :param version_num: Only fetch keys that contain ``_v<version_num>_``.
        An empty string disables this filter.
    :return: Generator of matching key strings.
    """
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    kwargs = {'Bucket': bucket}

    def _with_burst(base):
        # An empty prefix must not produce a leading '/', which would only
        # match keys that literally start with a slash.
        return f'{base}/{burstId}' if base else burstId

    # If the prefix is a single string (not a tuple of strings), we can
    # do the filtering directly in the S3 API.
    if isinstance(prefix, str):
        kwargs['Prefix'] = _with_burst(prefix)
        client_side_prefixes = None
    else:
        client_side_prefixes = tuple(_with_burst(base) for base in prefix)

    # set versioning string; an empty tag is contained in every key
    version_tag = f'_v{version_num}_' if version_num else ''

    # `list_objects_v2` returns at most 1000 keys per call, so keep
    # requesting pages until no continuation token is returned.
    while True:
        resp = s3.list_objects_v2(**kwargs)

        # 'Contents' holds the listed objects; it is absent when nothing matched
        contents = resp.get('Contents')
        if contents is None:
            break

        # Compare the datetimes directly: strftime('%s') is a non-portable
        # extension that also ignores the timezone of the timestamp.
        newest_first = sorted(contents,
                              key=lambda obj: obj['LastModified'],
                              reverse=True)

        for obj in newest_first:
            key = obj['Key']
            if not key.endswith(suffix):
                continue
            if version_tag not in key:
                continue
            # remove legacy products
            if '_v0.0_202303' in key:
                continue
            if (client_side_prefixes is not None
                    and not key.startswith(client_side_prefixes)):
                continue
            yield key

        # Pass the continuation token into the next request, until we
        # reach the final page (when this field is missing).
        token = resp.get('NextContinuationToken')
        if token is None:
            break
        kwargs['ContinuationToken'] = token

## Access or initiate validation table containing links to S3 bucket

In [4]:
# access dataframe from file, if it exists
if validation_csv.is_file():
    # Read 'date' as text: products discovered later in this notebook carry
    # string dates, and a column mixing integers and strings cannot be sorted
    # when the table is saved.
    df = pd.read_csv(validation_csv, dtype={'date': str})
    # geometries are stored as WKT text in the CSV; rebuild them as shapes
    validation_bursts_df = gpd.GeoDataFrame(
        df.drop(columns='geometry'),
        geometry=gpd.GeoSeries.from_wkt(df['geometry'])
        )
else:
    # otherwise, initialize an empty table with placeholder columns
    validation_bursts_df = gpd.GeoDataFrame()
    for placeholder in ('burst_id', 'date', 'cslc_url', 'cslc_static_url'):
        validation_bursts_df[placeholder] = None

## Access premade, static table containing all bursts identified for CSLC Cal/Val activities

In [5]:
# read list of bursts used for validation
if not validation_bursts.is_file():
    # FileNotFoundError is a subclass of Exception, so existing handlers still work
    raise FileNotFoundError(f'Expected burst record {validation_bursts.absolute()} '
                            'not found. Check working directory.')
df = pd.read_csv(validation_bursts)
burstId_list = df['burst_id'].to_list()

## Query server and build up validation table

In [6]:
# query products on S3 bucket
# NOTE(review): the next line overrides the burst list read from the
# validation CSV; remove it to process every burst listed in that file.
burstId_list = ['t064_135523_iw2','t071_151224_iw2','t107_227888_iw2'] #!# temp hardcode

# URLs already on record, so that known products are skipped
known_urls = set(validation_bursts_df['cslc_url'].astype(str).tolist())

# First geometry on record for each burst, so a product file is only opened
# for bursts that have no geometry yet. A dict lookup also handles a match in
# row 0, which a `!= False` test on the row index would treat as "not found".
geometry_by_burst = {}
if 'geometry' in validation_bursts_df.columns:
    for known_id, known_geometry in zip(validation_bursts_df['burst_id'],
                                        validation_bursts_df['geometry']):
        geometry_by_burst.setdefault(known_id, known_geometry)

# Collect new rows and append them in one go instead of re-building the
# table for every product.
new_records = []
try:
    for burstId in burstId_list:
        # adjust burst strings to reflect product name convention
        query_burstId = burstId.upper().replace('_','-')

        for sensor in ['S1A','S1B']:
            print(f'BurstID: {burstId}, sensor: {sensor}')

            cslc_url_base = 'OPERA_L2_CSLC-'+sensor+'_IW_' + query_burstId + '_VV_'
            cslc_static_url = None
            for key in get_matching_s3_keys(bucket=bucket,
                                            prefix=prefix,
                                            suffix=suffix,
                                            burstId=cslc_url_base,
                                            version_num=version_num):
                cslc_url = f'{s3_path}/{key}'

                # 0.1 static layers have different naming conventions
                # which do not reflect a given acquisition date
                if version_num == '0.1':
                    cslc_static_url_base = cslc_url_base
                else:
                    # name of the product directory up to the version tag
                    cslc_static_url_base = Path(cslc_url).parent.name.split(
                        f'_v{version_num}_')[0]

                # avoid querying the same static URL for v0.1
                # for which it is the same for each burst
                if cslc_static_url is None or version_num == '0.0':
                    static_key = next(
                        get_matching_s3_keys(bucket=bucket,
                                             prefix=prefix,
                                             suffix=static_suffix,
                                             burstId=cslc_static_url_base,
                                             version_num=version_num),
                        None)
                    # Reset to None when nothing matched, so a v0.0 product
                    # never inherits the static layer of the previous product.
                    cslc_static_url = (f'{s3_path}/{static_key}'
                                       if static_key is not None else None)

                # only proceed if file not already captured in records
                if cslc_url in known_urls:
                    continue

                # get date: first 8 characters of the third-from-last
                # underscore-separated field of the file name
                date = Path(key).name.split('_')[-3][:8]

                # reuse the burst geometry if known, otherwise read it
                # from the product file
                geometry = geometry_by_burst.get(burstId)
                if geometry is None:
                    # context managers close both the remote file handle
                    # and the HDF5 file
                    with fsspec.open(cslc_url, mode='rb', anon=True,
                                     default_fill_cache=False) as s3_file:
                        with h5py.File(s3_file, 'r') as h5:
                            geometry = h5[f'{id_path}/'
                                          'bounding_polygon'][()].astype(str)
                    geometry = wkt.loads(geometry)
                    geometry_by_burst[burstId] = geometry

                # record this file
                new_records.append({'burst_id': burstId,
                                    'date': date,
                                    'cslc_url': cslc_url,
                                    'cslc_static_url': cslc_static_url,
                                    'geometry': geometry})
                known_urls.add(cslc_url)
finally:
    # append whatever was collected, even if the query was interrupted
    if new_records:
        validation_bursts_df = pd.concat(
            [validation_bursts_df, gpd.GeoDataFrame(new_records)],
            ignore_index=True)

BurstID: t064_135523_iw2, sensor: S1A
BurstID: t064_135523_iw2, sensor: S1B
BurstID: t071_151224_iw2, sensor: S1A
BurstID: t071_151224_iw2, sensor: S1B
BurstID: t107_227888_iw2, sensor: S1A
BurstID: t107_227888_iw2, sensor: S1B


## Save validation table

In [7]:
# sort by burst, then by time
# pd.read_csv infers all-digit dates as integers, whereas dates taken from
# product file names are strings; normalise to strings so that the sort does
# not have to compare mixed types.
validation_bursts_df = (
    validation_bursts_df
    .assign(date=validation_bursts_df['date'].astype(str))
    .sort_values(by=['burst_id', 'date'], ascending=[True, True])
)
# save table to file
validation_bursts_df.to_csv(validation_csv, index=False)