In [4]:
#default_exp prep_utils

# Prep Utils

> Basic utility functions across ard-workflows.

In [2]:
#hide
%load_ext autoreload
%autoreload 2

In [3]:
#hide
from nbdev.showdoc import *

In [1]:
#export
import os
import zipfile
import time
from glob import glob
import gc
import shutil
import logging
import subprocess
import platform

import boto3
import gdal
from asynchronousfilereader import AsynchronousFileReader

from sedas_pyapi.sedas_api import SeDASAPI
from sedas_pyapi.bulk_download import SeDASBulkDownload

In [16]:
import pandas as pd

## Sedas

For NovaSAR and other Catapult-hosted datasets, we must download from SeDAS.

In [39]:
#export
def sedas_client(username=None, password=None, base_url="https://geobrowsertest.satapps.org/api/"):
    """
    Log into the SeDAS API (the test server by default) and return the client.

    :param username: SeDAS username; defaults to the ``SEDAS_USERNAME`` env var.
    :param password: SeDAS password; defaults to the ``SEDAS_PWD`` env var.
    :param base_url: API root to use; defaults to the SeDAS test server.
    :return: a logged-in ``SeDASAPI`` instance.
    :raises ValueError: if no username or password is available.
    """
    username = username or os.getenv('SEDAS_USERNAME')
    password = password or os.getenv('SEDAS_PWD')
    if not username or not password:
        raise ValueError("SeDAS credentials missing: set SEDAS_USERNAME and SEDAS_PWD.")

    sedas = SeDASAPI(username, password)
    # The endpoint URLs are built from base_url, so re-derive them after overriding it.
    sedas.base_url = base_url
    sedas.sensor_url = f"{base_url}sensors"
    sedas.authentication_url = f"{base_url}authentication"
    sedas.search_url = f"{base_url}search"
    sedas._token = None
    # Use the caller's account rather than a hard-coded username.
    # The password is handed to the SeDASAPI constructor above. Assigning
    # `sedas.__password` here would NOT reach the class's private attribute,
    # because name mangling only applies inside a class body.
    sedas._username = username
    sedas.login()

    return sedas

In [42]:
# Smoke test: reads SeDAS credentials from the environment and logs in (needs network access).
sedas_client()

<sedas_pyapi.sedas_api.SeDASAPI at 0x7fdf22b83890>

**TODO:** add a test that the required environment variables (`SEDAS_USERNAME`, `SEDAS_PWD`) are set.

We'll also want to find datasets from different *collections*, which SeDAS calls *groups*.

In [11]:
#export
def sedas_get_collections(sedas=None):
    """
    Describe the SeDAS sensor groups (collections) available to the user.

    :param sedas: optional logged-in client; a new one is created with
        `sedas_client()` when omitted (same convention as `sedas_download`).
    :return: a human-readable string listing the group names, in API order.
    """
    if sedas is None:
        sedas = sedas_client()
    names = [group['name'] for group in sedas.list_sensor_groups()]
    return f"Available groups are: {', '.join(names)}"

In [12]:
# Lists the sensor groups visible to the logged-in account (network call).
sedas_get_collections()

'Available groups are: Cosmo-SkyMed, SPOT, Pleiades, S1, S2, AIRSAR'

In [44]:
#export
def sedas_find_datasets(wkt, startDate, endDate, collection, sedas=None):
    """
    Search SeDAS for products of `collection` over `wkt` between two dates.

    Optical and SAR collections use different SeDAS search calls, so the
    collection name decides which one is used.

    :param wkt: area of interest as a WKT polygon string.
    :param startDate: start of the search window, e.g. "2000-01-01T00:00:00Z".
    :param endDate: end of the search window, same format.
    :param collection: SeDAS sensor group name (see `sedas_get_collections`).
    :param sedas: optional logged-in client; created with `sedas_client()` if omitted.
    :return: the search result from the SeDAS client (the example below reads
        its 'products' key).
    :raises ValueError: if `collection` is not a known optical or SAR group.
    """
    optical_groups = ['S2', 'Pleiades', 'SPOT']
    sar_groups = ['S1', 'Cosmo-SkyMed', 'AIRSAR', 'NovaSAR']
    # Validate before logging in, so an unknown collection fails fast with a
    # clear message instead of an UnboundLocalError.
    if collection not in optical_groups and collection not in sar_groups:
        raise ValueError(
            f"Unknown collection {collection!r}; expected one of {optical_groups + sar_groups}")
    if sedas is None:
        sedas = sedas_client()
    if collection in optical_groups:
        return sedas.search_optical(wkt, startDate, endDate, source_group=collection)
    return sedas.search_sar(wkt, startDate, endDate, source_group=collection)

