Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Rename DataFrameAnalyticsIndex to DestinationIndex #51353

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -186,7 +186,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
reindexRequest.setDestIndex(config.getDest().getIndex());
reindexRequest.setScript(new Script("ctx._source." + DataFrameAnalyticsIndex.ID_COPY + " = ctx._id"));
reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));

final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
Expand All @@ -206,7 +206,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX, indexResponse.indices()[0]));
LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]);
DataFrameAnalyticsIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
DestinationIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
acknowledgedResponse -> copyIndexCreatedListener.onResponse(null),
copyIndexCreatedListener::onFailure
));
Expand All @@ -217,7 +217,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX, config.getDest().getIndex()));
LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex());
DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
DestinationIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
} else {
copyIndexCreatedListener.onFailure(e);
}
Expand Down
Expand Up @@ -42,9 +42,9 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

/**
* {@link DataFrameAnalyticsIndex} class encapsulates logic for creating destination index based on source index metadata.
* {@link DestinationIndex} class encapsulates logic for creating destination index based on source index metadata.
*/
public final class DataFrameAnalyticsIndex {
public final class DestinationIndex {

public static final String ID_COPY = "ml__id_copy";

Expand All @@ -65,7 +65,7 @@ public final class DataFrameAnalyticsIndex {
*/
private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};

private DataFrameAnalyticsIndex() {}
private DestinationIndex() {}

/**
* Creates destination index based on source index metadata.
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsIndex;
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
import org.elasticsearch.xpack.ml.extractor.ExtractedField;

import java.io.IOException;
Expand Down Expand Up @@ -131,7 +131,7 @@ private SearchRequestBuilder buildSearchRequest() {
.setScroll(SCROLL_TIMEOUT)
// This ensures the search throws if there are failures and the scroll context gets cleared automatically
.setAllowPartialSearchResults(false)
.addSort(DataFrameAnalyticsIndex.ID_COPY, SortOrder.ASC)
.addSort(DestinationIndex.ID_COPY, SortOrder.ASC)
.setIndices(context.indices)
.setSize(context.scrollSize)
.setQuery(context.query);
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsIndex;
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;

Expand All @@ -52,7 +52,7 @@ public class ExtractedFieldsDetector {
* Fields to ignore. These are mostly internal meta fields.
*/
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
"_source", "_type", "_uid", "_version", "_feature", "_ignored", "_nested_path", DataFrameAnalyticsIndex.ID_COPY);
"_source", "_type", "_uid", "_version", "_feature", "_ignored", "_nested_path", DestinationIndex.ID_COPY);

private final String[] index;
private final DataFrameAnalyticsConfig config;
Expand Down
Expand Up @@ -66,7 +66,7 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

public class DataFrameAnalyticsIndexTests extends ESTestCase {
public class DestinationIndexTests extends ESTestCase {

private static final String ANALYTICS_ID = "some-analytics-id";
private static final String[] SOURCE_INDEX = new String[] {"source-index"};
Expand Down Expand Up @@ -142,7 +142,7 @@ private Map<String, Object> testCreateDestinationIndex(DataFrameAnalysis analysi
doAnswer(callListenerOnResponse(getMappingsResponse))
.when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any());

DataFrameAnalyticsIndex.createDestinationIndex(
DestinationIndex.createDestinationIndex(
client,
clock,
config,
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testCreateDestinationIndex_ResultsFieldsExistsInSourceIndex() {
doAnswer(callListenerOnResponse(getSettingsResponse)).when(client).execute(eq(GetSettingsAction.INSTANCE), any(), any());
doAnswer(callListenerOnResponse(getMappingsResponse)).when(client).execute(eq(GetMappingsAction.INSTANCE), any(), any());

DataFrameAnalyticsIndex.createDestinationIndex(
DestinationIndex.createDestinationIndex(
client,
clock,
config,
Expand Down Expand Up @@ -262,7 +262,7 @@ private Map<String, Object> testUpdateMappingsToDestIndex(DataFrameAnalysis anal
doAnswer(callListenerOnResponse(new AcknowledgedResponse(true)))
.when(client).execute(eq(PutMappingAction.INSTANCE), putMappingRequestCaptor.capture(), any());

DataFrameAnalyticsIndex.updateMappingsToDestIndex(
DestinationIndex.updateMappingsToDestIndex(
client,
config,
getIndexResponse,
Expand Down Expand Up @@ -330,7 +330,7 @@ public void testUpdateMappingsToDestIndex_ResultsFieldsExistsInSourceIndex() {
ElasticsearchStatusException e =
expectThrows(
ElasticsearchStatusException.class,
() -> DataFrameAnalyticsIndex.updateMappingsToDestIndex(
() -> DestinationIndex.updateMappingsToDestIndex(
client, config, getIndexResponse, ActionListener.wrap(Assert::fail)));
assertThat(
e.getMessage(),
Expand Down