In [1]:
import numpy as np
import pandas as pd
import pystac

## Load the catalog
This will create a Catalog object, but will not load the metadata items yet.
The catalog contains only a single folder for `IMOS` data for now.

In [2]:
catalog = pystac.Catalog.from_file('data_index/catalog.json')
catalog

## Get all s3 paths for files in a collection
Querying by metadata uuid

In [3]:
uuid = "13b3900f-2623-463c-98a7-ea60ac8e61ae"
collection = catalog.get_child(uuid, recursive=True)
collection

In [4]:
s3_paths = [item.assets['data'].href for item in collection.get_all_items()]
s3_paths

['s3://imos-data/IMOS/ANMN/NRS/REAL_TIME/NRSMAI/Biogeochem_timeseries/IMOS_ANMN-NRS_TPSOBUE_20150626T015952Z_NRSMAI_FV00_NRSMAI-SubSurface-23-2015-06-WQM-20-realtime.nc',
 's3://imos-data/IMOS/ANMN/NRS/REAL_TIME/NRSMAI/Biogeochem_timeseries/IMOS_ANMN-NRS_TPSOBUE_20150626T021348Z_NRSMAI_FV00_NRSMAI-SubSurface-23-2015-06-WQM-85-realtime.nc',
 's3://imos-data/IMOS/ANMN/NRS/REAL_TIME/NRSMAI/Biogeochem_timeseries/IMOS_ANMN-NRS_TPSOBUE_20151207T055945Z_NRSMAI_FV00_NRSMAI-SubSurface-24-2015-11-WQM-20-realtime.nc',
 's3://imos-data/IMOS/ANMN/NRS/REAL_TIME/NRSMAI/Biogeochem_timeseries/IMOS_ANMN-NRS_TPSOBUE_20151207T055948Z_NRSMAI_FV00_NRSMAI-SubSurface-24-2015-11-WQM-85-realtime.nc',
 's3://imos-data/IMOS/ANMN/NRS/REAL_TIME/NRSMAI/Biogeochem_timeseries/IMOS_ANMN-NRS_TPSOBUE_20160508T031446Z_NRSMAI_FV00_NRSMAI-SubSurface-25-2016-05-WQM-85-realtime.nc',
 's3://imos-data/IMOS/ANMN/NRS/REAL_TIME/NRSMAI/Biogeochem_timeseries/IMOS_ANMN-NRS_TPSOBUE_20160508T031446Z_NRSMAI_FV00_NRSMAI-SubSurface-25-201

This can also work with higher level metadata items, containing several collections.

In [5]:
uuid = "6c981d98-d7fb-4120-9ebe-347ef1188ae0"
collection = catalog.get_child(uuid, recursive=True)
collection

In [6]:
# s3_paths = [item.assets['data'].href for item in collection.get_items(recursive=True)]
# len(s3_paths)

## Identify duplicate files


In [7]:
# Get all items in ANMN collection
collection = catalog.get_child("f9c151bd-d95b-4af6-8cb7-21c05b7b383b", recursive=True)
items = collection.get_items(recursive=True)
# Convert to DataFrame
df = pd.DataFrame.from_dict(
    {
        item: item.properties | {"collection": item.collection_id}
        for item in items
    },
    orient="index",
)

In [8]:
# Find duplicated items
duplicates = df[~df['toolbox_input_file'].isna()]  # Only consider items with toolbox_input_file
duplicates = duplicates[
    duplicates.duplicated(
        [
            'deployment_code',
            'collection',
            'file_version',
            'instrument_serial_number',
            'toolbox_input_file',
        ],
        keep=False,
    )
]
duplicates

Unnamed: 0,toolbox_input_file,toolbox_version,file_version,file_version_quality_control,project,Conventions,standard_name_vocabulary,title,institution,comment,...,toolbox_parser,contributor_email,contributor_name,contributor_role,generating_code_version,included_values_flagged_as,source_file,source_file_download,source_file_opendap,rejected_files
<Item id=IMOS_ANMN_NRS_NRSPHB_Biogeochem_profiles_IMOS_ANMN-NRS_CDEKOSTUZ_20240911T004157Z_NRSPHB_FV01_Profile-SBE19plus_C-20241203T002357Z_nc>,C:\Data\IMOS\IMOS_TOOLBOX\CTD_DATA\2024\PH2024...,2.6.9 - PCWIN64,Level 1 - Quality Controlled Data,Quality controlled data have been through qual...,Integrated Marine Observing System (IMOS),"CF-1.6,IMOS-1.4",NetCDF Climate and Forecast (CF) Metadata Conv...,ANMN NSW-IMOS Mooring Data,ANMN-NRS,NSW-IMOS Port Hacking NRS water sampling NRSPH...,...,,,,,,,,,,
<Item id=IMOS_ANMN_NRS_NRSPHB_Biogeochem_profiles_IMOS_ANMN-NRS_CDEKOSTUZ_20240911T004157Z_NRSPHB_FV01_Profile-SBE19plus_C-20241203T002526Z_nc>,C:\Data\IMOS\IMOS_TOOLBOX\CTD_DATA\2024\PH2024...,2.6.9 - PCWIN64,Level 1 - Quality Controlled Data,Quality controlled data have been through qual...,Integrated Marine Observing System (IMOS),"CF-1.6,IMOS-1.4",NetCDF Climate and Forecast (CF) Metadata Conv...,ANMN NSW-IMOS Mooring Data,ANMN-NRS,NSW-IMOS Port Hacking NRS water sampling NRSPH...,...,,,,,,,,,,


In [9]:
[dup.assets['data'].href for dup in duplicates.index]

['s3://imos-data/IMOS/ANMN/NRS/NRSPHB/Biogeochem_profiles/IMOS_ANMN-NRS_CDEKOSTUZ_20240911T004157Z_NRSPHB_FV01_Profile-SBE19plus_C-20241203T002357Z.nc',
 's3://imos-data/IMOS/ANMN/NRS/NRSPHB/Biogeochem_profiles/IMOS_ANMN-NRS_CDEKOSTUZ_20240911T004157Z_NRSPHB_FV01_Profile-SBE19plus_C-20241203T002526Z.nc']

## Identify input files for creating a product
For example the Moorings - Hourly time-series product is generated from hundreds of individual instrument-deployment files at each mooring site.

Code adapted from [this script](https://github.com/aodn/python-aodndata/blob/master/aodndata/moorings/moorings_product_trigger.py).

In [10]:
INCLUDED_VARIABLES = {'TEMP', 'PSAL', 'CPHL', 'CHLF', 'CHLU', 'TURB', 'DOX1', 'DOX2', 'DOXS', 'PAR', 'VCUR'}
def pivot_variables(df: pd.DataFrame) -> pd.DataFrame:
    """Rearrange the file-list data frame so that each row lists one variable only
    (multiple rows per file where needed)
    """
    assert 'variables' in df.columns
    files_vars = []
    for row in df.itertuples():
        variables = set(row.variables.keys()) & INCLUDED_VARIABLES
        files_vars.extend((row.Index, v) for v in variables)

    files_vars = pd.DataFrame(files_vars, columns=['i', 'variable']).set_index('i')

    return df.drop(columns='variables').join(files_vars, how='right')

In [11]:
possible_sites= ['NRSDAR', 'NRSESP', 'NRSKAI', 'NRSMAI', 'NRSNIN', 'NRSNSI',
                 'NRSPHB', 'NRSROT', 'NRSYON', 'SEQ200', 'SEQ400', 'DARBGF']
aggregated_uuid = "moorings-aggregated-timeseries-product"
hourly_uuid = "efd8201c-1eca-412e-9ad2-0534e96cea14"
for site_code in possible_sites:
    site_files = df[df['site_code'] == site_code]
    source_index = site_files['file_version'] == 'Level 1 - Quality Controlled Data'

    # We will need to filter out aggregated products once we index them.
    # Can be done filtering by collection id, possibly after going through the
    # collections to see which ones are aggregations
    source_files = site_files.loc[source_index, ['created', 'cube:variables']].rename(
        columns={'cube:variables': 'variables'}
    )
    source_files = pivot_variables(source_files)

    # Aggregated products
    collection = catalog.get_child(aggregated_uuid, recursive=True)
    items = collection.get_items(recursive=True)
    # Convert to DataFrame
    aggregated_files = pd.DataFrame.from_dict(
        {item: item.properties | {"collection": item.collection_id} for item in items},
        orient="index",
    )
    aggregated_files = aggregated_files[['created', 'cube:variables']].rename(
        columns={'cube:variables': 'variables'}
    )
    aggregated_files = pivot_variables(aggregated_files)

    # Hourly products
    collection = catalog.get_child(hourly_uuid, recursive=True)
    items = collection.get_items(recursive=True)
    # Convert to DataFrame
    hourly_files = pd.DataFrame.from_dict(
        {item: item.properties | {"collejction": item.collection_id} for item in items},
        orient="index",
    )
    hourly_files = hourly_files[['created']]

    # when were data for each variable updated?
    vars_updated = source_files.groupby('variable').created.max()
    vars_updated.name = 'source_updated'
    aggregated_files = aggregated_files.join(vars_updated, on='variable', how='right')

    # which variables have newer data than the product, or missing product?
    new_vars = aggregated_files[np.logical_or(aggregated_files.created < aggregated_files.source_updated,
                                                aggregated_files.created.isna()
                                                )]
    new_vars = set(new_vars.variable)

    # reprocess hourly products if newer source files exist, or product files missing?
    products_updated = hourly_files.created.max()
    process_hourly = any(source_files.created > products_updated) or hourly_files.empty

    if len(new_vars) == 0 and not process_hourly:
        print(f"{site_code}: All products up to date")
    else:
        # products to generate
        # VCUR is used as a proxy for all velocity variables - if included, need to handle separately
        products = set()
        if 'VCUR' in new_vars:
            new_vars.remove('VCUR')
            products.update({'velocity_aggregated', 'velocity_hourly'})
        if new_vars:
            products.add('aggregated')
        if process_hourly:
            # gridded is created from hourly, so need to process both
            products.update({'hourly', 'gridded'})

        # create manifest
        manifest = {'site_code': site_code,
                    'variables': list(new_vars),
                    'products': list(products)
                    }
        print(manifest)

{'site_code': 'NRSDAR', 'variables': ['DOX2', 'DOXS', 'TURB', 'TEMP', 'PAR', 'PSAL', 'DOX1', 'CPHL'], 'products': ['velocity_hourly', 'aggregated', 'velocity_aggregated']}
NRSESP: All products up to date
{'site_code': 'NRSKAI', 'variables': ['DOX2', 'DOXS', 'TURB', 'TEMP', 'PAR', 'PSAL', 'DOX1', 'CPHL'], 'products': ['velocity_hourly', 'aggregated', 'velocity_aggregated']}
{'site_code': 'NRSMAI', 'variables': ['DOX2', 'DOXS', 'TURB', 'TEMP', 'PSAL', 'DOX1', 'CPHL'], 'products': ['aggregated', 'gridded', 'hourly', 'velocity_aggregated', 'velocity_hourly']}
NRSNIN: All products up to date
{'site_code': 'NRSNSI', 'variables': ['DOX2', 'DOXS', 'TURB', 'TEMP', 'PSAL', 'DOX1', 'CPHL'], 'products': ['aggregated', 'gridded', 'hourly', 'velocity_aggregated', 'velocity_hourly']}
{'site_code': 'NRSPHB', 'variables': ['DOX2', 'DOXS', 'TURB', 'TEMP', 'PAR', 'PSAL', 'DOX1', 'CPHL'], 'products': ['hourly', 'aggregated', 'gridded']}
{'site_code': 'NRSROT', 'variables': ['DOX2', 'TURB', 'TEMP', 'PAR', 