Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,5 @@ jobs:
- name: Run smoke tests only
run: python -m pytest -m smoke --reruns 2 --reruns-delay 5 -v -s
env:
RUN_SMOKE_TESTS: "1"
RUN_SMOKE_TESTS: "1"
POLICYENGINE_GITHUB_MICRODATA_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: patch
changes:
fixed:
- Fix NumPy 2.1.0 random seed overflow issue by ensuring seeds are always non-negative
79 changes: 79 additions & 0 deletions tests/core/commons/test_between_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Test the between function from commons.formulas."""

import numpy as np
import pytest
from policyengine_core.commons.formulas import between


class TestBetweenFunction:
"""Test the between function for checking if values are within bounds."""

def test_between_inclusive_both(self):
"""Test between with both bounds inclusive (default)."""
values = np.array([1, 2, 3, 4, 5])
result = between(values, 2, 4)
expected = np.array([False, True, True, True, False])
np.testing.assert_array_equal(result, expected)

def test_between_inclusive_left(self):
"""Test between with only left bound inclusive."""
values = np.array([1, 2, 3, 4, 5])
result = between(values, 2, 4, inclusive="left")
expected = np.array([False, True, True, False, False])
np.testing.assert_array_equal(result, expected)

def test_between_inclusive_right(self):
"""Test between with only right bound inclusive."""
values = np.array([1, 2, 3, 4, 5])
result = between(values, 2, 4, inclusive="right")
expected = np.array([False, False, True, True, False])
np.testing.assert_array_equal(result, expected)

def test_between_inclusive_neither(self):
"""Test between with neither bound inclusive."""
values = np.array([1, 2, 3, 4, 5])
result = between(values, 2, 4, inclusive="neither")
expected = np.array([False, False, True, False, False])
np.testing.assert_array_equal(result, expected)

def test_between_with_floats(self):
"""Test between with float values."""
values = np.array([1.5, 2.5, 3.5, 4.5])
result = between(values, 2.0, 4.0)
expected = np.array([False, True, True, False])
np.testing.assert_array_equal(result, expected)

def test_between_with_negative_values(self):
"""Test between with negative values."""
values = np.array([-3, -2, -1, 0, 1, 2, 3])
result = between(values, -2, 2)
expected = np.array([False, True, True, True, True, True, False])
np.testing.assert_array_equal(result, expected)

def test_between_single_value(self):
"""Test between with a single value."""
value = 5
assert between(value, 0, 10).item() == True
assert between(value, 0, 4).item() == False
assert between(value, 5, 10).item() == True
assert between(value, 5, 10, inclusive="left").item() == True
assert between(value, 5, 10, inclusive="neither").item() == False

def test_between_edge_cases(self):
"""Test between with edge cases."""
# Empty array
values = np.array([])
result = between(values, 0, 10)
assert len(result) == 0

# All values equal to bounds
values = np.array([5, 5, 5])
result = between(values, 5, 5)
expected = np.array([True, True, True])
np.testing.assert_array_equal(result, expected)

# Bounds in reverse order (upper < lower)
values = np.array([1, 2, 3, 4, 5])
result = between(values, 4, 2) # This should return all False
expected = np.array([False, False, False, False, False])
np.testing.assert_array_equal(result, expected)
79 changes: 79 additions & 0 deletions tests/core/commons/test_is_in_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Test the is_in function from commons.formulas."""

import numpy as np
import pytest
from policyengine_core.commons.formulas import is_in


class TestIsInFunction:
"""Test the is_in function for checking membership."""

def test_is_in_basic(self):
"""Test basic is_in functionality."""
values = np.array([1, 2, 3, 4, 5])
result = is_in(values, 2, 4)
expected = np.array([False, True, False, True, False])
np.testing.assert_array_equal(result, expected)

def test_is_in_with_list(self):
"""Test is_in with a list of targets."""
values = np.array([1, 2, 3, 4, 5])
result = is_in(values, [2, 4])
expected = np.array([False, True, False, True, False])
np.testing.assert_array_equal(result, expected)

def test_is_in_with_strings(self):
"""Test is_in with string values."""
values = np.array(["apple", "banana", "cherry", "date"])
result = is_in(values, "banana", "date")
expected = np.array([False, True, False, True])
np.testing.assert_array_equal(result, expected)

def test_is_in_with_mixed_types(self):
"""Test is_in with mixed numeric types."""
values = np.array([1.0, 2.0, 3.0, 4.0])
result = is_in(values, 2, 4) # int targets, float values
expected = np.array([False, True, False, True])
np.testing.assert_array_equal(result, expected)

