Commit b911f53

[7.7] [ML] reduce InferenceProcessor.Factory log spam by not parsing pipelines (#56020) (#56127)

* [ML] reduce InferenceProcessor.Factory log spam by not parsing pipelines (#56020)

If any pipeline is ill-formed, or not yet ready to be parsed, `InferenceProcessor.Factory::accept(ClusterState)` logs warnings. This is confusing and causes log spam.

It might lead folks to think there is an issue with the inference processor, and they would see inference processor log entries even when they are not using the inference processor at all, adding to the confusion.

Additionally, pipelines might not be parseable in this method as some processors require the new cluster state metadata before construction (e.g. `enrich` requires cluster metadata to be set before creating the processor).

closes #55985

* fixing for backport

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
benwtrent and elasticmachine committed May 13, 2020
1 parent 15a1c5b commit b911f53
Showing 4 changed files with 147 additions and 57 deletions.
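
In outline, the change replaces full pipeline construction with a walk over the raw processor config maps. Below is a minimal standalone sketch of that idea; names are simplified and the "inference" and "foreach" type strings are hard-coded for self-containment. The real implementation is numInferenceProcessors in InferenceProcessor.Factory (file 2 of the diff), which also iterates every pipeline in the cluster state.

import java.util.List;
import java.util.Map;

// Sketch: count "inference" processor definitions straight from a pipeline's
// raw config map, recursing into "foreach" wrappers and "on_failure" handlers.
// No Processor instances are built, so nothing can fail or log here.
final class InferenceCountSketch {

    private static final int MAX_DEPTH = 10; // stop searching absurdly nested configs

    @SuppressWarnings("unchecked")
    static int countInference(String type, Map<String, Object> definition, int depth) {
        if (depth > MAX_DEPTH || type == null || definition == null) {
            return 0;
        }
        int count = "inference".equals(type) ? 1 : 0;
        if ("foreach".equals(type)) {
            // a foreach processor wraps a single nested processor definition
            Map<String, Object> inner = (Map<String, Object>) definition.get("processor");
            if (inner != null) {
                for (Map.Entry<String, Object> e : inner.entrySet()) {
                    count += countInference(e.getKey(), (Map<String, Object>) e.getValue(), depth + 1);
                }
            }
        }
        // any processor may declare on_failure handlers, which are processors themselves
        List<Map<String, Object>> onFailure = (List<Map<String, Object>>) definition.get("on_failure");
        if (onFailure != null) {
            for (Map<String, Object> handler : onFailure) {
                for (Map.Entry<String, Object> e : handler.entrySet()) {
                    count += countInference(e.getKey(), (Map<String, Object>) e.getValue(), depth + 1);
                }
            }
        }
        return count;
    }
}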
File 1 of 4

@@ -350,8 +350,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
 
         InferenceProcessor.Factory inferenceFactory = new InferenceProcessor.Factory(parameters.client,
             parameters.ingestService.getClusterService(),
-            this.settings,
-            parameters.ingestService);
+            this.settings);
         parameters.ingestService.addIngestClusterStateListener(inferenceFactory);
         return Collections.singletonMap(InferenceProcessor.TYPE, inferenceFactory);
     }
File 2 of 4

@@ -23,7 +23,6 @@
 import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.IngestMetadata;
-import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.ingest.Processor;
@@ -41,12 +40,14 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import static org.elasticsearch.ingest.Pipeline.PROCESSORS_KEY;
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
@@ -162,26 +163,24 @@ public String getType() {
 
     public static final class Factory implements Processor.Factory, Consumer<ClusterState> {
 
+        private static final String FOREACH_PROCESSOR_NAME = "foreach";
+        //Any more than 10 nestings of processors, we stop searching for inference processor definitions
+        private static final int MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS = 10;
         private static final Logger logger = LogManager.getLogger(Factory.class);
 
         private static final Set<String> RESERVED_ML_FIELD_NAMES = new HashSet<>(Arrays.asList(
             WarningInferenceResults.WARNING.getPreferredName(),
             MODEL_ID));
 
         private final Client client;
-        private final IngestService ingestService;
         private final InferenceAuditor auditor;
         private volatile int currentInferenceProcessors;
         private volatile int maxIngestProcessors;
         private volatile Version minNodeVersion = Version.CURRENT;
 
-        public Factory(Client client,
-                       ClusterService clusterService,
-                       Settings settings,
-                       IngestService ingestService) {
+        public Factory(Client client, ClusterService clusterService, Settings settings) {
             this.client = client;
             this.maxIngestProcessors = MAX_INFERENCE_PROCESSORS.get(settings);
-            this.ingestService = ingestService;
             this.auditor = new InferenceAuditor(client, clusterService.getNodeName());
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_INFERENCE_PROCESSORS, this::setMaxIngestProcessors);
         }
@@ -202,19 +201,66 @@ public void accept(ClusterState state) {
 
             int count = 0;
             for (PipelineConfiguration configuration : ingestMetadata.getPipelines().values()) {
+                Map<String, Object> configMap = configuration.getConfigAsMap();
                 try {
-                    Pipeline pipeline = Pipeline.create(configuration.getId(),
-                        configuration.getConfigAsMap(),
-                        ingestService.getProcessorFactories(),
-                        ingestService.getScriptService());
-                    count += pipeline.getProcessors().stream().filter(processor -> processor instanceof InferenceProcessor).count();
+                    List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
+                    for (Map<String, Object> processorConfigWithKey : processorConfigs) {
+                        for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
+                            count += numInferenceProcessors(entry.getKey(), entry.getValue());
+                        }
+                    }
+                    // We cannot throw any exception here. It might break other pipelines.
                 } catch (Exception ex) {
-                    logger.warn(new ParameterizedMessage("failure parsing pipeline config [{}]", configuration.getId()), ex);
+                    logger.debug(
+                        () -> new ParameterizedMessage("failed gathering processors for pipeline [{}]", configuration.getId()),
+                        ex);
                 }
             }
             currentInferenceProcessors = count;
         }
 
+        @SuppressWarnings("unchecked")
+        static int numInferenceProcessors(String processorType, Object processorDefinition) {
+            return numInferenceProcessors(processorType, (Map<String, Object>)processorDefinition, 0);
+        }
+
+        @SuppressWarnings("unchecked")
+        static int numInferenceProcessors(String processorType, Map<String, Object> processorDefinition, int level) {
+            int count = 0;
+            // arbitrary, but we must limit this somehow
+            if (level > MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS) {
+                return count;
+            }
+            if (processorType == null || processorDefinition == null) {
+                return count;
+            }
+            if (TYPE.equals(processorType)) {
+                count++;
+            }
+            if (FOREACH_PROCESSOR_NAME.equals(processorType)) {
+                Map<String, Object> innerProcessor = (Map<String, Object>)processorDefinition.get("processor");
+                if (innerProcessor != null) {
+                    // a foreach processor should only have a SINGLE nested processor. Iteration is for simplicity's sake.
+                    for (Map.Entry<String, Object> innerProcessorWithName : innerProcessor.entrySet()) {
+                        count += numInferenceProcessors(innerProcessorWithName.getKey(),
+                            (Map<String, Object>) innerProcessorWithName.getValue(),
+                            level + 1);
+                    }
+                }
+            }
+            if (processorDefinition.containsKey(Pipeline.ON_FAILURE_KEY)) {
+                List<Map<String, Object>> onFailureConfigs = ConfigurationUtils.readList(
+                    null,
+                    null,
+                    processorDefinition,
+                    Pipeline.ON_FAILURE_KEY);
+                count += onFailureConfigs.stream()
+                    .flatMap(map -> map.entrySet().stream())
+                    .mapToInt(entry -> numInferenceProcessors(entry.getKey(), (Map<String, Object>)entry.getValue(), level + 1)).sum();
+            }
+            return count;
+        }
+
         // Used for testing
         int numInferenceProcessors() {
             return currentInferenceProcessors;
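
For reference, a hedged usage sketch of the new static helper on a foreach-wrapped definition. The map shape mirrors the test helpers in file 4 below; the test name and "my_model" id are made up, assertThat/equalTo come from the test class's existing imports, and the call assumes same-package access since the helper is package-private:

    // hypothetical same-package test; equivalent pipeline JSON:
    // { "foreach": { "field": "docs", "processor": { "inference": { "model_id": "my_model" } } } }
    public void testCountsForeachWrappedInference() {
        Map<String, Object> inference = Collections.singletonMap("inference",
            Collections.singletonMap("model_id", "my_model"));
        Map<String, Object> foreachDefinition = new HashMap<>();
        foreachDefinition.put("field", "docs");
        foreachDefinition.put("processor", inference);

        // one inference definition found, nested one level below the foreach wrapper
        assertThat(InferenceProcessor.Factory.numInferenceProcessors("foreach", foreachDefinition, 0), equalTo(1));
    }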
File 3 of 4

@@ -92,8 +92,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
         factoryMap.put(InferenceProcessor.TYPE,
             new InferenceProcessor.Factory(parameters.client,
                 parameters.ingestService.getClusterService(),
-                Settings.EMPTY,
-                parameters.ingestService));
+                Settings.EMPTY));
 
         factoryMap.put("not_inference", new NotInferenceProcessor.Factory());
 
File 4 of 4

@@ -28,11 +28,8 @@
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.ingest.IngestMetadata;
-import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.PipelineConfiguration;
-import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
@@ -54,22 +51,9 @@
 
 public class InferenceProcessorFactoryTests extends ESTestCase {
 
-    private static final IngestPlugin SKINNY_PLUGIN = new IngestPlugin() {
-        @Override
-        public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-            XPackLicenseState licenseState = mock(XPackLicenseState.class);
-            when(licenseState.isMachineLearningAllowed()).thenReturn(true);
-            return Collections.singletonMap(InferenceProcessor.TYPE,
-                new InferenceProcessor.Factory(parameters.client,
-                    parameters.ingestService.getClusterService(),
-                    Settings.EMPTY,
-                    parameters.ingestService));
-        }
-    };
     private Client client;
     private XPackLicenseState licenseState;
     private ClusterService clusterService;
-    private IngestService ingestService;
 
     @Before
     public void setUpVariables() {
@@ -86,8 +70,6 @@ public void setUpVariables() {
             AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
             ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
         clusterService = new ClusterService(settings, clusterSettings, tp);
-        ingestService = new IngestService(clusterService, tp, null, null,
-            null, Collections.singletonList(SKINNY_PLUGIN), client);
         licenseState = mock(XPackLicenseState.class);
         when(licenseState.isMachineLearningAllowed()).thenReturn(true);
     }
@@ -97,8 +79,7 @@ public void testNumInferenceProcessors() throws Exception {
 
         InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
             clusterService,
-            Settings.EMPTY,
-            ingestService);
+            Settings.EMPTY);
         processorFactory.accept(buildClusterState(metaData));
 
         assertThat(processorFactory.numInferenceProcessors(), equalTo(0));
@@ -111,11 +92,61 @@ public void testNumInferenceProcessors() throws Exception {
         assertThat(processorFactory.numInferenceProcessors(), equalTo(3));
     }
 
+    public void testNumInferenceProcessorsRecursivelyDefined() throws Exception {
+        MetaData metadata = null;
+
+        InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
+            clusterService,
+            Settings.EMPTY);
+        processorFactory.accept(buildClusterState(metadata));
+
+        Map<String, PipelineConfiguration> configurations = new HashMap<>();
+        configurations.put("pipeline_with_model_top_level",
+            randomBoolean() ?
+                newConfigurationWithInferenceProcessor("top_level") :
+                newConfigurationWithForeachProcessorProcessor("top_level"));
+        try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(Collections.singletonMap("processors",
+            Collections.singletonList(
+                Collections.singletonMap("set",
+                    new HashMap<String, Object>() {{
+                        put("field", "foo");
+                        put("value", "bar");
+                        put("on_failure",
+                            Arrays.asList(
+                                inferenceProcessorForModel("second_level"),
+                                forEachProcessorWithInference("third_level")));
+                    }}))))) {
+            configurations.put("pipeline_with_model_nested",
+                new PipelineConfiguration("pipeline_with_model_nested", BytesReference.bytes(xContentBuilder), XContentType.JSON));
+        }
+
+        IngestMetadata ingestMetadata = new IngestMetadata(configurations);
+
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata))
+            .nodes(DiscoveryNodes.builder()
+                .add(new DiscoveryNode("min_node",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                    Version.CURRENT))
+                .add(new DiscoveryNode("current_node",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
+                    Version.CURRENT))
+                .localNodeId("_node_id")
+                .masterNodeId("_node_id"))
+            .build();
+
+        processorFactory.accept(cs);
+        assertThat(processorFactory.numInferenceProcessors(), equalTo(3));
+    }
+
+    public void testNumInferenceWhenLevelExceedsMaxRecurions() {
+        assertThat(InferenceProcessor.Factory.numInferenceProcessors(InferenceProcessor.TYPE, Collections.emptyMap(), 100), equalTo(0));
+    }
+
     public void testCreateProcessorWithTooManyExisting() throws Exception {
         InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
             clusterService,
-            Settings.builder().put(InferenceProcessor.MAX_INFERENCE_PROCESSORS.getKey(), 1).build(),
-            ingestService);
+            Settings.builder().put(InferenceProcessor.MAX_INFERENCE_PROCESSORS.getKey(), 1).build());
 
         processorFactory.accept(buildClusterStateWithModelReferences("model1"));
 
@@ -129,8 +160,7 @@ public void testCreateProcessorWithTooManyExisting() throws Exception {
     public void testCreateProcessorWithInvalidInferenceConfig() {
         InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
             clusterService,
-            Settings.EMPTY,
-            ingestService);
+            Settings.EMPTY);
 
         Map<String, Object> config = new HashMap<String, Object>() {{
             put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
@@ -170,8 +200,7 @@ public void testCreateProcessorWithInvalidInferenceConfig() {
     public void testCreateProcessorWithTooOldMinNodeVersion() throws IOException {
         InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
             clusterService,
-            Settings.EMPTY,
-            ingestService);
+            Settings.EMPTY);
         processorFactory.accept(builderClusterStateWithModelReferences(Version.V_7_5_0, "model1"));
 
         Map<String, Object> regression = new HashMap<String, Object>() {{
@@ -214,8 +243,7 @@ public void testCreateProcessorWithTooOldMinNodeVersion() throws IOException {
     public void testCreateProcessor() {
         InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
             clusterService,
-            Settings.EMPTY,
-            ingestService);
+            Settings.EMPTY);
 
         Map<String, Object> regression = new HashMap<String, Object>() {{
             put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
@@ -249,8 +277,7 @@ public void testCreateProcessor() {
     public void testCreateProcessorWithDuplicateFields() {
         InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
             clusterService,
-            Settings.EMPTY,
-            ingestService);
+            Settings.EMPTY);
 
         Map<String, Object> regression = new HashMap<String, Object>() {{
             put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
@@ -280,7 +307,8 @@ private static ClusterState buildClusterStateWithModelReferences(String... model
     private static ClusterState builderClusterStateWithModelReferences(Version minNodeVersion, String... modelId) throws IOException {
         Map<String, PipelineConfiguration> configurations = new HashMap<>(modelId.length);
         for (String id : modelId) {
-            configurations.put("pipeline_with_model_" + id, newConfigurationWithInferenceProcessor(id));
+            configurations.put("pipeline_with_model_" + id,
+                randomBoolean() ? newConfigurationWithInferenceProcessor(id) : newConfigurationWithForeachProcessorProcessor(id));
         }
         IngestMetadata ingestMetadata = new IngestMetadata(configurations);
 
@@ -300,17 +328,35 @@
 
     private static PipelineConfiguration newConfigurationWithInferenceProcessor(String modelId) throws IOException {
         try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(Collections.singletonMap("processors",
-            Collections.singletonList(
-                Collections.singletonMap(InferenceProcessor.TYPE,
-                    new HashMap<String, Object>() {{
-                        put(InferenceProcessor.MODEL_ID, modelId);
-                        put(InferenceProcessor.INFERENCE_CONFIG,
-                            Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap()));
-                        put(InferenceProcessor.TARGET_FIELD, "new_field");
-                        put(InferenceProcessor.FIELD_MAP, Collections.singletonMap("source", "dest"));
-                    }}))))) {
+            Collections.singletonList(inferenceProcessorForModel(modelId))))) {
             return new PipelineConfiguration("pipeline_with_model_" + modelId, BytesReference.bytes(xContentBuilder), XContentType.JSON);
         }
     }
 
+    private static PipelineConfiguration newConfigurationWithForeachProcessorProcessor(String modelId) throws IOException {
+        try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(Collections.singletonMap("processors",
+            Collections.singletonList(forEachProcessorWithInference(modelId))))) {
+            return new PipelineConfiguration("pipeline_with_model_" + modelId, BytesReference.bytes(xContentBuilder), XContentType.JSON);
+        }
+    }
+
+    private static Map<String, Object> forEachProcessorWithInference(String modelId) {
+        return Collections.singletonMap("foreach",
+            new HashMap<String, Object>() {{
+                put("field", "foo");
+                put("processor", inferenceProcessorForModel(modelId));
+            }});
+    }
+
+    private static Map<String, Object> inferenceProcessorForModel(String modelId) {
+        return Collections.singletonMap(InferenceProcessor.TYPE,
+            new HashMap<String, Object>() {{
+                put(InferenceProcessor.MODEL_ID, modelId);
+                put(InferenceProcessor.INFERENCE_CONFIG,
+                    Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap()));
+                put(InferenceProcessor.TARGET_FIELD, "new_field");
+                put(InferenceProcessor.FIELD_MAP, Collections.singletonMap("source", "dest"));
+            }});
+    }
+
 }
