From bffaa78fb591f97fca96e82dd0cf98e8a4813b0a Mon Sep 17 00:00:00 2001 From: Ronald Portier Date: Thu, 20 Oct 2016 11:34:11 +0200 Subject: [PATCH] [ENH] Improve determination of start and end balance for camt. --- account_bank_statement_import_camt/camt.py | 215 +++++++++++++-------- 1 file changed, 132 insertions(+), 83 deletions(-) diff --git a/account_bank_statement_import_camt/camt.py b/account_bank_statement_import_camt/camt.py index aed01dbf0d..b26c0e7f47 100644 --- a/account_bank_statement_import_camt/camt.py +++ b/account_bank_statement_import_camt/camt.py @@ -3,6 +3,7 @@ ############################################################################## # # Copyright (C) 2013-2015 Therp BV +# (C) 2015 1200wd.com # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published @@ -18,32 +19,51 @@ # along with this program. If not, see . # ############################################################################## +import logging import re from datetime import datetime from lxml import etree + +from openerp import _ from openerp.addons.account_bank_statement_import.parserlib import ( BankStatement) +_logger = logging.getLogger(__name__) + + class CamtParser(object): """Parser for camt bank statement import files.""" - def parse_amount(self, ns, node): + def __init__(self): + """Define and initialize attributes.""" + super(CamtParser, self).__init__() + self.namespace = '' + + def xpath(self, node, expr): + """ + Wrap namespaces argument into call to Element.xpath(): + + self.xpath(node, './ns:Acct/ns:Id') + """ + return node.xpath(expr, namespaces={'ns': self.namespace}) + + def parse_amount(self, node): """Parse element that contains Amount and CreditDebitIndicator.""" if node is None: return 0.0 sign = 1 amount = 0.0 - sign_node = node.xpath('ns:CdtDbtInd', namespaces={'ns': ns}) + sign_node = self.xpath(node, 'ns:CdtDbtInd') if sign_node and sign_node[0].text == 'DBIT': sign = -1 - amount_node = node.xpath('ns:Amt', namespaces={'ns': ns}) + amount_node = self.xpath(node, 'ns:Amt') if amount_node: amount = sign * float(amount_node[0].text) return amount def add_value_from_node( - self, ns, node, xpath_str, obj, attr_name, join_str=None): + self, node, xpath_str, obj, attr_name, join_str=None): """Add value to object from first or all nodes found with xpath. If xpath_str is a list (or iterable), it will be seen as a series @@ -52,7 +72,7 @@ def add_value_from_node( if not isinstance(xpath_str, (list, tuple)): xpath_str = [xpath_str] for search_str in xpath_str: - found_node = node.xpath(search_str, namespaces={'ns': ns}) + found_node = self.xpath(node, search_str) if found_node: if join_str is None: attr_value = found_node[0].text @@ -61,18 +81,19 @@ def add_value_from_node( setattr(obj, attr_name, attr_value) break - def parse_transaction_details(self, ns, node, transaction): + def parse_transaction_details(self, node, transaction): """Parse transaction details (message, party, account...).""" # message self.add_value_from_node( - ns, node, [ + node, [ './ns:RmtInf/ns:Ustrd', './ns:AddtlTxInf', './ns:AddtlNtryInf', - ], transaction, 'message', join_str='\n') + ], transaction, 'message', join_str='\n' + ) # eref self.add_value_from_node( - ns, node, [ + node, [ './ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref', './ns:Refs/ns:EndToEndId', ], @@ -80,66 +101,69 @@ def parse_transaction_details(self, ns, node, transaction): ) # remote party values party_type = 'Dbtr' - party_type_node = node.xpath( - '../../ns:CdtDbtInd', namespaces={'ns': ns}) + party_type_node = self.xpath(node, '../../ns:CdtDbtInd') if party_type_node and party_type_node[0].text != 'CRDT': party_type = 'Cdtr' - party_node = node.xpath( - './ns:RltdPties/ns:%s' % party_type, namespaces={'ns': ns}) + party_node = self.xpath(node, './ns:RltdPties/ns:%s' % party_type) if party_node: self.add_value_from_node( - ns, party_node[0], './ns:Nm', transaction, 'remote_owner') + party_node[0], './ns:Nm', transaction, 'remote_owner' + ) self.add_value_from_node( - ns, party_node[0], './ns:PstlAdr/ns:Ctry', transaction, + party_node[0], './ns:PstlAdr/ns:Ctry', transaction, 'remote_owner_country' ) - address_node = party_node[0].xpath( - './ns:PstlAdr/ns:AdrLine', namespaces={'ns': ns}) + address_node = self.xpath( + party_node[0], './ns:PstlAdr/ns:AdrLine' + ) if address_node: transaction.remote_owner_address = [address_node[0].text] # Get remote_account from iban or from domestic account: - account_node = node.xpath( - './ns:RltdPties/ns:%sAcct/ns:Id' % party_type, - namespaces={'ns': ns} + account_node = self.xpath( + node, './ns:RltdPties/ns:%sAcct/ns:Id' % party_type ) if account_node: - iban_node = account_node[0].xpath( - './ns:IBAN', namespaces={'ns': ns}) + iban_node = self.xpath(account_node[0], './ns:IBAN') if iban_node: transaction.remote_account = iban_node[0].text - bic_node = node.xpath( - './ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' % party_type, - namespaces={'ns': ns} + bic_node = self.xpath( + node, + './ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' + % party_type ) if bic_node: transaction.remote_bank_bic = bic_node[0].text else: self.add_value_from_node( - ns, account_node[0], './ns:Othr/ns:Id', transaction, + account_node[0], './ns:Othr/ns:Id', transaction, 'remote_account' ) - def parse_transaction(self, ns, node, transaction): + def parse_transaction(self, node, transaction): """Parse transaction (entry) node.""" self.add_value_from_node( - ns, node, './ns:BkTxCd/ns:Prtry/ns:Cd', transaction, + node, './ns:BkTxCd/ns:Prtry/ns:Cd', transaction, 'transfer_type' ) self.add_value_from_node( - ns, node, './ns:BookgDt/ns:Dt', transaction, 'execution_date') + node, './ns:BookgDt/ns:Dt', transaction, 'execution_date' + ) self.add_value_from_node( - ns, node, './ns:ValDt/ns:Dt', transaction, 'value_date') - transaction.transferred_amount = self.parse_amount(ns, node) - details_node = node.xpath( - './ns:NtryDtls/ns:TxDtls', namespaces={'ns': ns}) + node, './ns:ValDt/ns:Dt', transaction, 'value_date' + ) + transaction.transferred_amount = self.parse_amount(node) + details_node = self.xpath( + node, './ns:NtryDtls/ns:TxDtls' + ) if details_node: - self.parse_transaction_details(ns, details_node[0], transaction) + self.parse_transaction_details(details_node[0], transaction) if not transaction.message: self.add_value_from_node( - ns, node, './ns:AddtlNtryInf', transaction, 'message') + node, './ns:AddtlNtryInf', transaction, 'message' + ) if not transaction.eref: self.add_value_from_node( - ns, node, [ + node, [ './ns:NtryDtls/ns:Btch/ns:PmtInfId', ], transaction, 'eref' @@ -147,72 +171,97 @@ def parse_transaction(self, ns, node, transaction): transaction.data = etree.tostring(node) return transaction - def get_balance_amounts(self, ns, node): - """Return opening and closing balance. + def get_balance_type_node(self, node, balance_type): + """ + :param node: BkToCstmrStmt/Stmt/Bal node + :param balance type: one of 'OPBD', 'PRCD', 'ITBD', 'CLBD' + """ + code_expr = ( + './ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' % + balance_type + ) + return self.xpath(node, code_expr) - Depending on kind of balance and statement, the balance might be in a - different kind of node: - OPBD = OpeningBalance - PRCD = PreviousClosingBalance - ITBD = InterimBalance (first ITBD is start-, second is end-balance) - CLBD = ClosingBalance + def get_start_balance(self, node): """ - start_balance_node = None - end_balance_node = None - for node_name in ['OPBD', 'PRCD', 'CLBD', 'ITBD']: - code_expr = ( - './ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' % - node_name - ) - balance_node = node.xpath(code_expr, namespaces={'ns': ns}) - if balance_node: - if node_name in ['OPBD', 'PRCD']: - start_balance_node = balance_node[0] - elif node_name == 'CLBD': - end_balance_node = balance_node[0] - else: - if not start_balance_node: - start_balance_node = balance_node[0] - if not end_balance_node: - end_balance_node = balance_node[-1] - return ( - self.parse_amount(ns, start_balance_node), - self.parse_amount(ns, end_balance_node) + Find the (only) balance node with code OpeningBalance, or + the only one with code 'PreviousClosingBalance' + or the first balance node with code InterimBalance in + the case of preceeding pagination. + + :param node: BkToCstmrStmt/Stmt/Bal node + """ + balance = 0 + nodes = ( + self.get_balance_type_node(node, 'OPBD') or + self.get_balance_type_node(node, 'PRCD') or + self.get_balance_type_node(node, 'ITBD') ) + if nodes: + balance = self.parse_amount(nodes[0]) + return balance - def parse_statement(self, ns, node): + def get_end_balance(self, node): + """ + Find the (only) balance node with code ClosingBalance, or + the second (and last) balance node with code InterimBalance in + the case of continued pagination. + + :param node: BkToCstmrStmt/Stmt/Bal node + """ + balance = 0 + nodes = ( + self.get_balance_type_node(node, 'CLAV') or + self.get_balance_type_node(node, 'CLBD') or + self.get_balance_type_node(node, 'ITBD') + ) + if nodes: + balance = self.parse_amount(nodes[-1]) + return balance + + def parse_statement(self, node): """Parse a single Stmt node.""" statement = BankStatement() self.add_value_from_node( - ns, node, [ + node, [ './ns:Acct/ns:Id/ns:IBAN', './ns:Acct/ns:Id/ns:Othr/ns:Id', ], statement, 'local_account' ) + self.add_value_from_node(node, './ns:Id', statement, 'statement_id') self.add_value_from_node( - ns, node, './ns:Id', statement, 'statement_id') - self.add_value_from_node( - ns, node, './ns:Acct/ns:Ccy', statement, 'local_currency') - (statement.start_balance, statement.end_balance) = ( - self.get_balance_amounts(ns, node)) - transaction_nodes = node.xpath('./ns:Ntry', namespaces={'ns': ns}) + node, './ns:Acct/ns:Ccy', statement, 'local_currency') + statement.start_balance = self.get_start_balance(node) + statement.end_balance = self.get_end_balance(node) + transaction_nodes = self.xpath(node, './ns:Ntry') + total_amount = 0 for entry_node in transaction_nodes: transaction = statement.create_transaction() - self.parse_transaction(ns, entry_node, transaction) + self.parse_transaction(entry_node, transaction) + total_amount += transaction.transferred_amount if statement['transactions']: statement.date = datetime.strptime( statement['transactions'][0].execution_date, "%Y-%m-%d") + if statement.start_balance == 0 and statement.end_balance != 0: + statement.start_balance = statement.end_balance - total_amount + _logger.debug( + _("Start balance %s calculated from end balance %s and" + " Total amount %s."), + statement.start_balance, + statement.end_balance, + total_amount + ) return statement - def check_version(self, ns, root): + def check_version(self, root): """Validate validity of camt file.""" # Check wether it is camt at all: re_camt = re.compile( r'(^urn:iso:std:iso:20022:tech:xsd:camt.' r'|^ISO:camt.)' ) - if not re_camt.search(ns): - raise ValueError('no camt: ' + ns) + if not re_camt.search(self.namespace): + raise ValueError('no camt: ' + self.namespace) # Check wether version 052 or 053: re_camt_version = re.compile( r'(^urn:iso:std:iso:20022:tech:xsd:camt.053.' @@ -220,10 +269,10 @@ def check_version(self, ns, root): r'|^ISO:camt.053.' r'|^ISO:camt.052.)' ) - if not re_camt_version.search(ns): - raise ValueError('no camt 052 or 053: ' + ns) + if not re_camt_version.search(self.namespace): + raise ValueError('no camt 052 or 053: ' + self.namespace) # Check GrpHdr element: - root_0_0 = root[0][0].tag[len(ns) + 2:] # strip namespace + root_0_0 = root[0][0].tag[len(self.namespace) + 2:] # strip namespace if root_0_0 != 'GrpHdr': raise ValueError('expected GrpHdr, got: ' + root_0_0) @@ -239,11 +288,11 @@ def parse(self, data): if root is None: raise ValueError( 'Not a valid xml file, or not an xml file at all.') - ns = root.tag[1:root.tag.index("}")] - self.check_version(ns, root) + self.namespace = root.tag[1:root.tag.index("}")] + self.check_version(root) statements = [] for node in root[0][1:]: - statement = self.parse_statement(ns, node) + statement = self.parse_statement(node) if len(statement['transactions']): statements.append(statement) return statements