# multispectral DIM conversion handler

Top-level handler notebook for converting multispectral tiles to PDS4.

Note: the hard-coded paths in the cells below should be changed to reflect the actual locations of the input PDS3 products (and the desired output location) on your system.

In [None]:
from multiprocessing import Pool
from pathlib import Path
import time
import warnings

from cytoolz import frequencies
import numpy as np
import pandas as pd
from pdr.pdr import DuplicateKeyWarning

from utilz import index_breadth_first, make_edr_lidmap, print_inline
from vo_bandset import VOBandSet
from vo_conversion import VikingMspecConverter, VikingMspecBrowseWriter

# root of the output tree: convert_mspec() writes converted products under
# <output_root>/data/mspec/ and their browse products under the matching
# browse/ path. change this to a writable location on your system.
output_root = Path("/datascratch/viking/scratch_write/")

In [None]:
# make an index of the input products.
# index_breadth_first is assumed to yield one record per file found under
# the given root, including a 'path' field -- TODO confirm against utilz.
dim_df = pd.DataFrame(
    index_breadth_first("/datascratch/viking/vo1_vo2-m-vis-5-dim-v1.0/")
)
# keep only paths carrying one of the four band extensions. note that
# str.match anchors at the start of the string only, so the pattern is not
# required to end at the extension.
is_band_file = dim_df['path'].str.match(r".*\.(vio|grn|sgr|red)")
# index columns that none of the later cells read.
unused_columns = ['excluded', 'directory', 'ATIME', 'CTIME', 'MTIME']
mspec = (
    dim_df.loc[is_band_file]
    .reset_index(drop=True)
    .drop(columns=unused_columns)
    .copy()
)

In [None]:
# filter superseded files (described in source errata)
# and construct filenames / PDS4 LIDs for remaining products.
# this includes assigning latitude bins to all input products and
# associating them with matching products in other spectral bands.

# --- filename-derived columns ---
path_objects = [Path(p) for p in mspec['path']]
mspec['band'] = [p.suffix[1:] for p in path_objects]  # extension, no dot
mspec['name'] = [p.name for p in path_objects]
mspec['stem'] = [p.stem for p in path_objects]
# single-character codes taken from the first two filename positions.
mspec['dtype'] = mspec['name'].str[0:1]
mspec['res'] = mspec['name'].str[1:2]

# --- orbit, taken from the directory structure ---
# which path segment is used depends on how deep the file sits: segment 6
# when there is no 9th segment, segment 7 otherwise -- except for files
# whose 8th segment mentions 'scale', which fall back to segment 6.
segments = mspec['path'].str.split("/", expand=True)
mspec['orbit'] = None
no_ninth_segment = segments[8].isna()
in_scale_dir = segments[7].str.contains('scale')
mspec.loc[no_ninth_segment, 'orbit'] = segments.loc[no_ninth_segment, 6]
mspec.loc[~no_ninth_segment, 'orbit'] = segments.loc[~no_ninth_segment, 7]
mspec.loc[in_scale_dir, 'orbit'] = segments.loc[in_scale_dir, 6]

# 'lidstem' keeps the band extension (one value per product); 'cube' drops
# it, so products differing only in band share a 'cube' value.
mspec['lidstem'] = mspec['name'] + mspec['orbit']
mspec['cube'] = mspec['stem'] + mspec['orbit']
# the index is reset here so that the labels collected below refer to the
# sorted frame.
mspec = mspec.sort_values(by='name').reset_index(drop=True)

# --- find superseded duplicates ---
# when several files share a lidstem, the copy whose path mentions 'errata'
# (or 'scale') wins and the others are marked for removal. a group that has
# both kinds is unexpected and aborts the cell.
superseded_ix = set()
for _, group in mspec.groupby("lidstem"):
    if len(group) < 2:
        continue
    from_errata = group['path'].str.contains('errata')
    if from_errata.any():
        superseded_ix.update(group.loc[~from_errata].index)
    from_scale = group['path'].str.contains('scale')
    if from_scale.any():
        superseded_ix.update(group.loc[~from_scale].index)
    if from_scale.any() and from_errata.any():
        raise ValueError("this seems off")

