In [None]:
import yaml
import flatdict as fd
import numpy as np

In [None]:
# Parse the ELN YAML file and flatten the nested result so that entries can be
# addressed with "/"-separated key paths (e.g. "sample/atom_types").
file_name = "eln_data.yaml"
with open(file_name, mode="r", encoding="utf-8") as stream:
    parsed_yaml = yaml.safe_load(stream)
yml = fd.FlatDict(parsed_yaml, delimiter="/")

In [None]:
# Inspection cell: list every key of the flattened ELN data next to its value.
for key, val in yml.items():
    print(f"{key}, {val}")

In [None]:
# Inspection cell: display whatever is stored under the "user" key.
yml["user"]

In [None]:
"user" in yml.keys()

In [None]:
"user" in yml.keys()
#isinstance(yml["user"], list)
#(all(isinstance(entry, dict) for entry in yml["user"]))

In [None]:
# Inspection cell: print each element stored under "user" on its own line.
# NOTE(review): presumably every element is a dict describing one user - confirm
# against the ELN schema.
for dct in yml["user"]:
    print(f"{dct}")

In [None]:
# Mapping table for the sample section: the key is a (variadic) NeXus template
# path, the value a modifier telling apply_modifier() which ELN term to read.
# Entries keyed "IGNORE" are skipped by the loop that consumes this table;
# sample/atom_types is handled separately because it needs validation.
NxSample = {"IGNORE": {"fun": "load_from", "terms": "sample/atom_types"},
            "/ENTRY[entry*]/sample/description": {"fun": "load_from", "terms": "sample/description"},
            "/ENTRY[entry*]/sample/method": {"fun": "load_from", "terms": "sample/method"},
            "/ENTRY[entry*]/sample/name": {"fun": "load_from", "terms": "sample/name"},
            "/ENTRY[entry*]/sample/preparation_date": {"fun": "load_from", "terms": "sample/preparation_date"},
            "/ENTRY[entry*]/sample/sample_history": {"fun": "load_from", "terms": "sample/sample_history"},
            "/ENTRY[entry*]/sample/short_title": {"fun": "load_from", "terms": "sample/short_title"},
            # the field itself takes the numeric value, its @units attribute the unit string
            "/ENTRY[entry*]/sample/thickness": {"fun": "load_from", "terms": "sample/thickness/value"},
            "/ENTRY[entry*]/sample/thickness/@units": {"fun": "load_from", "terms": "sample/thickness/unit"}}

In [None]:

import pytz

from datetime import datetime


def load_from_modifier(terms, fd_dct):
    """Implement modifier which reads values of different type from fd_dct.

    Parameters
    ----------
    terms : str or list of str
        A single key, or a list of keys, to look up in fd_dct.
    fd_dct : mapping
        Source of the values; has to offer keys() and item access.

    Returns
    -------
    The value stored under terms when terms is a single key, a list of values
    (in the order of terms) when terms is a list of keys, and None when terms
    has an unsupported type or when any requested key is missing.
    """
    if isinstance(terms, str):
        # look the key up among keys() exactly, a missing key yields None
        if terms in fd_dct.keys():
            return fd_dct[terms]
        return None
    # checking for list first keeps None, numbers and other non-iterables
    # from being iterated over
    if isinstance(terms, list) and all(isinstance(entry, str) for entry in terms):
        available = fd_dct.keys()
        # treat a missing key like in the single-key case instead of raising
        if all(entry in available for entry in terms):
            return [fd_dct[entry] for entry in terms]
    return None


def convert_iso8601_modifier(terms, dct: dict):
    """Implement modifier which turns a local time stamp plus timezone into an aware datetime.

    Parameters
    ----------
    terms : list of str
        Exactly two keys of dct: the first one points to a local time stamp
        formatted like "YYYY-MM-DDTHH:MM:SS" with optional fractional seconds,
        the second one to a timezone name known to pytz.
        A single str is accepted but not implemented and yields None.
    dct : dict
        Source of the time stamp and timezone strings.

    Returns
    -------
    The datetime parsed from the time stamp and localized with the given
    timezone, or None when terms has an unsupported shape or a key is missing.
    NOTE(review): the result is a datetime object in the given timezone; a
    conversion to a UTC ISO8601 string is not performed here - confirm that
    callers expect this.

    Raises
    ------
    ValueError
        If the timezone string is unknown to pytz or the time stamp matches
        none of the supported formats.
    """
    if isinstance(terms, str):
        # variant with a single key is not implemented
        return None
    if not (isinstance(terms, list) and len(terms) == 2
            and all(isinstance(entry, str) for entry in terms)):
        return None
    # assume the first argument is a local time
    # assume the second argument is a timezone string
    if terms[0] not in dct.keys() or terms[1] not in dct.keys():
        return None
    # formatting follows how nionswift instances return their time stamps
    date_time_str = dct[terms[0]].replace("T", " ")
    time_zone_str = dct[terms[1]]
    if time_zone_str not in pytz.all_timezones:
        raise ValueError('Invalid timezone string!')
    try:
        date_time_obj = datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f')
    except ValueError:
        # time stamps written without fractional seconds
        date_time_obj = datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S')
    return pytz.timezone(time_zone_str).localize(date_time_obj)


