Commit 7689fd9: Update tests

jdddog committed May 2, 2023
1 parent d7817f8

Showing 7 changed files with 726 additions and 684 deletions.
observatory-platform/observatory/platform/cli/cli.py
25 changes: 25 additions & 0 deletions

@@ -14,6 +14,7 @@

 # Author: James Diprose, Aniek Roelofs, Tuan Chien
 
+import json
 import os
 from typing import ClassVar

@@ -379,6 +380,30 @@ def terraform(command, config_path, terraform_credentials_path, debug):
         terraform_cmd.update_workspace()
 
 
+@cli.command("sort-schema")
+@click.argument("input-file", type=click.Path(exists=True, file_okay=True, dir_okay=False))
+def sort_schema_cmd(input_file):
+    def sort_schema(schema):
+        sorted_schema = sorted(schema, key=lambda x: x["name"])
+
+        for field in sorted_schema:
+            if field["type"] == "RECORD" and "fields" in field:
+                field["fields"] = sort_schema(field["fields"])
+
+        return sorted_schema
+
+    # Load the JSON schema from a file
+    with open(input_file, mode="r") as f:
+        data = json.load(f)
+
+    # Sort the schema
+    sorted_json_schema = sort_schema(data)
+
+    # Save the schema
+    with open(input_file, mode="w") as f:
+        json.dump(sorted_json_schema, f, indent=2)
+
+
 def terraform_check_dependencies(
     terraform_cmd: TerraformCommand, generate_cmd: GenerateCommand, min_line_chars: int = 80
 ):
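For context, the new command sorts a BigQuery-style JSON schema file in place, recursing into RECORD fields. A minimal, runnable sketch of what it does (the sample schema is an assumption for illustration):

import json

schema = [
    {"name": "title", "type": "STRING"},
    {
        "name": "author",
        "type": "RECORD",
        "fields": [
            {"name": "orcid", "type": "STRING"},
            {"name": "name", "type": "STRING"},
        ],
    },
]


def sort_schema(schema):
    # Order fields alphabetically by name, recursing into nested RECORDs
    sorted_schema = sorted(schema, key=lambda x: x["name"])
    for field in sorted_schema:
        if field["type"] == "RECORD" and "fields" in field:
            field["fields"] = sort_schema(field["fields"])
    return sorted_schema


print(json.dumps(sort_schema(schema), indent=2))
# "author" (with its fields reordered to name, orcid) now precedes "title"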
@@ -456,15 +456,13 @@ def create_dag_run(
         self,
         dag: DAG,
         execution_date: pendulum.DateTime,
-        freeze: bool = True,
         run_type: DagRunType = DagRunType.SCHEDULED,
     ):
         """Create a DagRun that can be used when running tasks.
         During cleanup the DAG run state is updated.
 
         :param dag: the Airflow DAG instance.
         :param execution_date: the execution date of the DAG.
-        :param freeze: whether to freeze time to the start date of the DAG run.
         :param run_type: what run_type to use when running the DAG run.
         :return: None.
         """
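With the freeze flag removed, tests that need pinned time now wrap the DAG run themselves, as the updated test_vm_create.py further down does. The pattern, sketched with names (env, dag, workflow, execution_date) taken from the surrounding tests:

with env.create_dag_run(dag, execution_date) as dag_run:
    # Pin the clock to the DAG run's start date; tick=True lets it keep advancing.
    with time_machine.travel(dag_run.start_date, tick=True):
        ti = env.run_task(workflow.check_dependencies.__name__)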
observatory-platform/requirements.txt
2 changes: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ pandas
 tenacity
 
 # Test utils
-freezegun>=1.1.0,<2
+time_machine>=2.0.0,<3
 httpretty>=1.0.0,<2
 sftpserver>=0.3,<1
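For context, time_machine is a faster replacement for freezegun with a very similar API; a minimal sketch of the equivalent calls (illustrative dates):

import datetime as dt

import time_machine

# freezegun:     with freezegun.freeze_time("2021-01-01"): ...
# time_machine:  tick=False freezes the clock; tick=True lets it run on from the destination.
with time_machine.travel(dt.datetime(2021, 1, 1, tzinfo=dt.timezone.utc), tick=False):
    assert dt.datetime.now(tz=dt.timezone.utc) == dt.datetime(2021, 1, 1, tzinfo=dt.timezone.utc)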
tests/observatory/platform/test_gcs.py
103 changes: 53 additions & 50 deletions

@@ -43,7 +43,7 @@
     gcs_delete_old_buckets_with_prefix,
     gcs_blob_name_from_path,
 )
-from observatory.platform.observatory_environment import random_id
+from observatory.platform.observatory_environment import random_id, aws_bucket_test_env
 
 
 def make_account_url(account_name: str) -> str:

@@ -92,9 +92,8 @@ def __init__(self, *args, **kwargs):
         self.az_storage_account_name: str = os.getenv("TEST_AZURE_STORAGE_ACCOUNT_NAME")
         self.az_container_sas_token: str = os.getenv("TEST_AZURE_CONTAINER_SAS_TOKEN")
         self.az_container_name: str = os.getenv("TEST_AZURE_CONTAINER_NAME")
-        self.aws_key_id: str = os.getenv("TEST_AWS_KEY_ID")
-        self.aws_secret_key: str = os.getenv("TEST_AWS_SECRET_KEY")
-        self.aws_bucket_name: str = os.getenv("TEST_AWS_BUCKET_NAME")
+        self.aws_key = (os.getenv("AWS_ACCESS_KEY_ID"), os.getenv("AWS_SECRET_ACCESS_KEY"))
+        self.aws_region_name = os.getenv("AWS_DEFAULT_REGION")
         self.gc_project_id: str = os.getenv("TEST_GCP_PROJECT_ID")
         self.gc_bucket_name: str = os.getenv("TEST_GCP_BUCKET_NAME")
         self.gc_location: str = os.getenv("TEST_GCP_DATA_LOCATION")
@@ -343,55 +342,59 @@ def test_gcs_create_aws_transfer(self):
         bucket = storage_client.get_bucket(self.gc_bucket_name)
         gc_blob = bucket.blob(blob_name)
 
-        try:
-            # Create client for working with AWS storage bucket
-            s3 = boto3.resource("s3", aws_access_key_id=self.aws_key_id, aws_secret_access_key=self.aws_secret_key)
-            aws_blob = s3.Object(self.aws_bucket_name, blob_name)
-            aws_blob.put(Body=self.data)
-
-            # Test transfer where no data is found, because modified date is not between dates
-            success, objects_count = gcs_create_aws_transfer(
-                aws_key=(self.aws_key_id, self.aws_secret_key),
-                aws_bucket=self.aws_bucket_name,
-                include_prefixes=[blob_name],
-                gc_project_id=self.gc_project_id,
-                gc_bucket_dst_uri=f"gs://{self.gc_bucket_name}",
-                description=f"Test AWS to Google Cloud Storage Transfer " f"{pendulum.now('UTC').to_datetime_string()}",
-                last_modified_before=pendulum.datetime(2021, 1, 1),
-            )
-            # Check that transfer was successful, but no objects were transferred
-            self.assertTrue(success)
-            self.assertEqual(0, objects_count)
-
-            # Transfer data
-            success, objects_count = gcs_create_aws_transfer(
-                aws_key=(self.aws_key_id, self.aws_secret_key),
-                aws_bucket=self.aws_bucket_name,
-                include_prefixes=[blob_name],
-                gc_project_id=self.gc_project_id,
-                gc_bucket_dst_uri=f"gs://{self.gc_bucket_name}",
-                description=f"Test AWS to Google Cloud Storage Transfer " f"{pendulum.now('UTC').to_datetime_string()}",
-                last_modified_since=pendulum.datetime(2021, 1, 1),
-                last_modified_before=pendulum.now("UTC") + timedelta(days=1),
-            )
-            # Check that transfer was successful and 1 object was transferred
-            self.assertTrue(success)
-            self.assertEqual(1, objects_count)
-
-            # Check that blob exists
-            self.assertTrue(gc_blob.exists())
-
-            # Check that blob has expected crc32c token
-            gc_blob.update()
-            self.assertEqual(gc_blob.crc32c, self.expected_crc32c)
-        finally:
-            # Delete file on Google Cloud
-            if gc_blob.exists():
-                gc_blob.delete()
-
-            # Delete file on AWS
-            if aws_blob is not None:
-                aws_blob.delete()
+        # Create bucket and dataset for use in first and second run
+        with aws_bucket_test_env(prefix="gcs", region_name=self.aws_region_name) as aws_bucket_name:
+            try:
+                # Create client for working with AWS storage bucket
+                s3 = boto3.resource("s3")
+                aws_blob = s3.Object(aws_bucket_name, blob_name)
+                aws_blob.put(Body=self.data)
+
+                # Test transfer where no data is found, because modified date is not between dates
+                success, objects_count = gcs_create_aws_transfer(
+                    aws_key=self.aws_key,
+                    aws_bucket=aws_bucket_name,
+                    include_prefixes=[blob_name],
+                    gc_project_id=self.gc_project_id,
+                    gc_bucket_dst_uri=f"gs://{self.gc_bucket_name}",
+                    description=f"Test AWS to Google Cloud Storage Transfer "
+                    f"{pendulum.now('UTC').to_datetime_string()}",
+                    last_modified_before=pendulum.datetime(2021, 1, 1),
+                )
+                # Check that transfer was successful, but no objects were transferred
+                self.assertTrue(success)
+                self.assertEqual(0, objects_count)
+
+                # Transfer data
+                success, objects_count = gcs_create_aws_transfer(
+                    aws_key=self.aws_key,
+                    aws_bucket=aws_bucket_name,
+                    include_prefixes=[blob_name],
+                    gc_project_id=self.gc_project_id,
+                    gc_bucket_dst_uri=f"gs://{self.gc_bucket_name}",
+                    description=f"Test AWS to Google Cloud Storage Transfer "
+                    f"{pendulum.now('UTC').to_datetime_string()}",
+                    last_modified_since=pendulum.datetime(2021, 1, 1),
+                    last_modified_before=pendulum.now("UTC") + timedelta(days=1),
+                )
+                # Check that transfer was successful and 1 object was transferred
+                self.assertTrue(success)
+                self.assertEqual(1, objects_count)
+
+                # Check that blob exists
+                self.assertTrue(gc_blob.exists())
+
+                # Check that blob has expected crc32c token
+                gc_blob.update()
+                self.assertEqual(gc_blob.crc32c, self.expected_crc32c)
+            finally:
+                # Delete file on Google Cloud
+                if gc_blob.exists():
+                    gc_blob.delete()
+
+                # Delete file on AWS
+                if aws_blob is not None:
+                    aws_blob.delete()
 
     def test_gcs_delete_bucket_dir(self):
         runner = CliRunner()
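Two things are worth noting about this change. First, credentials now come from the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_DEFAULT_REGION environment variables, which boto3's default credential chain also reads, so boto3.resource("s3") no longer needs explicit keys. Second, the test bucket is created and torn down per test by the new aws_bucket_test_env helper. Its implementation is not part of this diff; a hypothetical sketch of such a context manager (using only real boto3 calls) might look like:

import contextlib
import uuid

import boto3


@contextlib.contextmanager
def aws_bucket_test_env(*, prefix: str, region_name: str):
    """Hypothetical sketch; the real helper lives in observatory_environment."""
    s3 = boto3.resource("s3", region_name=region_name)
    bucket_name = f"{prefix}-{uuid.uuid4().hex[:16]}"
    # Note: for us-east-1 the CreateBucketConfiguration argument must be omitted.
    bucket = s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": region_name},
    )
    try:
        yield bucket_name
    finally:
        bucket.objects.all().delete()  # a bucket must be empty before it can be deleted
        bucket.delete()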
tests/observatory/platform/test_observatory_environment.py
12 changes: 6 additions & 6 deletions

@@ -331,7 +331,7 @@ def test_create_dagrun(self):
             with env.create_dag_run(dag, second_execution_date):
                 # Test DAG Run is set and has the expected start date
                 self.assertIsNotNone(env.dag_run)
-                self.assertEqual(second_start_date.date(), env.dag_run.start_date.date())
+                self.assertEqual(second_start_date, env.dag_run.start_date)
 
                 ti2 = env.run_task(telescope.check_dependencies.__name__)
                 self.assertEqual("success", ti2.state)

@@ -349,20 +349,20 @@ def test_create_dagrun(self):
             env.add_connection(conn)
 
             # First DAG Run
-            with env.create_dag_run(dag, first_execution_date, freeze=False):
+            with env.create_dag_run(dag, first_execution_date):
                 # Test DAG Run is set and has the expected start date
                 self.assertIsNotNone(env.dag_run)
-                self.assertEqual(pendulum.now("UTC").date(), env.dag_run.start_date.date())
+                self.assertEqual(first_start_date, env.dag_run.start_date)
 
                 ti1 = env.run_task(telescope.check_dependencies.__name__)
                 self.assertEqual("success", ti1.state)
                 self.assertIsNone(ti1.previous_ti)
 
             # Second DAG Run
-            with env.create_dag_run(dag, second_execution_date, freeze=False):
+            with env.create_dag_run(dag, second_execution_date):
                 # Test DAG Run is set and has the expected start date
                 self.assertIsNotNone(env.dag_run)
-                self.assertEqual(pendulum.now("UTC").date(), env.dag_run.start_date.date())
+                self.assertEqual(second_start_date, env.dag_run.start_date)
 
                 ti2 = env.run_task(telescope.check_dependencies.__name__)
                 self.assertEqual("success", ti2.state)

@@ -380,7 +380,7 @@ def test_create_dag_run_timedelta(self):
         with env.create():
             with env.create_dag_run(dag, execution_date):
                 self.assertIsNotNone(env.dag_run)
-                self.assertEqual(expected_dag_date.date(), env.dag_run.start_date.date())
+                self.assertEqual(expected_dag_date, env.dag_run.start_date)
 
 
 class TestObservatoryTestCase(unittest.TestCase):
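These assertions tighten from date-level comparisons to exact datetime equality; with the freeze flag gone, create_dag_run presumably derives the run's start_date deterministically rather than from the wall clock, so tests can pin the exact instant instead of comparing against "now".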
tests/observatory/platform/workflows/test_vm_create.py
115 changes: 59 additions & 56 deletions

@@ -1,6 +1,7 @@
 from unittest.mock import PropertyMock, patch
 
 import pendulum
+import time_machine
 from airflow.models import XCom
 from airflow.models.connection import Connection
 from airflow.utils.session import provide_session

@@ -158,30 +159,31 @@ def test_workflow_vm_already_on(self, m_tapi, m_list_workspace_vars):
         execution_date = pendulum.datetime(2021, 1, 1)
         self.setup_env(env)
 
-        with env.create_dag_run(dag, execution_date):
-            # check dependencies
-            ti = env.run_task(workflow.check_dependencies.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-
-            # check vm state
-            ti = env.run_task(workflow.check_vm_state.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-
-            # update terraform variable
-            ti = env.run_task(workflow.update_terraform_variable.__name__)
-            self.assertEqual(ti.state, State.SKIPPED)
-
-            # run terraform
-            ti = env.run_task(workflow.run_terraform.__name__)
-            self.assertEqual(ti.state, State.SKIPPED)
-
-            # check run status
-            ti = env.run_task(workflow.check_run_status.__name__)
-            self.assertEqual(ti.state, State.SKIPPED)
-
-            # cleanup
-            ti = env.run_task(workflow.cleanup.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
+        with env.create_dag_run(dag, execution_date) as dag_run:
+            with time_machine.travel(dag_run.start_date, tick=True):
+                # check dependencies
+                ti = env.run_task(workflow.check_dependencies.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+
+                # check vm state
+                ti = env.run_task(workflow.check_vm_state.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+
+                # update terraform variable
+                ti = env.run_task(workflow.update_terraform_variable.__name__)
+                self.assertEqual(ti.state, State.SKIPPED)
+
+                # run terraform
+                ti = env.run_task(workflow.run_terraform.__name__)
+                self.assertEqual(ti.state, State.SKIPPED)
+
+                # check run status
+                ti = env.run_task(workflow.check_run_status.__name__)
+                self.assertEqual(ti.state, State.SKIPPED)
+
+                # cleanup
+                ti = env.run_task(workflow.cleanup.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
 
     @patch("observatory.platform.workflows.vm_workflow.send_slack_msg")
     @patch("observatory.platform.workflows.vm_workflow.TerraformApi.get_run_details")

@@ -217,40 +219,41 @@ def test_workflow_vm_create(
         execution_date = pendulum.datetime(2021, 1, 1)
         self.setup_env(env)
 
-        with env.create_dag_run(dag, execution_date):
-            # check dependencies
-            ti = env.run_task(workflow.check_dependencies.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-
-            # check vm state
-            ti = env.run_task(workflow.check_vm_state.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-
-            # update terraform variable
-            ti = env.run_task(workflow.update_terraform_variable.__name__)
-            self.assertEqual(m_update.call_count, 1)
-            call_args, _ = m_update.call_args
-            self.assertEqual(call_args[0], vm_tf)
-            self.assertEqual(call_args[1], "workspace")
-            self.assertEqual(ti.state, State.SUCCESS)
-
-            # run terraform
-            ti = env.run_task(workflow.run_terraform.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-            self.assertEqual(m_create_run.call_count, 1)
-
-            # check run status
-            ti = env.run_task(workflow.check_run_status.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-            self.assertEqual(m_send_slack_msg.call_count, 1)
-
-            # cleanup
-            ti = env.run_task(workflow.cleanup.__name__)
-            self.assertEqual(ti.state, State.SUCCESS)
-            self.assertEqual(
-                xcom_count(
-                    execution_date=execution_date,
-                    dag_ids=workflow.dag_id,
-                ),
-                3,
-            )
+        with env.create_dag_run(dag, execution_date) as dag_run:
+            with time_machine.travel(dag_run.start_date, tick=True):
+                # check dependencies
+                ti = env.run_task(workflow.check_dependencies.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+
+                # check vm state
+                ti = env.run_task(workflow.check_vm_state.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+
+                # update terraform variable
+                ti = env.run_task(workflow.update_terraform_variable.__name__)
+                self.assertEqual(m_update.call_count, 1)
+                call_args, _ = m_update.call_args
+                self.assertEqual(call_args[0], vm_tf)
+                self.assertEqual(call_args[1], "workspace")
+                self.assertEqual(ti.state, State.SUCCESS)
+
+                # run terraform
+                ti = env.run_task(workflow.run_terraform.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+                self.assertEqual(m_create_run.call_count, 1)
+
+                # check run status
+                ti = env.run_task(workflow.check_run_status.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+                self.assertEqual(m_send_slack_msg.call_count, 1)
+
+                # cleanup
+                ti = env.run_task(workflow.cleanup.__name__)
+                self.assertEqual(ti.state, State.SUCCESS)
+                self.assertEqual(
+                    xcom_count(
+                        execution_date=execution_date,
+                        dag_ids=workflow.dag_id,
+                    ),
+                    3,
+                )
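A note on the tick=True pattern used above: the clock is pinned to the DAG run's start date but keeps advancing from that instant, so tasks see realistic, monotonically increasing timestamps anchored near the run's logical time. A self-contained illustration:

import datetime as dt
import time

import time_machine

destination = dt.datetime(2021, 1, 1, tzinfo=dt.timezone.utc)
with time_machine.travel(destination, tick=True):
    t0 = dt.datetime.now(tz=dt.timezone.utc)
    time.sleep(0.05)
    t1 = dt.datetime.now(tz=dt.timezone.utc)
    # The clock resumed from the destination and keeps moving forward.
    assert t0 >= destination
    assert t1 > t0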