Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hr_payroll_document/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
"version": "16.0.1.3.0",
"depends": ["hr", "base_vat"],
"maintainers": ["peluko00"],
"external_dependencies": {"python": ["pypdf"]},
"external_dependencies": {
"python": [
"pypdf",
],
},
"data": [
"wizard/payroll_management_wizard.xml",
"security/ir.model.access.csv",
Expand Down
28 changes: 20 additions & 8 deletions hr_payroll_document/models/hr_employee.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from odoo import _, fields, models
# Copyright 2025 Simone Rubino - PyTech
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from odoo import _, api, fields, models
from odoo.exceptions import ValidationError


Expand All @@ -13,10 +16,19 @@ class Employee(models.Model):
"existing payrolls will not change their encryption status.",
)

def write(self, vals):
res = super().write(vals)
if "identification_id" in vals and not self.env["res.partner"].simple_vat_check(
self.env.company.country_id.code, vals["identification_id"]
):
raise ValidationError(_("The field identification ID is not valid"))
return res
def _validate_payroll_identification(self, code=None):
# Override if the identification should be validated in another way
if code is None and len(self) == 1:
code = self.identification_id
if country_code := self.env.company.country_id.code:
is_valid = self.env["res.partner"].simple_vat_check(country_code, code)
else:
is_valid = True
return is_valid

@api.constrains("identification_id")
def _constrain_payroll_identification(self):
# Only check the employees that have an `identification_id`
for employee in self.filtered("identification_id"):
if not employee._validate_payroll_identification():
raise ValidationError(_("The field identification ID is not valid"))
35 changes: 30 additions & 5 deletions hr_payroll_document/tests/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import contextlib
from unittest import mock

from odoo.modules.module import get_module_resource
from odoo.tests import common
Expand Down Expand Up @@ -32,9 +34,12 @@ def setUp(self):
}
)

with open(
get_module_resource("hr_payroll_document", "tests", "test.pdf"), "rb"
) as pdf_file:
self.wizard = self._create_wizard(
"January", ["hr_payroll_document", "tests", "test.pdf"]
)

