From c9e573dd1f9de83317b28d81eda325dc89aeec2c Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Sun, 23 Dec 2018 12:36:25 -0500 Subject: [PATCH] Add support for multiple exchanged files (#91) See https://github.com/ascoderu/opwen-cloudserver/issues/27 --- makefile | 22 ++---- opwen_email_client/domain/email/client.py | 2 +- opwen_email_client/domain/email/sync.py | 71 +++++++++++++----- .../downloads/emails.jsonl.gz | Bin 178 -> 0 bytes .../opwen_email_client/downloads/sync.tar.gz | Bin 0 -> 264 bytes .../domain/email/test_sync.py | 24 +++--- travis/cd.sh | 6 +- travis/ci.sh | 4 +- 8 files changed, 81 insertions(+), 48 deletions(-) delete mode 100644 tests/files/opwen_email_client/downloads/emails.jsonl.gz create mode 100644 tests/files/opwen_email_client/downloads/sync.tar.gz diff --git a/makefile b/makefile index a070b182..230b1c4c 100644 --- a/makefile +++ b/makefile @@ -1,10 +1,3 @@ -# -# System configuration -# -PYTHON=/usr/bin/python3 -YARN=/usr/bin/yarn -SHELLCHECK=/usr/bin/shellcheck - # # You shouldn't need to touch anything below this line. # @@ -17,13 +10,10 @@ app_runner=$(py_env)/bin/python ./manage.py devserver .PHONY: default default: server -$(py_env)/bin/activate: requirements.txt - test -d $(py_env) || $(PYTHON) -m venv $(py_env) - $(py_env)/bin/pip install -U pip setuptools +venv: requirements.txt requirements-dev.txt + if [ ! -d $(py_env) ]; then python3 -m venv $(py_env) && $(py_env)/bin/pip install -U pip wheel; fi $(py_env)/bin/pip install -r requirements.txt - test -f requirements-dev.txt && $(py_env)/bin/pip install -r requirements-dev.txt - -venv: $(py_env)/bin/activate + $(py_env)/bin/pip install -r requirements-dev.txt unit-tests: venv $(py_env)/bin/nosetests --exe --with-coverage --cover-package=$(py_packages) @@ -33,8 +23,8 @@ tests: unit-tests lint-python: venv $(py_env)/bin/flake8 $(py_packages) -lint-shell: $(SHELLCHECK) - $(SHELLCHECK) --exclude=SC1090,SC1091,SC2103 $$(find . -name '*.sh' | grep -v 'node_modules/') +lint-shell: + shellcheck --exclude=SC1090,SC1091,SC2103 $$(find . -name '*.sh' | grep -v 'node_modules/') lint: lint-python lint-shell @@ -47,7 +37,7 @@ bandit: venv ci: lint bandit tests $(grunt): package.json - $(YARN) install + yarn install build-frontend: $(grunt) Gruntfile.js $(grunt) diff --git a/opwen_email_client/domain/email/client.py b/opwen_email_client/domain/email/client.py index 02b8f5e3..90bf2c9c 100644 --- a/opwen_email_client/domain/email/client.py +++ b/opwen_email_client/domain/email/client.py @@ -81,7 +81,7 @@ def __init__(self, *args, **kwargs): def download(self) -> Tuple[str, str]: root = getenv('OPWEN_REMOTE_ACCOUNT_NAME') container = 'downloads' - resource_id = 'emails.jsonl.gz' + resource_id = 'sync.tar.gz' local_file = path.join(root, container, resource_id) if not path.isfile(local_file): return '', '' diff --git a/opwen_email_client/domain/email/sync.py b/opwen_email_client/domain/email/sync.py index b6ca7175..8436de52 100644 --- a/opwen_email_client/domain/email/sync.py +++ b/opwen_email_client/domain/email/sync.py @@ -1,7 +1,7 @@ from abc import ABCMeta from abc import abstractmethod -from gzip import GzipFile from io import TextIOBase +from tarfile import open as tarfile_open from tempfile import NamedTemporaryFile from typing import IO from typing import Iterable @@ -36,6 +36,9 @@ def download(self) -> Iterable[T]: class AzureSync(Sync): + _emails_file = 'emails.jsonl' + _compression = 'gz' + def __init__(self, container: str, serializer: Serializer, account_name: str, account_key: str, email_server_client: EmailServerClient, @@ -59,12 +62,20 @@ def _azure_client(self) -> StorageDriver: return client @classmethod - def _workspace(cls) -> TextIOBase: + def _workspace(cls): return NamedTemporaryFile() @classmethod - def _open(cls, fileobj: IO, mode: str = 'rb') -> GzipFile: - return GzipFile(fileobj=fileobj, mode=mode) + def _open(cls, fileobj, mode, name): + extension_index = name.rfind('.') + if extension_index > -1: + compression = name[extension_index + 1:] + else: + compression = cls._compression + + mode = '{}|{}'.format(mode, compression) + + return tarfile_open(fileobj=fileobj, mode=mode) def _download_to_stream(self, blobname: str, container: str, stream: IO) -> bool: @@ -83,6 +94,16 @@ def _upload_from_stream(self, blobname: str, stream: TextIOBase): container = self._azure_client.get_container(self._container) container.upload_object_via_stream(stream, blobname) + @classmethod + def _get_file_from_download(cls, archive, name): + while True: + member = archive.next() + if member is None: + break + if member.name == name: + return archive.extractfile(member) + raise FileNotFoundError(name) + def download(self): resource_id, container = self._email_server_client.download() if not resource_id or not container: @@ -91,24 +112,40 @@ def download(self): with self._workspace() as workspace: if self._download_to_stream(resource_id, container, workspace): workspace.seek(0) - with self._open(workspace) as downloaded: - for line in downloaded: + with self._open(workspace, 'r', resource_id) as archive: + emails = self._get_file_from_download( + archive, self._emails_file) + for line in emails: yield self._serializer.deserialize(line) - def upload(self, items): + @classmethod + def _add_file_to_upload(cls, archive, name, fobj): + fobj.seek(0) + archive.add(fobj.name, name) + + def _upload_emails(self, items, archive): uploaded_ids = [] - upload_location = str(uuid4()) + + with self._workspace() as uploaded: + for item in items: + item = {key: value for (key, value) in item.items() + if value is not None + and key not in EXCLUDED_FIELDS} + serialized = self._serializer.serialize(item) + uploaded.write(serialized) + uploaded.write(b'\n') + uploaded_ids.append(item.get('_uid')) + + self._add_file_to_upload(archive, self._emails_file, uploaded) + + return uploaded_ids + + def upload(self, items): + upload_location = '{}.tar.{}'.format(uuid4(), self._compression) with self._workspace() as workspace: - with self._open(workspace, 'wb') as uploaded: - for item in items: - item = {key: value for (key, value) in item.items() - if value is not None - and key not in EXCLUDED_FIELDS} - serialized = self._serializer.serialize(item) - uploaded.write(serialized) - uploaded.write(b'\n') - uploaded_ids.append(item.get('_uid')) + with self._open(workspace, 'w', upload_location) as archive: + uploaded_ids = self._upload_emails(items, archive) if uploaded_ids: workspace.seek(0) diff --git a/tests/files/opwen_email_client/downloads/emails.jsonl.gz b/tests/files/opwen_email_client/downloads/emails.jsonl.gz deleted file mode 100644 index 971dfc785244d3bb93435f36ede42151084c5e0e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 178 zcmV;j08RfNiwFo>M5$W<17&StX>4;YYIARHYydTk%?g7s5QOiAzJu)9l|$5~IDjPB`1PDsN&hIKO!CJ)Wg zjZPbbarS+thmN6N-1LvoYOh;OzpEy; z#vSup%JXmk>BZ-z7fGLekX+c6`03`{53Sd#dY!!Mq|KL3o4G5}48A6Y5? ziR0g8dOSt1CFR=g?+*q-;$mF)_wCj&7C(4_ob)$ zwq)&y>pEWfVw%_bTbqi5^frfWx|MmbZ0plCt3u=7`f9gq`||qd%Zu~b8PLH8hx;EM Le$;2sU|;|MI~RYv literal 0 HcmV?d00001 diff --git a/tests/opwen_email_client/domain/email/test_sync.py b/tests/opwen_email_client/domain/email/test_sync.py index 7076c2e1..cdcc0c6e 100644 --- a/tests/opwen_email_client/domain/email/test_sync.py +++ b/tests/opwen_email_client/domain/email/test_sync.py @@ -2,6 +2,7 @@ from os import mkdir from os.path import join from shutil import rmtree +from tempfile import NamedTemporaryFile from tempfile import mkdtemp from unittest import TestCase from unittest.mock import Mock @@ -35,7 +36,9 @@ def assertUploadIs(self, expected: bytes): self.assertEqual(len(uploaded), 1, 'Expected exactly one upload') with open(uploaded[0], 'rb') as buffer: - with self.sync._open(buffer) as fobj: + with self.sync._open(buffer, 'r', 'archive') as archive: + fobj = self.sync._get_file_from_download( + archive, self.sync._emails_file) self.assertEqual(expected, fobj.read()) def assertNoUpload(self): @@ -43,14 +46,17 @@ def assertNoUpload(self): self.assertEqual(len(uploaded), 0, 'Expected no uploads') def given_download(self, payload: bytes): - resource_id = str(uuid4()) - download_filename = join( - self._root_folder, self._download_folder, resource_id) - - with open(download_filename, 'wb') as buffer: - with self.sync._open(buffer, 'wb') as fobj: - fobj.write(payload) - buffer.seek(0) + with NamedTemporaryFile() as fobj: + fobj.write(payload) + + resource_id = str(uuid4()) + download_filename = join( + self._root_folder, self._download_folder, resource_id) + + with open(download_filename, 'wb') as buffer: + with self.sync._open(buffer, 'w', 'tar.gz') as archive: + self.sync._add_file_to_upload( + archive, self.sync._emails_file, fobj) self.email_server_client_mock.download.return_value = ( resource_id, diff --git a/travis/cd.sh b/travis/cd.sh index 4643eb36..a9e299ea 100755 --- a/travis/cd.sh +++ b/travis/cd.sh @@ -2,15 +2,15 @@ set -eo pipefail -if [ -z "$TRAVIS_TAG" ]; then +if [[ -z "$TRAVIS_TAG" ]]; then echo "Build is not a release, skipping CD" >&2; exit 0 fi -if [ -z "$PYPI_USERNAME" ] || [ -z "$PYPI_PASSWORD" ]; then +if [[ -z "$PYPI_USERNAME" ]] || [[ -z "$PYPI_PASSWORD" ]]; then echo "No PyPI credentials configured, unable to publish builds" >&2; exit 1 fi -if [ -z "$DOCKER_USERNAME" ] || [ -z "$DOCKER_PASSWORD" ]; then +if [[ -z "$DOCKER_USERNAME" ]] || [[ -z "$DOCKER_PASSWORD" ]]; then echo "No docker credentials configured, unable to publish builds" >&2; exit 1 fi diff --git a/travis/ci.sh b/travis/ci.sh index a52fb218..14e21179 100755 --- a/travis/ci.sh +++ b/travis/ci.sh @@ -2,8 +2,8 @@ set -eo pipefail -if [ -z "$TRAVIS_PYTHON_VERSION" ]; then +if [[ -z "$TRAVIS_PYTHON_VERSION" ]]; then echo "Build is not targeting a Python version, can't run CI" >&2; exit 1 fi -make ci -e py_env="$HOME/virtualenv/python$TRAVIS_PYTHON_VERSION" -e SHELLCHECK="$(which shellcheck)" +make ci -e py_env="$HOME/virtualenv/python$TRAVIS_PYTHON_VERSION"