# --- latitude bins ---
# characters 2-3 of the filename are parsed as an integer and rounded down
# to a multiple of 10; character 4 is appended unchanged as the sign.
lat_sign = mspec['name'].str.slice(4, 5)
lat_value = mspec['name'].str.slice(2, 4).astype(int)
lat_floor = (lat_value // 10) * 10
mspec['lat_bin'] = lat_floor.astype(str).str.zfill(2) + lat_sign

mspec = mspec.drop(superseded_ix)
# ensure that no duplicate LIDs exist in the index.
dupes = mspec.loc[mspec['lidstem'].duplicated(keep=False)]
assert dupes.empty

In [None]:
# index the EDRs so that map-projected products can be associated with
# their source products. EDR_ROOT sits under the same scratch_write tree as
# output_root; point it at the EDR directory on your system.
EDR_ROOT = "/datascratch/viking/scratch_write/data/edr"
edr_lidmap = make_edr_lidmap(EDR_ROOT)

In [None]:
def convert_mspec(chunk, write_browse=True):
    """
    handler function for converting a single product. constructs a VOBandSet
    grouping 1-4 input products in different spectral bands together; uses it
    to construct a VikingMspecConverter and uses it to write a PDS4 data product;
    then uses that converter to construct a VikingMspecBrowseWriter to
    write an associated browse product.

    parameters:
        chunk: DataFrame of index rows for the products to convert together.
            must provide a 'path' column and, unless one of the paths
            contains 'special', a 'lat_bin' column.
        write_browse: the browse product is written only when this is
            exactly True.

    returns:
        0 on completion. any exception raised during conversion propagates
        to the caller.

    output goes to <output_root>/data/mspec/<subdir> and, for browse
    products, <output_root>/browse/mspec/<subdir>, where <subdir> is either
    'special/<parent directory name of the first input>' or the first row's
    latitude bin.
    """
    warnings.simplefilter("ignore", category=DuplicateKeyWarning)
    # turn RuntimeWarnings into exceptions so they are reported as failures
    # instead of passing silently.
    warnings.simplefilter("error", category=RuntimeWarning)
    bandset = VOBandSet(chunk['path'].tolist())
    converter = VikingMspecConverter(bandset, edr_lidmap)
    # sub-path shared by the data and browse trees.
    if chunk['path'].str.contains('special').any():
        product_subdir = Path(
            "mspec",
            "special",
            chunk['path'].iloc[0].split('/')[-2],
        )
    else:
        product_subdir = Path("mspec", chunk['lat_bin'].iloc[0])
    output_directory = Path(output_root, "data", product_subdir)
    output_directory.mkdir(parents=True, exist_ok=True)
    for obj in converter.object_names:
        converter.write_file(obj, output_directory)
    converter.convert_label()
    converter.write_label(output_directory)
    if write_browse is True:
        browse = VikingMspecBrowseWriter(converter)
        # built from output_root directly rather than by substituting
        # "/data/" in the stringified data path: that substitution would
        # also rewrite a "/data/" segment inside output_root itself, and
        # would do nothing at all with non-POSIX path separators.
        browse_output_directory = Path(output_root, "browse", product_subdir)
        browse_output_directory.mkdir(parents=True, exist_ok=True)
        browse.write_file("image", browse_output_directory)
        browse.convert_label()
        browse.write_label(browse_output_directory)
    return 0

In [None]:
# execute convert_mspec in parallel across the input products.
# rows sharing a 'cube' value are submitted together as one task, with
# browse output enabled.
cubes = tuple(cube for _, cube in mspec.groupby('cube'))
pool = Pool(5)
results = {
    ix: pool.apply_async(convert_mspec, (cube, True))
    for ix, cube in enumerate(cubes)
}
# nothing more will be submitted to the pool.
pool.close()
# poll once per second, reporting progress as "<finished>/<total>".
# the finished list is refreshed before each report so the display is
# current, and the loop exits as soon as every task is ready rather than
# sleeping one more time. failures are not raised here; they are stored on
# the AsyncResult objects in `results`.
while True:
    ready = [k for k, v in results.items() if v.ready()]
    print_inline(f"{len(ready)}/{len(results)}")
    if len(ready) >= len(results):
        break
    time.sleep(1)

In [None]:
# retrieve error/success messages from the completed processes.
# each entry of `final` is either the task's return value or the exception
# it raised. only Exception subclasses are captured, so a KeyboardInterrupt
# still propagates and stops the cell.
final = {}
for task_ix, async_result in results.items():
    try:
        outcome = async_result.get()
    except Exception as ex:
        outcome = ex
    final[task_ix] = outcome
pool.terminate()
broken = {
    task_ix: outcome
    for task_ix, outcome in final.items()
    if isinstance(outcome, Exception)
}
# show error messages (if any) along with their frequencies of occurrence
frequencies(map(str, broken.values()))