Skip to content
This repository has been archived by the owner on Jun 14, 2023. It is now read-only.

Commit

Permalink
Merge pull request #25 from adelosa/feature/validate_message_length
Browse files Browse the repository at this point in the history
Add extra validation to ensure full record consumed by bitmap
  • Loading branch information
adelosa committed Aug 6, 2016
2 parents 024d901 + 5d789de commit 0a3cda6
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 14 deletions.
12 changes: 12 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@
History
=======
0.4.5 (2016-08-06)
------------------
* check that all of message consumed by fields otherwise raise exception
* get rid of a heap of debugging prints that were clogging the output
* allow freestyle de43 fields with the de43 processor enabled. Use regex rather than string splits

0.4.4-0.4.3 (2016-08-03)
------------------------
* Fix issue with mideu when no parameters passed (stack trace)
* Some more debugging messages provided with -d switch
* signed the release with key for 0.4.4. need to publish my pub key somewhere..

0.4.2 (2016-03-13)
------------------
* Complete data elements added to default config.
Expand Down
55 changes: 41 additions & 14 deletions mciutil/mciutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import datetime
import decimal
import codecs
import re
import binascii

import bitarray
Expand Down Expand Up @@ -175,6 +176,16 @@ def flip_message_encoding(message, bit_config, source_format):
# Increment the message pointer and process next field
message_pointer += message_increment
flipped_message += return_message

# check that all of message has been consumed, otherwise raise exception
if message_pointer != len(message_data):
raise Exception(
"Message data not correct length. Bitmap indicates len={0}, message is len={1}\n{2}".format(
message_pointer,
len(message_data),
hexdump.hexdump(message_data, result="return")
)
)
return flipped_message


Expand All @@ -193,13 +204,13 @@ def _flip_element_encoding(bit_config, message_data, source_format):
flipped_element = b("")

field_length = bit_config['field_length']
print("processing field {0}".format(bit_config["field_name"]))
LOGGER.debug("processing field %s", bit_config["field_name"])

length_size = _get_field_length(bit_config)

if length_size > 0:
field_length_string = message_data[:length_size]
print("field_length_string {0}, {1}".format(length_size, field_length_string))
LOGGER.debug("field_length_string %s, %s", length_size, field_length_string)
if source_format == 'ebcdic':
field_length_string = _convert_text_eb2asc(field_length_string)
field_length = int(field_length_string)
Expand Down Expand Up @@ -253,7 +264,7 @@ def get_message_elements(message, bit_config, source_format):
* key = 'TAGxxxx' icc fields
"""
LOGGER.debug("processing record %s", hexdump.hexdump(message))
LOGGER.debug("Processing message: len=%s contents:\n%s", len(message), hexdump.hexdump(message, result="return"))
# split raw message into components MessageType(4B), Bitmap(16B),
# Message(l=*)
message_length = len(message)-20
Expand Down Expand Up @@ -284,6 +295,16 @@ def get_message_elements(message, bit_config, source_format):
message_pointer += message_increment
return_values.update(return_message)

# check that all of message has been consumed, otherwise raise exception
if message_pointer != len(message_data):
raise Exception(
"Message data not correct length. Bitmap indicates len={0}, message is len={1}\n{2}".format(
message_pointer,
len(message_data),
hexdump.hexdump(message_data, result="return")
)
)

return return_values


Expand Down Expand Up @@ -514,7 +535,7 @@ def _get_icc_fields(field_data):
field_pointer += 1

field_tag_display = binascii.b2a_hex(field_tag)
print("field_tag_display={0}".format(field_tag_display))
LOGGER.debug("field_tag_display=%s", field_tag_display)
field_length_raw = field_data[field_pointer:field_pointer+1]
field_length = struct.unpack(">B", field_length_raw)[0]

Expand All @@ -539,16 +560,22 @@ def _get_de43_fields(de43_field):
:param de43_field: data of pds 43
:return: dictionary of pds 43 sub elements
"""