For example, for a small area over Oxford we can find the following Pleiades products:

In [17]:
# Search a polygon around Oxford for Pleiades products between 2000-01-01 and
# 2020-10-27, then show the first two returned products (network call).
result = sedas_find_datasets("POLYGON((-1.91 51.81,-1.15 51.81,-1.15 51.50,-1.91 51.50,-1.91 51.81))", 
                             "2000-01-01T00:00:00Z", 
                             "2020-10-27T00:00:00Z",
                             "Pleiades"
                            )
pd.DataFrame(result['products']).head(2)

Unnamed: 0,productId,supplierId,type,satelliteName,instrumentName,modeName,sensorType,sensorResolution,coordinatesWKT,start,...,area,aoiCoveragePercent,usefulAreaPercent,cloudCoveragePercent,productType,latency,ql,thumbnail,vendorSpecific,downloadUrl
0,c0ec9a5f12356e87bc910ddbc49dbb76,Pleiades_UKSA396_SO18034616-96-01_DS_PHR1B_201...,ARCHIVE,Pleiades-1B,MS/PAN,0.0,Optical,2.0,"POLYGON((-1.654728 51.309517,-1.345414 51.3081...",2018-10-24T11:17:22Z,...,482512500.0,1.0,5.0,0.0,L3,Standard,https://geobrowser.satapps.org/archiveql/aeweb...,https://sedasdm.satapps.org/qls/qlmgr.php?scen...,"{'property': 'vendorSpecific', 'Filehash': 'cf...",https://sedasdm.satapps.org/datamgr/datamgr.ph...
1,94cc7887414be0912de7ca44288f79da,Pleiades_UKSA174_SO18034614-74-01_DS_PHR1A_201...,ARCHIVE,Pleiades-1A,MS/PAN,0.0,Optical,2.0,"POLYGON((-2.153852 51.603313,-1.841533 51.6033...",2018-09-29T11:10:08Z,...,156526000.0,2.0,22.0,0.0,L3,Standard,https://geobrowser.satapps.org/archiveql/aeweb...,https://sedasdm.satapps.org/qls/qlmgr.php?scen...,"{'property': 'vendorSpecific', 'Filehash': '4b...",https://sedasdm.satapps.org/datamgr/datamgr.ph...


In [24]:
#export
def sedas_download(sedas_res_dicts, down_dir, sedas=None, parallel=2, poll_secs=5):
    """
    Download SeDAS products into `down_dir`, blocking until all are done.

    :param sedas_res_dicts: list of product dicts (presumably the 'products'
        entries of a `sedas_find_datasets` result).
    :param down_dir: directory to download into.
    :param sedas: optional logged-in client; created with `sedas_client()` if omitted.
    :param parallel: number of parallel downloads passed to `SeDASBulkDownload`.
    :param poll_secs: seconds to sleep between completion checks.
    """
    if sedas is None:
        sedas = sedas_client()
    downloader = SeDASBulkDownload(sedas, down_dir, parallel=parallel)
    try:
        downloader.add(sedas_res_dicts)
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Downloading")
        while not downloader.is_done():
            time.sleep(poll_secs)
    finally:
        # Always shut the downloader down, even if adding or waiting is
        # interrupted (e.g. KeyboardInterrupt in the notebook).
        downloader.shutdown()
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Downloaded")

In [None]:
# TODO: add usage example

In [25]:
#export
def sedas_extract(down_zip, scene_dir):
    """
    Unpack a downloaded zip archive into `scene_dir`, then delete the archive.

    The archive is deleted only if `scene_dir` exists once extraction finishes.

    :param down_zip: path to the downloaded ``.zip`` file.
    :param scene_dir: directory to extract into (created as needed while
        members are extracted).
    """
    def _stamp():
        return time.strftime('%Y-%m-%d %H:%M:%S')

    print(f"{_stamp()} Extracting {down_zip}")
    with zipfile.ZipFile(down_zip, 'r') as archive:
        archive.extractall(scene_dir)
    extracted = os.path.exists(scene_dir)
    if extracted:
        os.remove(down_zip)
    print(f"{_stamp()} Extracted to {scene_dir}")

In [None]:
# TODO: add usage example

## Cloud-Optimised Formatting

