SPARK-1729. Update logging in Spark Sink.
harishreedharan committed Jul 15, 2014
1 parent 8c00289 commit 1edc806
Showing 6 changed files with 163 additions and 23 deletions.
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink

import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
@transient private var log_ : Logger = null

// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
className = className.substring(0, className.length - 1)
}
log_ = LoggerFactory.getLogger(className)
}
log_
}

// Log methods that take only a String
protected def logInfo(msg: => String) {
if (log.isInfoEnabled) log.info(msg)
}

protected def logDebug(msg: => String) {
if (log.isDebugEnabled) log.debug(msg)
}

protected def logTrace(msg: => String) {
if (log.isTraceEnabled) log.trace(msg)
}

protected def logWarning(msg: => String) {
if (log.isWarnEnabled) log.warn(msg)
}

protected def logError(msg: => String) {
if (log.isErrorEnabled) log.error(msg)
}

// Log methods that take Throwables (Exceptions/Errors) too
protected def logInfo(msg: => String, throwable: Throwable) {
if (log.isInfoEnabled) log.info(msg, throwable)
}

protected def logDebug(msg: => String, throwable: Throwable) {
if (log.isDebugEnabled) log.debug(msg, throwable)
}

protected def logTrace(msg: => String, throwable: Throwable) {
if (log.isTraceEnabled) log.trace(msg, throwable)
}

protected def logWarning(msg: => String, throwable: Throwable) {
if (log.isWarnEnabled) log.warn(msg, throwable)
}

protected def logError(msg: => String, throwable: Throwable) {
if (log.isErrorEnabled) log.error(msg, throwable)
}

protected def isTraceEnabled(): Boolean = {
log.isTraceEnabled
}

private def initializeIfNecessary() {
if (!Logging.initialized) {
Logging.initLock.synchronized {
if (!Logging.initialized) {
initializeLogging()
}
}
}
}

private def initializeLogging() {
// If Log4j is being used, but is not initialized, load a default properties file
val binder = StaticLoggerBinder.getSingleton
val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized && usingLog4j) {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
Option(getClass.getClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
Logging.initialized = true

// Force a call into slf4j to initialize it. Avoids this happening from multiple threads
// and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
log
}
}

private object Logging {
@volatile private var initialized = false
val initLock = new Object()
try {
// We use reflection here to handle the case where users remove the
// slf4j-to-jul bridge in order to route their logs to JUL.
val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
if (!installed) {
bridgeClass.getMethod("install").invoke(null)
}
} catch {
case e: ClassNotFoundException => // can't log anything yet so just fail silently
}
}
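
The Logging trait added above takes its messages as by-name parameters (msg: => String), so the string concatenation in a call site is only evaluated when the corresponding level is enabled, and the slf4j/log4j wiring is initialized lazily under a lock. A minimal sketch of how a sink-side class might mix it in; the class name and messages below are illustrative and not part of this commit:

package org.apache.spark.flume.sink

class ExampleComponent extends Logging {
  def process(records: Seq[String]): Unit = {
    // By-name argument: the concatenation below is never evaluated
    // unless DEBUG is enabled for this class's logger.
    logDebug("Processing " + records.size + " records")
    try {
      records.foreach(r => logTrace("Record: " + r))
    } catch {
      case e: Exception => logError("Failed while processing records", e)
    }
  }
}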
@@ -35,8 +35,7 @@ import org.slf4j.LoggerFactory
* is rolled back.
*/
private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol {
private val LOG = LoggerFactory.getLogger(classOf[SparkAvroCallbackHandler])
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
@@ -56,6 +55,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
* @return [[EventBatch]] instance that has a sequence number and an array of at most n events
*/
override def getEventBatch(n: Int): EventBatch = {
logDebug("Got getEventBatch call from Spark.")
val sequenceNumber = seqBase + seqCounter.incrementAndGet()
val processor = new TransactionProcessor(channel, sequenceNumber,
n, transactionTimeout, backOffInterval, this)
@@ -66,6 +66,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val batch = processor.getEventBatch
if (!SparkSinkUtils.isErrorBatch(batch)) {
processorMap.put(sequenceNumber.toString, processor)
logDebug("Sending event batch with sequence number: " + sequenceNumber)
}
batch
}
@@ -75,6 +76,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
* @param sequenceNumber The sequence number of the event batch that was successful
*/
override def ack(sequenceNumber: CharSequence): Void = {
logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
completeTransaction(sequenceNumber, success = true)
null
}
@@ -86,7 +88,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
*/
override def nack(sequenceNumber: CharSequence): Void = {
completeTransaction(sequenceNumber, success = false)
LOG.info("Spark failed to commit transaction. Will reattempt events.")
logInfo("Spark failed to commit transaction. Will reattempt events.")
null
}

@@ -115,6 +117,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
* Shuts down the executor used to process transactions.
*/
def shutdown() {
logInfo("Shutting down Spark Avro Callback Handler")
transactionExecutorOpt.foreach(executor => {
executor.shutdownNow()
})
@@ -48,9 +48,8 @@ import org.apache.spark.flume.SparkFlumeProtocol
// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
// When the response comes, the TransactionProcessor is retrieved and then unblocked,
// at which point the transaction is committed or rolled back.
class SparkSink extends AbstractSink with Configurable {
class SparkSink extends AbstractSink with Logging with Configurable {

private val LOG = LoggerFactory.getLogger(classOf[SparkSink])
// Size of the pool to use for holding transaction processors.
private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS

@@ -74,7 +73,7 @@ class SparkSink extends AbstractSink with Configurable {
private val blockingLatch = new CountDownLatch(1)

override def start() {
LOG.info("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
transactionTimeout + ".")
handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
@@ -85,19 +84,19 @@ class SparkSink extends AbstractSink with Configurable {
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
serverOpt.foreach(server => {
LOG.info("Starting Avro server for sink: " + getName)
logInfo("Starting Avro server for sink: " + getName)
server.start()
})
super.start()
}

override def stop() {
LOG.info("Stopping Spark Sink: " + getName)
logInfo("Stopping Spark Sink: " + getName)
handler.foreach(callbackHandler => {
callbackHandler.shutdown()
})
serverOpt.foreach(server => {
LOG.info("Stopping Avro Server for sink: " + getName)
logInfo("Stopping Avro Server for sink: " + getName)
server.close()
server.join()
})
@@ -113,12 +112,16 @@ class SparkSink extends AbstractSink with Configurable {
poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
"poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
"backoffInterval: " + backOffInterval)
}

override def process(): Status = {
// This method is called in a loop by the Flume framework - block it until the sink is
// stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
// being shut down.
logInfo("Blocking Sink Runner, sink will continue to run..")
blockingLatch.await()
Status.BACKOFF
}
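
The process() change above relies on the blockingLatch field (a CountDownLatch(1)) declared earlier in the file: instead of busy-looping, process() parks the Flume sink-runner thread until the sink is stopped, at which point the latch is presumably counted down from stop() (that call falls outside the lines shown in this hunk). A self-contained sketch of the pattern, with hypothetical names:

import java.util.concurrent.CountDownLatch

class LatchBlockedSink {
  private val blockingLatch = new CountDownLatch(1)

  // Called in a loop by the framework; parking here keeps the loop from burning CPU.
  def process(): Unit = {
    blockingLatch.await()
  }

  // Releasing the latch lets any thread parked in process() return.
  def stop(): Unit = {
    blockingLatch.countDown()
  }
}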
@@ -25,6 +25,6 @@ object SparkSinkUtils {
* @return - true if the batch represents an error
*/
def isErrorBatch(batch: EventBatch): Boolean = {
!batch.getErrorMsg.toString.equals("") //If there is an error message, it is an error batch.
!batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
}
}
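
isErrorBatch encodes the sink's error signal: a batch whose errorMsg is non-empty means no events could be pulled and nothing should be acknowledged. A hedged sketch of the receiver-side flow this implies; the SparkFlumeProtocol client stub and the EventBatch accessors (getSequenceNumber, getEvents) are assumptions, as they are not shown in this diff:

def pollOnce(client: SparkFlumeProtocol, maxEvents: Int): Unit = {
  val batch = client.getEventBatch(maxEvents)
  if (!SparkSinkUtils.isErrorBatch(batch)) {
    val seq = batch.getSequenceNumber
    try {
      // Hand batch.getEvents to Spark here, then acknowledge so the sink
      // commits its Flume transaction.
      client.ack(seq)
    } catch {
      case e: Exception =>
        // Tell the sink to roll back; the events will be reattempted later.
        client.nack(seq)
    }
  }
  // For an error batch, the sink has already rolled back its transaction,
  // so there is nothing to ack or nack; the reason is in batch.getErrorMsg.
}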
@@ -45,9 +45,7 @@ import org.slf4j.LoggerFactory
*/
private class TransactionProcessor(val channel: Channel, val seqNum: String,
var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
val parent: SparkAvroCallbackHandler) extends Callable[Void] {

private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor])
val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {

// If a real batch is not returned, we always have to return an error batch.
@volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
@@ -88,9 +86,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
* @param success True if an ACK was received and the transaction should be committed, else false.
*/
def batchProcessed(success: Boolean) {
if (LOG.isDebugEnabled) {
LOG.debug("Batch processed for sequence number: " + seqNum)
}
logDebug("Batch processed for sequence number: " + seqNum)
batchSuccess = success
batchAckLatch.countDown()
}
@@ -123,6 +119,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
gotEventsInThisTxn = true
case None =>
if (!gotEventsInThisTxn) {
logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
" the current transaction")
TimeUnit.MILLISECONDS.sleep(backOffInterval)
} else {
loop.break()
@@ -133,7 +131,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
if (!gotEventsInThisTxn) {
val msg = "Tried several times, " +
"but did not get any events from the channel!"
LOG.warn(msg)
logWarning(msg)
eventBatch.setErrorMsg(msg)
} else {
// At this point, the events are available, so fill them into the event batch
@@ -142,7 +140,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
})
} catch {
case e: Exception =>
LOG.error("Error while processing transaction.", e)
logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)
try {
txOpt.foreach(tx => {
@@ -166,17 +164,18 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
txOpt.foreach(tx => {
if (batchSuccess) {
try {
logDebug("Committing transaction")
tx.commit()
} catch {
case e: Exception =>
LOG.warn("Error while attempting to commit transaction. Transaction will be rolled " +
logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
"back", e)
rollbackAndClose(tx, close = false) // tx will be closed later anyway
} finally {
tx.close()
}
} else {
LOG.warn("Spark could not commit transaction, NACK received. Rolling back transaction.")
logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
rollbackAndClose(tx, close = true)
// This might have been due to timeout or a NACK. Either way the following call does not
// cause issues. This is required to ensure the TransactionProcessor instance is not leaked
@@ -192,12 +191,12 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
*/
private def rollbackAndClose(tx: Transaction, close: Boolean) {
try {
LOG.warn("Spark was unable to successfully process the events. Transaction is being " +
logWarning("Spark was unable to successfully process the events. Transaction is being " +
"rolled back.")
tx.rollback()
} catch {
case e: Exception =>
LOG.error("Error rolling back transaction. Rollback may have failed!", e)
logError("Error rolling back transaction. Rollback may have failed!", e)
} finally {
if (close) {
tx.close()
@@ -116,7 +116,7 @@ private[streaming] class FlumePollingReceiver(
logDebug("Stored events with seq:" + seq)
j += 1
}
logDebug("Sending ack for sequence number: " +seq)
logDebug("Sending ack for sequence number: " + seq)
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
logDebug("Ack sent for sequence number: " + seq)