Skip to content

Commit

Permalink
Handle tables with autoscaling enabled when we are asked to spike the throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave North committed Sep 14, 2017
1 parent 810c6cd commit d790f5f
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 38 deletions.
81 changes: 66 additions & 15 deletions app/produce-steps-json.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import glob
import tempfile
import json
import pprint

parser = argparse.ArgumentParser(
prog="produce-steps-json",
Expand Down Expand Up @@ -100,7 +101,6 @@ def main(region,filter,destination,impregion,writetput,readtput, spikedread, s3l
retCode = 0
dateStr = datetime.datetime.now().strftime("%Y/%m/%d/%H_%M.%S")

#conn = boto.dynamodb2.connect_to_region(region)
conn = boto3.client('dynamodb', region_name=region)

if conn:
Expand Down Expand Up @@ -134,16 +134,27 @@ def main(region,filter,destination,impregion,writetput,readtput, spikedread, s3l
if filter in table['name']:
myLog("Generating EMR export JSON for table: [%s]" %table['name'])

autoscale_min_spike_read_capacity = None # Assume no autoscaling
autoscale_min_reset_read_capacity = None
tableS3Path = s3ExportPath + "/" + table['name']

# Does this table have autoscaling enabled?
scalable_target_info = scalable_target_exists("table/" + table['name'],"dynamodb:table:ReadCapacityUnits")
if scalable_target_info is not None:
myLog("Table " + table['name'] + " has autoscaling enabled")
autoscale_min_spike_read_capacity = spikedread
autoscale_min_reset_read_capacity=scalable_target_info[0]['MinCapacity']
myLog("Table " + table['name'] + " has a current AS min capacity of " + str(autoscale_min_reset_read_capacity))

if spikedread is not None:
tputSpikeStep = generateThroughputUpdateStep(table['name'], "Spike", s3ScriptPath, spikedread, table['write'], region)
tputSpikeStep = generateThroughputUpdateStep(table['name'], "Spike", s3ScriptPath, spikedread, autoscale_min_spike_read_capacity, table['write'], region)
exportSteps.append(tputSpikeStep)

tableExportStep = generateTableExportStep(table['name'],tableS3Path,readtput,region)
exportSteps.append(tableExportStep)

if spikedread is not None:
tputResetStep = generateThroughputUpdateStep(table['name'], "Reset", s3ScriptPath, table['read'], table['write'], region)
tputResetStep = generateThroughputUpdateStep(table['name'], "Reset", s3ScriptPath, table['read'], autoscale_min_reset_read_capacity, table['write'], region)
exportSteps.append(tputResetStep)

tableImportStep = generateTableImportStep(table['name'],tableS3Path,writetput,impregion)
Expand All @@ -161,21 +172,36 @@ def main(region,filter,destination,impregion,writetput,readtput, spikedread, s3l
###########
## Add a JSON entry for a single table throughput update step
###########
def generateThroughputUpdateStep(tableName, stepName, s3Path, readtput, writetput, region):
def generateThroughputUpdateStep(tableName, stepName, s3Path, readtput, autoscale_min_throughput,writetput, region):
myLog("addThroughputUpdateStep %s" % tableName)

tputUpdateDict = {}
tputUpdateDict = { "Name": stepName + " Throughput: " + tableName,
"ActionOnFailure": "CONTINUE",
"Type": "CUSTOM_JAR",
"Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [s3Path,
region,
tableName,
readtput,
writetput
]
}

if autoscale_min_throughput:
tputUpdateDict = { "Name": stepName + " Throughput: " + tableName,
"ActionOnFailure": "CONTINUE",
"Type": "CUSTOM_JAR",
"Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [s3Path,
region,
tableName,
readtput,
writetput,
str(autoscale_min_throughput)
]
}
else:
tputUpdateDict = { "Name": stepName + " Throughput: " + tableName,
"ActionOnFailure": "CONTINUE",
"Type": "CUSTOM_JAR",
"Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [s3Path,
region,
tableName,
readtput,
writetput
]
}

return tputUpdateDict

Expand Down Expand Up @@ -306,6 +332,31 @@ def listTables(conn):

return table_list_return

# Checks if a dynamo table has a scalable target (ie. is autoscale enabled?)
def scalable_target_exists(resource_id,scalable_dimension):
response=None
retval=None

myLog("Checking if scalable target exists for " + resource_id + " for dimension " + scalable_dimension)
client = boto3.client('application-autoscaling')

try:
response = client.describe_scalable_targets(
ServiceNamespace='dynamodb',
ResourceIds=[
resource_id,
],
ScalableDimension=scalable_dimension
)
except Exception, e:
myLog("Failed to describe scalable targets " + str(e))

if response:
if response['ScalableTargets']:
retval = response['ScalableTargets']

return retval

def writeFile(content,filename):
myLog("writeFile %s" % filename)

Expand Down
169 changes: 146 additions & 23 deletions app/update-throughput.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,167 @@ SOURCE_REGION=$1
TABLE_NAME=$2
READ_CAPACITY=$3
WRITE_CAPACITY=$4
AS_READ_MIN_TPUT=$5

MAX_ATTEMPTS=50
ATTEMPTS=0
SLEEP_SECONDS=20
RET_CODE=0

table_scaleable_read_dimension="dynamodb:table:ReadCapacityUnits"

USAGE="$0 source_region table_name read_capacity write_capacity"

if [ $# != 4 ]; then
if [ $# -lt 4 ]; then
echo $USAGE
exit 1
fi

# Does the table have an autoscaling scalable target for reads?
# If so, return the RoleARN and the max capacity
scalable_target_exists()
{
resource_id=$1
scalable_dimension=$2
region=$3

scalable_target=$(aws application-autoscaling describe-scalable-targets \
--service-namespace dynamodb \
--resource-id "${resource_id}" \
--query "ScalableTargets[?contains(ScalableDimension,\`${scalable_dimension}\`) == \`true\`].[RoleARN,MaxCapacity]" \
--region ${region} \
--output text)

#Update the table throughput
TABLE_STATUS=$(aws dynamodb update-table --region $SOURCE_REGION --table-name $TABLE_NAME --provisioned-throughput ReadCapacityUnits=${READ_CAPACITY},WriteCapacityUnits=${WRITE_CAPACITY} --query 'Table.TableStatus' --output text 2>&1)
if [ $? -ne 0 ]; then
ERROR_TYPE=$(echo $TABLE_STATUS | cut -d \( -f2 | cut -d \) -f1)
if [ "$ERROR_TYPE" == "ValidationException" ]; then
echo "Provisioned throughput already set, no action taken"
exit 0
if [ -z "${scalable_target}" ]; then
echo "false"
else
echo "${scalable_target}"
fi
}

# Add or replace a scalable target on a table or index
register_scalable_target()
{
resource_id=$1
scalable_dimension=$2
role_arn=$3
min_tput=$4
max_tput=$5
region=$6

aws application-autoscaling register-scalable-target \
--service-namespace dynamodb \
--resource-id "${resource_id}" \
--scalable-dimension "${scalable_dimension}" \
--min-capacity ${min_tput} \
--max-capacity ${max_tput} \
--role-arn ${role_arn} \
--region ${region}

status=$?

if [ ${status} -eq 0 ]; then
echo "true"
else
echo "false"
fi
}

# Poll a table until it becomes ACTIVE
wait_for_active()
{
table_name=$1
region=$2

# wait for the table to finish updating
while [ $ATTEMPTS -le $MAX_ATTEMPTS ]; do
TABLE_STATUS=$(aws dynamodb describe-table --region $region --table-name $TABLE_NAME --query 'Table.TableStatus' --output text)
echo "Checking table status, attempt ${ATTEMPTS}" 1>&2
if [ "$TABLE_STATUS" == "ACTIVE" ]; then
echo "Table transition successful" 1>&2
return 0
fi
echo "Table is $TABLE_STATUS, checking again in $SLEEP_SECONDS seconds" 1>&2
(( ATTEMPTS++ ))
sleep $SLEEP_SECONDS
done

# if we're here, the table did not become active in a reasonable time
return 1
}

#
# MAINLINE
#

table_resource_id="table/${TABLE_NAME}"
scalable_target_exists=$(scalable_target_exists ${table_resource_id} ${table_scaleable_read_dimension} ${SOURCE_REGION})

# Check if we have autoscaling enabled. If so, we need to update
# the minimum tput so that we don't keep autoscaling down
if [ "${scalable_target_exists}" != "false" ]; then
echo "Table ${resource_id} has an autoscaling policy - manipulating the min-tput"
# get the role ARN and the max capacity currently set...the min capacity we are provided
role_arn=$(echo ${scalable_target_exists}|cut -f1 -d" "); echo "role arn is ${role_arn}"
max_tput=$(echo ${scalable_target_exists}|cut -f2 -d" "); echo "max_tput is ${max_tput}"

if [[ "$(register_scalable_target ${table_resource_id} ${table_scaleable_read_dimension} ${role_arn} ${AS_READ_MIN_TPUT} ${max_tput} ${SOURCE_REGION})" == "true" ]]; then
echo "Successfully registered new scalable target for ${table_resource_id} with minimum tput ${AS_READ_MIN_TPUT}"

# Updating the min tput triggers autoscaling to update the table if there is read activity
# so we need to wait for it to finish updating
wait_for_active "${TABLE_NAME}" "${SOURCE_REGION}"
table_status=$?

if [ ${table_status} -eq 0 ]; then
echo "Table has returned to ACTIVE state"
else
echo "FAILURE: Table never transitioned to active"
RET_CODE=1
fi
else
echo "ERROR registering new scalable target for ${table_resource_id}"
fi
echo "Unable to spike throughput"
exit 1
fi

# wait for the table to finish updating
while [ $ATTEMPTS -le $MAX_ATTEMPTS ]; do
TABLE_STATUS=$(aws dynamodb describe-table --region $SOURCE_REGION --table-name $TABLE_NAME --query 'Table.TableStatus' --output text)
echo "Checking table status, attempt ${ATTEMPT}"
if [ "$TABLE_STATUS" == "ACTIVE" ]; then
echo "Table transition successful"
exit 0
echo "Updating the base table read throughput with update-table to ${READ_CAPACITY}"
if [ ${RET_CODE} -eq 0 ]; then
# Update the table throughput directly
# This is needed in case there is no autoscaling enabled OR
# if autoscaling is enabled and there is no read acitivty on the table
# Since autoscaling will never scale back down by itself
TABLE_STATUS=$(aws dynamodb update-table \
--region $SOURCE_REGION \
--table-name $TABLE_NAME \
--provisioned-throughput ReadCapacityUnits=${READ_CAPACITY},WriteCapacityUnits=${WRITE_CAPACITY} \
--query 'Table.TableStatus' \
--output text 2>&1)

if [ $? -ne 0 ]; then
ERROR_TYPE=$(echo $TABLE_STATUS | cut -d \( -f2 | cut -d \) -f1)
if [ "$ERROR_TYPE" == "ValidationException" ]; then
echo "Provisioned throughput already set, no action taken"
RET_CODE=0
else
echo "Unable to spike throughput"
echo ${TABLE_STATUS}
RET_CODE=1
fi
fi
echo "Table is $TABLE_STATUS, checking again in $SLEEP_SECONDS seconds"
(( ATTEMPTS++ ))
sleep $SLEEP_SECONDS
done

# Check the table staus again from the update-table call
wait_for_active ${TABLE_NAME} ${SOURCE_REGION}
table_status=$?

if [ ${table_status} -eq 0 ]; then
RET_CODE=0
else
echo "FAILURE: Table never transitioned to active"
RET_CODE=1
fi
else
echo "FAILURE: Table never transitioned to active"
RET_CODE=1
fi

echo "Table never transitioned to active"
exit 1
exit $RET_CODE

0 comments on commit d790f5f

Please sign in to comment.