Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCR: Rename follow-task parameters and stats #34836

Merged
merged 4 commits into from Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,21 +25,21 @@
body:
remote_cluster: local
leader_index_patterns: ['logs-*']
max_concurrent_read_batches: 2
max_outstanding_read_requests: 2
- is_true: acknowledged

- do:
ccr.get_auto_follow_pattern:
name: my_pattern
- match: { my_pattern.remote_cluster: 'local' }
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
- match: { my_pattern.max_concurrent_read_batches: 2 }
- match: { my_pattern.max_outstanding_read_requests: 2 }

- do:
ccr.get_auto_follow_pattern: {}
- match: { my_pattern.remote_cluster: 'local' }
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
- match: { my_pattern.max_concurrent_read_batches: 2 }
- match: { my_pattern.max_outstanding_read_requests: 2 }

- do:
ccr.delete_auto_follow_pattern:
Expand Down
Expand Up @@ -56,21 +56,21 @@
- gte: { indices.0.shards.0.follower_global_checkpoint: -1 }
- gte: { indices.0.shards.0.follower_max_seq_no: -1 }
- gte: { indices.0.shards.0.last_requested_seq_no: -1 }
- gte: { indices.0.shards.0.number_of_concurrent_reads: 0 }
- match: { indices.0.shards.0.number_of_concurrent_writes: 0 }
- match: { indices.0.shards.0.number_of_queued_writes: 0 }
- gte: { indices.0.shards.0.mapping_version: 0 }
- gte: { indices.0.shards.0.total_fetch_time_millis: 0 }
- gte: { indices.0.shards.0.number_of_successful_fetches: 0 }
- gte: { indices.0.shards.0.number_of_failed_fetches: 0 }
- match: { indices.0.shards.0.operations_received: 0 }
- match: { indices.0.shards.0.total_transferred_bytes: 0 }
- match: { indices.0.shards.0.total_index_time_millis: 0 }
- match: { indices.0.shards.0.number_of_successful_bulk_operations: 0 }
- match: { indices.0.shards.0.number_of_failed_bulk_operations: 0 }
- match: { indices.0.shards.0.number_of_operations_indexed: 0 }
- length: { indices.0.shards.0.fetch_exceptions: 0 }
- gte: { indices.0.shards.0.time_since_last_fetch_millis: -1 }
- gte: { indices.0.shards.0.outstanding_read_requests: 0 }
- match: { indices.0.shards.0.outstanding_write_requests: 0 }
- match: { indices.0.shards.0.write_buffer_operation_count: 0 }
- gte: { indices.0.shards.0.follower_mapping_version: 0 }
- gte: { indices.0.shards.0.total_read_time_millis: 0 }
- gte: { indices.0.shards.0.successful_read_requests: 0 }
- gte: { indices.0.shards.0.failed_read_requests: 0 }
- match: { indices.0.shards.0.operations_read: 0 }
- match: { indices.0.shards.0.bytes_read: 0 }
- match: { indices.0.shards.0.total_write_time_millis: 0 }
- match: { indices.0.shards.0.successful_write_requests: 0 }
- match: { indices.0.shards.0.failed_write_requests: 0 }
- match: { indices.0.shards.0.operations_written: 0 }
- length: { indices.0.shards.0.read_exceptions: 0 }
- gte: { indices.0.shards.0.time_since_last_read_millis: -1 }

- do:
ccr.pause_follow:
Expand Down
Expand Up @@ -59,7 +59,7 @@ protected static void refresh(String index) throws IOException {

protected static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"read_poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand All @@ -74,7 +74,7 @@ protected static void followIndex(String leaderCluster, String leaderIndex, Stri
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
"\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client.performRequest(request));
}

Expand Down Expand Up @@ -136,10 +136,10 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina
assertThat(followerIndex, equalTo(expectedFollowerIndex));

int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_read", hit);
numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
int foundNumberOfOperationsIndexed =
(int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_written", hit);
numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
}

Expand Down
Expand Up @@ -324,14 +324,16 @@ private void followLeaderIndex(String autoFollowPattenName,

ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize());
followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setPollTimeout(pattern.getPollTimeout());
followRequest.setReadPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(remoteCluster);
Expand Down
Expand Up @@ -65,8 +65,8 @@ public static class Request extends SingleShardRequest<Request> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE;
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_READ_POLL_TIMEOUT;
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE;

private long relativeStartNanos;

Expand Down