Skip to content

Commit

Permalink
[SPARK-22403][SS] Add optional checkpointLocation argument to Structu…
Browse files Browse the repository at this point in the history
…redKafkaWordCount example

## What changes were proposed in this pull request?

When run in YARN cluster mode, the StructuredKafkaWordCount example fails because Spark tries to create a temporary checkpoint location in a subdirectory of the path given by java.io.tmpdir, and YARN sets java.io.tmpdir to a path in the local filesystem that usually does not correspond to an existing path in the distributed filesystem.
Add an optional checkpointLocation argument to the StructuredKafkaWordCount example so that users can specify the checkpoint location and avoid this issue.

## How was this patch tested?

Built and ran the example manually on YARN client and cluster mode.

Author: Wing Yew Poon <wypoon@cloudera.com>

Closes #19703 from wypoon/SPARK-22403.
  • Loading branch information
wypoon authored and zsxwing committed Nov 10, 2017
1 parent 9eb7096 commit 11c4021
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
// scalastyle:off println
package org.apache.spark.examples.sql.streaming

import java.util.UUID

import org.apache.spark.sql.SparkSession

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: StructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics>
* [<checkpoint-location>]
* <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
* comma-separated list of host:port.
* <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
Expand All @@ -36,6 +39,8 @@ import org.apache.spark.sql.SparkSession
* |- Only one of "assign, "subscribe" or "subscribePattern" options can be
* | specified for Kafka source.
* <topics> Different value format depends on the value of 'subscribe-type'.
* <checkpoint-location> Directory in which to create checkpoints. If not
* provided, defaults to a randomized directory in /tmp.
*
* Example:
* `$ bin/run-example \
Expand All @@ -46,11 +51,13 @@ object StructuredKafkaWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 3) {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-type> <topics>")
"<subscribe-type> <topics> [<checkpoint-location>]")
System.exit(1)
}

val Array(bootstrapServers, subscribeType, topics) = args
val Array(bootstrapServers, subscribeType, topics, _*) = args
val checkpointLocation =
if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString

val spark = SparkSession
.builder
Expand All @@ -76,6 +83,7 @@ object StructuredKafkaWordCount {
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()

query.awaitTermination()
Expand Down

0 comments on commit 11c4021

Please sign in to comment.