Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
Expand Down Expand Up @@ -57,9 +55,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -144,7 +140,7 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas,

public void testILMDownsampleRollingRestart() throws Exception {
final InternalTestCluster cluster = internalCluster();
final List<String> masterNodes = cluster.startMasterOnlyNodes(1);
cluster.startMasterOnlyNodes(1);
cluster.startDataOnlyNodes(3);
ensureStableCluster(cluster.size());
ensureGreen();
Expand All @@ -169,46 +165,16 @@ public void testILMDownsampleRollingRestart() throws Exception {
.endObject();
};
int indexedDocs = bulkIndex(sourceIndex, sourceSupplier, DOC_COUNT);
final CountDownLatch disruptionStart = new CountDownLatch(1);
final CountDownLatch disruptionEnd = new CountDownLatch(1);

new Thread(new Disruptor(cluster, sourceIndex, new DisruptionListener() {
@Override
public void disruptionStart() {
disruptionStart.countDown();
}

@Override
public void disruptionEnd() {
disruptionEnd.countDown();
}
}, masterNodes.get(0), (ignored) -> {
try {
cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
@Override
public boolean validateClusterForming() {
return true;
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
})).start();
cluster.rollingRestart(new InternalTestCluster.RestartCallback());

final String targetIndex = "downsample-1h-" + sourceIndex;
startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd);
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
ensureStableCluster(cluster.numDataAndMasterNodes());
assertTargetIndex(cluster, targetIndex, indexedDocs);
startDownsampleTaskViaIlm(sourceIndex, targetIndex);
assertBusy(() -> assertTargetIndex(cluster, targetIndex, indexedDocs));
ensureGreen(targetIndex);
}

private void startDownsampleTaskViaIlm(
String sourceIndex,
String targetIndex,
CountDownLatch disruptionStart,
CountDownLatch disruptionEnd
) throws Exception {
disruptionStart.await();
private void startDownsampleTaskViaIlm(String sourceIndex, String targetIndex) throws Exception {
var request = new UpdateSettingsRequest(sourceIndex).settings(
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, POLICY_NAME)
);
Expand All @@ -231,7 +197,6 @@ private void startDownsampleTaskViaIlm(
var getSettingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(targetIndex)).actionGet();
assertThat(getSettingsResponse.getSetting(targetIndex, IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()), equalTo("success"));
}, 60, TimeUnit.SECONDS);
disruptionEnd.await();
}

private void assertTargetIndex(final InternalTestCluster cluster, final String targetIndex, int indexedDocs) {
Expand Down Expand Up @@ -294,53 +259,4 @@ private String randomDateForRange(long start, long end) {
public interface SourceSupplier {
XContentBuilder get() throws IOException;
}

interface DisruptionListener {
void disruptionStart();

void disruptionEnd();
}

private class Disruptor implements Runnable {
final InternalTestCluster cluster;
private final String sourceIndex;
private final DisruptionListener listener;
private final String clientNode;
private final Consumer<String> disruption;

private Disruptor(
final InternalTestCluster cluster,
final String sourceIndex,
final DisruptionListener listener,
final String clientNode,
final Consumer<String> disruption
) {
this.cluster = cluster;
this.sourceIndex = sourceIndex;
this.listener = listener;
this.clientNode = clientNode;
this.disruption = disruption;
}

@Override
public void run() {
listener.disruptionStart();
try {
final String candidateNode = safeExecute(
cluster.client(clientNode),
TransportClusterSearchShardsAction.TYPE,
new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, sourceIndex)
).getNodes()[0].getName();
logger.info("Candidate node [" + candidateNode + "]");
disruption.accept(candidateNode);
ensureGreen(sourceIndex);
ensureStableCluster(cluster.numDataAndMasterNodes(), clientNode);

} catch (Exception e) {
logger.error("Ignoring Error while injecting disruption [" + e.getMessage() + "]");
} finally {
listener.disruptionEnd();
}
}
}
}