Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spp_programs/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"name": "OpenSPP Programs",
"summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.",
"category": "OpenSPP/Core",
"version": "19.0.2.0.6",
"version": "19.0.2.0.9",
"sequence": 1,
"author": "OpenSPP.org",
"website": "https://github.com/OpenSPP/OpenSPP2",
Expand Down
14 changes: 12 additions & 2 deletions spp_programs/data/queue_data.xml
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
<odoo>
<record id="limit_cycle" model="queue.limit">
<field name="name">cycle</field>
<field name="limit">1</field>
<field name="limit">4</field>
<field name="rate_limit">0</field>
</record>
<record id="limit_eligibility_manager" model="queue.limit">
<field name="name">eligibility_manager</field>
<field name="limit">1</field>
<field name="limit">4</field>
<field name="rate_limit">0</field>
</record>
<record id="limit_program_manager" model="queue.limit">
<field name="name">program_manager</field>
<field name="limit">4</field>
<field name="rate_limit">0</field>
</record>
<record id="limit_entitlement_approval" model="queue.limit">
<field name="name">entitlement_approval</field>
<field name="limit">1</field>
<field name="rate_limit">0</field>
</record>
<record id="limit_statistics_refresh" model="queue.limit">
<field name="name">statistics_refresh</field>
<field name="limit">1</field>
<field name="rate_limit">0</field>
</record>
Expand Down
10 changes: 10 additions & 0 deletions spp_programs/models/cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ def _compute_entitlements_count(self):
entitlements_count = self.env["spp.entitlement"].search_count([("cycle_id", "=", rec.id)])
rec.entitlements_count = entitlements_count

def refresh_statistics(self):
    """Recompute the cycle's member and entitlement counters.

    Needed after bulk/raw-SQL writes (e.g. ``bulk_create_memberships`` with
    ``skip_duplicates=True``) that bypass the ORM's compute-dependency
    tracking, so the stored counts reflect the newly inserted rows.
    """
    # Run the compute methods in the same order as the original sequence.
    recompute_methods = (
        self._compute_members_count,
        self._compute_entitlements_count,
        self._compute_total_entitlements_count,
    )
    for compute in recompute_methods:
        compute()

@api.depends("entitlement_ids", "inkind_entitlement_ids")
def _compute_total_entitlements_count(self):
if not self.ids:
Expand Down
73 changes: 72 additions & 1 deletion spp_programs/models/cycle_membership.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
from odoo import _, fields, models
import logging

from odoo import _, api, fields, models
from odoo.exceptions import ValidationError

_logger = logging.getLogger(__name__)


class SPPCycleMembership(models.Model):
_name = "spp.cycle.membership"
Expand Down Expand Up @@ -87,6 +91,73 @@ def open_registrant_form(self):
},
}

@api.model
def bulk_create_memberships(self, vals_list, chunk_size=1000, skip_duplicates=False):
    """Create cycle memberships in bulk with optional duplicate skipping.

    :param vals_list: List of dicts with membership values
    :param chunk_size: Number of records per batch (default 1000)
    :param skip_duplicates: When True, use INSERT ... ON CONFLICT DO NOTHING
        to silently skip duplicate (partner_id, cycle_id) pairs.
        Returns the count of inserted rows.
    :return: Recordset (skip_duplicates=False) or int count (skip_duplicates=True)
    """
    if skip_duplicates:
        # Raw-SQL path: integer count of rows actually inserted.
        if not vals_list:
            return 0
        return self._bulk_insert_on_conflict(vals_list, chunk_size)
    # ORM path: a (possibly empty) spp.cycle.membership recordset.
    if not vals_list:
        return self.env["spp.cycle.membership"]
    return self.create(vals_list)

def _bulk_insert_on_conflict(self, vals_list, chunk_size=1000):
"""Insert cycle memberships using raw SQL with ON CONFLICT DO NOTHING.

:param vals_list: List of dicts with at least partner_id, cycle_id, state
:param chunk_size: Number of records per SQL INSERT batch
:return: Total number of rows actually inserted
"""
cr = self.env.cr
uid = self.env.uid
total_inserted = 0
today = fields.Date.today()

for i in range(0, len(vals_list), chunk_size):
batch = vals_list[i : i + chunk_size]
values = []
params = []
for v in batch:
values.append("(%s, %s, %s, %s, %s, %s, now(), now())")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When using raw SQL in Odoo, it is recommended to explicitly use (now() at time zone 'utc') for create_date and write_date. This ensures that the timestamps are stored in UTC regardless of the database session's timezone configuration, adhering to Odoo's standard data storage practices.

Suggested change
values.append("(%s, %s, %s, %s, %s, %s, now(), now())")
values.append("(%s, %s, %s, %s, %s, %s, now() at time zone 'utc', now() at time zone 'utc')")

params.extend(
[
v["partner_id"],
v["cycle_id"],
v.get("state", "draft"),
v.get("enrollment_date", today),
uid,
uid,
]
)

