Skip to content

Commit

Permalink
[ENH] Improve determination of start and end balance for camt.
Browse files Browse the repository at this point in the history
  • Loading branch information
NL66278 committed Oct 21, 2016
1 parent 1de9e28 commit bffaa78
Showing 1 changed file with 132 additions and 83 deletions.
215 changes: 132 additions & 83 deletions account_bank_statement_import_camt/camt.py
Expand Up @@ -3,6 +3,7 @@
##############################################################################
#
# Copyright (C) 2013-2015 Therp BV <http://therp.nl>
# (C) 2015 1200wd.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
Expand All @@ -18,32 +19,51 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import logging
import re
from datetime import datetime
from lxml import etree

from openerp import _
from openerp.addons.account_bank_statement_import.parserlib import (
BankStatement)


_logger = logging.getLogger(__name__)


class CamtParser(object):
"""Parser for camt bank statement import files."""

def parse_amount(self, ns, node):
def __init__(self):
"""Define and initialize attributes."""
super(CamtParser, self).__init__()
self.namespace = ''

def xpath(self, node, expr):
"""
Wrap namespaces argument into call to Element.xpath():
self.xpath(node, './ns:Acct/ns:Id')
"""
return node.xpath(expr, namespaces={'ns': self.namespace})

def parse_amount(self, node):
"""Parse element that contains Amount and CreditDebitIndicator."""
if node is None:
return 0.0
sign = 1
amount = 0.0
sign_node = node.xpath('ns:CdtDbtInd', namespaces={'ns': ns})
sign_node = self.xpath(node, 'ns:CdtDbtInd')
if sign_node and sign_node[0].text == 'DBIT':
sign = -1
amount_node = node.xpath('ns:Amt', namespaces={'ns': ns})
amount_node = self.xpath(node, 'ns:Amt')
if amount_node:
amount = sign * float(amount_node[0].text)
return amount

