Skip to content

Commit

Permalink
PR revisions
Browse files Browse the repository at this point in the history
  • Loading branch information
ghukill committed Sep 1, 2023
1 parent 26295d6 commit 7a7f51c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 91 deletions.
180 changes: 97 additions & 83 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# ruff: noqa: G004
import datetime
import logging
import os
import urllib
from contextlib import contextmanager
from importlib import reload

import boto3
Expand All @@ -19,6 +19,24 @@
ORIGINAL_ENV = os.environ.copy()


@contextmanager
def temp_environ(environ):
"""Provide a temporary environ with automatic teardown.
Tests that use fixtures, that use this, are all invoked as part of the 'yield' step
below. Therefore, the environment is reset via 'finally' regardless of their
success / fail / error results.
"""
previous_environ = os.environ.copy()
# ruff: noqa: B003
os.environ = environ
try:
yield
finally:
# ruff: noqa: B003
os.environ = previous_environ


@pytest.fixture(autouse=True)
def _test_env():
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
Expand Down Expand Up @@ -163,112 +181,101 @@ def stubbed_bursar_sfn_client():
yield sfn


@pytest.fixture
def _set_integration_tests_env_vars() -> None:
"""Fixture to set os environment variables by retrieving data from AWS.
Because mocked AWS credentials are set in the testing environment, this temporary
reinstating of the calling environment (e.g. developer's machine) when the tests
began is required. Once data about deployed assets, e.g. lambda function URL and
deployed environment variables, is retrieved, the testing environment is used again.
"""
# backup current, test env
test_env = os.environ.copy()

# set os.environ as original env before testing framework
# ruff: noqa: B003
os.environ = ORIGINAL_ENV

try:
if not os.getenv("WORKSPACE"):
# ruff: noqa: TRY301, TRY002, TRY003, EM101
raise Exception("WORKSPACE env var must be set for integration tests")

# get lambda configurations
lambda_client = boto3.client("lambda")
lambda_function_config = lambda_client.get_function_configuration(
FunctionName=f"alma-webhook-lambdas-{os.getenv('WORKSPACE').lower()}"
)
lambda_function_env_vars = lambda_function_config["Environment"]["Variables"]
lambda_function_url = lambda_client.get_function_url_config(
FunctionName=f"alma-webhook-lambdas-{os.getenv('WORKSPACE').lower()}"
)["FunctionUrl"]

# get values from parameter store
ssm_client = boto3.client("ssm")
ssm_client.get_parameter(Name="/apps/almahook/alma-pod-export-job-name")[
"Parameter"
]["Value"]
ppod_state_machine_arn = ssm_client.get_parameter(
Name="/apps/almahook/ppod-state-machine-arn"
)["Parameter"]["Value"]
timdex_state_machine_arn = ssm_client.get_parameter(
Name="/apps/almahook/timdex-ingest-state-machine-arn"
)["Parameter"]["Value"]

except:
logging.exception("could not retrieve lambda configurations via boto3")
raise
finally:
# reset testing env vars
os.environ = test_env
def set_env_vars_from_deployed_lambda_configurations():
env = os.getenv("WORKSPACE").lower()
lambda_client = boto3.client("lambda")
lambda_function_config = lambda_client.get_function_configuration(
FunctionName=f"alma-webhook-lambdas-{env}"
)
lambda_function_env_vars = lambda_function_config["Environment"]["Variables"]
lambda_function_url = lambda_client.get_function_url_config(
FunctionName=f"alma-webhook-lambdas-{env}"
)["FunctionUrl"]

# set env vars
os.environ["LAMBDA_FUNCTION_URL"] = lambda_function_url
os.environ["ALMA_CHALLENGE_SECRET"] = lambda_function_env_vars[
"ALMA_CHALLENGE_SECRET"
]
os.environ["ALMA_POD_EXPORT_JOB_NAME"] = "Publishing Platform Job PPOD EXPORT to Dev1"


def set_env_vars_from_ssm_parameters():
ssm_client = boto3.client("ssm")
ppod_state_machine_arn = ssm_client.get_parameter(
Name="/apps/almahook/ppod-state-machine-arn"
)["Parameter"]["Value"]
timdex_state_machine_arn = ssm_client.get_parameter(
Name="/apps/almahook/timdex-ingest-state-machine-arn"
)["Parameter"]["Value"]

