Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans #17787

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
val schema = queryExecution.analyzed.output
schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
if (topic == None) {
throw new AnalysisException(s"topic option required when no " +
Expand Down
Expand Up @@ -108,6 +108,31 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
s"save mode overwrite not allowed for kafka"))
}

test("batch - enforce analyzed plans SPARK-20496") {
val df = Seq(1, 1).toDF("key").selectExpr("key", "2 as value")
/* bad dataframe plan */
val input = df.union(df)
val topic = newTopic()
testUtils.createTopic(topic)

/* No topic field or topic option */
var writer: StreamingQuery = null
var ex: Exception = null
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for try block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

ex = intercept[AnalysisException] {
writer = createKafkaWriter(input.toDF())(
withSelectExpr = "value as key", "value"
)
writer.processAllAvailable()
}
} finally {
}
assert(ex.getMessage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the error message when you undo your changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"'writeStream' can be called only on streaming Dataset/DataFrame;"

.toLowerCase(Locale.ROOT)
.contains("can be called only on streaming"))

}

test("streaming - write to kafka with topic field") {
val input = MemoryStream[String]
val topic = newTopic()
Expand Down