From 38bc18283c833a01615a4a227f1ffc2c53be6072 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Sun, 3 Jun 2018 21:53:27 -0400 Subject: [PATCH 01/19] Turn jobs into apis --- .env | 1 - docker-compose.yml | 30 +++--- docker/job/Dockerfile | 20 ---- docker/job/healthcheck.sh | 8 -- ... api-send-outbound-emails-autoscaler.yaml} | 4 +- ... api-send-outbound-emails-deployment.yaml} | 18 ++-- ... api-store-inbound-emails-autoscaler.yaml} | 4 +- ... api-store-inbound-emails-deployment.yaml} | 18 ++-- ...ore-written-client-emails-autoscaler.yaml} | 4 +- ...ore-written-client-emails-deployment.yaml} | 18 ++-- .../api/send_outbound_emails.py | 17 ++++ .../{jobs => api}/store_inbound_emails.py | 16 ++-- .../store_written_client_emails.py | 17 ++-- opwen_email_server/azure_constants.py | 3 - opwen_email_server/jobs/__init__.py | 0 .../jobs/send_outbound_emails.py | 21 ---- opwen_email_server/services/queue_consumer.py | 87 ----------------- .../static/send-outbound-emails.yaml | 26 +++++ .../static/store-inbound-emails.yaml | 26 +++++ .../static/store-written-emails.yaml | 26 +++++ .../api/test_send_outbound_emails.py | 20 ++++ .../test_store_inbound_emails.py | 17 +--- .../test_store_written_client_emails.py | 19 +--- tests/opwen_email_server/jobs/__init__.py | 0 .../jobs/test_send_outbound_emails.py | 31 ------ .../services/test_queue_consumer.py | 95 ------------------- 26 files changed, 183 insertions(+), 363 deletions(-) delete mode 100644 docker/job/Dockerfile delete mode 100755 docker/job/healthcheck.sh rename helm/templates/{job-store-inbound-emails-autoscaler.yaml => api-send-outbound-emails-autoscaler.yaml} (84%) rename helm/templates/{job-store-inbound-emails-deployment.yaml => api-send-outbound-emails-deployment.yaml} (84%) rename helm/templates/{job-send-outbound-emails-autoscaler.yaml => api-store-inbound-emails-autoscaler.yaml} (84%) rename helm/templates/{job-send-outbound-emails-deployment.yaml => api-store-inbound-emails-deployment.yaml} (84%) rename helm/templates/{job-store-written-client-emails-autoscaler.yaml => api-store-written-client-emails-autoscaler.yaml} (81%) rename helm/templates/{job-store-written-client-emails-deployment.yaml => api-store-written-client-emails-deployment.yaml} (83%) create mode 100644 opwen_email_server/api/send_outbound_emails.py rename opwen_email_server/{jobs => api}/store_inbound_emails.py (65%) rename opwen_email_server/{jobs => api}/store_written_client_emails.py (69%) delete mode 100644 opwen_email_server/jobs/__init__.py delete mode 100644 opwen_email_server/jobs/send_outbound_emails.py delete mode 100644 opwen_email_server/services/queue_consumer.py create mode 100644 opwen_email_server/static/send-outbound-emails.yaml create mode 100644 opwen_email_server/static/store-inbound-emails.yaml create mode 100644 opwen_email_server/static/store-written-emails.yaml create mode 100644 tests/opwen_email_server/api/test_send_outbound_emails.py rename tests/opwen_email_server/{jobs => api}/test_store_inbound_emails.py (54%) rename tests/opwen_email_server/{jobs => api}/test_store_written_client_emails.py (64%) delete mode 100644 tests/opwen_email_server/jobs/__init__.py delete mode 100644 tests/opwen_email_server/jobs/test_send_outbound_emails.py delete mode 100644 tests/opwen_email_server/services/test_queue_consumer.py diff --git a/.env b/.env index 948cd117..55fbc8b4 100644 --- a/.env +++ b/.env @@ -2,5 +2,4 @@ APP_PORT=8080 BUILD_TAG=development DOCKER_REPO=cwolff GUNICORN_WORKERS=1 -LOKOLE_QUEUE_POLL_SECONDS=30 LOKOLE_LOG_LEVEL=DEBUG diff --git a/docker-compose.yml b/docker-compose.yml index 61156a5f..54ac10bf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -55,40 +55,40 @@ services: - ./secrets/azure.env - ./secrets/sendgrid.env - jobsendoutboundemails: - image: ${DOCKER_REPO}/opwenserver_job:${BUILD_TAG} + apisendoutboundemails: + image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . - dockerfile: docker/job/Dockerfile + dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server.jobs.send_outbound_emails - LOKOLE_QUEUE_POLL_SECONDS: ${LOKOLE_QUEUE_POLL_SECONDS} + API_NAME: opwen_email_server/static/send-outbound-emails.yaml + GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} env_file: - ./secrets/azure.env - ./secrets/sendgrid.env - jobstoreinboundemails: - image: ${DOCKER_REPO}/opwenserver_job:${BUILD_TAG} + apistoreinboundemails: + image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . - dockerfile: docker/job/Dockerfile + dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server.jobs.store_inbound_emails - LOKOLE_QUEUE_POLL_SECONDS: ${LOKOLE_QUEUE_POLL_SECONDS} + API_NAME: opwen_email_server/static/store-inbound-emails.yaml + GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} env_file: - ./secrets/azure.env - ./secrets/sendgrid.env - jobstorewritteclientemails: - image: ${DOCKER_REPO}/opwenserver_job:${BUILD_TAG} + apistorewritteclientemails: + image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . - dockerfile: docker/job/Dockerfile + dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server.jobs.store_written_client_emails - LOKOLE_QUEUE_POLL_SECONDS: ${LOKOLE_QUEUE_POLL_SECONDS} + JOB_NAME: opwen_email_server/static/store-written-emails.yaml + GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} env_file: - ./secrets/azure.env diff --git a/docker/job/Dockerfile b/docker/job/Dockerfile deleted file mode 100644 index 72dbac7c..00000000 --- a/docker/job/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM python:3.6 - -ENV JOB_NAME="SET_ME" -ENV LOKOLE_QUEUE_ERROR_FILE="/app/queue_health.txt" - -ADD requirements.txt /app/requirements.txt -RUN apt-get update \ - && apt-get install -y libffi-dev libssl-dev ca-certificates \ - && pip3 --no-cache-dir -q install -U pip setuptools \ - && pip3 --no-cache-dir -q install -r /app/requirements.txt \ - && touch ${LOKOLE_QUEUE_ERROR_FILE} \ - && rm -rf /var/lib/apt/lists/* - -ADD opwen_email_server /app/opwen_email_server - -ADD docker/job/healthcheck.sh /app/healthcheck.sh - -WORKDIR /app -HEALTHCHECK CMD /app/healthcheck.sh -CMD "python3" "-m" "${JOB_NAME}" diff --git a/docker/job/healthcheck.sh b/docker/job/healthcheck.sh deleted file mode 100755 index 955568d8..00000000 --- a/docker/job/healthcheck.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env sh - -if test -s "$LOKOLE_QUEUE_ERROR_FILE"; then - cat "$LOKOLE_QUEUE_ERROR_FILE" >&2 - exit 1 -fi - -exit 0 diff --git a/helm/templates/job-store-inbound-emails-autoscaler.yaml b/helm/templates/api-send-outbound-emails-autoscaler.yaml similarity index 84% rename from helm/templates/job-store-inbound-emails-autoscaler.yaml rename to helm/templates/api-send-outbound-emails-autoscaler.yaml index 91317ac5..ed84db7e 100644 --- a/helm/templates/job-store-inbound-emails-autoscaler.yaml +++ b/helm/templates/api-send-outbound-emails-autoscaler.yaml @@ -2,12 +2,12 @@ apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: creationTimestamp: null - name: jobstoreinboundemails + name: apisendoutboundemails spec: maxReplicas: {{.Values.autoscale.maxPods}} minReplicas: {{.Values.autoscale.minPods}} scaleTargetRef: apiVersion: extensions/v1beta1 kind: Deployment - name: jobstoreinboundemails + name: apisendoutboundemails targetCPUUtilizationPercentage: {{.Values.autoscale.cpuThreshold}} diff --git a/helm/templates/job-store-inbound-emails-deployment.yaml b/helm/templates/api-send-outbound-emails-deployment.yaml similarity index 84% rename from helm/templates/job-store-inbound-emails-deployment.yaml rename to helm/templates/api-send-outbound-emails-deployment.yaml index c353860e..2b39afaf 100644 --- a/helm/templates/job-store-inbound-emails-deployment.yaml +++ b/helm/templates/api-send-outbound-emails-deployment.yaml @@ -6,8 +6,8 @@ metadata: kompose.version: 1.13.0 (84fa826) creationTimestamp: null labels: - io.kompose.service: jobstoreinboundemails - name: jobstoreinboundemails + io.kompose.service: apisendoutboundemails + name: apisendoutboundemails spec: replicas: 1 strategy: {} @@ -15,18 +15,18 @@ spec: metadata: creationTimestamp: null labels: - io.kompose.service: jobstoreinboundemails + io.kompose.service: apisendoutboundemails spec: containers: - - name: jobstoreinboundemails - image: {{.Values.version.imageRegistry}}/opwenserver_job:{{.Values.version.dockerTag}} + - name: apisendoutboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: JOB_NAME - value: opwen_email_server.jobs.store_inbound_emails + - name: API_NAME + value: opwen_email_server/static/send-outbound-emails.yaml + - name: GUNICORN_WORKERS + value: "{{.Values.server.gunicornWorkers}}" - name: LOKOLE_LOG_LEVEL value: {{.Values.logging.level}} - - name: LOKOLE_QUEUE_POLL_SECONDS - value: "{{.Values.worker.pollingIntervalSeconds}}" - name: LOKOLE_CLIENT_AZURE_STORAGE_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/job-send-outbound-emails-autoscaler.yaml b/helm/templates/api-store-inbound-emails-autoscaler.yaml similarity index 84% rename from helm/templates/job-send-outbound-emails-autoscaler.yaml rename to helm/templates/api-store-inbound-emails-autoscaler.yaml index f0114e78..6974fa2f 100644 --- a/helm/templates/job-send-outbound-emails-autoscaler.yaml +++ b/helm/templates/api-store-inbound-emails-autoscaler.yaml @@ -2,12 +2,12 @@ apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: creationTimestamp: null - name: jobsendoutboundemails + name: apistoreinboundemails spec: maxReplicas: {{.Values.autoscale.maxPods}} minReplicas: {{.Values.autoscale.minPods}} scaleTargetRef: apiVersion: extensions/v1beta1 kind: Deployment - name: jobsendoutboundemails + name: apistoreinboundemails targetCPUUtilizationPercentage: {{.Values.autoscale.cpuThreshold}} diff --git a/helm/templates/job-send-outbound-emails-deployment.yaml b/helm/templates/api-store-inbound-emails-deployment.yaml similarity index 84% rename from helm/templates/job-send-outbound-emails-deployment.yaml rename to helm/templates/api-store-inbound-emails-deployment.yaml index 3d62b230..4b166ea7 100644 --- a/helm/templates/job-send-outbound-emails-deployment.yaml +++ b/helm/templates/api-store-inbound-emails-deployment.yaml @@ -6,8 +6,8 @@ metadata: kompose.version: 1.13.0 (84fa826) creationTimestamp: null labels: - io.kompose.service: jobsendoutboundemails - name: jobsendoutboundemails + io.kompose.service: apistoreinboundemails + name: apistoreinboundemails spec: replicas: 1 strategy: {} @@ -15,18 +15,18 @@ spec: metadata: creationTimestamp: null labels: - io.kompose.service: jobsendoutboundemails + io.kompose.service: apistoreinboundemails spec: containers: - - name: jobsendoutboundemails - image: {{.Values.version.imageRegistry}}/opwenserver_job:{{.Values.version.dockerTag}} + - name: apistoreinboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: JOB_NAME - value: opwen_email_server.jobs.send_outbound_emails + - name: API_NAME + value: opwen_email_server/static/store-inbound-emails.yaml + - name: GUNICORN_WORKERS + value: "{{.Values.server.gunicornWorkers}}" - name: LOKOLE_LOG_LEVEL value: {{.Values.logging.level}} - - name: LOKOLE_QUEUE_POLL_SECONDS - value: "{{.Values.worker.pollingIntervalSeconds}}" - name: LOKOLE_CLIENT_AZURE_STORAGE_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/job-store-written-client-emails-autoscaler.yaml b/helm/templates/api-store-written-client-emails-autoscaler.yaml similarity index 81% rename from helm/templates/job-store-written-client-emails-autoscaler.yaml rename to helm/templates/api-store-written-client-emails-autoscaler.yaml index c3fd0e33..696beec8 100644 --- a/helm/templates/job-store-written-client-emails-autoscaler.yaml +++ b/helm/templates/api-store-written-client-emails-autoscaler.yaml @@ -2,12 +2,12 @@ apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: creationTimestamp: null - name: jobstorewrittenclientemails + name: apistorewrittenclientemails spec: maxReplicas: {{.Values.autoscale.maxPods}} minReplicas: {{.Values.autoscale.minPods}} scaleTargetRef: apiVersion: extensions/v1beta1 kind: Deployment - name: jobstorewrittenclientemails + name: apistorewrittenclientemails targetCPUUtilizationPercentage: {{.Values.autoscale.cpuThreshold}} diff --git a/helm/templates/job-store-written-client-emails-deployment.yaml b/helm/templates/api-store-written-client-emails-deployment.yaml similarity index 83% rename from helm/templates/job-store-written-client-emails-deployment.yaml rename to helm/templates/api-store-written-client-emails-deployment.yaml index 2b86debf..405694bf 100644 --- a/helm/templates/job-store-written-client-emails-deployment.yaml +++ b/helm/templates/api-store-written-client-emails-deployment.yaml @@ -6,8 +6,8 @@ metadata: kompose.version: 1.13.0 (84fa826) creationTimestamp: null labels: - io.kompose.service: jobstorewrittenclientemails - name: jobstorewrittenclientemails + io.kompose.service: apistorewrittenclientemails + name: apistorewrittenclientemails spec: replicas: 1 strategy: {} @@ -15,18 +15,18 @@ spec: metadata: creationTimestamp: null labels: - io.kompose.service: jobstorewrittenclientemails + io.kompose.service: apistorewrittenclientemails spec: containers: - - name: jobstorewrittenclientemails - image: {{.Values.version.imageRegistry}}/opwenserver_job:{{.Values.version.dockerTag}} + - name: apistorewrittenclientemails + image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: JOB_NAME - value: opwen_email_server.jobs.store_written_client_emails + - name: API_NAME + value: opwen_email_server/static/store-written-client-emails.yaml + - name: GUNICORN_WORKERS + value: "{{.Values.server.gunicornWorkers}}" - name: LOKOLE_LOG_LEVEL value: {{.Values.logging.level}} - - name: LOKOLE_QUEUE_POLL_SECONDS - value: "{{.Values.worker.pollingIntervalSeconds}}" - name: LOKOLE_CLIENT_AZURE_STORAGE_KEY valueFrom: secretKeyRef: diff --git a/opwen_email_server/api/send_outbound_emails.py b/opwen_email_server/api/send_outbound_emails.py new file mode 100644 index 00000000..773b5664 --- /dev/null +++ b/opwen_email_server/api/send_outbound_emails.py @@ -0,0 +1,17 @@ +from opwen_email_server.backend import email_sender +from opwen_email_server.backend import server_datastore +from opwen_email_server.utils.log import LogMixin + + +class _Sender(LogMixin): + def __call__(self, resource_id: str): + email = server_datastore.fetch_email(resource_id) + self.log_info('Fetched outbound email %s for sending', resource_id) + + email_sender.send(email) + self.log_info('Done sending outbound email %s', resource_id) + + return 'OK', 200 + + +send = _Sender() diff --git a/opwen_email_server/jobs/store_inbound_emails.py b/opwen_email_server/api/store_inbound_emails.py similarity index 65% rename from opwen_email_server/jobs/store_inbound_emails.py rename to opwen_email_server/api/store_inbound_emails.py index 81790879..228c6869 100644 --- a/opwen_email_server/jobs/store_inbound_emails.py +++ b/opwen_email_server/api/store_inbound_emails.py @@ -1,16 +1,12 @@ from opwen_email_server.api import email_receive from opwen_email_server.backend import server_datastore -from opwen_email_server.services.queue_consumer import QueueConsumer from opwen_email_server.utils.email_parser import inline_images from opwen_email_server.utils.email_parser import parse_mime_email +from opwen_email_server.utils.log import LogMixin -class Job(QueueConsumer): - def __init__(self): - super().__init__(email_receive.QUEUE.dequeue) - - def _process_message(self, message: dict): - resource_id = message['resource_id'] +class _InboundStorer(LogMixin): + def __call__(self, resource_id: str): mime_email = email_receive.STORAGE.fetch_text(resource_id) self.log_info('Fetched inbound MIME email %s', resource_id) @@ -22,7 +18,7 @@ def _process_message(self, message: dict): email_receive.STORAGE.delete(resource_id) self.log_info('Deleted inbound MIME email %s', resource_id) + return 'OK', 200 + -if __name__ == '__main__': - from opwen_email_server.services.queue_consumer import cli - cli(Job) +store = _InboundStorer() diff --git a/opwen_email_server/jobs/store_written_client_emails.py b/opwen_email_server/api/store_written_client_emails.py similarity index 69% rename from opwen_email_server/jobs/store_written_client_emails.py rename to opwen_email_server/api/store_written_client_emails.py index 5770a53a..56588710 100644 --- a/opwen_email_server/jobs/store_written_client_emails.py +++ b/opwen_email_server/api/store_written_client_emails.py @@ -1,16 +1,11 @@ -from opwen_email_server.api import client_write from opwen_email_server.backend import client_datastore from opwen_email_server.backend import email_sender from opwen_email_server.backend import server_datastore -from opwen_email_server.services.queue_consumer import QueueConsumer +from opwen_email_server.utils.log import LogMixin -class Job(QueueConsumer): - def __init__(self): - super().__init__(client_write.QUEUE.dequeue) - - def _process_message(self, message: dict): - resource_id = message['resource_id'] +class _WrittenStorer(LogMixin): + def __call__(self, resource_id: str): emails = client_datastore.unpack_emails(resource_id) self.log_info('Fetched packaged client emails from %s', resource_id) @@ -30,7 +25,7 @@ def _process_message(self, message: dict): client_datastore.delete(resource_id) self.log_info('Deleted packaged client emails from %s', resource_id) + return 'OK', 200 + -if __name__ == '__main__': - from opwen_email_server.services.queue_consumer import cli - cli(Job) +store = _WrittenStorer() diff --git a/opwen_email_server/azure_constants.py b/opwen_email_server/azure_constants.py index 1a3f6789..72f4ced0 100644 --- a/opwen_email_server/azure_constants.py +++ b/opwen_email_server/azure_constants.py @@ -1,5 +1,3 @@ -from os import environ - CONTAINER_CLIENT_PACKAGES = 'compressedpackages' CONTAINER_EMAILS = 'emails' CONTAINER_SENDGRID_MIME = 'sendgridinboundemails' @@ -13,4 +11,3 @@ QUEUE_CLIENT_PACKAGE = 'lokoleinboundemails' QUEUE_EMAIL_SEND = 'sengridoutboundemails' QUEUE_SENDGRID_MIME = 'sengridinboundemails' -QUEUE_POLL_INTERVAL = float(environ.get('LOKOLE_QUEUE_POLL_SECONDS', '10')) diff --git a/opwen_email_server/jobs/__init__.py b/opwen_email_server/jobs/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/opwen_email_server/jobs/send_outbound_emails.py b/opwen_email_server/jobs/send_outbound_emails.py deleted file mode 100644 index d1474584..00000000 --- a/opwen_email_server/jobs/send_outbound_emails.py +++ /dev/null @@ -1,21 +0,0 @@ -from opwen_email_server.backend import email_sender -from opwen_email_server.backend import server_datastore -from opwen_email_server.services.queue_consumer import QueueConsumer - - -class Job(QueueConsumer): - def __init__(self): - super().__init__(email_sender.QUEUE.dequeue) - - def _process_message(self, message: dict): - resource_id = message['resource_id'] - email = server_datastore.fetch_email(resource_id) - self.log_info('Fetched outbound email %s for sending', resource_id) - - email_sender.send(email) - self.log_info('Done sending outbound email %s', resource_id) - - -if __name__ == '__main__': - from opwen_email_server.services.queue_consumer import cli - cli(Job) diff --git a/opwen_email_server/services/queue_consumer.py b/opwen_email_server/services/queue_consumer.py deleted file mode 100644 index ea9f8d37..00000000 --- a/opwen_email_server/services/queue_consumer.py +++ /dev/null @@ -1,87 +0,0 @@ -from time import sleep - -from opwen_email_server.azure_constants import QUEUE_POLL_INTERVAL -from opwen_email_server.utils.log import LogMixin -from opwen_email_server.config import QUEUE_ERROR_FILE -from opwen_email_server.utils.temporary import remove_if_exists - - -class QueueConsumer(LogMixin): - def __init__(self, dequeue, - poll_seconds: float=QUEUE_POLL_INTERVAL) -> None: - - self._dequeue = dequeue - self._poll_seconds = poll_seconds - self._is_running = True - - def _process_message(self, message: dict): - raise NotImplementedError - - def run_once(self) -> bool: - self.log_debug('queue consumer checking for messages') - did_process = False - - with self._dequeue() as messages: - self.log_debug('queue consumer got %d messages', len(messages)) - for i, message in enumerate(messages): - self.log_debug('queue consumer processing message %d', i) - self._process_message(message) - did_process = True - - return did_process - - def run_forever(self): - self.log_debug('queue consumer listening') - while self._is_running: - self.log_debug('starting polling queue') - - # noinspection PyBroadException - try: - did_process = self.run_once() - except Exception as ex: - self._report_error(ex) - else: - if did_process: - self._report_success() - else: - self._wait_for_next_message() - - def _wait_for_next_message(self): - self.log_debug('queue consumer waiting for %d', self._poll_seconds) - sleep(self._poll_seconds) - - def _report_success(self): - self.log_debug('done polling queue') - - if QUEUE_ERROR_FILE: - remove_if_exists(QUEUE_ERROR_FILE) - - def _report_error(self, ex: Exception): - self.log_exception('error polling queue:%r', ex) - - if QUEUE_ERROR_FILE: - with open(QUEUE_ERROR_FILE, 'a') as fobj: - fobj.write('{}\n'.format(ex)) - - -def cli(job_class): - from argparse import ArgumentParser - from os.path import dirname - from os.path import join - - parser = ArgumentParser() - parser.add_argument('--once', action='store_true', default=False) - args = parser.parse_args() - - try: - # noinspection PyUnresolvedReferences - from dotenv import load_dotenv - load_dotenv(join(dirname(__file__), '.env')) - except ImportError: - pass - - job = job_class() - if args.once: - job.run_once() - else: - job.run_forever() diff --git a/opwen_email_server/static/send-outbound-emails.yaml b/opwen_email_server/static/send-outbound-emails.yaml new file mode 100644 index 00000000..38c89d0f --- /dev/null +++ b/opwen_email_server/static/send-outbound-emails.yaml @@ -0,0 +1,26 @@ +swagger: '2.0' + +info: + title: Opwen Cloudserver Email API. + version: '0.1' + +basePath: '/job/email/outbound/send' + +paths: + + '/': + + post: + operationId: opwen_email_server.api.send_outbound_emails.send + summary: Queue-triggered endpoint to send an outbound email. + consumes: + - application/json + parameters: + - name: resource_id + description: The id of the email to send. + type: string + in: body + required: true + responses: + 200: + description: The email was sent. diff --git a/opwen_email_server/static/store-inbound-emails.yaml b/opwen_email_server/static/store-inbound-emails.yaml new file mode 100644 index 00000000..86c89f04 --- /dev/null +++ b/opwen_email_server/static/store-inbound-emails.yaml @@ -0,0 +1,26 @@ +swagger: '2.0' + +info: + title: Opwen Cloudserver Email API. + version: '0.1' + +basePath: '/job/email/inbound/store' + +paths: + + '/': + + post: + operationId: opwen_email_server.api.store_inbound_emails.store + summary: Queue-triggered endpoint to store an inbound email. + consumes: + - application/json + parameters: + - name: resource_id + description: The id of the email to store. + type: string + in: body + required: true + responses: + 200: + description: The email was stored. diff --git a/opwen_email_server/static/store-written-emails.yaml b/opwen_email_server/static/store-written-emails.yaml new file mode 100644 index 00000000..7731f0ca --- /dev/null +++ b/opwen_email_server/static/store-written-emails.yaml @@ -0,0 +1,26 @@ +swagger: '2.0' + +info: + title: Opwen Cloudserver Email API. + version: '0.1' + +basePath: '/job/email/client/store' + +paths: + + '/': + + post: + operationId: opwen_email_server.api.store_written_client_emails.store + summary: Queue-triggered endpoint to store a client email. + consumes: + - application/json + parameters: + - name: resource_id + description: The id of the email to store. + type: string + in: body + required: true + responses: + 200: + description: The email was stored. diff --git a/tests/opwen_email_server/api/test_send_outbound_emails.py b/tests/opwen_email_server/api/test_send_outbound_emails.py new file mode 100644 index 00000000..1cc957fe --- /dev/null +++ b/tests/opwen_email_server/api/test_send_outbound_emails.py @@ -0,0 +1,20 @@ +from unittest import TestCase +from unittest.mock import patch + +from opwen_email_server.backend import email_sender +from opwen_email_server.backend import server_datastore +from opwen_email_server.api import send_outbound_emails + + +class SendOutboundEmailsTests(TestCase): + @patch.object(server_datastore, 'fetch_email') + @patch.object(email_sender, 'send') + def test_reads_message_and_stores_email(self, send_mock, fetch_mock): + email_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' + email = {'to': ['foo@bar.com'], '_uid': email_id} + fetch_mock.return_value = email + + send_outbound_emails.send(email_id) + + fetch_mock.assert_called_once_with(email_id) + send_mock.assert_called_once_with(email) diff --git a/tests/opwen_email_server/jobs/test_store_inbound_emails.py b/tests/opwen_email_server/api/test_store_inbound_emails.py similarity index 54% rename from tests/opwen_email_server/jobs/test_store_inbound_emails.py rename to tests/opwen_email_server/api/test_store_inbound_emails.py index 0797ecfe..54a54bc0 100644 --- a/tests/opwen_email_server/jobs/test_store_inbound_emails.py +++ b/tests/opwen_email_server/api/test_store_inbound_emails.py @@ -2,31 +2,22 @@ from unittest.mock import patch from opwen_email_server.backend import server_datastore -from opwen_email_server.jobs import store_inbound_emails +from opwen_email_server.api import store_inbound_emails class StoreInboundEmailsTests(TestCase): - @patch.object(store_inbound_emails.email_receive, 'QUEUE') @patch.object(store_inbound_emails.email_receive, 'STORAGE') @patch.object(server_datastore, 'store_email') @patch.object(store_inbound_emails, 'parse_mime_email') def test_reads_message_and_stores_email( - self, parser_mock, store_mock, storage_mock, queue_mock): + self, parser_mock, store_mock, storage_mock): email_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' email = {'to': ['foo@bar.com']} - self._given_message(email, email_id, parser_mock, queue_mock) - consumer = store_inbound_emails.Job() + parser_mock.return_value = email - consumer.run_once() + store_inbound_emails.store(email_id) self.assertEqual(storage_mock.fetch_text.call_count, 1) self.assertEqual(storage_mock.delete.call_count, 1) - self.assertEqual(queue_mock.dequeue.call_count, 1) store_mock.assert_called_once_with(email_id, email) - - @classmethod - def _given_message(cls, email, email_id, parser_mock, queue_mock): - queue_mock.dequeue.return_value.__enter__.return_value = \ - [{'resource_id': email_id}] - parser_mock.return_value = email diff --git a/tests/opwen_email_server/jobs/test_store_written_client_emails.py b/tests/opwen_email_server/api/test_store_written_client_emails.py similarity index 64% rename from tests/opwen_email_server/jobs/test_store_written_client_emails.py rename to tests/opwen_email_server/api/test_store_written_client_emails.py index 83c4dccd..5401f968 100644 --- a/tests/opwen_email_server/jobs/test_store_written_client_emails.py +++ b/tests/opwen_email_server/api/test_store_written_client_emails.py @@ -4,40 +4,29 @@ from opwen_email_server.backend import client_datastore from opwen_email_server.backend import email_sender from opwen_email_server.backend import server_datastore -from opwen_email_server.jobs import store_written_client_emails +from opwen_email_server.api import store_written_client_emails class StoreWrittenClientEmailsTests(TestCase): - @patch.object(store_written_client_emails.client_write, 'QUEUE') @patch.object(email_sender, 'QUEUE') @patch.object(client_datastore, 'unpack_emails') @patch.object(client_datastore, 'delete') @patch.object(server_datastore, 'store_email') def test_reads_message_and_stores_email( - self, store_mock, delete_mock, unpack_mock, send_queue_mock, - write_queue_mock): + self, store_mock, delete_mock, unpack_mock, send_queue_mock): resource_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' email1_id = '4efba428-143c-11e7-93ae-92361f002671' email2_id = 'c91636ee-143f-11e7-93ae-92361f002671' email1 = {'to': ['foo@test.com'], '_uid': email1_id} email2 = {'to': ['bar@test.com'], '_uid': email2_id} - emails = [email1, email2] - self._given_message(emails, resource_id, unpack_mock, write_queue_mock) - consumer = store_written_client_emails.Job() + unpack_mock.return_value = [email1, email2] - consumer.run_once() + store_written_client_emails.store(resource_id) self.assertEqual(unpack_mock.call_count, 1) self.assertEqual(delete_mock.call_count, 1) self.assertEqual(send_queue_mock.enqueue.call_count, 2) - self.assertEqual(write_queue_mock.dequeue.call_count, 1) self.assertEqual(store_mock.call_count, 2) store_mock.assert_any_call(email1_id, email1) store_mock.assert_any_call(email2_id, email2) - - @classmethod - def _given_message(cls, emails, resource_id, unpack_mock, queue_mock): - queue_mock.dequeue.return_value.__enter__.return_value = \ - [{'resource_id': resource_id}] - unpack_mock.return_value = emails diff --git a/tests/opwen_email_server/jobs/__init__.py b/tests/opwen_email_server/jobs/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/opwen_email_server/jobs/test_send_outbound_emails.py b/tests/opwen_email_server/jobs/test_send_outbound_emails.py deleted file mode 100644 index a4ff2201..00000000 --- a/tests/opwen_email_server/jobs/test_send_outbound_emails.py +++ /dev/null @@ -1,31 +0,0 @@ -from unittest import TestCase -from unittest.mock import patch - -from opwen_email_server.backend import email_sender -from opwen_email_server.backend import server_datastore -from opwen_email_server.jobs import send_outbound_emails - - -class SendOutboundEmailsTests(TestCase): - @patch.object(email_sender, 'QUEUE') - @patch.object(server_datastore, 'fetch_email') - @patch.object(email_sender, 'send') - def test_reads_message_and_stores_email( - self, send_mock, fetch_mock, queue_mock): - - email_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' - email = {'to': ['foo@bar.com'], '_uid': email_id} - self._given_message(email, email_id, fetch_mock, queue_mock) - consumer = send_outbound_emails.Job() - - consumer.run_once() - - self.assertEqual(queue_mock.dequeue.call_count, 1) - fetch_mock.assert_called_once_with(email_id) - send_mock.assert_called_once_with(email) - - @classmethod - def _given_message(cls, email, email_id, fetch_mock, queue_mock): - queue_mock.dequeue.return_value.__enter__.return_value = \ - [{'resource_id': email_id}] - fetch_mock.return_value = email diff --git a/tests/opwen_email_server/services/test_queue_consumer.py b/tests/opwen_email_server/services/test_queue_consumer.py deleted file mode 100644 index 29bbb6c5..00000000 --- a/tests/opwen_email_server/services/test_queue_consumer.py +++ /dev/null @@ -1,95 +0,0 @@ -from contextlib import contextmanager -from unittest import TestCase - -from opwen_email_server.services.queue_consumer import QueueConsumer - - -class TestQueueConsumer(QueueConsumer): - def __init__(self, message_processor, message_generator, max_runs): - super().__init__(message_generator, poll_seconds=0.01) - self.messages_processed = 0 - self.times_waited = 0 - self.exceptions_encountered = 0 - self._num_runs = 0 - self._max_runs = max_runs - self._message_processor = message_processor - - def _process_message(self, message: dict): - self._message_processor() - - def _track_run(self): - self._num_runs += 1 - if self._num_runs >= self._max_runs: - self._is_running = False - - def _report_success(self): - self._track_run() - self.messages_processed += 1 - - def _report_error(self, ex: Exception): - self._track_run() - self.exceptions_encountered += 1 - - def _wait_for_next_message(self): - self._track_run() - self.times_waited += 1 - - -class QueueConsumerTests(TestCase): - def test_processes_messages_immediately_if_there_is_more_work(self): - def _process(): pass - - @contextmanager - def _produce(): yield [{"foo": "bar"}] - - consumer = TestQueueConsumer(_process, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.messages_processed, 10) - self.assertEqual(consumer.times_waited, 0) - - def test_waits_for_new_messages_if_there_is_no_work_to_do(self): - def _process(): pass - - @contextmanager - def _produce(): yield [] - - consumer = TestQueueConsumer(_process, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.messages_processed, 0) - self.assertEqual(consumer.times_waited, 10) - - def test_waits_or_processes_messages_if_available(self): - self.messages_produced = 0 - - def _process(): pass - - @contextmanager - def _produce(): - if self.messages_produced % 2 == 0: - yield [] - else: - yield [{"foo": "bar"}] - self.messages_produced += 1 - - consumer = TestQueueConsumer(_process, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.messages_processed, 5) - self.assertEqual(consumer.times_waited, 5) - - def test_ignores_exceptions_while_running(self): - def _throw(): raise ValueError - - @contextmanager - def _produce(): yield [{"foo": "bar"}] - - consumer = TestQueueConsumer(_throw, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.exceptions_encountered, 10) From bea55e4aba34dd8019a8e43c6cfad57bf6e3c3ee Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 00:17:29 -0400 Subject: [PATCH 02/19] Convert Azure Storage Queues to ServiceBus --- .dockerignore | 5 + .gitignore | 5 + docker-compose.yml | 92 +++++++++++++---- docker/api/Dockerfile | 2 + docker/docker-entrypoint.sh | 17 ++++ docker/queueconnector/Dockerfile | 16 +++ docker/setup/arm.parameters.json | 3 + docker/setup/arm.template.json | 77 +++++++++++++-- docker/setup/setup.sh | 8 +- .../templates/api-client-read-deployment.yaml | 10 -- .../api-client-write-deployment.yaml | 10 -- .../api-email-receive-deployment.yaml | 10 -- .../api-send-outbound-emails-deployment.yaml | 34 +++++-- .../api-store-inbound-emails-deployment.yaml | 34 +++++-- ...tore-written-client-emails-deployment.yaml | 34 +++++-- opwen_email_server/api/client_write.py | 4 +- opwen_email_server/api/email_receive.py | 4 +- .../api/store_written_client_emails.py | 11 ++- opwen_email_server/backend/email_sender.py | 5 - opwen_email_server/config.py | 6 +- opwen_email_server/services/queue.py | 81 ++++----------- opwen_queue_connector/Program.cs | 99 +++++++++++++++++++ opwen_queue_connector/queueconnector.csproj | 12 +++ requirements.txt | 2 +- .../api/test_store_written_client_emails.py | 3 +- .../opwen_email_server/services/test_queue.py | 91 ----------------- 26 files changed, 417 insertions(+), 258 deletions(-) create mode 100755 docker/docker-entrypoint.sh create mode 100644 docker/queueconnector/Dockerfile create mode 100644 opwen_queue_connector/Program.cs create mode 100644 opwen_queue_connector/queueconnector.csproj delete mode 100644 tests/opwen_email_server/services/test_queue.py diff --git a/.dockerignore b/.dockerignore index b186ce30..433795b6 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,3 +5,8 @@ docs/ tests/ travis/ venv/ + +.vs/ +bin/ +obj/ +out/ diff --git a/.gitignore b/.gitignore index b3b4a5a9..27b81c4f 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,8 @@ venv/ .mypy_cache/ serviceprincipal.json + +.vs/ +bin/ +obj/ +out/ diff --git a/docker-compose.yml b/docker-compose.yml index 54ac10bf..ed0fc50e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3' +version: '3.3' services: @@ -25,9 +25,10 @@ services: API_NAME: opwen_email_server/static/email-receive-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apiclientwrite: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -38,9 +39,10 @@ services: API_NAME: opwen_email_server/static/client-write-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apiclientread: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -51,9 +53,10 @@ services: API_NAME: opwen_email_server/static/client-read-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apisendoutboundemails: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -64,9 +67,10 @@ services: API_NAME: opwen_email_server/static/send-outbound-emails.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apistoreinboundemails: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -77,9 +81,10 @@ services: API_NAME: opwen_email_server/static/store-inbound-emails.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apistorewritteclientemails: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -90,6 +95,55 @@ services: JOB_NAME: opwen_email_server/static/store-written-emails.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid + + connectorsendoutboundemails: + image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} + build: + context: . + dockerfile: docker/queueconnector/Dockerfile + environment: + LOKOLE_POST_URL: http://apisendoutboundemails/job/email/outbound/send + LOKOLE_SOURCE_QUEUE: sengridoutboundemails + DOTENV_SECRETS: azure + secrets: + - azure + depends_on: + - apisendoutboundemails + + connectorstoreinboundemails: + image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} + build: + context: . + dockerfile: docker/queueconnector/Dockerfile + environment: + LOKOLE_POST_URL: http://apistoreinboundemails/job/email/inbound/store + LOKOLE_SOURCE_QUEUE: sengridinboundemails + DOTENV_SECRETS: azure + secrets: + - azure + depends_on: + - apistoreinboundemails + + connectorstorewritteclientemails: + image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} + build: + context: . + dockerfile: docker/queueconnector/Dockerfile + environment: + LOKOLE_POST_URL: http://apistorewritteclientemails/job/email/client/store + LOKOLE_SOURCE_QUEUE: lokoleinboundemails + DOTENV_SECRETS: azure + secrets: + - azure + depends_on: + - apistorewritteclientemails + +secrets: + azure: + file: ./secrets/azure.env + sendgrid: + file: ./secrets/sendgrid.env diff --git a/docker/api/Dockerfile b/docker/api/Dockerfile index 073667e1..84460947 100644 --- a/docker/api/Dockerfile +++ b/docker/api/Dockerfile @@ -15,8 +15,10 @@ RUN apt-get update \ ADD opwen_email_server /app/opwen_email_server ADD runserver.py /app/server.py ADD docker/api/healthcheck.sh /app/healthcheck.sh +ADD docker/docker-entrypoint.sh /docker-entrypoint.sh EXPOSE 80 WORKDIR /app HEALTHCHECK --interval=59m --timeout=5s CMD /app/healthcheck.sh +ENTRYPOINT ["/docker-entrypoint.sh"] CMD "gunicorn" "-w" "${GUNICORN_WORKERS}" "-b" "0.0.0.0:80" "server:build_app(apis=['/app/${API_NAME}', '/app/opwen_email_server/static/healthcheck-spec.yaml'], server='tornado')" diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh new file mode 100755 index 00000000..dcd1bd1d --- /dev/null +++ b/docker/docker-entrypoint.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env sh +# This docker-entrypoint populates environment variables from docker secrets. +# The docker secrets are assumed to be files in dotenv syntax. The requested +# secrets should be declared in the environment variable DOTENV_SECRETS, with +# multiple secret names separated by a semi-colon. + +if [ ! -d /run/secrets ]; then + exec "$@" +fi + +if [ -z "${DOTENV_SECRETS}" ]; then + exec "$@" +fi + +eval "$(find /run/secrets -maxdepth 1 -type f | grep "$(echo "${DOTENV_SECRETS}" | sed 's/;/\\|/g')" | xargs cat | grep -v '^#' | sed 's/^\([^=]\+\)=\(.*\)$/if [ -z "$\1" ]; then \1="\2"; export \1; fi/g')" + +exec "$@" diff --git a/docker/queueconnector/Dockerfile b/docker/queueconnector/Dockerfile new file mode 100644 index 00000000..8dd1308d --- /dev/null +++ b/docker/queueconnector/Dockerfile @@ -0,0 +1,16 @@ +FROM microsoft/dotnet:2.0-sdk-stretch AS builder + +WORKDIR /app +COPY opwen_queue_connector/*.csproj ./ +RUN dotnet restore +COPY opwen_queue_connector/* ./ +RUN dotnet publish -c Release -o out + +FROM microsoft/dotnet:2.0-runtime-stretch + +WORKDIR /app +ADD docker/docker-entrypoint.sh /docker-entrypoint.sh +COPY --from=builder /app/out ./ + +ENTRYPOINT ["/docker-entrypoint.sh"] +CMD ["dotnet", "queueconnector.dll"] diff --git a/docker/setup/arm.parameters.json b/docker/setup/arm.parameters.json index 8ef37696..46e827dd 100644 --- a/docker/setup/arm.parameters.json +++ b/docker/setup/arm.parameters.json @@ -4,6 +4,9 @@ "parameters": { "storageAccountSKU": { "value": "" + }, + "serviceBusSKU": { + "value": "" } } } diff --git a/docker/setup/arm.template.json b/docker/setup/arm.template.json index b22048ed..6ed18d89 100644 --- a/docker/setup/arm.template.json +++ b/docker/setup/arm.template.json @@ -14,6 +14,17 @@ "metadata": { "description": "The pricing tier of the Storage Account resources that will be created for the project." } + }, + "serviceBusSKU": { + "type": "string", + "defaultValue": "Basic", + "allowedValues": [ + "Basic", + "Standard" + ], + "metadata": { + "description": "The pricing tier of the Service Bus resource that will be created for the project." + } } }, "variables": { @@ -22,6 +33,10 @@ "serverBlobsName": "[take(concat('opwenserverblobs', uniqueString(subscription().subscriptionId)), 22)]", "serverTablesName": "[take(concat('opwenservertables', uniqueString(subscription().subscriptionId)), 22)]", "serverQueuesName": "[take(concat('opwenserverqueues', uniqueString(subscription().subscriptionId)), 22)]", + "serverQueuesSasName": "worker", + "serverQueueClientPackage": "lokoleinboundemails", + "serverQueueEmailSend": "sengridoutboundemails", + "serverQueueSendgridMime": "sengridinboundemails", "location": "[resourceGroup().location]" }, "resources": [ @@ -69,15 +84,57 @@ "properties": {} }, { - "type": "Microsoft.Storage/storageAccounts", - "kind": "Storage", - "apiVersion": "2016-01-01", + "apiVersion": "2017-04-01", "name": "[variables('serverQueuesName')]", + "type": "Microsoft.ServiceBus/namespaces", "location": "[variables('location')]", "sku": { - "name": "[parameters('storageAccountSKU')]" + "name": "[parameters('serviceBusSKU')]" }, - "properties": {} + "properties": {}, + "resources": [ + { + "type": "AuthorizationRules", + "name": "[variables('serverQueuesSasName')]", + "apiVersion": "2017-04-01", + "properties": { + "rights": [ + "Listen", + "Send" + ] + }, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + }, + { + "type": "queues", + "name": "[variables('serverQueueClientPackage')]", + "apiVersion": "2017-04-01", + "properties": {}, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + }, + { + "type": "queues", + "name": "[variables('serverQueueEmailSend')]", + "apiVersion": "2017-04-01", + "properties": {}, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + }, + { + "type": "queues", + "name": "[variables('serverQueueSendgridMime')]", + "apiVersion": "2017-04-01", + "properties": {}, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + } + ] } ], "outputs": { @@ -87,7 +144,7 @@ }, "appinsightsKey": { "type": "string", - "value": "[reference(resourceId('Microsoft.Insights/components', variables('appinsightsName')), '2014-04-01').InstrumentationKey]", + "value": "[reference(resourceId('Microsoft.Insights/components', variables('appinsightsName')), '2014-04-01').InstrumentationKey]" }, "clientBlobsName": { "type": "string", @@ -117,9 +174,13 @@ "type": "string", "value": "[variables('serverQueuesName')]" }, - "serverQueuesKey": { + "serverQueuesSasName": { + "type": "string", + "value": "[variables('serverQueuesSasName')]" + }, + "serverQueuesSasKey": { "type": "string", - "value": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('serverQueuesName')), '2016-01-01').keys[0].value]" + "value": "[listKeys(resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'), variables('serverQueuesSasName')), '2017-04-01').primaryKey]" } } } diff --git a/docker/setup/setup.sh b/docker/setup/setup.sh index 6e8098a2..1682798f 100755 --- a/docker/setup/setup.sh +++ b/docker/setup/setup.sh @@ -16,6 +16,7 @@ ## ## Optional environment variables: ## +## SERVICE_BUS_SKU ## STORAGE_ACCOUNT_SKU ## ## KUBERNETES_RESOURCE_GROUP_NAME @@ -61,6 +62,7 @@ use_resource_group "${RESOURCE_GROUP_NAME}" # storageaccountsku="${STORAGE_ACCOUNT_SKU:-Standard_GRS}" +servicebussku="${SERVICE_BUS_SKU:-Basic}" deploymentname="opwendeployment$(generate_identifier 8)" log "Creating resources via deployment ${deploymentname}" @@ -69,6 +71,7 @@ az group deployment create \ --name "${deploymentname}" \ --template-file "${scriptdir}/arm.template.json" \ --parameters storageAccountSKU="${storageaccountsku}" \ + --parameters serviceBusSKU="${servicebussku}" \ > /tmp/deployment.json cat > /secrets/azure.env << EOF @@ -78,10 +81,11 @@ LOKOLE_CLIENT_AZURE_STORAGE_KEY=$(jq -r .properties.outputs.clientBlobsKey.value LOKOLE_CLIENT_AZURE_STORAGE_NAME=$(jq -r .properties.outputs.clientBlobsName.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_BLOBS_KEY=$(jq -r .properties.outputs.serverBlobsKey.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME=$(jq -r .properties.outputs.serverBlobsName.value /tmp/deployment.json) -LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY=$(jq -r .properties.outputs.serverQueuesKey.value /tmp/deployment.json) -LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME=$(jq -r .properties.outputs.serverQueuesName.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY=$(jq -r .properties.outputs.serverTablesKey.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_TABLES_NAME=$(jq -r .properties.outputs.serverTablesName.value /tmp/deployment.json) +LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE=$(jq -r .properties.outputs.serverQueuesName.value /tmp/deployment.json) +LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME=$(jq -r .properties.outputs.serverQueuesSasName.value /tmp/deployment.json) +LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY=$(jq -r .properties.outputs.serverQueuesSasKey.value /tmp/deployment.json) EOF cat > /secrets/sendgrid.env << EOF diff --git a/helm/templates/api-client-read-deployment.yaml b/helm/templates/api-client-read-deployment.yaml index b3a57af5..7674ba1b 100644 --- a/helm/templates/api-client-read-deployment.yaml +++ b/helm/templates/api-client-read-deployment.yaml @@ -47,16 +47,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/api-client-write-deployment.yaml b/helm/templates/api-client-write-deployment.yaml index d66cc644..94aa0975 100644 --- a/helm/templates/api-client-write-deployment.yaml +++ b/helm/templates/api-client-write-deployment.yaml @@ -47,16 +47,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/api-email-receive-deployment.yaml b/helm/templates/api-email-receive-deployment.yaml index 0c8307e7..86de1da4 100644 --- a/helm/templates/api-email-receive-deployment.yaml +++ b/helm/templates/api-email-receive-deployment.yaml @@ -47,16 +47,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/api-send-outbound-emails-deployment.yaml b/helm/templates/api-send-outbound-emails-deployment.yaml index 2b39afaf..1cfdafa9 100644 --- a/helm/templates/api-send-outbound-emails-deployment.yaml +++ b/helm/templates/api-send-outbound-emails-deployment.yaml @@ -18,6 +18,28 @@ spec: io.kompose.service: apisendoutboundemails spec: containers: + - name: connectorsendoutboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_queueconnector:{{.Values.version.dockerTag}} + env: + - name: LOKOLE_POST_URL + value: http://0.0.0.0/job/email/outbound/send + - name: LOKOLE_SOURCE_QUEUE + value: sengridoutboundemails + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY - name: apisendoutboundemails image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: @@ -47,16 +69,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +89,8 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + ports: + - containerPort: 80 resources: {} restartPolicy: Always status: {} diff --git a/helm/templates/api-store-inbound-emails-deployment.yaml b/helm/templates/api-store-inbound-emails-deployment.yaml index 4b166ea7..d56c5f8d 100644 --- a/helm/templates/api-store-inbound-emails-deployment.yaml +++ b/helm/templates/api-store-inbound-emails-deployment.yaml @@ -18,6 +18,28 @@ spec: io.kompose.service: apistoreinboundemails spec: containers: + - name: connectorstoreinboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_queueconnector:{{.Values.version.dockerTag}} + env: + - name: LOKOLE_POST_URL + value: http://0.0.0.0/job/email/inbound/store + - name: LOKOLE_SOURCE_QUEUE + value: sengridinboundemails + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY - name: apistoreinboundemails image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: @@ -47,16 +69,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +89,8 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + ports: + - containerPort: 80 resources: {} restartPolicy: Always status: {} diff --git a/helm/templates/api-store-written-client-emails-deployment.yaml b/helm/templates/api-store-written-client-emails-deployment.yaml index 405694bf..8115c757 100644 --- a/helm/templates/api-store-written-client-emails-deployment.yaml +++ b/helm/templates/api-store-written-client-emails-deployment.yaml @@ -18,6 +18,28 @@ spec: io.kompose.service: apistorewrittenclientemails spec: containers: + - name: connectorstorewrittenclientemails + image: {{.Values.version.imageRegistry}}/opwenserver_queueconnector:{{.Values.version.dockerTag}} + env: + - name: LOKOLE_POST_URL + value: http://0.0.0.0/job/email/client/store + - name: LOKOLE_SOURCE_QUEUE + value: lokoleinboundemails + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY - name: apistorewrittenclientemails image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: @@ -47,16 +69,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +89,8 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + ports: + - containerPort: 80 resources: {} restartPolicy: Always status: {} diff --git a/opwen_email_server/api/client_write.py b/opwen_email_server/api/client_write.py index 5757afe8..f64e015f 100644 --- a/opwen_email_server/api/client_write.py +++ b/opwen_email_server/api/client_write.py @@ -6,7 +6,9 @@ from opwen_email_server.services.queue import AzureQueue from opwen_email_server.utils.log import LogMixin -QUEUE = AzureQueue(account=config.QUEUES_ACCOUNT, key=config.QUEUES_KEY, +QUEUE = AzureQueue(namespace=config.QUEUES_NAMESPACE, + sas_key=config.QUEUES_SAS_KEY, + sas_name=config.QUEUES_SAS_NAME, name=constants.QUEUE_CLIENT_PACKAGE) CLIENTS = AzureAuth(account=config.TABLES_ACCOUNT, key=config.TABLES_KEY, diff --git a/opwen_email_server/api/email_receive.py b/opwen_email_server/api/email_receive.py index ff47d655..d6f08ad8 100644 --- a/opwen_email_server/api/email_receive.py +++ b/opwen_email_server/api/email_receive.py @@ -12,7 +12,9 @@ key=config.BLOBS_KEY, container=constants.CONTAINER_SENDGRID_MIME) -QUEUE = AzureQueue(account=config.QUEUES_ACCOUNT, key=config.QUEUES_KEY, +QUEUE = AzureQueue(namespace=config.QUEUES_NAMESPACE, + sas_key=config.QUEUES_SAS_KEY, + sas_name=config.QUEUES_SAS_NAME, name=constants.QUEUE_SENDGRID_MIME) CLIENTS = AzureAuth(account=config.TABLES_ACCOUNT, key=config.TABLES_KEY, diff --git a/opwen_email_server/api/store_written_client_emails.py b/opwen_email_server/api/store_written_client_emails.py index 56588710..2625c30b 100644 --- a/opwen_email_server/api/store_written_client_emails.py +++ b/opwen_email_server/api/store_written_client_emails.py @@ -1,8 +1,15 @@ +from opwen_email_server import azure_constants as constants +from opwen_email_server import config from opwen_email_server.backend import client_datastore -from opwen_email_server.backend import email_sender from opwen_email_server.backend import server_datastore +from opwen_email_server.services.queue import AzureQueue from opwen_email_server.utils.log import LogMixin +QUEUE = AzureQueue(namespace=config.QUEUES_NAMESPACE, + sas_key=config.QUEUES_SAS_KEY, + sas_name=config.QUEUES_SAS_NAME, + name=constants.QUEUE_EMAIL_SEND) + class _WrittenStorer(LogMixin): def __call__(self, resource_id: str): @@ -14,7 +21,7 @@ def __call__(self, resource_id: str): server_datastore.store_email(email_id, email) self.log_info('Stored packaged client email %s', email_id) - email_sender.QUEUE.enqueue({ + QUEUE.enqueue({ '_version': '0.1', '_type': 'email_to_send', 'resource_id': email_id, diff --git a/opwen_email_server/backend/email_sender.py b/opwen_email_server/backend/email_sender.py index 76270209..cf495116 100644 --- a/opwen_email_server/backend/email_sender.py +++ b/opwen_email_server/backend/email_sender.py @@ -1,13 +1,8 @@ from typing import Tuple -from opwen_email_server import azure_constants as constants from opwen_email_server import config -from opwen_email_server.services.queue import AzureQueue from opwen_email_server.services.sendgrid import SendgridEmailSender -QUEUE = AzureQueue(account=config.QUEUES_ACCOUNT, key=config.QUEUES_KEY, - name=constants.QUEUE_EMAIL_SEND) - EMAIL = SendgridEmailSender(key=config.EMAIL_SENDER_KEY) diff --git a/opwen_email_server/config.py b/opwen_email_server/config.py index c7955d78..71530da0 100644 --- a/opwen_email_server/config.py +++ b/opwen_email_server/config.py @@ -1,7 +1,5 @@ from os import environ -QUEUES_ACCOUNT = environ.get('LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME', '') -QUEUES_KEY = environ.get('LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY', '') BLOBS_ACCOUNT = environ.get('LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME', '') BLOBS_KEY = environ.get('LOKOLE_EMAIL_SERVER_AZURE_BLOBS_KEY', '') TABLES_ACCOUNT = environ.get('LOKOLE_EMAIL_SERVER_AZURE_TABLES_NAME', '') @@ -10,6 +8,10 @@ CLIENT_STORAGE_ACCOUNT = environ.get('LOKOLE_CLIENT_AZURE_STORAGE_NAME', '') CLIENT_STORAGE_KEY = environ.get('LOKOLE_CLIENT_AZURE_STORAGE_KEY', '') +QUEUES_NAMESPACE = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE') +QUEUES_SAS_NAME = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME') +QUEUES_SAS_KEY = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY') + EMAIL_SENDER_KEY = environ.get('LOKOLE_SENDGRID_KEY', '') LOG_LEVEL = environ.get('LOKOLE_LOG_LEVEL', 'DEBUG') diff --git a/opwen_email_server/services/queue.py b/opwen_email_server/services/queue.py index 80102e08..52e060e8 100644 --- a/opwen_email_server/services/queue.py +++ b/opwen_email_server/services/queue.py @@ -1,9 +1,6 @@ -from contextlib import contextmanager -from json import loads - -from azure.storage.queue import QueueService +from azure.servicebus import Message +from azure.servicebus import ServiceBusService from typing import Callable -from typing import Iterable from opwen_email_server.utils.log import LogMixin from opwen_email_server.utils.serialization import to_json @@ -12,22 +9,24 @@ class AzureQueue(LogMixin): _max_message_retries = 5 - def __init__(self, account: str, key: str, name: str, - client: QueueService=None, - factory: Callable[..., QueueService]=QueueService) -> None: + def __init__(self, name: str, namespace: str, sas_name: str, sas_key: str, + client: ServiceBusService=None, + factory: Callable[..., ServiceBusService]=ServiceBusService) \ + -> None: - self._account = account - self._key = key self._name = name + self._namespace = namespace + self._sas_name = sas_name + self._sas_key = sas_key self.__client = client self._client_factory = factory @property - def _client(self) -> QueueService: + def _client(self) -> ServiceBusService: if self.__client is not None: return self.__client - client = self._client_factory(self._account, self._key) - client.create_queue(self._name) + client = self._client_factory(self._namespace, self._sas_name, + self._sas_key) self.__client = client return client @@ -36,60 +35,14 @@ def name(self) -> str: return self._name @classmethod - def _pack(cls, content: dict) -> str: - return to_json(content) - - @classmethod - def _unpack(cls, message: str) -> dict: - return loads(message) + def _pack(cls, content: dict) -> Message: + body = to_json(content).encode('utf-8') + return Message(body) def enqueue(self, content: dict): message = self._pack(content) - self._client.put_message(self._name, message) - self.log_debug('received message') - - @contextmanager # type: ignore - def dequeue(self, lock_seconds: int=10) -> Iterable[dict]: - messages = self._client.get_messages(self._name, 1, lock_seconds) - messages = list(messages) - if not messages: - yield [] # type: ignore - else: - message = messages[0] - delete_message = False - - # noinspection PyBroadException - try: - payload = self._unpack(message.content) - except Exception: - self.log_exception( - 'error unpacking message %r, purging', - message.id) - delete_message = True - yield [] # type: ignore - else: - # noinspection PyBroadException - try: - yield [payload] # type: ignore - except Exception as ex: - if message.dequeue_count > self._max_message_retries: - self.log_exception( - 'too many retries for message %r, purging:%r', - message.id, ex) - delete_message = True - else: - self.log_exception( - 'error processing message %r, retrying:%r', - message.id, ex) - else: - self.log_debug( - 'done with message %r, deleting', - message.id) - delete_message = True - - if delete_message: - self._client.delete_message(self._name, message.id, - message.pop_receipt) + self._client.send_queue_message(self._name, message) + self.log_debug('sent message') def extra_log_args(self): yield 'queue %s', self._name diff --git a/opwen_queue_connector/Program.cs b/opwen_queue_connector/Program.cs new file mode 100644 index 00000000..5e0cc78c --- /dev/null +++ b/opwen_queue_connector/Program.cs @@ -0,0 +1,99 @@ +using Microsoft.Azure.ServiceBus; +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Http; +using System.Net.Http.Headers; + +namespace queueconnector +{ + public class Program + { + private static readonly Lazy QueueClient = new Lazy(() => + new QueueClient(new ServiceBusConnectionStringBuilder + { + Endpoint = Env.Namespace, + EntityPath = Env.Queue, + SasKey = Env.SasKey, + SasKeyName = Env.SasName + } + )); + + private static readonly Lazy HttpClient = new Lazy(() => + new HttpClient + { + BaseAddress = new Uri(Env.Url) + } + ); + + public static void Main(string[] args) + { + MainAsync().GetAwaiter().GetResult(); + } + + private static async Task MainAsync() + { + QueueClient.Value.RegisterMessageHandler(HandleMessage, new MessageHandlerOptions(HandleError) + { + MaxConcurrentCalls = 1, + AutoComplete = false + }); + + await Console.Out.WriteLineAsync($"Queue connector {Env.Queue}: Starting listening"); + + try + { + await Task.Delay(Timeout.Infinite); + } + finally + { + await QueueClient.Value.CloseAsync(); + await Console.Out.WriteLineAsync($"Queue connector {Env.Queue}: Shutting down"); + } + } + + async static private Task HandleMessage(Message message, CancellationToken token) + { + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Received"); + + HttpResponseMessage response; + using (var request = new ByteArrayContent(message.Body)) + { + request.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + response = await HttpClient.Value.PostAsync(string.Empty, request); + } + + if (response.IsSuccessStatusCode) + { + await QueueClient.Value.CompleteAsync(message.SystemProperties.LockToken); + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Done"); + } + else + { + var error = response.Content.ReadAsStringAsync(); + await Console.Error.WriteLineAsync($"Message {message.MessageId}: Error {error}"); + } + } + + async static private Task HandleError(ExceptionReceivedEventArgs args) + { + var exception = args.Exception; + var context = args.ExceptionReceivedContext; + + await Console.Error.WriteLineAsync( + $"Message handler for {Env.Queue} encountered an exception: {exception}. " + + $"Endpoint={context.Endpoint} EntityPath={context.EntityPath} Action={context.Action}"); + } + } + + internal class Env + { + public static readonly string Namespace = Environment.GetEnvironmentVariable("LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE"); + public static readonly string SasName = Environment.GetEnvironmentVariable("LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME"); + public static readonly string SasKey = Environment.GetEnvironmentVariable("LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY"); + + public static readonly string Queue = Environment.GetEnvironmentVariable("LOKOLE_SOURCE_QUEUE"); + public static readonly string Url = Environment.GetEnvironmentVariable("LOKOLE_POST_URL"); + } +} diff --git a/opwen_queue_connector/queueconnector.csproj b/opwen_queue_connector/queueconnector.csproj new file mode 100644 index 00000000..048281ef --- /dev/null +++ b/opwen_queue_connector/queueconnector.csproj @@ -0,0 +1,12 @@ + + + + Exe + netcoreapp2.0 + + + + + + + diff --git a/requirements.txt b/requirements.txt index 9d051d80..a4ea17f2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ applicationinsights==0.11.4 azure-cosmosdb-table==1.0.3 +azure-servicebus==0.21.1 azure-storage-blob==1.1.0 -azure-storage-queue==1.1.0 beautifulsoup4==4.6.0 connexion==1.4.2 pyzmail36==1.0.3 diff --git a/tests/opwen_email_server/api/test_store_written_client_emails.py b/tests/opwen_email_server/api/test_store_written_client_emails.py index 5401f968..1c9bc632 100644 --- a/tests/opwen_email_server/api/test_store_written_client_emails.py +++ b/tests/opwen_email_server/api/test_store_written_client_emails.py @@ -2,13 +2,12 @@ from unittest.mock import patch from opwen_email_server.backend import client_datastore -from opwen_email_server.backend import email_sender from opwen_email_server.backend import server_datastore from opwen_email_server.api import store_written_client_emails class StoreWrittenClientEmailsTests(TestCase): - @patch.object(email_sender, 'QUEUE') + @patch.object(store_written_client_emails, 'QUEUE') @patch.object(client_datastore, 'unpack_emails') @patch.object(client_datastore, 'delete') @patch.object(server_datastore, 'store_email') diff --git a/tests/opwen_email_server/services/test_queue.py b/tests/opwen_email_server/services/test_queue.py deleted file mode 100644 index 71d7ccd0..00000000 --- a/tests/opwen_email_server/services/test_queue.py +++ /dev/null @@ -1,91 +0,0 @@ -from collections import namedtuple -from unittest import TestCase -from unittest.mock import MagicMock - -from opwen_email_server.services.queue import AzureQueue - - -AzureQueueMessage = namedtuple( - 'Message', - 'id pop_receipt content dequeue_count') - - -class AzureQueueTests(TestCase): - def test_enqueue_stores_message(self): - queue, client_mock = self._given_queue() - - queue.enqueue({'foo': 'bar'}) - - self.assertEqual(client_mock.put_message.call_count, 1) - - def test_creates_queue_only_once(self): - queue, client_mock = self._given_queue() - - queue.enqueue({'foo': 'bar'}) - queue.enqueue({'foo': 'bar'}) - - self.assertEqual(client_mock.create_queue.call_count, 1) - - def test_dequeue_without_messages(self): - queue, client_mock = self._given_queue([]) - - with queue.dequeue() as messages: - pass - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 0) - self.assertEqual(messages, []) - - def test_dequeue_removes_messages(self): - queue, client_mock = self._given_queue(['{"foo":"bar"}']) - - with queue.dequeue() as messages: - pass - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 1) - self.assertEqual(messages, [{'foo': 'bar'}]) - - def test_dequeue_with_exception_does_not_remove_message(self): - queue, client_mock = self._given_queue(['{"foo":"bar"}']) - - with queue.dequeue() as _: - raise ValueError - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 0) - - def test_dequeue_with_many_exceptions_removes_message(self): - queue, client_mock = self._given_queue(['{"foo":"bar"}'], - dequeue_count=999) - - with queue.dequeue() as _: - raise ValueError - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 1) - - def test_dequeue_rejects_unparsable_messages(self): - queue, client_mock = self._given_queue(['{"corrupt']) - - with queue.dequeue() as messages: - pass - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 1) - self.assertEqual(messages, []) - - # noinspection PyTypeChecker - @classmethod - def _given_queue(cls, messages=None, dequeue_count=0): - client_mock = MagicMock() - queue = AzureQueue(account='account', key='key', name='name', - factory=lambda *args, **kwargs: client_mock) - - if messages: - client_mock.get_messages.return_value = [ - AzureQueueMessage(id=i, pop_receipt=i, content=message, - dequeue_count=dequeue_count) - for (i, message) in enumerate(messages)] - - return queue, client_mock From 180495f5371ddd8fbd9d80ee2a729071b14bcba1 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 00:23:28 -0400 Subject: [PATCH 03/19] Ignore virtualenvs of all names --- .dockerignore | 2 +- .gitignore | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.dockerignore b/.dockerignore index 433795b6..1da0848c 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,7 +4,7 @@ __pycache__/ docs/ tests/ travis/ -venv/ +venv*/ .vs/ bin/ diff --git a/.gitignore b/.gitignore index 27b81c4f..cad5914e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ __pycache__/ /build/ /dist/ /*.egg-info/ -venv/ +venv*/ .mypy_cache/ serviceprincipal.json From abb335f1b20b07c124053282506ff20fd9356161 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 00:24:41 -0400 Subject: [PATCH 04/19] Make API_NAME envvar more descriptive --- docker-compose.yml | 12 ++++++------ docker/api/Dockerfile | 4 ++-- helm/templates/api-client-read-deployment.yaml | 2 +- helm/templates/api-client-write-deployment.yaml | 2 +- helm/templates/api-email-receive-deployment.yaml | 2 +- .../api-send-outbound-emails-deployment.yaml | 2 +- .../api-store-inbound-emails-deployment.yaml | 2 +- .../api-store-written-client-emails-deployment.yaml | 2 +- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index ed0fc50e..ac9dfe72 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,7 +22,7 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/email-receive-spec.yaml + CONNEXION_SPEC: opwen_email_server/static/email-receive-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} DOTENV_SECRETS: azure;sendgrid @@ -36,7 +36,7 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/client-write-spec.yaml + CONNEXION_SPEC: opwen_email_server/static/client-write-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} DOTENV_SECRETS: azure;sendgrid @@ -50,7 +50,7 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/client-read-spec.yaml + CONNEXION_SPEC: opwen_email_server/static/client-read-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} DOTENV_SECRETS: azure;sendgrid @@ -64,7 +64,7 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/send-outbound-emails.yaml + CONNEXION_SPEC: opwen_email_server/static/send-outbound-emails.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} DOTENV_SECRETS: azure;sendgrid @@ -78,7 +78,7 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/store-inbound-emails.yaml + CONNEXION_SPEC: opwen_email_server/static/store-inbound-emails.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} DOTENV_SECRETS: azure;sendgrid @@ -92,7 +92,7 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server/static/store-written-emails.yaml + CONNEXION_SPEC: opwen_email_server/static/store-written-emails.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} DOTENV_SECRETS: azure;sendgrid diff --git a/docker/api/Dockerfile b/docker/api/Dockerfile index 84460947..3f667aef 100644 --- a/docker/api/Dockerfile +++ b/docker/api/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.6 -ENV API_NAME="SET_ME" +ENV CONNEXION_SPEC="SET_ME" ENV GUNICORN_WORKERS="1" ADD requirements.txt /app/requirements.txt @@ -21,4 +21,4 @@ EXPOSE 80 WORKDIR /app HEALTHCHECK --interval=59m --timeout=5s CMD /app/healthcheck.sh ENTRYPOINT ["/docker-entrypoint.sh"] -CMD "gunicorn" "-w" "${GUNICORN_WORKERS}" "-b" "0.0.0.0:80" "server:build_app(apis=['/app/${API_NAME}', '/app/opwen_email_server/static/healthcheck-spec.yaml'], server='tornado')" +CMD "gunicorn" "-w" "${GUNICORN_WORKERS}" "-b" "0.0.0.0:80" "server:build_app(apis=['/app/${CONNEXION_SPEC}', '/app/opwen_email_server/static/healthcheck-spec.yaml'], server='tornado')" diff --git a/helm/templates/api-client-read-deployment.yaml b/helm/templates/api-client-read-deployment.yaml index 7674ba1b..177089f8 100644 --- a/helm/templates/api-client-read-deployment.yaml +++ b/helm/templates/api-client-read-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: apiclientread image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/client-read-spec.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" diff --git a/helm/templates/api-client-write-deployment.yaml b/helm/templates/api-client-write-deployment.yaml index 94aa0975..b60506ba 100644 --- a/helm/templates/api-client-write-deployment.yaml +++ b/helm/templates/api-client-write-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: apiclientwrite image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/client-write-spec.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" diff --git a/helm/templates/api-email-receive-deployment.yaml b/helm/templates/api-email-receive-deployment.yaml index 86de1da4..498168e3 100644 --- a/helm/templates/api-email-receive-deployment.yaml +++ b/helm/templates/api-email-receive-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: apiemailreceive image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/email-receive-spec.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" diff --git a/helm/templates/api-send-outbound-emails-deployment.yaml b/helm/templates/api-send-outbound-emails-deployment.yaml index 1cfdafa9..40f20df7 100644 --- a/helm/templates/api-send-outbound-emails-deployment.yaml +++ b/helm/templates/api-send-outbound-emails-deployment.yaml @@ -43,7 +43,7 @@ spec: - name: apisendoutboundemails image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/send-outbound-emails.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" diff --git a/helm/templates/api-store-inbound-emails-deployment.yaml b/helm/templates/api-store-inbound-emails-deployment.yaml index d56c5f8d..85060a41 100644 --- a/helm/templates/api-store-inbound-emails-deployment.yaml +++ b/helm/templates/api-store-inbound-emails-deployment.yaml @@ -43,7 +43,7 @@ spec: - name: apistoreinboundemails image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/store-inbound-emails.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" diff --git a/helm/templates/api-store-written-client-emails-deployment.yaml b/helm/templates/api-store-written-client-emails-deployment.yaml index 8115c757..f61f60bc 100644 --- a/helm/templates/api-store-written-client-emails-deployment.yaml +++ b/helm/templates/api-store-written-client-emails-deployment.yaml @@ -43,7 +43,7 @@ spec: - name: apistorewrittenclientemails image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/store-written-client-emails.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" From af3f574cafb95819b7d5c2290dff60ff42c7c977 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 00:33:22 -0400 Subject: [PATCH 05/19] Fix typecheck --- opwen_email_server/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opwen_email_server/config.py b/opwen_email_server/config.py index 71530da0..9f602a69 100644 --- a/opwen_email_server/config.py +++ b/opwen_email_server/config.py @@ -8,9 +8,9 @@ CLIENT_STORAGE_ACCOUNT = environ.get('LOKOLE_CLIENT_AZURE_STORAGE_NAME', '') CLIENT_STORAGE_KEY = environ.get('LOKOLE_CLIENT_AZURE_STORAGE_KEY', '') -QUEUES_NAMESPACE = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE') -QUEUES_SAS_NAME = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME') -QUEUES_SAS_KEY = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY') +QUEUES_NAMESPACE = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE', '') +QUEUES_SAS_NAME = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME', '') +QUEUES_SAS_KEY = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY', '') EMAIL_SENDER_KEY = environ.get('LOKOLE_SENDGRID_KEY', '') From bf833052a7e83549a47ad6e9685d83867bf5d82f Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 00:33:49 -0400 Subject: [PATCH 06/19] Add IDE folders to ignore files --- .dockerignore | 3 +++ .gitignore | 1 + 2 files changed, 4 insertions(+) diff --git a/.dockerignore b/.dockerignore index 1da0848c..6db420ca 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,3 +1,6 @@ +.idea/ +.vscode/ + *.pyc .mypy_cache/ __pycache__/ diff --git a/.gitignore b/.gitignore index cad5914e..8bba7c23 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea/ +.vscode/ *.pyc __pycache__/ From 688a244977b16e45c40273f3aea3973d7f15111f Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 21:10:26 -0400 Subject: [PATCH 07/19] Fix typo --- docker/setup/arm.template.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/setup/arm.template.json b/docker/setup/arm.template.json index 6ed18d89..d4827ccd 100644 --- a/docker/setup/arm.template.json +++ b/docker/setup/arm.template.json @@ -180,7 +180,7 @@ }, "serverQueuesSasKey": { "type": "string", - "value": "[listKeys(resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'), variables('serverQueuesSasName')), '2017-04-01').primaryKey]" + "value": "[listKeys(resourceId('Microsoft.ServiceBus/namespaces/AuthorizationRules', variables('serverQueuesName'), variables('serverQueuesSasName')), '2017-04-01').primaryKey]" } } } From 344a51aa9fd22f4acbe24418d1006a87c569f920 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 21:30:24 -0400 Subject: [PATCH 08/19] Fix typo --- docker-compose.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index ac9dfe72..ba549526 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -86,7 +86,7 @@ services: - azure - sendgrid - apistorewritteclientemails: + apistorewrittenclientemails: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . @@ -134,13 +134,13 @@ services: context: . dockerfile: docker/queueconnector/Dockerfile environment: - LOKOLE_POST_URL: http://apistorewritteclientemails/job/email/client/store + LOKOLE_POST_URL: http://apistorewrittenclientemails/job/email/client/store LOKOLE_SOURCE_QUEUE: lokoleinboundemails DOTENV_SECRETS: azure secrets: - azure depends_on: - - apistorewritteclientemails + - apistorewrittenclientemails secrets: azure: From 7f725d6e25432e96b0c996de26d2fb2f86265193 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 21:30:52 -0400 Subject: [PATCH 09/19] Fix swagger files --- opwen_email_server/api/send_outbound_emails.py | 3 ++- opwen_email_server/api/store_inbound_emails.py | 3 ++- .../api/store_written_client_emails.py | 3 ++- opwen_email_server/static/send-outbound-emails.yaml | 12 +++++++++--- opwen_email_server/static/store-inbound-emails.yaml | 12 +++++++++--- opwen_email_server/static/store-written-emails.yaml | 12 +++++++++--- 6 files changed, 33 insertions(+), 12 deletions(-) diff --git a/opwen_email_server/api/send_outbound_emails.py b/opwen_email_server/api/send_outbound_emails.py index 773b5664..07a07db9 100644 --- a/opwen_email_server/api/send_outbound_emails.py +++ b/opwen_email_server/api/send_outbound_emails.py @@ -4,7 +4,8 @@ class _Sender(LogMixin): - def __call__(self, resource_id: str): + def __call__(self, message: dict): + resource_id = message.get('resource_id', '') email = server_datastore.fetch_email(resource_id) self.log_info('Fetched outbound email %s for sending', resource_id) diff --git a/opwen_email_server/api/store_inbound_emails.py b/opwen_email_server/api/store_inbound_emails.py index 228c6869..64e00b49 100644 --- a/opwen_email_server/api/store_inbound_emails.py +++ b/opwen_email_server/api/store_inbound_emails.py @@ -6,7 +6,8 @@ class _InboundStorer(LogMixin): - def __call__(self, resource_id: str): + def __call__(self, message: dict): + resource_id = message.get('resource_id', '') mime_email = email_receive.STORAGE.fetch_text(resource_id) self.log_info('Fetched inbound MIME email %s', resource_id) diff --git a/opwen_email_server/api/store_written_client_emails.py b/opwen_email_server/api/store_written_client_emails.py index 2625c30b..2f819a8d 100644 --- a/opwen_email_server/api/store_written_client_emails.py +++ b/opwen_email_server/api/store_written_client_emails.py @@ -12,7 +12,8 @@ class _WrittenStorer(LogMixin): - def __call__(self, resource_id: str): + def __call__(self, message: dict): + resource_id = message.get('resource_id', '') emails = client_datastore.unpack_emails(resource_id) self.log_info('Fetched packaged client emails from %s', resource_id) diff --git a/opwen_email_server/static/send-outbound-emails.yaml b/opwen_email_server/static/send-outbound-emails.yaml index 38c89d0f..4efa0383 100644 --- a/opwen_email_server/static/send-outbound-emails.yaml +++ b/opwen_email_server/static/send-outbound-emails.yaml @@ -16,11 +16,17 @@ paths: consumes: - application/json parameters: - - name: resource_id - description: The id of the email to send. - type: string + - name: message in: body required: true + schema: + type: object + required: + - resource_id + properties: + resource_id: + type: string + description: The id of the email to store. responses: 200: description: The email was sent. diff --git a/opwen_email_server/static/store-inbound-emails.yaml b/opwen_email_server/static/store-inbound-emails.yaml index 86c89f04..f3d0da22 100644 --- a/opwen_email_server/static/store-inbound-emails.yaml +++ b/opwen_email_server/static/store-inbound-emails.yaml @@ -16,11 +16,17 @@ paths: consumes: - application/json parameters: - - name: resource_id - description: The id of the email to store. - type: string + - name: message in: body required: true + schema: + type: object + required: + - resource_id + properties: + resource_id: + type: string + description: The id of the email to store. responses: 200: description: The email was stored. diff --git a/opwen_email_server/static/store-written-emails.yaml b/opwen_email_server/static/store-written-emails.yaml index 7731f0ca..c0e69a66 100644 --- a/opwen_email_server/static/store-written-emails.yaml +++ b/opwen_email_server/static/store-written-emails.yaml @@ -16,11 +16,17 @@ paths: consumes: - application/json parameters: - - name: resource_id - description: The id of the email to store. - type: string + - name: message in: body required: true + schema: + type: object + required: + - resource_id + properties: + resource_id: + type: string + description: The id of the email to store. responses: 200: description: The email was stored. From 70a4645da805a744205e3d0b1a99d1148a182b2c Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 21:31:04 -0400 Subject: [PATCH 10/19] Fix ServiceBus endpoint format --- opwen_queue_connector/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opwen_queue_connector/Program.cs b/opwen_queue_connector/Program.cs index 5e0cc78c..9cddca6d 100644 --- a/opwen_queue_connector/Program.cs +++ b/opwen_queue_connector/Program.cs @@ -13,7 +13,7 @@ public class Program private static readonly Lazy QueueClient = new Lazy(() => new QueueClient(new ServiceBusConnectionStringBuilder { - Endpoint = Env.Namespace, + Endpoint = $"sb://{Env.Namespace}.servicebus.windows.net/", EntityPath = Env.Queue, SasKey = Env.SasKey, SasKeyName = Env.SasName From 081febc032054b62641632f775469ec53e4092ad Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 22:28:47 -0400 Subject: [PATCH 11/19] Fix ServiceBus client instantiation --- opwen_email_server/services/queue.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/opwen_email_server/services/queue.py b/opwen_email_server/services/queue.py index 52e060e8..af5f9eec 100644 --- a/opwen_email_server/services/queue.py +++ b/opwen_email_server/services/queue.py @@ -25,8 +25,10 @@ def __init__(self, name: str, namespace: str, sas_name: str, sas_key: str, def _client(self) -> ServiceBusService: if self.__client is not None: return self.__client - client = self._client_factory(self._namespace, self._sas_name, - self._sas_key) + client = self._client_factory( + service_namespace=self._namespace, + shared_access_key_name=self._sas_name, + shared_access_key_value=self._sas_key) self.__client = client return client From eda1c3169a3f756149d668ef52bc447ad56a45a7 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 23:30:59 -0400 Subject: [PATCH 12/19] Avoid potential assembly mismatch issues --- docker/queueconnector/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/queueconnector/Dockerfile b/docker/queueconnector/Dockerfile index 8dd1308d..1189002e 100644 --- a/docker/queueconnector/Dockerfile +++ b/docker/queueconnector/Dockerfile @@ -3,7 +3,7 @@ FROM microsoft/dotnet:2.0-sdk-stretch AS builder WORKDIR /app COPY opwen_queue_connector/*.csproj ./ RUN dotnet restore -COPY opwen_queue_connector/* ./ +COPY opwen_queue_connector/*.cs ./ RUN dotnet publish -c Release -o out FROM microsoft/dotnet:2.0-runtime-stretch From 94d9897b11c4a4deb88a947653acb20dfd74e971 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 23:32:00 -0400 Subject: [PATCH 13/19] Simplify job API endpoint spec --- .../api/send_outbound_emails.py | 3 +-- .../api/store_inbound_emails.py | 3 +-- .../api/store_written_client_emails.py | 3 +-- .../static/send-outbound-emails.yaml | 18 ++++--------- .../static/store-inbound-emails.yaml | 18 ++++--------- .../static/store-written-emails.yaml | 18 ++++--------- opwen_queue_connector/Program.cs | 25 +++++++++---------- opwen_queue_connector/queueconnector.csproj | 1 + 8 files changed, 31 insertions(+), 58 deletions(-) diff --git a/opwen_email_server/api/send_outbound_emails.py b/opwen_email_server/api/send_outbound_emails.py index 07a07db9..773b5664 100644 --- a/opwen_email_server/api/send_outbound_emails.py +++ b/opwen_email_server/api/send_outbound_emails.py @@ -4,8 +4,7 @@ class _Sender(LogMixin): - def __call__(self, message: dict): - resource_id = message.get('resource_id', '') + def __call__(self, resource_id: str): email = server_datastore.fetch_email(resource_id) self.log_info('Fetched outbound email %s for sending', resource_id) diff --git a/opwen_email_server/api/store_inbound_emails.py b/opwen_email_server/api/store_inbound_emails.py index 64e00b49..228c6869 100644 --- a/opwen_email_server/api/store_inbound_emails.py +++ b/opwen_email_server/api/store_inbound_emails.py @@ -6,8 +6,7 @@ class _InboundStorer(LogMixin): - def __call__(self, message: dict): - resource_id = message.get('resource_id', '') + def __call__(self, resource_id: str): mime_email = email_receive.STORAGE.fetch_text(resource_id) self.log_info('Fetched inbound MIME email %s', resource_id) diff --git a/opwen_email_server/api/store_written_client_emails.py b/opwen_email_server/api/store_written_client_emails.py index 2f819a8d..2625c30b 100644 --- a/opwen_email_server/api/store_written_client_emails.py +++ b/opwen_email_server/api/store_written_client_emails.py @@ -12,8 +12,7 @@ class _WrittenStorer(LogMixin): - def __call__(self, message: dict): - resource_id = message.get('resource_id', '') + def __call__(self, resource_id: str): emails = client_datastore.unpack_emails(resource_id) self.log_info('Fetched packaged client emails from %s', resource_id) diff --git a/opwen_email_server/static/send-outbound-emails.yaml b/opwen_email_server/static/send-outbound-emails.yaml index 4efa0383..63567bfb 100644 --- a/opwen_email_server/static/send-outbound-emails.yaml +++ b/opwen_email_server/static/send-outbound-emails.yaml @@ -8,25 +8,17 @@ basePath: '/job/email/outbound/send' paths: - '/': + '/{resource_id}': post: operationId: opwen_email_server.api.send_outbound_emails.send summary: Queue-triggered endpoint to send an outbound email. - consumes: - - application/json parameters: - - name: message - in: body + - name: resource_id + in: path required: true - schema: - type: object - required: - - resource_id - properties: - resource_id: - type: string - description: The id of the email to store. + type: string + description: The id of the email to process. responses: 200: description: The email was sent. diff --git a/opwen_email_server/static/store-inbound-emails.yaml b/opwen_email_server/static/store-inbound-emails.yaml index f3d0da22..ac5c774a 100644 --- a/opwen_email_server/static/store-inbound-emails.yaml +++ b/opwen_email_server/static/store-inbound-emails.yaml @@ -8,25 +8,17 @@ basePath: '/job/email/inbound/store' paths: - '/': + '/{resource_id}': post: operationId: opwen_email_server.api.store_inbound_emails.store summary: Queue-triggered endpoint to store an inbound email. - consumes: - - application/json parameters: - - name: message - in: body + - name: resource_id + in: path required: true - schema: - type: object - required: - - resource_id - properties: - resource_id: - type: string - description: The id of the email to store. + type: string + description: The id of the email to process. responses: 200: description: The email was stored. diff --git a/opwen_email_server/static/store-written-emails.yaml b/opwen_email_server/static/store-written-emails.yaml index c0e69a66..870dbe17 100644 --- a/opwen_email_server/static/store-written-emails.yaml +++ b/opwen_email_server/static/store-written-emails.yaml @@ -8,25 +8,17 @@ basePath: '/job/email/client/store' paths: - '/': + '/{resource_id}': post: operationId: opwen_email_server.api.store_written_client_emails.store summary: Queue-triggered endpoint to store a client email. - consumes: - - application/json parameters: - - name: message - in: body + - name: resource_id + in: path required: true - schema: - type: object - required: - - resource_id - properties: - resource_id: - type: string - description: The id of the email to store. + type: string + description: The id of the email to process. responses: 200: description: The email was stored. diff --git a/opwen_queue_connector/Program.cs b/opwen_queue_connector/Program.cs index 9cddca6d..61631a42 100644 --- a/opwen_queue_connector/Program.cs +++ b/opwen_queue_connector/Program.cs @@ -5,6 +5,9 @@ using System.Threading.Tasks; using System.Net.Http; using System.Net.Http.Headers; +using System.Text; +using Newtonsoft.Json; +using System.Collections.Generic; namespace queueconnector { @@ -20,12 +23,7 @@ public class Program } )); - private static readonly Lazy HttpClient = new Lazy(() => - new HttpClient - { - BaseAddress = new Uri(Env.Url) - } - ); + private static readonly Lazy HttpClient = new Lazy(() => new HttpClient()); public static void Main(string[] args) { @@ -55,14 +53,15 @@ private static async Task MainAsync() async static private Task HandleMessage(Message message, CancellationToken token) { - await Console.Out.WriteLineAsync($"Message {message.MessageId}: Received"); + var messageBody = Encoding.UTF8.GetString(message.Body); + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Received {messageBody}"); - HttpResponseMessage response; - using (var request = new ByteArrayContent(message.Body)) - { - request.Headers.ContentType = new MediaTypeHeaderValue("application/json"); - response = await HttpClient.Value.PostAsync(string.Empty, request); - } + var messageJson = JsonConvert.DeserializeObject>(messageBody); + var resourceId = messageJson["resource_id"]; + var url = $"{Env.Url}/{resourceId}"; + + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Posting to {url}"); + var response = await HttpClient.Value.PostAsync(url, null); if (response.IsSuccessStatusCode) { diff --git a/opwen_queue_connector/queueconnector.csproj b/opwen_queue_connector/queueconnector.csproj index 048281ef..ba26498b 100644 --- a/opwen_queue_connector/queueconnector.csproj +++ b/opwen_queue_connector/queueconnector.csproj @@ -7,6 +7,7 @@ + From 7091685f834126b0435f414924e83a022b99c900 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 23:32:15 -0400 Subject: [PATCH 14/19] Fix error display --- opwen_queue_connector/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opwen_queue_connector/Program.cs b/opwen_queue_connector/Program.cs index 61631a42..c6b7f535 100644 --- a/opwen_queue_connector/Program.cs +++ b/opwen_queue_connector/Program.cs @@ -70,7 +70,7 @@ async static private Task HandleMessage(Message message, CancellationToken token } else { - var error = response.Content.ReadAsStringAsync(); + var error = await response.Content.ReadAsStringAsync(); await Console.Error.WriteLineAsync($"Message {message.MessageId}: Error {error}"); } } From a755ee930521bf73a86ad95ea7dd730998344601 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Mon, 4 Jun 2018 23:32:47 -0400 Subject: [PATCH 15/19] Make test data more representative --- .../utils/test_email_parser/email-attachment.mime | 6 +++--- .../utils/test_email_parser/email-ccbcc.mime | 8 ++++---- .../utils/test_email_parser/email-html.mime | 2 +- tests/opwen_email_server/services/test_sendgrid.py | 4 ++-- tests/opwen_email_server/utils/test_email_parser.py | 10 +++++----- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime b/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime index b2df2dbb..aadb3d35 100644 --- a/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime +++ b/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime @@ -1,6 +1,6 @@ Received: by mx0032p1mdw1.sendgrid.net with SMTP id j5s6OrIxAo Mon, 13 Feb 2017 06:26:01 +0000 (UTC) -Received: from mail-yw0-f176.google.com (mail-yw0-f176.google.com [209.85.161.176]) by mx0032p1mdw1.sendgrid.net (Postfix) with ESMTPS id ED0E6865CD for ; Mon, 13 Feb 2017 06:26:00 +0000 (UTC) -Received: by mail-yw0-f176.google.com with SMTP id v200so45517752ywc.3 for ; Sun, 12 Feb 2017 22:26:00 -0800 (PST) +Received: from mail-yw0-f176.google.com (mail-yw0-f176.google.com [209.85.161.176]) by mx0032p1mdw1.sendgrid.net (Postfix) with ESMTPS id ED0E6865CD for ; Mon, 13 Feb 2017 06:26:00 +0000 (UTC) +Received: by mail-yw0-f176.google.com with SMTP id v200so45517752ywc.3 for ; Sun, 12 Feb 2017 22:26:00 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:from:date:message-id:subject:to; bh=BzB59eE6aqgBqNykdZ9LSb3ZuGuXFfDT9pvFSFek5s0=; b=FMASsyJpei2ae3965M1t46+/teaQXdqdsXOPd6viine1X10sEbhuGjX3LGTq3WnMUy f+xydQsjhTTSb15LuJivlsSyLN3dPKyqON4dnfQ6t4mPY3ol0Bnl2GvQico3U4whd6QG KuQtoJqijO4O8TBTWxQqoEjVapAZhr9qK0ZfhCuDCtZ5xfEtdphYkEq+pT52ykIn/Hy/ 6zjWfHKLB9fwiyKtc7Dv2GnPUt2vW/YJs8lsA/hrCISwq2Oy9dCyEX4+IZ/xABL+KLox nEsk4/wMLmHHvv3X1sODcIshGPuX0mloj2YkugRZHnwrTm7FEAjTAhRElwNGPSedSqqW l2Cw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:from:date:message-id:subject:to; bh=BzB59eE6aqgBqNykdZ9LSb3ZuGuXFfDT9pvFSFek5s0=; b=n+2qfxqxwNZyVk1+Rd4wS19yZrA7bE2U+AoQ7Jl3d7lAl1V7VV71/jBJFkXEIy6r2Q tqsInRPhv5aQ02W70y9gWB7Bib6Cf353jTHznF4qgjZoUohY8ygNyNrwvjnj2l7T2S+g xn0sCgyYsHbjyLOLF1PoBEUbmNGKcJWf2WIUxQ3wlSZ7tV/nuVj5n/QicAL9yk5e2YsH 5CNYDCFpTFS86SULGAfgeGpTswv0fjKPVipLJFJaw7WziV+v7t4F/jBngqSV0Sy4kyPn UlNn+T7uPlp8A9XgH1tiE7DMt99l2M7qWH0FMI45pZGoVpssmHQw1cuLqJf2F2So/Dhs dhhA== X-Gm-Message-State: AMke39nSCiOqrDKq7uOX7CaQVazGrDl5fI6cBqEyIbOhxDeR85q1Sl9Ae6//kVayvLfguqV5ErlYVxvjE8p9vw== @@ -11,7 +11,7 @@ From: Clemens Wolff Date: Sun, 12 Feb 2017 22:25:20 -0800 Message-ID: Subject: With attachment -To: clemens@victoria.ascoderu.ca +To: clemens@victoria.lokole.ca Content-Type: multipart/mixed; boundary=001a114f099665b49a0548638593 --001a114f099665b49a0548638593 diff --git a/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime b/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime index d76eb0c1..248b76ab 100644 --- a/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime +++ b/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime @@ -1,13 +1,13 @@ MIME-Version: 1.0 Received: by 10.129.50.130 with HTTP; Sat, 11 Mar 2017 13:03:18 -0800 (PST) -Bcc: Laura Barluzzi +Bcc: Laura Barluzzi Date: Sat, 11 Mar 2017 13:03:18 -0800 Delivered-To: clemens.wolff@gmail.com Message-ID: Subject: Test with CC and BCC -From: Clemens Wolff -To: Clemens Wolff -Cc: Nzola Swasisa , Clemens Wolff +From: Clemens Wolff +To: Clemens Wolff +Cc: Nzola Swasisa , Clemens Wolff Content-Type: multipart/alternative; boundary=001a114e6016c08cd0054a7ace01 --001a114e6016c08cd0054a7ace01 diff --git a/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime b/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime index 1c79a7c5..82ebfc00 100644 --- a/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime +++ b/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime @@ -11,7 +11,7 @@ From: Clemens Wolff Date: Sun, 12 Feb 2017 22:25:01 -0800 Message-ID: Subject: Two recipients -To: clemens@victoria.ascoderu.ca, laura@victoria.ascoderu.ca +To: clemens@victoria.lokole.ca, laura@victoria.lokole.ca Content-Type: multipart/alternative; boundary=001a1146392641b94705486384bf --001a1146392641b94705486384bf diff --git a/tests/opwen_email_server/services/test_sendgrid.py b/tests/opwen_email_server/services/test_sendgrid.py index 93197369..6399e538 100644 --- a/tests/opwen_email_server/services/test_sendgrid.py +++ b/tests/opwen_email_server/services/test_sendgrid.py @@ -8,9 +8,9 @@ class SendgridEmailSenderTests(TestCase): - recipient1 = 'clemens@ascoderu.ca' + recipient1 = 'clemens@lokole.ca' recipient2 = 'clemens.wolff@gmail.com' - sender = 'sendgridtests@ascoderu.ca' + sender = 'sendgridtests@lokole.ca' def test_sends_email(self): sender = self._given_client() diff --git a/tests/opwen_email_server/utils/test_email_parser.py b/tests/opwen_email_server/utils/test_email_parser.py index 9e803103..8619ea02 100644 --- a/tests/opwen_email_server/utils/test_email_parser.py +++ b/tests/opwen_email_server/utils/test_email_parser.py @@ -22,8 +22,8 @@ def test_parses_email_metadata(self): self.assertEqual(email.get('from'), 'clemens.wolff@gmail.com') self.assertEqual(email.get('subject'), 'Two recipients') self.assertEqual(email.get('sent_at'), '2017-02-13 06:25') - self.assertEqual(email.get('to'), ['clemens@victoria.ascoderu.ca', - 'laura@victoria.ascoderu.ca']) + self.assertEqual(email.get('to'), ['clemens@victoria.lokole.ca', + 'laura@victoria.lokole.ca']) def test_prefers_html_body_over_text(self): mime_email = self._given_mime_email('email-html.mime') @@ -38,9 +38,9 @@ def test_parses_email_with_cc_and_bcc(self): email = email_parser.parse_mime_email(mime_email) - self.assertEqual(email.get('bcc'), ['laura@ascoderu.ca']) - self.assertEqual(email.get('cc'), ['nzola@ascoderu.ca', - 'clemens@ascoderu.ca']) + self.assertEqual(email.get('bcc'), ['laura@lokole.ca']) + self.assertEqual(email.get('cc'), ['nzola@lokole.ca', + 'clemens@lokole.ca']) def test_parses_email_with_attachments(self): mime_email = self._given_mime_email('email-attachment.mime') From 70b3f865470b37a99ded39ba1d810b04f3c2621c Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Wed, 13 Jun 2018 16:11:57 -0400 Subject: [PATCH 16/19] Fix typo --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index ba549526..d42de290 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -128,7 +128,7 @@ services: depends_on: - apistoreinboundemails - connectorstorewritteclientemails: + connectorstorewrittenclientemails: image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} build: context: . From 228adc7b393b58ccfdd1ab9ae166c90cd10d65b0 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Wed, 13 Jun 2018 16:13:04 -0400 Subject: [PATCH 17/19] Remove unused config --- helm/values.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/helm/values.yaml b/helm/values.yaml index 779277dd..623ff964 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -5,9 +5,6 @@ version: server: gunicornWorkers: 2 -worker: - pollingIntervalSeconds: 120 - logging: level: INFO From 5c1f020fe3ae90c578b4993029c0384f84c9fb2b Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Wed, 13 Jun 2018 16:41:21 -0400 Subject: [PATCH 18/19] Fix file name --- ...store-written-emails.yaml => store-written-client-emails.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename opwen_email_server/static/{store-written-emails.yaml => store-written-client-emails.yaml} (100%) diff --git a/opwen_email_server/static/store-written-emails.yaml b/opwen_email_server/static/store-written-client-emails.yaml similarity index 100% rename from opwen_email_server/static/store-written-emails.yaml rename to opwen_email_server/static/store-written-client-emails.yaml From a1104e1edaf70d5c4c91c3ef1dfd5567fa4fdc08 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Wed, 13 Jun 2018 16:58:12 -0400 Subject: [PATCH 19/19] Ensure email receiver has queue credentials --- helm/templates/api-email-receive-deployment.yaml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/helm/templates/api-email-receive-deployment.yaml b/helm/templates/api-email-receive-deployment.yaml index 498168e3..05f4d825 100644 --- a/helm/templates/api-email-receive-deployment.yaml +++ b/helm/templates/api-email-receive-deployment.yaml @@ -67,6 +67,21 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY ports: - containerPort: 80 resources: {}