Skip to content

Commit

Permalink
Update testing environment
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog committed Apr 28, 2023
1 parent b464ae2 commit 57c9a0d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from dateutil.relativedelta import relativedelta
from deepdiff import DeepDiff
from freezegun import freeze_time
from google.cloud import bigquery, storage
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -568,6 +569,66 @@ def create(self, task_logging: bool = False):
self.elastic_env.stop()


def load_json(file_path: str):
"""Load a JSON file for testing purposes. It parses dates and datetimes into pendulum instances."""

def parse_datetime(obj):
for key, value in obj.items():
if isinstance(value, str):
try:
obj[key] = pendulum.from_format(value, "YYYY-MM-DD").date()
except (ValueError, TypeError):
try:
obj[key] = pendulum.from_format(value, "YYYYMMDD").date()
except (ValueError, TypeError):
try:
obj[key] = pendulum.parse(value)
except (ValueError, TypeError):
pass
return obj

with open(file_path, mode="r") as f:
rows = json.load(f, object_hook=parse_datetime)
return rows


def compare_lists_of_dicts(expected: List[Dict], actual: List[Dict], primary_key: str) -> bool:
"""Compare two lists of dictionaries, using a primary_key as the basis for the top level comparisons.
:param expected: the expected data.
:param actual: the actual data.
:param primary_key: the primary key.
:return: whether the expected and actual match.
"""

expected_dict = {item[primary_key]: item for item in expected}
actual_dict = {item[primary_key]: item for item in actual}

if set(expected_dict.keys()) != set(actual_dict.keys()):
logging.error("Primary keys don't match:")
logging.error(f"Only in expected: {set(expected_dict.keys()) - set(actual_dict.keys())}")
logging.error(f"Only in actual: {set(actual_dict.keys()) - set(expected_dict.keys())}")
return False

all_matched = True
for key in expected_dict:
diff = DeepDiff(expected_dict[key], actual_dict[key], ignore_order=True)
for diff_type, changes in diff.items():
if diff_type.startswith("values_changed"):
all_matched = False
for key_path, change in changes.items():
logging.error(
f"(expected) != (actual) {key_path}: {change['old_value']} (expected) != (actual) {change['new_value']}"
)
elif diff_type.startswith("type_changes"):
for key_path, change in changes.items():
logging.warning(
f"(expected) != (actual) {key_path}: {change['old_type']} (expected) != (actual) {change['new_type']}"
)

return all_matched


class ObservatoryTestCase(unittest.TestCase):
"""Common test functions for testing Observatory Platform DAGs"""

Expand Down Expand Up @@ -711,7 +772,7 @@ def assert_table_integrity(self, table_id: str, expected_rows: int = None):
if expected_rows is not None:
self.assertEqual(expected_rows, actual_rows)

def assert_table_content(self, table_id: str, expected_content: List[dict] = None):
def assert_table_content(self, table_id: str, expected_content: List[dict], primary_key: str):
"""Assert whether a BigQuery table has any content and if expected content is given whether it matches the
actual content. The order of the rows is not checked, only whether all rows in the expected content match
the rows in the actual content.
Expand All @@ -720,24 +781,20 @@ def assert_table_content(self, table_id: str, expected_content: List[dict] = Non
:param table_id: the BigQuery table id.
:param expected_content: the expected content.
:param primary_key: the primary key to use to compare.
:return: whether the table has content and the expected content is correct
"""
rows = None
actual_content = None
try:
rows = self.bigquery_client.list_rows(table_id)
actual_content = json.loads(json.dumps([dict(row) for row in rows], default=str))
rows = list(self.bigquery_client.list_rows(table_id))
actual_content = [dict(row) for row in rows]
except NotFound:
pass

self.assertIsNotNone(rows)
if expected_content is not None:
for row in expected_content:
self.assertIn(row, actual_content)
actual_content.remove(row)
self.assertListEqual(
[], actual_content, msg=f"Rows in actual content that are not in expected content: {actual_content}"
)
self.assertIsNotNone(actual_content)
results = compare_lists_of_dicts(expected_content, actual_content, primary_key)
assert results, "Rows in actual content do not match expected content"

def assert_table_bytes(self, table_id: str, expected_bytes: int):
"""Assert whether the given bytes from a BigQuery table matches the expected bytes.
Expand Down
1 change: 1 addition & 0 deletions observatory-platform/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ importlib-metadata==3.7.3
deprecation>2,<3

virtualenv
deepdiff

0 comments on commit 57c9a0d

Please sign in to comment.