sql = """
INSERT INTO spp_cycle_membership
(partner_id, cycle_id, state, enrollment_date,
create_uid, write_uid, create_date, write_date)
VALUES {}
ON CONFLICT (partner_id, cycle_id) DO NOTHING
""".format( # noqa: S608 # nosec B608
", ".join(values)
)
cr.execute(sql, params)
total_inserted += cr.rowcount

_logger.info(
"Bulk inserted %d cycle memberships (%d skipped as duplicates)",
total_inserted,
len(vals_list) - total_inserted,
)
return total_inserted

def unlink(self):
if not self:
return
Expand Down
66 changes: 41 additions & 25 deletions spp_programs/models/managers/cycle_manager_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ def mark_import_as_done(self, cycle, msg):
cycle.locked_reason = None
cycle.message_post(body=msg)

# Update Statistics
cycle._compute_members_count()
# Refresh statistics after bulk operations
cycle.refresh_statistics()

def mark_prepare_entitlement_as_done(self, cycle, msg):
"""Complete the preparation of entitlements.
Expand Down Expand Up @@ -518,10 +518,13 @@ def _check_eligibility_async(self, cycle, beneficiaries_count):
jobs = []
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(
self.delayable(channel="cycle")._check_eligibility(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)
self.delayable(
channel="cycle",
identity_key=f"check_elig_{cycle.id}_{i}",
)._check_eligibility(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)
)
main_job = group(*jobs)
main_job.on_done(self.delayable(channel="cycle").mark_check_eligibility_as_done(cycle))
main_job.on_done(self.delayable(channel="statistics_refresh").mark_check_eligibility_as_done(cycle))
main_job.delay()

def _check_eligibility(self, cycle, beneficiaries=None, offset=0, limit=None, do_count=False):
Expand Down Expand Up @@ -587,10 +590,17 @@ def _prepare_entitlements_async(self, cycle, beneficiaries_count):

jobs = []
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, i, self.MAX_ROW_JOB_QUEUE))
jobs.append(
self.delayable(
channel="cycle",
identity_key=f"prepare_ent_{cycle.id}_{i}",
)._prepare_entitlements(cycle, i, self.MAX_ROW_JOB_QUEUE)
)
main_job = group(*jobs)
main_job.on_done(
self.delayable(channel="cycle").mark_prepare_entitlement_as_done(cycle, _("Entitlement Ready."))
self.delayable(channel="statistics_refresh").mark_prepare_entitlement_as_done(
cycle, _("Entitlement Ready.")
)
)
main_job.delay()

Expand Down Expand Up @@ -820,40 +830,46 @@ def _add_beneficiaries_async(self, cycle, beneficiaries, state):
jobs = []
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(
self.delayable(channel="cycle")._add_beneficiaries(
self.delayable(
channel="cycle",
identity_key=f"add_benef_{cycle.id}_{i}",
)._add_beneficiaries(
cycle,
beneficiaries[i : i + self.MAX_ROW_JOB_QUEUE],
state,
)
)

main_job = group(*jobs)
main_job.on_done(self.delayable(channel="cycle").mark_import_as_done(cycle, _("Beneficiary import finished.")))
main_job.on_done(
self.delayable(channel="statistics_refresh").mark_import_as_done(cycle, _("Beneficiary import finished."))
)
main_job.delay()

def _add_beneficiaries(self, cycle, beneficiaries, state="draft", do_count=False):
"""Add Beneficiaries

:param cycle: Recordset of cycle
:param beneficiaries: Recordset of beneficiaries
:param beneficiaries: List of partner IDs
:param state: String state to be set to beneficiary
:param do_count: Boolean - set to False to not run compute functions
:return: Integer - count of not enrolled members
"""
new_beneficiaries = []
for r in beneficiaries:
new_beneficiaries.append(
[
0,
0,
{
"partner_id": r,
"enrollment_date": fields.Date.today(),
"state": state,
},
]
)
cycle.update({"cycle_membership_ids": new_beneficiaries})
:return: Integer - count of inserted members
"""
today = fields.Date.today()
vals_list = [
{
"partner_id": partner_id,
"cycle_id": cycle.id,
"enrollment_date": today,
"state": state,
}
for partner_id in beneficiaries
]
self.env["spp.cycle.membership"].bulk_create_memberships(vals_list, skip_duplicates=True)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The method _add_beneficiaries is missing a return statement. According to its docstring, it should return an integer representing the count of inserted members. Returning the result of bulk_create_memberships will satisfy this requirement.

Suggested change
self.env["spp.cycle.membership"].bulk_create_memberships(vals_list, skip_duplicates=True)
return self.env["spp.cycle.membership"].bulk_create_memberships(vals_list, skip_duplicates=True)


# Raw SQL bypasses the ORM cache — invalidate so subsequent reads
# (e.g. cycle.cycle_membership_ids) reflect the new rows.
cycle.invalidate_recordset(["cycle_membership_ids"])

if do_count:
# Update Statistics
Expand Down
26 changes: 14 additions & 12 deletions spp_programs/models/managers/eligibility_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
import logging

from odoo import Command, _, api, fields, models
from odoo import _, api, fields, models

from odoo.addons.job_worker.delay import group

