# Working with Streams

DynamoDB Streams captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours. Applications can access this log and view the data items as they appeared before and after they were modified, in near-real time.

A common pattern is to have DynamoDB Streams trigger an AWS Lambda function, which then performs further processing on each change.

Here, we will use DynamoDB Streams to build a game leaderboard that ranks players by score. We will create two tables, one for each user's scoring history and one for each user's total score. There are two known query patterns:

- Show the user's scoring history up to 100 latest items
- Show the top 3 players who have the highest scores

We will use DynamoDB Streams and Lambda to update each user's total score in near-real time.

The two tables are described here.

| Table/Index | Key/Attribute | Description | Sample |
| -- | -- | -- | -- |
| UserScoreEvents (Table) | UserId (PK) | User ID | harris, donald |
| | Timestamp (SK) | Time at which the score was earned | 2020-09-16 00:59:07.420452 |
| | Score (Attr) | Score the user gets | Any integer value between 0 and 9 |

| Table/Index | Key/Attribute | Description | Sample |
| -- | -- | -- | -- |
| Leaderboard (Table) | UserId (PK) | User ID | harris, donald |
| | Score (Attr) | Total score the user has gained | 58, 92 |
| | ShardNo (Attr) | For write sharding | Any integer value between 0 and 9 |
| IndexShardNoScore (Index) | ShardNo (PK) | | |
| | Score (SK) | | |

> For write sharding, please see [this blog](https://aws.amazon.com/blogs/database/choosing-the-right-number-of-shards-for-your-large-scale-amazon-dynamodb-table/). This will be covered in the DynamoDB Modeling session.
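
As a rough illustration of how a `ShardNo` in the 0 to 9 range could be derived from a `UserId`, here is a minimal sketch. It assumes a shard number that stays the same across processes is wanted, which Python's built-in `hash()` does not guarantee for strings because it is salted per process by default. The helper name `stable_shard_no` and the choice of SHA-256 are illustrative assumptions, not part of the original design.

```python
import hashlib

NUM_SHARDS = 10  # assumption: matches the 0-9 ShardNo range in the table above


def stable_shard_no(user_id, num_shards=NUM_SHARDS):
    """Map a user ID to a shard number that is identical in every process."""
    digest = hashlib.sha256(user_id.encode('utf-8')).digest()
    # Interpret the first 8 bytes as an unsigned integer, then fold it into range.
    return int.from_bytes(digest[:8], 'big') % num_shards
```

Any uniformly distributed, deterministic hash would serve the same purpose; the point is only that the same user always lands in the same shard.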

First, let's create the two tables and enable DynamoDB Streams on the source table `UserScoreEvents`.

In [1]:
# import and get dynamodb resource
import boto3
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError
from pprint import pprint, pformat
from decimal import Decimal
import time
import multiprocessing as mp
import csv
from datetime import datetime
import random
from collections import defaultdict
import heapq


dynamodb = boto3.resource('dynamodb')

In [2]:
# create UserScoreEvents table with DynamoDB Streams
user_score_events = dynamodb.create_table(
    TableName='UserScoreEvents',
    AttributeDefinitions=[
        {'AttributeName': 'UserId', 'AttributeType': 'S'},
        {'AttributeName': 'Timestamp', 'AttributeType': 'S'}
    ],
    KeySchema=[
        {'AttributeName': 'UserId', 'KeyType': 'HASH'},
        {'AttributeName': 'Timestamp', 'KeyType': 'RANGE'}
    ],
    BillingMode='PAY_PER_REQUEST',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_IMAGE'
    }
)

user_score_events.wait_until_exists()
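
To confirm that the stream was actually enabled, the table's stream settings can be read back. The following is a small sketch; `stream_summary` is a hypothetical helper name, while `load()`, `stream_specification`, and `latest_stream_arn` are members of the boto3 `Table` resource.

```python
def stream_summary(table):
    """Return the stream settings of a boto3 DynamoDB Table resource."""
    table.load()  # refresh the cached table description
    spec = table.stream_specification or {}
    return {
        'enabled': spec.get('StreamEnabled', False),
        'view_type': spec.get('StreamViewType'),
        'stream_arn': table.latest_stream_arn,
    }
```

Calling `stream_summary(user_score_events)` should report the `NEW_IMAGE` view type chosen above, along with the stream ARN that the trigger will later need.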

In [3]:
# create the Leaderboard table, which the stream-triggered Lambda function will update
leaderboard = dynamodb.create_table(
    TableName='Leaderboard',
    AttributeDefinitions=[
        {'AttributeName': 'UserId', 'AttributeType': 'S'},
        {'AttributeName': 'Score', 'AttributeType': 'N'},
        {'AttributeName': 'ShardNo', 'AttributeType': 'N'},
    ],
    KeySchema=[
        {'AttributeName': 'UserId', 'KeyType': 'HASH'}
    ],
    GlobalSecondaryIndexes=[
        {
            'IndexName': 'IndexShardNoScore',
            'KeySchema': [
                {'AttributeName': 'ShardNo', 'KeyType': 'HASH'},
                {'AttributeName': 'Score', 'KeyType': 'RANGE'}
            ],
            'Projection': {
                'ProjectionType': 'ALL'
            }
        }
    ],
    BillingMode='PAY_PER_REQUEST'
)

leaderboard.wait_until_exists()

After that, we should create a Lambda function that is triggered by new items in the source table and updates the user's total score in the target table. This takes two steps.

- Create IAM role for Lambda
    - Role name: DynamoDBStreamReplicationExample
    - Included policies: AmazonDynamoDBFullAccess, CloudWatchLogsFullAccess
- Create Lambda function
    - Lambda function name: UpdateUserTotalScore
    - Runtime: Python 3.8
    - Role: DynamoDBStreamReplicationExample
   
After creating the Lambda function, we should create a trigger on the source table that invokes the Lambda function whenever new items are added. Go to the DynamoDB service console and choose the `UserScoreEvents` table. In the `Triggers` tab, you can create a trigger and select the Lambda function.

By itself, a Lambda function runs only when something invokes it. DynamoDB Streams records every modification to the base table but does not call Lambda on its own. The trigger connects the two: it invokes the given Lambda function whenever the stream receives new records.
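
The console trigger corresponds to an event source mapping in the Lambda API, so the same connection can be made from code. The sketch below assumes a boto3 Lambda client is passed in; `connect_stream_to_lambda` is a hypothetical helper name, and the batch size of 100 is an illustrative choice.

```python
def connect_stream_to_lambda(lambda_client, stream_arn, function_name, batch_size=100):
    """Create an event source mapping between a DynamoDB stream and a function."""
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition='LATEST',  # deliver only records written after the mapping exists
        BatchSize=batch_size,       # upper bound on records per invocation
    )
    return response['UUID']


# Possible usage in this notebook (requires AWS credentials):
# lambda_client = boto3.client('lambda')
# connect_stream_to_lambda(lambda_client,
#                          user_score_events.latest_stream_arn,
#                          'UpdateUserTotalScore')
```

`StartingPosition='TRIM_HORIZON'` would instead start from the oldest record still retained in the stream.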

To see how event information is passed to the Lambda function, set the function code as follows.

```python
import boto3


def lambda_handler(event, context):
    print(event)
```

After that, put one item in the `UserScoreEvents` table with the following command.

In [4]:
user_score_events.put_item(
    Item={
        'UserId': 'dongkyun',
        'Timestamp': str(datetime.now()),
        'Score': 1
    }
)

{'ResponseMetadata': {'RequestId': '9EDLMDSNVIKD4VBSV03E1I3SQ3VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Tue, 06 Oct 2020 16:21:23 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '9EDLMDSNVIKD4VBSV03E1I3SQ3VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

Next, check the Lambda execution log in CloudWatch. Here is the event that the Lambda function receives.

```json
{
  "Records": [
    {
      "eventID": "1caffc48a12f1d1fbfa1ac33f027db95",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-2",
      "dynamodb": {
        "ApproximateCreationDateTime": 1600238837.0,
        "Keys": {
          "UserId": {
            "S": "dongkyun"
          },
          "Timestamp": {
            "S": "2020-09-16 06:47:17.079179"
          }
        },
        "NewImage": {
          "Score": {
            "N": "1"
          },
          "UserId": {
            "S": "dongkyun"
          },
          "Timestamp": {
            "S": "2020-09-16 06:47:17.079179"
          }
        },
        "SequenceNumber": "300000000014403348924",
        "SizeBytes": 105,
        "StreamViewType": "NEW_IMAGE"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-2:886100642687:table/UserScoreEvents/stream/2020-09-16T06:24:46.998"
    }
  ]
}
```
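
Attribute values in the event are wrapped in type tags such as `{"S": ...}` and `{"N": ...}`, and numbers arrive as strings. The following sketch unwraps the most common tags in plain Python; `unwrap` and `new_images` are hypothetical helper names, and only a subset of DynamoDB types is handled. boto3 ships `boto3.dynamodb.types.TypeDeserializer` for the full set.

```python
from decimal import Decimal


def unwrap(attr):
    """Convert one DynamoDB-typed value, e.g. {'N': '1'}, to a Python value."""
    type_tag, value = next(iter(attr.items()))
    if type_tag == 'S':
        return value
    if type_tag == 'N':
        return Decimal(value)  # numbers are transmitted as strings
    if type_tag == 'BOOL':
        return value
    if type_tag == 'NULL':
        return None
    if type_tag == 'L':
        return [unwrap(v) for v in value]
    if type_tag == 'M':
        return {k: unwrap(v) for k, v in value.items()}
    raise ValueError('unsupported type tag: {}'.format(type_tag))


def new_images(event):
    """Yield the NewImage of every record that has one, as a plain dict."""
    for record in event.get('Records', []):
        image = record['dynamodb'].get('NewImage')
        if image is not None:
            yield {name: unwrap(attr) for name, attr in image.items()}
```

Applied to the event above, `new_images` would yield a single dict with `UserId`, `Timestamp`, and a `Decimal` `Score`.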

Now it's clear what we should do. From the event, we parse the `UserId` and `Score` values and add the score to that user's total in the `Leaderboard` table. The Lambda function code will look like this.

```python
import boto3


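def lambda_handler(event, context):
    leaderboard = boto3.resource('dynamodb').Table('Leaderboard')

    for record in event['Records']:
        # REMOVE records carry no NewImage under the NEW_IMAGE view type, so they are skipped
        if 'NewImage' in record['dynamodb']:
            item = record['dynamodb']['NewImage']
            user_id = item['UserId']['S']
            # str hashes are salted per Python process by default, so the shard can differ
            # between execution environments; ShardNo is overwritten on every update
            shard_no = hash(user_id) % 10
            # stream records encode numbers as strings, e.g. {"N": "1"}
            score = int(item['Score']['N'])

            # ADD creates the Score attribute on first use and increments it afterwards
            leaderboard.update_item(
                Key={'UserId': user_id},
                UpdateExpression='ADD Score :score SET ShardNo = :shard_no',
                ExpressionAttributeValues={
                    ':score': score,
                    ':shard_no': shard_no
                }
            )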
```
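
A single invocation can receive several records at once, so one possible refinement is to sum the scores per user first and then issue one `update_item` call per user rather than one per record. The sketch below shows only the aggregation step. `aggregate_scores` is a hypothetical helper, and restricting it to `INSERT` events is an assumption that differs slightly from the handler above, which processes any record containing a `NewImage`.

```python
from collections import defaultdict


def aggregate_scores(event):
    """Sum the scores of newly inserted items per user within one batch."""
    totals = defaultdict(int)
    for record in event.get('Records', []):
        if record.get('eventName') != 'INSERT':
            continue  # assumption: MODIFY and REMOVE records do not add score
        image = record['dynamodb'].get('NewImage')
        if image is None:
            continue
        totals[image['UserId']['S']] += int(image['Score']['N'])
    return dict(totals)
```

The handler would then loop over the returned dict and call `update_item` once per user with the summed score.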

We're all done! Test with some sample inputs to see if it works well.

In [12]:
user_score_events.put_item(
    Item={
        'UserId': 'reed',
        'Timestamp': str(datetime.now()),
        'Score': 2
    }
)

user_score_events.put_item(
    Item={
        'UserId': 'reed',
        'Timestamp': str(datetime.now()),
        'Score': 5
    }
)

user_score_events.put_item(
    Item={
        'UserId': 'wesley',
        'Timestamp': str(datetime.now()),
        'Score': 8
    }
)

{'ResponseMetadata': {'RequestId': 'R6QG9J7D14DALG79B7VG9FO9BJVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Tue, 06 Oct 2020 16:24:48 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'R6QG9J7D14DALG79B7VG9FO9BJVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

In [13]:
user_score_events.scan()

{'Items': [{'Score': Decimal('8'),
   'UserId': 'wesley',
   'Timestamp': '2020-10-06 16:24:48.573743'},
  {'Score': Decimal('2'),
   'UserId': 'reed',
   'Timestamp': '2020-10-06 16:24:48.557943'},
  {'Score': Decimal('5'),
   'UserId': 'reed',
   'Timestamp': '2020-10-06 16:24:48.566050'}],
 'Count': 3,
 'ScannedCount': 3,
 'ResponseMetadata': {'RequestId': '821HACE3GQ5EEPC5NR853G7IHRVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Tue, 06 Oct 2020 16:24:50 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '307',
   'connection': 'keep-alive',
   'x-amzn-requestid': '821HACE3GQ5EEPC5NR853G7IHRVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2257209555'},
  'RetryAttempts': 0}}

In [14]:
leaderboard.scan()

{'Items': [{'Score': Decimal('8'),
   'ShardNo': Decimal('4'),
   'UserId': 'wesley'},
  {'Score': Decimal('7'), 'ShardNo': Decimal('6'), 'UserId': 'reed'}],
 'Count': 2,
 'ScannedCount': 2,
 'ResponseMetadata': {'RequestId': 'FI5PHBHC5V5VI2S8POQQBK3Q8FVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Tue, 06 Oct 2020 16:24:51 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '164',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'FI5PHBHC5V5VI2S8POQQBK3Q8FVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '3076067450'},
  'RetryAttempts': 0}}

It works as expected with the sample data. Let's clean up the tables and simulate a more realistic workload.

In [16]:
# delete the sample items; each Timestamp must match the value returned by the scan above
user_score_events.delete_item(
    Key={'UserId': 'wesley', 'Timestamp': '2020-10-06 16:24:48.573743'}
)
user_score_events.delete_item(
    Key={'UserId': 'reed', 'Timestamp': '2020-10-06 16:24:48.557943'}
)
user_score_events.delete_item(
    Key={'UserId': 'reed', 'Timestamp': '2020-10-06 16:24:48.566050'}
)
leaderboard.delete_item(
    Key={'UserId': 'reed'}
)
leaderboard.delete_item(
    Key={'UserId': 'wesley'}
)

{'ResponseMetadata': {'RequestId': 'H8PS7KM9ND499B0VF08UPHMDMJVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Tue, 06 Oct 2020 16:25:55 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'H8PS7KM9ND499B0VF08UPHMDMJVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}
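
Deleting items one by one requires knowing every key in advance. As an alternative, here is a sketch of a generic cleanup that scans a table for its keys and deletes them in batches. `delete_all_items` is a hypothetical helper; `key_schema`, `scan`, and `batch_writer` are members of the boto3 `Table` resource.

```python
def delete_all_items(table):
    """Delete every item in a boto3 Table by scanning its keys page by page."""
    key_names = [key['AttributeName'] for key in table.key_schema]
    # Fetch only the key attributes; aliases avoid reserved words such as Timestamp.
    names = {'#k{}'.format(i): name for i, name in enumerate(key_names)}
    scan_kwargs = {
        'ProjectionExpression': ', '.join(names),
        'ExpressionAttributeNames': names,
    }
    deleted = 0
    with table.batch_writer() as batch:
        while True:
            page = table.scan(**scan_kwargs)
            for item in page['Items']:
                batch.delete_item(Key={name: item[name] for name in key_names})
                deleted += 1
            if 'LastEvaluatedKey' not in page:
                break  # no more pages
            scan_kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']
    return deleted
```

Calling `delete_all_items(user_score_events)` and `delete_all_items(leaderboard)` would empty both tables. Deletions on the source table also produce stream records, but these contain no `NewImage` under the `NEW_IMAGE` view type, so the Lambda function above ignores them.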

In [17]:
# prepare functions to simulate a game user
def simulate_user(user_id, duration=10, max_score=10):
    finish_time = time.time() + duration
    items = []

    while time.time() <= finish_time:
        timestamp = str(datetime.now())
        score = random.randrange(max_score)

        item = {'UserId': user_id, 'Timestamp': timestamp, 'Score': score}
        items.append(item)

        print('{}: User [{:^10}] gets {} score'.format(timestamp, user_id, score))

        user_score_events.put_item(Item=item)
        time.sleep(random.uniform(0, 5))

    return {user_id: items}

user_scores = {}

def gather_user_scores(result):
    global user_scores

    user_scores.update(result)

In [19]:
# put scores for user dongkyun for about 10 seconds, at random intervals
simulate_user('dongkyun', 10, 10)

2020-10-06 16:27:08.959560: User [ dongkyun ] gets 6 score
2020-10-06 16:27:13.851667: User [ dongkyun ] gets 2 score
2020-10-06 16:27:14.108918: User [ dongkyun ] gets 8 score
2020-10-06 16:27:14.729089: User [ dongkyun ] gets 8 score
2020-10-06 16:27:17.352352: User [ dongkyun ] gets 7 score


{'dongkyun': [{'UserId': 'dongkyun',
   'Timestamp': '2020-10-06 16:27:08.959560',
   'Score': 6},
  {'UserId': 'dongkyun',
   'Timestamp': '2020-10-06 16:27:13.851667',
   'Score': 2},
  {'UserId': 'dongkyun',
   'Timestamp': '2020-10-06 16:27:14.108918',
   'Score': 8},
  {'UserId': 'dongkyun',
   'Timestamp': '2020-10-06 16:27:14.729089',
   'Score': 8},
  {'UserId': 'dongkyun',
   'Timestamp': '2020-10-06 16:27:17.352352',
   'Score': 7}]}

In [20]:
# with 10 game users, let's check that the architecture we built works
user_ids = ['marah', 'teegan', 'quynn', 'kuame', 'reed', 'alyssa', 'wesley', 'mariam', 'macaulay', 'maggy']
user_scores = {}

test_duration_in_seconds = 60

with mp.Pool(processes=len(user_ids)) as pool:
    for user_id in user_ids:
        pool.apply_async(simulate_user, (user_id, test_duration_in_seconds), callback=gather_user_scores)
    
    pool.close()
    pool.join()

2020-10-06 16:29:04.682309: User [  marah   ] gets 4 score
2020-10-06 16:29:04.682641: User [  quynn   ] gets 7 score
2020-10-06 16:29:04.683098: User [  alyssa  ] gets 7 score
2020-10-06 16:29:04.682957: User [   reed   ] gets 0 score
2020-10-06 16:29:04.683232: User [  wesley  ] gets 5 score
2020-10-06 16:29:04.683370: User [  mariam  ] gets 2 score
2020-10-06 16:29:04.682793: User [  kuame   ] gets 8 score
2020-10-06 16:29:04.682461: User [  teegan  ] gets 4 score
2020-10-06 16:29:04.684618: User [ macaulay ] gets 9 score
2020-10-06 16:29:04.684688: User [  maggy   ] gets 2 score
2020-10-06 16:29:04.822050: User [  marah   ] gets 1 score
2020-10-06 16:29:05.114522: User [  quynn   ] gets 4 score
2020-10-06 16:29:05.521620: User [  mariam  ] gets 1 score
2020-10-06 16:29:05.759091: User [  maggy   ] gets 0 score
2020-10-06 16:29:06.554023: User [  maggy   ] gets 4 score
2020-10-06 16:29:07.333354: User [  quynn   ] gets 0 score
2020-10-06 16:29:07.767107: User [  teegan  ] gets 7 sco

In [21]:
# check that the data in the Leaderboard table is correct
for user_id, items in user_scores.items():
    python_score = sum([item['Score'] for item in items])
    dynamodb_score = int(leaderboard.get_item(Key={'UserId': user_id})['Item']['Score'])
    
    print('User [{:^10}] Score: Python ({}), DynamoDB ({})'.format(user_id, python_score, dynamodb_score))

User [   reed   ] Score: Python (101), DynamoDB (101)
User [  maggy   ] Score: Python (93), DynamoDB (93)
User [ macaulay ] Score: Python (99), DynamoDB (99)
User [  kuame   ] Score: Python (102), DynamoDB (102)
User [  mariam  ] Score: Python (125), DynamoDB (125)
User [  quynn   ] Score: Python (129), DynamoDB (129)
User [  teegan  ] Score: Python (104), DynamoDB (104)
User [  marah   ] Score: Python (114), DynamoDB (114)
User [  alyssa  ] Score: Python (97), DynamoDB (97)
User [  wesley  ] Score: Python (103), DynamoDB (103)
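
Because the totals are updated in near-real time rather than synchronously, a check run immediately after the last write may briefly see a lower total. One way to make such a check more robust is to poll until the expected value appears. This is a sketch only; `wait_for_total` is a hypothetical helper, and the timeout and interval defaults are arbitrary.

```python
import time


def wait_for_total(table, user_id, expected, timeout=30.0, interval=1.0):
    """Poll a Leaderboard item until its Score equals `expected` or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        response = table.get_item(Key={'UserId': user_id}, ConsistentRead=True)
        item = response.get('Item')
        current = int(item['Score']) if item else 0
        if current == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, `wait_for_total(leaderboard, 'reed', 101)` would return `True` once the stream records for `reed` have all been processed.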


Finally, we're done! We can satisfy the query patterns mentioned at the beginning as follows.

In [22]:
# Return the user's score history, up to 10 items, sorted by timestamp descending
response = user_score_events.query(
    KeyConditionExpression=Key('UserId').eq('alyssa'),
    ScanIndexForward=False,
    Limit=10
)

pprint(response)

{'Count': 10,
 'Items': [{'Score': Decimal('3'),
            'Timestamp': '2020-10-06 16:30:02.195507',
            'UserId': 'alyssa'},
           {'Score': Decimal('1'),
            'Timestamp': '2020-10-06 16:30:01.203245',
            'UserId': 'alyssa'},
           {'Score': Decimal('6'),
            'Timestamp': '2020-10-06 16:30:00.852236',
            'UserId': 'alyssa'},
           {'Score': Decimal('2'),
            'Timestamp': '2020-10-06 16:29:58.441495',
            'UserId': 'alyssa'},
           {'Score': Decimal('0'),
            'Timestamp': '2020-10-06 16:29:56.754587',
            'UserId': 'alyssa'},
           {'Score': Decimal('2'),
            'Timestamp': '2020-10-06 16:29:54.203929',
            'UserId': 'alyssa'},
           {'Score': Decimal('4'),
            'Timestamp': '2020-10-06 16:29:50.446607',
            'UserId': 'alyssa'},
           {'Score': Decimal('4'),
            'Timestamp': '2020-10-06 16:29:48.457796',
            'UserId': 'alyssa'},
  

In [23]:
# Return the top 5 users by total score, in descending order
responses = []

for shard_no in range(10):
    response = leaderboard.query(
        IndexName='IndexShardNoScore',
        KeyConditionExpression=Key('ShardNo').eq(shard_no),
        ScanIndexForward=False,
        Limit=5
    )
    if response['Items']:
        for item in response['Items']:
            responses.append({
                'score': int(item['Score']),
                'user_id': item['UserId']
            })

heapq.nlargest(5, responses, key=lambda item: item['score'])

[{'score': 129, 'user_id': 'quynn'},
 {'score': 125, 'user_id': 'mariam'},
 {'score': 114, 'user_id': 'marah'},
 {'score': 104, 'user_id': 'teegan'},
 {'score': 103, 'user_id': 'wesley'}]
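
Querying each shard with `Limit=5` is sufficient because any user in the global top 5 must also be in the top 5 of their own shard. Since every per-shard result is already sorted by score in descending order, the lists can also be merged lazily instead of being collected and re-ranked. The sketch below assumes the same `{'score': ..., 'user_id': ...}` dict shape as the cell above; `top_n_across_shards` is a hypothetical helper.

```python
import heapq
import itertools


def top_n_across_shards(per_shard_items, n):
    """Merge per-shard lists (each sorted by score, descending) and keep the first n."""
    merged = heapq.merge(*per_shard_items,
                         key=lambda item: item['score'],
                         reverse=True)
    return list(itertools.islice(merged, n))
```

With ten shards and a small `n` the difference from `heapq.nlargest` is negligible; the merge form mainly makes the scatter-gather structure of the query explicit.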