Merged
59 changes: 36 additions & 23 deletions dynamodump.py
@@ -16,6 +16,7 @@
JSON_INDENT = 2
AWS_SLEEP_INTERVAL = 10 # seconds
LOCAL_SLEEP_INTERVAL = 1 # seconds
BATCH_WRITE_SLEEP_INTERVAL = 0.15 # seconds
MAX_BATCH_WRITE = 25 # DynamoDB limit
SCHEMA_FILE = "schema.json"
DATA_DIR = "data"
@@ -147,16 +148,18 @@ def mkdir_p(path):
def batch_write(conn, sleep_interval, table_name, put_requests):
request_items = {table_name: put_requests}
i = 1
sleep = sleep_interval
while True:
response = conn.batch_write_item(request_items)
unprocessed_items = response["UnprocessedItems"]

if len(unprocessed_items) == 0:
break

if len(unprocessed_items) > 0 and i <= MAX_RETRY:
logging.debug(str(len(unprocessed_items)) + " unprocessed items, retrying.. [" + str(i) + "]")
logging.debug(str(len(unprocessed_items)) + " unprocessed items, retrying after %s seconds.. [%s/%s]" % (str(sleep), str(i), str(MAX_RETRY)))
request_items = unprocessed_items
time.sleep(sleep)
sleep += sleep_interval
i += 1
else:
logging.info("Max retries reached, failed to processed batch write: " + json.dumps(unprocessed_items,
@@ -321,26 +324,27 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
table_local_secondary_indexes = table.get("LocalSecondaryIndexes")
table_global_secondary_indexes = table.get("GlobalSecondaryIndexes")

if not args.dataOnly:
# override table write capacity if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
if write_capacity is None:
if original_write_capacity < RESTORE_WRITE_CAPACITY:
write_capacity = RESTORE_WRITE_CAPACITY
else:
write_capacity = original_write_capacity
# override table write capacity if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
if write_capacity is None:
if original_write_capacity < RESTORE_WRITE_CAPACITY:
write_capacity = RESTORE_WRITE_CAPACITY
else:
write_capacity = original_write_capacity

# override GSI write capacities if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
original_gsi_write_capacities = []
if table_global_secondary_indexes is not None:
for gsi in table_global_secondary_indexes:
original_gsi_write_capacities.append(gsi["ProvisionedThroughput"]["WriteCapacityUnits"])

# override GSI write capacities if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
original_gsi_write_capacities = []
if table_global_secondary_indexes is not None:
for gsi in table_global_secondary_indexes:
original_gsi_write_capacities.append(gsi["ProvisionedThroughput"]["WriteCapacityUnits"])
if gsi["ProvisionedThroughput"]["WriteCapacityUnits"] < int(write_capacity):
gsi["ProvisionedThroughput"]["WriteCapacityUnits"] = int(write_capacity)

if gsi["ProvisionedThroughput"]["WriteCapacityUnits"] < int(write_capacity):
gsi["ProvisionedThroughput"]["WriteCapacityUnits"] = int(write_capacity)
# temp provisioned throughput for restore
table_provisioned_throughput = {"ReadCapacityUnits": int(original_read_capacity),
"WriteCapacityUnits": int(write_capacity)}

# temp provisioned throughput for restore
table_provisioned_throughput = {"ReadCapacityUnits": int(original_read_capacity),
"WriteCapacityUnits": int(write_capacity)}
if not args.dataOnly:

logging.info("Creating " + destination_table + " table with temp write capacity of " + str(write_capacity))

@@ -363,6 +367,11 @@

# wait for table creation completion
wait_for_active_table(conn, destination_table, "created")
else:
# update provisioned capacity
if int(write_capacity) > original_write_capacity:
update_provisioned_throughput(conn, destination_table, original_read_capacity, write_capacity,
False)

if not args.schemaOnly:
# read data files
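
In the dataOnly path the destination table already exists, so rather than creating it the PR raises its write throughput in place before loading data. A sketch of the equivalent call with boto3, assuming that API (the script's own update_provisioned_throughput wraps the connection it was handed):

    import boto3

    def raise_write_throughput(table_name, read_capacity, write_capacity):
        # only worthwhile when the temporary write capacity exceeds the original
        client = boto3.client("dynamodb")
        client.update_table(
            TableName=table_name,
            ProvisionedThroughput={
                "ReadCapacityUnits": int(read_capacity),
                "WriteCapacityUnits": int(write_capacity),
            },
        )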
@@ -384,16 +393,16 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
# flush every MAX_BATCH_WRITE
if len(put_requests) == MAX_BATCH_WRITE:
logging.debug("Writing next " + str(MAX_BATCH_WRITE) + " items to " + destination_table + "..")
batch_write(conn, sleep_interval, destination_table, put_requests)
batch_write(conn, BATCH_WRITE_SLEEP_INTERVAL, destination_table, put_requests)
del put_requests[:]

# flush remainder
if len(put_requests) > 0:
batch_write(conn, sleep_interval, destination_table, put_requests)
batch_write(conn, BATCH_WRITE_SLEEP_INTERVAL, destination_table, put_requests)

if not args.dataOnly and not args.skipThroughputUpdate:
if not args.skipThroughputUpdate:
# revert to original table write capacity if it has been modified
if write_capacity != original_write_capacity:
if int(write_capacity) != original_write_capacity:
update_provisioned_throughput(conn, destination_table, original_read_capacity, original_write_capacity,
False)
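
The int() cast added to this comparison is the substance of the fix: a write capacity supplied on the command line arrives as a string, and in Python a string never compares equal to an integer, so the old check always looked like a modification:

    "100" != 100       # True  -- the old check always triggered a revert
    int("100") != 100  # False -- the revert is now skipped when nothing changed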

@@ -424,11 +433,15 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
"Control plane limit exceeded, retrying updating throughput of GlobalSecondaryIndexes in " + destination_table + "..")
time.sleep(sleep_interval)

# wait for table to become active
wait_for_active_table(conn, destination_table, "active")

logging.info("Restore for " + source_table + " to " + destination_table + " table completed. Time taken: " + str(
datetime.datetime.now().replace(microsecond=0) - start_time))
else:
logging.info("Empty schema of " + source_table + " table created. Time taken: " + str(datetime.datetime.now().replace(microsecond=0) - start_time))


# parse args
parser = argparse.ArgumentParser(description="Simple DynamoDB backup/restore/empty.")
parser.add_argument("-m", "--mode", help="'backup' or 'restore' or 'empty'")
1 change: 1 addition & 0 deletions test/test.py
@@ -37,5 +37,6 @@ def test_schema(self):
def test_data(self):
self.assertEqual(self.test_table_data, self.restored_test_table_data)


if __name__ == '__main__':
unittest.main()