def add_value_from_node(
self, ns, node, xpath_str, obj, attr_name, join_str=None):
self, node, xpath_str, obj, attr_name, join_str=None):
"""Add value to object from first or all nodes found with xpath.
If xpath_str is a list (or iterable), it will be seen as a series
Expand All @@ -52,7 +72,7 @@ def add_value_from_node(
if not isinstance(xpath_str, (list, tuple)):
xpath_str = [xpath_str]
for search_str in xpath_str:
found_node = node.xpath(search_str, namespaces={'ns': ns})
found_node = self.xpath(node, search_str)
if found_node:
if join_str is None:
attr_value = found_node[0].text
Expand All @@ -61,169 +81,198 @@ def add_value_from_node(
setattr(obj, attr_name, attr_value)
break

def parse_transaction_details(self, ns, node, transaction):
def parse_transaction_details(self, node, transaction):
"""Parse transaction details (message, party, account...)."""
# message
self.add_value_from_node(
ns, node, [
node, [
'./ns:RmtInf/ns:Ustrd',
'./ns:AddtlTxInf',
'./ns:AddtlNtryInf',
], transaction, 'message', join_str='\n')
], transaction, 'message', join_str='\n'
)
# eref
self.add_value_from_node(
ns, node, [
node, [
'./ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref',
'./ns:Refs/ns:EndToEndId',
],
transaction, 'eref'
)
# remote party values
party_type = 'Dbtr'
party_type_node = node.xpath(
'../../ns:CdtDbtInd', namespaces={'ns': ns})
party_type_node = self.xpath(node, '../../ns:CdtDbtInd')
if party_type_node and party_type_node[0].text != 'CRDT':
party_type = 'Cdtr'
party_node = node.xpath(
'./ns:RltdPties/ns:%s' % party_type, namespaces={'ns': ns})
party_node = self.xpath(node, './ns:RltdPties/ns:%s' % party_type)
if party_node:
self.add_value_from_node(
ns, party_node[0], './ns:Nm', transaction, 'remote_owner')
party_node[0], './ns:Nm', transaction, 'remote_owner'
)
self.add_value_from_node(
ns, party_node[0], './ns:PstlAdr/ns:Ctry', transaction,
party_node[0], './ns:PstlAdr/ns:Ctry', transaction,
'remote_owner_country'
)
address_node = party_node[0].xpath(
'./ns:PstlAdr/ns:AdrLine', namespaces={'ns': ns})
address_node = self.xpath(
party_node[0], './ns:PstlAdr/ns:AdrLine'
)
if address_node:
transaction.remote_owner_address = [address_node[0].text]
# Get remote_account from iban or from domestic account:
account_node = node.xpath(
'./ns:RltdPties/ns:%sAcct/ns:Id' % party_type,
namespaces={'ns': ns}
account_node = self.xpath(
node, './ns:RltdPties/ns:%sAcct/ns:Id' % party_type
)
if account_node:
iban_node = account_node[0].xpath(
'./ns:IBAN', namespaces={'ns': ns})
iban_node = self.xpath(account_node[0], './ns:IBAN')
if iban_node:
transaction.remote_account = iban_node[0].text
bic_node = node.xpath(
'./ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' % party_type,
namespaces={'ns': ns}
bic_node = self.xpath(
node,
'./ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC'
% party_type
)
if bic_node:
transaction.remote_bank_bic = bic_node[0].text
else:
self.add_value_from_node(
ns, account_node[0], './ns:Othr/ns:Id', transaction,
account_node[0], './ns:Othr/ns:Id', transaction,
'remote_account'
)

def parse_transaction(self, ns, node, transaction):
def parse_transaction(self, node, transaction):
"""Parse transaction (entry) node."""
self.add_value_from_node(
ns, node, './ns:BkTxCd/ns:Prtry/ns:Cd', transaction,
node, './ns:BkTxCd/ns:Prtry/ns:Cd', transaction,
'transfer_type'
)
self.add_value_from_node(
ns, node, './ns:BookgDt/ns:Dt', transaction, 'execution_date')
node, './ns:BookgDt/ns:Dt', transaction, 'execution_date'
)
self.add_value_from_node(
ns, node, './ns:ValDt/ns:Dt', transaction, 'value_date')
transaction.transferred_amount = self.parse_amount(ns, node)
details_node = node.xpath(
'./ns:NtryDtls/ns:TxDtls', namespaces={'ns': ns})
node, './ns:ValDt/ns:Dt', transaction, 'value_date'
)
transaction.transferred_amount = self.parse_amount(node)
details_node = self.xpath(
node, './ns:NtryDtls/ns:TxDtls'
)
if details_node:
self.parse_transaction_details(ns, details_node[0], transaction)
self.parse_transaction_details(details_node[0], transaction)
if not transaction.message:
self.add_value_from_node(
ns, node, './ns:AddtlNtryInf', transaction, 'message')
node, './ns:AddtlNtryInf', transaction, 'message'
)
if not transaction.eref:
self.add_value_from_node(
ns, node, [
node, [
'./ns:NtryDtls/ns:Btch/ns:PmtInfId',
],
transaction, 'eref'
)
transaction.data = etree.tostring(node)
return transaction

def get_balance_amounts(self, ns, node):
"""Return opening and closing balance.
def get_balance_type_node(self, node, balance_type):
"""
:param node: BkToCstmrStmt/Stmt/Bal node
:param balance type: one of 'OPBD', 'PRCD', 'ITBD', 'CLBD'
"""
code_expr = (
'./ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' %
balance_type
)
return self.xpath(node, code_expr)

Depending on kind of balance and statement, the balance might be in a
different kind of node:
OPBD = OpeningBalance
PRCD = PreviousClosingBalance
ITBD = InterimBalance (first ITBD is start-, second is end-balance)
CLBD = ClosingBalance
def get_start_balance(self, node):
"""
start_balance_node = None
end_balance_node = None
for node_name in ['OPBD', 'PRCD', 'CLBD', 'ITBD']:
code_expr = (
'./ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' %
node_name
)
balance_node = node.xpath(code_expr, namespaces={'ns': ns})
if balance_node:
if node_name in ['OPBD', 'PRCD']:
start_balance_node = balance_node[0]
elif node_name == 'CLBD':
end_balance_node = balance_node[0]
else:
if not start_balance_node:
start_balance_node = balance_node[0]
if not end_balance_node:
end_balance_node = balance_node[-1]
return (
self.parse_amount(ns, start_balance_node),
self.parse_amount(ns, end_balance_node)
Find the (only) balance node with code OpeningBalance, or
the only one with code 'PreviousClosingBalance'
or the first balance node with code InterimBalance in
the case of preceeding pagination.
:param node: BkToCstmrStmt/Stmt/Bal node
"""
balance = 0
nodes = (
self.get_balance_type_node(node, 'OPBD') or
self.get_balance_type_node(node, 'PRCD') or
self.get_balance_type_node(node, 'ITBD')
)
if nodes:
balance = self.parse_amount(nodes[0])
return balance

