Skip to content

Commit

Permalink
Cosmetic changes
Browse files Browse the repository at this point in the history
  • Loading branch information
georgeyk committed Feb 2, 2017
1 parent e529436 commit 38a05a0
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 20 deletions.
7 changes: 2 additions & 5 deletions loafer/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@


class LoaferDispatcher:

def __init__(self, routes, max_jobs=None):
self.routes = routes
jobs = max_jobs or len(routes) * 2
Expand Down Expand Up @@ -36,14 +35,12 @@ async def dispatch_message(self, message, route):
logger.warning('message will be ignored:\n{!r}\n'.format(message))
return False

# Since we don't know what will happen on message handler, use semaphore
# to protect scheduling or executing too many coroutines/threads
with await self._semaphore:
try:
await route.deliver(content)
except (DeleteMessage) as exc:
except DeleteMessage as exc:
logger.info('message acknowledged:\n{!r}\n'.format(message))
# eg, we will return True at the end
# We will return True at the end to acknowledge it
except KeepMessage as exc:
logger.info('message not acknowledged:\n{!r}\n'.format(message))
return False
Expand Down
9 changes: 4 additions & 5 deletions loafer/ext/aws/message_translators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@


class SQSMessageTranslator:

def translate(self, message):
try:
body = message['Body']
except (KeyError, TypeError):
logger.error('missing Body key in SQS message. It really came from SQS ?'
'\nmessage={}'.format(message))
'\nmessage={!r}'.format(message))
return {'content': None}

try:
return {'content': json.loads(body)}
except json.decoder.JSONDecodeError as exc:
logger.error('error={!r} message={}'.format(exc, message))
logger.error('error={!r} message={!r}'.format(exc, message))
return {'content': None}


Expand All @@ -29,11 +28,11 @@ def translate(self, message):
except (KeyError, TypeError):
logger.error(
'Missing Body or Message key in SQS message. It really came from SNS ?'
'\nmessage={}'.format(message))
'\nmessage={!r}'.format(message))
return {'content': None}

try:
return {'content': json.loads(message)}
except (json.decoder.JSONDecodeError, TypeError) as exc:
logger.error('error={!r} message={}'.format(exc, message))
logger.error('error={!r} message={!r}'.format(exc, message))
return {'content': None}
6 changes: 2 additions & 4 deletions loafer/ext/aws/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@


class SQSProvider:

def __init__(self, source, endpoint_url=None, use_ssl=True, options=None, loop=None):
self.source = source
self.endpoint_url = endpoint_url
Expand All @@ -22,8 +21,7 @@ def __init__(self, source, endpoint_url=None, use_ssl=True, options=None, loop=N
@cached_property
def client(self):
session = aiobotocore.get_session(loop=self._loop)
return session.create_client('sqs', endpoint_url=self.endpoint_url,
use_ssl=self.use_ssl)
return session.create_client('sqs', endpoint_url=self.endpoint_url, use_ssl=self.use_ssl)

async def get_queue_url(self):
response = await self.client.get_queue_url(QueueName=self.source)
Expand All @@ -33,7 +31,7 @@ async def confirm_message(self, message):
logger.info('confirm message (ACK/deletion)')

receipt = message['ReceiptHandle']
logger.debug('receipt={}'.format(receipt))
logger.debug('receipt={!r}'.format(receipt))

queue_url = await self.get_queue_url()
return await self.client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt)
Expand Down
4 changes: 1 addition & 3 deletions loafer/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@


class LoaferManager:

def __init__(self, routes, runner=None):
self.runner = runner or LoaferRunner(on_stop_callback=self.on_loop__stop)
self.routes = routes
Expand All @@ -21,8 +20,7 @@ def dispatcher(self):
return LoaferDispatcher(self.routes)

def run(self, forever=True):
self._future = asyncio.gather(self.dispatcher.dispatch_providers(),
loop=self.runner.loop)
self._future = asyncio.gather(self.dispatcher.dispatch_providers(), loop=self.runner.loop)
self._future.add_done_callback(self.on_future__errors)
self.runner.start(self._future, run_forever=forever)

Expand Down
2 changes: 1 addition & 1 deletion loafer/message_translators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
class StringMessageTranslator:

def translate(self, message):
logger.debug('{!r} will translate {}'.format(type(self).__name__, message))
logger.debug('{!r} will translate {!r}'.format(type(self).__name__, message))
return {'content': str(message)}
1 change: 0 additions & 1 deletion loafer/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@


class Route:

def __init__(self, provider, handler, name='default',
message_translator=None, error_handler=None):
self.name = name
Expand Down
1 change: 0 additions & 1 deletion loafer/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


class LoaferRunner:

def __init__(self, loop=None, max_workers=None, on_stop_callback=None):
self._on_stop_callback = on_stop_callback
self.loop = loop or asyncio.get_event_loop()
Expand Down

0 comments on commit 38a05a0

Please sign in to comment.