Skip to content

Commit

Permalink
[ML] Rename DataFrameAnalyticsIndex to DestinationIndex (#51353)
Browse files Browse the repository at this point in the history
As we prepare to introduce a new index for storing additional
information about data frame analytics jobs (e.g. intrumentation),
renaming this class to `DestinationIndex` better captures what it does
and leaves its prior name available for a more suitable use.
  • Loading branch information
dimitris-athanasiou committed Jan 23, 2020
1 parent 589e033 commit 407f920
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 15 deletions.
Expand Up @@ -186,7 +186,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
reindexRequest.setDestIndex(config.getDest().getIndex());
reindexRequest.setScript(new Script("ctx._source." + DataFrameAnalyticsIndex.ID_COPY + " = ctx._id"));
reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));

final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
Expand All @@ -206,7 +206,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX, indexResponse.indices()[0]));
LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]);
DataFrameAnalyticsIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
DestinationIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
acknowledgedResponse -> copyIndexCreatedListener.onResponse(null),
copyIndexCreatedListener::onFailure
));
Expand All @@ -217,7 +217,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX, config.getDest().getIndex()));
LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex());
DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
DestinationIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
} else {
copyIndexCreatedListener.onFailure(e);
}
Expand Down
Expand Up @@ -42,9 +42,9 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

/**
* {@link DataFrameAnalyticsIndex} class encapsulates logic for creating destination index based on source index metadata.
* {@link DestinationIndex} class encapsulates logic for creating destination index based on source index metadata.
*/
public final class DataFrameAnalyticsIndex {
public final class DestinationIndex {

public static final String ID_COPY = "ml__id_copy";

Expand All @@ -65,7 +65,7 @@ public final class DataFrameAnalyticsIndex {
*/
private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};

private DataFrameAnalyticsIndex() {}
private DestinationIndex() {}

/**
* Creates destination index based on source index metadata.
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsIndex;
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
import org.elasticsearch.xpack.ml.extractor.ExtractedField;

import java.io.IOException;
Expand Down Expand Up @@ -131,7 +131,7 @@ private SearchRequestBuilder buildSearchRequest() {
.setScroll(SCROLL_TIMEOUT)
// This ensures the search throws if there are failures and the scroll context gets cleared automatically
.setAllowPartialSearchResults(false)
.addSort(DataFrameAnalyticsIndex.ID_COPY, SortOrder.ASC)
.addSort(DestinationIndex.ID_COPY, SortOrder.ASC)
.setIndices(context.indices)
.setSize(context.scrollSize)
.setQuery(context.query);
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsIndex;
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;

Expand All @@ -52,7 +52,7 @@ public class ExtractedFieldsDetector {
* Fields to ignore. These are mostly internal meta fields.
*/
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
"_source", "_type", "_uid", "_version", "_feature", "_ignored", "_nested_path", DataFrameAnalyticsIndex.ID_COPY);
"_source", "_type", "_uid", "_version", "_feature", "_ignored", "_nested_path", DestinationIndex.ID_COPY);

private final String[] index;
private final DataFrameAnalyticsConfig config;
Expand Down
Expand Up @@ -66,7 +66,7 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

public class DataFrameAnalyticsIndexTests extends ESTestCase {
public class DestinationIndexTests extends ESTestCase {

private static final String ANALYTICS_ID = "some-analytics-id";
private static final String[] SOURCE_INDEX = new String[] {"source-index"};
Expand Down Expand Up @@ -142,7 +142,7 @@ private Map<String, Object> testCreateDestinationIndex(DataFrameAnalysis analysi
doAnswer(callListenerOnResponse(getMappingsResponse))
.when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any());

DataFrameAnalyticsIndex.createDestinationIndex(
DestinationIndex.createDestinationIndex(
client,
clock,
config,
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testCreateDestinationIndex_ResultsFieldsExistsInSourceIndex() {
doAnswer(callListenerOnResponse(getSettingsResponse)).when(client).execute(eq(GetSettingsAction.INSTANCE), any(), any());
doAnswer(callListenerOnResponse(getMappingsResponse)).when(client).execute(eq(GetMappingsAction.INSTANCE), any(), any());

DataFrameAnalyticsIndex.createDestinationIndex(
DestinationIndex.createDestinationIndex(
client,
clock,
config,
Expand Down Expand Up @@ -262,7 +262,7 @@ private Map<String, Object> testUpdateMappingsToDestIndex(DataFrameAnalysis anal
doAnswer(callListenerOnResponse(new AcknowledgedResponse(true)))
.when(client).execute(eq(PutMappingAction.INSTANCE), putMappingRequestCaptor.capture(), any());

DataFrameAnalyticsIndex.updateMappingsToDestIndex(
DestinationIndex.updateMappingsToDestIndex(
client,
config,
getIndexResponse,
Expand Down Expand Up @@ -330,7 +330,7 @@ public void testUpdateMappingsToDestIndex_ResultsFieldsExistsInSourceIndex() {
ElasticsearchStatusException e =
expectThrows(
ElasticsearchStatusException.class,
() -> DataFrameAnalyticsIndex.updateMappingsToDestIndex(
() -> DestinationIndex.updateMappingsToDestIndex(
client, config, getIndexResponse, ActionListener.wrap(Assert::fail)));
assertThat(
e.getMessage(),
Expand Down

0 comments on commit 407f920

Please sign in to comment.