Skip to content

Commit

Permalink
Merge pull request #1770 from davidmarin/google-storage-shim
Browse files Browse the repository at this point in the history
require google-cloud-storage>=1.9.0
  • Loading branch information
David Marin committed May 8, 2018
2 parents 540e442 + 4ce3367 commit dd88a72
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 90 deletions.
5 changes: 1 addition & 4 deletions mrjob/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from mrjob.conf import combine_dicts
from mrjob.fs.composite import CompositeFilesystem
from mrjob.fs.gcs import GCSFilesystem
from mrjob.fs.gcs import _download_as_string
from mrjob.fs.gcs import is_gcs_uri
from mrjob.fs.gcs import parse_gcs_uri
from mrjob.fs.local import LocalFilesystem
Expand Down Expand Up @@ -891,9 +890,7 @@ def _get_new_driver_output_lines(self, driver_output_uri):
log_blob = self.fs._get_blob(log_uri)

try:
# TODO: use start= kwarg once google-cloud-storage 1.9 is out
#new_data = log_blob.download_as_string()[state['pos']:]
new_data = _download_as_string(log_blob, start=state['pos'])
new_data = log_blob.download_as_string(start=state['pos'])
except (google.api_core.exceptions.NotFound,
google.api_core.exceptions.RequestRangeNotSatisfiable):
# blob was just created, or no more data is available
Expand Down
72 changes: 1 addition & 71 deletions mrjob/fs/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def _cat_blob(self, gcs_uri):
while True:
end = start + _CAT_CHUNK_SIZE
try:
chunk = _download_as_string(blob, start=start, end=end)
chunk = blob.download_as_string(start=start, end=end)
except google.api_core.exceptions.RequestRangeNotSatisfiable:
return

Expand Down Expand Up @@ -293,73 +293,3 @@ def parse_gcs_uri(uri):
raise ValueError('Invalid GCS URI: %s' % uri)

return components.netloc, components.path[1:]


# temporary shim for incremental download, taken from
# https://github.com/GoogleCloudPlatform/google-cloud-python
# Remove this once google-cloud-storage>1.8.0 comes out.

# note that this raises RequestRangeNotSatisfiable if start is at the
# end of the blob
def _download_as_string(blob, client=None, start=None, end=None):
    """Download *blob* (optionally just a byte range) and return its data.

    The download is written into an in-memory buffer by
    ``_download_to_file()``; the buffer's contents are returned as bytes.

    :param blob: the blob to download
    :param client: optional client, passed through to the download helper
    :param start: optional first byte offset of the range to fetch
    :param end: optional end offset of the range to fetch

    Per the note accompanying this shim, ``RequestRangeNotSatisfiable``
    is raised if *start* is at the end of the blob.
    """
    buf = BytesIO()
    _download_to_file(blob, buf, client=client, start=start, end=end)
    return buf.getvalue()

# don't call the functions below directly; they're just to support
# _download_as_string()

def _download_to_file(blob, file_obj, client=None, start=None, end=None):
    """Download *blob* (optionally just a byte range) into *file_obj*.

    Builds the request headers (encryption headers derived from the
    blob's key, plus ``accept-encoding: gzip``), obtains the blob's
    transport, and hands off to ``_do_download()``.

    An ``InvalidResponse`` from the resumable-media layer is passed to
    ``_raise_from_invalid_response()``, which re-raises it in another
    form.
    """
    url = blob._get_download_url()

    request_headers = _get_encryption_headers(blob._encryption_key)
    request_headers['accept-encoding'] = 'gzip'

    transport = blob._get_transport(client)

    # only the download itself is guarded; setup errors propagate as-is
    try:
        _do_download(
            blob, transport, file_obj, url, request_headers, start, end)
    except google.resumable_media.InvalidResponse as invalid:
        _raise_from_invalid_response(invalid)


