XML compression #27022

Merged · 42 commits · Apr 21, 2020

Changes shown are from 16 of the 42 commits.

Commits (42):
adbdc3a  XML compression (satyaakam, Apr 1, 2020)
1851c6d  Changes made as per the suggestions in PR 27022 (satyaakam, Apr 3, 2020)
6ff7e25  Resolving conflits (satyaakam, Apr 3, 2020)
55fd72e  merged master into compress-xml (satyaakam, Apr 3, 2020)
787604b  Merge branch 'master' of https://github.com/dimagi/commcare-hq into c… (satyaakam, Apr 10, 2020)
c843658  Changes to the compress xml as review (satyaakam, Apr 10, 2020)
ed006c5  Merge branch 'master' of https://github.com/dimagi/commcare-hq into c… (satyaakam, Apr 10, 2020)
071b011  Fixing fucntion call (satyaakam, Apr 10, 2020)
5511232  Missing steps to make Bower work as normal user (satyaakam, Apr 14, 2020)
233f255  Merge branch 'master' of https://github.com/dimagi/commcare-hq (satyaakam, Apr 15, 2020)
8f63335  Updated Tests and custom exceptions added (satyaakam, Apr 15, 2020)
1ea2d13  Merge branch 'master' into compress-xml (satyaakam, Apr 15, 2020)
7a43156  Fixed Lint issues (satyaakam, Apr 15, 2020)
b8a190b  simplify validate_args (snopoke, Apr 16, 2020)
acb621f  nits (snopoke, Apr 16, 2020)
7756fad  add missing type_code kwargs (snopoke, Apr 16, 2020)
2f569c3  remove unused imports (snopoke, Apr 16, 2020)
027d73c  Merge branch 'master' into compress-xml (snopoke, Apr 16, 2020)
e0d7c55  handle exception (snopoke, Apr 16, 2020)
95d2da0  more type_code args + test fixes (snopoke, Apr 16, 2020)
52c51c9  make meta.content_length = uncompressed size (snopoke, Apr 16, 2020)
1e0a50d  uncompressed content_length when copying a blob (snopoke, Apr 16, 2020)
557264f  test with compression (snopoke, Apr 16, 2020)
1853cbc  store compressed length and always return a BlobStream instance (snopoke, Apr 17, 2020)
b13480f  simplify `blobmeta.open()` (snopoke, Apr 17, 2020)
5f6e847  reusue util to get sizes (snopoke, Apr 17, 2020)
a4a90a8  minor simplication (snopoke, Apr 17, 2020)
f4bc531  prefer using meta for lookup if it's available (snopoke, Apr 17, 2020)
77ed3e5  better test (snopoke, Apr 17, 2020)
820f81d  Update corehq/blobs/interface.py (snopoke, Apr 17, 2020)
70a4090  merging upstream changes (satyaakam, Apr 20, 2020)
48c9e0e  Merging upstream changes (satyaakam, Apr 20, 2020)
aa7348f  fix argument order (snopoke, Apr 17, 2020)
6d2f0c0  remove GzipCompressReadStream.Buffer.content_length (snopoke, Apr 20, 2020)
6f6466f  Merge branch 'master' into compress-xml (snopoke, Apr 20, 2020)
468bdb1  GzipCompressReadStream -> GzipStream (millerdev, Apr 20, 2020)
d83f529  GzipStreamAttrAccessBeforeRead -> GzipStreamError (millerdev, Apr 20, 2020)
4e6677f  Move IO buffer out of GzipStream class (millerdev, Apr 20, 2020)
3b3c00f  Fix bugs in GzipStream (millerdev, Apr 20, 2020)
470f2ca  Merge pull request #1 from dimagi/dm/compress-xml (snopoke, Apr 21, 2020)
7c5d4bf  save to file before extracting in test (snopoke, Apr 21, 2020)
05cadf1  fallback to partial read if seek not supported (snopoke, Apr 21, 2020)
Files changed:
3 changes: 2 additions & 1 deletion DEV_SETUP.md
@@ -169,7 +169,8 @@ you'll need to install `bower` and run `bower install`. Follow these steps to in
         $ sudo npm -g install bower

 3. Run bower with:

+        $ sudo chown -R $USER:$GROUP ~/.npm
+        $ sudo chown -R $USER:$GROUP ~/.config
         $ bower install
2 changes: 1 addition & 1 deletion corehq/apps/case_importer/tracking/filestorage.py
@@ -50,7 +50,7 @@ def write_file(self, f, filename, domain):
     def get_tempfile_ref_for_contents(self, identifier):
         filename = self.get_filename(identifier)
         suffix = file_extention_from_filename(filename)
-        content = get_blob_db().get(key=identifier).read()
+        content = get_blob_db().get(key=identifier, type_code=CODES.data_import).read()
         return make_temp_file(content, suffix)

     @memoized
6 changes: 3 additions & 3 deletions corehq/apps/domain/tests/test_delete_domain.py
@@ -99,7 +99,7 @@
 from corehq.apps.users.models import DomainRequest, SQLInvitation
 from corehq.apps.zapier.consts import EventTypes
 from corehq.apps.zapier.models import ZapierSubscription
-from corehq.blobs import NotFound, get_blob_db
+from corehq.blobs import NotFound, get_blob_db, CODES
 from corehq.form_processor.backends.sql.dbaccessors import (
     CaseAccessorSQL,
     FormAccessorSQL,
@@ -630,9 +630,9 @@ def test_export_delete(self):
         self.domain.delete()

         with self.assertRaises(NotFound):
-            blobdb.get(key=data_files[0].blob_id)
+            blobdb.get(key=data_files[0].blob_id, type_code=CODES.data_file)

-        with blobdb.get(key=data_files[1].blob_id) as f:
+        with blobdb.get(key=data_files[1].blob_id, type_code=CODES.data_file) as f:
             self.assertEqual(f.read(), (self.domain2.name + " csv").encode('utf-8'))

         self._assert_export_counts(self.domain.name, 0)
2 changes: 1 addition & 1 deletion corehq/apps/export/models/new.py
@@ -2770,7 +2770,7 @@ def save_blob(cls, file_obj, domain, filename, description, content_type, delete
     def get_blob(self):
         db = get_blob_db()
         try:
-            blob = db.get(key=self._meta.key)
+            blob = db.get(key=self._meta.key, type_code=CODES.data_file)
         except (KeyError, NotFound) as err:
             raise NotFound(str(err))
         return blob
2 changes: 1 addition & 1 deletion corehq/apps/hqadmin/service_checks.py
@@ -153,7 +153,7 @@ def check_blobdb():
         parent_id="check_blobdb",
         type_code=CODES.tempfile,
     )
-    with db.get(key=meta.key) as fh:
+    with db.get(key=meta.key, type_code=CODES.tempfile) as fh:
         res = fh.read()
     db.delete(key=meta.key)
     if res == contents:
2 changes: 1 addition & 1 deletion corehq/apps/ota/models.py
@@ -62,7 +62,7 @@ def get_restore_as_string(self):
             blob.close()

     def _get_restore_xml(self):
-        return get_blob_db().get(key=self.restore_blob_id)
+        return get_blob_db().get(key=self.restore_blob_id, type_code=CODES.demo_user_restore)

     def delete(self):
         """
4 changes: 2 additions & 2 deletions corehq/apps/reports/views.py
@@ -132,7 +132,7 @@
     DEID_EXPORT_PERMISSION,
     FORM_EXPORT_PERMISSION,
 )
-from corehq.blobs import NotFound, get_blob_db, models
+from corehq.blobs import CODES, NotFound, get_blob_db, models
 from corehq.form_processor.exceptions import CaseNotFound
 from corehq.form_processor.interfaces.dbaccessors import (
     CaseAccessors,
@@ -2063,7 +2063,7 @@ def export_report(request, domain, export_hash, format):
     report_class = meta.properties["report_class"]

     try:
-        report_file = db.get(export_hash)
+        report_file = db.get(export_hash, type_code=CODES.tempfile)
     except NotFound:
         return report_not_found
     with report_file:
5 changes: 5 additions & 0 deletions corehq/blobs/exceptions.py
@@ -18,3 +18,8 @@ class InvalidContext(Error):

 class NotFound(Error):
     """Raised when an attachment cannot be found"""
+
+
+class GzipStreamAttrAccessBeforeRead(Exception):
+    """Raised when an attribute (eg: content_length) of the
+    Gzip Stream is accessed before the stream is read completely"""
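
For context on this new exception: a stream that compresses its source on the fly cannot know the compressed size until the caller has read it to the end, so `content_length` must refuse early access. Below is a minimal sketch of that contract. It is an illustration only, not the PR's `GzipCompressReadStream`; the class name, chunk size, and buffering strategy here are assumptions:

    # Illustrative sketch only -- not the PR's implementation.
    import gzip
    import io

    CHUNK_SIZE = 4096  # assumed chunk size


    class GzipStreamAttrAccessBeforeRead(Exception):
        """Local stand-in for corehq.blobs.exceptions.GzipStreamAttrAccessBeforeRead."""


    class GzipReadStreamSketch:
        """Compress `fileobj` lazily as the stream is read."""

        def __init__(self, fileobj):
            self._fileobj = fileobj
            self._sink = io.BytesIO()  # gzip writes compressed bytes here
            self._gzip = gzip.GzipFile(fileobj=self._sink, mode="wb")
            self._buffer = bytearray()
            self._content_length = 0   # compressed bytes handed out so far
            self._eof = False

        @property
        def content_length(self):
            if not self._eof:
                # The compressed size is unknowable before the gzip trailer
                # is written, i.e. before the source is fully read.
                raise GzipStreamAttrAccessBeforeRead(
                    "stream must be fully read first")
            return self._content_length

        def _fill(self):
            chunk = self._fileobj.read(CHUNK_SIZE)
            if chunk:
                self._gzip.write(chunk)
            else:
                self._gzip.close()  # flushes the gzip trailer
                self._eof = True
            self._buffer.extend(self._sink.getvalue())
            self._sink.seek(0)
            self._sink.truncate()

        def read(self, size=-1):
            while not self._eof and (size < 0 or len(self._buffer) < size):
                self._fill()
            if size < 0:
                size = len(self._buffer)
            data = bytes(self._buffer[:size])
            del self._buffer[:size]
            self._content_length += len(data)
            return data

The PR's tests in corehq/blobs/tests/test_util.py below exercise exactly this behavior: access before or mid-read raises, and after a full read the attribute equals the number of compressed bytes returned.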
5 changes: 2 additions & 3 deletions corehq/blobs/export.py
@@ -1,6 +1,6 @@
 import os

-from . import get_blob_db, NotFound
+from . import get_blob_db, NotFound, CODES
 from .migrate import PROCESSING_COMPLETE_MESSAGE
 from .models import BlobMeta
 from .zipdb import get_export_filename, ZipBlobDB
@@ -23,10 +23,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         print(PROCESSING_COMPLETE_MESSAGE.format(self.not_found, self.total_blobs))

     def process_object(self, meta):
-        from_db = get_blob_db()
         self.total_blobs += 1
         try:
-            content = from_db.get(key=meta.key)
+            content = meta.open()
         except NotFound:
             self.not_found += 1
         else:
10 changes: 8 additions & 2 deletions corehq/blobs/fsdb.py
@@ -2,12 +2,13 @@
 """
 import os
 from collections import namedtuple
+from gzip import GzipFile
 from hashlib import md5
 from os.path import commonprefix, exists, isabs, isdir, dirname, join, realpath, sep

 from corehq.blobs.exceptions import BadName, NotFound
 from corehq.blobs.interface import AbstractBlobDB
-from corehq.blobs.util import check_safe_key
+from corehq.blobs.util import check_safe_key, GzipCompressReadStream
 from corehq.util.metrics import metrics_counter

 CHUNK_SIZE = 4096
@@ -28,6 +29,8 @@ def put(self, content, **blob_meta_args):
         dirpath = dirname(path)
         if not isdir(dirpath):
             os.makedirs(dirpath)
+        if meta.compressed:
+            content = GzipCompressReadStream(content)
         length = 0
         digest = md5()
         with open(path, "wb") as fh:
@@ -42,11 +45,14 @@
         self.metadb.put(meta)
         return meta

-    def get(self, key):
+    def get(self, key=None, type_code=None, meta=None):
+        key = self._validate_get_args(key, type_code, meta)
         path = self.get_path(key)
         if not exists(path):
             metrics_counter('commcare.blobdb.notfound')
             raise NotFound(key)
+        if meta and meta.compressed:
+            return GzipFile(path)
         return open(path, "rb")

     def size(self, key):
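
Read together, `put()` and `get()` above give the filesystem backend a transparent round trip for compressed blobs: gzipped bytes on disk, plain bytes for callers. A sketch of the intended behavior, with illustrative arguments (the exact `put()` keyword set and the domain/parent_id values are assumptions):

    import io

    from corehq.blobs import CODES, get_blob_db

    db = get_blob_db()  # assumed to be the filesystem backend here
    meta = db.put(
        io.BytesIO(b"<data/>"),
        domain="demo",               # illustrative metadata values
        parent_id="form-1",
        type_code=CODES.form_xml,    # form XML is marked compressed
    )
    assert meta.compressed

    # The file on disk holds gzipped bytes, but reading through the
    # meta object decompresses transparently via GzipFile.
    with meta.open() as fh:
        assert fh.read() == b"<data/>"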
26 changes: 24 additions & 2 deletions corehq/blobs/interface.py
@@ -1,5 +1,6 @@
 from abc import ABCMeta, abstractmethod

+from . import CODES
 from .metadata import MetaDB

 NOT_SET = object()
@@ -52,15 +53,36 @@ def put(self, content, **blob_meta_args):
         raise NotImplementedError

     @abstractmethod
-    def get(self, key):
-        """Get a blob
+    def get(self, key=None, type_code=None, meta=None):
+        """Get a blob.

+        :param key: Blob key.
+        :param type_code: Blob type code.
+        :param meta: BlobMeta instance.
+
+        key and type_code are required if meta is not provided. If meta
+        is provided, then key and type_code should be None. For type_code
+        form_xml, meta is required.

         :returns: A file-like object in binary read mode. The returned
         object should be closed when finished reading.
         """
         raise NotImplementedError

+    @staticmethod
+    def _validate_get_args(key, type_code, meta):
+        if key is not None or type_code is not None:
+            if meta is not None:
+                raise ValueError("'key' and 'meta' are mutually exclusive")
+            if type_code == CODES.form_xml:
+                raise ValueError("form XML must be loaded with 'meta' argument")
+            if key is None or type_code is None:
+                raise ValueError("'key' must be specified with 'type_code'")
+            return key
+        if meta is None:
+            raise ValueError("'key' and 'type_code' or 'meta' is required")
+        return meta.key
+
     @abstractmethod
     def exists(self, key):
         """Check if blob exists
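
Concretely, the new signature supports two mutually exclusive call styles, which `_validate_get_args` enforces. A hedged usage sketch follows; the key and parent_id values are placeholders:

    from corehq.blobs import CODES, get_blob_db

    db = get_blob_db()

    # Style 1: key plus type_code, for any type except form XML.
    with db.get(key="some-blob-key", type_code=CODES.tempfile) as fh:
        data = fh.read()

    # Style 2: meta only. Required for form XML, because the metadata
    # carries the `compressed` flag needed to decompress on read.
    meta = db.metadb.get(parent_id="some-parent-id", key="some-blob-key")
    with db.get(meta=meta) as fh:
        xml = fh.read()

    # Rejected combinations (each raises ValueError):
    #   db.get(key="k")                             # type_code missing
    #   db.get(key="k", meta=meta)                  # mutually exclusive
    #   db.get(key="k", type_code=CODES.form_xml)   # form XML needs meta

Requiring `meta` for form XML ensures the `compressed` flag travels with every read, so callers can never receive raw gzip bytes by accident.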
1 change: 1 addition & 0 deletions corehq/blobs/metadata.py
@@ -31,6 +31,7 @@ def new(self, **blob_meta_args):
                 "keyword arguments are incompatible with `meta` argument")
             return blob_meta_args["meta"]
         timeout = blob_meta_args.pop("timeout", None)
+        blob_meta_args['compressed'] = blob_meta_args.get('type_code') == CODES.form_xml
         meta = BlobMeta(**blob_meta_args)
         if not meta.domain:
             raise TypeError("domain is required")
2 changes: 1 addition & 1 deletion corehq/blobs/migrate.py
@@ -260,7 +260,7 @@ def migrate(self, doc):
         meta = doc["_obj_not_json"]
         self.total_blobs += 1
         try:
-            content = self.db.old_db.get(key=meta.key)
+            content = meta.open(db=self.db.old_db)
         except NotFound:
             if not self.db.new_db.exists(key=meta.key):
                 self.save_backup(doc)
20 changes: 20 additions & 0 deletions corehq/blobs/migrations/0011_blobmeta_compressed.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+from django.db import migrations, models
+
+from corehq.sql_db.migrations import partitioned
+
+
+@partitioned
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('blobs', '0010_auto_20191023_0938'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='blobmeta',
+            name='compressed',
+            field=models.NullBooleanField(),
+        ),
+    ]
3 changes: 2 additions & 1 deletion corehq/blobs/mixin.py
@@ -188,7 +188,8 @@ def fetch_attachment(self, name, stream=False):
                     return super(BlobMixin, self) \
                         .fetch_attachment(name, stream=stream)
                 raise NotFound(name)
-            blob = db.get(key=key)
+            meta = db.metadb.get(parent_id=self._id, key=key)
+            blob = meta.open()
         except NotFound:
             raise ResourceNotFound(
                 "{model} {model_id} attachment: {name!r}".format(
13 changes: 10 additions & 3 deletions corehq/blobs/models.py
@@ -3,6 +3,7 @@

 from django.db.models import (
     BigIntegerField,
+    NullBooleanField,
     CharField,
     DateTimeField,
     IntegerField,
@@ -59,6 +60,7 @@ class BlobMeta(PartitionedModel, Model):
     properties = NullJsonField(default=dict)
     created_on = DateTimeField(default=datetime.utcnow)
     expires_on = DateTimeField(default=None, null=True)
+    compressed = NullBooleanField()

     class Meta:
         unique_together = [
@@ -91,13 +93,18 @@ def is_image(self):
         """Use content type to check if blob is an image"""
         return (self.content_type or "").startswith("image/")

-    def open(self):
+    def open(self, db=None):
         """Get a file-like object containing blob content

         The returned object should be closed when it is no longer needed.
         """
-        from . import get_blob_db
-        return get_blob_db().get(key=self.key)
+        from . import get_blob_db, CODES
+        if self.type_code == CODES.form_xml:
+            kwargs = {'meta': self}
+        else:
+            kwargs = {'key': self.key, 'type_code': self.type_code}
+        db = db or get_blob_db()
+        return db.get(**kwargs)

     def blob_exists(self):
         from . import get_blob_db
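
With this change `BlobMeta.open()` becomes the preferred entry point for reading any blob, and the new `db` argument lets callers such as the blob migration above read from a specific backend. A brief sketch, where `meta` is an existing BlobMeta row and `old_db` is a placeholder backend:

    # Default backend:
    with meta.open() as fh:
        content = fh.read()

    # Explicit backend, e.g. reading from the source DB during a
    # blob migration (old_db is a placeholder for that backend):
    with meta.open(db=old_db) as fh:
        content = fh.read()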
20 changes: 15 additions & 5 deletions corehq/blobs/s3db.py
@@ -2,10 +2,11 @@
 import weakref
 from contextlib import contextmanager
 from io import RawIOBase, UnsupportedOperation
+from gzip import GzipFile

 from corehq.blobs.exceptions import NotFound
 from corehq.blobs.interface import AbstractBlobDB
-from corehq.blobs.util import check_safe_key
+from corehq.blobs.util import check_safe_key, GzipCompressReadStream
 from corehq.util.metrics import metrics_counter, metrics_histogram_timer
 from dimagi.utils.logging import notify_exception
@@ -73,17 +74,23 @@ def put(self, content, **blob_meta_args):
             s3_bucket.copy(source, meta.key)
         else:
             content.seek(0)
-            meta.content_length = get_file_size(content)
-            self.metadb.put(meta)
+            if meta.compressed:
+                content = GzipCompressReadStream(content)
             with self.report_timing('put', meta.key):
                 s3_bucket.upload_fileobj(content, meta.key)
+            meta.content_length = get_file_size(content)
+            self.metadb.put(meta)
         return meta

-    def get(self, key):
+    def get(self, key=None, type_code=None, meta=None):
+        key = self._validate_get_args(key, type_code, meta)
         check_safe_key(key)
         with maybe_not_found(throw=NotFound(key)), self.report_timing('get', key):
             resp = self._s3_bucket().Object(key).get()
-            return BlobStream(resp["Body"], self, key)
+            blobstream = BlobStream(resp["Body"], self, key)
+            if meta and meta.compressed:
+                return GzipFile(blobstream)
+            return blobstream

     def size(self, key):
         check_safe_key(key)
@@ -189,6 +196,9 @@ def is_not_found(err, not_found_codes=["NoSuchKey", "NoSuchBucket", "404"]):


 def get_file_size(fileobj):
+    if isinstance(fileobj, GzipCompressReadStream):
+        return fileobj.content_length
+
     # botocore.response.StreamingBody has a '_content_length' attribute
     length = getattr(fileobj, "_content_length", None)
     if length is not None:
42 changes: 42 additions & 0 deletions corehq/blobs/tests/test_util.py
@@ -1,6 +1,9 @@
+import gzip
+import tempfile
 from unittest import TestCase

 import corehq.blobs.util as mod
+from corehq.blobs.exceptions import GzipStreamAttrAccessBeforeRead


 class TestRandomUrlId(TestCase):
@@ -16,3 +19,42 @@ def test_random_id_length(self):

     def test_random_id_randomness(self):
         self.assertEqual(len(set(self.ids)), self.sample_size, self.ids)
+
+
+class TestGzipCompressReadStream(TestCase):
+
+    def _is_gzip_compressed(self, file_):
+        with gzip.open(file_, 'r') as f:
+            try:
+                f.read(1)
+                return True
+            except OSError:
+                return False
+
+    def test_compression(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b"x")
+            compress_stream = mod.GzipCompressReadStream(f)
+            with tempfile.NamedTemporaryFile() as compressed_f:
+                compressed_f.write(compress_stream.read())
+                self.assertTrue(self._is_gzip_compressed(compressed_f))
+
+    def test_content_length_access(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.seek(10)
+            f.write(b"x")
+            compress_stream = mod.GzipCompressReadStream(f)
+
+            # Try to read content_length without reading the stream
+            with self.assertRaises(GzipStreamAttrAccessBeforeRead):
+                compress_stream.content_length  # noqa
+
+            # Try to read content_length after partially reading the stream
+            content_length = len(compress_stream.read(5))
+            with self.assertRaises(GzipStreamAttrAccessBeforeRead):
+                compress_stream.content_length  # noqa
+
+            # Read content_length after completely reading the stream and check
+            # that it's correct
+            content_length += len(compress_stream.read())
+            self.assertEqual(compress_stream.content_length, content_length)