def apply_modifier(modifier, dct: dict):
    """Evaluate one entry of a mapping table against the data in dct.

    A str modifier is a constant and is returned as is. A dict modifier with
    exactly the keys "fun" and "terms" is dispatched to the modifier function
    named by "fun" ("load_from" or "convert_iso8601"). Everything else,
    including the not yet implemented {"link": ...} form, yields None.
    """
    if isinstance(modifier, str):
        return modifier
    if not isinstance(modifier, dict):
        return None

    command_keys = set(modifier.keys())
    if command_keys == {"fun", "terms"}:
        fun_name = modifier["fun"]
        if fun_name == "load_from":
            return load_from_modifier(modifier["terms"], dct)
        if fun_name == "convert_iso8601":
            return convert_iso8601_modifier(modifier["terms"], dct)
    # {"link": ...} is CURRENTLY NOT IMPLEMENTED (concept from the jsonmap
    # reader); it ends up here like every other unknown command
    return None


# examples/tests showing how to use the modifiers
# modd = "µs"
# modd = {"link": "some_link_to_somewhere"}
# modd = {"fun": "load_from", "terms": "metadata/scan/scan_device_properties/mag_boards/MagBoard 1 DAC 11"}
# modd = {"fun": "load_from", "terms": ["metadata/scan/scan_device_properties/mag_boards/MagBoard 1 DAC 11",
#     "metadata/scan/scan_device_properties/mag_boards/MagBoard 1 Relay"]}
# modd = {"fun": "convert_iso8601", "terms": ["data_modified", "timezone"]}
# print(apply_modifier(modd, yml))

def variadic_path_to_specific_path(path: str, instance_identifier: list):
    """Replace each "*" in a variadic path by the matching instance identifier.

    The n-th "*" is substituted by the n-th element of instance_identifier;
    surplus identifiers are ignored. A path without "*" is returned unchanged.
    None is returned for an empty or missing path and when there are fewer
    identifiers than "*" placeholders.
    """
    if path is None or path == "":
        return None
    n_placeholders = path.count("*")
    if n_placeholders == 0:  # nothing to substitute
        return path
    if len(instance_identifier) < n_placeholders:
        return None
    pieces = path.split("*")
    # all pieces but the last are followed by a placeholder
    filled = [f"{piece}{ident}"
              for piece, ident in zip(pieces[:-1], instance_identifier)]
    return "".join(filled) + f"{pieces[-1]}"

In [None]:
import numpy as np
import flatdict as fd
import yaml
from ase.data import chemical_symbols

In [None]:
# Fill the sample section of the template: the atom types are validated and
# joined here, all other fields follow the NxSample mapping table.
entry_id = 1
template = {}
src = "sample/atom_types"
trg = f"/ENTRY[entry{entry_id}]/{src}"
if "sample/atom_types" in yml.keys():
    atom_types = yml[src]
    if isinstance(atom_types, list) and len(atom_types) >= 1:
        # each entry has to be a str found in ase's chemical_symbols,
        # where the symbol "X" is not accepted
        atom_types_are_valid = all(
            isinstance(symbol, str)
            and symbol in chemical_symbols
            and symbol != "X"
            for symbol in atom_types)
        if atom_types_are_valid:
            template[trg] = ", ".join(list(atom_types))

for nx_path, modifier in NxSample.items():
    print(f"{nx_path}, {modifier}")
    if nx_path in ("IGNORE", "UNCLEAR"):
        continue
    trg = variadic_path_to_specific_path(nx_path, [entry_id])
    print(trg)
    res = apply_modifier(modifier, yml)
    print(res)
    if res is not None:
        template[trg] = res
print(template)

In [None]:
# Mapping table applied to each user dictionary: every listed field is loaded
# from the key of the same name and stored below /ENTRY[entry*]/USER[user*]/.
NxUserFromListOfDict = {
    f"/ENTRY[entry*]/USER[user*]/{field}": {"fun": "load_from", "terms": field}
    for field in ("name",
                  "affiliation",
                  "address",
                  "email",
                  "orcid",
                  "orcid_platform",
                  "telephone_number",
                  "role",
                  "social_media_name",
                  "social_media_platform")}

In [None]:
# Fill the user section of the template. The custom schema delivers "user" as
# a list of dictionaries; each of them becomes one USER[user<N>] instance.
entry_id = 1
template = {}
if ("user" in yml.keys()
        and isinstance(yml["user"], list)
        and all(isinstance(entry, dict) for entry in yml["user"])):
    user_id = 1
    for user_dict in yml["user"]:
        # identifier turns the variadic NeXus path into an instance path
        identifier = [entry_id, user_id]
        # try every quantity of the mapping table on this user dictionary
        for nx_path, modifier in NxUserFromListOfDict.items():
            if nx_path in ("IGNORE", "UNCLEAR"):
                continue
            trg = variadic_path_to_specific_path(nx_path, identifier)
            res = apply_modifier(modifier, user_dict)
            if res is not None:
                template[trg] = res
        user_id += 1