def _do_download(blob, transport, file_obj, download_url, headers,
                 start=None, end=None):
    """Fetch *download_url* into *file_obj* using *transport*.

    If the blob has a ``chunk_size``, the data is pulled with a
    ``ChunkedDownload``, one chunk at a time until it reports being
    finished. Otherwise a single ``Download`` request is consumed.

    *start* and *end* are forwarded to the download object; for the
    chunked case a missing (or zero) *start* is passed as ``0``.
    """
    chunk_size = blob.chunk_size

    if chunk_size is not None:
        chunked = google.resumable_media.requests.ChunkedDownload(
            download_url, chunk_size, file_obj, headers=headers,
            start=start or 0, end=end)

        while not chunked.finished:
            chunked.consume_next_chunk(transport)
        return

    single = google.resumable_media.requests.Download(
        download_url, stream=file_obj, headers=headers,
        start=start, end=end)
    single.consume(transport)


def _get_encryption_headers(key, source=False):
if key is None:
return {}

key = google.cloud._helpers._to_bytes(key)
key_hash = hashlib.sha256(key).digest()
key_hash = base64.b64encode(key_hash)
key = base64.b64encode(key)

if source:
prefix = 'X-Goog-Copy-Source-Encryption-'
else:
prefix = 'X-Goog-Encryption-'

return {
prefix + 'Algorithm': 'AES256',
prefix + 'Key': google.cloud.helpers._bytes_to_unicode(key),
prefix + 'Key-Sha256': (
google.cloud.helpers._bytes_to_unicode(key_hash)),
}


def _raise_from_invalid_response(error):
    """Raise the exception built from *error*'s HTTP response.

    *error* must have a ``response`` attribute; it is converted with
    ``google.cloud.exceptions.from_http_response()`` and the result is
    raised. This function never returns normally.
    """
    http_response = error.response
    raise google.cloud.exceptions.from_http_response(http_response)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'PyYAML>=3.08',
'google-cloud>=0.32.0',
'google-cloud-dataproc',
'google-cloud-storage>=1.9.0',
'grpcio>=1.9.1',
],
'provides': ['mrjob'],
Expand Down
6 changes: 0 additions & 6 deletions tests/mock_google/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from .dataproc import MockGoogleDataprocJobClient
from .logging import MockGoogleLoggingClient
from .storage import MockGoogleStorageClient
from .storage import _mock_download_as_string_shim
from tests.mr_two_step_job import MRTwoStepJob
from tests.py2 import Mock
from tests.py2 import patch
Expand Down Expand Up @@ -82,11 +81,6 @@ def setUp(self):
self.start(patch('google.cloud.storage.client.Client',
self.storage_client))

self.start(patch('mrjob.dataproc._download_as_string',
_mock_download_as_string_shim))
self.start(patch('mrjob.fs.gcs._download_as_string',
_mock_download_as_string_shim))

self.start(patch('time.sleep'))

def auth_default(self, scopes=None):
Expand Down
9 changes: 0 additions & 9 deletions tests/mock_google/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ def delete(self):

del self._fs[self.bucket.name]['blobs'][self.name]

# this mocks a future version of this method which is
# currently only available in dev. Our code accesses the start and end
# keywords through mrjob.fs.gcs._download_as_string(). See
# _mock_download_as_string_shim() below.
def download_as_string(self, client=None, start=None, end=None):
try:
data = self._fs[self.bucket.name]['blobs'][self.name]['data']
Expand Down Expand Up @@ -213,8 +209,3 @@ def _set_md5_hash(self):
pass

self.md5_hash = b64encode(md5(data).digest())


# mock mrjob.fs.gcs._download_as_string(), which is a shim
def _mock_download_as_string_shim(blob, client=None, start=None, end=None):
    """Stand-in for the ``mrjob.fs.gcs._download_as_string()`` shim.

    Simply delegates to *blob*'s own ``download_as_string()`` method,
    forwarding *client*, *start* and *end* as keyword arguments.
    """
    download_kwargs = dict(client=client, start=start, end=end)
    return blob.download_as_string(**download_kwargs)

0 comments on commit dd88a72

Please sign in to comment.