Skip to content

Commit

Permalink
[TRANSFORM] Remove HLRC from single node tests (#84220)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Feb 23, 2022
1 parent 28758b0 commit 6155e14
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,8 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.core.PageParams;
import org.elasticsearch.client.transform.DeleteTransformRequest;
import org.elasticsearch.client.transform.GetTransformRequest;
import org.elasticsearch.client.transform.GetTransformResponse;
import org.elasticsearch.client.transform.GetTransformStatsRequest;
import org.elasticsearch.client.transform.GetTransformStatsResponse;
import org.elasticsearch.client.transform.PutTransformRequest;
import org.elasticsearch.client.transform.transforms.DestConfig;
import org.elasticsearch.client.transform.transforms.QueryConfig;
import org.elasticsearch.client.transform.transforms.SourceConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.TransformStats;
import org.elasticsearch.client.transform.transforms.pivot.AggregationConfig;
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.junit.After;
Expand All @@ -50,7 +25,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -518,83 +492,84 @@ public void testGetStatsWithContinuous() throws Exception {
}, 120, TimeUnit.SECONDS);
}

public void testManyTranformsUsingHlrc() throws IOException {
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score.avg").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp.max").field("timestamp"));

TransformConfig.Builder configBuilder = TransformConfig.builder()
.setSource(
SourceConfig.builder().setIndex(REVIEWS_INDEX_NAME).setQueryConfig(new QueryConfig(QueryBuilders.matchAllQuery())).build()
)
.setDest(DestConfig.builder().setIndex("dest").build())
.setFrequency(TimeValue.timeValueSeconds(10))
.setDescription("Test 10000 transform configs")
.setPivotConfig(
PivotConfig.builder()
.setGroups(GroupConfig.builder().groupBy("by-user", TermsGroupSource.builder().setField("user_id").build()).build())
.setAggregationConfig(new AggregationConfig(aggs))
.build()
);

try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
int numberOfTransforms = randomIntBetween(1_500, 4_000);
for (int i = 0; i < numberOfTransforms; ++i) {
AcknowledgedResponse response = restClient.transform()
.putTransform(
new PutTransformRequest(configBuilder.setId(String.format(Locale.ROOT, "t-%05d", i)).build()),
RequestOptions.DEFAULT
);
assertTrue(response.isAcknowledged());
}

