Skip to content

Commit

Permalink
Merge pull request #1731 from davidmarin/google-sdk-port-gcs
Browse files Browse the repository at this point in the history
Port GCS code from google-api-python-client to google-cloud-sdk (see #1730)
  • Loading branch information
David Marin committed Feb 16, 2018
2 parents 28c4982 + 6e6c842 commit 78824c8
Show file tree
Hide file tree
Showing 12 changed files with 488 additions and 556 deletions.
11 changes: 11 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
v0.6.2, 2018-03-?? -- farewell google-api-python-client
* runners:
* dataproc:
* replaced google-api-python-client with google-cloud-sdk
* GCSFilesystem method changes:
* api_client attr has been replaced with client
* create_bucket() no longer takes a project ID
* delete_bucket() is disabled (use get_bucket(...).delete())
* get_bucket() returns a google.cloud.storage.bucket.Bucket
* list_buckets() is disabled (use get_all_bucket_names())

v0.6.1, 2017-11-27 -- mrjob diagnose
* fixed serious error log parsing issue (#1708)
* added mrjob diagnose utility to find why previously run jobs failed (#1707)
Expand Down
31 changes: 17 additions & 14 deletions mrjob/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
discovery = None
google_errors = None


# NotFound falls back to None when google.api_core can't be imported, so
# this module can still be loaded without it. Only ImportError is caught:
# a bare except would also swallow KeyboardInterrupt/SystemExit and hide
# unrelated failures raised while importing.
try:
    from google.api_core.exceptions import NotFound
except ImportError:
    NotFound = None

try:
# Python 2
import ConfigParser as configparser
Expand Down Expand Up @@ -340,22 +346,20 @@ def _get_tmpdir(self, given_tmpdir):
if given_tmpdir:
return given_tmpdir

mrjob_buckets = self.fs.list_buckets(
self._gcp_project, prefix='mrjob-')

# Loop over buckets until we find one that matches region
# NOTE - because this is a tmpdir, we look for a GCS bucket in the
# same GCE region
chosen_bucket_name = None
gce_lower_location = self._gce_region.lower()
for tmp_bucket in mrjob_buckets:
tmp_bucket_name = tmp_bucket['name']

for tmp_bucket_name in self.fs.get_all_bucket_names(prefix='mrjob-'):
tmp_bucket = self.fs.get_bucket(tmp_bucket_name)

# NOTE - GCP ambiguous Behavior - Bucket location is being
# returned as UPPERCASE, ticket filed as of Apr 23, 2016 as docs
# suggest lowercase
lower_location = tmp_bucket['location'].lower()
if lower_location == gce_lower_location:
# suggest lowercase. (As of Feb. 12, 2018, this is still true,
# observed on google-cloud-sdk)

if tmp_bucket.location.lower() == self._gce_region.lower():
# Regions are both specified and match
log.info("using existing temp bucket %s" % tmp_bucket_name)
chosen_bucket_name = tmp_bucket_name
Expand All @@ -364,7 +368,7 @@ def _get_tmpdir(self, given_tmpdir):
# Example default - "mrjob-us-central1-RANDOMHEX"
if not chosen_bucket_name:
chosen_bucket_name = '-'.join(
['mrjob', gce_lower_location, random_identifier()])
['mrjob', self._gce_region.lower(), random_identifier()])

return 'gs://%s/tmp/' % chosen_bucket_name

Expand Down Expand Up @@ -459,9 +463,8 @@ def _create_fs_tmp_bucket(self, bucket_name, location=None):
try:
self.fs.get_bucket(bucket_name)
return
except google_errors.HttpError as e:
if not e.resp.status == 404:
raise
except NotFound:
pass

log.info('creating FS bucket %r' % bucket_name)

Expand All @@ -471,7 +474,7 @@ def _create_fs_tmp_bucket(self, bucket_name, location=None):
# job (tmp buckets ONLY)
# https://cloud.google.com/storage/docs/bucket-locations
self.fs.create_bucket(
self._gcp_project, bucket_name, location=location,
bucket_name, location=location,
object_ttl_days=_DEFAULT_CLOUD_TMP_DIR_OBJECT_TTL_DAYS)

self._wait_for_fs_sync()
Expand Down

0 comments on commit 78824c8

Please sign in to comment.