
[SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in log files

## Problem

There are several places where we write out version identifiers (usually `v1`) in the various Structured Streaming logs. However, where we check these identifiers, a mismatch currently produces a confusing error message.

## What changes were proposed in this pull request?

This patch made two major changes:
1. added a `parseVersion(...)` method and, based on it, fixed version checking in the following places (no other places needed this check):
```
HDFSMetadataLog
  - CompactibleFileStreamLog  ------------> fixed with this patch
    - FileStreamSourceLog  ---------------> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog  -----------------> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog  ------------------------> fixed with this patch
  - anonymous subclass in KafkaSource  ---> fixed with this patch
```

2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION`, etc. from `String` to `Int`, so that newer versions can be detected via `version > 1` instead of `version != "v1"` (see the sketch below)
    - note this does not break backwards compatibility -- we still write out `"v1"` and read back `"v1"`
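
To make the difference concrete, here is a condensed before/after sketch of the check as it now appears in `OffsetSeqLog.deserialize` (excerpted and simplified from the diff below, not the complete method):
```
// Before: VERSION was the String "v1", so any mismatch produced a vague error.
val version = lines.next()
if (version != OffsetSeqLog.VERSION) {
  throw new IllegalStateException(s"Unknown log version: ${version}")
}

// After: VERSION is the Int 1; parseVersion validates the "v<N>" header and
// distinguishes a malformed header from a log written by a newer Spark version.
val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)
```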

## Exception message with this patch
```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

Unit tests.
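
For reference, the new future-version tests all follow the same pattern (this one is condensed from the `OffsetSeqLogSuite` change below): write a log file whose header claims a version far beyond what the reader supports, then assert that the error message names both versions and asks the user to upgrade.
```
test("deserialization log written by future version") {
  withTempDir { dir =>
    // "v99999" is far beyond OffsetSeqLog.VERSION (1), so reading batch 0 must fail.
    stringToFile(new File(dir, "0"), "v99999")
    val log = new OffsetSeqLog(spark, dir.getCanonicalPath)
    val e = intercept[IllegalStateException] { log.get(0) }
    assert(e.getMessage.contains(
      s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999"))
    assert(e.getMessage.contains(
      "produced by a newer version of Spark and cannot be read by this version"))
  }
}
```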

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17327 from lw-lin/good-msg-2.1.
lw-lin authored and zsxwing committed Mar 17, 2017
1 parent 4b977ff commit 710b555
Showing 11 changed files with 143 additions and 35 deletions.
@@ -100,7 +100,7 @@ private[kafka010] class KafkaSource(
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.write(VERSION)
writer.write("v" + VERSION + "\n")
writer.write(metadata.json)
writer.flush
}
@@ -111,13 +111,13 @@
// HDFSMetadataLog guarantees that it never creates a partial file.
assert(content.length != 0)
if (content(0) == 'v') {
if (content.startsWith(VERSION)) {
KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
val versionInFile = content.substring(0, content.indexOf("\n"))
throw new IllegalStateException(
s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
s"but was $versionInFile. Please upgrade your Spark.")
s"Log file was malformed: failed to detect the log file version line.")
}
} else {
// The log was generated by Spark 2.1.0
@@ -351,7 +351,7 @@ private[kafka010] object KafkaSource {
| source option "failOnDataLoss" to "false".
""".stripMargin

private val VERSION = "v1\n"
private[kafka010] val VERSION = 1

def getSortedExecutorList(sc: SparkContext): Array[String] = {
val bm = sc.env.blockManager
@@ -203,7 +203,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0)
val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
writer.write(s"v0\n${metadata.json}")
writer.write(s"v99999\n${metadata.json}")
writer.flush
}
}
@@ -225,7 +225,12 @@
source.getOffset.get // Read initial offset
}

assert(e.getMessage.contains("Please upgrade your Spark"))
Seq(
s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession
* doing a compaction, it will read all old log files and merge them with the new batch.
*/
abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
metadataLogVersion: String,
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends HDFSMetadataLog[Array[T]](sparkSession, path) {
@@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

override def serialize(logData: Array[T], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(metadataLogVersion.getBytes(UTF_8))
out.write(("v" + metadataLogVersion).getBytes(UTF_8))
logData.foreach { data =>
out.write('\n')
out.write(Serialization.write(data).getBytes(UTF_8))
@@ -146,10 +146,7 @@
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
val version = lines.next()
if (version != metadataLogVersion) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
val version = parseVersion(lines.next(), metadataLogVersion)
lines.map(Serialization.read[T]).toArray
}

@@ -77,7 +77,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(
metadataLogVersion: String,
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
@@ -106,7 +106,7 @@ class FileStreamSinkLog(
}

object FileStreamSinkLog {
val VERSION = "v1"
val VERSION = 1
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf

class FileStreamSourceLog(
metadataLogVersion: String,
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
@@ -120,5 +120,5 @@ class FileStreamSourceLog(
}

object FileStreamSourceLog {
val VERSION = "v1"
val VERSION = 1
}
@@ -231,6 +231,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val input = fileManager.open(batchMetadataFile)
try {
Some(deserialize(input))
} catch {
case ise: IllegalStateException =>
// re-throw the exception with the log file path added
throw new IllegalStateException(
s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
} finally {
IOUtils.closeQuietly(input)
}
@@ -304,6 +309,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
new FileSystemManager(metadataPath, hadoopConf)
}
}

/**
* Parse the log version from the given `text` -- will throw exception when the parsed version
* exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
* "v123xyz" etc.)
*/
private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
if (text.length > 0 && text(0) == 'v') {
val version =
try {
text.substring(1, text.length).toInt
} catch {
case _: NumberFormatException =>
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
if (version > 0) {
if (version > maxSupportedVersion) {
throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
} else {
return version
}
}
}

// reaching here means we failed to read the correct log version
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
}

object HDFSMetadataLog {
@@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
val version = lines.next()
if (version != OffsetSeqLog.VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}

val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)

// read metadata
val metadata = lines.next().trim match {
@@ -70,7 +68,7 @@

override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8))

// write metadata
out.write('\n')
@@ -88,6 +86,6 @@
}

object OffsetSeqLog {
private val VERSION = "v1"
private[streaming] val VERSION = 1
private val SERIALIZED_VOID_OFFSET = "-"
}
@@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = Array("entry_1", "entry_2", "entry_3")
val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
val expected = s"""v${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
|"entry_2"
|"entry_3"""".stripMargin
@@ -132,7 +132,7 @@

baos.reset()
compactibleLog.serialize(Array(), baos)
assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name()))
})
}

@@ -142,7 +142,7 @@
defaultCompactInterval = 3,
defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
val logs = s"""v${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
|"entry_2"
|"entry_3"""".stripMargin
@@ -152,10 +152,36 @@

assert(Nil ===
compactibleLog.deserialize(
new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8))))
})
}

test("deserialization log written by future version") {
withTempDir { dir =>
def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
new FakeCompactibleFileStreamLog(
version,
_fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
_defaultCompactInterval = 3, // this param does not matter here in this test case
_defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
spark,
dir.getCanonicalPath)

val writer = newFakeCompactibleFileStreamLog(version = 2)
val reader = newFakeCompactibleFileStreamLog(version = 1)
writer.add(0, Array("entry"))
val e = intercept[IllegalStateException] {
reader.get(0)
}
Seq(
"maximum supported log version is v1, but encountered v2",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

testWithUninterruptibleThread("compact") {
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
@@ -219,6 +245,7 @@
): Unit = {
withTempDir { file =>
val compactibleLog = new FakeCompactibleFileStreamLog(
FakeCompactibleFileStreamLog.VERSION,
fileCleanupDelayMs,
defaultCompactInterval,
defaultMinBatchesToRetain,
@@ -230,17 +257,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
}

object FakeCompactibleFileStreamLog {
val VERSION = "test_version"
val VERSION = 1
}

class FakeCompactibleFileStreamLog(
metadataLogVersion: Int,
_fileCleanupDelayMs: Long,
_defaultCompactInterval: Int,
_defaultMinBatchesToRetain: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[String](
FakeCompactibleFileStreamLog.VERSION,
metadataLogVersion,
sparkSession,
path
) {
@@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
action = FileStreamSinkLog.ADD_ACTION))

// scalastyle:off
val expected = s"""$VERSION
val expected = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
assert(expected === baos.toString(UTF_8.name()))
baos.reset()
sinkLog.serialize(Array(), baos)
assert(VERSION === baos.toString(UTF_8.name()))
assert(s"v$VERSION" === baos.toString(UTF_8.name()))
}
}

test("deserialize") {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
val logs = s"""$VERSION
val logs = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {

assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))

assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8))))
}
}

@@ -128,6 +128,33 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("HDFSMetadataLog: parseVersion") {
withTempDir { dir =>
val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
def assertLogFileMalformed(func: => Int): Unit = {
val e = intercept[IllegalStateException] { func }
assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version"))
}
assertLogFileMalformed { metadataLog.parseVersion("", 100) }
assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) }
assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) }
assertLogFileMalformed { metadataLog.parseVersion("10", 100) }
assertLogFileMalformed { metadataLog.parseVersion("v0", 100) }
assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) }

assert(metadataLog.parseVersion("v10", 10) === 10)
assert(metadataLog.parseVersion("v10", 100) === 10)

val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) }
Seq(
"maximum supported log version is v100, but encountered v200",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.io.File

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.test.SharedSQLContext

class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -70,6 +71,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("deserialization log written by future version") {
withTempDir { dir =>
stringToFile(new File(dir, "0"), "v99999")
val log = new OffsetSeqLog(spark, dir.getCanonicalPath)
val e = intercept[IllegalStateException] {
log.get(0)
}
Seq(
s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

test("read Spark 2.1.0 log format") {
val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
assert(batchId === 0)