Skip to content

Commit b51ee30

Browse files
pgarefhuwh
authored andcommitted
[FLINK-33022][runtime] Log an error when enrichers defined as part of the configuration can not be found/loaded
1 parent b28340c commit b51ee30

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ public static Collection<FailureEnricher> getFailureEnrichers(
7575
@VisibleForTesting
7676
static Collection<FailureEnricher> getFailureEnrichers(
7777
final Configuration configuration, final PluginManager pluginManager) {
78-
Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
78+
final Set<String> enrichersToLoad = getIncludedFailureEnrichers(configuration);
7979
// When empty, NO enrichers will be started.
80-
if (includedEnrichers.isEmpty()) {
80+
if (enrichersToLoad.isEmpty()) {
8181
return Collections.emptySet();
8282
}
8383
final Iterator<FailureEnricherFactory> factoryIterator =
@@ -87,7 +87,7 @@ static Collection<FailureEnricher> getFailureEnrichers(
8787
final FailureEnricherFactory failureEnricherFactory = factoryIterator.next();
8888
final FailureEnricher failureEnricher =
8989
failureEnricherFactory.createFailureEnricher(configuration);
90-
if (includedEnrichers.contains(failureEnricher.getClass().getName())) {
90+
if (enrichersToLoad.remove(failureEnricher.getClass().getName())) {
9191
failureEnrichers.add(failureEnricher);
9292
LOG.info(
9393
"Found failure enricher {} at {}.",
@@ -102,10 +102,16 @@ static Collection<FailureEnricher> getFailureEnrichers(
102102
LOG.debug(
103103
"Excluding failure enricher {}, not configured in enricher list ({}).",
104104
failureEnricherFactory.getClass().getName(),
105-
includedEnrichers);
105+
enrichersToLoad);
106106
}
107107
}
108108

109+
if (!enrichersToLoad.isEmpty()) {
110+
LOG.error(
111+
"The following failure enrichers were configured but not found on the classpath: {}.",
112+
enrichersToLoad);
113+
}
114+
109115
return filterInvalidEnrichers(failureEnrichers);
110116
}
111117

flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,20 @@ public void testGetFailureEnrichers() {
108108
FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager());
109109
assertThat(enrichers).hasSize(1);
110110
// verify that the failure enricher was created and returned
111-
assertThat(enrichers.iterator().next()).isInstanceOf(TestEnricher.class);
111+
assertThat(enrichers)
112+
.satisfiesExactly(
113+
enricher -> assertThat(enricher).isInstanceOf(TestEnricher.class));
114+
115+
// Valid plus Invalid Name combination
116+
configuration.set(
117+
JobManagerOptions.FAILURE_ENRICHERS_LIST,
118+
FailureEnricherUtilsTest.class.getName() + "," + TestEnricher.class.getName());
119+
final Collection<FailureEnricher> validInvalidEnrichers =
120+
FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager());
121+
assertThat(validInvalidEnrichers).hasSize(1);
122+
assertThat(validInvalidEnrichers)
123+
.satisfiesExactly(
124+
enricher -> assertThat(enricher).isInstanceOf(TestEnricher.class));
112125
}
113126

114127
@Test

0 commit comments

Comments
 (0)