[Transform] Reduce indexes to query based on checkpoints (#75839)
Continuous transforms reduce the amount of data to query by detecting what has changed
since the last checkpoint. This information is used to inject queries that narrow the scope.
Previously, the query was still sent to all configured indices. This change uses the
checkpoint information to also reduce the set of indices to query. The number of network
calls goes down, which, besides improving performance, reduces the probability of a failure.

This change mainly helps transforms of type latest; pivot transforms require additional
changes, planned for later.
Hendrik Muhs committed Aug 26, 2021
1 parent 0e50d20 commit 4974a7c
Showing 11 changed files with 461 additions and 71 deletions.
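The core idea of the change is a per-index diff of shard checkpoints. Here is a minimal, self-contained sketch of that idea (hypothetical class and data, not code from this commit):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ChangedIndicesSketch {

    // An index needs to be queried again only if any of its per-shard
    // checkpoints (sequence numbers) moved since the last checkpoint.
    static Set<String> changedIndices(Map<String, long[]> last, Map<String, long[]> next) {
        Set<String> changed = new HashSet<>();
        for (Map.Entry<String, long[]> entry : next.entrySet()) {
            // an index missing from the last checkpoint maps to null here;
            // Arrays.equals(x, null) is false, so new indices count as changed
            if (Arrays.equals(entry.getValue(), last.get(entry.getKey())) == false) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, long[]> last = Map.of("logs-0", new long[] { 5, 5 }, "logs-1", new long[] { 7, 2 });
        Map<String, long[]> next = Map.of("logs-0", new long[] { 5, 5 }, "logs-1", new long[] { 9, 2 });
        System.out.println(changedIndices(last, next)); // prints [logs-1]
    }
}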
@@ -21,10 +21,13 @@

import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@@ -312,6 +315,23 @@ public static long getBehind(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
        return newCheckPointOperationsSum - oldCheckPointOperationsSum;
    }

+    public static Collection<String> getChangedIndices(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
+        if (oldCheckpoint.isEmpty()) {
+            return newCheckpoint.indicesCheckpoints.keySet();
+        }
+
+        Set<String> indices = new HashSet<>();
+
+        for (Entry<String, long[]> entry : newCheckpoint.indicesCheckpoints.entrySet()) {
+            // compare against the old checkpoint
+            if (Arrays.equals(entry.getValue(), oldCheckpoint.indicesCheckpoints.get(entry.getKey())) == false) {
+                indices.add(entry.getKey());
+            }
+        }
+
+        return indices;
+    }

    private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
        Map<String, long[]> checkpoints = new TreeMap<>();
        for (Map.Entry<String, Object> e : readMap.entrySet()) {
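A usage sketch for the new method (a hypothetical helper, not part of the commit): on the very first run there is no previous checkpoint, and passing TransformCheckpoint.EMPTY reports every index of the new checkpoint as changed, matching the isEmpty() shortcut above.

import java.util.Collection;

import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;

class ChangedIndicesHelper {
    // null-safe wrapper: treat a missing previous checkpoint as "everything changed"
    static Collection<String> changedSince(TransformCheckpoint last, TransformCheckpoint next) {
        return TransformCheckpoint.getChangedIndices(last == null ? TransformCheckpoint.EMPTY : last, next);
    }
}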
@@ -19,9 +19,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.stream.Collectors;

import static org.elasticsearch.test.TestMatchers.matchesPattern;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;

public class TransformCheckpointTests extends AbstractSerializingTransformTestCase<TransformCheckpoint> {

@@ -191,6 +195,74 @@ public void testGetBehind() {
        assertEquals((indices - 2) * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
    }

+    public void testGetChangedIndices() {
+        String baseIndexName = randomAlphaOfLength(8);
+        String id = randomAlphaOfLengthBetween(1, 10);
+        long timestamp = randomNonNegativeLong();
+
+        TreeMap<String, long[]> checkpointsByIndexOld = new TreeMap<>();
+        TreeMap<String, long[]> checkpointsByIndexNew = new TreeMap<>();
+
+        int indices = randomIntBetween(5, 20);
+        int shards = randomIntBetween(1, 20);
+
+        for (int i = 0; i < indices; ++i) {
+            List<Long> checkpoints1 = new ArrayList<>();
+            List<Long> checkpoints2 = new ArrayList<>();
+
+            for (int j = 0; j < shards; ++j) {
+                long shardCheckpoint = randomLongBetween(-1, 1_000_000);
+                checkpoints1.add(shardCheckpoint);
+                if (i % 3 == 0) {
+                    checkpoints2.add(shardCheckpoint + 10);
+                } else {
+                    checkpoints2.add(shardCheckpoint);
+                }
+            }
+
+            String indexName = baseIndexName + i;
+
+            if (i < 15) {
+                checkpointsByIndexOld.put(indexName, checkpoints1.stream().mapToLong(l -> l).toArray());
+            }
+            if (i % 5 != 0) {
+                checkpointsByIndexNew.put(indexName, checkpoints2.stream().mapToLong(l -> l).toArray());
+            }
+        }
+        long checkpoint = randomLongBetween(10, 100);
+        TransformCheckpoint checkpointOld = new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
+        TransformCheckpoint checkpointNew = new TransformCheckpoint(id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);
+
+        Set<Integer> changedIndexes = TransformCheckpoint.getChangedIndices(checkpointOld, checkpointNew)
+            .stream()
+            .map(x -> Integer.parseInt(x.substring(baseIndexName.length())))
+            .collect(Collectors.toSet());
+
+        assertThat(changedIndexes.size(), lessThan(indices));
+
+        for (int i = 0; i < indices; ++i) {
+            if (i >= 15) {
+                if (i % 5 == 0) {
+                    assertFalse(changedIndexes.contains(i));
+                } else {
+                    assertTrue(changedIndexes.contains(i));
+                }
+            } else if (i % 5 == 0) {
+                assertFalse(changedIndexes.contains(i));
+            } else if (i % 3 == 0) {
+                assertTrue(changedIndexes.contains(i));
+            } else {
+                assertFalse(changedIndexes.contains(i));
+            }
+        }
+
+        // check against empty
+        assertThat(
+            TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, checkpointNew),
+            equalTo(checkpointNew.getIndicesCheckpoints().keySet())
+        );
+    }

    private static Map<String, long[]> randomCheckpointsByIndex() {
        Map<String, long[]> checkpointsByIndex = new TreeMap<>();
        int indices = randomIntBetween(1, 10);
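The modulo scheme in the test above encodes three scenarios: every 3rd index has advanced shard checkpoints (changed), every 5th index is missing from the new checkpoint (e.g. deleted, so never reported), and indices 15 and above are missing from the old checkpoint (new, so reported whenever present). Downstream, the resulting index set is what shrinks the search fan-out; a hypothetical sketch of that step (names illustrative, assuming the x-pack transform classes on the classpath):

import java.util.Collection;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;

class NarrowedSearch {
    // Target only indices with changes since the last checkpoint: fewer target
    // indices means fewer network calls and fewer chances for one to fail.
    static SearchRequest narrowedRequest(TransformCheckpoint last, TransformCheckpoint next) {
        Collection<String> changed = TransformCheckpoint.getChangedIndices(last, next);
        return new SearchRequest(changed.toArray(new String[0]));
    }
}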
@@ -30,6 +30,7 @@
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -59,6 +60,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

@@ -71,7 +73,7 @@ class ClientTransformIndexer extends TransformIndexer {
    private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

    private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
-    private volatile PointInTimeBuilder pit;
+    private final ConcurrentHashMap<String, PointInTimeBuilder> namedPits = new ConcurrentHashMap<>();
    private volatile long pitCheckpoint;
    private volatile boolean disablePit = false;
@@ -250,11 +252,7 @@ void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse>

    @Override
    void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
-        SchemaUtil.getDestinationFieldMappings(
-            client,
-            getConfig().getDestination().getIndex(),
-            fieldMappingsListener
-        );
+        SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
    }

    /**
@@ -363,12 +361,20 @@ protected void onStop() {
    }

    private void closePointInTime() {
+        for (String name : namedPits.keySet()) {
+            closePointInTime(name);
+        }
+    }
+
+    private void closePointInTime(String name) {
+        PointInTimeBuilder pit = namedPits.remove(name);
+
        if (pit == null) {
            return;
        }

        String oldPit = pit.getEncodedId();
-        pit = null;

        ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
        ClientHelper.executeWithHeadersAsync(
            transformConfig.getHeaders(),
@@ -383,20 +389,25 @@ private void closePointInTime() {
        );
    }

-    private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
+    private void injectPointInTimeIfNeeded(
+        Tuple<String, SearchRequest> namedSearchRequest,
+        ActionListener<Tuple<String, SearchRequest>> listener
+    ) {
        if (disablePit) {
-            listener.onResponse(searchRequest);
+            listener.onResponse(namedSearchRequest);
            return;
        }

+        SearchRequest searchRequest = namedSearchRequest.v2();
+        PointInTimeBuilder pit = namedPits.get(namedSearchRequest.v1());
        if (pit != null) {
            searchRequest.source().pointInTimeBuilder(pit);
-            listener.onResponse(searchRequest);
+            listener.onResponse(namedSearchRequest);
            return;
        }

        // no pit, create a new one
-        OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE);
+        OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(searchRequest.indices()).keepAlive(PIT_KEEP_ALIVE);

        ClientHelper.executeWithHeadersAsync(
            transformConfig.getHeaders(),
@@ -405,11 +416,17 @@ private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
            OpenPointInTimeAction.INSTANCE,
            pitRequest,
            ActionListener.wrap(response -> {
-                pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
-                searchRequest.source().pointInTimeBuilder(pit);
+                PointInTimeBuilder newPit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
+                namedPits.put(namedSearchRequest.v1(), newPit);
+                searchRequest.source().pointInTimeBuilder(newPit);
                pitCheckpoint = getNextCheckpoint().getCheckpoint();
-                logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId());
-                listener.onResponse(searchRequest);
+                logger.trace(
+                    "[{}] using pit search context with id [{}]; request [{}]",
+                    getJobId(),
+                    newPit.getEncodedId(),
+                    namedSearchRequest.v1()
+                );
+                listener.onResponse(namedSearchRequest);
            }, e -> {
                Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
                // if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
@@ -433,25 +450,27 @@ private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
                        e
                    );
                }
-                listener.onResponse(searchRequest);
+                listener.onResponse(namedSearchRequest);
            })
        );
    }

-    private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
-        logger.trace("searchRequest: {}", searchRequest);
+    private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
+        logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", namedSearchRequest.v2()));
+
+        PointInTimeBuilder pit = namedSearchRequest.v2().pointInTimeBuilder();

        ClientHelper.executeWithHeadersAsync(
            transformConfig.getHeaders(),
            ClientHelper.TRANSFORM_ORIGIN,
            client,
            SearchAction.INSTANCE,
-            searchRequest,
+            namedSearchRequest.v2(),
            ActionListener.wrap(response -> {
                // did the pit change?
                if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId().equals(pit.getEncodedId()) == false)) {
-                    pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
-                    logger.trace("point in time handle has changed");
+                    namedPits.put(namedSearchRequest.v1(), new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
+                    logger.trace("point in time handle has changed; request [{}]", namedSearchRequest.v1());
                }

                listener.onResponse(response);
@@ -461,15 +480,22 @@ private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
                // succeeds a new pit gets created at the next run
                Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
                if (unwrappedException instanceof SearchContextMissingException) {
-                    logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e);
-                    pit = null;
-                    searchRequest.source().pointInTimeBuilder(null);
+                    logger.warn(
+                        new ParameterizedMessage(
+                            "[{}] Search context missing, falling back to normal search; request [{}]",
+                            getJobId(),
+                            namedSearchRequest.v1()
+                        ),
+                        e
+                    );
+                    namedPits.remove(namedSearchRequest.v1());
+                    namedSearchRequest.v2().source().pointInTimeBuilder(null);
                    ClientHelper.executeWithHeadersAsync(
                        transformConfig.getHeaders(),
                        ClientHelper.TRANSFORM_ORIGIN,
                        client,
                        SearchAction.INSTANCE,
-                        searchRequest,
+                        namedSearchRequest.v2(),
                        listener
                    );
                    return;
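The indexer changes above replace the single shared point-in-time with one PIT per named search request, since requests can now target different index subsets per checkpoint. A condensed sketch of that cache pattern (hypothetical class, mirroring the calls used above):

import java.util.concurrent.ConcurrentHashMap;

import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.builder.PointInTimeBuilder;

// Minimal cache pattern: one PIT per logical request name, so requests with
// different index scopes never share a PIT over the wrong index set; a
// missing or invalidated entry is simply recreated on the next search.
class PitCache {
    private final ConcurrentHashMap<String, PointInTimeBuilder> namedPits = new ConcurrentHashMap<>();

    PointInTimeBuilder get(String name) {
        return namedPits.get(name);
    }

    void put(String name, String encodedPitId, TimeValue keepAlive) {
        namedPits.put(name, new PointInTimeBuilder(encodedPitId).setKeepAlive(keepAlive));
    }

    // drop a PIT the server reported as missing or expired
    void remove(String name) {
        namedPits.remove(name);
    }
}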
@@ -15,9 +15,11 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;

+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -74,13 +76,20 @@ public interface ChangeCollector {
        /**
         * Build the filter query to narrow the result set given the previously collected changes.
         *
+         * TODO: it might be useful to have the full checkpoint data.
+         *
-         * @param lastCheckpointTimestamp the timestamp of the last checkpoint
-         * @param nextCheckpointTimestamp the timestamp of the next (in progress) checkpoint
+         * @param lastCheckpoint the last checkpoint
+         * @param nextCheckpoint the next (in progress) checkpoint
         * @return a filter query, null in case of no filter
         */
-        QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextCheckpointTimestamp);
+        QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint);

+        /**
+         * Filter indices according to the given checkpoints.
+         *
+         * @param lastCheckpoint the last checkpoint
+         * @param nextCheckpoint the next (in progress) checkpoint
+         * @return set of indices to query
+         */
+        Collection<String> getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint);

        /**
         * Clear the internal state to free up memory.
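An implementation of the extended ChangeCollector contract could delegate index filtering to the checkpoint diff added in this commit. A purely illustrative sketch (not the implementation from this change; it assumes a timestamp-based sync field and that TransformCheckpoint.getTimeUpperBound() bounds each checkpoint):

import java.util.Collection;

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;

// Sketch of the two narrowing hooks of the ChangeCollector contract: a range
// filter on the sync field plus the checkpoint-based index filter.
class TimeBasedNarrowing {
    private final String syncField;

    TimeBasedNarrowing(String syncField) {
        this.syncField = syncField;
    }

    QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) {
        // only documents that arrived between the two checkpoints
        return QueryBuilders.rangeQuery(syncField)
            .gte(lastCheckpoint.getTimeUpperBound())
            .lt(nextCheckpoint.getTimeUpperBound());
    }

    Collection<String> getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) {
        // only indices whose shard checkpoints moved since the last checkpoint
        return TransformCheckpoint.getChangedIndices(lastCheckpoint, nextCheckpoint);
    }
}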
