Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: patch
changes:
changed:
- Improved logging within API
195 changes: 131 additions & 64 deletions policyengine_api/jobs/calculate_economy_simulation_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ReformImpactsService,
)
from policyengine_api.endpoints.economy.compare import compare_economic_outputs
from policyengine_api.endpoints.economy.reform_impact import set_comment_on_job
from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS
from policyengine_api.country import COUNTRIES, create_policy_reform
from policyengine_api.utils.v2_v1_comparison import (
Expand All @@ -34,8 +33,6 @@

from policyengine_us import Microsimulation
from policyengine_uk import Microsimulation
import logging
import huggingface_hub

load_dotenv()

Expand Down Expand Up @@ -81,8 +78,9 @@
)

if not check_against_api_v2:
logging.warn(
"Didn't find any GOOGLE_APPLICATION_CREDENTIALS, so will not check APIv1 results against APIv2."
logger.log_text(

Check warning on line 81 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L81

Added line #L81 was not covered by tests
"Didn't find any GOOGLE_APPLICATION_CREDENTIALS, so will not check APIv1 results against APIv2.",
severity="WARNING",
)


Expand All @@ -104,7 +102,28 @@
baseline_policy: dict,
reform_policy: Annotated[str, "String-formatted JSON"],
):
print(f"Starting CalculateEconomySimulationJob.run")
job_id = self._set_job_id()
job_setup_options = {

Check warning on line 106 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L105-L106

Added lines #L105 - L106 were not covered by tests
"job_id": job_id,
"job_type": "CALCULATE_ECONOMY_SIMULATION_JOB",
"baseline_policy_id": baseline_policy_id,
"reform_policy_id": policy_id,
"country_id": country_id,
"region": region,
"dataset": dataset,
"time_period": time_period,
"options": options,
"baseline_policy": baseline_policy,
"reform_policy": reform_policy,
}
logger.log_struct(

Check warning on line 119 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L119

Added line #L119 was not covered by tests
{
"message": "Starting job with job_id {job_id}",
**job_setup_options,
},
severity="INFO",
)

try:
# Configure inputs
# Note for anyone modifying options_hash: redis-queue treats ":" as a namespace
Expand All @@ -115,6 +134,13 @@
baseline_policy_id = int(baseline_policy_id)
policy_id = int(policy_id)

logger.log_struct(

Check warning on line 137 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L137

Added line #L137 was not covered by tests
{
"message": "Checking if completed result already exists",
**job_setup_options,
}
)

# Check if a completed result already exists
existing = reform_impacts_service.get_all_reform_impacts(
country_id,
Expand All @@ -127,7 +153,12 @@
COUNTRY_PACKAGE_VERSIONS[country_id],
)
if any(x["status"] == "ok" for x in existing):
print(f"Job already completed successfully")
logger.log_struct(

Check warning on line 156 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L156

Added line #L156 was not covered by tests
{
"message": "Found existing completed result",
**job_setup_options,
}
)
return

# Save identifiers for later commenting on processing status
Expand All @@ -141,7 +172,12 @@
options_hash,
)

print("Checking existing reform impacts...")
logger.log_struct(

Check warning on line 175 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L175

Added line #L175 was not covered by tests
{
"message": "No existing completed result found, proceeding with computation",
**job_setup_options,
}
)
# Query existing impacts before deleting
existing = reform_impacts_service.get_all_reform_impacts(
country_id,
Expand All @@ -153,7 +189,6 @@
options_hash,
COUNTRY_PACKAGE_VERSIONS[country_id],
)
print(f"Found {len(existing)} existing impacts before delete")

# Delete any existing reform impact rows with the same identifiers
reform_impacts_service.delete_reform_impact(
Expand All @@ -166,8 +201,12 @@
options_hash,
)

print("Deleted existing computing impacts")

logger.log_struct(

Check warning on line 204 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L204

Added line #L204 was not covered by tests
{
"message": "Creating new reform impact computation process",
**job_setup_options,
}
)
# Insert new reform impact row with status 'computing'
reform_impacts_service.set_reform_impact(
country_id=country_id,
Expand All @@ -187,49 +226,53 @@
),
)

comment = lambda x: set_comment_on_job(x, *identifiers)
comment("Computing baseline")
logger.log_struct(

Check warning on line 229 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L229

Added line #L229 was not covered by tests
{
"message": "Starting computation of baseline policy",
**job_setup_options,
}
)