def parse_statement(self, ns, node):
def get_end_balance(self, node):
"""
Find the (only) balance node with code ClosingBalance, or
the second (and last) balance node with code InterimBalance in
the case of continued pagination.
:param node: BkToCstmrStmt/Stmt/Bal node
"""
balance = 0
nodes = (
self.get_balance_type_node(node, 'CLAV') or
self.get_balance_type_node(node, 'CLBD') or
self.get_balance_type_node(node, 'ITBD')
)
if nodes:
balance = self.parse_amount(nodes[-1])
return balance

def parse_statement(self, node):
"""Parse a single Stmt node."""
statement = BankStatement()
self.add_value_from_node(
ns, node, [
node, [
'./ns:Acct/ns:Id/ns:IBAN',
'./ns:Acct/ns:Id/ns:Othr/ns:Id',
], statement, 'local_account'
)
self.add_value_from_node(node, './ns:Id', statement, 'statement_id')
self.add_value_from_node(
ns, node, './ns:Id', statement, 'statement_id')
self.add_value_from_node(
ns, node, './ns:Acct/ns:Ccy', statement, 'local_currency')
(statement.start_balance, statement.end_balance) = (
self.get_balance_amounts(ns, node))
transaction_nodes = node.xpath('./ns:Ntry', namespaces={'ns': ns})
node, './ns:Acct/ns:Ccy', statement, 'local_currency')
statement.start_balance = self.get_start_balance(node)
statement.end_balance = self.get_end_balance(node)
transaction_nodes = self.xpath(node, './ns:Ntry')
total_amount = 0
for entry_node in transaction_nodes:
transaction = statement.create_transaction()
self.parse_transaction(ns, entry_node, transaction)
self.parse_transaction(entry_node, transaction)
total_amount += transaction.transferred_amount
if statement['transactions']:
statement.date = datetime.strptime(
statement['transactions'][0].execution_date, "%Y-%m-%d")
if statement.start_balance == 0 and statement.end_balance != 0:
statement.start_balance = statement.end_balance - total_amount
_logger.debug(
_("Start balance %s calculated from end balance %s and"
" Total amount %s."),
statement.start_balance,
statement.end_balance,
total_amount
)
return statement

def check_version(self, ns, root):
def check_version(self, root):
"""Validate validity of camt file."""
# Check wether it is camt at all:
re_camt = re.compile(
r'(^urn:iso:std:iso:20022:tech:xsd:camt.'
r'|^ISO:camt.)'
)
if not re_camt.search(ns):
raise ValueError('no camt: ' + ns)
if not re_camt.search(self.namespace):
raise ValueError('no camt: ' + self.namespace)
# Check wether version 052 or 053:
re_camt_version = re.compile(
r'(^urn:iso:std:iso:20022:tech:xsd:camt.053.'
r'|^urn:iso:std:iso:20022:tech:xsd:camt.052.'
r'|^ISO:camt.053.'
r'|^ISO:camt.052.)'
)
if not re_camt_version.search(ns):
raise ValueError('no camt 052 or 053: ' + ns)
if not re_camt_version.search(self.namespace):
raise ValueError('no camt 052 or 053: ' + self.namespace)
# Check GrpHdr element:
root_0_0 = root[0][0].tag[len(ns) + 2:] # strip namespace
root_0_0 = root[0][0].tag[len(self.namespace) + 2:] # strip namespace
if root_0_0 != 'GrpHdr':
raise ValueError('expected GrpHdr, got: ' + root_0_0)

Expand All @@ -239,11 +288,11 @@ def parse(self, data):
if root is None:
raise ValueError(
'Not a valid xml file, or not an xml file at all.')
ns = root.tag[1:root.tag.index("}")]
self.check_version(ns, root)
self.namespace = root.tag[1:root.tag.index("}")]
self.check_version(root)
statements = []
for node in root[0][1:]:
statement = self.parse_statement(ns, node)
statement = self.parse_statement(node)
if len(statement['transactions']):
statements.append(statement)
return statements

0 comments on commit bffaa78

Please sign in to comment.