From 5a990666896effdf85bbb448ee5a4c2912f24040 Mon Sep 17 00:00:00 2001 From: Panagiotis Garefalakis Date: Sat, 9 Sep 2023 13:54:43 -0700 Subject: [PATCH] [FLINK-33022][runtime] Log an error when enrichers defined as part of the configuration can not be found/loaded --- .../runtime/failure/FailureEnricherUtils.java | 14 ++++++++++---- .../runtime/failure/FailureEnricherUtilsTest.java | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java index f704a1ddbd783..b5396f4ff66c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java @@ -75,9 +75,9 @@ public static Collection getFailureEnrichers( @VisibleForTesting static Collection getFailureEnrichers( final Configuration configuration, final PluginManager pluginManager) { - Set includedEnrichers = getIncludedFailureEnrichers(configuration); + final Set enrichersToLoad = getIncludedFailureEnrichers(configuration); // When empty, NO enrichers will be started. - if (includedEnrichers.isEmpty()) { + if (enrichersToLoad.isEmpty()) { return Collections.emptySet(); } final Iterator factoryIterator = @@ -87,7 +87,7 @@ static Collection getFailureEnrichers( final FailureEnricherFactory failureEnricherFactory = factoryIterator.next(); final FailureEnricher failureEnricher = failureEnricherFactory.createFailureEnricher(configuration); - if (includedEnrichers.contains(failureEnricher.getClass().getName())) { + if (enrichersToLoad.remove(failureEnricher.getClass().getName())) { failureEnrichers.add(failureEnricher); LOG.info( "Found failure enricher {} at {}.", @@ -102,10 +102,16 @@ static Collection getFailureEnrichers( LOG.debug( "Excluding failure enricher {}, not configured in enricher list ({}).", failureEnricherFactory.getClass().getName(), - includedEnrichers); + enrichersToLoad); } } + if (!enrichersToLoad.isEmpty()) { + LOG.error( + "The following failure enrichers were configured but not found on the classpath: {}.", + enrichersToLoad); + } + return filterInvalidEnrichers(failureEnrichers); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java index 8eedf2d0be298..236d8c1d3e5c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java @@ -108,7 +108,20 @@ public void testGetFailureEnrichers() { FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager()); assertThat(enrichers).hasSize(1); // verify that the failure enricher was created and returned - assertThat(enrichers.iterator().next()).isInstanceOf(TestEnricher.class); + assertThat(enrichers) + .satisfiesExactly( + enricher -> assertThat(enricher).isInstanceOf(TestEnricher.class)); + + // Valid plus Invalid Name combination + configuration.set( + JobManagerOptions.FAILURE_ENRICHERS_LIST, + FailureEnricherUtilsTest.class.getName() + "," + TestEnricher.class.getName()); + final Collection validInvalidEnrichers = + FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager()); + assertThat(validInvalidEnrichers).hasSize(1); + assertThat(validInvalidEnrichers) + .satisfiesExactly( + enricher -> assertThat(enricher).isInstanceOf(TestEnricher.class)); } @Test