Skip to content

Commit

Permalink
[7.14][Transform] Fix transform fails when getting field mappings (#7…
Browse files Browse the repository at this point in the history
…5694) (#75730)

move field mapping retrieval from task startup into indexer

fixes #75693
  • Loading branch information
Hendrik Muhs committed Jul 27, 2021
1 parent efa6f10 commit 3b1ac6f
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
Expand All @@ -46,6 +46,7 @@
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.Collection;
Expand Down Expand Up @@ -74,7 +75,6 @@ class ClientTransformIndexer extends TransformIndexer {
TransformAuditor auditor,
TransformIndexerStats initialStats,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
TransformProgress transformProgress,
TransformCheckpoint lastCheckpoint,
TransformCheckpoint nextCheckpoint,
Expand All @@ -88,7 +88,6 @@ class ClientTransformIndexer extends TransformIndexer {
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
initialPosition,
initialStats == null ? new TransformIndexerStats() : initialStats,
Expand Down Expand Up @@ -239,6 +238,15 @@ void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse>
);
}

@Override
void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
SchemaUtil.getDestinationFieldMappings(
client,
getConfig().getDestination().getIndex(),
fieldMappingsListener
);
}

@Override
protected void doSaveState(IndexerState indexerState, TransformIndexerPosition position, Runnable next) {
if (context.getTaskState() == TransformTaskState.FAILED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class ClientTransformIndexerBuilder {
private ParentTaskAssigningClient parentTaskClient;
private TransformConfigManager transformsConfigManager;
private TransformCheckpointService transformsCheckpointService;
private TransformAuditor auditor;
private Map<String, String> fieldMappings;
private TransformConfig transformConfig;
private TransformIndexerStats initialStats;
private IndexerState indexerState = IndexerState.STOPPED;
Expand Down Expand Up @@ -57,7 +55,6 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {
auditor,
initialStats,
transformConfig,
fieldMappings,
progress,
TransformCheckpoint.isNullOrEmpty(lastCheckpoint) ? TransformCheckpoint.EMPTY : lastCheckpoint,
TransformCheckpoint.isNullOrEmpty(nextCheckpoint) ? TransformCheckpoint.EMPTY : nextCheckpoint,
Expand Down Expand Up @@ -92,11 +89,6 @@ ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) {
return this;
}

ClientTransformIndexerBuilder setFieldMappings(Map<String, String> fieldMappings) {
this.fieldMappings = fieldMappings;
return this;
}

ClientTransformIndexerBuilder setTransformConfig(TransformConfig transformConfig) {
this.transformConfig = transformConfig;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private enum RunState {

protected final AtomicReference<Collection<ActionListener<Void>>> saveStateListeners = new AtomicReference<>();

private final Map<String, String> fieldMappings;
private volatile Map<String, String> fieldMappings;

// the function of the transform, e.g. pivot or latest
private Function function;
Expand Down Expand Up @@ -132,7 +132,6 @@ public TransformIndexer(
CheckpointProvider checkpointProvider,
TransformAuditor auditor,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
TransformIndexerStats jobStats,
Expand All @@ -146,7 +145,6 @@ public TransformIndexer(
this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor");
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint");
this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint");
Expand All @@ -162,6 +160,8 @@ public TransformIndexer(

abstract void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener);

abstract void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener);

abstract void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener<BulkByScrollResponse> responseListener);

abstract void refreshDestinationIndex(ActionListener<RefreshResponse> responseListener);
Expand Down Expand Up @@ -272,7 +272,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
// On each run, we need to get the total number of docs and reset the count of processed docs
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
// the progress here, and not in the executor.
ActionListener<Void> updateConfigListener = ActionListener.wrap(updateConfigResponse -> {
ActionListener<Void> configurationReadyListener = ActionListener.wrap(r -> {
initializeFunction();

if (initialRun()) {
Expand Down Expand Up @@ -318,27 +318,42 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
}
}, listener::onFailure);

ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(fieldMappings -> {
this.fieldMappings = fieldMappings;
configurationReadyListener.onResponse(null);
}, listener::onFailure);

ActionListener<Void> reLoadFieldMappingsListener = ActionListener.wrap(
updateConfigResponse -> { doGetFieldMappings(fieldMappingsListener); },
listener::onFailure
);

// If we are continuous, we will want to verify we have the latest stored configuration
ActionListener<Void> changedSourceListener = ActionListener.wrap(r -> {
if (isContinuous()) {
transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> {
transformConfig = config;
logger.debug("[{}] successfully refreshed transform config from index.", getJobId());
updateConfigListener.onResponse(null);
if (transformConfig.equals(config) && fieldMappings != null) {
logger.trace("[{}] transform config has not changed.", getJobId());
configurationReadyListener.onResponse(null);
} else {
transformConfig = config;
logger.debug("[{}] successfully refreshed transform config from index.", getJobId());
reLoadFieldMappingsListener.onResponse(null);
}
}, failure -> {
String msg = TransformMessages.getMessage(TransformMessages.FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION, getJobId());
logger.error(msg, failure);
// If the transform config index or the transform config is gone, something serious occurred
// We are in an unknown state and should fail out
if (failure instanceof ResourceNotFoundException) {
updateConfigListener.onFailure(new TransformConfigLostOnReloadException(msg, failure));
reLoadFieldMappingsListener.onFailure(new TransformConfigLostOnReloadException(msg, failure));
} else {
auditor.warning(getJobId(), msg);
updateConfigListener.onResponse(null);
reLoadFieldMappingsListener.onResponse(null);
}
}));
} else {
updateConfigListener.onResponse(null);
reLoadFieldMappingsListener.onResponse(null);
}
}, listener::onFailure);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -48,7 +48,6 @@
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -202,7 +201,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
}
);

// <7> load next checkpoint
// <6> load next checkpoint
ActionListener<TransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(nextCheckpoint -> {

if (nextCheckpoint.isEmpty()) {
Expand All @@ -225,7 +224,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
markAsFailed(buildTask, msg);
});

// <6> load last checkpoint
// <5> load last checkpoint
ActionListener<TransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(lastCheckpoint -> {
indexerBuilder.setLastCheckpoint(lastCheckpoint);

Expand All @@ -238,7 +237,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
markAsFailed(buildTask, msg);
});

// <5> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// <4> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> transformStatsActionListener = ActionListener.wrap(
Expand Down Expand Up @@ -286,26 +285,12 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
}
);

// <4> set fieldmappings for the indexer, get the previous stats (if they exist)
ActionListener<Map<String, String>> getFieldMappingsListener = ActionListener.wrap(fieldMappings -> {
indexerBuilder.setFieldMappings(fieldMappings);
transformServices.getConfigManager().getTransformStoredDoc(transformId, transformStatsActionListener);
}, error -> {
String msg = TransformMessages.getMessage(
TransformMessages.UNABLE_TO_GATHER_FIELD_MAPPINGS,
indexerBuilder.getTransformConfig().getDestination().getIndex()
);
logger.error(msg, error);
markAsFailed(buildTask, msg);
});

// <3> Validate the transform, assigning it to the indexer, and get the field mappings
// <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist)
ActionListener<TransformConfig> getTransformConfigListener = ActionListener.wrap(config -> {
ValidationException validationException = config.validate(null);
if (validationException == null) {
indexerBuilder.setTransformConfig(config);
SchemaUtil.getDestinationFieldMappings(buildTask.getParentTaskClient(), config.getDestination().getIndex(),
getFieldMappingsListener);
transformServices.getConfigManager().getTransformStoredDoc(transformId, transformStatsActionListener);
} else {
auditor.error(transformId, validationException.getMessage());
markAsFailed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void testAudiOnFinishFrequency() {
mock(TransformAuditor.class),
mock(TransformIndexerStats.class),
mock(TransformConfig.class),
Collections.emptyMap(),
null,
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class MockedTransformIndexer extends TransformIndexer {
IndexBasedTransformConfigManager transformsConfigManager,
CheckpointProvider checkpointProvider,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
TransformAuditor auditor,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
Expand All @@ -120,7 +119,6 @@ class MockedTransformIndexer extends TransformIndexer {
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
initialState,
initialPosition,
jobStats,
Expand Down Expand Up @@ -260,6 +258,11 @@ void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener<B
void refreshDestinationIndex(ActionListener<RefreshResponse> responseListener) {
responseListener.onResponse(new RefreshResponse(1, 1, 0, Collections.emptyList()));
}

@Override
void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
fieldMappingsListener.onResponse(Collections.emptyMap());
}
}

@Before
Expand Down Expand Up @@ -722,7 +725,6 @@ private MockedTransformIndexer createMockIndexer(
transformConfigManager,
mock(CheckpointProvider.class),
config,
Collections.emptyMap(),
auditor,
state,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class MockedTransformIndexer extends TransformIndexer {
CheckpointProvider checkpointProvider,
TransformAuditor auditor,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
TransformIndexerStats jobStats,
Expand All @@ -124,7 +123,6 @@ class MockedTransformIndexer extends TransformIndexer {
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
initialState,
initialPosition,
jobStats,
Expand Down Expand Up @@ -247,6 +245,11 @@ public int getSaveStateListenerCallCount() {
public TransformState getPersistedState() {
return persistedState;
}

@Override
void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
fieldMappingsListener.onResponse(Collections.emptyMap());
}
}

@Before
Expand Down Expand Up @@ -600,7 +603,6 @@ private MockedTransformIndexer createMockIndexer(
checkpointProvider,
auditor,
config,
Collections.emptyMap(),
state,
null,
jobStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class MockedTransformIndexer extends TransformIndexer {
CheckpointProvider checkpointProvider,
TransformAuditor auditor,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
TransformIndexerStats jobStats,
Expand All @@ -126,7 +125,6 @@ class MockedTransformIndexer extends TransformIndexer {
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
initialState,
initialPosition,
jobStats,
Expand Down Expand Up @@ -254,6 +252,11 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
);
}

@Override
void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
fieldMappingsListener.onResponse(Collections.emptyMap());
}

public boolean waitingForNextSearch() {
return super.getScheduledNextSearch() != null;
}
Expand Down Expand Up @@ -440,7 +443,6 @@ private MockedTransformIndexer createMockIndexer(
checkpointProvider,
auditor,
config,
Collections.emptyMap(),
state,
null,
jobStats,
Expand Down

0 comments on commit 3b1ac6f

Please sign in to comment.