de43_elements = {}
de43_split = de43_field.split(b('\\'))
de43_elements["DE43_NAME"] = de43_split[0].rstrip()
de43_elements["DE43_ADDRESS"] = de43_split[1].rstrip()
de43_elements["DE43_SUBURB"] = de43_split[2].rstrip()
de43_elements["DE43_POSTCODE"] = de43_split[3][:4]
de43_elements["DE43_STATE"] = de43_split[3][len(de43_split[3])-6:len(de43_split[3])-3]
de43_elements["DE43_COUNTRY"] = de43_split[3][len(de43_split[3])-3:len(de43_split[3])]
return de43_elements
LOGGER.debug("de43_field=%s", de43_field)
de43_regex = (
r"(?P<DE43_NAME>.+?) *\\(?P<DE43_ADDRESS>.+?) *\\(?P<DE43_SUBURB>.+?) *\\"
r"(?P<DE43_POSTCODE>\d{4,10}) *(?P<DE43_STATE>.{3})(?P<DE43_COUNTRY>.{3})"
)

field_match = re.match(de43_regex, de43_field.decode())
if not field_match:
return dict()
# get the dict
field_dict = field_match.groupdict()
# set fields in dict to bytes (no effect for py2)
for field in field_dict:
field_dict[field] = b(field_dict[field])

return field_dict

if sys.version_info < (3,):
def b(string):
Expand Down
5 changes: 5 additions & 0 deletions tests/test_mciutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def test_get_message_elements_ebcdic(self):
self.assertEqual(message_elements["DE3"], b"111111")
self.assertEqual(message_elements["DE4"], b"000000009999")

def test_get_de43_elements_no_pattern_match(self):
de43_raw = b("MUMBAI EMV ATM - 2 MUMBAI IN")
de43_elements = _get_de43_fields(de43_raw)
self.assertEqual({}, de43_elements)

def test_get_de43_elements(self):
de43_raw = b("AAMI \\36 WICKHAM TERRACE "
" \\BRISBANE \\4000 QLDAUS")
Expand Down
25 changes: 25 additions & 0 deletions tests/test_mideu.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def setUp(self):
create_test_ascii_ipm_file()
create_test_ebcdic_ipm_file()
create_test_ascii_de55_ipm_file()
create_bad_ascii_ipm_file()


class MideuCommonTestCase(CommandLineTestCase):
Expand Down Expand Up @@ -120,6 +121,11 @@ def test_with_ascii_de55_values(self):
"build/test/test_ascii_de55_ipm.in"])
_main(args)

def test_with_bad_ascii_file(self):
args = self.parser.parse_args(["extract", "-s", "ascii",
"build/test/test_bad_ascii_ipm.in"])
self.assertRaises(Exception, lambda: _main(args))


class CsvOutputTest(TestCase):
def test_output_byte_field(self):
Expand Down Expand Up @@ -214,6 +220,25 @@ def create_test_ascii_ipm_file():
block_and_write_list(message_list, TEST_ASCII_IPM_FILENAME)


def create_bad_ascii_ipm_file():
"""
Create a test IPM file with length greater than bitmap data.
This record has additional byte added
"""
message_raw = b(
"\x00\x00\x00\xee1144\xF0\x10\x05\x42\x84\x61\x80\x02\x02\x00\x00\x04"
"\x00\x00\x00\x00"
"1644445555444455551111110000000099992015081517151234"
"5678901233312342357995799120000001230612061234561234"
"5657994211111111145BIG BOBS\\70 FERNDALE ST\\ANNERLE"
"Y\\4103 QLDAUS0080001001Y99901600000000000000011234"
"567806999999^"
)
filename = "build/test/test_bad_ascii_ipm.in"
with open(filename, 'wb') as output_file:
output_file.write(message_raw)


def block_and_write_list(message_list, file_name):
# block and write to file
if not os.path.exists(os.path.dirname(file_name)):
Expand Down

0 comments on commit 0a3cda6

Please sign in to comment.