In [4]:
#export
def convert2cog(img_path, cog_path, band, creation_options=None):
    """
    Convert one band of a GDAL-readable raster into a Cloud-Optimised GeoTIFF.

    GDAL's COG driver is used with its default settings unless
    `creation_options` is given (e.g. ``['COMPRESS=LZW']``).
    See https://www.cogeo.org/developers-guide.html.

    :param img_path: source raster path, or an open GDAL dataset.
    :param cog_path: output COG path.
    :param band: 1-based band number to copy into the COG.
    :param creation_options: optional list of COG driver creation options.
    :raises RuntimeError: if GDAL fails to produce the output.
    """
    kwargs = {
        'format': 'COG',
        'bandList': [band],
    }
    if creation_options:
        kwargs['creationOptions'] = list(creation_options)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Starting conversion: {img_path}.")
    ds = gdal.Translate(cog_path, img_path, **kwargs)
    if ds is None:
        # Without GDAL exceptions enabled, Translate signals failure by returning
        # None. Raise, so callers don't go on to delete their inputs.
        raise RuntimeError(f"gdal.Translate failed to write {cog_path}")
    # Dropping the reference closes the dataset and flushes the COG to disk.
    ds = None
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Conversion complete: {os.path.exists(cog_path)}.")

In [5]:
#export
def mosaic2cog(imgs, out_cog, band):
    """
    Mosaic one band of several rasters into a single COG.

    A temporary VRT (``<out_cog without extension>_mosaic.vrt``) is built with
    ``gdal.BuildVRT``, converted with `convert2cog`, and then deleted. The usual
    VRT assumptions apply (e.g. inputs sharing a projection; see the
    gdalbuildvrt docs).

    :param imgs: list of raster paths to mosaic.
    :param out_cog: output COG path.
    :param band: 1-based band number to take from each input.
    :raises RuntimeError: if the VRT cannot be built.
    """
    # BuildVRT always writes a VRT, so give the temp file a matching extension.
    tmp_vrt = f"{os.path.splitext(out_cog)[0]}_mosaic.vrt"
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Mosaicing band {band} imgs: {imgs}")
    ds = gdal.BuildVRT(tmp_vrt, imgs, bandList=[band])
    if ds is None:
        raise RuntimeError(f"gdal.BuildVRT failed for {imgs}")
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Mosaicd {tmp_vrt}")
    try:
        # The VRT holds only the requested band, so it is band 1 of the mosaic.
        convert2cog(ds, out_cog, 1)
    finally:
        # Close the VRT before deleting its file, even if conversion failed.
        ds = None
        if os.path.exists(tmp_vrt):
            os.remove(tmp_vrt)

In [6]:
#export
def cogmosaicbands(img_paths, numbands, basename):
    """
    Write bands 1..`numbands` as COGs named ``{basename}_band{b}.tif``, then
    delete the source images.

    With a single input each band is converted directly. With several inputs
    each band is mosaicked across all of them.

    Clean-up removes every file whose path starts with an input's stem (its
    path minus the extension), so sidecar files go too. The band COGs just
    written are never removed, even if their names share that stem.

    :param img_paths: list of raster paths; nothing happens if it is empty.
    :param numbands: number of bands to export (bands are numbered from 1).
    :param basename: output path prefix.
    """
    from glob import escape

    if not img_paths:
        return
    outputs = [f"{basename}_band{b}.tif" for b in range(1, numbands + 1)]
    for b, out_path in enumerate(outputs, start=1):
        if len(img_paths) == 1:
            convert2cog(img_paths[0], out_path, b)
        else:
            # Pass the whole list so every input is part of the mosaic.
            mosaic2cog(img_paths, out_path, b)

    keep = {os.path.abspath(p) for p in outputs}
    for img_path in img_paths:
        stem = os.path.splitext(img_path)[0]
        # Escape the stem so glob metacharacters in real paths match literally.
        for f in glob(f"{escape(stem)}*"):
            if os.path.abspath(f) not in keep:
                os.remove(f)

## Object Storage

