Skip to content

Commit

Permalink
FS-3535/FS-4340: implementation of the SQS extended client for the assessment store (#475)
Browse files Browse the repository at this point in the history

* FS-3535: implementation of the SQS extended client for the assessment store

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* FS-3535: implementation of the SQS extended client for the assessment store

* FS-3535: addressing review comments and adding util changes

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

Merging since the approval was received.
  • Loading branch information
nuwan-samarasinghe committed Jun 5, 2024
1 parent 14a555d commit 1ffe025
Show file tree
Hide file tree
Showing 15 changed files with 943 additions and 216 deletions.
76 changes: 0 additions & 76 deletions _helpers/import_application.py

This file was deleted.

32 changes: 32 additions & 0 deletions _helpers/task_executer_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json
import threading

from config.mappings.assessment_mapping_fund_round import fund_round_data_key_mappings
from db.queries import bulk_insert_application_record
from fsd_utils.sqs_scheduler.task_executer_service import TaskExecutorService


class AssessmentTaskExecutorService(TaskExecutorService):
    def message_executor(self, message):
        """Process one SQS message in a worker thread.

        Decodes the application JSON carried in the message's "s3" payload and
        bulk-inserts it into the assessment store via
        ``bulk_insert_application_record``.

        :param message: dict with keys ``"sqs"`` (raw SQS message; its
            ``"MessageId"`` is used for logging) and ``"s3"`` (JSON-encoded
            application payload).
        :return: the ``message`` on success or when the fund/round has no
            import config (presumably so the scheduler can acknowledge/delete
            it — confirm against ``TaskExecutorService``), or ``None`` when an
            exception occurred.
        """
        current_thread = threading.current_thread()
        thread_id = f"[{current_thread.name}:{current_thread.ident}]"
        # FIX: was f"[{thread_id}] Notification Triggered" — thread_id already
        # contains brackets, and this service imports applications, it does not
        # send notifications.
        self.logger.info(f"{thread_id} Import application triggered")
        message_id = message["sqs"]["MessageId"]  # FIX: was misspelled `massage_id`
        try:
            application_json = json.loads(message["s3"])
            fund_round_shortname = "".join(application_json["reference"].split("-")[:2])
            # Check if the import config exists for the application.
            # FIX: was `self.logger.info.warning(...)`, which raises
            # AttributeError (a bound method has no `.warning` attribute).
            if fund_round_shortname not in fund_round_data_key_mappings:
                self.logger.warning(f"Missing import config for the application: {application_json['reference']}.")
                return message

            bulk_insert_application_record([application_json], is_json=True)
            self.logger.info(f"{thread_id} Processed the message: {message_id}")
            return message

        except Exception:
            # FIX: was `self.logger.error(msg, e)` — the exception was passed as
            # a %-format argument with no placeholder in the message, which
            # breaks logging's lazy formatting. `exception` also records the
            # full traceback.
            self.logger.exception(f"An error occurred while processing the message {message_id}")
36 changes: 25 additions & 11 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import connexion
from _helpers.import_application import import_applications_from_queue
from _helpers.task_executer_service import TaskExecutorService
from apscheduler.schedulers.background import BackgroundScheduler
from config import Config
from connexion.resolver import MethodViewResolver
from flask import Flask
from fsd_utils import init_sentry
from fsd_utils.healthchecks.checkers import DbChecker
from fsd_utils.healthchecks.checkers import FlaskRunningChecker
from fsd_utils.healthchecks.healthcheck import Healthcheck
from fsd_utils.logging import logging
from fsd_utils.sqs_scheduler.context_aware_executor import ContextAwareExecutor
from fsd_utils.sqs_scheduler.scheduler_service import scheduler_executor
from openapi.utils import get_bundled_specs


Expand Down Expand Up @@ -48,24 +51,35 @@ def create_app() -> Flask:
health.add_check(FlaskRunningChecker())
health.add_check(DbChecker(db))

# TODO: Flask-Apscheduler is a short-term solution for processing messages
# on queue periodically. Will move to SNS trigger after the AWS migration

executor = ContextAwareExecutor(
max_workers=Config.TASK_EXECUTOR_MAX_THREAD, thread_name_prefix="NotifTask", flask_app=flask_app
)
# Configure Task Executor service
task_executor_service = TaskExecutorService(
flask_app=flask_app,
executor=executor,
s3_bucket=Config.AWS_MSG_BUCKET_NAME,
sqs_primary_url=Config.AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL,
task_executor_max_thread=Config.TASK_EXECUTOR_MAX_THREAD,
sqs_batch_size=Config.SQS_BATCH_SIZE,
visibility_time=Config.SQS_VISIBILITY_TIME,
sqs_wait_time=Config.SQS_WAIT_TIME,
region_name=Config.AWS_REGION,
endpoint_url_override=Config.AWS_ENDPOINT_OVERRIDE,
aws_access_key_id=Config.AWS_SQS_ACCESS_KEY_ID,
aws_secret_access_key=Config.AWS_SQS_ACCESS_KEY_ID,
)
# Configurations for Flask-Apscheduler
scheduler = BackgroundScheduler()
scheduler.add_job(
func=import_applications_from_queue,
func=scheduler_executor,
trigger="interval",
seconds=flask_app.config["SQS_RECEIVE_MESSAGE_CYCLE_TIME"], # Run the job every 'x' seconds
kwargs={"task_executor_service": task_executor_service},
)
scheduler.start()

try:
# To keep the main thread alive (scheduler to run only on main thread)
return flask_app
except Exception:
# shutdown if execption occurs when returning app
return scheduler.shutdown()
return flask_app


app = create_app()
37 changes: 16 additions & 21 deletions config/envs/default.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Flask configuration."""
import json
from os import environ
from pathlib import Path

Expand Down Expand Up @@ -51,26 +50,22 @@ class DefaultConfig:
# ---------------
# AWS Config
# ---------------
if "PRIMARY_QUEUE_URL" in environ:
AWS_REGION = environ.get("AWS_REGION")
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = environ.get("PRIMARY_QUEUE_URL")
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = environ.get("DEAD_LETTER_QUEUE_URL")
elif "VCAP_SERVICES" in environ:
vcap_services = json.loads(environ["VCAP_SERVICES"])
if "aws-sqs-queue" in vcap_services:
sqs_credentials = vcap_services["aws-sqs-queue"][0]["credentials"]
AWS_REGION = sqs_credentials["aws_region"]
AWS_ACCESS_KEY_ID = sqs_credentials["aws_access_key_id"]
AWS_SECRET_ACCESS_KEY = sqs_credentials["aws_secret_access_key"]
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = sqs_credentials["primary_queue_url"]
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = sqs_credentials["secondary_queue_url"]
else:
AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = environ.get("AWS_REGION")
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = environ.get("AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL")
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = environ.get("AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL")
AWS_DLQ_MAX_RECIEVE_COUNT = int(environ.get("AWS_DLQ_MAX_RECIEVE_COUNT", 3))
AWS_REGION = environ.get("AWS_REGION")
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = environ.get("PRIMARY_QUEUE_URL")
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = environ.get("DEAD_LETTER_QUEUE_URL")
AWS_SQS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID")
AWS_SQS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY")
AWS_SQS_REGION = AWS_REGION = environ.get("AWS_REGION")
AWS_ENDPOINT_OVERRIDE = environ.get("AWS_ENDPOINT_OVERRIDE")

# ---------------
# S3 Config
# ---------------
AWS_MSG_BUCKET_NAME = environ.get("AWS_MSG_BUCKET_NAME")
# ---------------
# Task Executor Config
# ---------------
TASK_EXECUTOR_MAX_THREAD = int(environ.get("TASK_EXECUTOR_MAX_THREAD", 5)) # max amount of threads

# ---------------
# SQS Config
Expand Down
23 changes: 20 additions & 3 deletions config/envs/unit_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,25 @@ class UnitTestingConfig(DefaultConfig):

SQLALCHEMY_DATABASE_URI = DefaultConfig.SQLALCHEMY_DATABASE_URI + "_UNIT_TEST"

AWS_ACCESS_KEY_ID = "test_access_id"
AWS_SECRET_ACCESS_KEY = "test_secret_key" # pragma: allowlist secret
AWS_REGION = "eu-west-2"
# ---------------
# Task Executor Config
# ---------------
TASK_EXECUTOR_MAX_THREAD = 5 # max amount of threads
# ---------------
# S3 Config
# ---------------
AWS_MSG_BUCKET_NAME = "fsd-notification-bucket"

AWS_SQS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID = "test_access_id"
AWS_SQS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY = "test_secret_key" # pragma: allowlist secret
AWS_SQS_REGION = AWS_REGION = "eu-west-2"
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = "test_primary_url"
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = "test_secondary_url"

# ---------------
# SQS Config
# ---------------
SQS_WAIT_TIME = 2 # max time to wait (in sec) before returning
SQS_BATCH_SIZE = 10 # MaxNumber Of Messages to process
SQS_VISIBILITY_TIME = 1 # time for message to temporarily invisible to others (in sec)
SQS_RECEIVE_MESSAGE_CYCLE_TIME = 5 # Run the job every 'x' seconds
2 changes: 2 additions & 0 deletions copilot/fsd-assessment-store/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ variables:
FLASK_ENV: ${COPILOT_ENVIRONMENT_NAME}
APPLICATION_STORE_API_HOST: http://fsd-application-store:8080
ACCOUNT_STORE_API_HOST: http://fsd-account-store:8080
AWS_MSG_BUCKET_NAME:
from_cfn: ${COPILOT_APPLICATION_NAME}-${COPILOT_ENVIRONMENT_NAME}-MessageBucket
PRIMARY_QUEUE_URL:
from_cfn: ${COPILOT_APPLICATION_NAME}-${COPILOT_ENVIRONMENT_NAME}-AssessmentImportQueueURL
DEAD_LETTER_QUEUE_URL:
Expand Down
1 change: 1 addition & 0 deletions db/queries/assessment_records/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def bulk_insert_application_record(
except exc.SQLAlchemyError as e:
db.session.rollback()
print(f"Error occurred while inserting application {row['application_id']}, error: {e}")
raise e

print("Inserted application_ids (i.e. application rows) :" f" {[row['application_id'] for row in rows]}")
return rows
Expand Down
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ flupy==1.2.0
# via
# -r requirements.txt
# alembic-utils
funding-service-design-utils==2.0.42
funding-service-design-utils==2.0.51
# via -r requirements.txt
gunicorn==20.1.0
# via
Expand Down Expand Up @@ -321,6 +321,7 @@ pytest==7.4.3
# pytest-env
# pytest-flask
# pytest-mock
moto[s3,sqs]==5.0.7
pytest-env==0.8.1
# via -r requirements-dev.in
pytest-flask==1.3.0
Expand Down
2 changes: 1 addition & 1 deletion requirements.in
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#-----------------------------------
# FSD Utils
#-----------------------------------
funding-service-design-utils>=2.0.34,<2.1.0
funding-service-design-utils>=2.0.51,<2.1.0

requests

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ flipper-client==1.3.1
# via funding-service-design-utils
flupy==1.2.0
# via alembic-utils
funding-service-design-utils==2.0.42
funding-service-design-utils==2.0.51
# via -r requirements.in
gunicorn==20.1.0
# via funding-service-design-utils
Expand Down
1 change: 0 additions & 1 deletion services/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .aws import _SQS_CLIENT # noqa
18 changes: 0 additions & 18 deletions services/aws.py

This file was deleted.

Loading

0 comments on commit 1ffe025

Please sign in to comment.