os.environ["PPOD_STATE_MACHINE_ARN"] = ppod_state_machine_arn
os.environ["VALID_POD_EXPORT_DATE"] = "2023-08-15" # matches fixture date
os.environ["TIMDEX_STATE_MACHINE_ARN"] = timdex_state_machine_arn
os.environ["VALID_TIMDEX_EXPORT_DATE"] = "2023-08-15" # matches fixture date


@pytest.fixture
def _integration_tests_s3_fixtures(_set_integration_tests_env_vars) -> None:
"""Upload integration test fixtures to S3, if they don't already exist.
def _set_integration_test_environ() -> None:
"""Fixture to bypass the auto used fixture '_test_env' for integration tests.
Because mocked AWS credentials are set in the auto used testing environment, this
temporarily bypasses the testing environment and uses the calling environment
(e.g. developer machine) such that AWS credentials can be used for integration tests.
When any test that uses this fixture is finished, the testing environment is
automatically reinstated.
"""
with temp_environ(ORIGINAL_ENV):
if os.getenv("WORKSPACE") != "dev":
# ruff: noqa: TRY301, TRY002, TRY003, EM101
raise Exception("WORKSPACE env var must be 'dev' for integration tests")

os.environ[
"ALMA_POD_EXPORT_JOB_NAME"
] = "Publishing Platform Job PPOD EXPORT to Dev1"
os.environ["VALID_POD_EXPORT_DATE"] = "2023-08-15" # matches fixture date
os.environ["VALID_TIMDEX_EXPORT_DATE"] = "2023-08-15" # matches fixture date

set_env_vars_from_deployed_lambda_configurations()

set_env_vars_from_ssm_parameters()

# required to allow tests to run in this contextmanager
yield


@pytest.fixture
def _integration_tests_s3_fixtures(_set_integration_test_environ) -> None:
"""Upload integration test fixtures to S3 if they don't already exist.
These s3 files are used by deployed assets during integration tests. This fixture
relies on _set_integration_tests_env_vars as a dependency to ensure AWS credentials
have not been clobbered by testing env vars.
"""
fixtures = [
(
"dev-sftp-shared",
"exlibris/pod/POD_ALMA_EXPORT_20230815_220844[016]_new.tar.gz",
),
(
"dev-sftp-shared",
"exlibris/timdex/TIMDEX_ALMA_EXPORT_DAILY_20230815_220844[016]_new.tar.gz",
),
]
s3 = boto3.client("s3")

def check_and_upload_file(bucket, key):
for bucket, key in fixtures:
try:
s3.head_object(Bucket=bucket, Key=key + "foo")
logging.info(f"File s3://{bucket}/{key} already exists, nothing to do!")
except ClientError as e:
error_code = int(e.response["Error"]["Code"])
# ruff: noqa: PLR2004
if error_code == 404:
logging.info(f"File s3://{bucket}/{key} not found. Uploading...")
local_file_path = os.path.join("tests", "fixtures", os.path.basename(key))
if os.path.exists(local_file_path):
s3.upload_file(local_file_path, bucket, key)
logging.info(f"File uploaded to s3://{bucket}/{key}")
else:
msg = f"Fixture file {local_file_path} does not exist."
logging.exception(msg)
msg = f"Local file '{local_file_path}' does not exist."
raise FileNotFoundError(msg) from None
else:
raise

# Specify your bucket and key
fixtures = [
(
"dev-sftp-shared",
"exlibris/pod/POD_ALMA_EXPORT_20230815_220844[016]_new.tar.gz",
),
(
"dev-sftp-shared",
"exlibris/timdex/TIMDEX_ALMA_EXPORT_DAILY_20230815_220844[016]_new.tar.gz",
),
]
for bucket, key in fixtures:
check_and_upload_file(bucket, key)


@pytest.fixture
def sample_webhook_post_body() -> dict:
Expand Down Expand Up @@ -316,12 +323,8 @@ def sample_timdex_export_job_end_webhook_post_body() -> dict:
}


def pytest_collection_modifyitems(config, items):
"""Hook that is run after all tests collected, which allows for modification pre-run.
https://docs.pytest.org/en/7.1.x/reference/reference.html#pytest.hookspec.pytest_collection_modifyitems
"""
# skip integration tests if WORKSPACE is not 'dev'
def _skip_integration_tests_when_not_dev_workspace(items):
"""Skip integration tests if WORKSPACE is not 'dev'."""
allowed_test_environments = ["dev"]
for item in items:
if (
Expand All @@ -334,3 +337,14 @@ def pytest_collection_modifyitems(config, items):
% allowed_test_environments
)
)


def pytest_collection_modifyitems(config, items):
"""Pytest hook that is run after all tests collected.
https://docs.pytest.org/en/7.1.x/reference/reference.html#pytest.hookspec.pytest_collection_modifyitems
It is preferred that any actions needed performed by this hook will have a dedicated
function, keeping this hook runner relatively simple.
"""
_skip_integration_tests_when_not_dev_workspace(items)
19 changes: 11 additions & 8 deletions tests/integration/test_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
)


def get_step_function_invocation(step_function_arn: str, timeout: int = 15) -> dict:
"""Retrieve RUNNING StepFunction invocation, parse input JSON for first event"""
def get_step_function_invocation_results(
step_function_arn: str,
timeout: int = 15,
) -> dict:
"""Retrieve results from RUNNING StepFunction invocation and parse 0th event."""
sfn_client = boto3.client("stepfunctions")
t0 = time.time()
while True:
Expand Down Expand Up @@ -56,7 +59,7 @@ def get_step_function_invocation(step_function_arn: str, timeout: int = 15) -> d


@pytest.mark.integration()
@pytest.mark.usefixtures("_set_integration_tests_env_vars")
@pytest.mark.usefixtures("_set_integration_test_environ")
def test_integration_webhook_handles_get_request_success():
"""Test that deployed lambda can receive GET requsts"""
challenge_phrase = "challenge-accepted"
Expand All @@ -65,7 +68,7 @@ def test_integration_webhook_handles_get_request_success():


@pytest.mark.integration()
@pytest.mark.usefixtures("_set_integration_tests_env_vars")
@pytest.mark.usefixtures("_set_integration_test_environ")
def test_integration_webhook_handles_post_request_but_no_job_type_match():
"""Test that deployed lambda can receive POST requests and skips unknown job type"""
payload = {"action": "JOB_END", "job_instance": {"name": "Unhandled Job Name Here"}}
Expand All @@ -77,7 +80,7 @@ def test_integration_webhook_handles_post_request_but_no_job_type_match():


@pytest.mark.integration()
@pytest.mark.usefixtures("_set_integration_tests_env_vars")
@pytest.mark.usefixtures("_set_integration_test_environ")
@pytest.mark.usefixtures("_integration_tests_s3_fixtures")
def test_integration_webhook_handles_pod_step_function_trigger(
sample_pod_export_job_end_webhook_post_body,
Expand All @@ -98,15 +101,15 @@ def test_integration_webhook_handles_pod_step_function_trigger(

# assert StepFunction running and with expected payload
step_function_arn = os.getenv("PPOD_STATE_MACHINE_ARN")
step_function_input_json = get_step_function_invocation(step_function_arn)
step_function_input_json = get_step_function_invocation_results(step_function_arn)
assert (
step_function_input_json["filename-prefix"]
== "exlibris/pod/POD_ALMA_EXPORT_20230815"
)


@pytest.mark.integration()
@pytest.mark.usefixtures("_set_integration_tests_env_vars")
@pytest.mark.usefixtures("_set_integration_test_environ")
@pytest.mark.usefixtures("_integration_tests_s3_fixtures")
def test_integration_webhook_handles_timdex_step_function_trigger(
sample_timdex_export_job_end_webhook_post_body,
Expand All @@ -127,7 +130,7 @@ def test_integration_webhook_handles_timdex_step_function_trigger(

# assert StepFunction running and with expected payload
step_function_arn = os.getenv("TIMDEX_STATE_MACHINE_ARN")
step_function_input_json = get_step_function_invocation(step_function_arn)
step_function_input_json = get_step_function_invocation_results(step_function_arn)
assert step_function_input_json["next-step"] == "transform"
assert step_function_input_json["run-date"] == "2023-08-15"
assert step_function_input_json["run-type"] == "daily"
Expand Down

0 comments on commit 7a7f51c

Please sign in to comment.