Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions awscli/customizations/s3/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
# language governing permissions and limitations under the License.
import logging
import sys
import threading
from collections import namedtuple

from s3transfer.subscribers import BaseSubscriber

from awscli.compat import queue
from awscli.customizations.s3.executor import ShutdownThreadRequest
from awscli.customizations.s3.utils import relative_path
from awscli.customizations.s3.utils import human_readable_size
from awscli.customizations.s3.utils import uni_print
Expand Down Expand Up @@ -138,7 +141,13 @@ def _get_src_dest(self, future):
return src, dest


class ResultRecorder(object):
class BaseResultHandler(object):
"""Base handler class to be called in the ResultProcessor"""
def __call__(self, result):
raise NotImplementedError('__call__()')


class ResultRecorder(BaseResultHandler):
"""Records and track transfer statistics based on results receieved"""
def __init__(self):
self.bytes_transferred = 0
Expand All @@ -161,7 +170,7 @@ def __init__(self):
WarningResult: self._record_warning_result,
}

def record_result(self, result):
def __call__(self, result):
"""Record the result of an individual Result object"""
self._result_handler_map.get(type(result), self._record_noop)(
result=result)
Expand Down Expand Up @@ -217,7 +226,7 @@ def _record_warning_result(self, **kwargs):
self.files_warned += 1


class ResultPrinter(object):
class ResultPrinter(BaseResultHandler):
PROGRESS_FORMAT = (
'Completed {bytes_completed}/{expected_bytes_completed} with '
'{remaining_files} file(s) remaining.'
Expand Down Expand Up @@ -260,7 +269,7 @@ def __init__(self, result_recorder, out_file=sys.stdout,
WarningResult: self._print_warning,
}

def print_result(self, result):
def __call__(self, result):
"""Print the progress of the ongoing transfer based on a result"""
self._result_handler_map.get(type(result), self._print_noop)(
result=result)
Expand Down Expand Up @@ -356,3 +365,44 @@ def _print_progress(self, **kwargs):

def _print_success(self, result, **kwargs):
pass


class ResultProcessor(threading.Thread):
def __init__(self, result_queue, result_handlers=None):
"""Thread to process results from result queue

This includes recording statistics and printing transfer status

:param result_queue: The result queue to process results from
:param result_handlers: A list of callables that take a result in as
a parameter to process the result for that handler.
"""
threading.Thread.__init__(self)
self._result_queue = result_queue
self._result_handlers = result_handlers
if self._result_handlers is None:
self._result_handlers = []

def run(self):
while True:
try:
result = self._result_queue.get(True)
if isinstance(result, ShutdownThreadRequest):
LOGGER.debug(
'Shutdown request received in result processing '
'thread, shutting down result thread.')
break
LOGGER.debug('Received result: %s', result)
self._process_result(result)
except queue.Empty:
pass

def _process_result(self, result):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth just having a generic ResultHandler interface and having the __init__ take a list of result handlers? You could then either write adapters for the recorder/printer or update them to have the same interface.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posssibly. I was actually considering doing that. The only reason I did not was because I did not see needing it in doing the port, but taking an arbitrary list of handlers is pretty flexible so I would be totally fine with making that the interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, don't feel too strongly about it for this specific PR, it would just clean up the code a little bit:

for handler in self._result_handlers:
    handler.handle_result(result)

vs. having to check if a particular handler was None or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm updating it. I did not like the None check either.

for result_handler in self._result_handlers:
try:
LOGGER.debug('Processing %s with %s', result, result_handler)
result_handler(result)
except Exception as e:
LOGGER.debug(
'Error processing result %s with handler %s: %s',
result, result_handler, e, exc_info=True)
Loading