### Pulls from the raw Obseravatory Google Sheets, does "strict" and "semi-strict" validation depending on constant
(Lax is pointless as it should pass and write no errors)

In [6]:
%load_ext mypy_ipython
%load_ext autoreload
%autoreload 2
%reload_ext autoreload
    
import os
import sys
import math
import pickle
import validators
from pathlib import Path, PurePath
import pandas as pd
from pydantic import ValidationError
from pprint import pprint
from validation_classes import samplingModel, measuredModel, samplingModelStrict, samplingModelSemiStrict
from typing import Any, Union

############################ CAUTION ##################################################
STRICT: bool      = False  # As defined by Ioulia, dates corrected, NA's removed etc
SEMI_STRICT: bool = True   # As defined by Ioulia but not checking for mandatory fields
                     # ints and str coerced to floats when possible
#######################################################################################

def parse_sample_sheets(sampling_strategy: str,
                        sheet_type: str,
                        addresses: list[tuple[str, str]],
                       ) -> None:
    
    for observatory in addresses:
        observatory_id, sheet_link = observatory
        #print(f"Observatory_id {observatory_id} sheet_link {sheet_link}")
        if not isinstance(sheet_link, str):
            #print(f"This is the sheet_link type {type(sheet_link)}")
            if isinstance(sheet_link, float): 
                if math.isnan(sheet_link):
                    print(f"Observatory {observatory_id} does not do {sampling_strategy}")
                    continue
            else:
                raise ValueError(f"Unknown URL value {sheet_link} to observatory {observatory_id}")
        else:
            if not validators.url(sheet_link):
                raise ValueError(f"URL {sheet_link=} is not valid")

            if observatory_id == "Plenzia": continue # Sheets not publically available
            # UMF soft_sed has two source_mat_ids
            if sampling_strategy == "soft_sediment" and observatory_id == "UMF":
                continue
    
            print(f"Processing {observatory_id}...")
            sampling_sheet_base: str = sheet_link.split("/edit")[0]
            sampling_sheet_suffix: str = "/gviz/tq?tqx=out:csv&sheet=%s"
            sample_sheet_link: str = sampling_sheet_base + sampling_sheet_suffix % sheet_type
            print(f"Sample sheet link: {sample_sheet_link}")
            df: pd.core.frame.DataFrame = pd.read_csv(sample_sheet_link, encoding='utf-8')
            data_records_all: dict[str, str] = df.to_dict(orient="records")
    
            # Many sheets have partially filled rows
            # The source_mat_id is manually curated and the PRIMARY_KEY
            # Therefore filter records on source_mat_id
            def filter_on_source_mat_id(d):
                # Bergen has it as source_material_id
                try:
                    value: Union[str, float, None] = d["source_mat_id"]
                except KeyError:
                    try:
                        value: Union[str, float, None] = d["source_material_id"]
                    except KeyError:
                        raise ValueError("Cannot find source_mat_id field")
                if isinstance(value, float):
                    if math.isnan(value):
                        return False
                elif value is None:
                    return False
                # Remove mis-formatted
                elif len(value.split("_")) < 6:
                    return False
                #Edge case of this otherwise blank entry having 6 "bits"
                elif value == "EMOBON_VB_Wa_230509_um_":
                    return False 
                else:
                    return True

            data_records_filtered: list[dict[str, str]] = list(filter(filter_on_source_mat_id, data_records_all))
    
            if len(data_records_all) > len(data_records_filtered):
                print(f"Discarded {len(data_records_all) - len(data_records_filtered)} records leaving {len(data_records_filtered)}.")

            if STRICT:
                model_type = f"{sheet_type}_strict"
            elif SEMI_STRICT:
                model_type = f"{sheet_type}_semistrict"
            else:
                model_type = sheet_type

            validator = validator_classes[model_type]
            validated_rows = []
            errors = [] # type is way too complicated to include :)
            for row in data_records_filtered:
                try:
                    vr = validator(**row)
                except ValidationError as e:
                    if observatory_id == "Bergen":
                        errors.append([(row["source_material_id"], e.errors())])
                    else:
                        errors.append([(row["source_mat_id"], e.errors())])
                else:
                    validated_rows.append(vr.model_dump())

            if errors:
                # errors is a list of lists where each inner list is a dict of row errors
                # where each isof key = source_mat_id and values is list of dicts each of which
                # is an error:
                total_number_errors: int = sum([len(row[1]) for e in errors for row in e])
                print(f"Errors were found... {total_number_errors} in total")
                save_dir_errors: Path = Path("./validation_errors")
                #outfile_name_pk: str = f"{observatory_id}_{sampling_strategy}_{model_type}_ERRORS.pickle"
                #out_path_pk: Path = os.path.join(save_dir, outfile_name_pk)
                #with open(out_path_pk, "wb") as f:
                #    pickle.dump(errors, f, pickle.HIGHEST_PROTOCOL)
                outfile_name_log: Path = Path(f"{observatory_id}_{sampling_strategy}_{model_type}_ERRORS.log")
                out_path_log: PurePath = PurePath(save_dir_errors, outfile_name_log)
                #os.path.join(save_dir, outfile_name_log)                
                with open(out_path_log, "w") as f:
                    pprint(errors, f)
            else:
                assert len(validated_rows) == len(data_records_filtered), "Not sure what happenned, but len(validated_rows) != len(data_filtered_records)"
                print("All records passed!")
            
                #for record in validated_rows:
                #    for field in record:
                #        print(f"Record {field} has value {record[field]} is type {type(record[field])}")

                if not STRICT and not SEMI_STRICT:
                    save_dir_logsheets: Path = Path("./logsheets")
                    outfile_name: Path = Path(f"{observatory_id}_{sampling_strategy}_{model_type}_validated.csv")
                    ndf = pd.DataFrame.from_records(validated_rows, index="source_mat_id")
                    ndf.to_csv(PurePath(save_dir_logsheets, outfile_name))
                    print(f"Written {os.path.join(save_dir_logsheets, outfile_name)}")

