Skip to content

Commit

Permalink
Add support for multiple exchanged files (#91)
Browse files Browse the repository at this point in the history
See #27
  • Loading branch information
c-w committed Dec 23, 2018
1 parent 8edd88d commit c9e573d
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 48 deletions.
22 changes: 6 additions & 16 deletions makefile
@@ -1,10 +1,3 @@
#
# System configuration
#
PYTHON=/usr/bin/python3
YARN=/usr/bin/yarn
SHELLCHECK=/usr/bin/shellcheck

#
# You shouldn't need to touch anything below this line.
#
Expand All @@ -17,13 +10,10 @@ app_runner=$(py_env)/bin/python ./manage.py devserver
.PHONY: default
default: server

$(py_env)/bin/activate: requirements.txt
test -d $(py_env) || $(PYTHON) -m venv $(py_env)
$(py_env)/bin/pip install -U pip setuptools
venv: requirements.txt requirements-dev.txt
if [ ! -d $(py_env) ]; then python3 -m venv $(py_env) && $(py_env)/bin/pip install -U pip wheel; fi
$(py_env)/bin/pip install -r requirements.txt
test -f requirements-dev.txt && $(py_env)/bin/pip install -r requirements-dev.txt

venv: $(py_env)/bin/activate
$(py_env)/bin/pip install -r requirements-dev.txt

unit-tests: venv
$(py_env)/bin/nosetests --exe --with-coverage --cover-package=$(py_packages)
Expand All @@ -33,8 +23,8 @@ tests: unit-tests
lint-python: venv
$(py_env)/bin/flake8 $(py_packages)

lint-shell: $(SHELLCHECK)
$(SHELLCHECK) --exclude=SC1090,SC1091,SC2103 $$(find . -name '*.sh' | grep -v 'node_modules/')
lint-shell:
shellcheck --exclude=SC1090,SC1091,SC2103 $$(find . -name '*.sh' | grep -v 'node_modules/')

lint: lint-python lint-shell

Expand All @@ -47,7 +37,7 @@ bandit: venv
ci: lint bandit tests

$(grunt): package.json
$(YARN) install
yarn install

build-frontend: $(grunt) Gruntfile.js
$(grunt)
Expand Down
2 changes: 1 addition & 1 deletion opwen_email_client/domain/email/client.py
Expand Up @@ -81,7 +81,7 @@ def __init__(self, *args, **kwargs):
def download(self) -> Tuple[str, str]:
root = getenv('OPWEN_REMOTE_ACCOUNT_NAME')
container = 'downloads'
resource_id = 'emails.jsonl.gz'
resource_id = 'sync.tar.gz'
local_file = path.join(root, container, resource_id)
if not path.isfile(local_file):
return '', ''
Expand Down
71 changes: 54 additions & 17 deletions opwen_email_client/domain/email/sync.py
@@ -1,7 +1,7 @@
from abc import ABCMeta
from abc import abstractmethod
from gzip import GzipFile
from io import TextIOBase
from tarfile import open as tarfile_open
from tempfile import NamedTemporaryFile
from typing import IO
from typing import Iterable
Expand Down Expand Up @@ -36,6 +36,9 @@ def download(self) -> Iterable[T]:


class AzureSync(Sync):
_emails_file = 'emails.jsonl'
_compression = 'gz'

def __init__(self, container: str, serializer: Serializer,
account_name: str, account_key: str,
email_server_client: EmailServerClient,
Expand All @@ -59,12 +62,20 @@ def _azure_client(self) -> StorageDriver:
return client

@classmethod
def _workspace(cls) -> TextIOBase:
def _workspace(cls):
return NamedTemporaryFile()

@classmethod
def _open(cls, fileobj: IO, mode: str = 'rb') -> GzipFile:
return GzipFile(fileobj=fileobj, mode=mode)
def _open(cls, fileobj, mode, name):
extension_index = name.rfind('.')
if extension_index > -1:
compression = name[extension_index + 1:]
else:
compression = cls._compression

mode = '{}|{}'.format(mode, compression)

return tarfile_open(fileobj=fileobj, mode=mode)

def _download_to_stream(self, blobname: str, container: str,
stream: IO) -> bool:
Expand All @@ -83,6 +94,16 @@ def _upload_from_stream(self, blobname: str, stream: TextIOBase):
container = self._azure_client.get_container(self._container)
container.upload_object_via_stream(stream, blobname)

@classmethod
def _get_file_from_download(cls, archive, name):
while True:
member = archive.next()
if member is None:
break
if member.name == name:
return archive.extractfile(member)
raise FileNotFoundError(name)

def download(self):
resource_id, container = self._email_server_client.download()
if not resource_id or not container:
Expand All @@ -91,24 +112,40 @@ def download(self):
with self._workspace() as workspace:
if self._download_to_stream(resource_id, container, workspace):
workspace.seek(0)
with self._open(workspace) as downloaded:
for line in downloaded:
with self._open(workspace, 'r', resource_id) as archive:
emails = self._get_file_from_download(
archive, self._emails_file)
for line in emails:
yield self._serializer.deserialize(line)

def upload(self, items):
@classmethod
def _add_file_to_upload(cls, archive, name, fobj):
fobj.seek(0)
archive.add(fobj.name, name)

def _upload_emails(self, items, archive):
uploaded_ids = []
upload_location = str(uuid4())

with self._workspace() as uploaded:
for item in items:
item = {key: value for (key, value) in item.items()
if value is not None
and key not in EXCLUDED_FIELDS}
serialized = self._serializer.serialize(item)
uploaded.write(serialized)
uploaded.write(b'\n')
uploaded_ids.append(item.get('_uid'))

self._add_file_to_upload(archive, self._emails_file, uploaded)

return uploaded_ids

def upload(self, items):
upload_location = '{}.tar.{}'.format(uuid4(), self._compression)

with self._workspace() as workspace:
with self._open(workspace, 'wb') as uploaded:
for item in items:
item = {key: value for (key, value) in item.items()
if value is not None
and key not in EXCLUDED_FIELDS}
serialized = self._serializer.serialize(item)
uploaded.write(serialized)
uploaded.write(b'\n')
uploaded_ids.append(item.get('_uid'))
with self._open(workspace, 'w', upload_location) as archive:
uploaded_ids = self._upload_emails(items, archive)

if uploaded_ids:
workspace.seek(0)
Expand Down
Binary file not shown.
Binary file added tests/files/opwen_email_client/downloads/sync.tar.gz
Binary file not shown.
24 changes: 15 additions & 9 deletions tests/opwen_email_client/domain/email/test_sync.py
Expand Up @@ -2,6 +2,7 @@
from os import mkdir
from os.path import join
from shutil import rmtree
from tempfile import NamedTemporaryFile
from tempfile import mkdtemp
from unittest import TestCase
from unittest.mock import Mock
Expand Down Expand Up @@ -35,22 +36,27 @@ def assertUploadIs(self, expected: bytes):
self.assertEqual(len(uploaded), 1, 'Expected exactly one upload')

with open(uploaded[0], 'rb') as buffer:
with self.sync._open(buffer) as fobj:
with self.sync._open(buffer, 'r', 'archive') as archive:
fobj = self.sync._get_file_from_download(
archive, self.sync._emails_file)
self.assertEqual(expected, fobj.read())

def assertNoUpload(self):
uploaded = glob(join(self._root_folder, self._upload_folder, '*'))
self.assertEqual(len(uploaded), 0, 'Expected no uploads')

def given_download(self, payload: bytes):
resource_id = str(uuid4())
download_filename = join(
self._root_folder, self._download_folder, resource_id)

with open(download_filename, 'wb') as buffer:
with self.sync._open(buffer, 'wb') as fobj:
fobj.write(payload)
buffer.seek(0)
with NamedTemporaryFile() as fobj:
fobj.write(payload)

resource_id = str(uuid4())
download_filename = join(
self._root_folder, self._download_folder, resource_id)

with open(download_filename, 'wb') as buffer:
with self.sync._open(buffer, 'w', 'tar.gz') as archive:
self.sync._add_file_to_upload(
archive, self.sync._emails_file, fobj)

self.email_server_client_mock.download.return_value = (
resource_id,
Expand Down
6 changes: 3 additions & 3 deletions travis/cd.sh
Expand Up @@ -2,15 +2,15 @@

set -eo pipefail

if [ -z "$TRAVIS_TAG" ]; then
if [[ -z "$TRAVIS_TAG" ]]; then
echo "Build is not a release, skipping CD" >&2; exit 0
fi

if [ -z "$PYPI_USERNAME" ] || [ -z "$PYPI_PASSWORD" ]; then
if [[ -z "$PYPI_USERNAME" ]] || [[ -z "$PYPI_PASSWORD" ]]; then
echo "No PyPI credentials configured, unable to publish builds" >&2; exit 1
fi

if [ -z "$DOCKER_USERNAME" ] || [ -z "$DOCKER_PASSWORD" ]; then
if [[ -z "$DOCKER_USERNAME" ]] || [[ -z "$DOCKER_PASSWORD" ]]; then
echo "No docker credentials configured, unable to publish builds" >&2; exit 1
fi

Expand Down
4 changes: 2 additions & 2 deletions travis/ci.sh
Expand Up @@ -2,8 +2,8 @@

set -eo pipefail

if [ -z "$TRAVIS_PYTHON_VERSION" ]; then
if [[ -z "$TRAVIS_PYTHON_VERSION" ]]; then
echo "Build is not targeting a Python version, can't run CI" >&2; exit 1
fi

make ci -e py_env="$HOME/virtualenv/python$TRAVIS_PYTHON_VERSION" -e SHELLCHECK="$(which shellcheck)"
make ci -e py_env="$HOME/virtualenv/python$TRAVIS_PYTHON_VERSION"

0 comments on commit c9e573d

Please sign in to comment.