From 8776904697adf714a2dbc1db0fb8788afa112fb9 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 5 Jan 2018 19:23:28 +0900 Subject: [PATCH] NIFI-4741: Avoid DelegationToken expiration at ReportLineageToAtlas The reporting task used to hold a single AtlasClientV2 instance throughout its runtime, from the moment it is started until it is stopped. If it is configured to use Kerberos authentication for the Atlas REST API, after a published DelegationToken expires (10 hours by default), the reporting task will not be able to recover from the 401 Unauthorized state. In order to avoid getting stuck in such a situation, this commit changes the way ReportLineageToAtlas uses the AtlasClientV2 instance to create an instance per onTrigger execution. It also addresses Kerberos ticket expiration. This approach incurs some overhead by initiating the client each time; however, it should be insignificant from an overall processing time perspective including analyzing NiFi flow and Provenance records. --- .../apache/nifi/atlas/NiFiAtlasClient.java | 44 ++----------------- .../org/apache/nifi/atlas/NiFiAtlasHook.java | 10 +++-- .../atlas/reporting/ReportLineageToAtlas.java | 40 ++++++++++++----- .../apache/nifi/atlas/security/Kerberos.java | 3 +- .../apache/nifi/atlas/ITNiFiAtlasClient.java | 4 +- .../reporting/ITReportLineageToAtlas.java | 1 + 6 files changed, 44 insertions(+), 58 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java index feb2b48ccda1..4e95a92a8f00 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java @@ -18,7 +18,6 @@ import com.sun.jersey.api.client.UniformInterfaceException; 
import com.sun.jersey.core.util.MultivaluedMapImpl; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasServiceException; @@ -29,14 +28,12 @@ import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.nifi.atlas.security.AtlasAuthN; import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.MultivaluedMap; -import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -82,44 +78,10 @@ public class NiFiAtlasClient { private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); - private static NiFiAtlasClient nifiClient; - private AtlasClientV2 atlasClient; + private final AtlasClientV2 atlasClient; - private NiFiAtlasClient() { - super(); - } - - public static NiFiAtlasClient getInstance() { - if (nifiClient == null) { - synchronized (NiFiAtlasClient.class) { - if (nifiClient == null) { - nifiClient = new NiFiAtlasClient(); - } - } - } - return nifiClient; - } - - public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) { - - synchronized (NiFiAtlasClient.class) { - - if (atlasClient != null) { - logger.info("{} had been setup but replacing it with new one.", atlasClient); - ApplicationProperties.forceReload(); - } - - if (atlasConfDir != null) { - // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. 
- Properties props = System.getProperties(); - final String atlasConfProp = "atlas.conf"; - props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath()); - logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp)); - } - - atlasClient = authN.createClient(baseUrls); - - } + public NiFiAtlasClient(AtlasClientV2 atlasClient) { + this.atlasClient = atlasClient; } /** diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java index 43fefff0988d..a15c93572d46 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java @@ -60,7 +60,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { private static final String CONF_PREFIX = "atlas.hook.nifi."; private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - private final NiFiAtlasClient atlasClient; + private NiFiAtlasClient atlasClient; /** * An index to resolve a qualifiedName from a GUID. 
@@ -81,9 +81,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { }; } - public NiFiAtlasHook(NiFiAtlasClient atlasClient) { - this.atlasClient = atlasClient; - + public NiFiAtlasHook() { final int qualifiedNameCacheSize = 10_000; this.guidToQualifiedName = createCache(qualifiedNameCacheSize); @@ -91,6 +89,10 @@ public NiFiAtlasHook(NiFiAtlasClient atlasClient) { this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize); } + public void setAtlasClient(NiFiAtlasClient atlasClient) { + this.atlasClient = atlasClient; + } + @Override protected String getNumberOfRetriesPropertyKey() { return HOOK_NUM_RETRIES; diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 5bb60244f790..9238f95319a3 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -17,6 +17,7 @@ package org.apache.nifi.atlas.reporting; import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasServiceException; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; @@ -279,7 +280,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers"; private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG; private final ServiceLoader clusterResolverLoader = ServiceLoader.load(ClusterResolver.class); - private volatile NiFiAtlasClient atlasClient; + private 
volatile AtlasAuthN atlasAuthN; private volatile Properties atlasProperties; private volatile boolean isTypeDefCreated = false; private volatile String defaultClusterName; @@ -399,13 +400,13 @@ private void validateKafkaProperties(ValidationContext context, Collection urls = new ArrayList<>(); parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add); final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https")); @@ -476,7 +477,7 @@ private void initAtlasClient(ConfigurationContext context) throws IOException { throw new ProcessException("Default cluster name is not defined."); } - final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod); + atlasAuthN = getAtlasAuthN(atlasAuthNMethod); atlasAuthN.configure(context); // Create Atlas configuration file if necessary. @@ -497,16 +498,32 @@ private void initAtlasClient(ConfigurationContext context) throws IOException { } } + getLogger().debug("Force reloading Atlas application properties."); + ApplicationProperties.forceReload(); - atlasClient = NiFiAtlasClient.getInstance(); + if (confDir != null) { + // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. + Properties props = System.getProperties(); + final String atlasConfProp = "atlas.conf"; + props.setProperty(atlasConfProp, confDir.getAbsolutePath()); + getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)}); + } + } + + /** + * In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration), + * create Atlas client instance at every onTrigger execution. 
+ */ + private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) { + List urls = new ArrayList<>(); + parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add); try { - atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, confDir); + return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{}))); } catch (final NullPointerException e) { throw new ProcessException(String.format("Failed to initialize Atlas client due to %s." + " Make sure 'atlas-application.properties' is in the directory specified with %s" + " or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName()), e); } - } private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) { @@ -557,6 +574,8 @@ public void onTrigger(ReportingContext context) { // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks. final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary(); + final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context); + // Create Entity defs in Atlas if there's none yet. if (!isTypeDefCreated) { try { @@ -578,7 +597,7 @@ public void onTrigger(ReportingContext context) { // Regardless of whether being a primary task node, each node has to analyse NiFiFlow. // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism. - final NiFiFlow nifiFlow = createNiFiFlow(context); + final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient); if (isResponsibleForPrimaryTasks) { @@ -592,11 +611,12 @@ public void onTrigger(ReportingContext context) { // NOTE: There is a race condition between the primary node and other nodes. // If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node, // then the notification message will fail due to having a reference to a non-existing entity. 
+ nifiAtlasHook.setAtlasClient(atlasClient); consumeNiFiProvenanceEvents(context, nifiFlow); } - private NiFiFlow createNiFiFlow(ReportingContext context) { + private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) { final ProcessGroupStatus rootProcessGroup = context.getEventAccess().getGroupStatus("root"); final String flowName = rootProcessGroup.getName(); final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue(); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java index 88feba00c827..ab55b49fe4bc 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java @@ -73,7 +73,8 @@ public AtlasClientV2 createClient(String[] baseUrls) { UserGroupInformation.setConfiguration(hadoopConf); final UserGroupInformation ugi; try { - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); + UserGroupInformation.loginUserFromKeytab(principal, keytab); + ugi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new RuntimeException("Failed to login with Kerberos due to: " + e, e); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java index f1727b0a450a..69a3042cf467 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java +++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java @@ -39,14 +39,14 @@ public class ITNiFiAtlasClient { @Before public void setup() { - atlasClient = NiFiAtlasClient.getInstance(); // Add your atlas server ip address into /etc/hosts as atlas.example.com PropertyContext propertyContext = mock(PropertyContext.class); when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_USER)).thenReturn(new MockPropertyValue("admin")); when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_PASSWORD)).thenReturn(new MockPropertyValue("admin")); final AtlasAuthN atlasAuthN = new Basic(); atlasAuthN.configure(propertyContext); - atlasClient.initialize(new String[]{"http://atlas.example.com:21000/"}, atlasAuthN, null); + + atlasClient = new NiFiAtlasClient(atlasAuthN.createClient(new String[]{"http://atlas.example.com:21000/"})); } @Test diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java index 2fe7d079271c..e83495adffea 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java @@ -409,6 +409,7 @@ private void test(TestConfiguration tc) throws InitializationException, IOExcept when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus); final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class); + when(eventAccess.getControllerStatus()).thenReturn(tc.rootPgStatus); when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); when(eventAccess.getProvenanceEvents(eq(-1L), 
anyInt())).thenReturn(tc.provenanceRecords); when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);