In [16]:
#export
def s3_create_client(s3_bucket):
    """
    Connect to S3 (or an S3-compatible store) using credentials from the environment.

    Reads ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``. If
    ``AWS_S3_ENDPOINT_URL`` is set, both the resource and the client use that
    endpoint. Otherwise the resource is pinned to region ``eu-west-2`` and the
    client uses boto3's default endpoint resolution.

    :param s3_bucket: name of the bucket to bind.
    :return: tuple ``(s3_client, bucket)``, i.e. a low-level S3 client and the
        ``Bucket`` resource for `s3_bucket`.
    """
    key_id = os.getenv("AWS_ACCESS_KEY_ID")
    key_secret = os.getenv("AWS_SECRET_ACCESS_KEY")
    session = boto3.Session(key_id, key_secret)

    endpoint = os.getenv("AWS_S3_ENDPOINT_URL")
    custom_endpoint = endpoint is not None

    resource_kwargs = {'endpoint_url': endpoint} if custom_endpoint else {'region_name': 'eu-west-2'}
    bucket = session.resource('s3', **resource_kwargs).Bucket(s3_bucket)

    client_kwargs = {
        'aws_access_key_id': key_id,
        'aws_secret_access_key': key_secret,
    }
    if custom_endpoint:
        client_kwargs['endpoint_url'] = endpoint
    s3_client = boto3.client('s3', **client_kwargs)

    return s3_client, bucket

In [9]:
#export
gb = 2 ** 30  # bytes in one gibibyte (1024**3)

In [10]:
#export
def s3_single_upload(in_path, s3_path, s3_bucket):
    """
    Upload one file from the local file system to S3.

    :param in_path: path to a file on the local file system.
    :param s3_path: destination key within the bucket.
    :param s3_bucket: name of the destination bucket.
    :return: None
    """
    # Import explicitly rather than relying on boto3 having lazily loaded it.
    from boto3.s3.transfer import TransferConfig

    # prep session & creds
    s3_client, bucket = s3_create_client(s3_bucket)

    # S3 caps single (non-multipart) uploads at 5 GB. Switch to multipart above
    # 2 GB, well under that limit, and upload in 2 GB parts.
    transfer_config = TransferConfig(multipart_threshold=2 * gb,
                                     max_concurrency=10,
                                     multipart_chunksize=2 * gb,
                                     use_threads=True)
    # The config must be passed explicitly, or boto3 falls back to its defaults.
    s3_client.upload_file(in_path, bucket.name, s3_path, Config=transfer_config)

In [11]:
#export
def get_rel_dir_s3_paths(local_dir, s3_dir):
    """
    Pair every file under `local_dir` with the S3 key it should be uploaded to.

    Keys have the form ``s3_dir + <name of local_dir> + '/' + <path relative to
    local_dir>`` and always use '/' separators. `s3_dir` is prepended verbatim,
    so it should normally end with '/'. `local_dir` may be given with or
    without a trailing separator.

    :param local_dir: directory to walk recursively.
    :param s3_dir: key prefix in the bucket.
    :return: list of ``[local_path, s3_key]`` pairs, in ``os.walk`` order.
    """
    # Normalise so a missing trailing slash neither picks the parent's name nor
    # produces '//' in the key.
    root = os.path.normpath(local_dir)
    dir_name = os.path.basename(root)
    paths = []
    for subdir, _dirs, files in os.walk(local_dir):
        for file in files:
            full_path = os.path.join(subdir, file)
            rel = os.path.relpath(full_path, root).replace(os.sep, '/')
            paths.append([full_path, f"{s3_dir}{dir_name}/{rel}"])
    return paths

In [12]:
#export
def s3_upload_dir(in_dir, s3_bucket, s3_dir):
    """
    Upload every file under `in_dir` to `s3_bucket`, one file at a time.

    Destination keys come from `get_rel_dir_s3_paths(in_dir, s3_dir)`, which
    prefixes them with the directory's own name.
    """
    for local_path, key in get_rel_dir_s3_paths(in_dir, s3_dir):
        s3_single_upload(local_path, key, s3_bucket)

In [13]:
#export
def s3_list_objects_paths(s3_bucket, prefix):
    """
    List the keys of all objects in `s3_bucket` under `prefix`, following pagination.

    :param s3_bucket: bucket name.
    :param prefix: key prefix to filter on.
    :return: list of key strings only, not full object responses. The list is
        empty when nothing matches.
    """
    client, _ = s3_create_client(s3_bucket)
    paginator = client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
        # A page with no matching objects has no 'Contents' entry at all.
        keys.extend(obj['Key'] for obj in page.get('Contents', []))
    return keys


In [14]:
#export
def s3_list_objects(s3_bucket, prefix):
    """
    Return the raw ``list_objects_v2`` response for `prefix` in `s3_bucket`.

    Only a single request is made, so the response holds at most one page of
    results (1,000 keys by default). Use `s3_list_objects_paths` to follow
    pagination.
    """
    s3_client = s3_create_client(s3_bucket)[0]
    return s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=prefix)

## General

