# ASF HyP3

> Module for transferring HyP3 processed data to Earth Engine. 

In [None]:
#| default_exp asf_hyp3

# Module Imports

In [None]:
#| export
import datetime
import logging
import tempfile
import os
import re
import subprocess
import zipfile
from pprint import pprint

import asf_search
from IPython.display import JSON
import ee
from fastcore.basics import patch
import gcsfs
from hyp3_sdk import HyP3
from rio_cogeo import cogeo

from sar_asf_to_gee import core

# Prerequisites

Authenticate to NASA Earthdata
- See: https://nasa-openscapes.github.io/2021-Cloud-Hackathon/tutorials/04_NASA_Earthdata_Authentication.html
- See: https://urs.earthdata.nasa.gov/documentation/for_users/data_access/create_net_rc_file

Authenticate to Google Cloud
- See: https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login
- `gcloud auth application-default login`

Authenticate to Google Earth Engine
- See: https://developers.google.com/earth-engine/guides/auth
- `earthengine authenticate`

# Setup

In [None]:
# Create a HyP3 instance.
# NOTE(review): no credentials are passed here; presumably they are read from
# the Earthdata .netrc file described in the Prerequisites section — confirm.
hyp3 = HyP3()

In [None]:
# Set the logging level to display detailed information.
# At INFO level the logging.info() progress messages in this notebook are
# shown, while anything logged with logging.debug() stays hidden.
logging.basicConfig(level=logging.INFO)

# ASF HyP3 Files

