From b4a6ba8048f87ba324231238ac1b49c2da2a7baa Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Tue, 14 Oct 2025 23:15:13 -0400 Subject: [PATCH 1/4] Better error message from load_all_metadata - see HEA-572 --- pipelines/jobs/metadata.py | 47 ++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/pipelines/jobs/metadata.py b/pipelines/jobs/metadata.py index 089428f..65048fc 100644 --- a/pipelines/jobs/metadata.py +++ b/pipelines/jobs/metadata.py @@ -8,6 +8,7 @@ from io import BytesIO import django +import numpy as np import pandas as pd import requests from dagster import OpExecutionContext, job, op @@ -127,20 +128,46 @@ def load_metadata_for_model(context: OpExecutionContext, sheet_name: str, model: id_fields = "wealth_characteristic_label" else: id_fields = "code" + # Add primary keys if they are not already in the id_fields, + # so that we can save individual instances if required + if isinstance(id_fields, str): + id_fields = [id_fields] + if model._meta.pk.name not in id_fields: + keys_df = pd.DataFrame.from_records( + model.objects.all().values(model._meta.pk.name, *id_fields) + ) # NOQA: E501 + df = df.merge( + keys_df, + how="left", + on=id_fields, + ) + df[model._meta.pk.name] = df[model._meta.pk.name].replace(np.nan, None) + # Turn the dataframe into a set of unsaved model instances instances = [] for record in df.to_dict(orient="records"): - if isinstance(id_fields, str): - id_fields = [id_fields] - record = {k: v for k, v in record.items() if k in valid_field_names} instances.append(model(**record)) - instances = model.objects.bulk_create( - instances, - update_conflicts=True, - update_fields=[k for k in record if k not in id_fields], - unique_fields=id_fields, - ) - context.log.info(f"Created or updated {len(instances)} {sheet_name} instances") + try: + instances = model.objects.bulk_create( + instances, + update_conflicts=True, + update_fields=[k for k in record if k not in id_fields and k != model._meta.pk.name], + unique_fields=id_fields, + ) + context.log.info(f"Created or updated {len(instances)} {sheet_name} instances") + except Exception as e: + # Bulk create failed, so try creating/updating the instances one at a time to see which one failed + for i, instance in enumerate(instances): + try: + instance.save() + except Exception as e2: + key = [getattr(instance, id_field) for id_field in id_fields] + instance = { + k: v for k, v in instance.__dict__.items() if k not in ["_state", "created", "modified"] + } + raise RuntimeError( + f"Failed to create/update {model_name} instance {i} {key} from:\n{json.dumps(instance, indent=4, ensure_ascii=False)}" + ) from e2 @op From 04e1676edb07955126c1337967b366cd398a98c0 Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Tue, 14 Oct 2025 23:23:54 -0400 Subject: [PATCH 2/4] Remove unused exception variable from load_metadata_for_model - see HEA-572 --- pipelines/jobs/metadata.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/jobs/metadata.py b/pipelines/jobs/metadata.py index 65048fc..3a81a3a 100644 --- a/pipelines/jobs/metadata.py +++ b/pipelines/jobs/metadata.py @@ -155,19 +155,19 @@ def load_metadata_for_model(context: OpExecutionContext, sheet_name: str, model: unique_fields=id_fields, ) context.log.info(f"Created or updated {len(instances)} {sheet_name} instances") - except Exception as e: + except Exception: # Bulk create failed, so try creating/updating the instances one at a time to see which one failed for i, instance in enumerate(instances): try: instance.save() - except Exception as e2: + except Exception as e: key = [getattr(instance, id_field) for id_field in id_fields] instance = { k: v for k, v in instance.__dict__.items() if k not in ["_state", "created", "modified"] } raise RuntimeError( f"Failed to create/update {model_name} instance {i} {key} from:\n{json.dumps(instance, indent=4, ensure_ascii=False)}" - ) from e2 + ) from e @op From 73b51dd0b45456463519c2db3ca2f2564c78b52a Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Wed, 15 Oct 2025 10:33:48 -0400 Subject: [PATCH 3/4] Minor fix to load_all_metadata - see HEA-572 Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pipelines/jobs/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/jobs/metadata.py b/pipelines/jobs/metadata.py index 3a81a3a..c8d0722 100644 --- a/pipelines/jobs/metadata.py +++ b/pipelines/jobs/metadata.py @@ -151,7 +151,7 @@ def load_metadata_for_model(context: OpExecutionContext, sheet_name: str, model: instances = model.objects.bulk_create( instances, update_conflicts=True, - update_fields=[k for k in record if k not in id_fields and k != model._meta.pk.name], + update_fields=[k for k in valid_field_names if k not in id_fields and k != model._meta.pk.name], unique_fields=id_fields, ) context.log.info(f"Created or updated {len(instances)} {sheet_name} instances") From c8bec4e4ae6cfe56c22b1e60ce8fd04f5cb04253 Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Wed, 15 Oct 2025 11:40:59 -0400 Subject: [PATCH 4/4] Clearer specification of metadata upload fields - see HEA-572 --- pipelines/jobs/metadata.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/jobs/metadata.py b/pipelines/jobs/metadata.py index c8d0722..24fb3a0 100644 --- a/pipelines/jobs/metadata.py +++ b/pipelines/jobs/metadata.py @@ -144,14 +144,15 @@ def load_metadata_for_model(context: OpExecutionContext, sheet_name: str, model: df[model._meta.pk.name] = df[model._meta.pk.name].replace(np.nan, None) # Turn the dataframe into a set of unsaved model instances instances = [] + fields = [k for k in df.columns if k in valid_field_names] for record in df.to_dict(orient="records"): - record = {k: v for k, v in record.items() if k in valid_field_names} + record = {k: v for k, v in record.items() if k in fields} instances.append(model(**record)) try: instances = model.objects.bulk_create( instances, update_conflicts=True, - update_fields=[k for k in valid_field_names if k not in id_fields and k != model._meta.pk.name], + update_fields=[k for k in fields if k not in id_fields and k != model._meta.pk.name], unique_fields=id_fields, ) context.log.info(f"Created or updated {len(instances)} {sheet_name} instances")