Skip to content

Commit

Permalink
fix(elasticsearch): make alias creation atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Feb 13, 2023
1 parent 21d9cb6 commit 9df86e9
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -271,9 +272,10 @@ private void reindex(ReindexConfig indexState) throws Throwable {
log.warn("No change in index count after 5 minutes, re-triggering reindex #{}.", reindexCount);
submitReindex(indexState.name(), tempIndexName);
reindexCount = reindexCount + 1;
documentCountsLastUpdated = System.currentTimeMillis(); // reset timer
} else {
throw new RuntimeException(String.format("Reindex from %s to %s failed. Document count %s != %s", indexState.name(), tempIndexName,
documentCounts.getFirst(), documentCounts.getSecond()));
log.warn("Reindex retry timeout for {}.", indexState.name());
break;
}
}

Expand Down Expand Up @@ -306,21 +308,25 @@ private void reindex(ReindexConfig indexState) throws Throwable {
// Check if the original index is aliased or not
GetAliasesResponse aliasesResponse = searchClient.indices().getAlias(
new GetAliasesRequest(indexState.name()).indices(indexState.indexPattern()), RequestOptions.DEFAULT);

// If not aliased, delete the original index
final Collection<String> aliasedIndexDelete;
if (aliasesResponse.getAliases().isEmpty()) {
searchClient.indices().delete(new DeleteIndexRequest().indices(indexState.name()), RequestOptions.DEFAULT);
log.info("Deleting index {} to allow alias creation", indexState.name());
aliasedIndexDelete = List.of(indexState.name());
} else {
searchClient.indices()
.delete(new DeleteIndexRequest().indices(aliasesResponse.getAliases().keySet().toArray(new String[0])),
RequestOptions.DEFAULT);
log.info("Deleting old indices in existing alias {}", aliasesResponse.getAliases().keySet());
aliasedIndexDelete = aliasesResponse.getAliases().keySet();
}

// Add alias for the new index
AliasActions removeAction = AliasActions.remove().alias(indexState.name()).index(indexState.indexPattern());
AliasActions removeAction = AliasActions.removeIndex()
.indices(aliasedIndexDelete.toArray(new String[0]));
AliasActions addAction = AliasActions.add().alias(indexState.name()).index(tempIndexName);
searchClient.indices()
.updateAliases(new IndicesAliasesRequest().addAliasAction(removeAction).addAliasAction(addAction),
RequestOptions.DEFAULT);

log.info("Finished setting up {}", indexState.name());
}

Expand Down

0 comments on commit 9df86e9

Please sign in to comment.