From 7b67c1a7d5bb7c58778fa304d83a34d98a63c260 Mon Sep 17 00:00:00 2001 From: Ken Lewerentz Date: Fri, 3 Apr 2026 09:53:13 +0700 Subject: [PATCH 1/4] perf: bulk membership creation with INSERT ON CONFLICT DO NOTHING Replace per-record ORM creates and Command.create() tuples with raw SQL INSERT ... ON CONFLICT (unique_cols) DO NOTHING for bulk membership creation. Duplicates are silently skipped and the inserted count is returned via cursor.rowcount. Updates _import_registrants and _add_beneficiaries to use the new skip_duplicates path, with ORM cache invalidation after raw SQL inserts. --- spp_programs/__manifest__.py | 2 +- spp_programs/models/cycle_membership.py | 72 ++++++- .../models/managers/cycle_manager_base.py | 35 ++-- .../models/managers/eligibility_manager.py | 14 +- spp_programs/models/program_membership.py | 77 ++++++-- spp_programs/tests/__init__.py | 1 + spp_programs/tests/test_bulk_membership.py | 178 ++++++++++++++++++ 7 files changed, 342 insertions(+), 37 deletions(-) create mode 100644 spp_programs/tests/test_bulk_membership.py diff --git a/spp_programs/__manifest__.py b/spp_programs/__manifest__.py index c7eefaa1..f84a56ae 100644 --- a/spp_programs/__manifest__.py +++ b/spp_programs/__manifest__.py @@ -4,7 +4,7 @@ "name": "OpenSPP Programs", "summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.", "category": "OpenSPP/Core", - "version": "19.0.2.0.6", + "version": "19.0.2.0.7", "sequence": 1, "author": "OpenSPP.org", "website": "https://github.com/OpenSPP/OpenSPP2", diff --git a/spp_programs/models/cycle_membership.py b/spp_programs/models/cycle_membership.py index 1e10faed..e1ac6415 100644 --- a/spp_programs/models/cycle_membership.py +++ b/spp_programs/models/cycle_membership.py @@ -1,7 +1,11 @@ # Part of OpenSPP. See LICENSE file for full copyright and licensing details. 
-from odoo import _, fields, models +import logging + +from odoo import _, api, fields, models from odoo.exceptions import ValidationError +_logger = logging.getLogger(__name__) + class SPPCycleMembership(models.Model): _name = "spp.cycle.membership" @@ -87,6 +91,72 @@ def open_registrant_form(self): }, } + @api.model + def bulk_create_memberships(self, vals_list, chunk_size=1000, skip_duplicates=False): + """Create cycle memberships in bulk with optional duplicate skipping. + + :param vals_list: List of dicts with membership values + :param chunk_size: Number of records per batch (default 1000) + :param skip_duplicates: When True, use INSERT ... ON CONFLICT DO NOTHING + to silently skip duplicate (partner_id, cycle_id) pairs. + Returns the count of inserted rows. + :return: Recordset (skip_duplicates=False) or int count (skip_duplicates=True) + """ + if not vals_list: + return 0 if skip_duplicates else self.env["spp.cycle.membership"] + + if skip_duplicates: + return self._bulk_insert_on_conflict(vals_list, chunk_size) + + return self.create(vals_list) + + def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000): + """Insert cycle memberships using raw SQL with ON CONFLICT DO NOTHING. 
+ + :param vals_list: List of dicts with at least partner_id, cycle_id, state + :param chunk_size: Number of records per SQL INSERT batch + :return: Total number of rows actually inserted + """ + cr = self.env.cr + uid = self.env.uid + total_inserted = 0 + + for i in range(0, len(vals_list), chunk_size): + batch = vals_list[i : i + chunk_size] + values = [] + params = [] + for v in batch: + values.append("(%s, %s, %s, %s, %s, %s, now(), now())") + params.extend( + [ + v["partner_id"], + v["cycle_id"], + v.get("state", "draft"), + v.get("enrollment_date", fields.Date.today()), + uid, + uid, + ] + ) + + sql = """ + INSERT INTO spp_cycle_membership + (partner_id, cycle_id, state, enrollment_date, + create_uid, write_uid, create_date, write_date) + VALUES {} + ON CONFLICT (partner_id, cycle_id) DO NOTHING + """.format( # noqa: S608 # nosec B608 + ", ".join(values) + ) + cr.execute(sql, params) + total_inserted += cr.rowcount + + _logger.info( + "Bulk inserted %d cycle memberships (%d skipped as duplicates)", + total_inserted, + len(vals_list) - total_inserted, + ) + return total_inserted + def unlink(self): if not self: return diff --git a/spp_programs/models/managers/cycle_manager_base.py b/spp_programs/models/managers/cycle_manager_base.py index 1a09533b..10716d47 100644 --- a/spp_programs/models/managers/cycle_manager_base.py +++ b/spp_programs/models/managers/cycle_manager_base.py @@ -835,25 +835,26 @@ def _add_beneficiaries(self, cycle, beneficiaries, state="draft", do_count=False """Add Beneficiaries :param cycle: Recordset of cycle - :param beneficiaries: Recordset of beneficiaries + :param beneficiaries: List of partner IDs :param state: String state to be set to beneficiary :param do_count: Boolean - set to False to not run compute functions - :return: Integer - count of not enrolled members - """ - new_beneficiaries = [] - for r in beneficiaries: - new_beneficiaries.append( - [ - 0, - 0, - { - "partner_id": r, - "enrollment_date": fields.Date.today(), - 
"state": state, - }, - ] - ) - cycle.update({"cycle_membership_ids": new_beneficiaries}) + :return: Integer - count of inserted members + """ + today = fields.Date.today() + vals_list = [ + { + "partner_id": partner_id, + "cycle_id": cycle.id, + "enrollment_date": today, + "state": state, + } + for partner_id in beneficiaries + ] + self.env["spp.cycle.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + + # Raw SQL bypasses the ORM cache — invalidate so subsequent reads + # (e.g. cycle.cycle_membership_ids) reflect the new rows. + cycle.invalidate_recordset(["cycle_membership_ids"]) if do_count: # Update Statistics diff --git a/spp_programs/models/managers/eligibility_manager.py b/spp_programs/models/managers/eligibility_manager.py index d54f7945..5392af4a 100644 --- a/spp_programs/models/managers/eligibility_manager.py +++ b/spp_programs/models/managers/eligibility_manager.py @@ -1,7 +1,7 @@ # Part of OpenSPP. See LICENSE file for full copyright and licensing details. 
import logging -from odoo import Command, _, api, fields, models +from odoo import _, api, fields, models from odoo.addons.job_worker.delay import group @@ -174,11 +174,13 @@ def mark_import_as_done(self): def _import_registrants(self, new_beneficiaries, state="draft", do_count=False): _logger.info("Importing %s beneficiaries", len(new_beneficiaries)) - _logger.info("updated") - beneficiaries_val = [] - for beneficiary in new_beneficiaries: - beneficiaries_val.append(Command.create({"partner_id": beneficiary.id, "state": state})) - self.program_id.update({"program_membership_ids": beneficiaries_val}) + vals_list = [{"partner_id": b.id, "program_id": self.program_id.id, "state": state} for b in new_beneficiaries] + count = self.env["spp.program.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + _logger.info("Imported %d new memberships (%d duplicates skipped)", count, len(vals_list) - count) + + # Raw SQL bypasses the ORM cache — invalidate so subsequent reads + # (e.g. program.program_membership_ids) reflect the new rows. + self.program_id.invalidate_recordset(["program_membership_ids"]) if do_count: # Compute Statistics diff --git a/spp_programs/models/program_membership.py b/spp_programs/models/program_membership.py index c22f5a0d..e6ed9041 100644 --- a/spp_programs/models/program_membership.py +++ b/spp_programs/models/program_membership.py @@ -1,4 +1,5 @@ # Part of OpenSPP. See LICENSE file for full copyright and licensing details. +import logging from lxml import etree @@ -7,6 +8,8 @@ from . import constants +_logger = logging.getLogger(__name__) + class SPPProgramMembership(models.Model): _inherit = [ @@ -345,26 +348,26 @@ def action_exit(self): } ) - @api.model_create_multi - def bulk_create_memberships(self, vals_list, chunk_size=1000): + @api.model + def bulk_create_memberships(self, vals_list, chunk_size=1000, skip_duplicates=False): """Create program memberships in bulk with optional chunking. 
This helper is intended for large enrollment jobs (e.g. CEL-driven bulk enrollment) where thousands of memberships need to be created in a single operation. - It preserves the normal create() semantics, including: - - standard ORM validations and constraints - - audit logging (via spp_audit rules) - - source tracking mixins - - The only optimisation is to: - - accept already-prepared value dicts - - optionally split very large batches into smaller chunks to keep - memory use and per-transaction work bounded. + :param vals_list: List of dicts with membership values + :param chunk_size: Number of records per batch (default 1000) + :param skip_duplicates: When True, use INSERT ... ON CONFLICT DO NOTHING + to silently skip duplicate (partner_id, program_id) pairs instead of + raising IntegrityError. Returns the count of inserted rows. + :return: Recordset (skip_duplicates=False) or int count (skip_duplicates=True) """ if not vals_list: - return self.env["spp.program.membership"] + return 0 if skip_duplicates else self.env["spp.program.membership"] + + if skip_duplicates: + return self._bulk_insert_on_conflict(vals_list, chunk_size) if chunk_size and chunk_size > 0: all_memberships = self.env["spp.program.membership"] @@ -386,3 +389,53 @@ def bulk_create_memberships(self, vals_list, chunk_size=1000): SPPProgramMembership, self.sudo(), # nosemgrep: odoo-sudo-without-context ).create(vals_list) + + def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000): + """Insert memberships using raw SQL with ON CONFLICT DO NOTHING. + + Bypasses ORM for maximum throughput during bulk enrollment. Duplicates + (matching the UNIQUE constraint on partner_id, program_id) are silently + skipped. 
+ + :param vals_list: List of dicts with at least partner_id, program_id, state + :param chunk_size: Number of records per SQL INSERT batch + :return: Total number of rows actually inserted + """ + cr = self.env.cr + uid = self.env.uid + total_inserted = 0 + + for i in range(0, len(vals_list), chunk_size): + batch = vals_list[i : i + chunk_size] + values = [] + params = [] + for v in batch: + values.append("(%s, %s, %s, %s, %s, now(), now())") + params.extend( + [ + v["partner_id"], + v["program_id"], + v.get("state", "draft"), + uid, + uid, + ] + ) + + sql = """ + INSERT INTO spp_program_membership + (partner_id, program_id, state, + create_uid, write_uid, create_date, write_date) + VALUES {} + ON CONFLICT (partner_id, program_id) DO NOTHING + """.format( # noqa: S608 # nosec B608 + ", ".join(values) + ) + cr.execute(sql, params) + total_inserted += cr.rowcount + + _logger.info( + "Bulk inserted %d program memberships (%d skipped as duplicates)", + total_inserted, + len(vals_list) - total_inserted, + ) + return total_inserted diff --git a/spp_programs/tests/__init__.py b/spp_programs/tests/__init__.py index a3ae74d3..344574d9 100644 --- a/spp_programs/tests/__init__.py +++ b/spp_programs/tests/__init__.py @@ -32,3 +32,4 @@ from . import test_payment_and_accounting from . import test_managers from . import test_cycle_auto_approve_fund_check +from . import test_bulk_membership diff --git a/spp_programs/tests/test_bulk_membership.py b/spp_programs/tests/test_bulk_membership.py new file mode 100644 index 00000000..e494c4d0 --- /dev/null +++ b/spp_programs/tests/test_bulk_membership.py @@ -0,0 +1,178 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +"""Tests for Phase 7: Bulk membership creation with INSERT ON CONFLICT. + +These tests verify that bulk_create_memberships() with skip_duplicates=True +uses raw SQL INSERT ... 
ON CONFLICT DO NOTHING to silently skip duplicates +instead of raising IntegrityError or doing per-record search() checks. +""" + +import uuid + +from odoo import fields +from odoo.tests import TransactionCase + + +class TestBulkProgramMembership(TransactionCase): + """Test bulk_create_memberships on spp.program.membership.""" + + def setUp(self): + super().setUp() + self.program = self.env["spp.program"].create({"name": f"Test Program {uuid.uuid4().hex[:8]}"}) + self.partners = self.env["res.partner"].create( + [{"name": f"Registrant {i}", "is_registrant": True} for i in range(10)] + ) + + def test_bulk_create_inserts_all(self): + """bulk_create_memberships with skip_duplicates inserts all new records.""" + vals_list = [{"partner_id": p.id, "program_id": self.program.id, "state": "draft"} for p in self.partners] + count = self.env["spp.program.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + self.assertEqual(count, 10) + self.assertEqual( + self.env["spp.program.membership"].search_count([("program_id", "=", self.program.id)]), + 10, + ) + + def test_bulk_create_skips_duplicates(self): + """Duplicate (partner_id, program_id) pairs must be silently skipped.""" + # Create first batch + vals_list = [{"partner_id": p.id, "program_id": self.program.id, "state": "draft"} for p in self.partners[:5]] + self.env["spp.program.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + + # Create second batch with overlap + vals_list_overlap = [ + {"partner_id": p.id, "program_id": self.program.id, "state": "draft"} + for p in self.partners # includes first 5 again + ] + count = self.env["spp.program.membership"].bulk_create_memberships(vals_list_overlap, skip_duplicates=True) + # Only 5 new records should be inserted + self.assertEqual(count, 5) + self.assertEqual( + self.env["spp.program.membership"].search_count([("program_id", "=", self.program.id)]), + 10, + ) + + def test_bulk_create_all_duplicates_returns_zero(self): + """If all 
records already exist, return 0.""" + vals_list = [{"partner_id": p.id, "program_id": self.program.id, "state": "draft"} for p in self.partners[:3]] + self.env["spp.program.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + count = self.env["spp.program.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + self.assertEqual(count, 0) + + def test_bulk_create_empty_list(self): + """Empty vals_list should return 0.""" + count = self.env["spp.program.membership"].bulk_create_memberships([], skip_duplicates=True) + self.assertEqual(count, 0) + + def test_bulk_create_without_skip_duplicates_uses_orm(self): + """Without skip_duplicates, bulk_create_memberships should use the ORM path.""" + vals_list = [{"partner_id": p.id, "program_id": self.program.id, "state": "draft"} for p in self.partners[:3]] + result = self.env["spp.program.membership"].bulk_create_memberships(vals_list) + # ORM path returns a recordset + self.assertEqual(len(result), 3) + + def test_bulk_create_respects_chunk_size(self): + """With skip_duplicates and chunk_size, should process in chunks.""" + vals_list = [{"partner_id": p.id, "program_id": self.program.id, "state": "draft"} for p in self.partners] + count = self.env["spp.program.membership"].bulk_create_memberships( + vals_list, skip_duplicates=True, chunk_size=3 + ) + self.assertEqual(count, 10) + + +class TestBulkCycleMembership(TransactionCase): + """Test bulk_create_memberships on spp.cycle.membership.""" + + def setUp(self): + super().setUp() + self.program = self.env["spp.program"].create({"name": f"Test Program {uuid.uuid4().hex[:8]}"}) + self.cycle = self.env["spp.cycle"].create( + { + "name": "Test Cycle", + "program_id": self.program.id, + "start_date": fields.Date.today(), + "end_date": fields.Date.today(), + } + ) + self.partners = self.env["res.partner"].create( + [{"name": f"Registrant {i}", "is_registrant": True} for i in range(10)] + ) + + def test_bulk_create_inserts_all(self): + 
"""bulk_create_memberships with skip_duplicates inserts all new records.""" + vals_list = [{"partner_id": p.id, "cycle_id": self.cycle.id, "state": "draft"} for p in self.partners] + count = self.env["spp.cycle.membership"].bulk_create_memberships(vals_list, skip_duplicates=True) + self.assertEqual(count, 10) + + def test_bulk_create_skips_duplicates(self): + """Duplicate (partner_id, cycle_id) pairs must be silently skipped.""" + vals_first = [{"partner_id": p.id, "cycle_id": self.cycle.id, "state": "draft"} for p in self.partners[:5]] + self.env["spp.cycle.membership"].bulk_create_memberships(vals_first, skip_duplicates=True) + + vals_overlap = [{"partner_id": p.id, "cycle_id": self.cycle.id, "state": "draft"} for p in self.partners] + count = self.env["spp.cycle.membership"].bulk_create_memberships(vals_overlap, skip_duplicates=True) + self.assertEqual(count, 5) + + def test_bulk_create_empty_list(self): + """Empty vals_list should return 0.""" + count = self.env["spp.cycle.membership"].bulk_create_memberships([], skip_duplicates=True) + self.assertEqual(count, 0) + + def test_bulk_create_without_skip_duplicates_uses_orm(self): + """Without skip_duplicates, bulk_create_memberships should use the ORM path.""" + vals_list = [{"partner_id": p.id, "cycle_id": self.cycle.id, "state": "draft"} for p in self.partners[:3]] + result = self.env["spp.cycle.membership"].bulk_create_memberships(vals_list) + self.assertEqual(len(result), 3) + + +class TestCallerIntegration(TransactionCase): + """Test that _import_registrants and _add_beneficiaries use bulk_create_memberships.""" + + def setUp(self): + super().setUp() + self.program = self.env["spp.program"].create({"name": f"Test Program {uuid.uuid4().hex[:8]}"}) + self.cycle = self.env["spp.cycle"].create( + { + "name": "Test Cycle", + "program_id": self.program.id, + "start_date": fields.Date.today(), + "end_date": fields.Date.today(), + } + ) + self.partners = self.env["res.partner"].create( + [{"name": f"Registrant {i}", 
"is_registrant": True} for i in range(5)] + ) + + def test_add_beneficiaries_skips_duplicates(self): + """_add_beneficiaries should not raise on duplicate partner IDs.""" + cycle_manager = self.env["spp.cycle.manager.default"].create( + { + "name": "Test Cycle Manager", + "program_id": self.program.id, + } + ) + + partner_ids = self.partners.ids + # Add beneficiaries twice — second call should not raise + cycle_manager._add_beneficiaries(self.cycle, partner_ids, "draft") + cycle_manager._add_beneficiaries(self.cycle, partner_ids, "draft") + + # Should still only have 5 memberships + count = self.env["spp.cycle.membership"].search_count([("cycle_id", "=", self.cycle.id)]) + self.assertEqual(count, 5) + + def test_import_registrants_skips_duplicates(self): + """_import_registrants should not raise on duplicate registrants.""" + elig_manager = self.env["spp.program.membership.manager.default"].create( + { + "name": "Test Elig Manager", + "program_id": self.program.id, + } + ) + + # Import registrants twice + elig_manager._import_registrants(self.partners, "draft") + elig_manager._import_registrants(self.partners, "draft") + + # Should still only have 5 memberships + count = self.env["spp.program.membership"].search_count([("program_id", "=", self.program.id)]) + self.assertEqual(count, 5) From 5f8bf9ba3e2e8611173ca9d8f21907b04bb8d502 Mon Sep 17 00:00:00 2001 From: Ken Lewerentz Date: Fri, 3 Apr 2026 10:23:25 +0700 Subject: [PATCH 2/4] fix: include enrollment_date in bulk SQL insert, hoist Date.today() Set enrollment_date to current timestamp when state is 'enrolled' in the program membership SQL insert (computed field not triggered by raw SQL). Hoist fields.Date.today() outside the loop in cycle membership to avoid repeated calls per record. 
--- spp_programs/models/cycle_membership.py | 3 ++- spp_programs/models/program_membership.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/spp_programs/models/cycle_membership.py b/spp_programs/models/cycle_membership.py index e1ac6415..4fdb0bf6 100644 --- a/spp_programs/models/cycle_membership.py +++ b/spp_programs/models/cycle_membership.py @@ -120,6 +120,7 @@ def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000): cr = self.env.cr uid = self.env.uid total_inserted = 0 + today = fields.Date.today() for i in range(0, len(vals_list), chunk_size): batch = vals_list[i : i + chunk_size] @@ -132,7 +133,7 @@ def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000): v["partner_id"], v["cycle_id"], v.get("state", "draft"), - v.get("enrollment_date", fields.Date.today()), + v.get("enrollment_date", today), uid, uid, ] diff --git a/spp_programs/models/program_membership.py b/spp_programs/models/program_membership.py index e6ed9041..48634fa8 100644 --- a/spp_programs/models/program_membership.py +++ b/spp_programs/models/program_membership.py @@ -405,17 +405,22 @@ def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000): uid = self.env.uid total_inserted = 0 + now = fields.Datetime.now() + for i in range(0, len(vals_list), chunk_size): batch = vals_list[i : i + chunk_size] values = [] params = [] for v in batch: - values.append("(%s, %s, %s, %s, %s, now(), now())") + state = v.get("state", "draft") + enrollment_date = now if state == "enrolled" else None + values.append("(%s, %s, %s, %s, %s, %s, now(), now())") params.extend( [ v["partner_id"], v["program_id"], - v.get("state", "draft"), + state, + enrollment_date, uid, uid, ] @@ -423,7 +428,7 @@ def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000): sql = """ INSERT INTO spp_program_membership - (partner_id, program_id, state, + (partner_id, program_id, state, enrollment_date, create_uid, write_uid, create_date, write_date) VALUES {} ON CONFLICT (partner_id, 
program_id) DO NOTHING From 1c16e0c5f62f23047b706cfac3b8eccb52fdaa29 Mon Sep 17 00:00:00 2001 From: Ken Lewerentz Date: Fri, 3 Apr 2026 12:01:13 +0700 Subject: [PATCH 3/4] perf: add canary patterns to skip statistics during bulk operations Add context flags (skip_registrant_statistics, skip_program_statistics) that allow bulk operation callers to suppress expensive computed field recomputation. Add refresh_beneficiary_counts() on spp.program and refresh_statistics() on spp.cycle to recompute once at completion. Also replace bool(rec.program_membership_ids) with a single set-based GROUP BY query in _compute_has_members to avoid loading the full membership recordset. --- spp_programs/__manifest__.py | 2 +- spp_programs/models/cycle.py | 10 ++ .../models/managers/cycle_manager_base.py | 4 +- .../models/managers/eligibility_manager.py | 3 +- spp_programs/models/programs.py | 27 +++- spp_programs/models/registrant.py | 8 ++ spp_programs/tests/__init__.py | 1 + spp_programs/tests/test_canary_patterns.py | 125 ++++++++++++++++++ 8 files changed, 174 insertions(+), 6 deletions(-) create mode 100644 spp_programs/tests/test_canary_patterns.py diff --git a/spp_programs/__manifest__.py b/spp_programs/__manifest__.py index f84a56ae..1de62a0c 100644 --- a/spp_programs/__manifest__.py +++ b/spp_programs/__manifest__.py @@ -4,7 +4,7 @@ "name": "OpenSPP Programs", "summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.", "category": "OpenSPP/Core", - "version": "19.0.2.0.7", + "version": "19.0.2.0.8", "sequence": 1, "author": "OpenSPP.org", "website": "https://github.com/OpenSPP/OpenSPP2", diff --git a/spp_programs/models/cycle.py b/spp_programs/models/cycle.py index 3cf0b6f5..0bbfea5e 100644 --- a/spp_programs/models/cycle.py +++ b/spp_programs/models/cycle.py @@ -275,6 +275,16 @@ def _compute_entitlements_count(self): entitlements_count = self.env["spp.entitlement"].search_count([("cycle_id", "=", rec.id)]) 
rec.entitlements_count = entitlements_count + def refresh_statistics(self): + """Refresh all cycle statistics after bulk operations. + + Call this after raw SQL inserts that bypass ORM dependency tracking + (e.g. bulk_create_memberships with skip_duplicates=True). + """ + self._compute_members_count() + self._compute_entitlements_count() + self._compute_total_entitlements_count() + @api.depends("entitlement_ids", "inkind_entitlement_ids") def _compute_total_entitlements_count(self): if not self.ids: diff --git a/spp_programs/models/managers/cycle_manager_base.py b/spp_programs/models/managers/cycle_manager_base.py index 10716d47..c162fde6 100644 --- a/spp_programs/models/managers/cycle_manager_base.py +++ b/spp_programs/models/managers/cycle_manager_base.py @@ -325,8 +325,8 @@ def mark_import_as_done(self, cycle, msg): cycle.locked_reason = None cycle.message_post(body=msg) - # Update Statistics - cycle._compute_members_count() + # Refresh statistics after bulk operations + cycle.refresh_statistics() def mark_prepare_entitlement_as_done(self, cycle, msg): """Complete the preparation of entitlements. 
diff --git a/spp_programs/models/managers/eligibility_manager.py b/spp_programs/models/managers/eligibility_manager.py index 5392af4a..54dde250 100644 --- a/spp_programs/models/managers/eligibility_manager.py +++ b/spp_programs/models/managers/eligibility_manager.py @@ -165,8 +165,7 @@ def _import_registrants_async(self, new_beneficiaries, state="draft"): def mark_import_as_done(self): self.ensure_one() - self.program_id._compute_eligible_beneficiary_count() - self.program_id._compute_beneficiary_count() + self.program_id.refresh_beneficiary_counts() self.program_id.is_locked = False self.program_id.locked_reason = None diff --git a/spp_programs/models/programs.py b/spp_programs/models/programs.py index e086c6d2..615f5c93 100644 --- a/spp_programs/models/programs.py +++ b/spp_programs/models/programs.py @@ -187,8 +187,23 @@ def _check_unique_program_name(self): @api.depends("program_membership_ids") def _compute_has_members(self): + if self.env.context.get("skip_program_statistics"): + return + if not self.ids: + for rec in self: + rec.has_members = False + return + self.env.cr.execute( + """ + SELECT program_id FROM spp_program_membership + WHERE program_id IN %s + GROUP BY program_id + """, + (tuple(self.ids),), + ) + programs_with_members = {row[0] for row in self.env.cr.fetchall()} for rec in self: - rec.has_members = bool(rec.program_membership_ids) + rec.has_members = rec.id in programs_with_members @api.depends("compliance_manager_ids", "compliance_manager_ids.manager_ref_id") def _compute_has_compliance_criteria(self): @@ -273,6 +288,16 @@ def _compute_beneficiary_count(self): count = rec.count_beneficiaries(None)["value"] rec.update({"beneficiaries_count": count}) + def refresh_beneficiary_counts(self): + """Refresh all beneficiary statistics after bulk operations. + + Call this after raw SQL inserts that bypass ORM dependency tracking + (e.g. bulk_create_memberships with skip_duplicates=True). 
+ """ + self._compute_beneficiary_count() + self._compute_eligible_beneficiary_count() + self._compute_has_members() + @api.depends("cycle_ids") def _compute_cycle_count(self): for rec in self: diff --git a/spp_programs/models/registrant.py b/spp_programs/models/registrant.py index 15dd4885..20a8cd31 100644 --- a/spp_programs/models/registrant.py +++ b/spp_programs/models/registrant.py @@ -43,6 +43,8 @@ def _compute_total_entitlements_count(self): @api.depends("program_membership_ids") def _compute_program_membership_count(self): """Batch-efficient program membership count using read_group.""" + if self.env.context.get("skip_registrant_statistics"): + return if not self: return @@ -66,6 +68,8 @@ def _compute_program_membership_count(self): @api.depends("entitlement_ids") def _compute_entitlements_count(self): """Batch-efficient entitlements count using _read_group.""" + if self.env.context.get("skip_registrant_statistics"): + return if not self: return @@ -89,6 +93,8 @@ def _compute_entitlements_count(self): @api.depends("cycle_ids") def _compute_cycle_count(self): """Batch-efficient cycle membership count using _read_group.""" + if self.env.context.get("skip_registrant_statistics"): + return if not self: return @@ -112,6 +118,8 @@ def _compute_cycle_count(self): @api.depends("inkind_entitlement_ids") def _compute_inkind_entitlements_count(self): """Batch-efficient in-kind entitlements count using _read_group.""" + if self.env.context.get("skip_registrant_statistics"): + return if not self: return diff --git a/spp_programs/tests/__init__.py b/spp_programs/tests/__init__.py index 344574d9..0bc0f3e1 100644 --- a/spp_programs/tests/__init__.py +++ b/spp_programs/tests/__init__.py @@ -33,3 +33,4 @@ from . import test_managers from . import test_cycle_auto_approve_fund_check from . import test_bulk_membership +from . 
import test_canary_patterns diff --git a/spp_programs/tests/test_canary_patterns.py b/spp_programs/tests/test_canary_patterns.py new file mode 100644 index 00000000..8008bbc5 --- /dev/null +++ b/spp_programs/tests/test_canary_patterns.py @@ -0,0 +1,125 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +"""Tests for Phase 8: Canary patterns for bulk operations. + +During bulk operations, expensive computed field recomputation should be +skipped via context flags and refreshed once at completion. +""" + +import uuid + +from odoo import fields +from odoo.tests import TransactionCase + + +class TestRegistrantCanaryFlags(TransactionCase): + """Test that registrant statistics skip recomputation with context flags.""" + + def setUp(self): + super().setUp() + self.program = self.env["spp.program"].create({"name": f"Test Program {uuid.uuid4().hex[:8]}"}) + self.partner = self.env["res.partner"].create({"name": "Test Registrant", "is_registrant": True}) + + def test_skip_registrant_statistics_skips_program_membership_count(self): + """With skip_registrant_statistics, _compute_program_membership_count should be a no-op.""" + self.partner.with_context(skip_registrant_statistics=True)._compute_program_membership_count() + # Value should remain at default (0) since compute was skipped + self.assertEqual(self.partner.program_membership_count, 0) + + def test_skip_registrant_statistics_skips_entitlements_count(self): + """With skip_registrant_statistics, _compute_entitlements_count should be a no-op.""" + self.partner.with_context(skip_registrant_statistics=True)._compute_entitlements_count() + self.assertEqual(self.partner.entitlements_count, 0) + + def test_skip_registrant_statistics_skips_cycle_count(self): + """With skip_registrant_statistics, _compute_cycle_count should be a no-op.""" + self.partner.with_context(skip_registrant_statistics=True)._compute_cycle_count() + self.assertEqual(self.partner.cycles_count, 0) + + def 
test_skip_registrant_statistics_skips_inkind_count(self): + """With skip_registrant_statistics, _compute_inkind_entitlements_count should be a no-op.""" + self.partner.with_context(skip_registrant_statistics=True)._compute_inkind_entitlements_count() + self.assertEqual(self.partner.inkind_entitlements_count, 0) + + def test_without_flag_computes_normally(self): + """Without the flag, compute methods should work normally.""" + self.env["spp.program.membership"].create( + { + "partner_id": self.partner.id, + "program_id": self.program.id, + "state": "draft", + } + ) + self.partner._compute_program_membership_count() + self.assertEqual(self.partner.program_membership_count, 1) + + +class TestProgramCanaryFlags(TransactionCase): + """Test that program statistics skip recomputation with context flags.""" + + def setUp(self): + super().setUp() + self.program = self.env["spp.program"].create({"name": f"Test Program {uuid.uuid4().hex[:8]}"}) + + def test_skip_program_statistics_skips_has_members(self): + """With skip_program_statistics, _compute_has_members should be a no-op.""" + self.program.with_context(skip_program_statistics=True)._compute_has_members() + self.assertFalse(self.program.has_members) + + def test_has_members_uses_sql_exists(self): + """_compute_has_members should detect members without loading the recordset.""" + partner = self.env["res.partner"].create({"name": "Registrant", "is_registrant": True}) + self.env["spp.program.membership"].create( + { + "partner_id": partner.id, + "program_id": self.program.id, + "state": "draft", + } + ) + self.program._compute_has_members() + self.assertTrue(self.program.has_members) + + +class TestRefreshMethods(TransactionCase): + """Test refresh_beneficiary_counts and refresh_statistics methods.""" + + def setUp(self): + super().setUp() + self.program = self.env["spp.program"].create({"name": f"Test Program {uuid.uuid4().hex[:8]}"}) + self.cycle = self.env["spp.cycle"].create( + { + "name": "Test Cycle", + "program_id": 
self.program.id, + "start_date": fields.Date.today(), + "end_date": fields.Date.today(), + } + ) + self.partners = self.env["res.partner"].create( + [{"name": f"Registrant {i}", "is_registrant": True} for i in range(5)] + ) + + def test_program_refresh_beneficiary_counts(self): + """refresh_beneficiary_counts should update all program statistics.""" + # Create memberships via SQL (bypassing ORM triggers) + for p in self.partners: + self.env["spp.program.membership"].bulk_create_memberships( + [{"partner_id": p.id, "program_id": self.program.id, "state": "enrolled"}], + skip_duplicates=True, + ) + + # Counts are stale (SQL insert bypassed ORM) + self.program.refresh_beneficiary_counts() + + self.assertEqual(self.program.beneficiaries_count, 5) + self.assertEqual(self.program.eligible_beneficiaries_count, 5) + self.assertTrue(self.program.has_members) + + def test_cycle_refresh_statistics(self): + """refresh_statistics should update cycle member and entitlement counts.""" + for p in self.partners: + self.env["spp.cycle.membership"].bulk_create_memberships( + [{"partner_id": p.id, "cycle_id": self.cycle.id, "state": "enrolled"}], + skip_duplicates=True, + ) + + self.cycle.refresh_statistics() + self.assertEqual(self.cycle.members_count, 5) From 38bc5175058be7aa55bdff27072a3c7ecf9f7f30 Mon Sep 17 00:00:00 2001 From: Ken Lewerentz Date: Fri, 3 Apr 2026 12:15:42 +0700 Subject: [PATCH 4/4] perf: increase job concurrency, add channel routing and identity keys Increase parallel-safe channel limits from 1 to 4 (cycle, eligibility_manager, program_manager) now that INSERT ON CONFLICT makes these operations safe for concurrent execution. Add two serial channels: - entitlement_approval (limit=1): fund balance tracking must be serial - statistics_refresh (limit=1): avoid concurrent refresh storms Route entitlement approval/validation jobs to entitlement_approval channel. Route all completion handlers (mark_*_as_done) to statistics_refresh channel. 
Add identity_key to all async dispatch methods to prevent duplicate job submission when users double-click action buttons. --- spp_programs/__manifest__.py | 2 +- spp_programs/data/queue_data.xml | 14 ++++++++-- .../models/managers/cycle_manager_base.py | 27 ++++++++++++++----- .../models/managers/eligibility_manager.py | 9 ++++--- .../managers/entitlement_manager_base.py | 16 ++++++++--- .../managers/entitlement_manager_cash.py | 6 ++++- .../managers/entitlement_manager_inkind.py | 10 +++++-- .../models/managers/program_manager.py | 9 ++++--- 8 files changed, 70 insertions(+), 23 deletions(-) diff --git a/spp_programs/__manifest__.py b/spp_programs/__manifest__.py index 1de62a0c..b32ff972 100644 --- a/spp_programs/__manifest__.py +++ b/spp_programs/__manifest__.py @@ -4,7 +4,7 @@ "name": "OpenSPP Programs", "summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.", "category": "OpenSPP/Core", - "version": "19.0.2.0.8", + "version": "19.0.2.0.9", "sequence": 1, "author": "OpenSPP.org", "website": "https://github.com/OpenSPP/OpenSPP2", diff --git a/spp_programs/data/queue_data.xml b/spp_programs/data/queue_data.xml index 9dd6f2df..afc44466 100644 --- a/spp_programs/data/queue_data.xml +++ b/spp_programs/data/queue_data.xml @@ -1,16 +1,26 @@ cycle - 1 + 4 0 eligibility_manager - 1 + 4 0 program_manager + 4 + 0 + + + entitlement_approval + 1 + 0 + + + statistics_refresh 1 0 diff --git a/spp_programs/models/managers/cycle_manager_base.py b/spp_programs/models/managers/cycle_manager_base.py index c162fde6..50e54d63 100644 --- a/spp_programs/models/managers/cycle_manager_base.py +++ b/spp_programs/models/managers/cycle_manager_base.py @@ -518,10 +518,13 @@ def _check_eligibility_async(self, cycle, beneficiaries_count): jobs = [] for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE): jobs.append( - self.delayable(channel="cycle")._check_eligibility(cycle, offset=i, 
limit=self.MAX_ROW_JOB_QUEUE) + self.delayable( + channel="cycle", + identity_key=f"check_elig_{cycle.id}_{i}", + )._check_eligibility(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE) ) main_job = group(*jobs) - main_job.on_done(self.delayable(channel="cycle").mark_check_eligibility_as_done(cycle)) + main_job.on_done(self.delayable(channel="statistics_refresh").mark_check_eligibility_as_done(cycle)) main_job.delay() def _check_eligibility(self, cycle, beneficiaries=None, offset=0, limit=None, do_count=False): @@ -587,10 +590,17 @@ def _prepare_entitlements_async(self, cycle, beneficiaries_count): jobs = [] for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE): - jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, i, self.MAX_ROW_JOB_QUEUE)) + jobs.append( + self.delayable( + channel="cycle", + identity_key=f"prepare_ent_{cycle.id}_{i}", + )._prepare_entitlements(cycle, i, self.MAX_ROW_JOB_QUEUE) + ) main_job = group(*jobs) main_job.on_done( - self.delayable(channel="cycle").mark_prepare_entitlement_as_done(cycle, _("Entitlement Ready.")) + self.delayable(channel="statistics_refresh").mark_prepare_entitlement_as_done( + cycle, _("Entitlement Ready.") + ) ) main_job.delay() @@ -820,7 +830,10 @@ def _add_beneficiaries_async(self, cycle, beneficiaries, state): jobs = [] for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE): jobs.append( - self.delayable(channel="cycle")._add_beneficiaries( + self.delayable( + channel="cycle", + identity_key=f"add_benef_{cycle.id}_{i}", + )._add_beneficiaries( cycle, beneficiaries[i : i + self.MAX_ROW_JOB_QUEUE], state, @@ -828,7 +841,9 @@ def _add_beneficiaries_async(self, cycle, beneficiaries, state): ) main_job = group(*jobs) - main_job.on_done(self.delayable(channel="cycle").mark_import_as_done(cycle, _("Beneficiary import finished."))) + main_job.on_done( + self.delayable(channel="statistics_refresh").mark_import_as_done(cycle, _("Beneficiary import finished.")) + ) main_job.delay() def 
_add_beneficiaries(self, cycle, beneficiaries, state="draft", do_count=False): diff --git a/spp_programs/models/managers/eligibility_manager.py b/spp_programs/models/managers/eligibility_manager.py index 54dde250..c93a65e8 100644 --- a/spp_programs/models/managers/eligibility_manager.py +++ b/spp_programs/models/managers/eligibility_manager.py @@ -155,12 +155,13 @@ def _import_registrants_async(self, new_beneficiaries, state="draft"): jobs = [] for i in range(0, len(new_beneficiaries), 10000): jobs.append( - self.delayable(channel="eligibility_manager")._import_registrants( - new_beneficiaries[i : i + 10000], state - ) + self.delayable( + channel="eligibility_manager", + identity_key=f"import_reg_{program.id}_{i}", + )._import_registrants(new_beneficiaries[i : i + 10000], state) ) main_job = group(*jobs) - main_job.on_done(self.delayable(channel="eligibility_manager").mark_import_as_done()) + main_job.on_done(self.delayable(channel="statistics_refresh").mark_import_as_done()) main_job.delay() def mark_import_as_done(self): diff --git a/spp_programs/models/managers/entitlement_manager_base.py b/spp_programs/models/managers/entitlement_manager_base.py index 8a8d6483..5ddb61b6 100644 --- a/spp_programs/models/managers/entitlement_manager_base.py +++ b/spp_programs/models/managers/entitlement_manager_base.py @@ -89,7 +89,9 @@ def _set_pending_validation_entitlements_async(self, cycle, entitlements): jobs = [] for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE): jobs.append( - self.delayable()._set_pending_validation_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE]) + self.delayable(channel="entitlement_approval")._set_pending_validation_entitlements( + entitlements[i : i + self.MAX_ROW_JOB_QUEUE] + ) ) main_job = group(*jobs) main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Set to Pending Validation."))) @@ -137,7 +139,11 @@ def _validate_entitlements_async(self, cycle, entitlements, entitlements_count): jobs = [] for i in 
range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE): - jobs.append(self.delayable()._validate_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE])) + jobs.append( + self.delayable(channel="entitlement_approval")._validate_entitlements( + entitlements[i : i + self.MAX_ROW_JOB_QUEUE] + ) + ) main_job = group(*jobs) main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved."))) main_job.delay() @@ -197,7 +203,11 @@ def _cancel_entitlements_async(self, cycle, entitlements, entitlements_count): jobs = [] for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE): - jobs.append(self.delayable()._cancel_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE])) + jobs.append( + self.delayable(channel="entitlement_approval")._cancel_entitlements( + entitlements[i : i + self.MAX_ROW_JOB_QUEUE] + ) + ) main_job = group(*jobs) main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Cancelled."))) main_job.delay() diff --git a/spp_programs/models/managers/entitlement_manager_cash.py b/spp_programs/models/managers/entitlement_manager_cash.py index c17c7ec4..7449263d 100644 --- a/spp_programs/models/managers/entitlement_manager_cash.py +++ b/spp_programs/models/managers/entitlement_manager_cash.py @@ -319,7 +319,11 @@ def _validate_entitlements_async(self, cycle, entitlements, entitlements_count): jobs = [] for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE): # Needs to override - jobs.append(self.delayable()._validate_entitlements(cycle, entitlements[i : i + self.MAX_ROW_JOB_QUEUE])) + jobs.append( + self.delayable(channel="entitlement_approval")._validate_entitlements( + cycle, entitlements[i : i + self.MAX_ROW_JOB_QUEUE] + ) + ) main_job = group(*jobs) main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved."))) main_job.delay() diff --git a/spp_programs/models/managers/entitlement_manager_inkind.py b/spp_programs/models/managers/entitlement_manager_inkind.py 
index cdb09077..f3630ee2 100644 --- a/spp_programs/models/managers/entitlement_manager_inkind.py +++ b/spp_programs/models/managers/entitlement_manager_inkind.py @@ -216,7 +216,9 @@ def _set_pending_validation_entitlements_async(self, cycle, entitlements_count): jobs = [] for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE): jobs.append( - self.delayable()._set_pending_validation_entitlements(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE) + self.delayable(channel="entitlement_approval")._set_pending_validation_entitlements( + cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE + ) ) main_job = group(*jobs) main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Set to Pending Validation."))) @@ -315,7 +317,11 @@ def _validate_entitlements_async(self, cycle, entitlements_count): jobs = [] for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE): - jobs.append(self.delayable()._validate_entitlements(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)) + jobs.append( + self.delayable(channel="entitlement_approval")._validate_entitlements( + cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE + ) + ) main_job = group(*jobs) main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved."))) main_job.delay() diff --git a/spp_programs/models/managers/program_manager.py b/spp_programs/models/managers/program_manager.py index eccb41a2..1362d213 100644 --- a/spp_programs/models/managers/program_manager.py +++ b/spp_programs/models/managers/program_manager.py @@ -187,12 +187,13 @@ def _enroll_eligible_registrants_async(self, states, members_count): jobs = [] for i in range(0, members_count, self.MAX_ROW_JOB_QUEUE): jobs.append( - self.delayable(channel="program_manager")._enroll_eligible_registrants( - states, i, self.MAX_ROW_JOB_QUEUE - ) + self.delayable( + channel="program_manager", + identity_key=f"enroll_eligible_{program.id}_{i}", + )._enroll_eligible_registrants(states, i, self.MAX_ROW_JOB_QUEUE) ) main_job = 
group(*jobs) - main_job.on_done(self.delayable(channel="program_manager").mark_enroll_eligible_as_done()) + main_job.on_done(self.delayable(channel="statistics_refresh").mark_enroll_eligible_as_done()) main_job.delay() def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=False):