From b5f1b5bbf5e8fc27204a91be884d23df63807980 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Jun 2025 15:36:24 -0400 Subject: [PATCH 1/3] fix: Improve logging --- changelog_entry.yaml | 4 + .../jobs/calculate_economy_simulation_job.py | 186 ++++++++++++------ policyengine_api/utils/v2_v1_comparison.py | 1 + tests/fixtures/utils/v2_v1_comparison.py | 3 + .../test_calculate_economy_simulation_job.py | 9 + tests/unit/utils/test_v2_v1_comparison.py | 2 + 6 files changed, 149 insertions(+), 56 deletions(-) diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29bb..c62e011e9 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: patch + changes: + changed: + - Improved logging within API \ No newline at end of file diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index a3125fec0..99dd70a85 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -18,7 +18,6 @@ ReformImpactsService, ) from policyengine_api.endpoints.economy.compare import compare_economic_outputs -from policyengine_api.endpoints.economy.reform_impact import set_comment_on_job from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS from policyengine_api.country import COUNTRIES, create_policy_reform from policyengine_api.utils.v2_v1_comparison import ( @@ -34,8 +33,6 @@ from policyengine_us import Microsimulation from policyengine_uk import Microsimulation -import logging -import huggingface_hub load_dotenv() @@ -81,8 +78,9 @@ ) if not check_against_api_v2: - logging.warn( - "Didn't find any GOOGLE_APPLICATION_CREDENTIALS, so will not check APIv1 results against APIv2." + logger.log_text( + "Didn't find any GOOGLE_APPLICATION_CREDENTIALS, so will not check APIv1 results against APIv2.", + severity="WARNING", ) @@ -104,7 +102,28 @@ def run( baseline_policy: dict, reform_policy: Annotated[str, "String-formatted JSON"], ): - print(f"Starting CalculateEconomySimulationJob.run") + job_id = self._set_job_id() + job_setup_options = { + "job_id": job_id, + "job_type": "CALCULATE_ECONOMY_SIMULATION_JOB", + "baseline_policy_id": baseline_policy_id, + "policy_id": policy_id, + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "baseline_policy": baseline_policy, + "reform_policy": reform_policy, + } + logger.log_struct( + { + "message": "Starting job with job_id {job_id}", + **job_setup_options, + }, + severity="INFO", + ) + try: # Configure inputs # Note for anyone modifying options_hash: redis-queue treats ":" as a namespace @@ -115,6 +134,13 @@ def run( baseline_policy_id = int(baseline_policy_id) policy_id = int(policy_id) + logger.log_struct( + { + "message": "Checking if completed result already exists", + **job_setup_options, + } + ) + # Check if a completed result already exists existing = reform_impacts_service.get_all_reform_impacts( country_id, @@ -127,7 +153,12 @@ def run( COUNTRY_PACKAGE_VERSIONS[country_id], ) if any(x["status"] == "ok" for x in existing): - print(f"Job already completed successfully") + logger.log_struct( + { + "message": "Found existing completed result", + **job_setup_options, + } + ) return # Save identifiers for later commenting on processing status @@ -141,7 +172,12 @@ def run( options_hash, ) - print("Checking existing reform impacts...") + logger.log_struct( + { + "message": "No existing completed result found, proceeding with computation", + **job_setup_options, + } + ) # Query existing impacts before deleting existing = reform_impacts_service.get_all_reform_impacts( country_id, @@ -153,7 +189,6 @@ def run( options_hash, COUNTRY_PACKAGE_VERSIONS[country_id], ) - print(f"Found {len(existing)} existing impacts before delete") # Delete any existing reform impact rows with the same identifiers reform_impacts_service.delete_reform_impact( @@ -166,8 +201,12 @@ def run( options_hash, ) - print("Deleted existing computing impacts") - + logger.log_struct( + { + "message": "Creating new reform impact computation process", + **job_setup_options, + } + ) # Insert new reform impact row with status 'computing' reform_impacts_service.set_reform_impact( country_id=country_id, @@ -187,49 +226,53 @@ def run( ), ) - comment = lambda x: set_comment_on_job(x, *identifiers) - comment("Computing baseline") + logger.log_struct( + { + "message": "Starting computation of baseline policy", + **job_setup_options, + } + ) # If comparing against API v2, start job if check_against_api_v2: - # Populate v2/v1 comparison config data; we will pass this - # to GCP logs either on error or success - comparison_data = { - "country_id": country_id, - "region": region, - "reform_policy": reform_policy, - "baseline_policy": baseline_policy, - "reform_policy_id": policy_id, - "baseline_policy_id": baseline_policy_id, - "time_period": time_period, - "dataset": dataset, - "v1_country_package_version": COUNTRY_PACKAGE_VERSIONS[ - country_id - ], - "v2_id": None, # Unavailable until job starts - "v2_country_package_version": None, # Unavailable until job completes - } + try: + # Populate v2/v1 comparison config data; we will pass this + # to GCP logs either on error or success + comparison_data = { + **job_setup_options, + "v1_country_package_version": COUNTRY_PACKAGE_VERSIONS[ + country_id + ], + "v2_id": None, # Unavailable until job starts + "v2_country_package_version": None, # Unavailable until job completes + } - # Set up APIv2 job - comment("Setting up APIv2 job") - sim_config: dict[str, Any] = self.api_v2._setup_sim_options( - country_id=country_id, - scope="macro", - reform_policy=reform_policy, - baseline_policy=baseline_policy, - time_period=time_period, - region=region, - dataset=dataset, - model_version=COUNTRY_PACKAGE_VERSIONS[country_id], - data_version=( - uk_dataset_version - if country_id == "uk" - else us_dataset_version - ), - ) + # Set up APIv2 job + logger.log_struct( + { + "message": "Setting up APIv2 job", + **comparison_data, + } + ) + sim_config: dict[str, Any] = ( + self.api_v2._setup_sim_options( + country_id=country_id, + scope="macro", + reform_policy=reform_policy, + baseline_policy=baseline_policy, + time_period=time_period, + region=region, + dataset=dataset, + model_version=COUNTRY_PACKAGE_VERSIONS[country_id], + data_version=( + uk_dataset_version + if country_id == "uk" + else us_dataset_version + ), + ) + ) - try: api_v2_execution = self.api_v2.run(sim_config) execution_id = self.api_v2.get_execution_id( api_v2_execution @@ -245,7 +288,7 @@ def run( "v1_impact": None, "v2_impact": None, "v1_v2_diff": None, - "message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job started", + "message": "APIv2 job started", } ) ) @@ -266,6 +309,12 @@ def run( ) # Compute baseline economy + logger.log_struct( + { + "message": "Computing baseline economy...", + **job_setup_options, + } + ) baseline_economy = self._compute_economy( country_id=country_id, region=region, @@ -274,9 +323,14 @@ def run( options=options, policy_json=baseline_policy, ) - comment("Computing reform") # Compute reform economy + logger.log_struct( + { + "message": "Computing reform economy...", + **job_setup_options, + } + ) reform_economy = self._compute_economy( country_id=country_id, region=region, @@ -288,7 +342,12 @@ def run( baseline_economy = baseline_economy["result"] reform_economy = reform_economy["result"] - comment("Comparing baseline and reform") + logger.log_struct( + { + "message": "Computed baseline and reform economies", + **job_setup_options, + } + ) impact: dict[str, Any] = compare_economic_outputs( baseline_economy, reform_economy, country_id=country_id ) @@ -314,7 +373,7 @@ def run( "v2_impact": api_v2_output, "v1_v2_diff": None, "v2_country_package_version": v2_country_package_version, - "message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job completed", + "message": "APIv2 job completed", } ) ) @@ -337,7 +396,7 @@ def run( "v2_impact": api_v2_output, "v1_v2_diff": v1_v2_diff, "v2_country_package_version": v2_country_package_version, - "message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job comparison with APIv1 completed", + "message": "APIv2 job comparison with APIv1 completed", } ) ) @@ -356,7 +415,7 @@ def run( "v1_impact": impact, "v2_impact": None, "v1_v2_diff": None, - "message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job failed", + "message": "APIv2 job failed", } ) logger.log_struct( @@ -408,15 +467,30 @@ def run( "v1_impact": None, "v2_impact": None, "v1_v2_diff": None, - "message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv1 job failed", + "message": "APIv1 job failed", } ) logger.log_struct( error_log.model_dump(mode="json"), severity="ERROR" ) - print(f"Error setting reform impact: {str(e)}") + logger.log_struct( + { + "message": "Error during job execution", + "error": str(e), + **job_setup_options, + } + ) raise e + def _set_job_id(self) -> str: + """ + Generate a unique job ID based on the current timestamp and a random number. + This is used to track the job in the database and logs. + """ + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + random_number = np.random.randint(1000, 9999) + return f"job_{timestamp}_{random_number}" + def _compute_economy( self, country_id, region, dataset, time_period, options, policy_json ): diff --git a/policyengine_api/utils/v2_v1_comparison.py b/policyengine_api/utils/v2_v1_comparison.py index b74d39c85..20b8e1fd0 100644 --- a/policyengine_api/utils/v2_v1_comparison.py +++ b/policyengine_api/utils/v2_v1_comparison.py @@ -16,6 +16,7 @@ class V2V1Comparison(BaseModel): baseline_policy_id: int time_period: str dataset: str + job_id: str v1_country_package_version: str # v2_country_package_version comes from v2 API results, so unavailable during runtime errors v2_country_package_version: str | None = None diff --git a/tests/fixtures/utils/v2_v1_comparison.py b/tests/fixtures/utils/v2_v1_comparison.py index 737e7d5f3..6e8f1ce38 100644 --- a/tests/fixtures/utils/v2_v1_comparison.py +++ b/tests/fixtures/utils/v2_v1_comparison.py @@ -1,3 +1,5 @@ +VALID_JOB_ID = "valid_job_id" + valid_v2_v1_comparison = { "country_id": "us", "region": "ca", @@ -15,6 +17,7 @@ "v2_impact": {"impact_value": 120}, "v1_v2_diff": {"impact_value": 20}, "message": None, + "job_id": VALID_JOB_ID, } invalid_v2_v1_comparison = { diff --git a/tests/unit/jobs/test_calculate_economy_simulation_job.py b/tests/unit/jobs/test_calculate_economy_simulation_job.py index c534d709d..ac6ad9acf 100644 --- a/tests/unit/jobs/test_calculate_economy_simulation_job.py +++ b/tests/unit/jobs/test_calculate_economy_simulation_job.py @@ -365,3 +365,12 @@ def test__given_uk_dataset__returns_none(self): result = sim_api._setup_data(dataset, country_id, region) # Assert the expected value assert result is None + + +class TestSetJobId: + def test__sets_job_id(self): + + job = object.__new__(CalculateEconomySimulationJob) + job_id = job._set_job_id() + + assert isinstance(job_id, str) diff --git a/tests/unit/utils/test_v2_v1_comparison.py b/tests/unit/utils/test_v2_v1_comparison.py index 5c0e9a765..d6c1cbc74 100644 --- a/tests/unit/utils/test_v2_v1_comparison.py +++ b/tests/unit/utils/test_v2_v1_comparison.py @@ -6,6 +6,7 @@ from tests.fixtures.utils.v2_v1_comparison import ( valid_v2_v1_comparison, invalid_v2_v1_comparison, + VALID_JOB_ID, ) @@ -68,6 +69,7 @@ def test__given_valid_inputs__returns_schema(self): comparison_instance.v2_country_package_version == valid_v2_v1_comparison["v2_country_package_version"] ) + assert comparison_instance.job_id == VALID_JOB_ID def test__given_invalid_inputs__raises_validation_error(self): From cdb9f53ba613926e62452108b0bbc6359c12e662 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Jun 2025 16:07:38 -0400 Subject: [PATCH 2/3] fix: Include job ID in error --- .../jobs/calculate_economy_simulation_job.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index 99dd70a85..5aaaf71a8 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -449,14 +449,7 @@ def run( # Show that API v1 failed and API v2 was not run error_log: V2V1Comparison = V2V1Comparison.model_validate( { - "country_id": country_id, - "region": region, - "reform_policy": reform_policy, - "baseline_policy": baseline_policy, - "reform_policy_id": policy_id, - "baseline_policy_id": baseline_policy_id, - "time_period": time_period, - "dataset": dataset, + **job_setup_options, "v1_country_package_version": COUNTRY_PACKAGE_VERSIONS[ country_id ], From aaf326faa3cd57a8a5712cd381a7a8b12fdd79bd Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Jun 2025 16:27:21 -0400 Subject: [PATCH 3/3] fix: Fix key --- policyengine_api/jobs/calculate_economy_simulation_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index 5aaaf71a8..ba949bd66 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -107,7 +107,7 @@ def run( "job_id": job_id, "job_type": "CALCULATE_ECONOMY_SIMULATION_JOB", "baseline_policy_id": baseline_policy_id, - "policy_id": policy_id, + "reform_policy_id": policy_id, "country_id": country_id, "region": region, "dataset": dataset,