In [15]:
#export
def clean_up(work_dir):
    """
    Remove a working directory tree after a run.

    A garbage-collection pass runs first, presumably so that unreferenced
    objects still holding files open are released before deletion.
    Errors from ``shutil.rmtree`` (e.g. a missing `work_dir`) propagate.
    """
    # TODO: sort out logging changes...
    gc.collect()
    shutil.rmtree(work_dir)

In [18]:
#export
def setup_logging():
    """
    Configure root logging for a workflow run and quieten chatty libraries.

    The root level comes from the ``LOGLEVEL`` env var (default ``DEBUG``).
    rasterio and the boto/botocore/s3transfer/urllib3 stack are raised to INFO
    (``rasterio._io`` to WARNING), because at DEBUG they log very verbosely
    (botocore, for instance, logs entire request text).

    :return: the root logger.
    """
    logging.basicConfig(level=logging.DEBUG)
    root_logger = logging.getLogger()
    root_logger.setLevel(os.environ.get("LOGLEVEL", "DEBUG"))

    quieter_levels = {
        "rasterio": "INFO",
        "rasterio._io": "WARNING",
        "botocore": "INFO",
        "boto": "INFO",
        "boto3.resources": "INFO",
        "s3transfer": "INFO",
        "urllib3": "INFO",
    }
    for logger_name, level in quieter_levels.items():
        logging.getLogger(logger_name).setLevel(level)

    return root_logger

## SNAP

In [52]:
#export
def run_snap_command(command, timeout=60 * 45):
    """
    Run a SNAP ``gpt`` command and forward its output to logging. Internal use.

    stdout lines are logged at INFO on the ``snap_stdout`` logger. stderr lines,
    prefixed with ``stderr:``, go to the ``snap_stderr`` logger.

    :param command: list of arguments for SNAP gpt. The executable named by the
        ``SNAP_GPT`` environment variable is prepended unless it is already first.
    :param timeout: seconds the process may run (default 45 minutes). If this
        is exceeded, the process is killed, an error is logged, and the function
        returns without raising. ``None`` disables the timeout.
    :return: None
    :raises KeyError: if ``SNAP_GPT`` is not set.
    :raises Exception: if SNAP exits with a non-zero status.
    """
    gpt = os.environ['SNAP_GPT']
    # if we need to prepend the snap executable.
    if not command or command[0] != gpt:
        full_command = [gpt] + list(command)
    else:
        full_command = list(command)

    # on linux there is a warning message printed by snap if this environment variable is not set.
    base_env = os.environ.copy()
    if "LD_LIBRARY_PATH" not in base_env and platform.system() != "Windows":
        base_env["LD_LIBRARY_PATH"] = "."

    logging.debug(f"running {full_command}")

    process = subprocess.Popen(full_command, env=base_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    snap_logger_out = logging.getLogger("snap_stdout")
    snap_logger_err = logging.getLogger("snap_stderr")
    std_out_reader = AsynchronousFileReader(process.stdout)
    std_err_reader = AsynchronousFileReader(process.stderr)

    def pass_logging():
        # Forward whatever lines the reader threads have queued so far.
        while not std_out_reader.queue.empty():
            line = std_out_reader.queue.get().decode()
            snap_logger_out.info(line.rstrip('\n'))
        while not std_err_reader.queue.empty():
            line = std_err_reader.queue.get().decode()
            snap_logger_err.info("stderr:" + line.rstrip('\n'))

    deadline = None if timeout is None else time.monotonic() + timeout
    timed_out = False
    while process.poll() is None:
        pass_logging()
        if deadline is not None and time.monotonic() > deadline:
            # Enforce the timeout: stop SNAP rather than leaving it running.
            process.kill()
            process.wait()
            timed_out = True
            break
        # Sleep between polls so waiting does not busy-spin a CPU core.
        time.sleep(0.1)

    # The process has exited, so its pipes reach EOF and the readers can finish.
    std_out_reader.join()
    std_err_reader.join()
    # Flush lines queued after the last poll so the tail of the output is logged.
    pass_logging()
    process.stdout.close()
    process.stderr.close()

    if timed_out:
        logging.error(f"IGNORING subprocess timeout running {command}")
        return
    if process.returncode != 0:
        raise Exception(f"Snap returned non zero exit status {process.returncode}")



## Export

In [61]:
#hide
# nbdev: write every `#export` cell of the project's notebooks out to their library modules.
from nbdev.export import notebook2script; notebook2script()

Converted 1_prep_utils.ipynb.
Converted Untitled.ipynb.
