From 5db54190dfc44cc06d5b52f1427ebcd90b0e9094 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 15 Mar 2018 16:27:20 +0900 Subject: [PATCH 1/2] NIFI-4980: Typo in ReportAtlasLineage kafka kerberos service name property --- .../org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 07d806ad51b3..29ae013ccd80 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -260,7 +260,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { static final PropertyDescriptor KAFKA_KERBEROS_SERVICE_NAME = new PropertyDescriptor.Builder() - .name("kafka-kerberos-service-name-kafka") + .name("kafka-kerberos-service-name") .displayName("Kafka Kerberos Service Name") .description("The Kerberos principal name that Kafka runs for Atlas notification." + " This can be defined either in Kafka's JAAS config or in Kafka's config." + From 68d037deee6f677b3a050418f8889e4b5dceacf1 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 25 Apr 2018 10:19:20 +0900 Subject: [PATCH 2/2] Added user defined dynamic property to support migration to use new property value. --- .../atlas/reporting/ReportLineageToAtlas.java | 31 +++++++++++++++++- .../reporting/TestReportLineageToAtlas.java | 32 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 29ae013ccd80..3c5ce98b26c9 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -76,12 +76,14 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -272,6 +274,17 @@ public class ReportLineageToAtlas extends AbstractReportingTask { .defaultValue("kafka") .build(); + static final PropertyDescriptor OLD_KAFKA_KERBEROS_SERVICE_NAME = new PropertyDescriptor.Builder() + .name("kafka-kerberos-service-name-kafka") + .displayName("DEPRECATED: Kafka Kerberos Service Name") + .description("The property name of 'Kafka Kerberos Service Name' has been renamed" + + " from 'kafka-kerberos-service-name-kafka' to 'kafka-kerberos-service-name' by NIFI-4980." + + " This user defined property holds the old 'Kafka Kerberos Service Name' value for migration, and it's being used until it gets deleted." + + " Please update 'Kafka Kerberos Service Name' setting accordingly." + + " Copying the old value to the new property then deleting this user defined property would be sufficient.") + .addValidator(Validator.VALID) + .build(); + static final AllowableValue LINEAGE_STRATEGY_SIMPLE_PATH = new AllowableValue("SimplePath", "Simple Path", "Map NiFi provenance events and target Atlas DataSets to statically created 'nifi_flow_path' Atlas Processes." + " See also 'Additional Details'."); @@ -345,6 +358,9 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String proper return propertyDescriptor; } } + if (OLD_KAFKA_KERBEROS_SERVICE_NAME.getName().equals(propertyDescriptorName)) { + return OLD_KAFKA_KERBEROS_SERVICE_NAME; + } return null; } @@ -778,7 +794,7 @@ private void setKafkaJaasConfig(Map mapToPopulate, PropertyConte keytab = credentialsService.getKeytab(); } - String serviceName = context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); + final String serviceName = getKafkaKerberosServiceName(context, getLogger()); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleControlFlag", "required"); mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleName", "com.sun.security.auth.module.Krb5LoginModule"); @@ -794,4 +810,17 @@ private void setKafkaJaasConfig(Map mapToPopulate, PropertyConte } } + static String getKafkaKerberosServiceName(PropertyContext context, ComponentLog logger) { + // Migration code for NIFI-4980 that corrected property name typo. + final boolean isOldPropertySet = context.getProperty(OLD_KAFKA_KERBEROS_SERVICE_NAME).isSet(); + final PropertyValue kafkaKerberosServiceNameProperty = isOldPropertySet + ? context.getProperty(OLD_KAFKA_KERBEROS_SERVICE_NAME) : context.getProperty(KAFKA_KERBEROS_SERVICE_NAME); + if (isOldPropertySet) { + logger.warn("Using old 'Kakfa Kerberos Service Name' property, migration is needed." + + " Please update ReportLineageToAtlas configuration to disable this warning message," + + " by confirming 'Kafka Kerberos Service Name' value and deleting the deprecated user defined property having the old value."); + } + return kafkaKerberosServiceNameProperty.evaluateAttributeExpressions().getValue(); + } + } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java index ae1d63dcb415..b65f46ae3b17 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java @@ -17,6 +17,8 @@ package org.apache.nifi.atlas.reporting; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockValidationContext; import org.junit.Test; @@ -31,6 +33,9 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD; import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS; import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_KERBEROS_SERVICE_NAME; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.OLD_KAFKA_KERBEROS_SERVICE_NAME; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class TestReportLineageToAtlas { @@ -86,4 +91,31 @@ public void validateAtlasUrls() throws Exception { r -> assertTrue("Atlas URLs is invalid", !r.isValid())); } + @Test + public void testKafkaKerberosServiceName() { + final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas(); + final MockProcessContext processContext = new MockProcessContext(reportingTask); + final ComponentLog logger = new MockComponentLog(reportingTask.getIdentifier(), reportingTask); + + processContext.setProperty(ATLAS_URLS, "http://atlas.example.com:21000"); + processContext.setProperty(ATLAS_NIFI_URL, "http://nifi.example.com:8080/nifi"); + processContext.setProperty(ATLAS_USER, "admin"); + processContext.setProperty(ATLAS_PASSWORD, "admin"); + + // Default is 'kafka' + String serviceName = reportingTask.getKafkaKerberosServiceName(processContext, logger); + assertEquals("kafka", serviceName); + + // New one can use EL. + processContext.setProperty(KAFKA_KERBEROS_SERVICE_NAME, "${literal('new')}"); + assertTrue(processContext.isValid()); + serviceName = reportingTask.getKafkaKerberosServiceName(processContext, logger); + assertEquals("new", serviceName); + + // If there is an old property, use the ole one + processContext.setProperty(OLD_KAFKA_KERBEROS_SERVICE_NAME, "${literal('old')}"); + serviceName = reportingTask.getKafkaKerberosServiceName(processContext, logger); + assertTrue(processContext.isValid()); + assertEquals("old", serviceName); + } }