Skip to content
Permalink
 
 
Cannot retrieve contributors at this time
import os
import boto3
import hashlib
from .helpers import safe_dumps
MESSAGE_TABLE_NAME = os.environ['DYNAMO_TABLE_NAME']
# DynamoDB stuff
def get_dynamodb(): # pragma: no cover
"""Return a dynamodb resource"""
return boto3.resource('dynamodb')
def get_table(table_name=MESSAGE_TABLE_NAME): # pragma: no cover
"""Create a dynamodb resource"""
dynamodb = get_dynamodb()
return dynamodb.Table(table_name)
def set_connection_id(connection_id, channel='general'):
"""Save an individual connection to a given channel"""
table = get_table()
conn_key = _get_channel_connections_key(channel)
coll_name = _get_connection_column_name(connection_id)
update_expr = 'SET {} = :value'.format(coll_name)
table.update_item(
Key=conn_key,
UpdateExpression=update_expr,
ExpressionAttributeValues={
':value': {
'connection_id': connection_id,
'channel': channel,
}
},
ReturnValues='UPDATED_NEW',
)
# Also update the list of channels
table.update_item(
Key=_get_channels_list_key(),
UpdateExpression='ADD channels :value',
ExpressionAttributeValues={':value': set([channel])},
)
def delete_connection_id(connection_id, channel='general'):
"""Delete an item from DynamoDB which represents a client being connected"""
table = get_table()
conn_key = _get_channel_connections_key(channel)
coll_name = _get_connection_column_name(connection_id)
update_expr = 'REMOVE {}'.format(coll_name)
table.update_item(
Key=conn_key,
UpdateExpression=update_expr,
)
def get_channels_list():
table = get_table()
row = table.get_item(Key=_get_channels_list_key())
return row.get('Item')['channels']
def get_user(connection_id):
table = get_table()
key = _get_user_key(connection_id)
row = table.get_item(Key=key)
return row.get('Item', key)
def update_channel_name(connection_id, name):
item = get_user(connection_id)
old_channel = item.get('channel_name', 'general')
delete_connection_id(connection_id, old_channel)
item['channel_name'] = name
table = get_table()
table.put_item(Item=item)
set_connection_id(connection_id, name)
def save_username(connection_id, name):
update_expr = 'SET username = :value'
table = get_table()
table.update_item(
Key=_get_user_key(connection_id),
UpdateExpression=update_expr,
ExpressionAttributeValues={':value': name},
)
def save_message(connection_id, epoch, message, channel='general'):
"""Save a message from a user"""
item = {
'pk': channel,
'epoch': epoch,
'connectionId': connection_id,
'channel': channel,
'message': message,
}
table = get_table()
table.put_item(Item=item)
def get_connected_connection_ids(channel='general'):
table = get_table()
key = _get_channel_connections_key(channel)
row = table.get_item(Key=key)
for key, payload in row.get('Item', {}).items():
if key.startswith('CONN'):
yield payload['connection_id']
def _get_channel_conn_pk(channel):
return '%s:::connections' % (channel, )
def _get_channel_message_pk(channel, connection_id):
return '%s:::%s' % (channel, connection_id)
def _get_channel_connections_key(channel):
pk = _get_channel_conn_pk(channel)
return {'pk': pk, 'epoch': 0}
def _get_channels_list_key():
return {'pk': 'channels', 'epoch': 0}
def _get_user_key(connection_id):
return {'pk': connection_id, 'epoch': 0}
def _get_connection_column_name(connection_id):
return 'CONN' + hashlib.md5(connection_id.encode()).hexdigest()[:16]
# Lambda stuff
def invoke_lambda_async(function_name, payload):
"""Invoke a Lambda function with an Event invocation type"""
_lambda = boto3.client('lambda')
return _lambda.invoke(
FunctionName=function_name,
Payload=safe_dumps(payload),
InvocationType='Event',
)