# DynamoDB Import/Export Notebook

Author: everett@bayesimpact.org

Temporary solution to our need to back up (and possibly restore) DynamoDB tables. Change the settings in the next cell, and then run the entire notebook.

Good documentation on boto3, the Python library for AWS APIs:
* [boto3.resource](http://boto3.readthedocs.io/en/latest/guide/dynamodb.html) (which you should use whenever possible)
* [boto3.client](http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html) (a more complete, but lower level API)

## Setup: Configure this block, then run the whole notebook

In [1]:
# Set RUN_EXPORT and/or RUN_IMPORT to True before running the notebook.
# Both are False here, so an unmodified run stops at the execution-plan
# cell with a ValueError. (The saved outputs further down come from a run
# with both enabled.)
RUN_EXPORT = False
EXPORT_FILENAME = "dynamodb.pickle"

# WARNING: importing deletes any existing table whose name matches the
# target name before recreating it from the pickled data.
RUN_IMPORT = False
IMPORT_FILENAME = "dynamodb.pickle"
# Prefix prepended to every table name (and secondary-index name) on import.
# Leave empty to restore tables under their original names.
IMPORT_TABLES_ADD_PREFIX = ""

# AWS keys and DynamoDB region
# If these are left blank, we'll try to find environment
# variables by the same name and use those.
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
AWS_REGION = ''

# If this is true, we connect to a dynamodb-local instance
# running on port 8000. The AWS keys above are then
# irrelevant, unless you changed it to not run as a --sharedDb.
USE_DYNAMODB_LOCAL = True

In [2]:
# Announce what this run is going to do, and refuse to continue when
# neither an export nor an import was requested.
print("Execution plan:")
if USE_DYNAMODB_LOCAL:
    db_type = "local"
else:
    db_type = "remote (%s)" % AWS_REGION

if not RUN_EXPORT and not RUN_IMPORT:
    raise ValueError("Nothing to do! Please set RUN_EXPORT or RUN_IMPORT to True")

if RUN_EXPORT:
    export_summary = "* Exporting data from %s dynamodb to %s" % (db_type, EXPORT_FILENAME)
    print(export_summary)
if RUN_IMPORT:
    import_summary = "* Importing data from %s to %s dynamodb" % (IMPORT_FILENAME, db_type)
    print(import_summary)

Execution plan:
* Exporting data from local dynamodb to dynamodb.pickle
* Importing data from dynamodb.pickle to local dynamodb


In [3]:
import boto3
import copy
import datetime
import json
import os
import pickle

## Helper functions

In [4]:
# NOTE: a `global` statement at module level has no effect in Python; it is
# kept only as a hint that `client` and `resource` are shared names. They are
# actually created when set_local_db() or set_client() assigns them.
global client, resource

def set_local_db():
    """Point the shared `client` and `resource` at a dynamodb-local instance.

    The endpoint is port 8000 on the default gateway reported by
    `ip route show`, i.e. the Docker host as seen from inside the container.
    Placeholder credentials and region are passed to boto3.

    Raises:
        RuntimeError: if `ip route show` reports no default route.
        OSError / subprocess.CalledProcessError: if `ip` cannot be run.
    """
    global client, resource
    # Function-scope import keeps this helper self-contained; subprocess
    # replaces the IPython-only `!ip route ...` magic so the function is
    # plain Python and fails loudly instead of indexing an empty result.
    import subprocess

    # We are running in a container. The local dynamodb is attached
    # to port 8000 of the host, so we need to connect to the host IP
    # as seen by the container.
    route_table = subprocess.check_output(
        ['ip', 'route', 'show'], universal_newlines=True)
    docker_host_ip = None
    for route in route_table.splitlines():
        fields = route.split()
        # Same selection as `awk '/default/ {print $3}'`: first line that
        # mentions "default", third whitespace-separated field.
        if 'default' in route and len(fields) >= 3:
            docker_host_ip = fields[2]
            break
    if docker_host_ip is None:
        raise RuntimeError(
            "Could not find a default route in `ip route show` output; "
            "cannot determine the Docker host IP for dynamodb-local.")

    args = {
        'endpoint_url': 'http://' + docker_host_ip + ':8000',
        'aws_access_key_id': 'foo',
        'aws_secret_access_key': 'bar',
        'region_name': 'baz'
    }
    client = boto3.client('dynamodb', **args)
    resource = boto3.resource('dynamodb', **args)

def set_client(access=AWS_ACCESS_KEY_ID, secret=AWS_SECRET_ACCESS_KEY, region=AWS_REGION):
    """Point the shared `client` and `resource` at a remote DynamoDB.

    Each argument that is empty falls back to the environment variable of
    the same name (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION);
    a missing variable raises KeyError.
    """
    global client, resource
    connection_kwargs = dict(
        aws_access_key_id=access or os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=secret or os.environ['AWS_SECRET_ACCESS_KEY'],
        region_name=region or os.environ['AWS_REGION'],
    )
    client = boto3.client('dynamodb', **connection_kwargs)
    resource = boto3.resource('dynamodb', **connection_kwargs)

In [5]:
def scan_all_items(table_name, index_name=None):
    """Return every item of a table (or of one of its indexes) as a list.

    Repeats `Table.scan` on the shared `resource`, following
    `LastEvaluatedKey` until a response no longer contains one, and prints
    a progress line after each page that is followed by another.
    """
    table = resource.Table(table_name)
    base_kwargs = {'IndexName': index_name} if index_name else {}

    collected = []
    scan_kwargs = dict(base_kwargs)
    while True:
        page = table.scan(**scan_kwargs)
        collected += page['Items']
        next_key = page.get('LastEvaluatedKey')
        if next_key is None:
            return collected
        print("Paginated, %d items so far" % len(collected))
        # Rebuild the arguments for the following page from scratch.
        scan_kwargs = dict(base_kwargs)
        if next_key:
            scan_kwargs['ExclusiveStartKey'] = next_key

In [6]:
def clean_up_table_schema(schema, rename_prefix=None):
    """Reduce a described table to the arguments used for `create_table`.

    Works on a deep copy, so the caller's `schema` is left untouched.

    Args:
        schema: table description dict (the 'Table' value of a
            `describe_table` response).
        rename_prefix: optional string prepended to the table name and to
            the name of every global/local secondary index.

    Returns:
        A new dict holding only AttributeDefinitions, KeySchema,
        ProvisionedThroughput, TableName and the secondary-index lists, with
        every ProvisionedThroughput reduced to its read/write capacity units.

    Raises:
        KeyError: if the table itself has no 'ProvisionedThroughput' entry.
    """
    schema = copy.deepcopy(schema)
    index_names = ['GlobalSecondaryIndexes', 'LocalSecondaryIndexes']
    table_keys = ['AttributeDefinitions', 'KeySchema', 'ProvisionedThroughput',
                  'TableName'] + index_names
    index_keys = ['IndexName', 'KeySchema', 'Projection', 'ProvisionedThroughput']
    throughput_keys = ['ReadCapacityUnits', 'WriteCapacityUnits']

    if rename_prefix:
        schema['TableName'] = rename_prefix + schema['TableName']
        for name in index_names:
            for index in schema.get(name, []):
                index['IndexName'] = rename_prefix + index['IndexName']

    # Clean up indices
    for name in index_names:
        for index in schema.get(name, []):
            for k in list(index.keys()):
                if k not in index_keys:
                    del index[k]
            # Local secondary indexes have no throughput of their own, so
            # only trim the entry when the index actually carries one.
            if 'ProvisionedThroughput' in index:
                index['ProvisionedThroughput'] = {
                    k: index['ProvisionedThroughput'][k] for k in throughput_keys
                }

    # Clean up table
    for k in list(schema.keys()):
        if k not in table_keys:
            del schema[k]
    schema['ProvisionedThroughput'] = {
        k: schema['ProvisionedThroughput'][k] for k in throughput_keys
    }
    return schema

In [7]:
def get_table_data(name):
    """Return one table's contents and description.

    The result maps 'Items' to the full scan of the table and 'Table' to
    the 'Table' entry of the shared client's `describe_table` response.
    """
    return {
        'Items': scan_all_items(name),
        'Table': client.describe_table(TableName=name)['Table'],
    }

In [8]:
def get_all_table_data(table_prefix=''):
    """Collect items and description for every table matching a prefix.

    Lists the tables of the shared `resource`, keeps those whose name starts
    with `table_prefix`, and returns a dict mapping each table name to its
    `get_table_data` result. A one-line summary is printed per table.
    """
    # Resolve the full list of names first, before any table is read.
    matching_names = []
    for listed in resource.tables.all():
        if listed.name.startswith(table_prefix):
            matching_names.append(listed.name)

    exported = {}
    for name in matching_names:
        table_data = get_table_data(name)
        exported[name] = table_data
        describe_table_data(table_data)
    return exported

In [9]:
def describe_table_data(table_data):
    """Print a one-line summary of a table: name, item count, size in kB.

    `table_data` is a dict with an 'Items' list and a 'Table' description
    containing 'TableName' and 'TableSizeBytes'.
    """
    description = table_data['Table']
    table_name = description['TableName']
    item_count = len(table_data['Items'])
    size_kb = description['TableSizeBytes'] / 1024
    print("%40s:%8d items %8.0f kB" % (table_name, item_count, size_kb))

## Task 0: Establish a connection and make sure it works

In [10]:
# Connect to the configured DynamoDB (local or remote), then list its
# tables as a quick check that the connection works.
if USE_DYNAMODB_LOCAL:
    set_local_db()
else:
    set_client()

for found_table in resource.tables.all():
    print("Found table", found_table.name)

Found table ursus_auditentries
Found table ursus_changedfields
Found table ursus_events
Found table ursus_feedbacks
Found table ursus_general_infos
Found table ursus_globalstates
Found table ursus_incidents
Found table ursus_involved_civilians
Found table ursus_involved_officers
Found table ursus_screeners
Found table ursus_users
Found table ursus_visits


## Task 1: Export an entire database

In [11]:
if RUN_EXPORT:
    # Dump every table (items + description) into a single pickle file.
    data = get_all_table_data(table_prefix='')
    # Write to the configured EXPORT_FILENAME rather than a hard-coded name,
    # so the size reported below refers to the file that was just written.
    with open(EXPORT_FILENAME, "wb") as f:
        pickle.dump(data, f)
    print("Saved %d bytes to %s" % (os.stat(EXPORT_FILENAME).st_size, EXPORT_FILENAME))

                      ursus_auditentries:       0 items        0 kB
                     ursus_changedfields:       0 items        0 kB
                            ursus_events:       2 items        0 kB
                         ursus_feedbacks:       0 items        0 kB
                     ursus_general_infos:       0 items        0 kB
                      ursus_globalstates:       1 items        0 kB
                         ursus_incidents:       0 items        0 kB
                ursus_involved_civilians:       0 items        0 kB
                 ursus_involved_officers:       0 items        0 kB
                         ursus_screeners:       0 items        0 kB
                             ursus_users:       1 items        0 kB
                            ursus_visits:       1 items        0 kB
Saved 9692 bytes to dynamodb.pickle


## Task 2: Import from the pickled export (Task 1)

In [12]:
if RUN_IMPORT:
    print("Loading %d bytes of pickled data from %s" % (
          os.stat(IMPORT_FILENAME).st_size, IMPORT_FILENAME))
    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only import files that were produced by this notebook's export step.
    with open(IMPORT_FILENAME, "rb") as f:
        data = pickle.load(f)

    print("Loaded data for %d tables" % len(data))
    for table_data in data.values():
        describe_table_data(table_data)

    print("Importing...")
    existing_table_names = {t.name for t in resource.tables.all()}
    for table_name, info in data.items():
        new_name = IMPORT_TABLES_ADD_PREFIX + table_name
        print("** Importing into table %s **" % new_name)

        # Delete table if it already exists
        if new_name in existing_table_names:
            print(new_name, "already exists, deleting it first and starting afresh.")
            client.delete_table(TableName=new_name)
            print("Delete operation sent. Waiting until table no longer exists.")
            resource.Table(new_name).wait_until_not_exists()

        # Create a new table
        schema = clean_up_table_schema(info["Table"], rename_prefix=IMPORT_TABLES_ADD_PREFIX)
        items = info["Items"]
        client.create_table(**schema)
        print("Create operation sent. Waiting until table exists.")
        table = resource.Table(new_name)
        table.wait_until_exists()
        # Report the status of each global secondary index, if there are any.
        for gsi in table.global_secondary_indexes or []:
            print(gsi["IndexStatus"], "index:", gsi["KeySchema"][0]["AttributeName"])

        # Populate, reusing the handle, name and item list obtained above.
        print("Populating %s with %d items" % (new_name, len(items)))
        with table.batch_writer() as batch:
            for item in items:
                batch.put_item(Item=item)
        print("Done.")

    print("---- Data import complete")

Loading 9692 bytes of pickled data from dynamodb.pickle
Loaded data for 12 tables
                            ursus_events:       2 items        0 kB
                      ursus_auditentries:       0 items        0 kB
                 ursus_involved_officers:       0 items        0 kB
                ursus_involved_civilians:       0 items        0 kB
                         ursus_feedbacks:       0 items        0 kB
                     ursus_changedfields:       0 items        0 kB
                             ursus_users:       1 items        0 kB
                         ursus_screeners:       0 items        0 kB
                     ursus_general_infos:       0 items        0 kB
                            ursus_visits:       1 items        0 kB
                         ursus_incidents:       0 items        0 kB
                      ursus_globalstates:       1 items        0 kB
Importing...
** Importing into table ursus_events **
ursus_events already exists, deleting it first an