Skip to content

Commit

Permalink
[TRANSFORM] Remove HLRC from multi node tests - part 1 (#84347)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Feb 28, 2022
1 parent ed0bb2a commit 41ecab8
Show file tree
Hide file tree
Showing 6 changed files with 609 additions and 587 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
return builder.setVersion(Version.CURRENT).build();
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String id;
private SourceConfig source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,23 @@

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.transform.PreviewTransformResponse;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.latest.LatestConfig;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
import org.junit.After;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -38,20 +32,19 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

@SuppressWarnings("removal")
public class LatestIT extends TransformIntegTestCase {

private static final String SOURCE_INDEX_NAME = "basic-crud-latest-reviews";
private static final int NUM_USERS = 28;

private static final String TRANSFORM_NAME = "transform-crud-latest";

private static final Integer getUserIdForRow(int row) {
private static Integer getUserIdForRow(int row) {
int userId = row % (NUM_USERS + 1);
return userId < NUM_USERS ? userId : null;
}

private static final String getDateStringForRow(int row) {
private static String getDateStringForRow(int row) {
int month = 1 + (row / 28);
int day = 1 + (row % 28);
return "2017-" + (month < 10 ? "0" + month : month) + "-" + (day < 10 ? "0" + day : day) + "T12:30:00Z";
Expand All @@ -64,7 +57,7 @@ private static final String getDateStringForRow(int row) {
private static final String STARS = "stars";
private static final String COMMENT = "comment";

private static final Map<String, Object> row(String userId, String businessId, int count, int stars, String timestamp, String comment) {
private static Map<String, Object> row(String userId, String businessId, int count, int stars, String timestamp, String comment) {
return new HashMap<>() {
{
if (userId != null) {
Expand Down Expand Up @@ -113,59 +106,60 @@ private static final Map<String, Object> row(String userId, String businessId, i
row(null, "business_36", 86, 1, "2017-04-03T12:30:00Z", "Great stuff, deserves 1 stars") };

@After
public void cleanTransforms() throws IOException {
public void cleanTransforms() throws Exception {
cleanUp();
}

@SuppressWarnings("unchecked")
public void testLatest() throws Exception {
createReviewsIndex(SOURCE_INDEX_NAME, 100, NUM_USERS, LatestIT::getUserIdForRow, LatestIT::getDateStringForRow);

String destIndexName = "reviews-latest";
TransformConfig transformConfig = createTransformConfigBuilder(
TRANSFORM_NAME,
destIndexName,
QueryBuilders.matchAllQuery(),
QueryConfig.matchAll(),
SOURCE_INDEX_NAME
).setLatestConfig(LatestConfig.builder().setUniqueKey(USER_ID).setSort(TIMESTAMP).build()).build();
assertTrue(putTransform(transformConfig, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(transformConfig.getId(), RequestOptions.DEFAULT).isAcknowledged());
).setLatestConfig(new LatestConfig(List.of(USER_ID), TIMESTAMP)).build();
putTransform(TRANSFORM_NAME, Strings.toString(transformConfig), RequestOptions.DEFAULT);
startTransform(transformConfig.getId(), RequestOptions.DEFAULT);
waitUntilCheckpoint(transformConfig.getId(), 1L);
stopTransform(transformConfig.getId());

try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
restClient.indices().refresh(new RefreshRequest(destIndexName), RequestOptions.DEFAULT);
// Verify destination index mappings
GetMappingsResponse destIndexMapping = restClient.indices()
.getMapping(new GetMappingsRequest().indices(destIndexName), RequestOptions.DEFAULT);
assertThat(destIndexMapping.mappings().get(destIndexName).sourceAsMap(), allOf(hasKey("_meta"), hasKey("properties")));
// Verify destination index contents
SearchResponse searchResponse = restClient.search(
new SearchRequest(destIndexName).source(new SearchSourceBuilder().size(1000)),
RequestOptions.DEFAULT
);
assertThat(searchResponse.getHits().getTotalHits().value, is(equalTo(Long.valueOf(NUM_USERS + 1))));
assertThat(
Stream.of(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(toList()),
containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS)
);
}
refreshIndex(destIndexName, RequestOptions.DEFAULT);
var mappings = getIndexMapping(destIndexName, RequestOptions.DEFAULT);
assertThat(
(Map<String, Object>) XContentMapValues.extractValue(destIndexName + ".mappings", mappings),
allOf(hasKey("_meta"), hasKey("properties"))
);
var searchResponse = search(destIndexName, 1000, RequestOptions.DEFAULT);
assertThat((Integer) XContentMapValues.extractValue("hits.total.value", searchResponse), is(equalTo(NUM_USERS + 1)));
var hits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", searchResponse);
var searchedDocs = hits.stream().map(h -> (Map<String, Object>) h.get("_source")).collect(Collectors.toList());
assertThat(searchedDocs, containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
}

private Map<String, Object> search(String index, int size, RequestOptions options) throws IOException {
var r = new Request("GET", index + "/_search?size=" + size);
r.setOptions(options);
return entityAsMap(client().performRequest(r));
}

@SuppressWarnings("unchecked")
public void testLatestPreview() throws Exception {
createReviewsIndex(SOURCE_INDEX_NAME, 100, NUM_USERS, LatestIT::getUserIdForRow, LatestIT::getDateStringForRow);

TransformConfig transformConfig = createTransformConfigBuilder(
TRANSFORM_NAME,
"dummy",
QueryBuilders.matchAllQuery(),
SOURCE_INDEX_NAME
).setLatestConfig(LatestConfig.builder().setUniqueKey(USER_ID).setSort(TIMESTAMP).build()).build();
TransformConfig transformConfig = createTransformConfigBuilder(TRANSFORM_NAME, "dummy", QueryConfig.matchAll(), SOURCE_INDEX_NAME)
.setLatestConfig(new LatestConfig(List.of(USER_ID), TIMESTAMP))
.build();

PreviewTransformResponse previewResponse = previewTransform(transformConfig, RequestOptions.DEFAULT);
var previewResponse = previewTransform(Strings.toString(transformConfig), RequestOptions.DEFAULT);
// Verify preview mappings
assertThat(previewResponse.getMappings(), allOf(hasKey("_meta"), hasEntry("properties", emptyMap())));
var mappings = (Map<String, Object>) XContentMapValues.extractValue("generated_dest_index.mappings", previewResponse);
assertThat(mappings, allOf(hasKey("_meta"), hasEntry("properties", emptyMap())));
// Verify preview contents
assertThat(previewResponse.getDocs(), hasSize(NUM_USERS + 1));
assertThat(previewResponse.getDocs(), containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
var docs = (List<Map<String, Object>>) XContentMapValues.extractValue("preview", previewResponse);
assertThat(docs, hasSize(NUM_USERS + 1));
assertThat(docs, containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -61,8 +62,8 @@ public void testTransformFeatureReset() throws Exception {

Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null));
groups.put("by-user", TermsGroupSource.builder().setField("user_id").build());
groups.put("by-business", TermsGroupSource.builder().setField("business_id").build());
groups.put("by-user", new TermsGroupSource("user_id", null, false));
groups.put("by-business", new TermsGroupSource("business_id", null, false));

AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
Expand All @@ -71,21 +72,21 @@ public void testTransformFeatureReset() throws Exception {
TransformConfig config = createTransformConfigBuilder(
transformId,
"reviews-by-user-business-day",
QueryBuilders.matchAllQuery(),
QueryConfig.matchAll(),
indexName
).setPivotConfig(createPivotConfig(groups, aggs)).build();

assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
startTransform(config.getId(), RequestOptions.DEFAULT);

transformId = "continuous-transform-feature-reset";
config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day-cont", QueryBuilders.matchAllQuery(), indexName)
String continuousTransformId = "continuous-transform-feature-reset";
config = createTransformConfigBuilder(continuousTransformId, "reviews-by-user-business-day-cont", QueryConfig.matchAll(), indexName)
.setPivotConfig(createPivotConfig(groups, aggs))
.setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build())
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.build();

assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
putTransform(continuousTransformId, Strings.toString(config), RequestOptions.DEFAULT);
startTransform(continuousTransformId, RequestOptions.DEFAULT);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_features/_reset"));

Response response = adminClient().performRequest(new Request("GET", "/_cluster/state?metric=metadata"));
Expand All @@ -97,7 +98,7 @@ public void testTransformFeatureReset() throws Exception {
assertThat(transformMetadata, is(nullValue()));

// assert transforms are gone
assertThat(getTransform("_all").getCount(), equalTo(0L));
assertThat((Integer) getTransforms("_all").get("count"), equalTo(0));

// assert transform indices are gone
assertThat(ESRestTestCase.entityAsMap(adminClient().performRequest(new Request("GET", ".transform-*"))), is(anEmptyMap()));
Expand Down

0 comments on commit 41ecab8

Please sign in to comment.