for key, val in template.items():
    print(f"{key}, {val}")

In [None]:
# Mapping table for the generic ELN input: the key is a (variadic) NeXus
# template path, the value a modifier telling which ELN term to read.
#
# ELN terms without a NeXus target so far. A dict can hold the key "IGNORE"
# only once, so these are kept as comments instead of as entries that would
# overwrite each other (listed as modifier function: term):
#   load_from_dict_list: em_lab/detector
#   load_from: em_lab/ebeam_column/aberration_correction/applied
#   load_from_dict_list: em_lab/ebeam_column/aperture_em
#   load_from: em_lab/optical_system_em/beam_current/unit
#   load_from: em_lab/optical_system_em/beam_current/value
#   load_from: em_lab/optical_system_em/beam_current_description
#   load_from: em_lab/optical_system_em/magnification
#   load_from: em_lab/optical_system_em/semi_convergence_angle/unit
#   load_from: em_lab/optical_system_em/semi_convergence_angle/value
# The "user" term is covered by its own mapping table and is ignored here.
NxEmElnInput = {"IGNORE": {"fun": "load_from_list_of_dict", "terms": "user"},
                "/ENTRY[entry*]/em_lab/EBEAM_COLUMN[ebeam_column]/electron_source/emitter_type": {"fun": "load_from", "terms": "em_lab/ebeam_column/electron_gun/emitter_type"},
                "/ENTRY[entry*]/em_lab/EBEAM_COLUMN[ebeam_column]/electron_source/voltage/@units": {"fun": "load_from", "terms": "em_lab/ebeam_column/electron_gun/voltage/unit"},
                "/ENTRY[entry*]/em_lab/EBEAM_COLUMN[ebeam_column]/electron_source/voltage": {"fun": "load_from", "terms": "em_lab/ebeam_column/electron_gun/voltage/value"},
                "/ENTRY[entry*]/em_lab/FABRICATION[fabrication]/capabilities": {"fun": "load_from", "terms": "em_lab/fabrication/capabilities"},
                "/ENTRY[entry*]/em_lab/FABRICATION[fabrication]/identifier": {"fun": "load_from", "terms": "em_lab/fabrication/identifier"},
                "/ENTRY[entry*]/em_lab/FABRICATION[fabrication]/model": {"fun": "load_from", "terms": "em_lab/fabrication/model"},
                "/ENTRY[entry*]/em_lab/FABRICATION[fabrication]/vendor": {"fun": "load_from", "terms": "em_lab/fabrication/vendor"},
                "/ENTRY[entry*]/em_lab/instrument_name": {"fun": "load_from", "terms": "em_lab/instrument_name"},
                "/ENTRY[entry*]/em_lab/location": {"fun": "load_from", "terms": "em_lab/location"},
                "/ENTRY[entry*]/em_lab/stage_lab/description": {"fun": "load_from", "terms": "em_lab/stage_lab/description"},
                "/ENTRY[entry*]/em_lab/stage_lab/name": {"fun": "load_from", "terms": "em_lab/stage_lab/name"},
                "/ENTRY[entry*]/@version": {"fun": "load_from", "terms": "entry/attr_version"},
                "/ENTRY[entry*]/definition": {"fun": "load_from", "terms": "entry/definition"},
                "/ENTRY[entry*]/end_time": {"fun": "load_from", "terms": "entry/end_time"},
                "/ENTRY[entry*]/experiment_description": {"fun": "load_from", "terms": "entry/experiment_description"},
                "/ENTRY[entry*]/experiment_identifier": {"fun": "load_from", "terms": "entry/experiment_identifier"},
                "/ENTRY[entry*]/PROGRAM[program*]/program": {"fun": "load_from", "terms": "entry/program"},
                "/ENTRY[entry*]/PROGRAM[program*]/program/@version": {"fun": "load_from", "terms": "entry/program__attr_version"},
                "/ENTRY[entry*]/start_time": {"fun": "load_from", "terms": "entry/start_time"}}

In [None]:
# Fill the template from the generic ELN mapping table. The identifier list
# carries entry_id and a fixed 1 for paths with a second "*" placeholder
# (e.g. PROGRAM[program*]).
entry_id = 1
template = {}
for nx_path, modifier in NxEmElnInput.items():
    if nx_path in ("IGNORE", "UNCLEAR"):
        continue
    trg = variadic_path_to_specific_path(nx_path, [entry_id, 1])
    res = apply_modifier(modifier, yml)
    if res is None:
        continue
    template[trg] = res

for key, val in template.items():
    print(f"{key}, {val}")