# notebook for processing fully reduced m3 data "triplets"
This is a notebook for processing L0 / L1B / L2 triplets (i.e.,
the observations that got reduced).

## general notes

We process the reduced data in triplets simply to improve the metadata on the
L0 and L2 products. We convert the L1B product first to extract several
attributes used to fill out the L0 and L2 metadata. These attributes are
scratched to disk in
[./directories/m3/m3_index.csv](./directories/m3/m3_index.csv), because the
file also serves as a useful user-facing index to the archive. A complete
version of this index is provided in this repository, but it was originally
created during this conversion process, and will be recreated if you run it
again. This index is read into the ```m3_index``` variable below; its path is
also soft-coded in several ```m3_conversion``` classes, so make sure you
change that or feed them the correct path as an argument if you change this
location.

This notebook does not apply programmatic rules to iterate over the file
structure of the mirrored archive. It uses an index that was partly manually
generated:
[/src/directories/m3/m3_data_mappings.csv](/src/directories/m3/m3_data_mappings.csv).
This was manually manipulated to manage several small idiosyncrasies in the
PDS3 archive.
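
As a rough illustration of how an index of file paths can be clustered into per-observation groups, here is a minimal sketch in plain Python. The `hypothetical_basenamer` rule (everything before the first underscore in the file name) and the example paths are assumptions made up for this sketch; the real `basenamer` in `m3_bulk` and the real index contents may differ.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def hypothetical_basenamer(filepath):
    """Assumed rule: the basename is the file name up to the first underscore."""
    return PurePosixPath(filepath).name.split("_")[0]


def group_by_basename(filepaths):
    """Cluster archive paths by the observation they appear to belong to."""
    groups = defaultdict(list)
    for filepath in filepaths:
        groups[hypothetical_basenamer(filepath)].append(filepath)
    return dict(groups)


# made-up paths for illustration; they are not taken from the real index
example_paths = [
    "/L0/M3G20090101T000000_V01_L0.LBL",
    "/L1B/M3G20090101T000000_V03_L1B.LBL",
    "/L2/M3G20090101T000000_V01_L2.LBL",
    "/L0/M3G20090102T000000_V01_L0.LBL",
]
groups = group_by_basename(example_paths)
# groups with at least three members are candidate triplets
triplets = {name: paths for name, paths in groups.items() if len(paths) >= 3}
```

Because the grouping is driven entirely by the index, any path corrected by hand in the CSV ends up in the right group without special-case code.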

35 of the V3 L1B products in the PDS3 archive are duplicated: one copy in the
correct month-by-year directory, one copy in some incorrect month-by-year
directory. We pick the 'first' one in all cases (see the line
```pds3_label_file = input_directory + group_files[product_type][0]``` below).
Each pair's members have identical md5sums, so it *probably* doesn't matter
which member of the pair we use.
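
If you want to re-check that claim on your own mirror, a minimal sketch along these lines compares the md5 digests of two files. The helper names `md5sum` and `same_md5` are hypothetical and not part of this repository; the files are read in blocks so that multi-gigabyte images do not have to fit in memory.

```python
import hashlib


def md5sum(path, block_size=1 << 20):
    """Return the hex md5 digest of a file, reading it one block at a time."""
    digest = hashlib.md5()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops when read() returns an empty bytes object
        for block in iter(lambda: stream.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def same_md5(path_a, path_b):
    """True if both files have the same md5 digest."""
    return md5sum(path_a) == md5sum(path_b)
```

Calling `same_md5` on the two members of a duplicated pair should return `True` if the mirror matches the description above.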

## performance tips

The most likely bottlenecks for this process are I/O throughput and CPU. We
recommend both using a high-throughput disk and parallelizing this, either
using ```pathos``` (vanilla Python ```multiprocessing``` will probably fail
during a pickling step involving ```rasterio```) or simply by running multiple
copies of this notebook. If you do parallelize this process on a single
machine, note that working memory can suddenly catch you off-guard as a
constraint. While many of the M3 observational data files are small, some are
over 4 GB, and the method presented here requires them to be completely loaded
into memory in order to convert them to FITS and strip the prefix tables from
the L0 files. When passed ```clean=True```, the ```m3_conversion```
observational data writer class constructors aggressively delete data after
using it, but this still results in a pretty high -- and spiky -- working
memory burden.
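
If you run multiple copies of this notebook, each copy needs its own disjoint share of the groups. The sketch below shows one way to do that with a round-robin split in plain Python, similar in spirit to `more_itertools.distribute`. The function name `round_robin_chunk` and the stand-in group names are assumptions for illustration only.

```python
from itertools import islice


def round_robin_chunk(items, total_chunks, chunk_ix):
    """Return every total_chunks-th item, starting at position chunk_ix."""
    if not 0 <= chunk_ix < total_chunks:
        raise ValueError("chunk_ix must be in range(total_chunks)")
    return list(islice(items, chunk_ix, None, total_chunks))


groups = [f"group_{n}" for n in range(10)]  # stand-ins for basename groups
# each notebook copy would compute only its own share, using its own chunk_ix
shares = [round_robin_chunk(groups, 4, ix) for ix in range(4)]
```

A round-robin split interleaves neighboring entries across copies rather than handing each copy one contiguous block. It does not balance by file size, so a copy can still hit several large files in a row.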

In [1]:
import datetime as dt
import os
from types import MappingProxyType

from more_itertools import distribute
import pandas as pd
import sh

from m3_bulk import basenamer, make_m3_triplet, \
    m3_triplet_bundle_paths, crude_time_log, fix_end_object_tags
from m3_conversion import M3L0Converter, M3L1BConverter, M3L2Converter
from pvl.decoder import ParseError

In [2]:

m3_index = pd.read_csv('./directories/m3/m3_index.csv')

# directory of file mappings, grouped into m3 basename clusters
file_mappings = pd.read_csv('./directories/m3/m3_data_mappings.csv')
file_mappings["basename"] = file_mappings["filepath"].apply(basenamer)
basename_groups = list(file_mappings.groupby("basename"))

# what kind of files does each pds4 product have?
# paths to the locally-written versions are stored in the relevant attributes of 
# the associated PDSVersionConverter instance.
pds4_filetypes = MappingProxyType({
    'l0': ('pds4_label_file', 'clock_file', 'fits_image_file'),
    'l1b': ('pds4_label_file', 'loc_file', 'tim_file', 'rdn_file', 'obs_file'),
    'l2': ('pds4_label_file', 'sup_file', 'rfl_file')
})

# root directories of PDS3 and PDS4 data sets respectively
input_directory = '/home/ubuntu/m3_input/'
output_directory = '/home/ubuntu/m3_output/'

In [18]:
# all the triplets: what we are converting here.
reduced_groups = [group for group in basename_groups if len(group[1]) >= 3]

# the lonesome EDR images: basenames that have only a single file
edr_groups = [group for group in basename_groups if len(group[1]) == 1]

triplet_product_types = ('l1b', 'l0', 'l2')

# initialize our mapping of product types to
# product-writer class constructors.
# MappingProxyType is just a safety mechanism
# to make sure constructors don't get messed with
converters = MappingProxyType({
    'l0': M3L0Converter,
    'l1b': M3L1BConverter,
    'l2': M3L2Converter
})
writers = {}  # dict to hold instances of the converter classes

In [None]:
# initialize iteration, control execution in whatever way

# this is a place to split your index up however you like
# if you're parallelizing using multiple copies of this
# notebook.
chunk_ix_of_this_notebook = 0
total_chunks = 40
chunks = distribute(total_chunks, reduced_groups)
# eagerly evaluate so we know how long it is,
# and what all is in it if we have an error
chunk = list(chunks[chunk_ix_of_this_notebook])
log_string = "_" + str(chunk_ix_of_this_notebook)

group_enumerator = enumerate(chunk)

In [None]:
for ix, group in group_enumerator:

    print(ix, len(chunk))
    print("beginning product conversion")
    triplet_start_time = dt.datetime.now()
    group_files = make_m3_triplet(group)
    # what are the correct output paths (relative to
    # the root of the pds4 bundle) for these products?
    bundle_paths = m3_triplet_bundle_paths(group)

    for product_type in triplet_product_types:
        # read the PDS3 product and perform file conversions 
        pds3_label_file = input_directory + group_files[product_type][0]
        try: 
            writers[product_type] = converters[product_type](
                pds3_label_file, suppress_warnings=True, clean=True
            )
        except ParseError: # fix broken END_OBJECT tags in some of the target-mode files  
            print("fixing broken END_OBJECT tags")
            temp_label_file = fix_end_object_tags(pds3_label_file)
            writers[product_type] = converters[product_type](
                temp_label_file, suppress_warnings=True, clean=True
            )
            os.remove(temp_label_file)
        # write PDS4 label and product files
        # don't actually need to shave the extra / here but...
        # this would be more safely rewritten with PyFilesystem
        # (see clem-conversion)
        output_path = output_directory + bundle_paths[product_type][1:] 
        sh.mkdir("-p", output_path)
        writers[product_type].write_pds4(output_path, write_product_files=True, clean=True)    

        # occasionally (slow but very useful) spot-check with validate tool
        # note that this just invokes a one-line script at /usr/bin/validate
        # that links to the local install of the PDS Validate Tool; this
        # allows us to avoid throwing java stuff all over our environment
        if ix % 20 == 1:
            print("1-mod-20th triplet: running Validate Tool")
            validate_results = sh.validate("-t", writers[product_type].pds4_label_file)
            with open("validate_dump.txt", "a") as file:
                file.write(validate_results.stdout.decode())
            print("validated successfully")
        # log transfer crudely
        crude_time_log(
            "m3_data_conversion_log" + log_string + ".csv",
            writers[product_type], 
            str((dt.datetime.now() - triplet_start_time).total_seconds())
        )

    print(
        "done with this triplet; total seconds " 
        + str((dt.datetime.now() - triplet_start_time).total_seconds())
    )