Skip to content

Commit

Permalink
This closes #1914
Browse files Browse the repository at this point in the history
  • Loading branch information
aaltay committed Feb 4, 2017
2 parents fbd69dc + c2d8d71 commit 6e220bb
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 10 deletions.
71 changes: 61 additions & 10 deletions sdks/python/apache_beam/io/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import logging
import multiprocessing
import os
import Queue
import re
import threading
import traceback
Expand Down Expand Up @@ -63,6 +64,10 @@
# +---------------+------------+-------------+-------------+-------------+
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024

# This is the number of seconds the library will wait for a partial-file read
# operation from GCS to complete before retrying.
DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60

# This is the size of chunks used when writing to GCS.
WRITE_CHUNK_SIZE = 8 * 1024 * 1024

Expand Down Expand Up @@ -393,18 +398,20 @@ def __init__(self,
client,
path,
mode='r',
buffer_size=DEFAULT_READ_BUFFER_SIZE):
buffer_size=DEFAULT_READ_BUFFER_SIZE,
segment_timeout=DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS):
self.client = client
self.path = path
self.bucket, self.name = parse_gcs_path(path)
self.mode = mode
self.buffer_size = buffer_size
self.segment_timeout = segment_timeout

# Get object state.
get_request = (storage.StorageObjectsGetRequest(
self.get_request = (storage.StorageObjectsGetRequest(
bucket=self.bucket, object=self.name))
try:
metadata = self._get_object_metadata(get_request)
metadata = self._get_object_metadata(self.get_request)
except HttpError as http_error:
if http_error.status_code == 404:
raise IOError(errno.ENOENT, 'Not found: %s' % self.path)
Expand All @@ -415,13 +422,13 @@ def __init__(self,
self.size = metadata.size

# Ensure read is from file of the correct generation.
get_request.generation = metadata.generation
self.get_request.generation = metadata.generation

# Initialize read buffer state.
self.download_stream = cStringIO.StringIO()
self.downloader = transfer.Download(
self.download_stream, auto_transfer=False, chunksize=buffer_size)
self.client.objects.Get(get_request, download=self.downloader)
self.download_stream, auto_transfer=False, chunksize=self.buffer_size)
self.client.objects.Get(self.get_request, download=self.downloader)
self.position = 0
self.buffer = ''
self.buffer_start_position = 0
Expand Down Expand Up @@ -539,7 +546,47 @@ def _fetch_next_if_buffer_exhausted(self):
self.buffer_start_position + len(self.buffer) <= self.position):
bytes_to_request = min(self._remaining(), self.buffer_size)
self.buffer_start_position = self.position
self.buffer = self._get_segment(self.position, bytes_to_request)
retry_count = 0
while retry_count <= 10:
queue = Queue.Queue()
t = threading.Thread(target=self._fetch_to_queue,
args=(queue, self._get_segment,
(self.position, bytes_to_request)))
t.daemon = True
t.start()
try:
result, exn, tb = queue.get(timeout=self.segment_timeout)
except Queue.Empty:
logging.warning(
('Timed out fetching %d bytes from position %d of %s after %f '
'seconds; retrying...'), bytes_to_request, self.position,
self.path, self.segment_timeout)
retry_count += 1
# Reinitialize download objects.
self.download_stream = cStringIO.StringIO()
self.downloader = transfer.Download(
self.download_stream, auto_transfer=False,
chunksize=self.buffer_size)
self.client.objects.Get(self.get_request, download=self.downloader)
continue
if exn:
logging.error(
('Exception while fetching %d bytes from position %d of %s: '
'%s\n%s'),
bytes_to_request, self.position, self.path, exn, tb)
raise exn
self.buffer = result
return
raise GcsIOError(
'Reached retry limit for _fetch_next_if_buffer_exhausted.')

def _fetch_to_queue(self, queue, func, args):
  """Run ``func(*args)`` and publish the outcome on ``queue``.

  A 3-tuple is put on the queue: ``(value, None, None)`` when the call
  returns normally, or ``(None, exception, formatted_traceback)`` when the
  call raises.

  Args:
    queue: object whose ``put`` method receives the outcome tuple.
    func: callable to invoke.
    args: sequence of positional arguments unpacked into ``func``.
  """
  try:
    outcome = (func(*args), None, None)
  except Exception as err:  # pylint: disable=broad-except
    # Capture the failure so the consumer of the queue can inspect it; the
    # traceback is formatted here, while the exception is still being handled.
    outcome = (None, err, traceback.format_exc())
  queue.put(outcome)

def _remaining(self):
  """Return the difference between ``self.size`` and ``self.position``."""
  total, offset = self.size, self.position
  return total - offset
Expand All @@ -555,11 +602,15 @@ def _get_segment(self, start, size):
"""Get the given segment of the current GCS file."""
if size == 0:
return ''
# The objects self.downloader and self.download_stream may be recreated if
# this call times out; we save them locally to avoid any threading issues.
downloader = self.downloader
download_stream = self.download_stream
end = start + size - 1
self.downloader.GetRange(start, end)
value = self.download_stream.getvalue()
downloader.GetRange(start, end)
value = download_stream.getvalue()
# Clear the cStringIO object after we've read its contents.
self.download_stream.truncate(0)
download_stream.truncate(0)
assert len(value) == size
return value

Expand Down
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/io/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import os
import random
import threading
import time
import unittest

import httplib2
Expand Down Expand Up @@ -427,6 +428,43 @@ def test_full_file_read(self):
f.seek(0)
self.assertEqual(f.read(), random_file.contents)

def test_flaky_file_read(self):
  # Exercises three behaviours of the segment fetch path: recovery from
  # fetches that hang, propagation of fetch errors, and the retry limit.
  path = 'gs://gcsio-test/flaky_file'
  total_bytes = 5 * 1024 * 1024 + 100
  random_file = self._insert_random_file(self.client, path, total_bytes)
  gcs_file = self.gcs.open(path)
  # Seed the generator so the pattern of simulated hangs is reproducible.
  random.seed(0)
  gcs_file.buffer_size = 1024 * 1024
  # Short timeout, so a fetch that sleeps is abandoned almost immediately.
  gcs_file.segment_timeout = 0.1
  self.assertEqual(gcs_file.mode, 'r')
  # Keep a handle on the genuine implementation before replacing it.
  gcs_file._real_get_segment = gcs_file._get_segment

  def sometimes_hanging_get_segment(offset, length):
    # Roughly one call in four sleeps far longer than the segment timeout.
    if random.randint(0, 3) == 1:
      time.sleep(600)
    return gcs_file._real_get_segment(offset, length)

  gcs_file._get_segment = sometimes_hanging_get_segment
  self.assertEqual(gcs_file.read(), random_file.contents)

  # Test exception handling in file read.
  def always_failing_get_segment(unused_offset, unused_length):
    raise IOError('Could not read.')

  gcs_file._get_segment = always_failing_get_segment
  gcs_file.seek(0)
  self.assertRaises(IOError, gcs_file.read)

  # Test retry limit in hanging file read.
  def always_hanging_get_segment(unused_offset, unused_length):
    time.sleep(600)

  gcs_file._get_segment = always_hanging_get_segment
  gcs_file.seek(0)
  self.assertRaises(gcsio.GcsIOError, gcs_file.read)

def test_file_random_seek(self):
file_name = 'gs://gcsio-test/seek_file'
file_size = 5 * 1024 * 1024 - 100
Expand Down

0 comments on commit 6e220bb

Please sign in to comment.