Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1539: KafkaProducer potential hang on close() when task.drop.pr… #390

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,7 +28,6 @@ import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.errors.SerializationException
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemProducerException
Expand All @@ -46,32 +45,30 @@ class KafkaSystemProducer(systemName: String,

// Represents a fatal error that caused the producer to close.
val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]()
@volatile var producer: Producer[Array[Byte], Array[Byte]] = null
val producerLock: Object = new Object
val producerRef: AtomicReference[Producer[Array[Byte], Array[Byte]]] = new AtomicReference[Producer[Array[Byte], Array[Byte]]]()
val producerCreationLock: Object = new Object
@volatile var stopped = false

def start(): Unit = {
producer = getProducer()
producerRef.set(getProducer())
}

def stop() {
info("Stopping producer for system: " + this.systemName)

// stop() should not happen often so no need to optimize locking
producerLock.synchronized {
try {
if (producer != null) {
producer.close // Also performs the equivalent of a flush()
}
stopped = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make more sense to put stopped = true after stop is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The flag is used to prevent subsequent producer creations. That should happen immediately when stop() is invoked.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the logic makes sense. It might be better to call it stopTriggered or something else instead of stopped, to avoid the confusion, but that's minor and optional.

val currentProducer = producerRef.getAndSet(null)
try {
if (currentProducer != null) {
currentProducer.close // Also performs the equivalent of a flush()
}

val exception = fatalException.get()
if (exception != null) {
error("Observed an earlier send() error while closing producer", exception)
}
} catch {
case e: Exception => error("Error while closing producer for system: " + systemName, e)
} finally {
producer = null
val exception = fatalException.get()
if (exception != null) {
error("Observed an earlier send() error while closing producer", exception)
}
} catch {
case e: Exception => error("Error while closing producer for system: " + systemName, e)
}
}

Expand All @@ -82,7 +79,7 @@ class KafkaSystemProducer(systemName: String,
trace("Enqueuing message: %s, %s." format (source, envelope))

val topicName = envelope.getSystemStream.getStream
if (topicName == null || topicName == "") {
if (topicName == null || topicName.isEmpty) {
throw new IllegalArgumentException("Invalid system stream: " + envelope.getSystemStream)
}

Expand All @@ -92,10 +89,7 @@ class KafkaSystemProducer(systemName: String,
throw new SystemProducerException("Producer was unable to recover from previous exception.", globalProducerException)
}

val currentProducer = producer
if (currentProducer == null) {
throw new SystemProducerException("Kafka producer is null.")
}
val currentProducer = getOrCreateCurrentProducer

// Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners
// Any kind of custom partitioning has to be done on the client-side
Expand All @@ -115,7 +109,7 @@ class KafkaSystemProducer(systemName: String,
val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
.format(source, systemName, topicName, partitionKey), exception)

handleSendException(currentProducer, producerException, true)
handleFatalSendException(currentProducer, producerException)
}
}
})
Expand All @@ -125,18 +119,25 @@ class KafkaSystemProducer(systemName: String,
val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
.format(source, systemName, topicName, partitionKey), originalException)

handleSendException(currentProducer, producerException, isFatalException(originalException))
metrics.sendFailed.inc
error("Got a synchronous error from Kafka producer.", producerException)
// Synchronous exceptions are always recoverable so propagate it up and let the user decide
throw producerException
}
}