Expand Down Expand Up @@ -155,30 +155,32 @@ def _import_registrants_async(self, new_beneficiaries, state="draft"):
jobs = []
for i in range(0, len(new_beneficiaries), 10000):
jobs.append(
self.delayable(channel="eligibility_manager")._import_registrants(
new_beneficiaries[i : i + 10000], state
)
self.delayable(
channel="eligibility_manager",
identity_key=f"import_reg_{program.id}_{i}",
)._import_registrants(new_beneficiaries[i : i + 10000], state)
)
main_job = group(*jobs)
main_job.on_done(self.delayable(channel="eligibility_manager").mark_import_as_done())
main_job.on_done(self.delayable(channel="statistics_refresh").mark_import_as_done())
main_job.delay()

def mark_import_as_done(self):
self.ensure_one()
self.program_id._compute_eligible_beneficiary_count()
self.program_id._compute_beneficiary_count()
self.program_id.refresh_beneficiary_counts()

self.program_id.is_locked = False
self.program_id.locked_reason = None
self.program_id.message_post(body=_("Import finished."))

def _import_registrants(self, new_beneficiaries, state="draft", do_count=False):
_logger.info("Importing %s beneficiaries", len(new_beneficiaries))
_logger.info("updated")
beneficiaries_val = []
for beneficiary in new_beneficiaries:
beneficiaries_val.append(Command.create({"partner_id": beneficiary.id, "state": state}))
self.program_id.update({"program_membership_ids": beneficiaries_val})
vals_list = [{"partner_id": b.id, "program_id": self.program_id.id, "state": state} for b in new_beneficiaries]
count = self.env["spp.program.membership"].bulk_create_memberships(vals_list, skip_duplicates=True)
_logger.info("Imported %d new memberships (%d duplicates skipped)", count, len(vals_list) - count)

# Raw SQL bypasses the ORM cache — invalidate so subsequent reads
# (e.g. program.program_membership_ids) reflect the new rows.
self.program_id.invalidate_recordset(["program_membership_ids"])

if do_count:
# Compute Statistics
Expand Down
16 changes: 13 additions & 3 deletions spp_programs/models/managers/entitlement_manager_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ def _set_pending_validation_entitlements_async(self, cycle, entitlements):
jobs = []
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(
self.delayable()._set_pending_validation_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE])
self.delayable(channel="entitlement_approval")._set_pending_validation_entitlements(
entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Set to Pending Validation.")))
Expand Down Expand Up @@ -137,7 +139,11 @@ def _validate_entitlements_async(self, cycle, entitlements, entitlements_count):

jobs = []
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(self.delayable()._validate_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE]))
jobs.append(
self.delayable(channel="entitlement_approval")._validate_entitlements(
entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved.")))
main_job.delay()
Expand Down Expand Up @@ -197,7 +203,11 @@ def _cancel_entitlements_async(self, cycle, entitlements, entitlements_count):

jobs = []
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(self.delayable()._cancel_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE]))
jobs.append(
self.delayable(channel="entitlement_approval")._cancel_entitlements(
entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Cancelled.")))
main_job.delay()
Expand Down
6 changes: 5 additions & 1 deletion spp_programs/models/managers/entitlement_manager_cash.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,11 @@ def _validate_entitlements_async(self, cycle, entitlements, entitlements_count):
jobs = []
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
# Needs to override
jobs.append(self.delayable()._validate_entitlements(cycle, entitlements[i : i + self.MAX_ROW_JOB_QUEUE]))
jobs.append(
self.delayable(channel="entitlement_approval")._validate_entitlements(
cycle, entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved.")))
main_job.delay()
Expand Down
10 changes: 8 additions & 2 deletions spp_programs/models/managers/entitlement_manager_inkind.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ def _set_pending_validation_entitlements_async(self, cycle, entitlements_count):
jobs = []
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(
self.delayable()._set_pending_validation_entitlements(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)
self.delayable(channel="entitlement_approval")._set_pending_validation_entitlements(
cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Set to Pending Validation.")))
Expand Down Expand Up @@ -315,7 +317,11 @@ def _validate_entitlements_async(self, cycle, entitlements_count):

jobs = []
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(self.delayable()._validate_entitlements(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE))
jobs.append(
self.delayable(channel="entitlement_approval")._validate_entitlements(
cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved.")))
main_job.delay()
Expand Down
9 changes: 5 additions & 4 deletions spp_programs/models/managers/program_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,13 @@ def _enroll_eligible_registrants_async(self, states, members_count):
jobs = []
for i in range(0, members_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(
self.delayable(channel="program_manager")._enroll_eligible_registrants(
states, i, self.MAX_ROW_JOB_QUEUE
)
self.delayable(
channel="program_manager",
identity_key=f"enroll_eligible_{program.id}_{i}",
)._enroll_eligible_registrants(states, i, self.MAX_ROW_JOB_QUEUE)
)
main_job = group(*jobs)
main_job.on_done(self.delayable(channel="program_manager").mark_enroll_eligible_as_done())
main_job.on_done(self.delayable(channel="statistics_refresh").mark_enroll_eligible_as_done())
main_job.delay()

def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=False):
Expand Down
Loading
Loading