diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index a1123d21d158..23b873f3516a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -67,8 +67,10 @@ import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.ValidationResources;
 
+import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -321,7 +323,7 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
     }
 
     @OnScheduled
-    public void setup(final ProcessContext context) {
+    public void setup(final ProcessContext context) throws IOException {
         ComponentLog log = getLogger();
 
         rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
@@ -368,9 +370,9 @@ public void setup(final ProcessContext context) {
                 throw new ProcessException(ae);
             }
 
-            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
+            log.info("Successfully logged in as principal " + resolvedPrincipal);
         } else {
-            ugi = null;
+            ugi = SecurityUtil.loginSimple(hiveConfig);
             kerberosUserReference.set(null);
         }
 
@@ -381,172 +383,181 @@ public void setup(final ProcessContext context) {
     }
 
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
-        final ComponentLog log = getLogger();
-        String metastoreURIs = null;
-        if (context.getProperty(METASTORE_URI).isSet()) {
-            metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
-            if (StringUtils.isEmpty(metastoreURIs)) {
-                // Shouldn't be empty at this point, log an error, penalize the flow file, and return
-                log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-            }
-        }
-
-        final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
-        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
-
-        // Override the Hive Metastore URIs in the config if set by the user
-        if (metastoreURIs != null) {
-            hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
-        }
-        HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
-                .withHiveConf(hiveConfig)
-                .withCallTimeout(callTimeout)
-                .withStreamingOptimizations(!disableStreamingOptimizations);
-
-        if (!StringUtils.isEmpty(staticPartitionValuesString)) {
-            List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
-            o = o.withStaticPartitionValues(staticPartitionValues);
-        }
-
-        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
-            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
-        }
-
-        final HiveOptions options = o;
-
-        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
-        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-
-        StreamingConnection hiveStreamingConnection = null;
-
-        try {
-            final RecordReader reader;
-
-            try(final InputStream in = session.read(flowFile)) {
-                // if we fail to create the RecordReader then we want to route to failure, so we need to
-                // handle this separately from the other IOExceptions which normally route to retry
-                try {
-                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
-                } catch (Exception e) {
-                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
-                }
-
-                hiveStreamingConnection = makeStreamingConnection(options, reader);
-
-                // Write records to Hive streaming, then commit and close
-                hiveStreamingConnection.beginTransaction();
-                hiveStreamingConnection.write(in);
-                hiveStreamingConnection.commitTransaction();
-                in.close();
-
-                Map<String, String> updateAttributes = new HashMap<>();
-                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
-                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
-                flowFile = session.putAllAttributes(flowFile, updateAttributes);
-                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
-            } catch (TransactionError te) {
-                if (rollbackOnFailure) {
-                    throw new ProcessException(te.getLocalizedMessage(), te);
-                } else {
-                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
-                }
-            } catch (RecordReaderFactoryException rrfe) {
-                if (rollbackOnFailure) {
-                    throw new ProcessException(rrfe);
-                } else {
-                    log.error(
-                            "Failed to create {} for {} - routing to failure",
-                            new Object[]{RecordReader.class.getSimpleName(), flowFile},
-                            rrfe
-                    );
-                    session.transfer(flowFile, REL_FAILURE);
-                    return;
-                }
-            }
-            session.transfer(flowFile, REL_SUCCESS);
-        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
-            if (rollbackOnFailure) {
-                if (hiveStreamingConnection != null) {
-                    abortConnection(hiveStreamingConnection);
-                }
-                throw new ProcessException(e.getLocalizedMessage(), e);
-            } else {
-                Map<String, String> updateAttributes = new HashMap<>();
-                final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
-                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
-                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
-                flowFile = session.putAllAttributes(flowFile, updateAttributes);
-                log.error(
-                        "Exception while processing {} - routing to failure",
-                        new Object[]{flowFile},
-                        e
-                );
-                session.transfer(flowFile, REL_FAILURE);
-            }
-        } catch (DiscontinuedException e) {
-            // The input FlowFile processing is discontinued. Keep it in the input queue.
-            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
-            session.transfer(flowFile, Relationship.SELF);
-        } catch (ConnectionError ce) {
-            // If we can't connect to the metastore, yield the processor
-            context.yield();
-            throw new ProcessException("A connection to metastore cannot be established", ce);
-        } catch (ShouldRetryException e) {
-            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
-            getLogger().error(e.getLocalizedMessage(), e);
-            if (hiveStreamingConnection != null) {
-                abortConnection(hiveStreamingConnection);
-            }
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_RETRY);
-        } catch (StreamingException se) {
-            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
-            Throwable cause = se.getCause();
-            if (cause == null) cause = se;
-            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
-            if (rollbackOnFailure) {
-                if (hiveStreamingConnection != null) {
-                    abortConnection(hiveStreamingConnection);
-                }
-                throw new ProcessException(cause.getLocalizedMessage(), cause);
-            } else {
-                flowFile = session.penalize(flowFile);
-                Map<String, String> updateAttributes = new HashMap<>();
-                final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
-                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
-                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
-                flowFile = session.putAllAttributes(flowFile, updateAttributes);
-                log.error(
-                        "Exception while trying to stream {} to hive - routing to failure",
-                        new Object[]{flowFile},
-                        se
-                );
-                session.transfer(flowFile, REL_FAILURE);
-            }
-
-        } catch (Throwable t) {
-            if (hiveStreamingConnection != null) {
-                abortConnection(hiveStreamingConnection);
-            }
-            throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
-        } finally {
-            closeConnection(hiveStreamingConnection);
-            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
-            Thread.currentThread().setContextClassLoader(originalClassloader);
-        }
+        getUgi().doAs((PrivilegedAction<Void>) () -> {
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                return null;
+            }
+
+            final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+            final ComponentLog log = getLogger();
+            String metastoreURIs = null;
+            if (context.getProperty(METASTORE_URI).isSet()) {
+                metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+                if (StringUtils.isEmpty(metastoreURIs)) {
+                    // Shouldn't be empty at this point, log an error, penalize the flow file, and return
+                    log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
+                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                }
+            }
+
+            final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
+            final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
+
+            // Override the Hive Metastore URIs in the config if set by the user
+            if (metastoreURIs != null) {
+                hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+            }
+
+            HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
+                    .withHiveConf(hiveConfig)
+                    .withCallTimeout(callTimeout)
+                    .withStreamingOptimizations(!disableStreamingOptimizations);
+
+            if (!StringUtils.isEmpty(staticPartitionValuesString)) {
+                List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
+                o = o.withStaticPartitionValues(staticPartitionValues);
+            }
+
+            if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+                final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+                final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+                final String resolvedPrincipal;
+                if (credentialsService != null) {
+                    resolvedPrincipal = credentialsService.getPrincipal();
+                    o = o.withKerberosKeytab(credentialsService.getKeytab());
+                } else resolvedPrincipal = explicitPrincipal;
+                o = o.withKerberosPrincipal(resolvedPrincipal);
+            }
+
+            final HiveOptions options = o;
+
+            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+            StreamingConnection hiveStreamingConnection = null;
+
+            try {
+                final RecordReader reader;
+
+                try(final InputStream in = session.read(flowFile)) {
+                    // if we fail to create the RecordReader then we want to route to failure, so we need to
+                    // handle this separately from the other IOExceptions which normally route to retry
+                    try {
+                        reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
+                    } catch (Exception e) {
+                        throw new RecordReaderFactoryException("Unable to create RecordReader", e);
+                    }
+
+                    hiveStreamingConnection = makeStreamingConnection(options, reader);
+
+                    // Write records to Hive streaming, then commit and close
+                    hiveStreamingConnection.beginTransaction();
+                    hiveStreamingConnection.write(in);
+                    hiveStreamingConnection.commitTransaction();
+                    in.close();
+
+                    Map<String, String> updateAttributes = new HashMap<>();
+                    updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+                    updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                    flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                    session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
+                } catch (TransactionError te) {
+                    if (rollbackOnFailure) {
+                        throw new ProcessException(te.getLocalizedMessage(), te);
+                    } else {
+                        throw new ShouldRetryException(te.getLocalizedMessage(), te);
+                    }
+                } catch (RecordReaderFactoryException rrfe) {
+                    if (rollbackOnFailure) {
+                        throw new ProcessException(rrfe);
+                    } else {
+                        log.error(
+                                "Failed to create {} for {} - routing to failure",
+                                new Object[]{RecordReader.class.getSimpleName(), flowFile},
+                                rrfe
+                        );
+                        session.transfer(flowFile, REL_FAILURE);
+                        return null;
+                    }
+                }
+                session.transfer(flowFile, REL_SUCCESS);
+            } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
+                if (rollbackOnFailure) {
+                    if (hiveStreamingConnection != null) {
+                        abortConnection(hiveStreamingConnection);
+                    }
+                    throw new ProcessException(e.getLocalizedMessage(), e);
+                } else {
+                    Map<String, String> updateAttributes = new HashMap<>();
+                    final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
+                    updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
+                    updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                    flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                    log.error(
+                            "Exception while processing {} - routing to failure",
+                            new Object[]{flowFile},
+                            e
+                    );
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            } catch (DiscontinuedException e) {
+                // The input FlowFile processing is discontinued. Keep it in the input queue.
+                getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
+                session.transfer(flowFile, Relationship.SELF);
+            } catch (ConnectionError ce) {
+                // If we can't connect to the metastore, yield the processor
+                context.yield();
+                throw new ProcessException("A connection to metastore cannot be established", ce);
+            } catch (ShouldRetryException e) {
+                // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
+                getLogger().error(e.getLocalizedMessage(), e);
+                if (hiveStreamingConnection != null) {
+                    abortConnection(hiveStreamingConnection);
+                }
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_RETRY);
+            } catch (StreamingException se) {
+                // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
+                Throwable cause = se.getCause();
+                if (cause == null) cause = se;
+                // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
+                if (rollbackOnFailure) {
+                    if (hiveStreamingConnection != null) {
+                        abortConnection(hiveStreamingConnection);
+                    }
+                    throw new ProcessException(cause.getLocalizedMessage(), cause);
+                } else {
+                    flowFile = session.penalize(flowFile);
+                    Map<String, String> updateAttributes = new HashMap<>();
+                    final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
+                    updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
+                    updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                    flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                    log.error(
+                            "Exception while trying to stream {} to hive - routing to failure",
+                            new Object[]{flowFile},
+                            se
+                    );
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+
+            } catch (Throwable t) {
+                if (hiveStreamingConnection != null) {
+                    abortConnection(hiveStreamingConnection);
+                }
+                throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
+            } finally {
+                closeConnection(hiveStreamingConnection);
+                // Restore original class loader, might not be necessary but is good practice since the processor task changed it
+                Thread.currentThread().setContextClassLoader(originalClassloader);
+            }
+
+            return null;
+        });
     }
 
     StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
@@ -623,5 +634,23 @@ private ShouldRetryException(String message, Throwable cause) {
             super(message, cause);
         }
     }
 
+    UserGroupInformation getUgi() {
+        getLogger().trace("getting UGI instance");
+        if (kerberosUserReference.get() != null) {
+            // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
+            KerberosUser kerberosUser = kerberosUserReference.get();
+            getLogger().debug("kerberosUser is " + kerberosUser);
+            try {
+                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+                kerberosUser.checkTGTAndRelogin();
+            } catch (LoginException e) {
+                throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
+            }
+        } else {
+            getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
+        }
+        return ugi;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index d9113fe7868e..edaefe81f60b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -155,7 +155,7 @@ public void setUp() throws Exception {
         System.setProperty("java.security.krb5.kdc", "nifi.kdc");
 
         ugi = null;
-        processor = new MockPutHive3Streaming();
+        processor = new MockPutHive3Streaming(ugi);
         hiveConfigurator = mock(HiveConfigurator.class);
         hiveConf = new HiveConf();
         when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf);
@@ -272,6 +272,7 @@ public void testUgiAndKerberosUserGetsCleared() throws Exception {
         runner.setProperty(PutHive3Streaming.DB_NAME, "default");
         runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
         processor.ugi = mock(UserGroupInformation.class);
+        processor.kerberosUserReference.set(mock(KerberosUser.class));
         runner.run();
         assertNull(processor.ugi);
         assertNull(processor.kerberosUserReference.get());
@@ -1128,6 +1129,10 @@ private class MockPutHive3Streaming extends PutHive3Streaming {
                 new FieldSchema("scale", serdeConstants.DOUBLE_TYPE_NAME, "")
         );
 
+        private MockPutHive3Streaming(UserGroupInformation ugi) {
+            this.ugi = ugi;
+        }
+
         @Override
         StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
@@ -1175,6 +1180,11 @@ void setFields(List<FieldSchema> schema) {
         public void setGeneratePermissionsFailure(boolean generatePermissionsFailure) {
             this.generatePermissionsFailure = generatePermissionsFailure;
         }
+
+        @Override
+        UserGroupInformation getUgi() {
+            return ugi;
+        }
     }
 
     private class MockHiveStreamingConnection implements StreamingConnection {
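
Reviewer note (not part of the patch): the change has two moving parts — onTrigger() now runs its entire body inside UserGroupInformation#doAs, and the new getUgi() refreshes the Kerberos TGT via KerberosUser#checkTGTAndRelogin() before handing the UGI out. The sketch below is a minimal, self-contained illustration of that pattern. UgiWorkRunner and runAsConfiguredUser are hypothetical names invented for this note; the security types and calls (UserGroupInformation.doAs, KerberosUser.checkTGTAndRelogin, KerberosUser.getPrincipal) are the ones the patch itself uses.

import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser;

class UgiWorkRunner {
    // Mirrors the processor's fields: a UGI established at @OnScheduled time and,
    // when Kerberos is in play, the KerberosUser that owns the ticket cache.
    private final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
    private volatile UserGroupInformation ugi;

    // Mirrors getUgi() in the patch: relogin if a KerberosUser is tracked, then return the UGI.
    UserGroupInformation getUgi() {
        final KerberosUser kerberosUser = kerberosUserReference.get();
        if (kerberosUser != null) {
            try {
                // renews the ticket when it is close to expiring; no-op otherwise
                kerberosUser.checkTGTAndRelogin();
            } catch (LoginException e) {
                throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
            }
        }
        return ugi;
    }

    // Mirrors the onTrigger wrapping: all work, including metastore calls made by
    // Hive Streaming, executes with the configured user's credentials.
    void runAsConfiguredUser(Runnable work) {
        getUgi().doAs((PrivilegedAction<Void>) () -> {
            work.run();
            return null;
        });
    }
}

One design point worth noting: the patch uses PrivilegedAction<Void> rather than PrivilegedExceptionAction, so the lambda cannot throw checked exceptions — which is why every failure path inside it is converted to a ProcessException (or routed to a relationship) and why the body ends with "return null;". This also explains why the unsecured path now calls SecurityUtil.loginSimple(hiveConfig) instead of leaving ugi null: doAs needs a non-null UGI in every configuration.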