for (int i = 0; i < 3; ++i) {
int from = randomIntBetween(0, numberOfTransforms - 1_000);
int size = randomIntBetween(1, 1000);
@SuppressWarnings("unchecked")
public void testManyTransforms() throws IOException {
String config = transformConfig();

int numberOfTransforms = randomIntBetween(1_500, 4_000);
for (int i = 0; i < numberOfTransforms; ++i) {
String transformId = String.format(Locale.ROOT, "t-%05d", i);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null);
createTransformRequest.setJsonEntity(config);
assertOK(client().performRequest(createTransformRequest));
}

GetTransformRequest request = new GetTransformRequest("*");
request.setPageParams(new PageParams(from, size));
GetTransformStatsRequest statsRequest = new GetTransformStatsRequest("*");
statsRequest.setPageParams(new PageParams(from, size));
for (int i = 0; i < 3; ++i) {
int from = randomIntBetween(0, numberOfTransforms - 1_000);
int size = randomIntBetween(1, 1000);

GetTransformResponse response = restClient.transform().getTransform(request, RequestOptions.DEFAULT);
GetTransformStatsResponse statsResponse = restClient.transform().getTransformStats(statsRequest, RequestOptions.DEFAULT);
var transforms = getTransforms(from, size);
var statsResponse = getTransformsStateAndStats(from, size);

assertEquals(numberOfTransforms, response.getCount());
assertEquals(numberOfTransforms, statsResponse.getCount());
assertEquals(numberOfTransforms, transforms.get("count"));
assertEquals(numberOfTransforms, statsResponse.get("count"));

List<TransformConfig> configs = response.getTransformConfigurations();
List<TransformStats> stats = statsResponse.getTransformsStats();
var configs = (List<Map<String, Object>>) transforms.get("transforms");
var stats = (List<Map<String, Object>>) statsResponse.get("transforms");

assertEquals(size, configs.size());
assertEquals(size, stats.size());
assertEquals(size, configs.size());
assertEquals(size, stats.size());

assertThat(configs.get(0).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(configs.get(configs.size() - 1).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));
assertThat(stats.get(0).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(stats.get(stats.size() - 1).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));
assertThat(configs.get(0).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(configs.get(configs.size() - 1).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));
assertThat(stats.get(0).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(stats.get(stats.size() - 1).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));

if (size > 2) {
int randomElement = randomIntBetween(1, size - 1);
assertThat(configs.get(randomElement).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
assertThat(stats.get(randomElement).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
}
if (size > 2) {
int randomElement = randomIntBetween(1, size - 1);
assertThat(configs.get(randomElement).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
assertThat(stats.get(randomElement).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
}
}

for (int i = 0; i < numberOfTransforms; ++i) {
AcknowledgedResponse response = restClient.transform()
.deleteTransform(new DeleteTransformRequest(String.format(Locale.ROOT, "t-%05d", i)), RequestOptions.DEFAULT);
assertTrue(response.isAcknowledged());
}
for (int i = 0; i < numberOfTransforms; ++i) {
deleteTransform(String.format(Locale.ROOT, "t-%05d", i));
}
}

protected static class TestRestHighLevelClient extends RestHighLevelClient {
private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES = new SearchModule(Settings.EMPTY, emptyList())
.getNamedXContents();

TestRestHighLevelClient() {
super(client(), restClient -> {}, X_CONTENT_ENTRIES);
}
private static String transformConfig() {
return """
{
"description": "Test 10000 transform configs",
"source": {
"index":""" + "\"" + REVIEWS_INDEX_NAME + "\"" + """
},
"pivot": {
"group_by": {
"by-user": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"review_score.avg": {
"avg": {
"field": "stars"
}
},
"timestamp.max": {
"max": {
"field": "timestamp"
}
}
}
},
"dest": {
"index":"dest"
},
"frequency": "10s"
}
""";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,8 @@ public void testPivotWithGeoBoundsAgg() throws Exception {
)).get(0);
assertThat(actualObj.get("type"), equalTo("point"));
List<Double> coordinates = (List<Double>) actualObj.get("coordinates");
assertEquals((4 + 10), coordinates.get(1), 0.000001);
assertEquals((4 + 15), coordinates.get(0), 0.000001);
assertEquals(-76.0, coordinates.get(1), 0.000001);
assertEquals(-161.0, coordinates.get(0), 0.000001);
}

public void testPivotWithGeoCentroidAgg() throws Exception {
Expand Down Expand Up @@ -1411,8 +1411,8 @@ public void testPivotWithGeoCentroidAgg() throws Exception {
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
String actualString = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.location", searchResult)).get(0);
String[] latlon = actualString.split(",");
assertEquals((4 + 10), Double.valueOf(latlon[0]), 0.000001);
assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001);
assertEquals(-76.0, Double.valueOf(latlon[0]), 0.000001);
assertEquals(-161.0, Double.valueOf(latlon[1]), 0.000001);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ public void testRestrictiveBucketSelector() throws Exception {
String indexName = "special_pivot_bucket_selector_reviews";
createReviewsIndex(indexName, 1000, 327, "date", false, 5, "affiliate_id");

verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 14);
verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 14);
verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 41);
verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 41);
}

private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public abstract class TransformRestTestCase extends ESRestTestCase {

Expand Down Expand Up @@ -88,7 +87,7 @@ protected void createReviewsIndex(
min = 10 + (i % 49);
}
int sec = 10 + (i % 49);
String location = (user + 10) + "," + (user + 15);
String location = (((user + 10) % 180) - 90) + "," + (((user + 15) % 360) - 180);

String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec;
if (dateType.equals("date_nanos")) {
Expand Down Expand Up @@ -123,21 +122,33 @@ protected void createReviewsIndex(

if (i % 50 == 0) {
bulk.append("\r\n");
final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
doBulk(bulk.toString(), true);
// clear the builder
bulk.setLength(0);
day += 1;
}
}

bulk.append("\r\n");
doBulk(bulk.toString(), true);
}

final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
@SuppressWarnings("unchecked")
protected void doBulk(String bulkDocuments, boolean refresh) throws IOException {
Request bulkRequest = new Request("POST", "/_bulk");
if (refresh) {
bulkRequest.addParameter("refresh", "true");
}
bulkRequest.setJsonEntity(bulkDocuments);
bulkRequest.setOptions(RequestOptions.DEFAULT);
Response bulkResponse = client().performRequest(bulkRequest);
assertOK(bulkResponse);
var bulkMap = entityAsMap(bulkResponse);
var hasErrors = (boolean) bulkMap.get("errors");
if (hasErrors) {
var items = (List<Map<String, Object>>) bulkMap.get("items");
fail("Bulk item failures: " + items.toString());
}
}

protected void putReviewsIndex(String indexName, String dateType, boolean isDataStream) throws IOException {
Expand Down Expand Up @@ -463,6 +474,13 @@ protected static List<Map<String, Object>> getTransforms(List<Map<String, String
return transformConfigs == null ? Collections.emptyList() : transformConfigs;
}

@SuppressWarnings("unchecked")
protected static Map<String, Object> getTransforms(int from, int size) throws IOException {
Request request = new Request("GET", getTransformEndpoint() + "_all?from=" + from + "&size=" + size);
Response response = adminClient().performRequest(request);
return entityAsMap(response);
}

protected static String getTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
Expand All @@ -477,6 +495,13 @@ protected static String getTransformState(String transformId) throws IOException
return (Map<?, ?>) transforms.get(0);
}

protected static Map<String, Object> getTransformsStateAndStats(int from, int size) throws IOException {
Response statsResponse = client().performRequest(
new Request("GET", getTransformEndpoint() + "_stats?from=" + from + "&size=" + size)
);
return entityAsMap(statsResponse);
}

protected static void deleteTransform(String transformId) throws IOException {
Request request = new Request("DELETE", getTransformEndpoint() + transformId);
request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
Expand Down

0 comments on commit 6155e14

Please sign in to comment.