## Introduction to DynamoDB

In this notebook we provide an introduction to DynamoDB by doing the following:

- Create a new DynamoDB table and expose a DynamoDB stream
- Put items in the table
- Retrieve data with Query and Scan
- Update and conditionally update items
- Review the stream for the sequence of changes to the table


In [1]:
chosen_region = 'us-east-2'

In [None]:
import boto3
import json
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import random
random.seed(5)

You can create tables using API calls. However, unless the table is temporary, you will most likely create it with the AWS CLI or with automation such as CloudFormation. The code below creates a table whose primary key is the combination of Author (the partition key) and Title (the sort key).

In [None]:

# Getting Resource API object for DynamoDB service
dynamodb = boto3.resource('dynamodb', region_name=chosen_region)

# Let's create a table that uses a composite key AND exposes a DynamoDB stream
# Key attributes must be of type STRING, NUMBER, or BINARY

try:
    print("Creating table...")
    table = dynamodb.create_table(
        TableName='books',
        KeySchema=[
            {
                'AttributeName': 'Author',
                'KeyType': 'HASH' # i.e. Partition Key
            },
            {
                'AttributeName': 'Title',
                'KeyType': 'RANGE' # i.e. Sort Key
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'Author',
                'AttributeType': 'S'
            },
            {
                'AttributeName': 'Title',
                'AttributeType': 'S'
            },

        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        },
        StreamSpecification={
            'StreamEnabled': True,
            'StreamViewType': 'NEW_AND_OLD_IMAGES'
        }
    )
    
    print("Waiting for table to become active...")
    table.wait_until_exists()
    print("Table ready!")
except ClientError as ce:
    if ce.response['Error']['Code'] == 'ResourceInUseException':
        print("Table already exists. Nothing to do.")
    else:
        print(ce)
     

It's much more common to connect to an existing table from code. Here is how you get a Table resource object that points at an existing table.

In [None]:
import boto3
import time
dynamodb = boto3.resource('dynamodb', region_name=chosen_region)
print("Connecting to existing table")
table = dynamodb.Table('books')



DynamoDB tables can operate in a purely on-demand mode, in which DynamoDB handles the scaling of the table. This is a good fit for tables with unpredictable usage.

In [None]:
# Switching to on-demand mode from provisioned throughput (can do 1x per day)
if ((not table.billing_mode_summary) or (table.billing_mode_summary['BillingMode'] != 'PAY_PER_REQUEST')):
    table.update(BillingMode='PAY_PER_REQUEST') # or 'PROVISIONED'
else:
    print('Nothing to do. Billing mode is already on-demand.')
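
To make the provisioned numbers above concrete, here is a rough sizing sketch. It assumes the standard DynamoDB capacity rules: one read capacity unit covers one strongly consistent read per second of an item up to 4 KB, an eventually consistent read costs half as much, and one write capacity unit covers one write per second of an item up to 1 KB. The function names are illustrative and are not part of boto3.

```python
import math

def read_capacity_units(item_size_bytes, reads_per_second, strongly_consistent=True):
    """Estimate RCUs: each read costs one unit per 4 KB of item size, rounded up."""
    units_per_read = math.ceil(item_size_bytes / 4096)
    total = units_per_read * reads_per_second
    # Eventually consistent reads cost half as much as strongly consistent ones.
    return total if strongly_consistent else math.ceil(total / 2)

def write_capacity_units(item_size_bytes, writes_per_second):
    """Estimate WCUs: each write costs one unit per 1 KB of item size, rounded up."""
    units_per_write = math.ceil(item_size_bytes / 1024)
    return units_per_write * writes_per_second

# Example workload: 6 KB items, 10 reads per second and 3 writes per second
strong_rcu = read_capacity_units(6 * 1024, 10)
eventual_rcu = read_capacity_units(6 * 1024, 10, strongly_consistent=False)
wcu = write_capacity_units(6 * 1024, 3)
print(strong_rcu, eventual_rcu, wcu)
```

If an estimate like this is hard to make because traffic is spiky or unknown, that is the situation where on-demand mode tends to be the simpler choice.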

In [None]:
# Put a single item into the table
table.put_item(Item= { 'Author': 'Heiwad', 'Title': 'The Guide to Travel and Crab Cakes', 'Year': 2022})

In [None]:
# Conditional put to avoid clobbering an existing item
# The condition is checked against the item with the same Author and Title,
# so it only blocks overwriting that exact item. Run it twice to hit the failure path.
try:
    table.put_item(Item= { 'Author': 'Jill',
               'Title': 'Ice Cream Part 2'},
           ConditionExpression='attribute_not_exists(Title)')
except ClientError as ce:
    if ce.response['Error']['Code'] == 'ConditionalCheckFailedException':
        print('The item already exists. PutItem canceled.')
    else:
        raise  # Do not silently swallow unrelated errors
    

You can add new non-key attributes as needed.

In [None]:
table.put_item(Item= { 'Author': 'Mary Wollstonecraft Shelley', 'Title': 'Frankenstein', 'Year':1997})

In [None]:
import json
# Load data from local file
print("Loading data from Local JSON file")
books = []
with open("books.json") as f:
    books = json.load(f)
print ("Loaded {0} records from file".format(str(len(books))))    

#Show a sample JSON record
books[0] 
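
One detail worth knowing before writing this data to the table: the boto3 resource API does not accept Python `float` values and expects `Decimal` for non-integer numbers. The contents of `books.json` are not shown here, so whether it contains fractional ratings is an assumption. If it does, a minimal sketch of the usual workaround is to have the JSON parser produce `Decimal` values directly. The sample record below is made up for illustration.

```python
import json
from decimal import Decimal

# Hypothetical record with a fractional rating (not taken from books.json)
sample = '[{"Author": "A. Writer", "Title": "Example", "Rating": 4.5}]'

# parse_float=Decimal makes every JSON float arrive as a Decimal
records = json.loads(sample, parse_float=Decimal)
rating = records[0]["Rating"]
print(rating, type(rating).__name__)
```

The same keyword works when reading from a file: `json.load(f, parse_float=Decimal)`.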

In [None]:
#Batch write of records to database
print("Batch writing records to table")
with table.batch_writer() as batch:

    for book in books:   
        # Set the Keys
        item = {'Author': book['Author'],
                'Title': book['Title']}

        # Load in optional attributes
        if 'Editions' in book:
            item['Editions'] = set(book['Editions']) # Make sure 'Editions' is loaded as a String Set
        if 'Rating' in book:
            item['Rating'] = book['Rating']

        batch.put_item(
            Item= item
        )
   
    

print("Done loading table")

In [None]:
# Get a Specific Item by fully specified Key
res = table.get_item(Key={
    'Author' : 'H.G. Wells',
    'Title' : 'The Invisible Man'
})

res['Item']

In [None]:
# Query by partition key to quickly find all items for one author
res = table.query(KeyConditionExpression=Key('Author').eq("H.G. Wells"))

print ('Query Results')
print('Count: ' + str(res['Count']))
print('Scanned: ' + str(res['ScannedCount']))
res['Items']
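
A key condition can also constrain the sort key. The sketch below builds the arguments for a query that returns one author's titles starting with a given prefix. It uses a plain string expression with placeholders rather than the `Key` helper, and the function name `title_prefix_query` is illustrative only.

```python
def title_prefix_query(author, title_prefix):
    """Build keyword arguments for Table.query: one author, titles with a given prefix."""
    return {
        # #a and #t are placeholders, which avoids clashes with reserved words
        "KeyConditionExpression": "#a = :author AND begins_with(#t, :prefix)",
        "ExpressionAttributeNames": {"#a": "Author", "#t": "Title"},
        "ExpressionAttributeValues": {":author": author, ":prefix": title_prefix},
    }

kwargs = title_prefix_query("H.G. Wells", "The ")
print(kwargs["KeyConditionExpression"])

# With the table object from this notebook, the query would be run as:
# res = table.query(**kwargs)
```

Because both conditions apply to key attributes, DynamoDB only reads the matching items, unlike a filter expression, which is applied after the items have been read.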


In [None]:
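# Query by partition key, then filter the matching items on a non-key attribute.
# The filter is applied after the items are read, so Scanned can be larger than Count.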
res = table.query(KeyConditionExpression=Key('Author').eq("H.G. Wells"),
                  FilterExpression=Attr('Editions').contains('Kindle'))

print ('Query Results')
print('Count: ' + str(res['Count']))
print('Scanned: ' + str(res['ScannedCount']))
res['Items']


In [None]:
# Scan compares items in all partitions of the table to your filter expression.
# This is less efficient than query but gives you maximum flexibility to find items by any attribute.
res = table.scan( 
    FilterExpression=Attr('Editions')
    .contains('Kindle') & Attr('Rating').gte(5)#,
    #ProjectionExpression= "Title, Editions, Rating",
)


print ('Scan Results')
print('Count: ' + str(res['Count']))
print('Scanned: ' + str(res['ScannedCount']))
res['Items']
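
A single Scan (or Query) call returns at most 1 MB of data, so on a larger table the result above may be only the first page. The sketch below follows `LastEvaluatedKey` until the scan is exhausted. `scan_all` and `FakeTable` are illustrative names; the fake stands in for a real Table object so the loop can be run without AWS access.

```python
def scan_all(table, **scan_kwargs):
    """Yield every item from a scan, following LastEvaluatedKey across pages."""
    kwargs = dict(scan_kwargs)
    while True:
        page = table.scan(**kwargs)
        yield from page.get("Items", [])
        last_key = page.get("LastEvaluatedKey")
        if last_key is None:
            break  # No more pages
        kwargs["ExclusiveStartKey"] = last_key


class FakeTable:
    """Stand-in for a boto3 Table that serves two pages (illustration only)."""

    def __init__(self):
        self.pages = {
            None: {"Items": [{"Title": "A"}], "LastEvaluatedKey": {"Title": "A"}},
            "A": {"Items": [{"Title": "B"}]},
        }

    def scan(self, ExclusiveStartKey=None, **_ignored):
        key = ExclusiveStartKey["Title"] if ExclusiveStartKey else None
        return self.pages[key]


items = list(scan_all(FakeTable()))
print(items)
```

With the real table, the call would look like `list(scan_all(table, FilterExpression=...))`. The same loop shape applies to `table.query`.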

In [None]:
## Update an Item
response = table.get_item(
    Key={
        'Title': 'Moby Dick',
        'Author': 'Herman Melville'
    },
    ConsistentRead = True # Changing Read Consistency
)
item = response['Item']
print("Pre-update version of item")
print(item)


response = table.update_item(
    Key={
        'Title': 'Moby Dick',
        'Author': 'Herman Melville'
    },
    UpdateExpression='SET Rating = :rating, Version=:ver',
    ExpressionAttributeValues={
        ':rating': 5,
        ':ver' : 2
    },
    ReturnValues = "ALL_NEW" # Reading back the updated item
)

updated_item = response['Attributes']
print("Post-update version of item")
print(updated_item)


In [None]:
# What if another thread tries to update the item at the same time?
# Use Conditional Update for Optimistic Concurrency Control

response = table.update_item(
    Key={
        'Title': 'Moby Dick',
        'Author': 'Herman Melville'
    },
    UpdateExpression='SET #score = :rating, Version = :newver',
    ConditionExpression="Version = :ver",
    ExpressionAttributeNames={ "#score": "Rating" },
    ExpressionAttributeValues={
        ':rating': 1,
        ':ver': 2,
        ':newver': 3
    },
    ReturnValues= "ALL_NEW"
)

updated_item = response['Attributes']
print(updated_item)


In [None]:
# Run the same conditional update again- THIS TIME IT IS EXPECTED TO THROW AN EXCEPTION

try:
    response = table.update_item(
        Key={
            'Title': 'Moby Dick',
            'Author': 'Herman Melville'
        },
        UpdateExpression='SET #score = :rating, Version = :newver',
        ConditionExpression="Version = :ver",
        ExpressionAttributeNames={ "#score": "Rating" },
        ExpressionAttributeValues={
            ':rating': 1,
            ':ver': 2,
            ':newver': 3
        },
        ReturnValues= "ALL_NEW"
    )

    updated_item = response['Attributes']
    print(updated_item)
except ClientError as ce:
    if ce.response['Error']['Code'] == 'ConditionalCheckFailedException':
        # Put your retry code here
        print("Exception Condition Triggered as expected")
        print ("Version attribute is not the expected value ... the item must have been updated")
    else:
        print(ce)
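
One possible shape for the retry code mentioned above is to re-read the item, pick up its current version, and try the conditional update again. This is a sketch under assumptions: the item already has a numeric `Version` attribute, and the helper names are hypothetical. The error check is duck-typed on the `response` attribute that botocore's `ClientError` carries, so the function can also be exercised with a stub.

```python
def is_conditional_failure(exc):
    """True if exc carries a ConditionalCheckFailedException error code."""
    response = getattr(exc, "response", None) or {}
    return response.get("Error", {}).get("Code") == "ConditionalCheckFailedException"


def set_rating_with_retry(table, key, new_rating, max_attempts=3):
    """Optimistic concurrency: read the version, update only if it is unchanged."""
    for attempt in range(1, max_attempts + 1):
        current = table.get_item(Key=key, ConsistentRead=True)["Item"]
        version = current["Version"]
        try:
            response = table.update_item(
                Key=key,
                UpdateExpression="SET Rating = :rating, Version = :newver",
                ConditionExpression="Version = :ver",
                ExpressionAttributeValues={
                    ":rating": new_rating,
                    ":ver": version,
                    ":newver": version + 1,
                },
                ReturnValues="ALL_NEW",
            )
            return response["Attributes"]
        except Exception as exc:
            # Retry only when another writer changed the version in between
            if not is_conditional_failure(exc) or attempt == max_attempts:
                raise
```

With the notebook's table this would be called as `set_rating_with_retry(table, {'Author': 'Herman Melville', 'Title': 'Moby Dick'}, 4)`. Whether blindly reapplying the write is correct depends on the application; sometimes the right response to a conflict is to show the newer value to the user instead.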


## Multi-Item Transaction

In [None]:
client = table.meta.client
# Multi-Item Write Transaction - Supports ConditionCheck, Put, Update, and Delete from different tables
client.transact_write_items(TransactItems=[{
    'ConditionCheck': {
        'TableName': 'books',
        'Key': {
            'Title': 'Moby Dick',
            'Author': 'Herman Melville'
        },
        'ConditionExpression': 'Version = :ver',
        'ExpressionAttributeValues': { ':ver': 3}
    }

},
{
    'Put' : {
        'TableName': 'books',
        'Item': {
            'Title': 'Tales from the bottom of the Sea',
            'Author': 'Captain Ahab',
            'Rating': 3
        },

    }
},{
    'Put': {
        'TableName': 'books',
        'Item': {
            'Title': 'Winning',
            'Author': 'The Whale',
            'Rating': 5
        },

    }
}])
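
If any step of the transaction fails, none of the writes are applied and the call raises a `ClientError` with the code `TransactionCanceledException`. The sketch below assumes the error response carries a `CancellationReasons` list with one entry per transaction step, where steps that did not fail report the code `None`. The helper name and the sample response are hypothetical.

```python
def failed_transaction_steps(error_response):
    """Return (index, code) for each step that caused the transaction to cancel."""
    reasons = error_response.get("CancellationReasons", [])
    return [
        (index, reason["Code"])
        for index, reason in enumerate(reasons)
        if reason.get("Code") not in (None, "None")
    ]


# Hypothetical response for a failed ConditionCheck followed by two Puts
example_response = {
    "Error": {"Code": "TransactionCanceledException"},
    "CancellationReasons": [
        {"Code": "ConditionalCheckFailed", "Message": "The conditional request failed"},
        {"Code": "None"},
        {"Code": "None"},
    ],
}

failed = failed_transaction_steps(example_response)
print(failed)
```

In the notebook this would be used inside `except ClientError as ce:` by passing `ce.response` to the helper.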


## DynamoDB Stream

Let's explore the stream for this table.

In [None]:
# Get the list of DynamoDB streams for the 'books' table

import boto3
ddbstreams = boto3.client('dynamodbstreams',region_name=chosen_region)
streams = ddbstreams.list_streams(TableName='books') 
streams

In [None]:
# Describe the Stream
stream = streams['Streams'][0]['StreamArn']
stream_description = ddbstreams.describe_stream(StreamArn=stream,Limit=100)
stream_description

In [None]:
# Pick one of the shards from the description
shardId = stream_description['StreamDescription']['Shards'][0]['ShardId']
shardId

In [None]:
get_iterator_res = ddbstreams.get_shard_iterator(
    StreamArn=stream,
    ShardId=shardId,
    ShardIteratorType='TRIM_HORIZON')

iterator = get_iterator_res['ShardIterator']
iterator

In [None]:
get_records_res = ddbstreams.get_records(ShardIterator=iterator)

next_iterator = get_records_res['NextShardIterator']
records = get_records_res['Records']


In [None]:
# Print each change to the Table out from the current shard of the stream
# In a production table you will likely have multiple shards.

if (len(records) > 0):
    table_fmt = "{:<12} {:<30} {:<20}"
    print (table_fmt.format('Event','Author','Title'))
    for record in records:
        eventName = record['eventName']
        author = record['dynamodb']['Keys']['Author']['S']
        title = record['dynamodb']['Keys']['Title']['S']
        print (table_fmt.format(eventName, author, title))
else:
    print("No records found on this iteration")
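
Because the stream was created with `NEW_AND_OLD_IMAGES`, each record can also carry the item as it was before and after the change. Stream records use DynamoDB's typed JSON, such as `{'S': 'text'}` or `{'N': '5'}`. The sketch below unwraps the common types by hand so it runs without AWS access; binary types are left out. The helper names and the sample record are made up for illustration. boto3 also ships `boto3.dynamodb.types.TypeDeserializer` for the same job.

```python
from decimal import Decimal

def unwrap(value):
    """Convert one typed value, e.g. {'S': 'x'}, into a plain Python value."""
    (type_tag, raw), = value.items()
    if type_tag in ("S", "BOOL"):
        return raw
    if type_tag == "N":
        return Decimal(raw)
    if type_tag == "NULL":
        return None
    if type_tag == "SS":
        return set(raw)
    if type_tag == "NS":
        return {Decimal(n) for n in raw}
    if type_tag == "L":
        return [unwrap(v) for v in raw]
    if type_tag == "M":
        return {k: unwrap(v) for k, v in raw.items()}
    raise ValueError("Unsupported type tag: " + type_tag)

def summarize(record):
    """Extract the event name, keys, and before/after images from a stream record."""
    data = record["dynamodb"]
    return {
        "event": record["eventName"],
        "keys": {k: unwrap(v) for k, v in data["Keys"].items()},
        "old": {k: unwrap(v) for k, v in data.get("OldImage", {}).items()},
        "new": {k: unwrap(v) for k, v in data.get("NewImage", {}).items()},
    }

# Hypothetical MODIFY record, shaped like the ones returned by get_records
sample_record = {
    "eventName": "MODIFY",
    "dynamodb": {
        "Keys": {"Author": {"S": "Herman Melville"}, "Title": {"S": "Moby Dick"}},
        "OldImage": {"Rating": {"N": "5"}, "Version": {"N": "2"}},
        "NewImage": {"Rating": {"N": "1"}, "Version": {"N": "3"}},
    },
}
summary = summarize(sample_record)
print(summary["event"], summary["old"]["Rating"], "->", summary["new"]["Rating"])
```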


In [None]:
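# Let's read through ALL the available shards in the order they were listed (not necessarily in event order)

# Pass TableName so that we read the stream of the 'books' table rather than whichever stream is listed first
streamArn = ddbstreams.list_streams(TableName='books')['Streams'][0]['StreamArn']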
for shard in ddbstreams.describe_stream(StreamArn=streamArn)['StreamDescription']['Shards']:
    shardId = shard['ShardId']
    
    iterator = ddbstreams.get_shard_iterator(StreamArn=streamArn,
                                 ShardId=shardId,
                                 ShardIteratorType='TRIM_HORIZON')['ShardIterator']
    
    get_records_res = ddbstreams.get_records(ShardIterator=iterator)
    print("Checking " + shardId)
    loop = True
    count = 0
    while loop:

        # With 'Trim Horizon' you may still get some cases where no records are returned for the first few reads
        records = get_records_res['Records']
        if (len(records) > 0):
            print('    Found records on iteration number: ' + str(count))
            for record in get_records_res['Records']:
                eventName = record['eventName']
                author = record['dynamodb']['Keys']['Author']['S']
                title = record['dynamodb']['Keys']['Title']['S']
                print ('    ' + eventName + ' - ' + author + ' - ' + title)

       
        # Check to see if there is more data in the shard.
        if 'NextShardIterator' in get_records_res:
            count = count + 1
            next_iterator = get_records_res['NextShardIterator']
            get_records_res = ddbstreams.get_records(ShardIterator=next_iterator)
            
            # For DEMO - an ACTIVE shard will keep returning a next iterator indefinitely
            # Setting MAX iterations so demo will exit
            if count > 150:
                print('    Shard is still active but reached polling limit for demo.')
                loop = False

        # No Next Iterator present - end the loop
        else:
            print('    Iteration: ' + str(count) + ' - No next iterator found. Shard is closed.')
            loop = False

    
print("Done!") 

In [None]:
# Cleaning up by deleting table
print("Deleting table...")
try:
    table.delete()
    print("Waiting for table to be deleted...")
    table.wait_until_not_exists()
    print("Table deleted!")
except ClientError as ce:
    if ce.response['Error']['Code'] == 'ResourceNotFoundException':
        print("No table found. Nothing to do.")
    else:
        raise  # Surface unexpected errors instead of hiding them
