diff --git a/.dockerignore b/.dockerignore index b186ce30..6db420ca 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,7 +1,15 @@ +.idea/ +.vscode/ + *.pyc .mypy_cache/ __pycache__/ docs/ tests/ travis/ -venv/ +venv*/ + +.vs/ +bin/ +obj/ +out/ diff --git a/.env b/.env index 948cd117..55fbc8b4 100644 --- a/.env +++ b/.env @@ -2,5 +2,4 @@ APP_PORT=8080 BUILD_TAG=development DOCKER_REPO=cwolff GUNICORN_WORKERS=1 -LOKOLE_QUEUE_POLL_SECONDS=30 LOKOLE_LOG_LEVEL=DEBUG diff --git a/.gitignore b/.gitignore index b3b4a5a9..8bba7c23 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,17 @@ .idea/ +.vscode/ *.pyc __pycache__/ /build/ /dist/ /*.egg-info/ -venv/ +venv*/ .mypy_cache/ serviceprincipal.json + +.vs/ +bin/ +obj/ +out/ diff --git a/docker-compose.yml b/docker-compose.yml index 61156a5f..d42de290 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3' +version: '3.3' services: @@ -22,12 +22,13 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/email-receive-spec.yaml + CONNEXION_SPEC: opwen_email_server/static/email-receive-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apiclientwrite: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -35,12 +36,13 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/client-write-spec.yaml + CONNEXION_SPEC: opwen_email_server/static/client-write-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid apiclientread: image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} @@ -48,48 +50,100 @@ services: context: . dockerfile: docker/api/Dockerfile environment: - API_NAME: opwen_email_server/static/client-read-spec.yaml + CONNEXION_SPEC: opwen_email_server/static/client-read-spec.yaml GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid - jobsendoutboundemails: - image: ${DOCKER_REPO}/opwenserver_job:${BUILD_TAG} + apisendoutboundemails: + image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . - dockerfile: docker/job/Dockerfile + dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server.jobs.send_outbound_emails - LOKOLE_QUEUE_POLL_SECONDS: ${LOKOLE_QUEUE_POLL_SECONDS} + CONNEXION_SPEC: opwen_email_server/static/send-outbound-emails.yaml + GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid - jobstoreinboundemails: - image: ${DOCKER_REPO}/opwenserver_job:${BUILD_TAG} + apistoreinboundemails: + image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . - dockerfile: docker/job/Dockerfile + dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server.jobs.store_inbound_emails - LOKOLE_QUEUE_POLL_SECONDS: ${LOKOLE_QUEUE_POLL_SECONDS} + CONNEXION_SPEC: opwen_email_server/static/store-inbound-emails.yaml + GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid - jobstorewritteclientemails: - image: ${DOCKER_REPO}/opwenserver_job:${BUILD_TAG} + apistorewrittenclientemails: + image: ${DOCKER_REPO}/opwenserver_api:${BUILD_TAG} build: context: . - dockerfile: docker/job/Dockerfile + dockerfile: docker/api/Dockerfile environment: - JOB_NAME: opwen_email_server.jobs.store_written_client_emails - LOKOLE_QUEUE_POLL_SECONDS: ${LOKOLE_QUEUE_POLL_SECONDS} + CONNEXION_SPEC: opwen_email_server/static/store-written-emails.yaml + GUNICORN_WORKERS: ${GUNICORN_WORKERS} LOKOLE_LOG_LEVEL: ${LOKOLE_LOG_LEVEL} - env_file: - - ./secrets/azure.env - - ./secrets/sendgrid.env + DOTENV_SECRETS: azure;sendgrid + secrets: + - azure + - sendgrid + + connectorsendoutboundemails: + image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} + build: + context: . + dockerfile: docker/queueconnector/Dockerfile + environment: + LOKOLE_POST_URL: http://apisendoutboundemails/job/email/outbound/send + LOKOLE_SOURCE_QUEUE: sengridoutboundemails + DOTENV_SECRETS: azure + secrets: + - azure + depends_on: + - apisendoutboundemails + + connectorstoreinboundemails: + image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} + build: + context: . + dockerfile: docker/queueconnector/Dockerfile + environment: + LOKOLE_POST_URL: http://apistoreinboundemails/job/email/inbound/store + LOKOLE_SOURCE_QUEUE: sengridinboundemails + DOTENV_SECRETS: azure + secrets: + - azure + depends_on: + - apistoreinboundemails + + connectorstorewrittenclientemails: + image: ${DOCKER_REPO}/opwenserver_queueconnector:${BUILD_TAG} + build: + context: . + dockerfile: docker/queueconnector/Dockerfile + environment: + LOKOLE_POST_URL: http://apistorewrittenclientemails/job/email/client/store + LOKOLE_SOURCE_QUEUE: lokoleinboundemails + DOTENV_SECRETS: azure + secrets: + - azure + depends_on: + - apistorewrittenclientemails + +secrets: + azure: + file: ./secrets/azure.env + sendgrid: + file: ./secrets/sendgrid.env diff --git a/docker/api/Dockerfile b/docker/api/Dockerfile index 073667e1..3f667aef 100644 --- a/docker/api/Dockerfile +++ b/docker/api/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.6 -ENV API_NAME="SET_ME" +ENV CONNEXION_SPEC="SET_ME" ENV GUNICORN_WORKERS="1" ADD requirements.txt /app/requirements.txt @@ -15,8 +15,10 @@ RUN apt-get update \ ADD opwen_email_server /app/opwen_email_server ADD runserver.py /app/server.py ADD docker/api/healthcheck.sh /app/healthcheck.sh +ADD docker/docker-entrypoint.sh /docker-entrypoint.sh EXPOSE 80 WORKDIR /app HEALTHCHECK --interval=59m --timeout=5s CMD /app/healthcheck.sh -CMD "gunicorn" "-w" "${GUNICORN_WORKERS}" "-b" "0.0.0.0:80" "server:build_app(apis=['/app/${API_NAME}', '/app/opwen_email_server/static/healthcheck-spec.yaml'], server='tornado')" +ENTRYPOINT ["/docker-entrypoint.sh"] +CMD "gunicorn" "-w" "${GUNICORN_WORKERS}" "-b" "0.0.0.0:80" "server:build_app(apis=['/app/${CONNEXION_SPEC}', '/app/opwen_email_server/static/healthcheck-spec.yaml'], server='tornado')" diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh new file mode 100755 index 00000000..dcd1bd1d --- /dev/null +++ b/docker/docker-entrypoint.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env sh +# This docker-entrypoint populates environment variables from docker secrets. +# The docker secrets are assumed to be files in dotenv syntax. The requested +# secrets should be declared in the environment variable DOTENV_SECRETS, with +# multiple secret names separated by a semi-colon. + +if [ ! -d /run/secrets ]; then + exec "$@" +fi + +if [ -z "${DOTENV_SECRETS}" ]; then + exec "$@" +fi + +eval "$(find /run/secrets -maxdepth 1 -type f | grep "$(echo "${DOTENV_SECRETS}" | sed 's/;/\\|/g')" | xargs cat | grep -v '^#' | sed 's/^\([^=]\+\)=\(.*\)$/if [ -z "$\1" ]; then \1="\2"; export \1; fi/g')" + +exec "$@" diff --git a/docker/job/Dockerfile b/docker/job/Dockerfile deleted file mode 100644 index 72dbac7c..00000000 --- a/docker/job/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM python:3.6 - -ENV JOB_NAME="SET_ME" -ENV LOKOLE_QUEUE_ERROR_FILE="/app/queue_health.txt" - -ADD requirements.txt /app/requirements.txt -RUN apt-get update \ - && apt-get install -y libffi-dev libssl-dev ca-certificates \ - && pip3 --no-cache-dir -q install -U pip setuptools \ - && pip3 --no-cache-dir -q install -r /app/requirements.txt \ - && touch ${LOKOLE_QUEUE_ERROR_FILE} \ - && rm -rf /var/lib/apt/lists/* - -ADD opwen_email_server /app/opwen_email_server - -ADD docker/job/healthcheck.sh /app/healthcheck.sh - -WORKDIR /app -HEALTHCHECK CMD /app/healthcheck.sh -CMD "python3" "-m" "${JOB_NAME}" diff --git a/docker/job/healthcheck.sh b/docker/job/healthcheck.sh deleted file mode 100755 index 955568d8..00000000 --- a/docker/job/healthcheck.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env sh - -if test -s "$LOKOLE_QUEUE_ERROR_FILE"; then - cat "$LOKOLE_QUEUE_ERROR_FILE" >&2 - exit 1 -fi - -exit 0 diff --git a/docker/queueconnector/Dockerfile b/docker/queueconnector/Dockerfile new file mode 100644 index 00000000..1189002e --- /dev/null +++ b/docker/queueconnector/Dockerfile @@ -0,0 +1,16 @@ +FROM microsoft/dotnet:2.0-sdk-stretch AS builder + +WORKDIR /app +COPY opwen_queue_connector/*.csproj ./ +RUN dotnet restore +COPY opwen_queue_connector/*.cs ./ +RUN dotnet publish -c Release -o out + +FROM microsoft/dotnet:2.0-runtime-stretch + +WORKDIR /app +ADD docker/docker-entrypoint.sh /docker-entrypoint.sh +COPY --from=builder /app/out ./ + +ENTRYPOINT ["/docker-entrypoint.sh"] +CMD ["dotnet", "queueconnector.dll"] diff --git a/docker/setup/arm.parameters.json b/docker/setup/arm.parameters.json index 8ef37696..46e827dd 100644 --- a/docker/setup/arm.parameters.json +++ b/docker/setup/arm.parameters.json @@ -4,6 +4,9 @@ "parameters": { "storageAccountSKU": { "value": "" + }, + "serviceBusSKU": { + "value": "" } } } diff --git a/docker/setup/arm.template.json b/docker/setup/arm.template.json index b22048ed..d4827ccd 100644 --- a/docker/setup/arm.template.json +++ b/docker/setup/arm.template.json @@ -14,6 +14,17 @@ "metadata": { "description": "The pricing tier of the Storage Account resources that will be created for the project." } + }, + "serviceBusSKU": { + "type": "string", + "defaultValue": "Basic", + "allowedValues": [ + "Basic", + "Standard" + ], + "metadata": { + "description": "The pricing tier of the Service Bus resource that will be created for the project." + } } }, "variables": { @@ -22,6 +33,10 @@ "serverBlobsName": "[take(concat('opwenserverblobs', uniqueString(subscription().subscriptionId)), 22)]", "serverTablesName": "[take(concat('opwenservertables', uniqueString(subscription().subscriptionId)), 22)]", "serverQueuesName": "[take(concat('opwenserverqueues', uniqueString(subscription().subscriptionId)), 22)]", + "serverQueuesSasName": "worker", + "serverQueueClientPackage": "lokoleinboundemails", + "serverQueueEmailSend": "sengridoutboundemails", + "serverQueueSendgridMime": "sengridinboundemails", "location": "[resourceGroup().location]" }, "resources": [ @@ -69,15 +84,57 @@ "properties": {} }, { - "type": "Microsoft.Storage/storageAccounts", - "kind": "Storage", - "apiVersion": "2016-01-01", + "apiVersion": "2017-04-01", "name": "[variables('serverQueuesName')]", + "type": "Microsoft.ServiceBus/namespaces", "location": "[variables('location')]", "sku": { - "name": "[parameters('storageAccountSKU')]" + "name": "[parameters('serviceBusSKU')]" }, - "properties": {} + "properties": {}, + "resources": [ + { + "type": "AuthorizationRules", + "name": "[variables('serverQueuesSasName')]", + "apiVersion": "2017-04-01", + "properties": { + "rights": [ + "Listen", + "Send" + ] + }, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + }, + { + "type": "queues", + "name": "[variables('serverQueueClientPackage')]", + "apiVersion": "2017-04-01", + "properties": {}, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + }, + { + "type": "queues", + "name": "[variables('serverQueueEmailSend')]", + "apiVersion": "2017-04-01", + "properties": {}, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + }, + { + "type": "queues", + "name": "[variables('serverQueueSendgridMime')]", + "apiVersion": "2017-04-01", + "properties": {}, + "dependsOn": [ + "[resourceId('Microsoft.ServiceBus/namespaces', variables('serverQueuesName'))]" + ] + } + ] } ], "outputs": { @@ -87,7 +144,7 @@ }, "appinsightsKey": { "type": "string", - "value": "[reference(resourceId('Microsoft.Insights/components', variables('appinsightsName')), '2014-04-01').InstrumentationKey]", + "value": "[reference(resourceId('Microsoft.Insights/components', variables('appinsightsName')), '2014-04-01').InstrumentationKey]" }, "clientBlobsName": { "type": "string", @@ -117,9 +174,13 @@ "type": "string", "value": "[variables('serverQueuesName')]" }, - "serverQueuesKey": { + "serverQueuesSasName": { + "type": "string", + "value": "[variables('serverQueuesSasName')]" + }, + "serverQueuesSasKey": { "type": "string", - "value": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('serverQueuesName')), '2016-01-01').keys[0].value]" + "value": "[listKeys(resourceId('Microsoft.ServiceBus/namespaces/AuthorizationRules', variables('serverQueuesName'), variables('serverQueuesSasName')), '2017-04-01').primaryKey]" } } } diff --git a/docker/setup/setup.sh b/docker/setup/setup.sh index 6e8098a2..1682798f 100755 --- a/docker/setup/setup.sh +++ b/docker/setup/setup.sh @@ -16,6 +16,7 @@ ## ## Optional environment variables: ## +## SERVICE_BUS_SKU ## STORAGE_ACCOUNT_SKU ## ## KUBERNETES_RESOURCE_GROUP_NAME @@ -61,6 +62,7 @@ use_resource_group "${RESOURCE_GROUP_NAME}" # storageaccountsku="${STORAGE_ACCOUNT_SKU:-Standard_GRS}" +servicebussku="${SERVICE_BUS_SKU:-Basic}" deploymentname="opwendeployment$(generate_identifier 8)" log "Creating resources via deployment ${deploymentname}" @@ -69,6 +71,7 @@ az group deployment create \ --name "${deploymentname}" \ --template-file "${scriptdir}/arm.template.json" \ --parameters storageAccountSKU="${storageaccountsku}" \ + --parameters serviceBusSKU="${servicebussku}" \ > /tmp/deployment.json cat > /secrets/azure.env << EOF @@ -78,10 +81,11 @@ LOKOLE_CLIENT_AZURE_STORAGE_KEY=$(jq -r .properties.outputs.clientBlobsKey.value LOKOLE_CLIENT_AZURE_STORAGE_NAME=$(jq -r .properties.outputs.clientBlobsName.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_BLOBS_KEY=$(jq -r .properties.outputs.serverBlobsKey.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME=$(jq -r .properties.outputs.serverBlobsName.value /tmp/deployment.json) -LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY=$(jq -r .properties.outputs.serverQueuesKey.value /tmp/deployment.json) -LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME=$(jq -r .properties.outputs.serverQueuesName.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY=$(jq -r .properties.outputs.serverTablesKey.value /tmp/deployment.json) LOKOLE_EMAIL_SERVER_AZURE_TABLES_NAME=$(jq -r .properties.outputs.serverTablesName.value /tmp/deployment.json) +LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE=$(jq -r .properties.outputs.serverQueuesName.value /tmp/deployment.json) +LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME=$(jq -r .properties.outputs.serverQueuesSasName.value /tmp/deployment.json) +LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY=$(jq -r .properties.outputs.serverQueuesSasKey.value /tmp/deployment.json) EOF cat > /secrets/sendgrid.env << EOF diff --git a/helm/templates/api-client-read-deployment.yaml b/helm/templates/api-client-read-deployment.yaml index b3a57af5..177089f8 100644 --- a/helm/templates/api-client-read-deployment.yaml +++ b/helm/templates/api-client-read-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: apiclientread image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/client-read-spec.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" @@ -47,16 +47,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/api-client-write-deployment.yaml b/helm/templates/api-client-write-deployment.yaml index d66cc644..b60506ba 100644 --- a/helm/templates/api-client-write-deployment.yaml +++ b/helm/templates/api-client-write-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: apiclientwrite image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/client-write-spec.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" @@ -47,16 +47,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: diff --git a/helm/templates/api-email-receive-deployment.yaml b/helm/templates/api-email-receive-deployment.yaml index 0c8307e7..05f4d825 100644 --- a/helm/templates/api-email-receive-deployment.yaml +++ b/helm/templates/api-email-receive-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: apiemailreceive image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} env: - - name: API_NAME + - name: CONNEXION_SPEC value: opwen_email_server/static/email-receive-spec.yaml - name: GUNICORN_WORKERS value: "{{.Values.server.gunicornWorkers}}" @@ -47,16 +47,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +67,21 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY ports: - containerPort: 80 resources: {} diff --git a/helm/templates/job-store-inbound-emails-autoscaler.yaml b/helm/templates/api-send-outbound-emails-autoscaler.yaml similarity index 84% rename from helm/templates/job-store-inbound-emails-autoscaler.yaml rename to helm/templates/api-send-outbound-emails-autoscaler.yaml index 91317ac5..ed84db7e 100644 --- a/helm/templates/job-store-inbound-emails-autoscaler.yaml +++ b/helm/templates/api-send-outbound-emails-autoscaler.yaml @@ -2,12 +2,12 @@ apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: creationTimestamp: null - name: jobstoreinboundemails + name: apisendoutboundemails spec: maxReplicas: {{.Values.autoscale.maxPods}} minReplicas: {{.Values.autoscale.minPods}} scaleTargetRef: apiVersion: extensions/v1beta1 kind: Deployment - name: jobstoreinboundemails + name: apisendoutboundemails targetCPUUtilizationPercentage: {{.Values.autoscale.cpuThreshold}} diff --git a/helm/templates/job-store-written-client-emails-deployment.yaml b/helm/templates/api-send-outbound-emails-deployment.yaml similarity index 63% rename from helm/templates/job-store-written-client-emails-deployment.yaml rename to helm/templates/api-send-outbound-emails-deployment.yaml index 2b86debf..40f20df7 100644 --- a/helm/templates/job-store-written-client-emails-deployment.yaml +++ b/helm/templates/api-send-outbound-emails-deployment.yaml @@ -6,8 +6,8 @@ metadata: kompose.version: 1.13.0 (84fa826) creationTimestamp: null labels: - io.kompose.service: jobstorewrittenclientemails - name: jobstorewrittenclientemails + io.kompose.service: apisendoutboundemails + name: apisendoutboundemails spec: replicas: 1 strategy: {} @@ -15,18 +15,40 @@ spec: metadata: creationTimestamp: null labels: - io.kompose.service: jobstorewrittenclientemails + io.kompose.service: apisendoutboundemails spec: containers: - - name: jobstorewrittenclientemails - image: {{.Values.version.imageRegistry}}/opwenserver_job:{{.Values.version.dockerTag}} + - name: connectorsendoutboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_queueconnector:{{.Values.version.dockerTag}} env: - - name: JOB_NAME - value: opwen_email_server.jobs.store_written_client_emails + - name: LOKOLE_POST_URL + value: http://0.0.0.0/job/email/outbound/send + - name: LOKOLE_SOURCE_QUEUE + value: sengridoutboundemails + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + - name: apisendoutboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} + env: + - name: CONNEXION_SPEC + value: opwen_email_server/static/send-outbound-emails.yaml + - name: GUNICORN_WORKERS + value: "{{.Values.server.gunicornWorkers}}" - name: LOKOLE_LOG_LEVEL value: {{.Values.logging.level}} - - name: LOKOLE_QUEUE_POLL_SECONDS - value: "{{.Values.worker.pollingIntervalSeconds}}" - name: LOKOLE_CLIENT_AZURE_STORAGE_KEY valueFrom: secretKeyRef: @@ -47,16 +69,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +89,8 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + ports: + - containerPort: 80 resources: {} restartPolicy: Always status: {} diff --git a/helm/templates/job-send-outbound-emails-autoscaler.yaml b/helm/templates/api-store-inbound-emails-autoscaler.yaml similarity index 84% rename from helm/templates/job-send-outbound-emails-autoscaler.yaml rename to helm/templates/api-store-inbound-emails-autoscaler.yaml index f0114e78..6974fa2f 100644 --- a/helm/templates/job-send-outbound-emails-autoscaler.yaml +++ b/helm/templates/api-store-inbound-emails-autoscaler.yaml @@ -2,12 +2,12 @@ apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: creationTimestamp: null - name: jobsendoutboundemails + name: apistoreinboundemails spec: maxReplicas: {{.Values.autoscale.maxPods}} minReplicas: {{.Values.autoscale.minPods}} scaleTargetRef: apiVersion: extensions/v1beta1 kind: Deployment - name: jobsendoutboundemails + name: apistoreinboundemails targetCPUUtilizationPercentage: {{.Values.autoscale.cpuThreshold}} diff --git a/helm/templates/job-store-inbound-emails-deployment.yaml b/helm/templates/api-store-inbound-emails-deployment.yaml similarity index 63% rename from helm/templates/job-store-inbound-emails-deployment.yaml rename to helm/templates/api-store-inbound-emails-deployment.yaml index c353860e..85060a41 100644 --- a/helm/templates/job-store-inbound-emails-deployment.yaml +++ b/helm/templates/api-store-inbound-emails-deployment.yaml @@ -6,8 +6,8 @@ metadata: kompose.version: 1.13.0 (84fa826) creationTimestamp: null labels: - io.kompose.service: jobstoreinboundemails - name: jobstoreinboundemails + io.kompose.service: apistoreinboundemails + name: apistoreinboundemails spec: replicas: 1 strategy: {} @@ -15,18 +15,40 @@ spec: metadata: creationTimestamp: null labels: - io.kompose.service: jobstoreinboundemails + io.kompose.service: apistoreinboundemails spec: containers: - - name: jobstoreinboundemails - image: {{.Values.version.imageRegistry}}/opwenserver_job:{{.Values.version.dockerTag}} + - name: connectorstoreinboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_queueconnector:{{.Values.version.dockerTag}} env: - - name: JOB_NAME - value: opwen_email_server.jobs.store_inbound_emails + - name: LOKOLE_POST_URL + value: http://0.0.0.0/job/email/inbound/store + - name: LOKOLE_SOURCE_QUEUE + value: sengridinboundemails + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + - name: apistoreinboundemails + image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} + env: + - name: CONNEXION_SPEC + value: opwen_email_server/static/store-inbound-emails.yaml + - name: GUNICORN_WORKERS + value: "{{.Values.server.gunicornWorkers}}" - name: LOKOLE_LOG_LEVEL value: {{.Values.logging.level}} - - name: LOKOLE_QUEUE_POLL_SECONDS - value: "{{.Values.worker.pollingIntervalSeconds}}" - name: LOKOLE_CLIENT_AZURE_STORAGE_KEY valueFrom: secretKeyRef: @@ -47,16 +69,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +89,8 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + ports: + - containerPort: 80 resources: {} restartPolicy: Always status: {} diff --git a/helm/templates/job-store-written-client-emails-autoscaler.yaml b/helm/templates/api-store-written-client-emails-autoscaler.yaml similarity index 81% rename from helm/templates/job-store-written-client-emails-autoscaler.yaml rename to helm/templates/api-store-written-client-emails-autoscaler.yaml index c3fd0e33..696beec8 100644 --- a/helm/templates/job-store-written-client-emails-autoscaler.yaml +++ b/helm/templates/api-store-written-client-emails-autoscaler.yaml @@ -2,12 +2,12 @@ apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: creationTimestamp: null - name: jobstorewrittenclientemails + name: apistorewrittenclientemails spec: maxReplicas: {{.Values.autoscale.maxPods}} minReplicas: {{.Values.autoscale.minPods}} scaleTargetRef: apiVersion: extensions/v1beta1 kind: Deployment - name: jobstorewrittenclientemails + name: apistorewrittenclientemails targetCPUUtilizationPercentage: {{.Values.autoscale.cpuThreshold}} diff --git a/helm/templates/job-send-outbound-emails-deployment.yaml b/helm/templates/api-store-written-client-emails-deployment.yaml similarity index 63% rename from helm/templates/job-send-outbound-emails-deployment.yaml rename to helm/templates/api-store-written-client-emails-deployment.yaml index 3d62b230..f61f60bc 100644 --- a/helm/templates/job-send-outbound-emails-deployment.yaml +++ b/helm/templates/api-store-written-client-emails-deployment.yaml @@ -6,8 +6,8 @@ metadata: kompose.version: 1.13.0 (84fa826) creationTimestamp: null labels: - io.kompose.service: jobsendoutboundemails - name: jobsendoutboundemails + io.kompose.service: apistorewrittenclientemails + name: apistorewrittenclientemails spec: replicas: 1 strategy: {} @@ -15,18 +15,40 @@ spec: metadata: creationTimestamp: null labels: - io.kompose.service: jobsendoutboundemails + io.kompose.service: apistorewrittenclientemails spec: containers: - - name: jobsendoutboundemails - image: {{.Values.version.imageRegistry}}/opwenserver_job:{{.Values.version.dockerTag}} + - name: connectorstorewrittenclientemails + image: {{.Values.version.imageRegistry}}/opwenserver_queueconnector:{{.Values.version.dockerTag}} env: - - name: JOB_NAME - value: opwen_email_server.jobs.send_outbound_emails + - name: LOKOLE_POST_URL + value: http://0.0.0.0/job/email/client/store + - name: LOKOLE_SOURCE_QUEUE + value: lokoleinboundemails + - name: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME + - name: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + valueFrom: + secretKeyRef: + name: azure + key: LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY + - name: apistorewrittenclientemails + image: {{.Values.version.imageRegistry}}/opwenserver_api:{{.Values.version.dockerTag}} + env: + - name: CONNEXION_SPEC + value: opwen_email_server/static/store-written-client-emails.yaml + - name: GUNICORN_WORKERS + value: "{{.Values.server.gunicornWorkers}}" - name: LOKOLE_LOG_LEVEL value: {{.Values.logging.level}} - - name: LOKOLE_QUEUE_POLL_SECONDS - value: "{{.Values.worker.pollingIntervalSeconds}}" - name: LOKOLE_CLIENT_AZURE_STORAGE_KEY valueFrom: secretKeyRef: @@ -47,16 +69,6 @@ spec: secretKeyRef: name: azure key: LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY - - name: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - valueFrom: - secretKeyRef: - name: azure - key: LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME - name: LOKOLE_EMAIL_SERVER_AZURE_TABLES_KEY valueFrom: secretKeyRef: @@ -77,6 +89,8 @@ spec: secretKeyRef: name: sendgrid key: LOKOLE_SENDGRID_KEY + ports: + - containerPort: 80 resources: {} restartPolicy: Always status: {} diff --git a/helm/values.yaml b/helm/values.yaml index 779277dd..623ff964 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -5,9 +5,6 @@ version: server: gunicornWorkers: 2 -worker: - pollingIntervalSeconds: 120 - logging: level: INFO diff --git a/opwen_email_server/api/client_write.py b/opwen_email_server/api/client_write.py index 5757afe8..f64e015f 100644 --- a/opwen_email_server/api/client_write.py +++ b/opwen_email_server/api/client_write.py @@ -6,7 +6,9 @@ from opwen_email_server.services.queue import AzureQueue from opwen_email_server.utils.log import LogMixin -QUEUE = AzureQueue(account=config.QUEUES_ACCOUNT, key=config.QUEUES_KEY, +QUEUE = AzureQueue(namespace=config.QUEUES_NAMESPACE, + sas_key=config.QUEUES_SAS_KEY, + sas_name=config.QUEUES_SAS_NAME, name=constants.QUEUE_CLIENT_PACKAGE) CLIENTS = AzureAuth(account=config.TABLES_ACCOUNT, key=config.TABLES_KEY, diff --git a/opwen_email_server/api/email_receive.py b/opwen_email_server/api/email_receive.py index ff47d655..d6f08ad8 100644 --- a/opwen_email_server/api/email_receive.py +++ b/opwen_email_server/api/email_receive.py @@ -12,7 +12,9 @@ key=config.BLOBS_KEY, container=constants.CONTAINER_SENDGRID_MIME) -QUEUE = AzureQueue(account=config.QUEUES_ACCOUNT, key=config.QUEUES_KEY, +QUEUE = AzureQueue(namespace=config.QUEUES_NAMESPACE, + sas_key=config.QUEUES_SAS_KEY, + sas_name=config.QUEUES_SAS_NAME, name=constants.QUEUE_SENDGRID_MIME) CLIENTS = AzureAuth(account=config.TABLES_ACCOUNT, key=config.TABLES_KEY, diff --git a/opwen_email_server/api/send_outbound_emails.py b/opwen_email_server/api/send_outbound_emails.py new file mode 100644 index 00000000..773b5664 --- /dev/null +++ b/opwen_email_server/api/send_outbound_emails.py @@ -0,0 +1,17 @@ +from opwen_email_server.backend import email_sender +from opwen_email_server.backend import server_datastore +from opwen_email_server.utils.log import LogMixin + + +class _Sender(LogMixin): + def __call__(self, resource_id: str): + email = server_datastore.fetch_email(resource_id) + self.log_info('Fetched outbound email %s for sending', resource_id) + + email_sender.send(email) + self.log_info('Done sending outbound email %s', resource_id) + + return 'OK', 200 + + +send = _Sender() diff --git a/opwen_email_server/jobs/store_inbound_emails.py b/opwen_email_server/api/store_inbound_emails.py similarity index 65% rename from opwen_email_server/jobs/store_inbound_emails.py rename to opwen_email_server/api/store_inbound_emails.py index 81790879..228c6869 100644 --- a/opwen_email_server/jobs/store_inbound_emails.py +++ b/opwen_email_server/api/store_inbound_emails.py @@ -1,16 +1,12 @@ from opwen_email_server.api import email_receive from opwen_email_server.backend import server_datastore -from opwen_email_server.services.queue_consumer import QueueConsumer from opwen_email_server.utils.email_parser import inline_images from opwen_email_server.utils.email_parser import parse_mime_email +from opwen_email_server.utils.log import LogMixin -class Job(QueueConsumer): - def __init__(self): - super().__init__(email_receive.QUEUE.dequeue) - - def _process_message(self, message: dict): - resource_id = message['resource_id'] +class _InboundStorer(LogMixin): + def __call__(self, resource_id: str): mime_email = email_receive.STORAGE.fetch_text(resource_id) self.log_info('Fetched inbound MIME email %s', resource_id) @@ -22,7 +18,7 @@ def _process_message(self, message: dict): email_receive.STORAGE.delete(resource_id) self.log_info('Deleted inbound MIME email %s', resource_id) + return 'OK', 200 + -if __name__ == '__main__': - from opwen_email_server.services.queue_consumer import cli - cli(Job) +store = _InboundStorer() diff --git a/opwen_email_server/jobs/store_written_client_emails.py b/opwen_email_server/api/store_written_client_emails.py similarity index 60% rename from opwen_email_server/jobs/store_written_client_emails.py rename to opwen_email_server/api/store_written_client_emails.py index 5770a53a..2625c30b 100644 --- a/opwen_email_server/jobs/store_written_client_emails.py +++ b/opwen_email_server/api/store_written_client_emails.py @@ -1,16 +1,18 @@ -from opwen_email_server.api import client_write +from opwen_email_server import azure_constants as constants +from opwen_email_server import config from opwen_email_server.backend import client_datastore -from opwen_email_server.backend import email_sender from opwen_email_server.backend import server_datastore -from opwen_email_server.services.queue_consumer import QueueConsumer +from opwen_email_server.services.queue import AzureQueue +from opwen_email_server.utils.log import LogMixin +QUEUE = AzureQueue(namespace=config.QUEUES_NAMESPACE, + sas_key=config.QUEUES_SAS_KEY, + sas_name=config.QUEUES_SAS_NAME, + name=constants.QUEUE_EMAIL_SEND) -class Job(QueueConsumer): - def __init__(self): - super().__init__(client_write.QUEUE.dequeue) - def _process_message(self, message: dict): - resource_id = message['resource_id'] +class _WrittenStorer(LogMixin): + def __call__(self, resource_id: str): emails = client_datastore.unpack_emails(resource_id) self.log_info('Fetched packaged client emails from %s', resource_id) @@ -19,7 +21,7 @@ def _process_message(self, message: dict): server_datastore.store_email(email_id, email) self.log_info('Stored packaged client email %s', email_id) - email_sender.QUEUE.enqueue({ + QUEUE.enqueue({ '_version': '0.1', '_type': 'email_to_send', 'resource_id': email_id, @@ -30,7 +32,7 @@ def _process_message(self, message: dict): client_datastore.delete(resource_id) self.log_info('Deleted packaged client emails from %s', resource_id) + return 'OK', 200 -if __name__ == '__main__': - from opwen_email_server.services.queue_consumer import cli - cli(Job) + +store = _WrittenStorer() diff --git a/opwen_email_server/azure_constants.py b/opwen_email_server/azure_constants.py index 1a3f6789..72f4ced0 100644 --- a/opwen_email_server/azure_constants.py +++ b/opwen_email_server/azure_constants.py @@ -1,5 +1,3 @@ -from os import environ - CONTAINER_CLIENT_PACKAGES = 'compressedpackages' CONTAINER_EMAILS = 'emails' CONTAINER_SENDGRID_MIME = 'sendgridinboundemails' @@ -13,4 +11,3 @@ QUEUE_CLIENT_PACKAGE = 'lokoleinboundemails' QUEUE_EMAIL_SEND = 'sengridoutboundemails' QUEUE_SENDGRID_MIME = 'sengridinboundemails' -QUEUE_POLL_INTERVAL = float(environ.get('LOKOLE_QUEUE_POLL_SECONDS', '10')) diff --git a/opwen_email_server/backend/email_sender.py b/opwen_email_server/backend/email_sender.py index 76270209..cf495116 100644 --- a/opwen_email_server/backend/email_sender.py +++ b/opwen_email_server/backend/email_sender.py @@ -1,13 +1,8 @@ from typing import Tuple -from opwen_email_server import azure_constants as constants from opwen_email_server import config -from opwen_email_server.services.queue import AzureQueue from opwen_email_server.services.sendgrid import SendgridEmailSender -QUEUE = AzureQueue(account=config.QUEUES_ACCOUNT, key=config.QUEUES_KEY, - name=constants.QUEUE_EMAIL_SEND) - EMAIL = SendgridEmailSender(key=config.EMAIL_SENDER_KEY) diff --git a/opwen_email_server/config.py b/opwen_email_server/config.py index c7955d78..9f602a69 100644 --- a/opwen_email_server/config.py +++ b/opwen_email_server/config.py @@ -1,7 +1,5 @@ from os import environ -QUEUES_ACCOUNT = environ.get('LOKOLE_EMAIL_SERVER_AZURE_QUEUES_NAME', '') -QUEUES_KEY = environ.get('LOKOLE_EMAIL_SERVER_AZURE_QUEUES_KEY', '') BLOBS_ACCOUNT = environ.get('LOKOLE_EMAIL_SERVER_AZURE_BLOBS_NAME', '') BLOBS_KEY = environ.get('LOKOLE_EMAIL_SERVER_AZURE_BLOBS_KEY', '') TABLES_ACCOUNT = environ.get('LOKOLE_EMAIL_SERVER_AZURE_TABLES_NAME', '') @@ -10,6 +8,10 @@ CLIENT_STORAGE_ACCOUNT = environ.get('LOKOLE_CLIENT_AZURE_STORAGE_NAME', '') CLIENT_STORAGE_KEY = environ.get('LOKOLE_CLIENT_AZURE_STORAGE_KEY', '') +QUEUES_NAMESPACE = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE', '') +QUEUES_SAS_NAME = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME', '') +QUEUES_SAS_KEY = environ.get('LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY', '') + EMAIL_SENDER_KEY = environ.get('LOKOLE_SENDGRID_KEY', '') LOG_LEVEL = environ.get('LOKOLE_LOG_LEVEL', 'DEBUG') diff --git a/opwen_email_server/jobs/__init__.py b/opwen_email_server/jobs/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/opwen_email_server/jobs/send_outbound_emails.py b/opwen_email_server/jobs/send_outbound_emails.py deleted file mode 100644 index d1474584..00000000 --- a/opwen_email_server/jobs/send_outbound_emails.py +++ /dev/null @@ -1,21 +0,0 @@ -from opwen_email_server.backend import email_sender -from opwen_email_server.backend import server_datastore -from opwen_email_server.services.queue_consumer import QueueConsumer - - -class Job(QueueConsumer): - def __init__(self): - super().__init__(email_sender.QUEUE.dequeue) - - def _process_message(self, message: dict): - resource_id = message['resource_id'] - email = server_datastore.fetch_email(resource_id) - self.log_info('Fetched outbound email %s for sending', resource_id) - - email_sender.send(email) - self.log_info('Done sending outbound email %s', resource_id) - - -if __name__ == '__main__': - from opwen_email_server.services.queue_consumer import cli - cli(Job) diff --git a/opwen_email_server/services/queue.py b/opwen_email_server/services/queue.py index 80102e08..af5f9eec 100644 --- a/opwen_email_server/services/queue.py +++ b/opwen_email_server/services/queue.py @@ -1,9 +1,6 @@ -from contextlib import contextmanager -from json import loads - -from azure.storage.queue import QueueService +from azure.servicebus import Message +from azure.servicebus import ServiceBusService from typing import Callable -from typing import Iterable from opwen_email_server.utils.log import LogMixin from opwen_email_server.utils.serialization import to_json @@ -12,22 +9,26 @@ class AzureQueue(LogMixin): _max_message_retries = 5 - def __init__(self, account: str, key: str, name: str, - client: QueueService=None, - factory: Callable[..., QueueService]=QueueService) -> None: + def __init__(self, name: str, namespace: str, sas_name: str, sas_key: str, + client: ServiceBusService=None, + factory: Callable[..., ServiceBusService]=ServiceBusService) \ + -> None: - self._account = account - self._key = key self._name = name + self._namespace = namespace + self._sas_name = sas_name + self._sas_key = sas_key self.__client = client self._client_factory = factory @property - def _client(self) -> QueueService: + def _client(self) -> ServiceBusService: if self.__client is not None: return self.__client - client = self._client_factory(self._account, self._key) - client.create_queue(self._name) + client = self._client_factory( + service_namespace=self._namespace, + shared_access_key_name=self._sas_name, + shared_access_key_value=self._sas_key) self.__client = client return client @@ -36,60 +37,14 @@ def name(self) -> str: return self._name @classmethod - def _pack(cls, content: dict) -> str: - return to_json(content) - - @classmethod - def _unpack(cls, message: str) -> dict: - return loads(message) + def _pack(cls, content: dict) -> Message: + body = to_json(content).encode('utf-8') + return Message(body) def enqueue(self, content: dict): message = self._pack(content) - self._client.put_message(self._name, message) - self.log_debug('received message') - - @contextmanager # type: ignore - def dequeue(self, lock_seconds: int=10) -> Iterable[dict]: - messages = self._client.get_messages(self._name, 1, lock_seconds) - messages = list(messages) - if not messages: - yield [] # type: ignore - else: - message = messages[0] - delete_message = False - - # noinspection PyBroadException - try: - payload = self._unpack(message.content) - except Exception: - self.log_exception( - 'error unpacking message %r, purging', - message.id) - delete_message = True - yield [] # type: ignore - else: - # noinspection PyBroadException - try: - yield [payload] # type: ignore - except Exception as ex: - if message.dequeue_count > self._max_message_retries: - self.log_exception( - 'too many retries for message %r, purging:%r', - message.id, ex) - delete_message = True - else: - self.log_exception( - 'error processing message %r, retrying:%r', - message.id, ex) - else: - self.log_debug( - 'done with message %r, deleting', - message.id) - delete_message = True - - if delete_message: - self._client.delete_message(self._name, message.id, - message.pop_receipt) + self._client.send_queue_message(self._name, message) + self.log_debug('sent message') def extra_log_args(self): yield 'queue %s', self._name diff --git a/opwen_email_server/services/queue_consumer.py b/opwen_email_server/services/queue_consumer.py deleted file mode 100644 index ea9f8d37..00000000 --- a/opwen_email_server/services/queue_consumer.py +++ /dev/null @@ -1,87 +0,0 @@ -from time import sleep - -from opwen_email_server.azure_constants import QUEUE_POLL_INTERVAL -from opwen_email_server.utils.log import LogMixin -from opwen_email_server.config import QUEUE_ERROR_FILE -from opwen_email_server.utils.temporary import remove_if_exists - - -class QueueConsumer(LogMixin): - def __init__(self, dequeue, - poll_seconds: float=QUEUE_POLL_INTERVAL) -> None: - - self._dequeue = dequeue - self._poll_seconds = poll_seconds - self._is_running = True - - def _process_message(self, message: dict): - raise NotImplementedError - - def run_once(self) -> bool: - self.log_debug('queue consumer checking for messages') - did_process = False - - with self._dequeue() as messages: - self.log_debug('queue consumer got %d messages', len(messages)) - for i, message in enumerate(messages): - self.log_debug('queue consumer processing message %d', i) - self._process_message(message) - did_process = True - - return did_process - - def run_forever(self): - self.log_debug('queue consumer listening') - while self._is_running: - self.log_debug('starting polling queue') - - # noinspection PyBroadException - try: - did_process = self.run_once() - except Exception as ex: - self._report_error(ex) - else: - if did_process: - self._report_success() - else: - self._wait_for_next_message() - - def _wait_for_next_message(self): - self.log_debug('queue consumer waiting for %d', self._poll_seconds) - sleep(self._poll_seconds) - - def _report_success(self): - self.log_debug('done polling queue') - - if QUEUE_ERROR_FILE: - remove_if_exists(QUEUE_ERROR_FILE) - - def _report_error(self, ex: Exception): - self.log_exception('error polling queue:%r', ex) - - if QUEUE_ERROR_FILE: - with open(QUEUE_ERROR_FILE, 'a') as fobj: - fobj.write('{}\n'.format(ex)) - - -def cli(job_class): - from argparse import ArgumentParser - from os.path import dirname - from os.path import join - - parser = ArgumentParser() - parser.add_argument('--once', action='store_true', default=False) - args = parser.parse_args() - - try: - # noinspection PyUnresolvedReferences - from dotenv import load_dotenv - load_dotenv(join(dirname(__file__), '.env')) - except ImportError: - pass - - job = job_class() - if args.once: - job.run_once() - else: - job.run_forever() diff --git a/opwen_email_server/static/send-outbound-emails.yaml b/opwen_email_server/static/send-outbound-emails.yaml new file mode 100644 index 00000000..63567bfb --- /dev/null +++ b/opwen_email_server/static/send-outbound-emails.yaml @@ -0,0 +1,24 @@ +swagger: '2.0' + +info: + title: Opwen Cloudserver Email API. + version: '0.1' + +basePath: '/job/email/outbound/send' + +paths: + + '/{resource_id}': + + post: + operationId: opwen_email_server.api.send_outbound_emails.send + summary: Queue-triggered endpoint to send an outbound email. + parameters: + - name: resource_id + in: path + required: true + type: string + description: The id of the email to process. + responses: + 200: + description: The email was sent. diff --git a/opwen_email_server/static/store-inbound-emails.yaml b/opwen_email_server/static/store-inbound-emails.yaml new file mode 100644 index 00000000..ac5c774a --- /dev/null +++ b/opwen_email_server/static/store-inbound-emails.yaml @@ -0,0 +1,24 @@ +swagger: '2.0' + +info: + title: Opwen Cloudserver Email API. + version: '0.1' + +basePath: '/job/email/inbound/store' + +paths: + + '/{resource_id}': + + post: + operationId: opwen_email_server.api.store_inbound_emails.store + summary: Queue-triggered endpoint to store an inbound email. + parameters: + - name: resource_id + in: path + required: true + type: string + description: The id of the email to process. + responses: + 200: + description: The email was stored. diff --git a/opwen_email_server/static/store-written-client-emails.yaml b/opwen_email_server/static/store-written-client-emails.yaml new file mode 100644 index 00000000..870dbe17 --- /dev/null +++ b/opwen_email_server/static/store-written-client-emails.yaml @@ -0,0 +1,24 @@ +swagger: '2.0' + +info: + title: Opwen Cloudserver Email API. + version: '0.1' + +basePath: '/job/email/client/store' + +paths: + + '/{resource_id}': + + post: + operationId: opwen_email_server.api.store_written_client_emails.store + summary: Queue-triggered endpoint to store a client email. + parameters: + - name: resource_id + in: path + required: true + type: string + description: The id of the email to process. + responses: + 200: + description: The email was stored. diff --git a/opwen_queue_connector/Program.cs b/opwen_queue_connector/Program.cs new file mode 100644 index 00000000..c6b7f535 --- /dev/null +++ b/opwen_queue_connector/Program.cs @@ -0,0 +1,98 @@ +using Microsoft.Azure.ServiceBus; +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using Newtonsoft.Json; +using System.Collections.Generic; + +namespace queueconnector +{ + public class Program + { + private static readonly Lazy QueueClient = new Lazy(() => + new QueueClient(new ServiceBusConnectionStringBuilder + { + Endpoint = $"sb://{Env.Namespace}.servicebus.windows.net/", + EntityPath = Env.Queue, + SasKey = Env.SasKey, + SasKeyName = Env.SasName + } + )); + + private static readonly Lazy HttpClient = new Lazy(() => new HttpClient()); + + public static void Main(string[] args) + { + MainAsync().GetAwaiter().GetResult(); + } + + private static async Task MainAsync() + { + QueueClient.Value.RegisterMessageHandler(HandleMessage, new MessageHandlerOptions(HandleError) + { + MaxConcurrentCalls = 1, + AutoComplete = false + }); + + await Console.Out.WriteLineAsync($"Queue connector {Env.Queue}: Starting listening"); + + try + { + await Task.Delay(Timeout.Infinite); + } + finally + { + await QueueClient.Value.CloseAsync(); + await Console.Out.WriteLineAsync($"Queue connector {Env.Queue}: Shutting down"); + } + } + + async static private Task HandleMessage(Message message, CancellationToken token) + { + var messageBody = Encoding.UTF8.GetString(message.Body); + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Received {messageBody}"); + + var messageJson = JsonConvert.DeserializeObject>(messageBody); + var resourceId = messageJson["resource_id"]; + var url = $"{Env.Url}/{resourceId}"; + + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Posting to {url}"); + var response = await HttpClient.Value.PostAsync(url, null); + + if (response.IsSuccessStatusCode) + { + await QueueClient.Value.CompleteAsync(message.SystemProperties.LockToken); + await Console.Out.WriteLineAsync($"Message {message.MessageId}: Done"); + } + else + { + var error = await response.Content.ReadAsStringAsync(); + await Console.Error.WriteLineAsync($"Message {message.MessageId}: Error {error}"); + } + } + + async static private Task HandleError(ExceptionReceivedEventArgs args) + { + var exception = args.Exception; + var context = args.ExceptionReceivedContext; + + await Console.Error.WriteLineAsync( + $"Message handler for {Env.Queue} encountered an exception: {exception}. " + + $"Endpoint={context.Endpoint} EntityPath={context.EntityPath} Action={context.Action}"); + } + } + + internal class Env + { + public static readonly string Namespace = Environment.GetEnvironmentVariable("LOKOLE_EMAIL_SERVER_QUEUES_NAMESPACE"); + public static readonly string SasName = Environment.GetEnvironmentVariable("LOKOLE_EMAIL_SERVER_QUEUES_SAS_NAME"); + public static readonly string SasKey = Environment.GetEnvironmentVariable("LOKOLE_EMAIL_SERVER_QUEUES_SAS_KEY"); + + public static readonly string Queue = Environment.GetEnvironmentVariable("LOKOLE_SOURCE_QUEUE"); + public static readonly string Url = Environment.GetEnvironmentVariable("LOKOLE_POST_URL"); + } +} diff --git a/opwen_queue_connector/queueconnector.csproj b/opwen_queue_connector/queueconnector.csproj new file mode 100644 index 00000000..ba26498b --- /dev/null +++ b/opwen_queue_connector/queueconnector.csproj @@ -0,0 +1,13 @@ + + + + Exe + netcoreapp2.0 + + + + + + + + diff --git a/requirements.txt b/requirements.txt index 9d051d80..a4ea17f2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ applicationinsights==0.11.4 azure-cosmosdb-table==1.0.3 +azure-servicebus==0.21.1 azure-storage-blob==1.1.0 -azure-storage-queue==1.1.0 beautifulsoup4==4.6.0 connexion==1.4.2 pyzmail36==1.0.3 diff --git a/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime b/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime index b2df2dbb..aadb3d35 100644 --- a/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime +++ b/tests/files/opwen_email_server/utils/test_email_parser/email-attachment.mime @@ -1,6 +1,6 @@ Received: by mx0032p1mdw1.sendgrid.net with SMTP id j5s6OrIxAo Mon, 13 Feb 2017 06:26:01 +0000 (UTC) -Received: from mail-yw0-f176.google.com (mail-yw0-f176.google.com [209.85.161.176]) by mx0032p1mdw1.sendgrid.net (Postfix) with ESMTPS id ED0E6865CD for ; Mon, 13 Feb 2017 06:26:00 +0000 (UTC) -Received: by mail-yw0-f176.google.com with SMTP id v200so45517752ywc.3 for ; Sun, 12 Feb 2017 22:26:00 -0800 (PST) +Received: from mail-yw0-f176.google.com (mail-yw0-f176.google.com [209.85.161.176]) by mx0032p1mdw1.sendgrid.net (Postfix) with ESMTPS id ED0E6865CD for ; Mon, 13 Feb 2017 06:26:00 +0000 (UTC) +Received: by mail-yw0-f176.google.com with SMTP id v200so45517752ywc.3 for ; Sun, 12 Feb 2017 22:26:00 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:from:date:message-id:subject:to; bh=BzB59eE6aqgBqNykdZ9LSb3ZuGuXFfDT9pvFSFek5s0=; b=FMASsyJpei2ae3965M1t46+/teaQXdqdsXOPd6viine1X10sEbhuGjX3LGTq3WnMUy f+xydQsjhTTSb15LuJivlsSyLN3dPKyqON4dnfQ6t4mPY3ol0Bnl2GvQico3U4whd6QG KuQtoJqijO4O8TBTWxQqoEjVapAZhr9qK0ZfhCuDCtZ5xfEtdphYkEq+pT52ykIn/Hy/ 6zjWfHKLB9fwiyKtc7Dv2GnPUt2vW/YJs8lsA/hrCISwq2Oy9dCyEX4+IZ/xABL+KLox nEsk4/wMLmHHvv3X1sODcIshGPuX0mloj2YkugRZHnwrTm7FEAjTAhRElwNGPSedSqqW l2Cw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:from:date:message-id:subject:to; bh=BzB59eE6aqgBqNykdZ9LSb3ZuGuXFfDT9pvFSFek5s0=; b=n+2qfxqxwNZyVk1+Rd4wS19yZrA7bE2U+AoQ7Jl3d7lAl1V7VV71/jBJFkXEIy6r2Q tqsInRPhv5aQ02W70y9gWB7Bib6Cf353jTHznF4qgjZoUohY8ygNyNrwvjnj2l7T2S+g xn0sCgyYsHbjyLOLF1PoBEUbmNGKcJWf2WIUxQ3wlSZ7tV/nuVj5n/QicAL9yk5e2YsH 5CNYDCFpTFS86SULGAfgeGpTswv0fjKPVipLJFJaw7WziV+v7t4F/jBngqSV0Sy4kyPn UlNn+T7uPlp8A9XgH1tiE7DMt99l2M7qWH0FMI45pZGoVpssmHQw1cuLqJf2F2So/Dhs dhhA== X-Gm-Message-State: AMke39nSCiOqrDKq7uOX7CaQVazGrDl5fI6cBqEyIbOhxDeR85q1Sl9Ae6//kVayvLfguqV5ErlYVxvjE8p9vw== @@ -11,7 +11,7 @@ From: Clemens Wolff Date: Sun, 12 Feb 2017 22:25:20 -0800 Message-ID: Subject: With attachment -To: clemens@victoria.ascoderu.ca +To: clemens@victoria.lokole.ca Content-Type: multipart/mixed; boundary=001a114f099665b49a0548638593 --001a114f099665b49a0548638593 diff --git a/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime b/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime index d76eb0c1..248b76ab 100644 --- a/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime +++ b/tests/files/opwen_email_server/utils/test_email_parser/email-ccbcc.mime @@ -1,13 +1,13 @@ MIME-Version: 1.0 Received: by 10.129.50.130 with HTTP; Sat, 11 Mar 2017 13:03:18 -0800 (PST) -Bcc: Laura Barluzzi +Bcc: Laura Barluzzi Date: Sat, 11 Mar 2017 13:03:18 -0800 Delivered-To: clemens.wolff@gmail.com Message-ID: Subject: Test with CC and BCC -From: Clemens Wolff -To: Clemens Wolff -Cc: Nzola Swasisa , Clemens Wolff +From: Clemens Wolff +To: Clemens Wolff +Cc: Nzola Swasisa , Clemens Wolff Content-Type: multipart/alternative; boundary=001a114e6016c08cd0054a7ace01 --001a114e6016c08cd0054a7ace01 diff --git a/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime b/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime index 1c79a7c5..82ebfc00 100644 --- a/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime +++ b/tests/files/opwen_email_server/utils/test_email_parser/email-html.mime @@ -11,7 +11,7 @@ From: Clemens Wolff Date: Sun, 12 Feb 2017 22:25:01 -0800 Message-ID: Subject: Two recipients -To: clemens@victoria.ascoderu.ca, laura@victoria.ascoderu.ca +To: clemens@victoria.lokole.ca, laura@victoria.lokole.ca Content-Type: multipart/alternative; boundary=001a1146392641b94705486384bf --001a1146392641b94705486384bf diff --git a/tests/opwen_email_server/api/test_send_outbound_emails.py b/tests/opwen_email_server/api/test_send_outbound_emails.py new file mode 100644 index 00000000..1cc957fe --- /dev/null +++ b/tests/opwen_email_server/api/test_send_outbound_emails.py @@ -0,0 +1,20 @@ +from unittest import TestCase +from unittest.mock import patch + +from opwen_email_server.backend import email_sender +from opwen_email_server.backend import server_datastore +from opwen_email_server.api import send_outbound_emails + + +class SendOutboundEmailsTests(TestCase): + @patch.object(server_datastore, 'fetch_email') + @patch.object(email_sender, 'send') + def test_reads_message_and_stores_email(self, send_mock, fetch_mock): + email_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' + email = {'to': ['foo@bar.com'], '_uid': email_id} + fetch_mock.return_value = email + + send_outbound_emails.send(email_id) + + fetch_mock.assert_called_once_with(email_id) + send_mock.assert_called_once_with(email) diff --git a/tests/opwen_email_server/jobs/test_store_inbound_emails.py b/tests/opwen_email_server/api/test_store_inbound_emails.py similarity index 54% rename from tests/opwen_email_server/jobs/test_store_inbound_emails.py rename to tests/opwen_email_server/api/test_store_inbound_emails.py index 0797ecfe..54a54bc0 100644 --- a/tests/opwen_email_server/jobs/test_store_inbound_emails.py +++ b/tests/opwen_email_server/api/test_store_inbound_emails.py @@ -2,31 +2,22 @@ from unittest.mock import patch from opwen_email_server.backend import server_datastore -from opwen_email_server.jobs import store_inbound_emails +from opwen_email_server.api import store_inbound_emails class StoreInboundEmailsTests(TestCase): - @patch.object(store_inbound_emails.email_receive, 'QUEUE') @patch.object(store_inbound_emails.email_receive, 'STORAGE') @patch.object(server_datastore, 'store_email') @patch.object(store_inbound_emails, 'parse_mime_email') def test_reads_message_and_stores_email( - self, parser_mock, store_mock, storage_mock, queue_mock): + self, parser_mock, store_mock, storage_mock): email_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' email = {'to': ['foo@bar.com']} - self._given_message(email, email_id, parser_mock, queue_mock) - consumer = store_inbound_emails.Job() + parser_mock.return_value = email - consumer.run_once() + store_inbound_emails.store(email_id) self.assertEqual(storage_mock.fetch_text.call_count, 1) self.assertEqual(storage_mock.delete.call_count, 1) - self.assertEqual(queue_mock.dequeue.call_count, 1) store_mock.assert_called_once_with(email_id, email) - - @classmethod - def _given_message(cls, email, email_id, parser_mock, queue_mock): - queue_mock.dequeue.return_value.__enter__.return_value = \ - [{'resource_id': email_id}] - parser_mock.return_value = email diff --git a/tests/opwen_email_server/jobs/test_store_written_client_emails.py b/tests/opwen_email_server/api/test_store_written_client_emails.py similarity index 59% rename from tests/opwen_email_server/jobs/test_store_written_client_emails.py rename to tests/opwen_email_server/api/test_store_written_client_emails.py index 83c4dccd..1c9bc632 100644 --- a/tests/opwen_email_server/jobs/test_store_written_client_emails.py +++ b/tests/opwen_email_server/api/test_store_written_client_emails.py @@ -2,42 +2,30 @@ from unittest.mock import patch from opwen_email_server.backend import client_datastore -from opwen_email_server.backend import email_sender from opwen_email_server.backend import server_datastore -from opwen_email_server.jobs import store_written_client_emails +from opwen_email_server.api import store_written_client_emails class StoreWrittenClientEmailsTests(TestCase): - @patch.object(store_written_client_emails.client_write, 'QUEUE') - @patch.object(email_sender, 'QUEUE') + @patch.object(store_written_client_emails, 'QUEUE') @patch.object(client_datastore, 'unpack_emails') @patch.object(client_datastore, 'delete') @patch.object(server_datastore, 'store_email') def test_reads_message_and_stores_email( - self, store_mock, delete_mock, unpack_mock, send_queue_mock, - write_queue_mock): + self, store_mock, delete_mock, unpack_mock, send_queue_mock): resource_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' email1_id = '4efba428-143c-11e7-93ae-92361f002671' email2_id = 'c91636ee-143f-11e7-93ae-92361f002671' email1 = {'to': ['foo@test.com'], '_uid': email1_id} email2 = {'to': ['bar@test.com'], '_uid': email2_id} - emails = [email1, email2] - self._given_message(emails, resource_id, unpack_mock, write_queue_mock) - consumer = store_written_client_emails.Job() + unpack_mock.return_value = [email1, email2] - consumer.run_once() + store_written_client_emails.store(resource_id) self.assertEqual(unpack_mock.call_count, 1) self.assertEqual(delete_mock.call_count, 1) self.assertEqual(send_queue_mock.enqueue.call_count, 2) - self.assertEqual(write_queue_mock.dequeue.call_count, 1) self.assertEqual(store_mock.call_count, 2) store_mock.assert_any_call(email1_id, email1) store_mock.assert_any_call(email2_id, email2) - - @classmethod - def _given_message(cls, emails, resource_id, unpack_mock, queue_mock): - queue_mock.dequeue.return_value.__enter__.return_value = \ - [{'resource_id': resource_id}] - unpack_mock.return_value = emails diff --git a/tests/opwen_email_server/jobs/__init__.py b/tests/opwen_email_server/jobs/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/opwen_email_server/jobs/test_send_outbound_emails.py b/tests/opwen_email_server/jobs/test_send_outbound_emails.py deleted file mode 100644 index a4ff2201..00000000 --- a/tests/opwen_email_server/jobs/test_send_outbound_emails.py +++ /dev/null @@ -1,31 +0,0 @@ -from unittest import TestCase -from unittest.mock import patch - -from opwen_email_server.backend import email_sender -from opwen_email_server.backend import server_datastore -from opwen_email_server.jobs import send_outbound_emails - - -class SendOutboundEmailsTests(TestCase): - @patch.object(email_sender, 'QUEUE') - @patch.object(server_datastore, 'fetch_email') - @patch.object(email_sender, 'send') - def test_reads_message_and_stores_email( - self, send_mock, fetch_mock, queue_mock): - - email_id = '7ad33d8a-c1ee-44c7-a655-fb0d167dc380' - email = {'to': ['foo@bar.com'], '_uid': email_id} - self._given_message(email, email_id, fetch_mock, queue_mock) - consumer = send_outbound_emails.Job() - - consumer.run_once() - - self.assertEqual(queue_mock.dequeue.call_count, 1) - fetch_mock.assert_called_once_with(email_id) - send_mock.assert_called_once_with(email) - - @classmethod - def _given_message(cls, email, email_id, fetch_mock, queue_mock): - queue_mock.dequeue.return_value.__enter__.return_value = \ - [{'resource_id': email_id}] - fetch_mock.return_value = email diff --git a/tests/opwen_email_server/services/test_queue.py b/tests/opwen_email_server/services/test_queue.py deleted file mode 100644 index 71d7ccd0..00000000 --- a/tests/opwen_email_server/services/test_queue.py +++ /dev/null @@ -1,91 +0,0 @@ -from collections import namedtuple -from unittest import TestCase -from unittest.mock import MagicMock - -from opwen_email_server.services.queue import AzureQueue - - -AzureQueueMessage = namedtuple( - 'Message', - 'id pop_receipt content dequeue_count') - - -class AzureQueueTests(TestCase): - def test_enqueue_stores_message(self): - queue, client_mock = self._given_queue() - - queue.enqueue({'foo': 'bar'}) - - self.assertEqual(client_mock.put_message.call_count, 1) - - def test_creates_queue_only_once(self): - queue, client_mock = self._given_queue() - - queue.enqueue({'foo': 'bar'}) - queue.enqueue({'foo': 'bar'}) - - self.assertEqual(client_mock.create_queue.call_count, 1) - - def test_dequeue_without_messages(self): - queue, client_mock = self._given_queue([]) - - with queue.dequeue() as messages: - pass - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 0) - self.assertEqual(messages, []) - - def test_dequeue_removes_messages(self): - queue, client_mock = self._given_queue(['{"foo":"bar"}']) - - with queue.dequeue() as messages: - pass - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 1) - self.assertEqual(messages, [{'foo': 'bar'}]) - - def test_dequeue_with_exception_does_not_remove_message(self): - queue, client_mock = self._given_queue(['{"foo":"bar"}']) - - with queue.dequeue() as _: - raise ValueError - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 0) - - def test_dequeue_with_many_exceptions_removes_message(self): - queue, client_mock = self._given_queue(['{"foo":"bar"}'], - dequeue_count=999) - - with queue.dequeue() as _: - raise ValueError - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 1) - - def test_dequeue_rejects_unparsable_messages(self): - queue, client_mock = self._given_queue(['{"corrupt']) - - with queue.dequeue() as messages: - pass - - self.assertEqual(client_mock.get_messages.call_count, 1) - self.assertEqual(client_mock.delete_message.call_count, 1) - self.assertEqual(messages, []) - - # noinspection PyTypeChecker - @classmethod - def _given_queue(cls, messages=None, dequeue_count=0): - client_mock = MagicMock() - queue = AzureQueue(account='account', key='key', name='name', - factory=lambda *args, **kwargs: client_mock) - - if messages: - client_mock.get_messages.return_value = [ - AzureQueueMessage(id=i, pop_receipt=i, content=message, - dequeue_count=dequeue_count) - for (i, message) in enumerate(messages)] - - return queue, client_mock diff --git a/tests/opwen_email_server/services/test_queue_consumer.py b/tests/opwen_email_server/services/test_queue_consumer.py deleted file mode 100644 index 29bbb6c5..00000000 --- a/tests/opwen_email_server/services/test_queue_consumer.py +++ /dev/null @@ -1,95 +0,0 @@ -from contextlib import contextmanager -from unittest import TestCase - -from opwen_email_server.services.queue_consumer import QueueConsumer - - -class TestQueueConsumer(QueueConsumer): - def __init__(self, message_processor, message_generator, max_runs): - super().__init__(message_generator, poll_seconds=0.01) - self.messages_processed = 0 - self.times_waited = 0 - self.exceptions_encountered = 0 - self._num_runs = 0 - self._max_runs = max_runs - self._message_processor = message_processor - - def _process_message(self, message: dict): - self._message_processor() - - def _track_run(self): - self._num_runs += 1 - if self._num_runs >= self._max_runs: - self._is_running = False - - def _report_success(self): - self._track_run() - self.messages_processed += 1 - - def _report_error(self, ex: Exception): - self._track_run() - self.exceptions_encountered += 1 - - def _wait_for_next_message(self): - self._track_run() - self.times_waited += 1 - - -class QueueConsumerTests(TestCase): - def test_processes_messages_immediately_if_there_is_more_work(self): - def _process(): pass - - @contextmanager - def _produce(): yield [{"foo": "bar"}] - - consumer = TestQueueConsumer(_process, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.messages_processed, 10) - self.assertEqual(consumer.times_waited, 0) - - def test_waits_for_new_messages_if_there_is_no_work_to_do(self): - def _process(): pass - - @contextmanager - def _produce(): yield [] - - consumer = TestQueueConsumer(_process, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.messages_processed, 0) - self.assertEqual(consumer.times_waited, 10) - - def test_waits_or_processes_messages_if_available(self): - self.messages_produced = 0 - - def _process(): pass - - @contextmanager - def _produce(): - if self.messages_produced % 2 == 0: - yield [] - else: - yield [{"foo": "bar"}] - self.messages_produced += 1 - - consumer = TestQueueConsumer(_process, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.messages_processed, 5) - self.assertEqual(consumer.times_waited, 5) - - def test_ignores_exceptions_while_running(self): - def _throw(): raise ValueError - - @contextmanager - def _produce(): yield [{"foo": "bar"}] - - consumer = TestQueueConsumer(_throw, _produce, max_runs=10) - - consumer.run_forever() - - self.assertEqual(consumer.exceptions_encountered, 10) diff --git a/tests/opwen_email_server/services/test_sendgrid.py b/tests/opwen_email_server/services/test_sendgrid.py index 93197369..6399e538 100644 --- a/tests/opwen_email_server/services/test_sendgrid.py +++ b/tests/opwen_email_server/services/test_sendgrid.py @@ -8,9 +8,9 @@ class SendgridEmailSenderTests(TestCase): - recipient1 = 'clemens@ascoderu.ca' + recipient1 = 'clemens@lokole.ca' recipient2 = 'clemens.wolff@gmail.com' - sender = 'sendgridtests@ascoderu.ca' + sender = 'sendgridtests@lokole.ca' def test_sends_email(self): sender = self._given_client() diff --git a/tests/opwen_email_server/utils/test_email_parser.py b/tests/opwen_email_server/utils/test_email_parser.py index 9e803103..8619ea02 100644 --- a/tests/opwen_email_server/utils/test_email_parser.py +++ b/tests/opwen_email_server/utils/test_email_parser.py @@ -22,8 +22,8 @@ def test_parses_email_metadata(self): self.assertEqual(email.get('from'), 'clemens.wolff@gmail.com') self.assertEqual(email.get('subject'), 'Two recipients') self.assertEqual(email.get('sent_at'), '2017-02-13 06:25') - self.assertEqual(email.get('to'), ['clemens@victoria.ascoderu.ca', - 'laura@victoria.ascoderu.ca']) + self.assertEqual(email.get('to'), ['clemens@victoria.lokole.ca', + 'laura@victoria.lokole.ca']) def test_prefers_html_body_over_text(self): mime_email = self._given_mime_email('email-html.mime') @@ -38,9 +38,9 @@ def test_parses_email_with_cc_and_bcc(self): email = email_parser.parse_mime_email(mime_email) - self.assertEqual(email.get('bcc'), ['laura@ascoderu.ca']) - self.assertEqual(email.get('cc'), ['nzola@ascoderu.ca', - 'clemens@ascoderu.ca']) + self.assertEqual(email.get('bcc'), ['laura@lokole.ca']) + self.assertEqual(email.get('cc'), ['nzola@lokole.ca', + 'clemens@lokole.ca']) def test_parses_email_with_attachments(self): mime_email = self._given_mime_email('email-attachment.mime')