def _create_wizard(self, subject, file_path):
with open(get_module_resource(*file_path), "rb") as pdf_file:
encoded_string = base64.b64encode(pdf_file.read())
ir_values = {
"name": "test",
Expand All @@ -45,7 +50,27 @@ def setUp(self):
"res_id": 1,
}
self.attachment = self.env["ir.attachment"].create(ir_values)
self.subject = "January"
self.wizard = self.env["payroll.management.wizard"].create(
self.subject = subject
return self.env["payroll.management.wizard"].create(
{"payrolls": [self.attachment.id], "subject": self.subject}
)

@contextlib.contextmanager
def _mock_valid_identification(self, employee, identification_code):
def _mocked_validate_payroll_identification(self, code=None):
if code is None:
code = employee.identification_id
return code == identification_code

with mock.patch.object(
type(employee),
"_validate_payroll_identification",
_mocked_validate_payroll_identification,
) as patch:
patch.side_effect = _mocked_validate_payroll_identification
yield

def fill_company_id(self):
self.env.company.country_id = self.env["res.country"].search(
[("name", "=", "Spain")]
)
24 changes: 2 additions & 22 deletions hr_payroll_document/tests/test_hr_payroll_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from odoo import _
from odoo.exceptions import UserError, ValidationError
from odoo.modules.module import get_module_resource

from odoo.addons.hr_payroll_document.tests.common import TestHrPayrollDocument

Expand All @@ -14,28 +13,9 @@ class TestHRPayrollDocument(TestHrPayrollDocument):
def setUp(self, *args, **kwargs):
super().setUp(*args, **kwargs)

def fill_company_id(self):
self.env.company.country_id = self.env["res.country"].search(
[("name", "=", "Spain")]
)

def test_extension_error(self):
with open(
get_module_resource("hr_payroll_document", "tests", "test.docx"), "rb"
) as pdf_file:
encoded_string = base64.b64encode(pdf_file.read())
ir_values = {
"name": "test",
"type": "binary",
"datas": encoded_string,
"store_fname": encoded_string,
"res_model": "payroll.management.wizard",
"res_id": 1,
}
self.attachment = self.env["ir.attachment"].create(ir_values)
self.subject = "January"
self.wizard = self.env["payroll.management.wizard"].create(
{"payrolls": [self.attachment.id], "subject": self.subject}
self.wizard = self._create_wizard(
"January", ["hr_payroll_document", "tests", "test.docx"]
)
with self.assertRaises(ValidationError):
self.wizard.send_payrolls()
Expand Down
147 changes: 95 additions & 52 deletions hr_payroll_document/wizard/payroll_management_wizard.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import base64
from base64 import b64decode

from pypdf import PdfReader, PdfWriter
from pypdf import PdfReader, PdfWriter, errors

from odoo import _, fields, models
from odoo.exceptions import UserError, ValidationError
Expand All @@ -22,49 +22,114 @@ class PayrollManagamentWizard(models.TransientModel):
copy=False,
)

def send_payrolls(self):
not_found = set()
self.merge_pdfs()
reader = PdfReader("/tmp/merged-pdf.pdf")
employees = set()

# Validate if company have country
if not self.env.company.country_id:
raise UserError(_("You must to filled country field of company"))
def _get_fallback_reader(self, pdf_reader):
# Override to use another reader
pass

def _read_page_content(self, pdf_reader, page, fallback_reader=None):
try:
page_content = page.extract_text().split()
except errors.PdfReadError:
if fallback_reader:
# The original page cannot be read:
# read the simplified page in the fallback_reader
page_number = pdf_reader.get_page_number(page)
fallback_page = fallback_reader.get_page(page_number)
page_content = fallback_page.extract_text().split()
else:
raise
return page_content

def _extract_employees(self, pdf_reader, fallback_reader=None):
employee_to_pages = dict()
not_found_ids = set()

# Find all IDs of the employees
for page in reader.pages:
for value in page.extract_text().split():
for page in pdf_reader.pages:
page_content = self._read_page_content(
pdf_reader, page, fallback_reader=fallback_reader
)
for value in page_content:
if self.validate_id(value) and value != self.env.company.vat:
employee = self.env["hr.employee"].search(
[("identification_id", "=", value)]
)
if employee:
employees.add(employee)
employee_to_pages.setdefault(employee, []).append(page)
else:
not_found.add(value)
not_found_ids.add(value)
break

for employee in list(employees):
pdfWriter = PdfWriter()
for page in reader.pages:
if employee.identification_id in page.extract_text():
# Save pdf with payrolls of employee
pdfWriter.add_page(page)
return employee_to_pages, not_found_ids

def _build_employee_payroll(self, file_name, pdf_pages, encryption_key=None):
"""Return the path to the created payroll.

Optionally encrypt the payroll file with `encryption_key`.
"""
pdfWriter = PdfWriter()
for page in pdf_pages:
pdfWriter.add_page(page)

path = "/tmp/" + _("Payroll ") + employee.name + ".pdf"
path = "/tmp/" + file_name

if not employee.no_payroll_encryption:
# Encrypt the payroll file with the identification identifier of the employee
pdfWriter.encrypt(employee.identification_id, algorithm="AES-256")
if encryption_key:
pdfWriter.encrypt(encryption_key, algorithm="AES-256")

f = open(path, "wb")
with open(path, "wb") as f:
pdfWriter.write(f)
f.close()
return path

def _show_employees_action(self):
return {
"name": _("Payrolls sent"),
"type": "ir.actions.act_window",
"res_model": "hr.employee",
"views": [
(False, "kanban"),
(False, "tree"),
(False, "form"),
(False, "activity"),
],
}

def send_payrolls(self):
self.merge_pdfs()
# Validate if company have country
if not self.env.company.country_id:
raise UserError(_("You must to filled country field of company"))

reader = PdfReader("/tmp/merged-pdf.pdf")

try:
employee_to_pages, not_found = self._extract_employees(reader)
except errors.PdfReadError:
# Couldn't read the file, try again with another reader
fallback_reader = self._get_fallback_reader(reader)
if fallback_reader:
employee_to_pages, not_found = self._extract_employees(
reader, fallback_reader=fallback_reader
)
else:
raise

for employee, pages in employee_to_pages.items():
encryption_key = (
None if employee.no_payroll_encryption else employee.identification_id
)
path = self._build_employee_payroll(
_(
"Payroll %(subject)s %(employee)s.pdf",
employee=employee.name,
subject=self.subject,
),
pages,
encryption_key=encryption_key,
)
# Send payroll to the employee
self.send_mail(employee, path)

next_action = self._show_employees_action()
if not_found:
return {
"type": "ir.actions.client",
Expand All @@ -75,17 +140,7 @@ def send_payrolls(self):
+ ", ".join(list(not_found)),
"sticky": True,
"type": "warning",
"next": {
"name": _("Payrolls sent"),
"type": "ir.actions.act_window",
"res_model": "hr.employee",
"views": [
(False, "kanban"),
(False, "tree"),
(False, "form"),
(False, "activity"),
],
},
"next": next_action,
},
}

Expand All @@ -97,17 +152,7 @@ def send_payrolls(self):
"message": _("Payrolls sent to employees correctly"),
"sticky": False,
"type": "success",
"next": {
"name": _("Payrolls sent"),
"type": "ir.actions.act_window",
"res_model": "hr.employee",
"views": [
(False, "kanban"),
(False, "tree"),
(False, "form"),
(False, "activity"),
],
},
"next": next_action,
},
}

Expand Down Expand Up @@ -167,7 +212,5 @@ def send_mail(self, employee, path):
employee.id, force_send=True
)

def validate_id(self, number):
return self.env["res.partner"].simple_vat_check(
self.env.company.country_id.code, number
)
def validate_id(self, code):
return self.env["hr.employee"]._validate_payroll_identification(code=code)
Loading