Skip to content

Commit

Permalink
refactored batchDelete method in AWSDynamoDAO
Browse files Browse the repository at this point in the history
  • Loading branch information
albogdano committed Sep 9, 2018
1 parent f5a9cb7 commit 3068b19
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 46 deletions.
Expand Up @@ -304,7 +304,7 @@ public <P extends ParaObject> void createAll(String appid, List<P> objects) {
reqs.add(new WriteRequest().withPutRequest(new PutRequest().withItem(row)));
j++;
}
batchWrite(Collections.singletonMap(tableName, reqs));
batchWrite(Collections.singletonMap(tableName, reqs), 1);
reqs.clear();
j = 0;
}
Expand Down Expand Up @@ -418,7 +418,7 @@ public <P extends ParaObject> void deleteAll(String appid, List<P> objects) {
new AttributeValue(getKeyForAppid(object.getId(), appid))))));
}
}
batchWrite(Collections.singletonMap(getTableNameForAppid(appid), reqs));
batchWrite(Collections.singletonMap(getTableNameForAppid(appid), reqs), 1);
logger.debug("DAO.deleteAll() {}", objects.size());
}

Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.amazonaws.services.dynamodbv2.document.Page;
import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.internal.PageIterable;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
Expand Down Expand Up @@ -67,7 +68,6 @@
import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -448,8 +448,9 @@ protected static <P extends ParaObject> void batchGet(Map<String, KeysAndAttribu
/**
* Writes multiple items in batch.
* @param items a map of tables->write requests
* @param backoff backoff seconds
*/
protected static void batchWrite(Map<String, List<WriteRequest>> items) {
protected static void batchWrite(Map<String, List<WriteRequest>> items, int backoff) {
if (items == null || items.isEmpty()) {
return;
}
Expand All @@ -462,9 +463,9 @@ protected static void batchWrite(Map<String, List<WriteRequest>> items) {
logger.debug("batchWrite(): total {}, cc {}", items.size(), result.getConsumedCapacity());

if (result.getUnprocessedItems() != null && !result.getUnprocessedItems().isEmpty()) {
Thread.sleep(1000);
Thread.sleep(backoff * 1000);
logger.warn("{} UNPROCESSED write requests!", result.getUnprocessedItems().size());
batchWrite(result.getUnprocessedItems());
batchWrite(result.getUnprocessedItems(), backoff * 2);
}
} catch (Exception e) {
logger.error(null, e);
Expand Down Expand Up @@ -522,12 +523,14 @@ public static <P extends ParaObject> List<P> readPageFromSharedTable(String appi
if (StringUtils.isBlank(appid)) {
return results;
}
Page<Item, QueryOutcome> items = queryGSI(appid, pager);
if (items != null) {
for (Item item : items) {
P obj = ParaObjectUtils.setAnnotatedFields(item.asMap());
if (obj != null) {
results.add(obj);
PageIterable<Item, QueryOutcome> pages = queryGSI(appid, pager);
if (pages != null) {
for (Page<Item, QueryOutcome> page : pages) {
for (Item item : page) {
P obj = ParaObjectUtils.setAnnotatedFields(item.asMap());
if (obj != null) {
results.add(obj);
}
}
}
}
Expand All @@ -537,7 +540,7 @@ public static <P extends ParaObject> List<P> readPageFromSharedTable(String appi
return results;
}

private static Page<Item, QueryOutcome> queryGSI(String appid, Pager p) {
private static PageIterable<Item, QueryOutcome> queryGSI(String appid, Pager p) {
Pager pager = (p != null) ? p : new Pager();
Index index = getSharedIndex();
QuerySpec spec = new QuerySpec().
Expand All @@ -551,7 +554,7 @@ private static Page<Item, QueryOutcome> queryGSI(String appid, Pager p) {
new KeyAttribute(Config._ID, pager.getLastKey()), // RANGE/SORT KEY
new KeyAttribute(Config._KEY, getKeyForAppid(pager.getLastKey(), appid))); // TABLE PRIMARY KEY
}
return index != null ? index.query(spec).firstPage() : null;
return index != null ? index.query(spec).pages() : null;
}

/**
Expand All @@ -562,44 +565,35 @@ public static void deleteAllFromSharedTable(String appid) {
if (StringUtils.isBlank(appid) || !isSharedAppid(appid)) {
return;
}
Pager pager = new Pager(50);
List<WriteRequest> allDeletes = new LinkedList<>();
Page<Item, QueryOutcome> items;
// read all phase
Pager pager = new Pager(25);
PageIterable<Item, QueryOutcome> pages;
Map<String, AttributeValue> lastKey = null;
do {
items = queryGSI(appid, pager);
if (items == null) {
// read all phase
pages = queryGSI(appid, pager);
if (pages == null) {
break;
}
for (Item item : items) {
String key = item.getString(Config._KEY);
// only delete rows which belong to the given appid
if (StringUtils.startsWith(key, appid.trim())) {
logger.debug("Preparing to delete '{}' from shared table, appid: '{}'.", key, appid);
pager.setLastKey(item.getString(Config._ID));
allDeletes.add(new WriteRequest().withDeleteRequest(new DeleteRequest().
withKey(Collections.singletonMap(Config._KEY, new AttributeValue(key)))));
List<WriteRequest> deletePage = new LinkedList<>();
for (Page<Item, QueryOutcome> page : pages) {
for (Item item : page) {
String key = item.getString(Config._KEY);
// only delete rows which belong to the given appid
if (StringUtils.startsWith(key, appid.trim())) {
logger.debug("Preparing to delete '{}' from shared table, appid: '{}'.", key, appid);
pager.setLastKey(item.getString(Config._ID));
deletePage.add(new WriteRequest().withDeleteRequest(new DeleteRequest().
withKey(Collections.singletonMap(Config._KEY, new AttributeValue(key)))));
}
}
lastKey = page.getLowLevelResult().getQueryResult().getLastEvaluatedKey();
}
} while (items.iterator().hasNext());

// delete all phase
final int maxItems = 20;
int batchSteps = (allDeletes.size() > maxItems) ? (allDeletes.size() / maxItems) + 1 : 1;
List<WriteRequest> reqs = new LinkedList<>();
Iterator<WriteRequest> it = allDeletes.iterator();
String tableName = getTableNameForAppid(appid);
for (int i = 0; i < batchSteps; i++) {
while (it.hasNext() && reqs.size() < maxItems) {
reqs.add(it.next());
// delete all phase
logger.info("Deleting {} items belonging to app '{}', from shared table...", deletePage.size(), appid);
if (!deletePage.isEmpty()) {
batchWrite(Collections.singletonMap(getTableNameForAppid(appid), deletePage), 1);
}
if (reqs.size() > 0) {
logger.info("Deleting {} items belonging to app '{}', from shared table (page {}/{})...",
reqs.size(), appid, i + 1, batchSteps);
batchWrite(Collections.singletonMap(tableName, reqs));
}
reqs.clear();
}
} while (lastKey != null && !lastKey.isEmpty());
}

/**
Expand Down

0 comments on commit 3068b19

Please sign in to comment.