def flush(source: String) {
updateTimer(metrics.flushNs) {
metrics.flushes.inc

val currentProducer = producer
val currentProducer = producerRef.get()
if (currentProducer == null) {
if (dropProducerExceptions) {
// No producer to flush, but we're ignoring exceptions so just return.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a WARN here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be useful. Added it.

warn("Skipping flush because the Kafka producer is null.")
metrics.flushFailed.inc
return
}
throw new SystemProducerException("Kafka producer is null.")
}

Expand All @@ -162,7 +163,14 @@ class KafkaSystemProducer(systemName: String,
}


private def handleSendException(currentProducer: Producer[Array[Byte], Array[Byte]], producerException: SystemProducerException, isFatalException: Boolean) = {
/**
* Handles a fatal exception by closing the producer and either recreating it or storing the exception
* to rethrow later, depending on the value of dropProducerExceptions.
*
* @param currentProducer the current producer for which the exception occurred. Must not be null.
* @param producerException the exception to handle.
*/
private def handleFatalSendException(currentProducer: Producer[Array[Byte], Array[Byte]], producerException: SystemProducerException): Unit = {
metrics.sendFailed.inc
error(producerException)
// The SystemProducer API is synchronous, so there's no way for us to guarantee that an exception will
Expand All @@ -172,49 +180,56 @@ class KafkaSystemProducer(systemName: String,
if (dropProducerExceptions) {
warn("Ignoring producer exception. All messages in the failed producer request will be dropped!")

if (isFatalException) {
producerLock.synchronized {
// Prevent each callback from recreating producer for the same failure.
if (currentProducer == producer) {
info("Creating a new producer for system %s." format systemName)
try {
currentProducer.close(0, TimeUnit.MILLISECONDS)
} catch {
case exception: Exception => error("Exception while closing producer.", exception)
}
producer = getProducer()
}
}
}
} else {
// If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
// Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering
// This works because there is only 1 callback thread and no sends can complete until the callback returns.
if (isFatalException) {
fatalException.compareAndSet(null, producerException)
// Prevent each callback from closing and nulling producer for the same failure.
if (currentProducer == producerRef.get()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If both currentProducer and producerRef.get() are null here, it can have null pointer exception. I would recommend to have a null check to make the code more robust.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The java doc specifies that currentProducer must not be null
  2. If currentProducer was null, the right thing to do is throw an exception because we can't properly handle the exception, so then the debate is NPE or other exception. I don't see the value.

info("Closing producer for system %s." format systemName)
try {
// send()s can get ProducerClosedException if the producer is stopped after they get the currentProducer
// reference but before producer.send() returns. That's ONLY ok when dropProducerExceptions is true.
// Also, when producer.close(0) is invoked on the Kafka IO thread, when it returns there will be no more
// messages sent over the wire. This is key to ensuring no out-of-order messages as a result of recreating
// the producer.
currentProducer.close(0, TimeUnit.MILLISECONDS)
} catch {
case exception: Exception => error("Exception while closing producer.", exception)
}
producerRef.compareAndSet(currentProducer, null)
}
} else {
// If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
// Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering
// This works because there is only 1 IO thread and no IO can be done until the callback returns.
// Do not create a new producer here! It cannot be done without data loss for all concurrency modes.
fatalException.compareAndSet(null, producerException)
try {
currentProducer.close(0, TimeUnit.MILLISECONDS)
} catch {
case exception: Exception => error("Exception while closing producer.", exception)
}
}
}

/**
 * Returns the current producer, lazily creating a replacement (under producerCreationLock)
 * when the previous one was closed and dropProducerExceptions is enabled.
 *
 * @return the current producer. Never returns null.
 */
private def isFatalException(exception: Exception): Boolean = {
exception match {
case _: SerializationException => false
case _: ClassCastException => false
case _ => true
// Fetches the current producer, or (re)creates one when it is null and
// dropProducerExceptions is enabled and stop() has not been triggered.
// Throws SystemProducerException when recreation is not permitted, so this
// never returns null to the caller.
private def getOrCreateCurrentProducer = {
var currentProducer = producerRef.get

if (currentProducer == null) {
// Recreation is only allowed in drop-exceptions mode and before stop();
// otherwise a null producer is a hard error surfaced to the caller.
if (dropProducerExceptions && !stopped) {
// Note: While this lock prevents others from creating a new producer, they could still set it to null.
producerCreationLock.synchronized {
// Double-check under the lock so only one thread builds the new producer.
currentProducer = producerRef.get
if (currentProducer == null) {
currentProducer = getProducer()
producerRef.set(currentProducer)
}
}
// Invariant: currentProducer must not be null at this point.
// (Even if another thread nulls producerRef afterwards, the local
// reference obtained inside the lock remains valid for this caller.)
} else {
throw new SystemProducerException("Kafka producer is null.")
}
}
currentProducer
}
}
Expand Up @@ -41,7 +41,7 @@ class TestKafkaSystemProducer {
systemProducer.register("test")
systemProducer.start
systemProducer.send("test", someMessage)
assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size())
assertEquals(1, systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size())
systemProducer.stop
}

Expand Down Expand Up @@ -207,7 +207,7 @@ class TestKafkaSystemProducer {
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
getProducer = () => {
mockProducer.open() // A new producer is never closed
mockProducer.open() // A new producer would not already be closed, so reset it.
mockProducer
},
metrics = producerMetrics)
Expand All @@ -219,6 +219,7 @@ class TestKafkaSystemProducer {
mockProducer.setErrorNext(true, true, new RecordTooLargeException())
producer.send("test", msg3) // Callback exception
assertTrue(mockProducer.isClosed)
assertNotNull(producer.producerRef.get())
assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

val senderException = intercept[SystemProducerException] {
Expand Down Expand Up @@ -269,7 +270,7 @@ class TestKafkaSystemProducer {
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
getProducer = () => {
mockProducer.open() // A new producer is never closed
mockProducer.open() // A new producer would not already be closed, so reset it.
mockProducer
},
metrics = producerMetrics)
Expand All @@ -286,6 +287,7 @@ class TestKafkaSystemProducer {
mockProducer.setErrorNext(true, true, new RecordTooLargeException())
producer.send("test1", msg3) // Callback exception
assertTrue(mockProducer.isClosed)
assertNotNull(producer.producerRef.get())
assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

// Subsequent sends
Expand Down Expand Up @@ -343,7 +345,7 @@ class TestKafkaSystemProducer {
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
getProducer = () => {
mockProducer.open() // A new producer is never closed
mockProducer.open() // A new producer would not already be closed, so reset it.
mockProducer
},
metrics = producerMetrics)
Expand All @@ -359,6 +361,7 @@ class TestKafkaSystemProducer {
}
assertTrue(sendException.getCause.isInstanceOf[SerializationException])
assertFalse(mockProducer.isClosed)
assertNotNull(producer.producerRef.get())
assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

producer.send("test1", msg3) // Should be able to resend msg3
Expand Down Expand Up @@ -406,7 +409,7 @@ class TestKafkaSystemProducer {
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
getProducer = () => {
mockProducer.open() // A new producer is never closed
mockProducer.open() // A new producer would not already be closed, so reset it.
mockProducer
},
metrics = producerMetrics,
Expand All @@ -418,13 +421,18 @@ class TestKafkaSystemProducer {
producer.send("test", msg2)
mockProducer.setErrorNext(true, true, new RecordTooLargeException())
producer.send("test", msg3) // Callback exception
assertFalse(mockProducer.isClosed)
assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
assertTrue(mockProducer.isClosed)
assertNull(producer.producerRef.get())
assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)

producer.send("test", msg4) // Should succeed because the producer recovered.
assertFalse(mockProducer.isClosed)
assertNotNull(producer.producerRef.get())
assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
producer.flush("test") // Should not throw

producer.send("test", msg5) // Should be able to send again after flush
assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)
producer.flush("test")

assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent
Expand Down Expand Up @@ -456,7 +464,7 @@ class TestKafkaSystemProducer {
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
getProducer = () => {
mockProducer.open() // A new producer is never closed
mockProducer.open() // A new producer would not already be closed, so reset it.
mockProducer
},
metrics = producerMetrics,
Expand All @@ -473,12 +481,17 @@ class TestKafkaSystemProducer {
// Inject error for next send
mockProducer.setErrorNext(true, true, new RecordTooLargeException())
producer.send("test1", msg3) // Callback exception
assertFalse(mockProducer.isClosed)
assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
assertTrue(mockProducer.isClosed)
assertNull(producer.producerRef.get())
assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)

// Subsequent sends
producer.send("test1", msg4) // Should succeed because the producer recovered.
assertFalse(mockProducer.isClosed)
assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
assertNotNull(producer.producerRef.get())
producer.send("test2", msg5) // Second source should also not have any error.
assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)

// Flushes
producer.flush("test2") // Should not throw for test2
Expand All @@ -503,7 +516,7 @@ class TestKafkaSystemProducer {
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
getProducer = () => {
mockProducer.open() // A new producer is never closed
mockProducer.open() // A new producer would not already be closed, so reset it.
mockProducer
},
metrics = producerMetrics,
Expand All @@ -520,9 +533,13 @@ class TestKafkaSystemProducer {
}
assertTrue(sendException.getCause.isInstanceOf[SerializationException])
assertFalse(mockProducer.isClosed)
assertNotNull(producer.producerRef.get()) // Synchronous error; producer should not be recreated
assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

producer.send("test1", msg3) // Should be able to resend msg3
assertFalse(mockProducer.isClosed)
assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
assertNotNull(producer.producerRef.get())
producer.send("test2", msg4) // Second source should not be affected

producer.flush("test1") // Flush should be unaffected
Expand Down