validator_classes = {"sampling": samplingModel,
                     "measured": measuredModel,
                     "sampling_strict": samplingModelStrict,
                     "sampling_semistrict": samplingModelSemiStrict
                    }

# Get list of all URL links to sampling sheets
# NB  you cant use a "with" closure here when reading the Pandas df
governance_logsheets_validated_csv = "./governance/logsheets_validated.csv"
df: pd.core.frame.DataFrame = pd.read_csv(governance_logsheets_validated_csv)
water_column_sheet_addresses: list[tuple[str, str]] = df[["observatory_id", "water_column"]].values.tolist()
soft_sediment_sheet_addresses: list[tuple[str, str]]  = df[["observatory_id", "soft_sediment"]].values.tolist()
del df

parse_sample_sheets("water_column", "sampling", water_column_sheet_addresses)
parse_sample_sheets("soft_sediment", "sampling", soft_sediment_sheet_addresses)

#There are not strict or semi-strict sheets for "measured"
#parse_sample_sheets("water_column", "measured", water_column_sheet_addresses)
#parse_sample_sheets("soft_sediment", "measured", soft_sediment_sheet_addresses)


The mypy_ipython extension is already loaded. To reload it, use:
  %reload_ext mypy_ipython
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Processing ESC68N...
Sample sheet link: https://docs.google.com/spreadsheets/d/11_Eu0W1-sDiuzKx1cIl6YuxjRHmWezN6u9v3Ly8JZ3A/gviz/tq?tqx=out:csv&sheet=sampling
Discarded 114 records leaving 150.
All records passed!
Processing Bergen...
Sample sheet link: https://docs.google.com/spreadsheets/d/1HuXHiUJICZrmCrJ4EZDyU5aSCMzDAc1cy_tne5YVPTg/gviz/tq?tqx=out:csv&sheet=sampling
Errors were found... 280 in total
Processing MBAL4...
Sample sheet link: https://docs.google.com/spreadsheets/d/1xfrqraPa0auQ1O-C9RUo68RhxrPCDWkVMCAUbj79AZI/gviz/tq?tqx=out:csv&sheet=sampling
Errors were found... 72 in total
Processing BPNS...
Sample sheet link: https://docs.google.com/spreadsheets/d/1mEi4Bd2YR63WD0j54FQ6QkzcUw_As9Wilue9kaXO2DE/gviz/tq?tqx=out:csv&sheet=sampling
Errors were found... 596 in total
Processing ROSKOGO...
Sample sh

In [7]:
%reload_ext mypy_ipython
%mypy

[34mnote:[m In function [m[1m"parse_sample_sheets"[m:[m
                        value: Union[str, float, None]
[34mnote:[m By default the bodies of untyped functions are not checked, consider using --check-untyped-defs  [annotation-unchecked][m
                            value: Union[str, float, N
[34mnote:[m By default the bodies of untyped functions are not checked, consider using --check-untyped-defs  [annotation-unchecked][m
                data_records_filtered: list[dict[str, 
[1m[31merror:[m Argument 1 to [m[1m"filter"[m has incompatible type [m[1m"Callable[[Any], Any]"[m; expected [m[1m"Callable[[str], TypeGuard[dict[str, str]]]"[m  [m[33m[arg-type][m
[1m[31mFound 1 error in 1 file (checked 1 source file)[m


Type checking failed
