Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25116][TESTS]Fix the Kafka cluster leak and clean up cached producers #22106

Closed
wants to merge 9 commits into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Aug 14, 2018

What changes were proposed in this pull request?

KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but afterAll only shuts down one cluster. This leaks a Kafka cluster and causes that some Kafka thread crash and kill JVM when SBT is trying to clean up tests.

This PR fixes the leak and also adds a shut down hook to detect Kafka cluster leak.

In additions, it also fixes AdminClient leak and cleans up cached producers (When a record is writtn using a producer, the producer will keep refreshing the topic and I don't find an API to clear it except closing the producer) to eliminate the following annoying logs:

8/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available.

I also reverted b5eb542 introduced by #22097 since it doesn't help.

How was this patch tested?

Jenkins

@zsxwing zsxwing changed the title [SPARK-25116][Tests]Fix the kafka cluster leak and clean up cached producers [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean up cached producers Aug 14, 2018
@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #4248 has finished for PR 22106 at commit 574f5fa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #94761 has finished for PR 22106 at commit 574f5fa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #4249 has finished for PR 22106 at commit 57a52a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #4250 has finished for PR 22106 at commit 57a52a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #4251 has finished for PR 22106 at commit 57a52a9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #4252 has finished for PR 22106 at commit 57a52a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #94763 has finished for PR 22106 at commit 57a52a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #94774 has finished for PR 22106 at commit f01cf8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 14, 2018

Test build #4253 has finished for PR 22106 at commit f01cf8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4255 has finished for PR 22106 at commit f01cf8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4256 has finished for PR 22106 at commit f01cf8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4254 has finished for PR 22106 at commit f01cf8d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4257 has finished for PR 22106 at commit f01cf8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -216,7 +216,7 @@ class KafkaContinuousInputPartitionReader(
} catch {
// We didn't read within the timeout. We're supposed to block indefinitely for new data, so
// swallow and ignore this.
case _: TimeoutException =>
case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4254/

org.apache.kafka.common.errors.TimeoutException: Timeout of 3000ms expired before the position for partition failOnDataLoss-2-0 could be determined triggered a task retry but as continuous processing doesn't support task retries, it failed with org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry.

withBrokerProps = Map("auto.create.topics.enable" -> "false"))
testUtils.setup()
}
override val brokerProps = Map("auto.create.topics.enable" -> "false")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for Kafka cluster leak

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #94815 has finished for PR 22106 at commit 621099e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4262 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #94816 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4259 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4260 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4261 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4265 has started for PR 22106 at commit 63cc11d.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4267 has started for PR 22106 at commit 63cc11d.

@SparkQA
Copy link

SparkQA commented Aug 15, 2018

Test build #4268 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4273 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4275 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4270 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4274 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4269 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4271 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4272 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4276 has finished for PR 22106 at commit 63cc11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4266 has finished for PR 22106 at commit 63cc11d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2018

Test build #4264 has finished for PR 22106 at commit 63cc11d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member Author

zsxwing commented Aug 17, 2018

Test failures in 4264 and 4266 are unrelated. The latest changes passed on Jenkins 15 times.

@zsxwing
Copy link
Member Author

zsxwing commented Aug 17, 2018

cc @srowen

private lazy val cacheExpireTimeout: Long =
SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
Option(SparkEnv.get).map(_.conf.getTimeAsMs(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to call clear in afterAll even if the SparkContext has been stopped.

@@ -130,6 +130,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L

/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
def setup(): Unit = {
// Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is
// created.
val exception = new SparkException("It was created at: ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice trick

producer.close()
producer = null
}
brokerReady = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do these need to be thread safe? Is boot up, and cleanup serial?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We set up in beforeAll and clean up in afterAll, which will be in the same thread.

Copy link
Contributor

@brkyvz brkyvz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Had one question

@zsxwing
Copy link
Member Author

zsxwing commented Aug 17, 2018

Thanks. Merging to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants