diff --git a/env.example b/env.example index 3c4775f..80d7217 100644 --- a/env.example +++ b/env.example @@ -46,6 +46,8 @@ PIP_INDEX_URL=https://pypi.python.org/simple/ # Ingestion Parameters BSS_METADATA_WORKBOOK='gdrive://Database Design/BSS Metadata' # 15XVXFjbom1sScVXbsetnbgAnPpRux2AgNy8w5U8bXdI BSS_METADATA_STORAGE_OPTIONS='{"token": "service_account", "access": "read_only", "creds": ${GOOGLE_APPLICATION_CREDENTIALS}, "root_file_id": "0AOJ0gJ8sjnO7Uk9PVA"}' +BSS_LABEL_RECOGNITION_WORKBOOK=./BSS_Labels.xlsx # or 'gdrive://Database Design/BSS Labels (${ENV}).xlsx' +BSS_LABEL_RECOGNITION_STORAGE_OPTIONS='{}' # or '{"token": "service_account", "access": "full_control", "creds": ${GOOGLE_APPLICATION_CREDENTIALS}, "root_file_id": "0AOJ0gJ8sjnO7Uk9PVA"}' BSS_FILES_FOLDER='gdrive://Discovery Folder/Baseline Storage Sheets (BSS)' BSS_FILES_STORAGE_OPTIONS='{"token": "service_account", "access": "read_only", "creds": ${GOOGLE_APPLICATION_CREDENTIALS}, "root_file_id": "0AOJ0gJ8sjnO7Uk9PVA"}' diff --git a/pipelines/__init__.py b/pipelines/__init__.py index f70caec..5cc4896 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -23,6 +23,7 @@ livelihood_activity_fixture, livelihood_activity_instances, livelihood_activity_label_dataframe, + livelihood_activity_label_recognition_dataframe, livelihood_activity_valid_instances, livelihood_summary_dataframe, livelihood_summary_label_dataframe, @@ -93,6 +94,7 @@ livelihood_summary_label_dataframe, all_livelihood_summary_labels_dataframe, summary_livelihood_summary_labels_dataframe, + livelihood_activity_label_recognition_dataframe, livelihood_activity_instances, livelihood_activity_valid_instances, livelihood_activity_fixture, diff --git a/pipelines/assets/fixtures.py b/pipelines/assets/fixtures.py index 5e0574c..8beb7e6 100644 --- a/pipelines/assets/fixtures.py +++ b/pipelines/assets/fixtures.py @@ -6,7 +6,6 @@ from io import StringIO import django -import numpy as np import pandas as pd from dagster import AssetExecutionContext, MetadataValue, Output, asset from django.core.files import File @@ -24,12 +23,13 @@ django.setup() from baseline.models import LivelihoodZoneBaseline # NOQA: E402 -from common.lookups import ClassifiedProductLookup # NOQA: E402 from common.management.commands import verbose_load_data # NOQA: E402 +from common.models import ClassifiedProduct # NOQA: E402 +from metadata.models import LivelihoodStrategyType # NOQA: E402 def validate_instances( - context: AssetExecutionContext, instances: dict[str, list[dict]], partition_key: str + context: AssetExecutionContext, config: BSSMetadataConfig, instances: dict[str, list[dict]], partition_key: str ) -> tuple[dict[str, list[dict]], dict]: """ Validate the instances for a set of related models, prior to loading them as a fixture. 
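The two new `BSS_LABEL_RECOGNITION_*` settings follow the same pattern as the existing `BSS_METADATA_*` pair: an fsspec-style path plus a JSON dict of storage options. As a minimal sketch of how they are consumed (assuming the variables are set as in `env.example` above; this mirrors `BSSMetadataConfig` in `pipelines/configs.py` and the new asset in `pipelines/assets/livelihood_activity.py` further down):

```python
import json
import os

from upath import UPath  # universal-pathlib, already used by the pipelines

# The workbook is either a local path or a gdrive:// URL; the storage options
# carry the fsspec credentials (an empty dict for a local file).
workbook = os.environ.get("BSS_LABEL_RECOGNITION_WORKBOOK", "./BSS_Labels.xlsx")
storage_options = json.loads(os.environ.get("BSS_LABEL_RECOGNITION_STORAGE_OPTIONS", "{}"))

# UPath picks the fsspec filesystem from the path scheme, so the same code
# works unchanged for local files and for Google Drive.
p = UPath(workbook, **storage_options)
print(p.exists())
```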
@@ -46,182 +46,204 @@ def validate_instances( subclass_instance = instance.copy() # The subclass instances also need a pointer to the base class instance subclass_instance["livelihoodactivity_ptr"] = ( - instance["wealth_group"][:3] + instance["livelihood_strategy"][2:] + [instance["wealth_group"][3]] + instance["wealth_group"][:3] # livelihood_zone, reference_year_end_date, wealth_group_category + + instance["livelihood_strategy"][2:] # strategy_type, season, product_id, additional_identifier + + [instance["wealth_group"][3]] # full_name ) subclass_livelihood_activities[instance["strategy_type"]].append(subclass_instance) instances = {**instances, **subclass_livelihood_activities} - # Create a dict to contain a dataframe of the instances for each model - dfs = {} + valid_instances = {model_name: [] for model_name in instances} + valid_keys = {model_name: [] for model_name in instances} errors = [] + current_timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() for model_name, model_instances in instances.items(): - # Ignore models where we don't have any instances to validate. - if model_instances: - model = class_from_name(f"baseline.models.{model_name}") - # Build a list of expected field names - valid_field_names = [field.name for field in model._meta.concrete_fields] - # Also include values that point directly to the primary key of related objects - valid_field_names += [ - field.get_attname() - for field in model._meta.concrete_fields - if field.get_attname() not in valid_field_names - ] - # Apply some model-level defaults. We do this by iterating over the instances rather than using the dataframe - # because we want to return the instances without any other changes that might be made by the dataframe as a - # result of dtype conversion or NaNs. - current_timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() - for instance in model_instances: - for field in ["created", "modified"]: - if field in valid_field_names and field not in instance: - instance[field] = current_timestamp - - df = pd.DataFrame.from_records(model_instances) + # Ignore models where we don't have any instances to validate. + if not model_instances: + continue - # Add the natural key for the instances to the dataframe, - # so we can validate foreign keys in child models. + model = class_from_name(f"baseline.models.{model_name}") + # Build a list of expected field names + valid_field_names = [field.name for field in model._meta.concrete_fields] + # Also include values that point directly to the primary key of related objects + valid_field_names += [ + field.get_attname() + for field in model._meta.concrete_fields + if field.get_attname() not in valid_field_names + ] + + # Iterate over the instances, validating each one in turn + for i, instance in enumerate(model_instances): + model_errors = [] + # Add created and modified timestamps if they are missing + for field in ["created", "modified"]: + if field in valid_field_names and field not in instance: + instance[field] = current_timestamp + + # Add the natural key so we can validate foreign keys in child models. 
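+            # For example, a LivelihoodZoneBaseline natural key is a list like
+            # ["SO01", "2016-09-30"] (hypothetical values), and each child model
+            # embeds its parent's natural key as a prefix of its own key.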
if model_name == "LivelihoodZone": - df["key"] = df["code"] + instance["natural_key"] = instance["code"] elif model_name == "LivelihoodZoneBaseline": - df["key"] = df[["livelihood_zone_id", "reference_year_end_date"]].apply( - lambda x: [x.iloc[0], x.iloc[1]], axis="columns" - ) + instance["natural_key"] = [instance["livelihood_zone_id"], instance["reference_year_end_date"]] elif model_name == "Community": - df["key"] = df[["livelihood_zone_baseline", "full_name"]].apply( - lambda x: x.iloc[0] + [x.iloc[1]], axis="columns" - ) + instance["natural_key"] = instance["livelihood_zone_baseline"] + [instance["full_name"]] elif model_name == "WealthGroup": - df["key"] = df[["livelihood_zone_baseline", "wealth_group_category", "community"]].apply( - lambda x: x.iloc[0] + [x.iloc[1], x.iloc[2][-1] if x.iloc[2] else ""], axis="columns" - ) + instance["natural_key"] = instance["livelihood_zone_baseline"] + [ + instance["wealth_group_category"], + instance["full_name"], + ] elif model_name == "LivelihoodStrategy": - df["key"] = df[ - ["livelihood_zone_baseline", "strategy_type", "season", "product_id", "additional_identifier"] - ].apply( - lambda x: x.iloc[0] - + [x.iloc[1], x.iloc[2][0] if x.iloc[2] else "", x.iloc[3] if x.iloc[3] else "", x.iloc[4]], - axis="columns", + instance["natural_key"] = instance["livelihood_zone_baseline"] + [ + instance["strategy_type"], + # instance['season'] is a natural key itself, so it is stored as a list even though it only + # has a single component - the season name - so take the first element of the list. + # Natural key components must be "" rather than None + instance["season"][0] if instance["season"] else "", + instance["product_id"] or "", # Natural key components must be "" rather than None + instance["additional_identifier"], + ] + elif model_name in ["LivelihoodActivity"] + [x for x in LivelihoodStrategyType]: + instance["natural_key"] = ( + instance["wealth_group"][:3] # livelihood_zone, reference_year_end_date, wealth_group_category + + instance["livelihood_strategy"][2:] # strategy_type, season, product_id, additional_identifier + + [instance["wealth_group"][3]] # full_name ) - elif model_name == "LivelihoodActivity": - df["key"] = df[["livelihood_zone_baseline", "wealth_group", "livelihood_strategy"]].apply( - lambda x: [ - x.iloc[0][0], - x.iloc[0][1], - x.iloc[1][2], - x.iloc[2][2], - x.iloc[2][3], - x.iloc[2][4], - x.iloc[2][5], - x.iloc[1][3], - ], - axis="columns", + # The natural key is a list of strings, or possibly numbers, so validate that here to avoid confusing + # error messages later. + if "natural_key" not in instance: + raise RuntimeError(f"Missing natural_key for {model_name} {i} from {str(instance)}") + if not isinstance(instance["natural_key"], list): + raise RuntimeError( + f"Invalid natural_key {instance['natural_key']} for {model_name} {i} from {str(instance)}" ) + for component in instance["natural_key"]: + if not isinstance(component, (str, int)): + raise RuntimeError( + f"Invalid component '{component}' in natural_key {instance['natural_key']} for {model_name} {i} from {str(instance)}" + ) - # Save the model and dataframe so we can use them validate natural foreign keys later - dfs[model_name] = (model, df) + # Create a string reference to the record for error messages + record_reference = f"{model_name} {i} from " + if "bss_sheet" in instance: + record_reference += f"'{instance['bss_sheet']}'!" 
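+            # The finished reference reads like "LivelihoodActivity 3 from 'Data'!B45: {...}"
+            # (hypothetical values), so every error can be traced back to its source BSS cell.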
+ if "bss_column" in instance and "bss_row" in instance: + record_reference += f"{instance['bss_column']}{instance['bss_row']}: " + elif "bss_column" in instance: + record_reference += f"{instance['bss_column']}:{instance['bss_column']}: " + elif "bss_row" in instance: + record_reference += f"{instance['bss_row']}:{instance['bss_row']}: " + record_reference += f"{str({k: v for k,v in instance.items()})}" # Apply field-level checks for field in model._meta.concrete_fields: - column = field.name if field.name in df else field.get_attname() + column = field.name if field.name in instance else field.get_attname() + # Ensure that mandatory fields have values if not field.blank: - if column not in df: - error = f"Missing mandatory field {field.name} for {model_name}" - errors.append(error) - continue - else: - for record in df[df[column].isnull()].itertuples(): - error = ( - f"Missing value for mandatory field {column} for {model_name} in record " - f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row}" - f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}' - ) - errors.append(error) + if column not in instance: + error = f"Missing mandatory field {field.name} for {record_reference}" + model_errors.append(error) + elif not instance[column]: + error = f"Missing value for mandatory field {column} for {record_reference}" + model_errors.append(error) # Ensure the instances contain valid parent references for foreign keys if isinstance(field, models.ForeignKey): - if column not in df: - error = f"Missing mandatory foreign key {column} for {model_name}" - errors.append(error) - continue - if not field.null: - for record in df[df[column].isnull()].itertuples(): - error = ( - f"Missing mandatory foreign key {column} for {model_name} in record " - f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row}" - f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}' - ) - errors.append(error) - # Validate foreign key values - if field.related_model.__name__ in dfs: - # The model is part of the fixture, so use the saved key from the dataframe - remote_keys = dfs[field.related_model.__name__][1]["key"] - else: - # The model is not in the fixture, so use the primary and natural keys for already saved instances - remote_keys = [instance.pk for instance in field.related_model.objects.all()] - if "natural_key" in dir(field.related_model): - remote_keys += [ - list(instance.natural_key()) for instance in field.related_model.objects.all() - ] - # Check the non-null foreign key values are in the remote keys - for record in df[ - df[column].replace("", pd.NA).notna() & ~df[column].isin(remote_keys) - ].itertuples(): - error = ( - f"Unrecognized '{column}' foreign key {getattr(record, column)} " - f"for {model_name} in record " - f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row}" - f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}' - ) - errors.append(error) + if column not in instance: + error = f"Missing mandatory foreign key {column} for {record_reference}" + model_errors.append(error) + elif not field.null: + if not instance[column]: + error = f"Missing mandatory foreign key {column} for {record_reference}" + model_errors.append(error) + else: + # Validate foreign key values + if field.related_model.__name__ in instances: + # The related model is part of the fixture, so check that we already validated it + if field.related_model.__name__ not in valid_keys: + raise RuntimeError( + 
"Related model %s not validated yet but needed for %s" + % (field.related_model.__name__, model_name) + ) + elif field.related_model.__name__ not in valid_keys: + # The model is not in the fixture, and hasn't been checked already, so use the primary and + # natural keys for already saved instances + remote_keys = [instance.pk for instance in field.related_model.objects.all()] + if "natural_key" in dir(field.related_model): + remote_keys += [ + list(instance.natural_key()) for instance in field.related_model.objects.all() + ] + valid_keys[field.related_model.__name__] = remote_keys + # Check the non-null foreign key values are in the remote keys + if instance[column] not in valid_keys[field.related_model.__name__]: + error = ( + f"Unrecognized '{column}' foreign key {instance[column]} for {record_reference}." + ) + model_errors.append(error) # Use the Django model to validate the fields, so we can apply already defined model validations and # return informative error messages. fields = [ field for field in model._meta.concrete_fields - if not isinstance(field, models.ForeignKey) and field.name in df + if not isinstance(field, models.ForeignKey) and field.name in instance ] - instance = model() - for record in df.replace(np.nan, None).itertuples(): - for field in fields: - value = getattr(record, field.name) - if not value and field.null: - # Replace empty strings with None for optional fields - value = None - try: - field.clean(value, instance) - except Exception as e: - error = ( - f'Invalid {field.name} value {value}: "{", ".join(e.error_list[0].messages)}"\nRecord ' - f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row} " - f"for {model_name} in record " - f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}.' - ) - errors.append(error) + model_instance = model() + for field in fields: + value = instance[field.name] + if not value and field.null: + # Replace empty strings with None for optional fields + value = None + try: + field.clean(value, model_instance) + except Exception as e: + error = ( + f'Invalid {field.name} value {value}: "{", ".join(e.error_list[0].messages)}"\n' + f"for {record_reference}." + ) + model_errors.append(error) # Check that the kcals/kg matches the values in the ClassifiedProduct model, if it's present in the BSS - if model_name == "LivelihoodActivity" and "product__kcals_per_unit" in df: - df["product"] = df["livelihood_strategy"].apply(lambda x: x[4]) - df = ClassifiedProductLookup().get_instances(df, "product", "product") - df["reference_kcals_per_unit"] = df["product"].apply(lambda x: x.kcals_per_unit) - df["reference_unit_of_measure"] = df["product"].apply(lambda x: x.unit_of_measure) - for record in df[df["product__kcals_per_unit"] != df["reference_kcals_per_unit"]].itertuples(): + if model_name == "LivelihoodActivity" and "product__kcals_per_unit" in instance: + product = ClassifiedProduct.objects.get(pk=instance["product_id"]) + if instance["product__kcals_per_unit"] != product.kcals_per_unit: error = ( - f"Non-standard value {record.product__kcals_per_unit} in '{record.column}' " - f"for {model_name} in record " - f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}. ' - f"Expected {record.reference_kcals_per_unit}/{record.reference_unit_of_measure} for {record.product}" + f"Non-standard value {instance['product__kcals_per_unit']}; " + f"expected {product.kcals_per_unit}/{product.unit_of_measure} for {product}\n" + f"for {record_reference}." 
) - errors.append(error) + model_errors.append(error) + + if model_errors: + errors += model_errors + else: + # Instance is valid, so add it to the list of valid instances + valid_instances[model_name].append(instance) + valid_keys[model_name].append(instance["natural_key"]) + + metadata = {} + for model_name in instances.keys(): + metadata[f"valid_{model_name}"] = f"{len(valid_instances[model_name])}/{len(instances[model_name])}" + metadata["total_valid_instances"] = ( + f"{sum(len(value) for value in valid_instances.values())}/{sum(len(value) for value in instances.values())}" + ) + metadata["preview"] = MetadataValue.md( + f"```json\n{json.dumps(valid_instances, indent=4, ensure_ascii=False)}\n```" + ) if errors: - raise RuntimeError("Missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, "\n".join(errors))) + if config.strict: + raise RuntimeError("Missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, "\n".join(errors))) + else: + context.log.warning( + "Ignoring missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, "\n".join(errors)) + ) + metadata["errors"] = MetadataValue.md(f'```text\n{"\n".join(errors)}\n```') + # Move the preview metadata item to the end of the dict + metadata["preview"] = metadata.pop("preview") - metadata = {f"num_{key.lower()}": len(value) for key, value in instances.items()} - metadata["total_instances"] = sum(len(value) for value in instances.values()) - metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(instances, indent=4, ensure_ascii=False)}\n```") - return instances, metadata + return Output(valid_instances, metadata=metadata) def get_fixture_from_instances(instance_dict: dict[str, list[dict]]) -> tuple[list[dict], dict]: @@ -288,7 +310,7 @@ def get_fixture_from_instances(instance_dict: dict[str, list[dict]]) -> tuple[li metadata["total_instances"] = len(fixture) metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4, ensure_ascii=False)}\n```") - return fixture, metadata + return Output(fixture, metadata=metadata) def import_fixture(fixture: list[dict]) -> dict: @@ -311,7 +333,11 @@ def import_fixture(fixture: list[dict]) -> dict: metadata["total_instances"] = len(fixture) metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4, ensure_ascii=False)}\n```") metadata["output"] = MetadataValue.md(f"```\n{output_buffer.getvalue()}\n```") - return metadata + # No downstream assets, so we only need to return the metadata + return Output( + None, + metadata=metadata, + ) @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") diff --git a/pipelines/assets/livelihood_activity.py b/pipelines/assets/livelihood_activity.py index 7878d2a..d931c9a 100644 --- a/pipelines/assets/livelihood_activity.py +++ b/pipelines/assets/livelihood_activity.py @@ -58,6 +58,8 @@ import django import pandas as pd from dagster import AssetExecutionContext, MetadataValue, Output, asset +from django.db.models.functions import Lower +from upath import UPath from ..configs import BSSMetadataConfig from ..partitions import bss_instances_partitions_def @@ -235,7 +237,7 @@ def get_livelihood_activity_regexes() -> list: # Create regex patterns for metadata attributes to replace the placeholders in the regexes placeholder_patterns = { "label_pattern": r"[a-zà-ÿ][a-zà-ÿ',/ \.\>\-\(\)]+?", - "product_pattern": r"(?P[a-zà-ÿ][a-zà-ÿ',/ \.\>\-\(\)]+?)", + "product_pattern": r"(?P[a-zà-ÿ][a-zà-ÿ1-9',/ \.\>\-\(\)]+?)", "season_pattern": r"(?Pseason [12]|saison 
[12]|[12][a-z] season||[12][a-zà-ÿ] saison|r[eé]colte principale|principale r[eé]colte|gu|deyr+?)", # NOQA: E501 "additional_identifier_pattern": r"\(?(?Prainfed|irrigated|pluviale?|irriguée|submersion libre|submersion contrôlée|flottant)\)?", "unit_of_measure_pattern": r"(?P[a-z]+)", @@ -255,6 +257,49 @@ def get_livelihood_activity_regexes() -> list: return compiled_regexes +@functools.cache +def get_livelihood_activity_regular_expression_attributes(label: str) -> dict: + """ + Return a dict of the attributes for a well-known Livelihood Activity label using regular expression matches. + """ + label = prepare_lookup(label) + attributes = { + "activity_label": None, + "strategy_type": None, + "is_start": None, + "product_id": None, + "unit_of_measure_id": None, + "season": None, + "additional_identifier": None, + "attribute": None, + "notes": None, + } + for pattern, strategy_type, is_start, attribute in get_livelihood_activity_regexes(): + match = pattern.fullmatch(label) + if match: + attributes.update(match.groupdict()) + attributes["activity_label"] = label + attributes["strategy_type"] = strategy_type + attributes["is_start"] = is_start + if isinstance(attribute, dict): + # Attribute contains a dict of attributes, e.g. notes, etc. + attributes.update(attribute) + else: + # Attribute is a string containing the attribute name + attributes["attribute"] = attribute + # Save the matched pattern to aid trouble-shooting + attributes["notes"] = ( + attributes["notes"] + " " + f' r"{pattern.pattern}"' + if attributes["notes"] + else f'r"{pattern.pattern}"' + ) + # Return the first matching pattern + return attributes + + # Didn't match any patterns, so return empty attributes + return attributes + + @functools.cache def get_livelihood_activity_label_map(activity_type: str) -> dict[str, dict]: """ @@ -306,40 +351,9 @@ def get_label_attributes(label: str, activity_type: str) -> pd.Series: try: return pd.Series(get_livelihood_activity_label_map(activity_type)[label]) except KeyError: - # No entry in the ActivityLabel model for this label, so attempt to match the label against the regexes - attributes = { - "activity_label": None, - "strategy_type": None, - "is_start": None, - "product_id": None, - "unit_of_measure_id": None, - "season": None, - "additional_identifier": None, - "attribute": None, - "notes": None, - } - for pattern, strategy_type, is_start, attribute in get_livelihood_activity_regexes(): - match = pattern.fullmatch(label) - if match: - attributes.update(match.groupdict()) - attributes["activity_label"] = label - attributes["strategy_type"] = strategy_type - attributes["is_start"] = is_start - if isinstance(attribute, dict): - # Attribute contains a dict of attributes, e.g. notes, etc. 
-                    attributes.update(attribute)
-                else:
-                    # Attribute is a string containing the attribute name
-                    attributes["attribute"] = attribute
-                # Save the matched pattern to aid trouble-shooting
-                attributes["notes"] = (
-                    attributes["notes"] + " " + f' r"{pattern.pattern}"'
-                    if attributes["notes"]
-                    else f'r"{pattern.pattern}"'
-                )
-                return pd.Series(attributes)
-        # No pattern matched
-        return pd.Series(attributes).fillna(pd.NA)
+        # No ActivityLabel instance exists for this label, so attempt to match against the regexes
+        attributes = get_livelihood_activity_regular_expression_attributes(label)
+        return pd.Series(attributes)
 
 
 def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code: str | None) -> pd.DataFrame:
@@ -385,8 +399,91 @@ def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code
     return all_label_attributes
 
 
+@asset
+def livelihood_activity_label_recognition_dataframe(
+    context: AssetExecutionContext,
+    config: BSSMetadataConfig,
+    all_livelihood_activity_labels_dataframe: pd.DataFrame,
+    all_other_cash_income_labels_dataframe: pd.DataFrame,
+    all_wild_foods_labels_dataframe: pd.DataFrame,
+    all_livelihood_summary_labels_dataframe: pd.DataFrame,
+):
+    """
+    A saved spreadsheet showing how each BSS label is recognized, either from the ActivityLabel model or a regex.
+    """
+    # Path to the output spreadsheet
+    p = UPath(config.bss_label_recognition_workbook, **config.bss_label_recognition_storage_options)
+
+    all_livelihood_activity_labels_dataframe["activity_type"] = (
+        ActivityLabel.LivelihoodActivityType.LIVELIHOOD_ACTIVITY
+    )
+    all_other_cash_income_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.OTHER_CASH_INCOME
+    all_wild_foods_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.WILD_FOODS
+    all_livelihood_summary_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY
+
+    # Build a dataframe of all the Activity Labels from all BSSs
+    all_labels_df = pd.concat(
+        [
+            all_livelihood_activity_labels_dataframe,
+            all_other_cash_income_labels_dataframe,
+            all_wild_foods_labels_dataframe,
+            all_livelihood_summary_labels_dataframe,
+        ],
+        ignore_index=True,
+    )
+
+    # Add the attributes recognized by the regular expressions
+    regex_attributes_df = pd.DataFrame.from_records(
+        all_labels_df["label"].astype(str).map(get_livelihood_activity_regular_expression_attributes)
+    )
+    all_labels_df = all_labels_df.join(
+        regex_attributes_df,
+        how="left",
+    )
+
+    # Add the labels from the database
+    db_labels_df = pd.DataFrame.from_records(
+        ActivityLabel.objects.annotate(label_lower=Lower("activity_label")).values(
+            "label_lower",
+            "activity_type",
+            "status",
+            "strategy_type",
+            "is_start",
+            "product_id",
+            "unit_of_measure_id",
+            "currency_id",
+            "season",
+            "additional_identifier",
+            "attribute",
+            "notes",
+        )
+    )
+    all_labels_df = all_labels_df.join(
+        db_labels_df.set_index(["label_lower", "activity_type"]),
+        on=("label_lower", "activity_type"),
+        how="left",
+        rsuffix="_db",
+        lsuffix="_regex",
+    )
+
+    # GDriveFS doesn't support updating existing files; it always creates a new file with the same name.
+    # This leads to multiple files with the same name in the folder, so we delete any existing files first.
+    if p.exists():
+        # @TODO This doesn't work with the current version of gdrivefs, possibly because of an error
+        # with accessing Shared Drives. For now, we need to manually delete the old files before running
+        # the asset again.
+ # We need to experiment and possibly create a custom gdrivefs that reuses code from KiLuigi's GoogleDriveTarget + p.unlink() + + # Save the dataframe to an Excel workbook + with p.fs.open(p.path, mode="wb") as f: + with pd.ExcelWriter(f, engine="openpyxl") as writer: + all_labels_df.to_excel(writer, index=False, sheet_name="All Labels") + + def get_instances_from_dataframe( context: AssetExecutionContext, + config: BSSMetadataConfig, df: pd.DataFrame, livelihood_zone_baseline: LivelihoodZoneBaseline, activity_type: str, @@ -435,10 +532,14 @@ def get_instances_from_dataframe( ) # Check that we recognize all of the activity labels + # The unrecognized labels are rows after the header rows where column A is not blank, + # but the matching row in all_label_attributes dataframe has a blank activity_label. + # Group the resulting dataframe so that we have a label and a list of the rows where it occurs. allow_unrecognized_labels = True unrecognized_labels = ( df.iloc[num_header_rows:][ - (df["A"].iloc[num_header_rows:] != "") & (all_label_attributes.iloc[num_header_rows:, 0].isna()) + (df["A"].iloc[num_header_rows:] != "") + & (all_label_attributes.iloc[num_header_rows:]["activity_label"] == "") ] .groupby("A") .apply(lambda x: ", ".join(x.index.astype(str)), include_groups=False) @@ -727,8 +828,9 @@ def get_instances_from_dataframe( for i, livelihood_activity in enumerate(livelihood_activities_for_strategy): livelihood_activity["livelihood_strategy"] = livelihood_zone_baseline_key + [ livelihood_strategy["strategy_type"], - livelihood_strategy["season"] if livelihood_strategy["season"] else "", - livelihood_strategy["product_id"] if livelihood_strategy["product_id"] else "", + livelihood_strategy["season"] or "", # Natural key components must be "" rather than None + livelihood_strategy["product_id"] + or "", # Natural key components must be "" rather than None livelihood_strategy["additional_identifier"], ] @@ -1149,13 +1251,6 @@ def get_instances_from_dataframe( % (partition_key, worksheet_name, row, label) ) from e - raise_errors = True - if errors and raise_errors: - errors = "\n".join(errors) - raise RuntimeError( - "Missing or inconsistent metadata in BSS %s worksheet '%s':\n%s" % (partition_key, worksheet_name, errors) - ) - result = { "LivelihoodStrategy": livelihood_strategies, "LivelihoodActivity": livelihood_activities, @@ -1177,6 +1272,19 @@ def get_instances_from_dataframe( if not unrecognized_labels.empty: metadata["unrecognized_labels"] = MetadataValue.md(unrecognized_labels.to_markdown(index=False)) + if errors: + if config.strict: + raise RuntimeError( + "Missing or inconsistent metadata in BSS %s worksheet '%s':\n%s" + % (partition_key, worksheet_name, "\n".join(errors)) + ) + else: + context.log.error( + "Missing or inconsistent metadata in BSS %s worksheet '%s':\n%s" + % (partition_key, worksheet_name, "\n".join(errors)) + ) + metadata["errors"] = MetadataValue.md(f'```text\n{"\n".join(errors)}\n```') + return Output( result, metadata=metadata, @@ -1185,6 +1293,7 @@ def get_instances_from_dataframe( def get_annotated_instances_from_dataframe( context: AssetExecutionContext, + config: BSSMetadataConfig, livelihood_activity_dataframe: pd.DataFrame, livelihood_summary_dataframe: pd.DataFrame, activity_type: str, @@ -1203,6 +1312,7 @@ def get_annotated_instances_from_dataframe( # Get the detail LivelihoodStrategy and LivelihoodActivity instances output = get_instances_from_dataframe( context, + config, livelihood_activity_dataframe, livelihood_zone_baseline, activity_type, @@ 
-1214,6 +1324,7 @@ def get_annotated_instances_from_dataframe( # Get the summary instances reported_summary_output = get_instances_from_dataframe( context, + config, livelihood_summary_dataframe, livelihood_zone_baseline, ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY, @@ -1325,7 +1436,9 @@ def get_annotated_instances_from_dataframe( summary_df.replace(pd.NA, None).to_markdown(floatfmt=",.0f") ) - # Move the preview and metadata item to the end of the dict + # Move the preview and errors metadata item to the end of the dict + if "errors" in output.metadata: + output.metadata["errors"] = output.metadata.pop("errors") output.metadata["preview"] = output.metadata.pop("preview") return output @@ -1334,26 +1447,27 @@ def get_annotated_instances_from_dataframe( @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def livelihood_activity_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, livelihood_activity_dataframe: pd.DataFrame, livelihood_summary_dataframe: pd.DataFrame, ) -> Output[dict]: """ LivelhoodStrategy and LivelihoodActivity instances extracted from the BSS. """ - output = get_annotated_instances_from_dataframe( + return get_annotated_instances_from_dataframe( context, + config, livelihood_activity_dataframe, livelihood_summary_dataframe, - ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY, + ActivityLabel.LivelihoodActivityType.LIVELIHOOD_ACTIVITY, len(HEADER_ROWS), ) - return output - @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def livelihood_activity_valid_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, livelihood_activity_instances: dict, wealth_characteristic_instances: dict, ) -> Output[dict]: @@ -1369,16 +1483,7 @@ def livelihood_activity_valid_instances( **{"WealthGroup": wealth_characteristic_instances["WealthGroup"]}, **livelihood_activity_instances, } - valid_instances, metadata = validate_instances(context, livelihood_activity_instances, partition_key) - metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} - metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) - metadata["preview"] = MetadataValue.md( - f"```json\n{json.dumps(valid_instances, indent=4, ensure_ascii=False)}\n```" - ) - return Output( - valid_instances, - metadata=metadata, - ) + return validate_instances(context, config, livelihood_activity_instances, partition_key) @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") @@ -1390,11 +1495,7 @@ def livelihood_activity_fixture( """ Django fixture for the Livelihood Activities from a BSS. """ - fixture, metadata = get_fixture_from_instances(livelihood_activity_valid_instances) - return Output( - fixture, - metadata=metadata, - ) + return get_fixture_from_instances(livelihood_activity_valid_instances) @asset(partitions_def=bss_instances_partitions_def) @@ -1405,8 +1506,4 @@ def imported_livelihood_activities( """ Imported Django fixtures for a BSS, added to the Django database. 
""" - metadata = import_fixture(livelihood_activity_fixture) - return Output( - None, - metadata=metadata, - ) + return import_fixture(livelihood_activity_fixture) diff --git a/pipelines/assets/livelihood_activity_regexes.json b/pipelines/assets/livelihood_activity_regexes.json index b33ddc0..b84a6e5 100644 --- a/pipelines/assets/livelihood_activity_regexes.json +++ b/pipelines/assets/livelihood_activity_regexes.json @@ -703,6 +703,24 @@ true, "quantity_produced" ], + [ + "(?:wild foods?{separator_pattern} )?{product_pattern}{separator_pattern} \\(?{unit_of_measure_pattern} gathered\\)?", + null, + true, + "quantity_produced" + ], + [ + "(?:fish|fish \\(?dry\\)?|fish \\(?fresh\\)?){separator_pattern} {product_pattern}{separator_pattern} \\(?{unit_of_measure_pattern} gathered\\)?", + null, + true, + "quantity_produced" + ], + [ + "{product_pattern}{separator_pattern}\\(?{unit_of_measure_pattern} gathered\\)?", + null, + true, + "quantity_produced" + ], [ "{product_pattern} (?P[1|2]è[m|r]e récolte){separator_pattern} {nbr_pattern} mois", null, diff --git a/pipelines/assets/other_cash_income.py b/pipelines/assets/other_cash_income.py index 2792064..3eaae86 100644 --- a/pipelines/assets/other_cash_income.py +++ b/pipelines/assets/other_cash_income.py @@ -38,12 +38,11 @@ | 32 | income | | | | | | | | | | """ # NOQA: E501 -import json import os import django import pandas as pd -from dagster import AssetExecutionContext, MetadataValue, Output, asset +from dagster import AssetExecutionContext, Output, asset from ..configs import BSSMetadataConfig from ..partitions import bss_instances_partitions_def @@ -127,6 +126,7 @@ def summary_other_cash_income_labels_dataframe( @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def other_cash_income_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, other_cash_income_dataframe: pd.DataFrame, livelihood_summary_dataframe: pd.DataFrame, ) -> Output[dict]: @@ -134,22 +134,22 @@ def other_cash_income_instances( LivelhoodStrategy and LivelihoodActivity instances extracted from the BSS. 
""" if other_cash_income_dataframe.empty: - output = {} + return Output({}, metadata={"message": "No Data2 worksheet found in this BSS"}) - output = get_annotated_instances_from_dataframe( + return get_annotated_instances_from_dataframe( context, + config, other_cash_income_dataframe, livelihood_summary_dataframe, ActivityLabel.LivelihoodActivityType.OTHER_CASH_INCOME, len(HEADER_ROWS), ) - return output - @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def other_cash_income_valid_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, other_cash_income_instances: dict, wealth_characteristic_instances: dict, ) -> Output[dict]: @@ -165,16 +165,7 @@ def other_cash_income_valid_instances( **{"WealthGroup": wealth_characteristic_instances["WealthGroup"]}, **other_cash_income_instances, } - valid_instances, metadata = validate_instances(context, other_cash_income_instances, partition_key) - metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} - metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) - metadata["preview"] = MetadataValue.md( - f"```json\n{json.dumps(valid_instances, indent=4, ensure_ascii=False)}\n```" - ) - return Output( - valid_instances, - metadata=metadata, - ) + return validate_instances(context, config, other_cash_income_instances, partition_key) @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") @@ -186,11 +177,7 @@ def other_cash_income_fixture( """ Django fixture for the Livelihood Activities from a BSS. """ - fixture, metadata = get_fixture_from_instances(other_cash_income_valid_instances) - return Output( - fixture, - metadata=metadata, - ) + return get_fixture_from_instances(other_cash_income_valid_instances) @asset(partitions_def=bss_instances_partitions_def) @@ -201,8 +188,4 @@ def imported_other_cash_income_activities( """ Imported Django fixtures for a BSS, added to the Django database. """ - metadata = import_fixture(other_cash_income_fixture) - return Output( - None, - metadata=metadata, - ) + return import_fixture(other_cash_income_fixture) diff --git a/pipelines/assets/wealth_characteristic.py b/pipelines/assets/wealth_characteristic.py index eec7314..0b6b6b7 100644 --- a/pipelines/assets/wealth_characteristic.py +++ b/pipelines/assets/wealth_characteristic.py @@ -524,13 +524,14 @@ def wealth_characteristic_instances( @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def wealth_characteristic_valid_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, wealth_characteristic_instances, ) -> Output[dict]: """ Valid WealthGroup and WealthGroupCharacteristicValue instances from a BSS, ready to be loaded via a Django fixture. 
""" partition_key = context.asset_partition_key_for_output() - valid_instances, metadata = validate_instances(context, wealth_characteristic_instances, partition_key) + valid_instances, metadata = validate_instances(context, config, wealth_characteristic_instances, partition_key) metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) metadata["preview"] = MetadataValue.md( diff --git a/pipelines/assets/wild_foods.py b/pipelines/assets/wild_foods.py index e663c2e..d19cba5 100644 --- a/pipelines/assets/wild_foods.py +++ b/pipelines/assets/wild_foods.py @@ -56,12 +56,11 @@ | 85 | TOTAL FISHING KCALS (%) | 0.009088932377 | 0.005577299413 | 0 | 0.009639776763 | 0.01133165595 | 0 | 0 | 0.009708632311 | 0 | """ # NOQA: E501 -import json import os import django import pandas as pd -from dagster import AssetExecutionContext, MetadataValue, Output, asset +from dagster import AssetExecutionContext, Output, asset from ..configs import BSSMetadataConfig from ..partitions import bss_instances_partitions_def @@ -137,6 +136,7 @@ def summary_wild_foods_labels_dataframe( @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def wild_foods_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, wild_foods_dataframe: pd.DataFrame, livelihood_summary_dataframe: pd.DataFrame, ) -> Output[dict]: @@ -144,22 +144,22 @@ def wild_foods_instances( LivelhoodStrategy and LivelihoodActivity instances extracted from the BSS. """ if wild_foods_dataframe.empty: - output = {} + return Output({}, metadata={"message": "No Data3 worksheet found in this BSS"}) - output = get_annotated_instances_from_dataframe( + return get_annotated_instances_from_dataframe( context, + config, wild_foods_dataframe, livelihood_summary_dataframe, ActivityLabel.LivelihoodActivityType.WILD_FOODS, len(HEADER_ROWS), ) - return output - @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def wild_foods_valid_instances( context: AssetExecutionContext, + config: BSSMetadataConfig, wild_foods_instances: dict, wealth_characteristic_instances: dict, ) -> Output[dict]: @@ -175,16 +175,7 @@ def wild_foods_valid_instances( **{"WealthGroup": wealth_characteristic_instances["WealthGroup"]}, **wild_foods_instances, } - valid_instances, metadata = validate_instances(context, wild_foods_instances, partition_key) - metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} - metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) - metadata["preview"] = MetadataValue.md( - f"```json\n{json.dumps(valid_instances, indent=4, ensure_ascii=False)}\n```" - ) - return Output( - valid_instances, - metadata=metadata, - ) + return validate_instances(context, config, wild_foods_instances, partition_key) @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") @@ -196,11 +187,7 @@ def wild_foods_fixture( """ Django fixture for the Livelihood Activities from a BSS. """ - fixture, metadata = get_fixture_from_instances(wild_foods_valid_instances) - return Output( - fixture, - metadata=metadata, - ) + return get_fixture_from_instances(wild_foods_valid_instances) @asset(partitions_def=bss_instances_partitions_def) @@ -211,8 +198,4 @@ def imported_wild_foods_activities( """ Imported Django fixtures for a BSS, added to the Django database. 
""" - metadata = import_fixture(wild_foods_fixture) - return Output( - None, - metadata=metadata, - ) + return import_fixture(wild_foods_fixture) diff --git a/pipelines/configs.py b/pipelines/configs.py index 7d505fc..8d4aa27 100644 --- a/pipelines/configs.py +++ b/pipelines/configs.py @@ -9,6 +9,12 @@ class BSSMetadataConfig(Config): bss_metadata_workbook: str = EnvVar("BSS_METADATA_WORKBOOK") # The fsspec storage options for the BSS metadata spreadsheet bss_metadata_storage_options: dict = json.loads(EnvVar("BSS_METADATA_STORAGE_OPTIONS").get_value("{}")) + # The fspec path of the spreadsheet containing the BSS Labels and their recognition mechanism + bss_label_recognition_workbook: str = EnvVar("BSS_LABEL_RECOGNITION_WORKBOOK") + # The fsspec storage options for the BSS label recognition spreadsheet + bss_label_recognition_storage_options: dict = json.loads( + EnvVar("BSS_LABEL_RECOGNITION_STORAGE_OPTIONS").get_value("{}") + ) # The fspec path of the root folder containing the BSSs # For example: # "/home/user/Temp/Baseline Storage Sheets (BSS)" diff --git a/pipelines_tests/test_assets/test_livelihood_activity_regexes.json b/pipelines_tests/test_assets/test_livelihood_activity_regexes.json index fdfa490..26c992e 100644 --- a/pipelines_tests/test_assets/test_livelihood_activity_regexes.json +++ b/pipelines_tests/test_assets/test_livelihood_activity_regexes.json @@ -829,5 +829,35 @@ "attribute": "payment_per_time", "product_id": "grain", "unit_of_measure_id": "kg" + }, + "wild food: avocado (kg gathered)": { + "is_start": true, + "product_id": "avocado", + "unit_of_measure_id": "kg", + "attribute": "quantity_produced" + }, + "mangoes (kg gathered)": { + "is_start": true, + "product_id": "mangoes", + "unit_of_measure_id": "kg", + "attribute": "quantity_produced" + }, + "okra - kg gathered": { + "is_start": true, + "product_id": "okra", + "unit_of_measure_id": "kg", + "attribute": "quantity_produced" + }, + "Fish (dry) : Tilapia (dry/smoked) (kg gathered)": { + "is_start": true, + "product_id": "tilapia (dry/smoked)", + "unit_of_measure_id": "kg", + "attribute": "quantity_produced" + }, + "Fish type 2 (dried) - kg gathered": { + "is_start": true, + "product_id": "fish type 2 (dried)", + "unit_of_measure_id": "kg", + "attribute": "quantity_produced" } } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index c1cc4db..19ff5e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,13 @@ +[project] +name = "hea-database-development" +version = "0.1.0" +description = "The HEA Database manages HEA Baseline data." +readme = "README.md" +requires-python = ">=3.12" + [tool.ruff] line-length = 119 -target-version = 'py310' +target-version = 'py312' exclude = [ '.eggs', # exclude a few common directories in the '.git', # root of the project @@ -27,7 +34,7 @@ docstring-quotes = "double" [tool.black] line-length = 119 -target-version = ['py310'] +target-version = ['py312'] include = '\.pyi?$' exclude = ''' diff --git a/requirements/base.txt b/requirements/base.txt index f360375..785d848 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -19,7 +19,7 @@ djangorestframework-gis==1.1 djangorestframework-xml==2.0.0 docutils factory-boy==3.2.1 -git+https://github.com/American-Institutes-for-Research/gdrivefs.git@e870c19e1d730635e3760e7ae21eebf9ddda765e +git+https://github.com/American-Institutes-for-Research/gdrivefs.git@f4ec53446e6a27be2e368b24dadfa9081e1272f2 googletrans-py==4.0.0 # Required for rendering Dagster graphs in Jupyter notebooks graphviz==0.21