def test_is_in_single_value(self):
"""Test is_in with a single value."""
value = 5
assert is_in(value, 5) == True
assert is_in(value, 1, 2, 3, 4, 5) == True
assert is_in(value, 1, 2, 3) == False
assert is_in(value, [1, 2, 3, 4, 5]) == True

def test_is_in_empty_targets(self):
"""Test is_in with empty targets."""
values = np.array([1, 2, 3])
result = is_in(values, [])
expected = np.array([False, False, False])
np.testing.assert_array_equal(result, expected)

def test_is_in_empty_values(self):
"""Test is_in with empty values array."""
values = np.array([])
result = is_in(values, 1, 2, 3)
assert len(result) == 0

def test_is_in_with_none(self):
"""Test is_in with None values."""
values = np.array([1, 2, None, 4], dtype=object)
result = is_in(values, None)
expected = np.array([False, False, True, False])
np.testing.assert_array_equal(result, expected)

def test_is_in_all_match(self):
"""Test is_in where all values match."""
values = np.array([1, 1, 1, 1])
result = is_in(values, 1)
expected = np.array([True, True, True, True])
np.testing.assert_array_equal(result, expected)

def test_is_in_no_match(self):
"""Test is_in where no values match."""
values = np.array([1, 2, 3, 4])
result = is_in(values, 5, 6, 7)
expected = np.array([False, False, False, False])
np.testing.assert_array_equal(result, expected)
152 changes: 152 additions & 0 deletions tests/core/commons/test_random_seed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Test the random function with large entity IDs to ensure no overflow."""

import numpy as np
import pytest
from unittest.mock import Mock
from policyengine_core.commons.formulas import random


class TestRandomSeed:
"""Test random seed handling to prevent NumPy overflow errors."""

def test_random_with_large_entity_ids(self):
"""Test that random() handles large entity IDs without overflow."""
# Create a mock population with simulation
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "person"

# Mock the get_holder and get_known_periods
holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Test with very large entity IDs that would cause overflow
# if not handled properly
large_ids = np.array(
[
np.iinfo(np.int64).max - 1000, # Very large positive ID
np.iinfo(np.int64).max // 2, # Large positive ID
1234567890123456789, # Another large ID
]
)

# Mock the population call to return large IDs
population.side_effect = lambda key, period: large_ids

# This should not raise a ValueError about negative seeds
result = random(population)

# Check that we got valid random values
assert isinstance(result, np.ndarray)
assert len(result) == len(large_ids)
assert all(0 <= val <= 1 for val in result)

def test_random_seed_consistency(self):
"""Test that random() produces consistent results for same inputs."""
# Create mock population
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "household"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Use same IDs
ids = np.array([1, 2, 3])
population.side_effect = lambda key, period: ids

# First call
result1 = random(population)

# Reset count to simulate same conditions
population.simulation.count_random_calls = 0

# Second call with same conditions
result2 = random(population)

# Results should be identical
np.testing.assert_array_equal(result1, result2)

def test_random_increments_call_count(self):
"""Test that random() increments the call counter."""
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "person"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

ids = np.array([1, 2, 3])
population.side_effect = lambda key, period: ids

# First call
random(population)
assert population.simulation.count_random_calls == 1

# Second call
random(population)
assert population.simulation.count_random_calls == 2

def test_random_handles_negative_ids(self):
"""Test that random() handles negative IDs properly."""
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "person"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Include negative IDs
ids = np.array([-100, -1, 0, 1, 100])
population.side_effect = lambda key, period: ids

# Should handle negative IDs without errors
result = random(population)

assert isinstance(result, np.ndarray)
assert len(result) == len(ids)
assert all(0 <= val <= 1 for val in result)

def test_no_negative_seed_error_with_overflow(self):
"""Test that seed calculation overflow doesn't cause negative seed error."""
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 999999999 # Large count
population.entity = Mock()
population.entity.key = "person"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Use the exact ID that would cause overflow in old implementation
# This ID when multiplied by 100 and added to count_random_calls
# would overflow int64 and become negative
overflow_id = np.array([np.iinfo(np.int64).max // 100])
population.side_effect = lambda key, period: overflow_id

# In the old implementation, this would raise:
# ValueError: Seed must be between 0 and 2**32 - 1
# With the fix using abs(), it should work fine
result = random(population)

assert isinstance(result, np.ndarray)
assert len(result) == 1
assert 0 <= result[0] <= 1