In [1]:
import datetime
import requests
import hyp3_sdk
import numpy as np
from tqdm import tqdm 
from copy import deepcopy
import requests
import itertools
from multiprocessing import Pool

In [None]:
%load_ext autoreload
%autoreload 2

In [2]:
def isce_jobs4resubmit(jobs_dicts:list):
    """Reduce job dictionaries to the fields kept for an ISCE re-submission.

    Parameters
    ----------
    jobs_dicts : list
        Job dictionaries, each providing 'name', 'job_parameters' and 'job_type'.

    Returns
    -------
    list
        One new dict per input job holding only 'name', 'job_parameters' and
        'job_type' (in that key order). The 'job_parameters' value is the same
        object as in the input job, not a copy.
    """
    kept_fields = ('name', 'job_parameters', 'job_type')
    return [{field: job[field] for field in kept_fields} for job in jobs_dicts]

def raider_jobs4resubmit(jobs_dicts:list):
    """Build ARIA_RAIDER re-submission payloads from failed job dictionaries.

    For an 'INSAR_ISCE' job the payload points at the job's own 'job_id' and
    the name gets the suffix '_rerun_raider'. For any other job type the name
    is kept and the 'job_id' is taken from the job's 'job_parameters'. The
    weather model is always copied from the job's 'job_parameters'.

    Parameters
    ----------
    jobs_dicts : list
        Failed job dictionaries.

    Returns
    -------
    list
        One dict per input job with keys 'name', 'job_type' ('ARIA_RAIDER')
        and 'job_parameters' ({'job_id', 'weather_model'}).
    """
    resubmissions = []
    for failed in jobs_dicts:
        if failed['job_type'] == 'INSAR_ISCE':
            # The ISCE job itself is the one whose RAiDER step gets re-run
            rerun_name = failed['name'] + '_rerun_raider'
            target_job_id = failed['job_id']
        else:
            rerun_name = failed['name']
            target_job_id = failed['job_parameters']['job_id']

        resubmissions.append({
            'name': rerun_name,
            'job_type': 'ARIA_RAIDER',
            'job_parameters': {
                'job_id': target_job_id,
                'weather_model': failed['job_parameters']['weather_model'],
            },
        })
    return resubmissions

def split_failed_jobs(failed_jobs):
    """Split failed jobs into ISCE re-runs and RAiDER re-runs.

    An 'INSAR_ISCE' job with exactly one entry in ``processing_times`` is
    treated as an ISCE (topsApp) failure; one with exactly two entries is
    treated as a RAiDER-step failure. Every 'ARIA_RAIDER' job is a RAiDER
    failure. 'INSAR_ISCE' jobs with any other number of processing times end
    up in neither list.

    Parameters
    ----------
    failed_jobs : iterable of job objects
        Each job exposes ``job_type``, ``processing_times`` and ``to_dict()``.
        The collection is iterated three times.

    Returns
    -------
    tuple(list, list)
        ``(isce_jobs, raider_jobs)`` as lists of job dictionaries; in
        ``raider_jobs`` the two-step ISCE jobs come before the ARIA_RAIDER jobs.
    """
    def _isce_jobs_with_steps(n_steps):
        # `&` (not `and`) so both operands are always evaluated
        return [job.to_dict() for job in failed_jobs
                if (job.job_type == 'INSAR_ISCE') & (len(job.processing_times) == n_steps)]

    isce_jobs = _isce_jobs_with_steps(1)
    raider_from_isce = _isce_jobs_with_steps(2)
    raider_only = [job.to_dict() for job in failed_jobs if job.job_type == 'ARIA_RAIDER']
    return isce_jobs, raider_from_isce + raider_only

def get_request_date(job_dict):
    """Return the calendar date ('YYYY-MM-DD') of a job's 'request_time'.

    ``job_dict['request_time']`` must match '%Y-%m-%dT%H:%M:%S%z'; the date is
    taken as written in that timestamp (no timezone conversion is applied).
    """
    requested_at = datetime.datetime.strptime(job_dict['request_time'],
                                              '%Y-%m-%dT%H:%M:%S%z')
    return requested_at.strftime('%Y-%m-%d')

# TODO: add function to find duplicate jobs to filter out same jobs
# for now track it by date
class access_hyp3_resubmit():
    """Collect the failed jobs of a HyP3 batch and re-submit them in chunks.

    Failed jobs are split with ``split_failed_jobs`` into jobs to re-run from
    the ISCE (topsApp) step (``isce_jobs``) and jobs to re-run from the RAiDER
    step (``raider_jobs``). Typical use: construct, optionally edit those two
    lists, call ``prepare_jobs()``, then ``submit_isce()`` / ``submit_raider()``.

    Parameters
    ----------
    hyp3_jobs : object
        Batch-like object exposing
        ``filter_jobs(succeeded=, pending=, running=, failed=)``.
    verbose : bool
        Print a summary of the failed jobs after construction.
    hyp3 : object, optional
        Client exposing ``submit_prepared_jobs(batch)``. When omitted, the
        notebook-level ``hyp3_isce`` client is looked up at submission time.
    """

    # Class-level defaults: calling submit_*() before prepare_jobs() then hits
    # the "no prepared jobs" ValueError instead of an AttributeError.
    isce_resubmit = None
    raider_resubmit = None
    # Optional explicit client; None means "use the global `hyp3_isce`".
    hyp3 = None

    def __init__(self, hyp3_jobs, verbose=True, hyp3=None):
        self.hyp3 = hyp3

        # Filter to failed jobs
        failed_jobs = hyp3_jobs.filter_jobs(succeeded=False, pending=False,
                                            running=False, failed=True)
        print(f'Failed jobs:\n {failed_jobs}')

        # Separate topsApp-step failures from RAiDER-step failures
        self.isce_jobs, self.raider_jobs = split_failed_jobs(failed_jobs)
        if verbose:
            self.print_info()

    def print_info(self):
        """Print the number of jobs and the unique submission dates per group."""
        # Failed at topsApp step
        print(f'Topsapp jobs: {len(self.isce_jobs)}')
        print(f' Submission dates: {np.unique(list(map(get_request_date, self.isce_jobs)))}')

        # Failed at raider step
        print(f'Raider jobs: {len(self.raider_jobs)}')
        print(f' Submission dates: {np.unique(list(map(get_request_date, self.raider_jobs)))}')

    @staticmethod
    def _make_batches(prepared, label, batch_size):
        """Chunk ``prepared`` into lists of at most ``batch_size`` jobs.

        Prints the resulting split and returns the list of batches, or None
        when there is nothing to submit.
        """
        if len(prepared) == 0:
            return None
        batches = list(hyp3_sdk.util.chunk(prepared, batch_size))
        msg = f'Prepared {label} jobs: {len(prepared)} in {len(batches[0])}'
        msg += f'x {len(batches)} batches'
        print(msg)
        return batches

    def prepare_jobs(self, batch_size=200):
        """Build the re-submission payloads and split them into batches.

        Sets ``prepared_isce`` / ``prepared_raider`` (flat payload lists) and
        ``isce_resubmit`` / ``raider_resubmit`` (lists of batches, or None when
        the corresponding payload list is empty).
        """
        self.prepared_isce = isce_jobs4resubmit(self.isce_jobs)
        self.prepared_raider = raider_jobs4resubmit(self.raider_jobs)
        self.isce_resubmit = self._make_batches(self.prepared_isce, 'isce', batch_size)
        self.raider_resubmit = self._make_batches(self.prepared_raider, 'raider', batch_size)

    def _submit_batches(self, batches):
        """Submit every batch through the configured (or global) HyP3 client."""
        client = self.hyp3 if self.hyp3 is not None else hyp3_isce
        for batch in tqdm(batches):
            client.submit_prepared_jobs(batch)

    def submit_isce(self):
        """Submit the prepared ISCE batches.

        Raises
        ------
        ValueError
            If there are no prepared ISCE jobs (none failed, or
            ``prepare_jobs()`` has not been called).
        """
        if self.isce_resubmit is None:
            raise ValueError('There are no prepared jobs!')
        print(f'Submitting: {len(self.prepared_isce)} jobs')
        self._submit_batches(self.isce_resubmit)

    def submit_raider(self):
        """Submit the prepared RAiDER batches.

        Raises
        ------
        ValueError
            If there are no prepared RAiDER jobs (none failed, or
            ``prepare_jobs()`` has not been called).
        """
        if self.raider_resubmit is None:
            raise ValueError('There are no prepared jobs!')
        print(f'Submitting: {len(self.prepared_raider)} jobs')
        self._submit_batches(self.raider_resubmit)


In [3]:
## Filtering functions
def _exist(failed_job, filt_job_list):
    """Return True when the job's 'job_parameters' equals an entry of ``filt_job_list``."""
    return failed_job['job_parameters'] in filt_job_list

# Filter based on status code
def filter_failed_jobs(failed_jobs:list, jobs: 'hyp3_sdk.jobs.Batch', n_jobs:int=10, verbose:bool = False):
    """Drop failed jobs whose parameters also belong to a non-failed job.

    The comparison set is ``jobs.filter_jobs().jobs`` (called with the default
    flags; presumably succeeded, pending and running jobs, as the verbose
    message says; confirm against hyp3_sdk). A failed job is dropped when its
    'job_parameters' equals the ``job_parameters`` of any job in that set.

    The lookup hashes a frozen form of each parameter dict rather than
    scanning the whole list for every failed job, so no worker processes are
    needed. Parameters are assumed to be JSON-like (nested dicts/lists of
    hashable scalars).

    Parameters
    ----------
    failed_jobs : list
        Failed job dictionaries, each with a 'job_parameters' entry.
    jobs : hyp3_sdk.jobs.Batch
        Batch to compare against.
    n_jobs : int
        Kept for backward compatibility; ignored (the lookup is in-process).
    verbose : bool
        Print intermediate counts.

    Returns
    -------
    list
        The failed jobs (original order) with no match in the comparison set.
    """
    def _freeze(value):
        # Hashable stand-in whose equality mirrors dict/list equality
        if isinstance(value, dict):
            return frozenset((key, _freeze(item)) for key, item in value.items())
        if isinstance(value, list):
            return tuple(_freeze(item) for item in value)
        return value

    # Get succeeded, running and pending
    filt_job_list = [j.job_parameters for j in jobs.filter_jobs().jobs]
    if verbose: print(f'Succedded,pending,running jobs: {len(filt_job_list)}')
    if verbose: print(f'Failed jobs: {len(failed_jobs)}')

    # One set lookup per failed job
    known_params = {_freeze(params) for params in filt_job_list}
    flags = [_freeze(job['job_parameters']) in known_params for job in failed_jobs]
    if verbose: print(f'Failed jobs that exist in above: {sum(flags)}')

    # Keep only the failed jobs without a match
    filt_failed = [job for job, exists in zip(failed_jobs, flags) if not exists]
    if verbose: print(f'Number of filtered failed jobs: {len(filt_failed)}')

    return filt_failed

# Filter duplicates
# NOTE: wrote this fast, probably could be done better
def request_date2dt(x):
    """Parse ``x['request_time']`` ('%Y-%m-%dT%H:%M:%S%z') into a datetime."""
    return datetime.datetime.strptime(x['request_time'], '%Y-%m-%dT%H:%M:%S%z')
def find_latest_datetime_index(datetimes):
    """Return the index of the largest element, or None for an empty sequence.

    When several elements tie for the maximum, the index of the first one is
    returned.
    """
    if not datetimes:
        return None

    # max() keeps the first maximal candidate, matching a strict `>` scan
    return max(range(len(datetimes)), key=datetimes.__getitem__)

def get_job_last_requested(duplicate_jobs: list): 
    """Return the job dict with the most recent 'request_time'.

    Request times are parsed with ``request_date2dt`` and ranked with
    ``find_latest_datetime_index``; on a tie the first such job is returned.
    An empty list is not supported: the index lookup yields None and the
    final indexing fails.
    """
    latest_position = find_latest_datetime_index(
        list(map(request_date2dt, duplicate_jobs))
    )
    return duplicate_jobs[latest_position]

 
def remove_duplicates(job_list: list):
    """Collapse jobs with identical 'job_parameters', keeping the latest request.

    Jobs are grouped by equality of their 'job_parameters' in one pass
    (hashing a frozen form of the parameters) instead of comparing every pair
    of jobs. Parameters are assumed to be JSON-like (nested dicts/lists of
    hashable scalars).

    Parameters
    ----------
    job_list : list
        Job dictionaries with 'job_parameters' and 'request_time'.

    Returns
    -------
    tuple(list, dict)
        ``filt_jobs``: one job per group, ordered by each group's first
        position in ``job_list``; for groups with several jobs the one picked
        by ``get_job_last_requested``.
        ``count``: maps the first position of each group to the list of all
        positions in that group.
    """
    def _freeze(value):
        # Hashable stand-in whose equality mirrors dict/list equality
        if isinstance(value, dict):
            return frozenset((key, _freeze(item)) for key, item in value.items())
        if isinstance(value, list):
            return tuple(_freeze(item) for item in value)
        return value

    # Group positions by (frozen) job_parameters; dicts keep insertion order,
    # so groups are ordered by their first occurrence.
    positions_by_params = {}
    for position, job in enumerate(job_list):
        positions_by_params.setdefault(_freeze(job['job_parameters']), []).append(position)

    count = {positions[0]: positions for positions in positions_by_params.values()}

    # Loop through groups and keep the job that was requested last
    filt_jobs = []
    for positions in count.values():
        duplicates = [job_list[i] for i in positions]
        if len(duplicates) > 1:
            filt_jobs.append(get_job_last_requested(duplicates))
        else:
            filt_jobs.append(duplicates[0])

    return filt_jobs, count

def remove_raider_duplicates(job_list: list):
    """Keep one failed RAiDER-step job per underlying ISCE job id.

    The grouping id is ``job['job_id']`` for 'INSAR_ISCE' jobs and
    ``job['job_parameters']['job_id']`` for 'ARIA_RAIDER' jobs. Groups with
    several jobs are reduced to the one picked by ``get_job_last_requested``.

    Parameters
    ----------
    job_list : list
        Job dictionaries of type 'INSAR_ISCE' or 'ARIA_RAIDER'.

    Returns
    -------
    list
        First the survivors of the duplicated ids, then the jobs whose id
        occurs once; each part is ordered by sorted id.

    Raises
    ------
    ValueError
        If a job has another 'job_type'. Skipping such a job while collecting
        ids would shift every later position and select the wrong jobs.
    """
    # Group jobs by the ISCE job id they refer to (input order kept per group)
    jobs_by_id = {}
    for job in job_list:
        if job['job_type'] == 'INSAR_ISCE':
            isce_job_id = job['job_id']
        elif job['job_type'] == 'ARIA_RAIDER':
            isce_job_id = job['job_parameters']['job_id']
        else:
            raise ValueError(f"Unsupported job_type: {job['job_type']!r}")
        jobs_by_id.setdefault(isce_job_id, []).append(job)

    sorted_ids = sorted(jobs_by_id)

    # Duplicated ids: keep the job that was requested last
    latest_of_duplicates = [get_job_last_requested(jobs_by_id[job_id])
                            for job_id in sorted_ids if len(jobs_by_id[job_id]) > 1]
    # Ids that occur once are kept as they are
    singles = [jobs_by_id[job_id][0]
               for job_id in sorted_ids if len(jobs_by_id[job_id]) == 1]

    return latest_of_duplicates + singles


# Filter based on exit log message
# example when job failed at raider step due to requested model that does not exist in that aoi
hrr_warning_msg = 'ValueError: HRRR was requested but it is not available in this area'

def get_log_exit_msg(log_url, msg, timeout=60):
    """Check whether the last line of a remote log equals ``msg``.

    Only the final 1000 bytes of the log are requested (HTTP Range header).
    Trailing whitespace is stripped first, so a log that ends with a newline
    is still matched on its last line of text.

    Parameters
    ----------
    log_url : str
        URL of the log file.
    msg : str
        Exact text expected on the last line.
    timeout : float
        Seconds to wait for the server before ``requests`` raises a timeout.
        Without it a stalled connection would block the caller indefinitely.

    Returns
    -------
    bool
        True when the last line of the fetched text equals ``msg``.
    """
    response = requests.get(log_url, headers={'Range': 'bytes=-1000'}, timeout=timeout)
    last_line = response.text.rstrip().rsplit('\n', 1)[-1]
    return last_line == msg

# Example notebook to inspect jobs: https://github.com/ACCESS-Cloud-Based-InSAR/Aria-Hyp3-GUNW-Frame-Submission/blob/dev/operational_scripts/__Andrew_Inspection.ipynb

## Setup
Example: jobs submitted for NISAR cal/val.

In [5]:
# `prompt=True` asks for credentials interactively; remove it to use the .netrc entry instead.
hyp3_isce = hyp3_sdk.HyP3('https://hyp3-a19-jpl.asf.alaska.edu', prompt=True)
# hyp3_isce = hyp3_sdk.HyP3('https://hyp3-tibet-jpl.asf.alaska.edu', prompt=True)

In [6]:
filtering_flags = dict(succeeded=False, pending=False,
                        running=False, failed=True)

The first jobs were submitted on May 8, 2024.

In [6]:
# May-08 to May 09 

# request separate to differeniate between topsApp and raider jobs
jobs = hyp3_isce.find_jobs(start=datetime.datetime(2024, 5, 8), 
                           end=datetime.datetime(2024, 5, 9),
                           user_id='access_cloud_based_insar',
                           job_type='INSAR_ISCE')
print(jobs)
print('#' * 10)
jobs = access_hyp3_resubmit(jobs.filter_jobs(**filtering_flags))

raider_jobs = hyp3_isce.find_jobs(user_id='access_cloud_based_insar',
                           start=datetime.datetime(2024, 5, 8), 
                           end=datetime.datetime(2024, 5, 9),
                           job_type='ARIA_RAIDER')
print('#' * 10)
print('Raider')
print(raider_jobs)

63861 HyP3 Jobs: 14695 succeeded, 49166 failed, 0 running, 0 pending.
##########
Failed jobs:
 49166 HyP3 Jobs: 0 succeeded, 49166 failed, 0 running, 0 pending.
Topsapp jobs: 45687
 Submission dates: ['2024-05-08']
Raider jobs: 3392
 Submission dates: ['2024-05-08']
##########
Raider
0 HyP3 Jobs: 0 succeeded, 0 failed, 0 running, 0 pending.


In total, 63861 jobs were submitted and 49166 of them failed: 45687 at the topsApp step and 3392 at the RAiDER step. The remaining 87 failed jobs (49166 − 45687 − 3392) fall into neither group.

In [7]:
# second run
jobs = hyp3_isce.find_jobs(start=datetime.datetime(2024, 5, 10), 
                           end=datetime.datetime(2024, 5, 15),
                           user_id='access_cloud_based_insar',
                           job_type='INSAR_ISCE')
print(jobs)
print('#' * 10)
jobs = access_hyp3_resubmit(jobs.filter_jobs(**filtering_flags))

raider_jobs = hyp3_isce.find_jobs(user_id='access_cloud_based_insar',
                           start=datetime.datetime(2024, 5, 10), 
                           end=datetime.datetime(2024, 5, 15),
                           job_type='ARIA_RAIDER')
print('#' * 10)
print('Raider')
print(raider_jobs)

45687 HyP3 Jobs: 36170 succeeded, 9517 failed, 0 running, 0 pending.
##########
Failed jobs:
 9517 HyP3 Jobs: 0 succeeded, 9517 failed, 0 running, 0 pending.
Topsapp jobs: 8390
 Submission dates: ['2024-05-11']
Raider jobs: 1011
 Submission dates: ['2024-05-11']
##########
Raider
3392 HyP3 Jobs: 0 succeeded, 3392 failed, 0 running, 0 pending.


In [47]:
45687+8390, 3392+1011+3392

(54077, 7795)

After the second run, 50865 of the 63861 jobs have succeeded (14695 + 36170).

In [10]:
# unique succeded aria_raider jobs
np.unique([j.job_parameters['job_id'] for j in raider_jobs.filter_jobs(**{**filtering_flags, **{'succeeded':True, 'failed':False}}).jobs]).shape

(0,)

## Prepare for re-submission

In [None]:
rjobs = hyp3_isce.find_jobs(start=datetime.datetime(2024, 5, 10), 
                           end=datetime.datetime(2024, 5, 15),
                           user_id='access_cloud_based_insar')
print(rjobs)
print('#' * 10)
rjobs = access_hyp3_resubmit(rjobs.filter_jobs(**filtering_flags))

### Get raider jobs  with wrong weather model submission

In [None]:
# HRRR was requested over Hawaii and Alaska, where HRRR is not available.
# Remember which jobs have a log so the flags can be re-aligned with
# rjobs.raider_jobs afterwards (jobs without logs are skipped in the download).
has_log = [len(j['logs']) > 0 for j in rjobs.raider_jobs]
logs = [j['logs'][0] for j in itertools.compress(rjobs.raider_jobs, has_log)]

with Pool(10) as pool:
    # Map the function over the logs, passing the warning message to each call
    log_flags = pool.starmap(get_log_exit_msg, zip(logs, itertools.repeat(hrr_warning_msg)))

# One flag per job in rjobs.raider_jobs; a job without a log cannot match the
# message, so it is flagged False. Later cells compress rjobs.raider_jobs with
# job_flags, which requires this one-to-one alignment.
remaining_log_flags = iter(log_flags)
job_flags = [next(remaining_log_flags) if logged else False for logged in has_log]

print(f'Number of failed raider jobs: {len(list(itertools.compress(logs, log_flags)))} of {len(logs)}')
print(f' with msg: {hrr_warning_msg}')

In [None]:
# Find jobs that failed with defined msg
nohrrr_jobs = list(itertools.compress(rjobs.raider_jobs, job_flags))

# Find tracks, tracks 124, 87 (Hawaii), 44 (Northern Alaska Slopes)
np.unique([j['name'] for j in nohrrr_jobs])

In [None]:
import pandas as pd
# Get job_ids to give ASF for manual import to DAAC
hrrr_jobs = []

for j in nohrrr_jobs:
    if j['job_type'] == 'INSAR_ISCE':
        hrrr_jobs.append(j['job_id'])
    elif j['job_type'] == 'ARIA_RAIDER':
        hrrr_jobs.append(j['job_parameters']['job_id'])

hrrr_jobs = pd.DataFrame(hrrr_jobs, columns=['job_id'])
hrrr_jobs.head()

In [None]:
hrrr_jobs.to_csv('access_nohrrr_job_ids.csv', index=False)

In [None]:
# To avoid submitting these jobs again, filter them out.
# NOTE(review): rjobs.raider_jobs is overwritten in place and itertools.compress
# pairs jobs with flags by position, so `job_flags` must hold exactly one flag
# per job in rjobs.raider_jobs. Re-running this cell without recomputing
# `job_flags` pairs the already-shortened list with stale flags.
job_flags2 = [~np.bool_(jf) for jf in job_flags]
rjobs.raider_jobs = list(itertools.compress(rjobs.raider_jobs, job_flags2))

In [None]:
# To submit
rjobs.print_info()

### Filtering to avoid tracking submission dates
NOTE: this still needs work; it does not filter correctly yet.

In [16]:
# Last run
all_jobs = hyp3_isce.find_jobs(start=datetime.datetime(2024, 5, 8), 
                           end=datetime.datetime(2024, 5, 15),
                           user_id='access_cloud_based_insar')
print(all_jobs)
print('#' * 10)
access_jobs = access_hyp3_resubmit(all_jobs.filter_jobs(**filtering_flags))

####### Filter failed jobs that succeeded, are pending or running ####### 
print('#' * 10)
verbose = False

if verbose: print('Failed jobs at topsApp step')
access_jobs.isce_jobs = filter_failed_jobs(access_jobs.isce_jobs, all_jobs,
                                           n_jobs=20, verbose=verbose)
if verbose: print('\nFailed jobs at raider step')
access_jobs.raider_jobs = filter_failed_jobs(access_jobs.raider_jobs, all_jobs,
                                             n_jobs=20, verbose=verbose)

print('\nAfter filtering already succedded, pending, running')
print('#' * 10)
access_jobs.print_info()


112940 HyP3 Jobs: 50865 succeeded, 62075 failed, 0 running, 0 pending.
##########
Failed jobs:
 62075 HyP3 Jobs: 0 succeeded, 62075 failed, 0 running, 0 pending.
Topsapp jobs: 54077
 Submission dates: ['2024-05-08' '2024-05-11']
Raider jobs: 7795
 Submission dates: ['2024-05-08' '2024-05-11']


"\n####### Filter failed jobs that succeeded, are pending or running ####### \nprint('#' * 10)\nverbose = False\n\nif verbose: print('Failed jobs at topsApp step')\naccess_jobs.isce_jobs = filter_failed_jobs(access_jobs.isce_jobs, all_jobs,\n                                           n_jobs=20, verbose=verbose)\nif verbose: print('\nFailed jobs at raider step')\naccess_jobs.raider_jobs = filter_failed_jobs(access_jobs.raider_jobs, all_jobs,\n                                             n_jobs=20, verbose=verbose)\n\nprint('\nAfter filtering already succedded, pending, running')\nprint('#' * 10)\naccess_jobs.print_info()\n"

In [17]:
# Another way of doing deduplication, hashing frame_id with ref_sec dates
def _get_gunw_pairing(job_dict):
    """Return '<ref>_<sec>' built from characters 17-24 of the first granule names.

    The slice is taken from ``job_dict['granules'][0]`` and
    ``job_dict['secondary_granules'][0]``; presumably this is the acquisition
    date (YYYYMMDD) inside a Sentinel-1 granule name (TODO confirm).
    """
    date_slice = slice(17, 25)
    reference_date = job_dict['granules'][0][date_slice]
    secondary_date = job_dict['secondary_granules'][0][date_slice]
    return '_'.join((reference_date, secondary_date))

def dict2hash(job_dict:dict):
    """Hash a job-parameter dict by its 'frame_id' and reference/secondary pairing."""
    frame_id = job_dict['frame_id']
    return hash((frame_id, _get_gunw_pairing(job_dict)))

# Try to filter with this
jobs2count = [j['job_parameters'] for j in access_jobs.isce_jobs]
np.unique(list(map(dict2hash, jobs2count))).shape, len(access_jobs.isce_jobs)

((45687,), 54077)

In [138]:
len(remove_duplicates(access_jobs.isce_jobs)[0]), len(access_jobs.isce_jobs)

(9517, 17907)

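Both approaches work the same way, but 1127 jobs are somehow still not removed. (Possibly these are first-run jobs whose re-run did not fail at the topsApp step again: of the 9517 failed second-run jobs only 8390 are topsApp failures, and 9517 − 8390 = 1127. To be confirmed.)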

In [139]:
filt_jobs = remove_duplicates(access_jobs.isce_jobs)[0]

In [140]:
# Double check based on request dates
request_dates = list(map(get_request_date, filt_jobs))
unique_date = np.unique(request_dates)
print(unique_date)
ix1 = np.where(np.array('2024-05-08') == np.array(request_dates))
ix2 = np.where(np.array('2024-05-11') == np.array(request_dates))
print(ix1[0].shape[0])
print(ix2[0].shape[0])
# So can see that 1127 in 20240508 are not removed


d1 = np.array(filt_jobs)[ix1]
d2= np.array(filt_jobs)[ix2]

d11 = [d['job_parameters'] for d in d1]
d22 = [d['job_parameters'] for d in d2]

# cannot find where it leaks 
np.count_nonzero([i in d22 for i in d11])

['2024-05-08' '2024-05-11']
1127
8390


0

In [13]:
####### Filter duplicated failed jobs ####### 
access_jobs.isce_jobs = remove_duplicates(access_jobs.isce_jobs)[0]
access_jobs.raider_jobs = remove_raider_duplicates(access_jobs.raider_jobs)

print('\nAfter filtering failed jobs duplicates')
print('#' * 10)
access_jobs.print_info()


After filtering failed jobs duplicates
##########
Topsapp jobs: 45687
 Submission dates: ['2024-05-08' '2024-05-11']
Raider jobs: 4403
 Submission dates: ['2024-05-11']


## Re-submission

In [None]:
# Prepare jobs for re-submission
rjobs.prepare_jobs()

In [None]:
# this goes faster
rjobs.submit_raider()

In [None]:
rjobs.submit_isce()

In [None]:
succeded_flag = {**filtering_flags, **{'succeeded':True, 'failed':False}}
unique_succeded = np.unique([j.job_parameters['job_id'] for j in raider_jobs.filter_jobs(**succeded_flag).jobs])
unique_failed = np.unique([j.job_parameters['job_id'] for j in raider_jobs.filter_jobs(**filtering_flags).jobs])
# NOTE compare ARIA_RAIDER with INSAR_ISCE job_id

In [11]:
# Check
jobs = hyp3_isce.find_jobs(start=datetime.datetime(2024, 5, 16), 
                           end=datetime.datetime(2024, 5, 18),
                           user_id='access_cloud_based_insar',
                           job_type='INSAR_ISCE')
print(jobs)
print('#' * 10)
jobs = access_hyp3_resubmit(jobs.filter_jobs(**filtering_flags))

raider_jobs = hyp3_isce.find_jobs(user_id='access_cloud_based_insar',
                           start=datetime.datetime(2024, 5, 16), 
                           end=datetime.datetime(2024, 5, 18),
                           job_type='ARIA_RAIDER')
print('#' * 10)
print('Raider')
print(raider_jobs)

8390 HyP3 Jobs: 0 succeeded, 1823 failed, 6250 running, 317 pending.
##########
Failed jobs:
 1823 HyP3 Jobs: 0 succeeded, 1823 failed, 0 running, 0 pending.
Topsapp jobs: 1823
 Submission dates: ['2024-05-17']
Raider jobs: 0
 Submission dates: []
##########
Raider
119 HyP3 Jobs: 80 succeeded, 39 failed, 0 running, 0 pending.


In [None]:
raider_jobs.filter_jobs(**filtering_flags).jobs

In [None]:
## Check failed jobs
print(f'# failed jobs {len(rjobs.raider_jobs)}')
all_logs = [j['logs'] for j in rjobs.raider_jobs]
all_logs[0], all_logs[-1]

## asf search

In [None]:
import asf_search as asf
import geopandas as gpd

In [None]:
# requires asf_search > 7.0.8
opts = asf.ASFSearchOptions(
    #shortName='ARIA_S1_GUNW', # version 3
    #relativeOrbit=[13],
    #flightDirection='DESCENDING',
    processingLevel='GUNW_STD',
    intersectsWith='POLYGON((-127.992 32.2865,-115.5115 32.2865,-115.5115 49.1812,-127.992 49.1812,-127.992 32.2865))',
    collectionAlias=False,
)

In [None]:
# requires asf_search > 7.0.8
opts = asf.ASFSearchOptions(
    shortName='ARIA_S1_GUNW', # version 3
    #relativeOrbit=[13],
    #flightDirection='DESCENDING',
    #processingLevel='GUNW_STD',
    intersectsWith='POLYGON((-118.5619 33.4925,-117.6485 33.4925,-117.6485 34.2148,-118.5619 34.2148,-118.5619 33.4925))',
    collectionAlias=False,
)

In [None]:
scenes = asf.search(opts=opts)
gdf = gpd.GeoDataFrame.from_features(scenes.geojson(), crs='EPSG:4326')

In [None]:
gdf.shape

In [None]:
gdf[gdf.fileID.apply(lambda x: x.split('-')[6]) == '20190123_20190117'].url.values

In [None]:
version = gdf.fileID.apply(lambda x: int(x.split('-')[-1].split('_')[0].split('v')[-1]))
version.unique()