Skip to content

Commit

Permalink
do not eagerly convert expected values
Browse files Browse the repository at this point in the history
- apply weak-equals comparison if necessary AFTER values have been
fetched from db
  • Loading branch information
elmarx committed Feb 19, 2018
1 parent efc2d9e commit 97ecd04
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 11 deletions.
44 changes: 33 additions & 11 deletions library/mysql_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,35 @@
ERR_NO_SUCH_TABLE = 4


def weak_equals(a, b):
"""
helper function to compare two values where one might be a string
:param a:
:param b:
:return:
"""
# the simple case: a and b are of the same type, or of types that can be compared for equality (e.g.: int and float)
if type(a) == type(b) or a == b:
return a == b

# try to stringify both and compare then…
return str(a) == str(b)


def tuples_weak_equals(a, b):
"""
tests two tuples for weak equality
:param a:
:param b:
:return:
"""
if len(a) == len(b):
pairs = zip(a, b)
return all([weak_equals(a, b) for a, b in pairs])

return False


def change_required(cursor, table, identifiers, desired_values):
"""
check if a change is required
Expand Down Expand Up @@ -122,7 +151,9 @@ def change_required(cursor, table, identifiers, desired_values):
# bring the values argument into shape to compare directly to fetchone() result
expected_query_result = tuple(desired_values.values())
actual_result = cursor.fetchone()
if expected_query_result == actual_result:

# compare expected_query_result to actual_result with implicit casting
if tuples_weak_equals(expected_query_result, actual_result):
return NO_ACTION_REQUIRED

# a record has been found but does not match the desired values
Expand Down Expand Up @@ -216,18 +247,9 @@ def extract_column_value_maps(parameter):
:param parameter:
:return:
"""

def cast_if_necessary(v):
if isinstance(v, int):
return long(v)
if isinstance(v, str) and v.isdigit():
return long(v)

return v

if parameter:
for column, value in parameter.items():
yield (mysql_quote_identifier(column, 'column'), cast_if_necessary(value))
yield (mysql_quote_identifier(column, 'column'), value)


def failed(action):
Expand Down
22 changes: 22 additions & 0 deletions tests/test_mysql_query_multi_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ def test_no_change_required(self):
values={'value1': '8', 'value2': 'admin', 'value3': "made up"},
)

with self.assertRaises(AnsibleExitJson) as e:
self.module.main()

result = e.exception.args[0]

self.assertIn('changed', result)
self.assertFalse(result['changed'], 'no changed required is detected')
self.assertEquals(self.f.count_multicolumn_example(), 1, 'no additional row has been inserted in check-mode')

def test_change_detection_for_digits_in_strings(self):
# insert a row that does not need to be updated
self.f.insert_into_multicolumn_example(['elmar@athmer.org', 4, '5'], [8, '15', '16'])

set_module_args(
login_user=MYSQL_CONNECTION_PARAMS['user'],
name=MYSQL_CONNECTION_PARAMS['db'],
login_password=MYSQL_CONNECTION_PARAMS['passwd'],
login_host=MYSQL_CONNECTION_PARAMS['host'],
table='multicolumn_example',
identifiers=dict(identifier1='elmar@athmer.org', identifier2='4', identifier3='5'),
values={'value1': '8', 'value2': '15', 'value3': "16"},
)

with self.assertRaises(AnsibleExitJson) as e:
self.module.main()
Expand Down
34 changes: 34 additions & 0 deletions tests/test_weak_equals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import unittest

from library.mysql_query import weak_equals


class WeakEquals(unittest.TestCase):

def test_int_int(self):
self.assertTrue(weak_equals(42, 42))
self.assertFalse(weak_equals(42, 43))

def test_string_string(self):
self.assertTrue(weak_equals("a", "a"))
self.assertFalse(weak_equals("a", "b"))

def test_string_int(self):
self.assertTrue(weak_equals("1", 1))
self.assertTrue(weak_equals(1, "1"))
self.assertFalse(weak_equals("2", 1))
self.assertFalse(weak_equals(1, "2"))

def test_string_long(self):
self.assertTrue(weak_equals("1", 1L))
self.assertTrue(weak_equals(1L, "1"))
self.assertFalse(weak_equals("2", 1L))
self.assertFalse(weak_equals(1L, "2"))

def test_string_float(self):
self.assertTrue(weak_equals("1.1", 1.1))
self.assertTrue(weak_equals(1.1, "1.1"))
self.assertFalse(weak_equals("2", 1.1))
self.assertFalse(weak_equals(1.1, "2"))


0 comments on commit 97ecd04

Please sign in to comment.