# If comparing against API v2, start job
if check_against_api_v2:

# Populate v2/v1 comparison config data; we will pass this
# to GCP logs either on error or success
comparison_data = {
"country_id": country_id,
"region": region,
"reform_policy": reform_policy,
"baseline_policy": baseline_policy,
"reform_policy_id": policy_id,
"baseline_policy_id": baseline_policy_id,
"time_period": time_period,
"dataset": dataset,
"v1_country_package_version": COUNTRY_PACKAGE_VERSIONS[
country_id
],
"v2_id": None, # Unavailable until job starts
"v2_country_package_version": None, # Unavailable until job completes
}
try:

Check warning on line 239 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L239

Added line #L239 was not covered by tests
# Populate v2/v1 comparison config data; we will pass this
# to GCP logs either on error or success
comparison_data = {

Check warning on line 242 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L242

Added line #L242 was not covered by tests
**job_setup_options,
"v1_country_package_version": COUNTRY_PACKAGE_VERSIONS[
country_id
],
"v2_id": None, # Unavailable until job starts
"v2_country_package_version": None, # Unavailable until job completes
}

# Set up APIv2 job
comment("Setting up APIv2 job")
sim_config: dict[str, Any] = self.api_v2._setup_sim_options(
country_id=country_id,
scope="macro",
reform_policy=reform_policy,
baseline_policy=baseline_policy,
time_period=time_period,
region=region,
dataset=dataset,
model_version=COUNTRY_PACKAGE_VERSIONS[country_id],
data_version=(
uk_dataset_version
if country_id == "uk"
else us_dataset_version
),
)
# Set up APIv2 job
logger.log_struct(

Check warning on line 252 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L252

Added line #L252 was not covered by tests
{
"message": "Setting up APIv2 job",
**comparison_data,
}
)
sim_config: dict[str, Any] = (

Check warning on line 258 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L258

Added line #L258 was not covered by tests
self.api_v2._setup_sim_options(
country_id=country_id,
scope="macro",
reform_policy=reform_policy,
baseline_policy=baseline_policy,
time_period=time_period,
region=region,
dataset=dataset,
model_version=COUNTRY_PACKAGE_VERSIONS[country_id],
data_version=(
uk_dataset_version
if country_id == "uk"
else us_dataset_version
),
)
)

try:
api_v2_execution = self.api_v2.run(sim_config)
execution_id = self.api_v2.get_execution_id(
api_v2_execution
Expand All @@ -245,7 +288,7 @@
"v1_impact": None,
"v2_impact": None,
"v1_v2_diff": None,
"message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job started",
"message": "APIv2 job started",
}
)
)
Expand All @@ -266,6 +309,12 @@
)

# Compute baseline economy
logger.log_struct(

Check warning on line 312 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L312

Added line #L312 was not covered by tests
{
"message": "Computing baseline economy...",
**job_setup_options,
}
)
baseline_economy = self._compute_economy(
country_id=country_id,
region=region,
Expand All @@ -274,9 +323,14 @@
options=options,
policy_json=baseline_policy,
)
comment("Computing reform")

