Skip to content

Commit

Permalink
David leifker/elasticsearch optimization ext (#6919)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Dec 31, 2022
1 parent 5a3089f commit 4915420
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,27 @@
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.ResizeRequest;

import static com.linkedin.datahub.upgrade.buildindices.IndexUtils.*;
import static com.linkedin.datahub.upgrade.buildindices.IndexUtils.getAllReindexConfigs;


@RequiredArgsConstructor
Expand Down Expand Up @@ -52,14 +58,31 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
.collect(Collectors.toList());

for (ReindexConfig indexConfig : indexConfigs) {
UpdateSettingsRequest request = new UpdateSettingsRequest(indexConfig.name());
String indexName = indexConfig.name();

GetAliasesResponse aliasResponse = getAlias(indexConfig.name());
if (!aliasResponse.getAliases().isEmpty()) {
Set<String> indices = aliasResponse.getAliases().keySet();
if (indices.size() != 1) {
throw new NotImplementedException(
String.format("Clone not supported for %s indices in alias %s. Indicies: %s",
indices.size(),
indexConfig.name(),
String.join(",", indices)));
}
indexName = indices.stream().findFirst().get();
log.info("Alias {} resolved to index {}", indexConfig.name(), indexName);
}

UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
Map<String, Object> indexSettings = ImmutableMap.of("index.blocks.write", "true");

request.settings(indexSettings);
boolean ack;
try {
ack =
_esComponents.getSearchClient().indices().putSettings(request, RequestOptions.DEFAULT).isAcknowledged();
log.info("Updated index {} with new settings. Settings: {}, Acknowledged: {}", indexName, indexSettings, ack);
} catch (ElasticsearchStatusException ese) {
// Cover first run case, indices won't exist so settings updates won't work nor will the rest of the preConfigure steps.
// Since no data are in there they are skippable.
Expand All @@ -69,7 +92,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
}
throw ese;
}
log.info("Updated index {} with new settings. Settings: {}, Acknowledged: {}", indexConfig.name(), indexSettings, ack);

if (!ack) {
log.error("Partial index settings update, some indices may still be blocking writes."
+ " Please fix the error and re-run the BuildIndices upgrade job.");
Expand All @@ -79,10 +102,10 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
// Clone indices
if (_configurationProvider.getElasticSearch().getBuildIndices().isCloneIndices()) {
String clonedName = indexConfig.name() + "_clone_" + System.currentTimeMillis();
ResizeRequest resizeRequest = new ResizeRequest(clonedName, indexConfig.name());
ResizeRequest resizeRequest = new ResizeRequest(clonedName, indexName);
boolean cloneAck =
_esComponents.getSearchClient().indices().clone(resizeRequest, RequestOptions.DEFAULT).isAcknowledged();
log.info("Cloned index {} into {}, Acknowledged: {}", indexConfig.name(), clonedName, cloneAck);
log.info("Cloned index {} into {}, Acknowledged: {}", indexName, clonedName, cloneAck);
if (!cloneAck) {
log.error("Partial index settings update, cloned indices may need to be cleaned up: {}", clonedName);
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
Expand All @@ -96,4 +119,9 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}

private GetAliasesResponse getAlias(String name) throws IOException {
return _esComponents.getSearchClient().indices()
.getAlias(new GetAliasesRequest(name), RequestOptions.DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ elasticsearch:
settingsOverrides: ${ELASTICSEARCH_INDEX_BUILDER_SETTINGS_OVERRIDES:#{null}}
entitySettingsOverrides: ${ELASTICSEARCH_INDEX_BUILDER_ENTITY_SETTINGS_OVERRIDES:#{null}}
buildIndices:
cloneIndices: ${ELASTICSEARCH_BUILD_INDICES_CLONE_INDICES:true}
initialBackOffMs: ${ELASTICSEARCH_BUILD_INDICES_INITIAL_BACK_OFF_MILLIS:5000}
maxBackOffs: ${ELASTICSEARCH_BUILD_INDICES_MAX_BACK_OFFS:5}
backOffFactor: ${ELASTICSEARCH_BUILD_INDICES_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s
Expand Down

0 comments on commit 4915420

Please sign in to comment.