[HyP3](https://hyp3-docs.asf.alaska.edu/) processing jobs can be initiated in a variety of ways, including the [Vertex](https://hyp3-docs.asf.alaska.edu/using/vertex/) web application and the [HyP3 Python SDK](https://hyp3-docs.asf.alaska.edu/using/sdk/).

## Starting HyP3 processing



In [None]:
# Settings for the example job. `job_type` and `job_name` are also used by the
# cells below to look up jobs that were submitted earlier.
job_type = 'RTC_GAMMA'
job_name = 'RTC_processing_example'
granule_for_rtc = 'S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8'


def submit_job():
    "Submit `granule_for_rtc` to HyP3 as an RTC job named `job_name` and return the result."
    return hyp3.submit_rtc_job(granule=granule_for_rtc, name=job_name)

In [None]:
# job_type = 'INSAR_GAMMA'
# job_name = 'INSAR_GAMMA_processing_example'
# granule1 = 'S1A_IW_SLC__1SDV_20240311T185926_20240311T185953_052938_066872_3CAD'
# granule2 = 'S1A_IW_SLC__1SDV_20231206T185929_20231206T185956_051538_0638B3_78A8'
# def submit_job():
#     return hyp3.submit_insar_job(granule1, granule2, job_name)

In [None]:
# job_type = 'INSAR_ISCE_BURST'
# job_name = 'INSAR_ISCE_BURST_processing_example'
# burst1 = 'S1_184906_IW1_20240104T154111_VV_3C7F-BURST'
# burst2 = 'S1_184906_IW1_20240116T154110_VV_D1E5-BURST'
# def submit_job():
#     return hyp3.submit_insar_isce_burst_job(burst1, burst2, job_name)

In [None]:
# Look up the jobs previously submitted with this job type and name.
# NOTE(review): despite its name, `batch_completed` is not filtered by job
# status here; only `job_type` and `name` are passed to find_jobs().
batch_completed = hyp3.find_jobs(
    job_type=job_type,
    name=job_name
)
print(f'Number of {job_type} jobs = {len(batch_completed)}')

In [None]:
# Narrow the list down with core.filter_jobs(..., expired=False).
# NOTE(review): presumably this drops jobs whose results have expired; the
# helper lives in sar_asf_to_gee.core — confirm its exact semantics there.
batch_active = core.filter_jobs(batch_completed.jobs, expired=False)
print(f'Number of active {job_type} jobs = {len(batch_active)}')

In [None]:
# When no active job can be reused, submit a new one.
if not batch_active:
    # The message must be an f-string, otherwise '{job_name}' is printed literally.
    print(f'Job results for {job_name} were not found. Starting a new job.')
    job = submit_job()
    # Optionally follow the new job with hyp3.watch() instead of re-running
    # the notebook later.
    #job = hyp3.watch(job)

## Finding HyP3 Files

The status of previously submitted jobs can be checked on the following page:
https://search.asf.alaska.edu/#/?searchType=On%20Demand

In [None]:
# Only jobs with status SUCCEEDED are candidates for the transfer below.
batch_succeeded = [candidate for candidate in batch_active
                   if candidate.to_dict()['status_code'] == 'SUCCEEDED']
if not batch_succeeded:
    print(f'No successful {job_type} jobs found. Please wait until one of the current active jobs finishes.')
else:
    print(f'Number of successful {job_type} jobs = {len(batch_succeeded)}')
    print('Selecting the latest successful job.')
    # Pick from the succeeded jobs, not from `batch_active`, whose first entry
    # may not have succeeded.
    # NOTE(review): "latest" assumes the job list is ordered newest first — confirm.
    job = batch_succeeded[0]

In [None]:
# Show the selected job's dictionary as rendered JSON output.
display(JSON(job.to_dict()))

# Transfer completed jobs

Create a class that can be used to transfer data between ASF HyP3, a local machine, Google Cloud Storage, and Earth Engine.

In [None]:
#| export
class Transfer():
    """Configuration and working directory for moving one HyP3 job's results.

    The constructor only stores its arguments and prepares the local working
    directory. The individual transfer steps (`to_local`, `to_gcs`,
    `create_gee_asset`) are attached to this class as separate methods.
    """
    def __init__(
        self,
        job_dict,  # HyP3 job dictionary
        gcs_bucket,  # GCS bucket
        gee_gcp_project, # GCP project used by Earth Engine
        gee_image_collection=None,  # Name of the Earth Engine ImageCollection (optional)
        local_storage=None,  # Local working directory (optional); a temporary directory is created if omitted
    ):
        self.job_dict = job_dict
        self.gcs_bucket = gcs_bucket
        self.gee_gcp_project = gee_gcp_project
        self.gee_image_collection = gee_image_collection
        if local_storage:
            self.tempdir = None
            self.local_storage = local_storage
            # Make sure the user-supplied directory exists so that files can
            # be downloaded into it on a fresh checkout.
            os.makedirs(self.local_storage, exist_ok=True)
        else:
            # Keep a reference to the TemporaryDirectory object on the
            # instance; `local_storage` points at the directory it created.
            self.tempdir = tempfile.TemporaryDirectory()
            self.local_storage = self.tempdir.name
            logging.debug(f'created temporary directory: {self.tempdir.name}')

Create a Transfer class instance.

In [None]:
# Example configuration for the job selected above. Replace the bucket,
# project and collection names with resources you are allowed to write to.
t = Transfer(
    job_dict=job.to_dict(),
    gcs_bucket='hyp3-data-staging',
    gee_gcp_project='sar-asf-to-gee',
    gee_image_collection=f'HyP3-{job_name}',
    local_storage='temp_downloads',
)

Create a class method for transferring results from HyP3 to a local computer.

In [None]:
#| export
@patch
def to_local(
    self:Transfer,
):
    """Transfer HyP3 results to local system, unzip, and update the job dictionary.

    For every entry in `self.job_dict['files']` the archive is downloaded into
    `self.local_storage` and extracted there. Each extracted `.tif` file is
    then rewritten in place as a Cloud Optimized GeoTIFF with `rio cogeo create`.
    Finally the entry gets an `'extracted'` dict that maps the band suffix of
    each TIFF (the part of the file name after `<scene_name>_`) to the file's
    path relative to `self.local_storage`.

    Raises `subprocess.CalledProcessError` if a COG conversion fails.
    """
    logging.info('Starting to_local()')
    for file in self.job_dict['files']:
        logging.info(f'Processing {file["filename"]}')
        asf_search.download_url(
            url=file['url'],
            path=self.local_storage,
            filename=file['filename'],
        )
        # Unzip the file
        logging.info('  Unzipping the file')
        zip_path = os.path.join(self.local_storage, file['filename'])
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(self.local_storage)

        # List the TIF files. The archive is expected to unpack into a folder
        # named after the zip file, inside the configured local storage.
        scene_name = file['filename'].removesuffix('.zip')
        scene_dir = os.path.join(self.local_storage, scene_name)
        tifs = sorted(x for x in os.listdir(scene_dir) if x.endswith('.tif'))

        for count, tif in enumerate(tifs, start=1):
            logging.info(f'  Converting to a Cloud Optimized GeoTIFF. {count}/{len(tifs)}')
            tif_path = os.path.join(scene_dir, tif)
            # check=True: fail here instead of carrying on with a file that
            # was not converted.
            subprocess.run(
                ["rio", "cogeo", "create", tif_path, tif_path],
                check=True,
            )

        # Escape the scene name and the dot so both are matched literally.
        pattern = re.compile(rf'{re.escape(scene_name)}_(.+)\.tif')
        tif_dict = {}
        for tif in tifs:
            match = pattern.fullmatch(tif)
            if match is None:
                logging.warning(f'  Skipping TIFF with an unexpected name: {tif}')
                continue
            # Stored with '/' because this value is also used as the GCS
            # object name; '/' is valid in local paths too.
            tif_dict[match.group(1)] = f'{scene_name}/{tif}'

        file['extracted'] = tif_dict

In [None]:
# Download, unzip and convert the files of the selected job.
t.to_local()

Display the job dictionary, which now includes the list of extracted files (`root` => `files` => # => `extracted`).

In [None]:
# Inspect the job dictionary after to_local() has updated it.
display(JSON(t.job_dict))

## Transfer to Google Cloud Storage

Create a method for transferring results from a local computer to Google Cloud Storage.

In [None]:
#| export
@patch
def to_gcs(
    self:Transfer,
):
    "Upload the extracted files to the GCS bucket, skipping files that are already there."
    logging.info('Starting to_gcs()')

    gcs = gcsfs.GCSFileSystem(token='google_default')

    # Lazily walk every extracted file of every job file; the band names
    # (the dict keys) are not needed for the upload.
    relative_paths = (
        relative_path
        for job_file in self.job_dict['files']
        for relative_path in job_file['extracted'].values()
    )

    for relative_path in relative_paths:
        gcs_path = f'{self.gcs_bucket}/{relative_path}'

        # Nothing to do when the object is already in the bucket.
        if gcs.exists(gcs_path):
            logging.info(f'GCS file already exists:\n    {gcs_path}')
            continue

        logging.info(f'Starting to transfer file to GCS:\n    {gcs_path}')
        # Transfer the local file to GCS.
        local_path = f"{self.local_storage}/{relative_path}"
        gcs.put_file(lpath=local_path, rpath=gcs_path)
        logging.info(f'Transferred file to GCS: {gcs_path}')

Transfer the files to Google Cloud Storage. If your Google Cloud authentication credentials have expired, you will get an error and will need to reauthenticate with:
`gcloud auth application-default login`

In [None]:
# Upload the extracted files; files already present in the bucket are skipped.
t.to_gcs()

## Create GEE Assets

In [None]:
#| export
def _parse_asf_time(value):
    "Parse an ISO 8601 timestamp string, also accepting a trailing 'Z' (not supported by `fromisoformat` before Python 3.11)."
    if value.endswith('Z'):
        value = f'{value[:-1]}+00:00'
    return datetime.datetime.fromisoformat(value)


@patch
def create_gee_asset(
    self:Transfer,
):
    """Create an Earth Engine asset.

    One image asset is created per extracted band of every file in
    `self.job_dict['files']`; each asset points at the file's location in the
    GCS bucket. Files ending in `_rdr.tif` are skipped. The asset time range
    runs from the earliest `startTime` to the latest `stopTime` of the job's
    source granules, as returned by an ASF granule search.

    Raises `ValueError` if the granule search returns nothing, and
    `ee.EEException` if Earth Engine rejects an asset creation request.
    """
    logging.info('Starting create_gee_asset()')

    ee.Initialize(project=self.gee_gcp_project)

    core.create_gee_image_collection(self.gee_gcp_project, self.gee_image_collection)

    granule_names = self.job_dict['job_parameters']['granules']
    granules = asf_search.granule_search(granule_names)
    if not granules:
        raise ValueError(f'No ASF search results found for granules: {granule_names}')

    # The acquisition period starts with the first granule's start time and
    # ends with the last granule's stop time.
    start_time = min(_parse_asf_time(x.properties['startTime']) for x in granules)
    end_time = max(_parse_asf_time(x.properties['stopTime']) for x in granules)

    asset_id = f"{self.job_dict['job_id']}"

    # The description is taken from the first granule only.
    props = granules[0].properties
    description = (f"{props['platform']}"
                   f" - {props['processingLevel']}"
                   f" - {props['beamModeType']}")

    for file_dict in self.job_dict['files']:
        for band, filename in file_dict['extracted'].items():

            # Skip non-geocoded (native range-doppler coordinates) TIFFs.
            if filename.endswith('_rdr.tif'):
                continue

            gcs_path = f'{self.gcs_bucket}/{filename}'

            request = {
                'type': 'IMAGE',
                'bands': {  # TODO: Update this once multi-band COG assets are supported
                    'id': band
                },
                'gcs_location': {
                    'uris': [f'gs://{gcs_path}']
                },
                'properties': {
                    'source':  file_dict['url'],
                    'band': band  # TODO: Remove this once multi-band COG assets are supported
                },
                'startTime': start_time.strftime(core.FORMAT_GEE_DATETIME_STRING),
                'endTime': end_time.strftime(core.FORMAT_GEE_DATETIME_STRING),
                'description': description
            }

            path_parts = [
                'projects',
                self.gee_gcp_project,
                'assets',
                self.gee_image_collection,
                # TODO: Remove the band suffix once multi-band COG assets are supported
                f'{asset_id}_{band}'.replace(".", "_")
            ]
            # Asset names are '/'-separated on every platform, so do not use
            # os.path.join (it would insert backslashes on Windows).
            assetname = '/'.join(x for x in path_parts if x is not None)

            logging.debug(f'request = {request}')
            logging.debug(f'assetname = {assetname}')
            try:
                ee.data.createAsset(
                    value=request,
                    path=assetname
                )
            except ee.EEException as e:
                # NOTE(review): assumes Earth Engine reports an existing asset
                # with a 'Cannot overwrite asset' message — TODO confirm. Any
                # other error (including a missing or read-only collection) is
                # re-raised unchanged.
                if 'Cannot overwrite asset' in str(e):
                    logging.info('GEE asset already exists. Skipping.')
                    continue
                raise
            logging.info(f'Finished creating a GEE asset:\n    {assetname}.')

In [None]:
# Register the uploaded files as Earth Engine assets.
t.create_gee_asset()

The Google Earth Engine Code Editor can be used to visualize these assets. Here is a template script that demonstrates basic visualization:
https://code.earthengine.google.com/4140085702fc842227dc641426acb983
Note that you will need to update the script to reference Earth Engine assets that you have permissions to access (for example: assets that you have created).

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()