diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala new file mode 100644 index 0000000000000..8b6453362a121 --- /dev/null +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.flume.sink +import org.apache.log4j.{LogManager, PropertyConfigurator} +import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.impl.StaticLoggerBinder + +trait Logging { + // Make the log field transient so that objects with Logging can + // be serialized and used on another machine + @transient private var log_ : Logger = null + + // Method to get or create the logger for this object + protected def log: Logger = { + if (log_ == null) { + initializeIfNecessary() + var className = this.getClass.getName + // Ignore trailing $'s in the class names for Scala objects + if (className.endsWith("$")) { + className = className.substring(0, className.length - 1) + } + log_ = LoggerFactory.getLogger(className) + } + log_ + } + + // Log methods that take only a String + protected def logInfo(msg: => String) { + if (log.isInfoEnabled) log.info(msg) + } + + protected def logDebug(msg: => String) { + if (log.isDebugEnabled) log.debug(msg) + } + + protected def logTrace(msg: => String) { + if (log.isTraceEnabled) log.trace(msg) + } + + protected def logWarning(msg: => String) { + if (log.isWarnEnabled) log.warn(msg) + } + + protected def logError(msg: => String) { + if (log.isErrorEnabled) log.error(msg) + } + + // Log methods that take Throwables (Exceptions/Errors) too + protected def logInfo(msg: => String, throwable: Throwable) { + if (log.isInfoEnabled) log.info(msg, throwable) + } + + protected def logDebug(msg: => String, throwable: Throwable) { + if (log.isDebugEnabled) log.debug(msg, throwable) + } + + protected def logTrace(msg: => String, throwable: Throwable) { + if (log.isTraceEnabled) log.trace(msg, throwable) + } + + protected def logWarning(msg: => String, throwable: Throwable) { + if (log.isWarnEnabled) log.warn(msg, throwable) + } + + protected def logError(msg: => String, throwable: Throwable) { + if (log.isErrorEnabled) log.error(msg, throwable) + } + + protected def isTraceEnabled(): Boolean = { + log.isTraceEnabled + } + + private def initializeIfNecessary() { + if (!Logging.initialized) { + Logging.initLock.synchronized { + if (!Logging.initialized) { + initializeLogging() + } + } + } + } + + private def initializeLogging() { + // If Log4j is being used, but is not initialized, load a default properties file + val binder = StaticLoggerBinder.getSingleton + val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory") + val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized && usingLog4j) { + val defaultLogProps = "org/apache/spark/log4j-defaults.properties" + Option(getClass.getClassLoader.getResource(defaultLogProps)) match { + case Some(url) => + PropertyConfigurator.configure(url) + log.info(s"Using Spark's default log4j profile: $defaultLogProps") + case None => + System.err.println(s"Spark was unable to load $defaultLogProps") + } + } + Logging.initialized = true + + // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads + // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html + log + } +} + +private object Logging { + @volatile private var initialized = false + val initLock = new Object() + try { + // We use reflection here to handle the case where users remove the + // slf4j-to-jul bridge order to route their logs to JUL. + val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler") + bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null) + val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean] + if (!installed) { + bridgeClass.getMethod("install").invoke(null) + } + } catch { + case e: ClassNotFoundException => // can't log anything yet so just fail silently + } +} diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala index 3651041b41638..e4925b85c81ee 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala @@ -35,8 +35,7 @@ import org.slf4j.LoggerFactory * is rolled back. */ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, - val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol { - private val LOG = LoggerFactory.getLogger(classOf[SparkAvroCallbackHandler]) + val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging { val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Spark Sink Processor Thread - %d").build())) @@ -56,6 +55,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, * @return [[EventBatch]] instance that has a sequence number and an array of at most n events */ override def getEventBatch(n: Int): EventBatch = { + logDebug("Got getEventBatch call from Spark.") val sequenceNumber = seqBase + seqCounter.incrementAndGet() val processor = new TransactionProcessor(channel, sequenceNumber, n, transactionTimeout, backOffInterval, this) @@ -66,6 +66,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, val batch = processor.getEventBatch if (!SparkSinkUtils.isErrorBatch(batch)) { processorMap.put(sequenceNumber.toString, processor) + logDebug("Sending event batch with sequence number: " + sequenceNumber) } batch } @@ -75,6 +76,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, * @param sequenceNumber The sequence number of the event batch that was successful */ override def ack(sequenceNumber: CharSequence): Void = { + logDebug("Received Ack for batch with sequence number: " + sequenceNumber) completeTransaction(sequenceNumber, success = true) null } @@ -86,7 +88,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, */ override def nack(sequenceNumber: CharSequence): Void = { completeTransaction(sequenceNumber, success = false) - LOG.info("Spark failed to commit transaction. Will reattempt events.") + logInfo("Spark failed to commit transaction. Will reattempt events.") null } @@ -115,6 +117,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, * Shuts down the executor used to process transactions. */ def shutdown() { + logInfo("Shutting down Spark Avro Callback Handler") transactionExecutorOpt.foreach(executor => { executor.shutdownNow() }) diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala index 265b37dd0b302..35b766ff04d9f 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala @@ -48,9 +48,8 @@ import org.apache.spark.flume.SparkFlumeProtocol // until an ACK or NACK comes back or the transaction times out (after the specified timeout). // When the response comes, the TransactionProcessor is retrieved and then unblocked, // at which point the transaction is committed or rolled back. -class SparkSink extends AbstractSink with Configurable { +class SparkSink extends AbstractSink with Logging with Configurable { - private val LOG = LoggerFactory.getLogger(classOf[SparkSink]) // Size of the pool to use for holding transaction processors. private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS @@ -74,7 +73,7 @@ class SparkSink extends AbstractSink with Configurable { private val blockingLatch = new CountDownLatch(1) override def start() { - LOG.info("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " + + logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " + hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " + transactionTimeout + ".") handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout, @@ -85,19 +84,19 @@ class SparkSink extends AbstractSink with Configurable { // Netty dependencies are already available on the JVM as Flume would have pulled them in. serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port))) serverOpt.foreach(server => { - LOG.info("Starting Avro server for sink: " + getName) + logInfo("Starting Avro server for sink: " + getName) server.start() }) super.start() } override def stop() { - LOG.info("Stopping Spark Sink: " + getName) + logInfo("Stopping Spark Sink: " + getName) handler.foreach(callbackHandler => { callbackHandler.shutdown() }) serverOpt.foreach(server => { - LOG.info("Stopping Avro Server for sink: " + getName) + logInfo("Stopping Avro Server for sink: " + getName) server.close() server.join() }) @@ -113,12 +112,16 @@ class SparkSink extends AbstractSink with Configurable { poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS) transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT) backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL) + logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " + + "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " + + "backoffInterval: " + backOffInterval) } override def process(): Status = { // This method is called in a loop by the Flume framework - block it until the sink is // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is // being shut down. + logInfo("Blocking Sink Runner, sink will continue to run..") blockingLatch.await() Status.BACKOFF } diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala index e7041c2895025..8f16246d495a0 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala @@ -25,6 +25,6 @@ object SparkSinkUtils { * @return - true if the batch represents an error */ def isErrorBatch(batch: EventBatch): Boolean = { - !batch.getErrorMsg.toString.equals("") //If there is an error message, it is an error batch. + !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch. } } diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala index a4689cca5624a..1d2dddfbf7ff0 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala @@ -45,9 +45,7 @@ import org.slf4j.LoggerFactory */ private class TransactionProcessor(val channel: Channel, val seqNum: String, var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int, - val parent: SparkAvroCallbackHandler) extends Callable[Void] { - - private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor]) + val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging { // If a real batch is not returned, we always have to return an error batch. @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "", @@ -88,9 +86,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, * @param success True if an ACK was received and the transaction should be committed, else false. */ def batchProcessed(success: Boolean) { - if (LOG.isDebugEnabled) { - LOG.debug("Batch processed for sequence number: " + seqNum) - } + logDebug("Batch processed for sequence number: " + seqNum) batchSuccess = success batchAckLatch.countDown() } @@ -123,6 +119,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, gotEventsInThisTxn = true case None => if (!gotEventsInThisTxn) { + logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" + + " the current transaction") TimeUnit.MILLISECONDS.sleep(backOffInterval) } else { loop.break() @@ -133,7 +131,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, if (!gotEventsInThisTxn) { val msg = "Tried several times, " + "but did not get any events from the channel!" - LOG.warn(msg) + logWarning(msg) eventBatch.setErrorMsg(msg) } else { // At this point, the events are available, so fill them into the event batch @@ -142,7 +140,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, }) } catch { case e: Exception => - LOG.error("Error while processing transaction.", e) + logWarning("Error while processing transaction.", e) eventBatch.setErrorMsg(e.getMessage) try { txOpt.foreach(tx => { @@ -166,17 +164,18 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, txOpt.foreach(tx => { if (batchSuccess) { try { + logDebug("Committing transaction") tx.commit() } catch { case e: Exception => - LOG.warn("Error while attempting to commit transaction. Transaction will be rolled " + + logWarning("Error while attempting to commit transaction. Transaction will be rolled " + "back", e) rollbackAndClose(tx, close = false) // tx will be closed later anyway } finally { tx.close() } } else { - LOG.warn("Spark could not commit transaction, NACK received. Rolling back transaction.") + logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.") rollbackAndClose(tx, close = true) // This might have been due to timeout or a NACK. Either way the following call does not // cause issues. This is required to ensure the TransactionProcessor instance is not leaked @@ -192,12 +191,12 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, */ private def rollbackAndClose(tx: Transaction, close: Boolean) { try { - LOG.warn("Spark was unable to successfully process the events. Transaction is being " + + logWarning("Spark was unable to successfully process the events. Transaction is being " + "rolled back.") tx.rollback() } catch { case e: Exception => - LOG.error("Error rolling back transaction. Rollback may have failed!", e) + logError("Error rolling back transaction. Rollback may have failed!", e) } finally { if (close) { tx.close() diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 5f6854afa0207..66df20863037b 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -116,7 +116,7 @@ private[streaming] class FlumePollingReceiver( logDebug("Stored events with seq:" + seq) j += 1 } - logDebug("Sending ack for sequence number: " +seq) + logDebug("Sending ack for sequence number: " + seq) // Send an ack to Flume so that Flume discards the events from its channels. client.ack(seq) logDebug("Ack sent for sequence number: " + seq)