# Compute reform economy
logger.log_struct(

Check warning on line 328 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L328

Added line #L328 was not covered by tests
{
"message": "Computing reform economy...",
**job_setup_options,
}
)
reform_economy = self._compute_economy(
country_id=country_id,
region=region,
Expand All @@ -288,7 +342,12 @@

baseline_economy = baseline_economy["result"]
reform_economy = reform_economy["result"]
comment("Comparing baseline and reform")
logger.log_struct(

Check warning on line 345 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L345

Added line #L345 was not covered by tests
{
"message": "Computed baseline and reform economies",
**job_setup_options,
}
)
impact: dict[str, Any] = compare_economic_outputs(
baseline_economy, reform_economy, country_id=country_id
)
Expand All @@ -314,7 +373,7 @@
"v2_impact": api_v2_output,
"v1_v2_diff": None,
"v2_country_package_version": v2_country_package_version,
"message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job completed",
"message": "APIv2 job completed",
}
)
)
Expand All @@ -337,7 +396,7 @@
"v2_impact": api_v2_output,
"v1_v2_diff": v1_v2_diff,
"v2_country_package_version": v2_country_package_version,
"message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job comparison with APIv1 completed",
"message": "APIv2 job comparison with APIv1 completed",
}
)
)
Expand All @@ -356,7 +415,7 @@
"v1_impact": impact,
"v2_impact": None,
"v1_v2_diff": None,
"message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job failed",
"message": "APIv2 job failed",
}
)
logger.log_struct(
Expand Down Expand Up @@ -390,14 +449,7 @@
# Show that API v1 failed and API v2 was not run
error_log: V2V1Comparison = V2V1Comparison.model_validate(
{
"country_id": country_id,
"region": region,
"reform_policy": reform_policy,
"baseline_policy": baseline_policy,
"reform_policy_id": policy_id,
"baseline_policy_id": baseline_policy_id,
"time_period": time_period,
"dataset": dataset,
**job_setup_options,
"v1_country_package_version": COUNTRY_PACKAGE_VERSIONS[
country_id
],
Expand All @@ -408,15 +460,30 @@
"v1_impact": None,
"v2_impact": None,
"v1_v2_diff": None,
"message": "CALCULATE_ECONOMY_SIMULATION_JOB: APIv1 job failed",
"message": "APIv1 job failed",
}
)
logger.log_struct(
error_log.model_dump(mode="json"), severity="ERROR"
)
print(f"Error setting reform impact: {str(e)}")
logger.log_struct(

Check warning on line 469 in policyengine_api/jobs/calculate_economy_simulation_job.py

View check run for this annotation

Codecov / codecov/patch

policyengine_api/jobs/calculate_economy_simulation_job.py#L469

Added line #L469 was not covered by tests
{
"message": "Error during job execution",
"error": str(e),
**job_setup_options,
}
)
raise e

def _set_job_id(self) -> str:
"""
Generate a unique job ID based on the current timestamp and a random number.
This is used to track the job in the database and logs.
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
random_number = np.random.randint(1000, 9999)
return f"job_{timestamp}_{random_number}"

def _compute_economy(
self, country_id, region, dataset, time_period, options, policy_json
):
Expand Down
1 change: 1 addition & 0 deletions policyengine_api/utils/v2_v1_comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class V2V1Comparison(BaseModel):
baseline_policy_id: int
time_period: str
dataset: str
job_id: str
v1_country_package_version: str
# v2_country_package_version comes from v2 API results, so unavailable during runtime errors
v2_country_package_version: str | None = None
Expand Down
3 changes: 3 additions & 0 deletions tests/fixtures/utils/v2_v1_comparison.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
VALID_JOB_ID = "valid_job_id"

valid_v2_v1_comparison = {
"country_id": "us",
"region": "ca",
Expand All @@ -15,6 +17,7 @@
"v2_impact": {"impact_value": 120},
"v1_v2_diff": {"impact_value": 20},
"message": None,
"job_id": VALID_JOB_ID,
}

invalid_v2_v1_comparison = {
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/jobs/test_calculate_economy_simulation_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,12 @@ def test__given_uk_dataset__returns_none(self):
result = sim_api._setup_data(dataset, country_id, region)
# Assert the expected value
assert result is None


class TestSetJobId:
    """Unit tests for ``CalculateEconomySimulationJob._set_job_id``."""

    def test__sets_job_id(self):
        # Build the instance without running __init__; _set_job_id reads
        # no instance state, so a bare object is sufficient here.
        job = object.__new__(CalculateEconomySimulationJob)
        job_id = job._set_job_id()

        assert isinstance(job_id, str)

        # A bare type check would pass for any string, so also pin the
        # documented shape: "job_<14-digit timestamp>_<4-digit suffix>".
        prefix, timestamp, suffix = job_id.split("_")
        assert prefix == "job"
        assert len(timestamp) == 14 and timestamp.isdigit()
        assert len(suffix) == 4 and suffix.isdigit()
2 changes: 2 additions & 0 deletions tests/unit/utils/test_v2_v1_comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from tests.fixtures.utils.v2_v1_comparison import (
valid_v2_v1_comparison,
invalid_v2_v1_comparison,
VALID_JOB_ID,
)


Expand Down Expand Up @@ -68,6 +69,7 @@ def test__given_valid_inputs__returns_schema(self):
comparison_instance.v2_country_package_version
== valid_v2_v1_comparison["v2_country_package_version"]
)
assert comparison_instance.job_id == VALID_JOB_ID

def test__given_invalid_inputs__raises_validation_error(self):

Expand Down
Loading