[SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in log files #17327

Closed · wants to merge 1 commit
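In short, the streaming metadata logs (offset log, file source/sink logs, Kafka source offset log) now write their version as a "v&lt;N&gt;" header line, and a shared HDFSMetadataLog.parseVersion helper validates that line, so a log written by a newer Spark fails with an explicit version-mismatch message instead of a generic "Unknown log version" error. Below is a minimal sketch of the helper's observable behavior, mirroring the suites further down; the metadataLog value is illustrative, not part of the patch.

// Illustrative only; `metadataLog` stands for any HDFSMetadataLog[String] instance.
metadataLog.parseVersion("v1", maxSupportedVersion = 100)   // returns 1
metadataLog.parseVersion("v200", maxSupportedVersion = 100) // throws IllegalStateException:
//   "... maximum supported log version is v100, but encountered v200. The log file was
//   produced by a newer version of Spark and cannot be read by this version. Please upgrade."
metadataLog.parseVersion("xyz", maxSupportedVersion = 100)  // throws IllegalStateException:
//   "Log file was malformed: failed to read correct log version from xyz."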
@@ -100,7 +100,7 @@ private[kafka010] class KafkaSource(
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.write(VERSION)
writer.write("v" + VERSION + "\n")
writer.write(metadata.json)
writer.flush
}
@@ -111,13 +111,13 @@ private[kafka010] class KafkaSource(
// HDFSMetadataLog guarantees that it never creates a partial file.
assert(content.length != 0)
if (content(0) == 'v') {
if (content.startsWith(VERSION)) {
KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
val versionInFile = content.substring(0, content.indexOf("\n"))
throw new IllegalStateException(
s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
s"but was $versionInFile. Please upgrade your Spark.")
s"Log file was malformed: failed to detect the log file version line.")
}
} else {
// The log was generated by Spark 2.1.0
@@ -351,7 +351,7 @@ private[kafka010] object KafkaSource {
| source option "failOnDataLoss" to "false".
""".stripMargin

private val VERSION = "v1\n"
private[kafka010] val VERSION = 1

def getSortedExecutorList(sc: SparkContext): Array[String] = {
val bm = sc.env.blockManager
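For context, here is a small, self-contained sketch (not part of the patch; the payload JSON is made up) of the framing the Kafka offset serializer above now produces: a leading zero byte kept for Spark 2.1.0 readers, a "v1" version line, then the offset JSON.

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8

object KafkaOffsetLogFramingSketch {
  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    out.write(0)                                            // zero byte, so Spark 2.1.0 can still read the file (SPARK-19517)
    out.write("v1\n".getBytes(UTF_8))                       // "v" + VERSION, where VERSION is now the Int 1
    out.write("""{"topic-0":{"0":23}}""".getBytes(UTF_8))   // example offset JSON payload
    // On read, the zero byte is skipped; the remaining text starts with 'v', the text
    // before the first '\n' ("v1") goes to parseVersion, and the rest is the offset.
    assert(out.toString(UTF_8.name()).drop(1).startsWith("v1\n"))
  }
}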
@@ -203,7 +203,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0)
val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
writer.write(s"v0\n${metadata.json}")
writer.write(s"v99999\n${metadata.json}")
writer.flush
}
}
@@ -225,7 +225,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
source.getOffset.get // Read initial offset
}

assert(e.getMessage.contains("Please upgrade your Spark"))
Seq(
s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession
* doing a compaction, it will read all old log files and merge them with the new batch.
*/
abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
metadataLogVersion: String,
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends HDFSMetadataLog[Array[T]](sparkSession, path) {
@@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

override def serialize(logData: Array[T], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(metadataLogVersion.getBytes(UTF_8))
out.write(("v" + metadataLogVersion).getBytes(UTF_8))
logData.foreach { data =>
out.write('\n')
out.write(Serialization.write(data).getBytes(UTF_8))
@@ -146,10 +146,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
val version = lines.next()
if (version != metadataLogVersion) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
val version = parseVersion(lines.next(), metadataLogVersion)
lines.map(Serialization.read[T]).toArray
}

@@ -77,7 +77,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(
metadataLogVersion: String,
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
@@ -106,7 +106,7 @@ class FileStreamSinkLog(
}

object FileStreamSinkLog {
val VERSION = "v1"
val VERSION = 1
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf

class FileStreamSourceLog(
metadataLogVersion: String,
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
@@ -120,5 +120,5 @@ class FileStreamSourceLog(
}

object FileStreamSourceLog {
val VERSION = "v1"
val VERSION = 1
}
@@ -231,6 +231,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val input = fileManager.open(batchMetadataFile)
try {
Some(deserialize(input))
} catch {
case ise: IllegalStateException =>
// re-throw the exception with the log file path added
throw new IllegalStateException(
s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
} finally {
IOUtils.closeQuietly(input)
}
@@ -304,6 +309,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
new FileSystemManager(metadataPath, hadoopConf)
}
}

/**
* Parse the log version from the given `text` -- will throw exception when the parsed version
* exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
* "v123xyz" etc.)
*/
private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
if (text.length > 0 && text(0) == 'v') {
val version =
try {
text.substring(1, text.length).toInt
} catch {
case _: NumberFormatException =>
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
if (version > 0) {
if (version > maxSupportedVersion) {
throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
} else {
return version
}
}
}

// reaching here means we failed to read the correct log version
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
}

object HDFSMetadataLog {
@@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
val version = lines.next()
if (version != OffsetSeqLog.VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}

val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)

// read metadata
val metadata = lines.next().trim match {
@@ -70,7 +68,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)

override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8))

// write metadata
out.write('\n')
@@ -88,6 +86,6 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
}

object OffsetSeqLog {
private val VERSION = "v1"
private[streaming] val VERSION = 1
private val SERIALIZED_VOID_OFFSET = "-"
}
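For reference, a hedged sketch of what a single offset log batch file looks like after this change, based on the serializer above. The metadata and offset payloads are invented examples and the exact metadata fields may differ; only the line layout is taken from the code shown.

// Hypothetical contents of a <checkpoint>/offsets/<batchId> file:
val exampleOffsetLogFile =
  """v1
    |{"batchWatermarkMs":0,"batchTimestampMs":1489100000000}
    |{"topic-0":{"0":23}}
    |-""".stripMargin
// line 1: "v" + OffsetSeqLog.VERSION
// line 2: the OffsetSeq metadata JSON
// remaining lines: one serialized offset per source, "-" (SERIALIZED_VOID_OFFSET) for a source without an offset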
@@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = Array("entry_1", "entry_2", "entry_3")
val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
val expected = s"""v${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
|"entry_2"
|"entry_3"""".stripMargin
@@ -132,7 +132,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext

baos.reset()
compactibleLog.serialize(Array(), baos)
assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name()))
})
}

@@ -142,7 +142,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
defaultCompactInterval = 3,
defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
val logs = s"""v${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
|"entry_2"
|"entry_3"""".stripMargin
@@ -152,10 +152,36 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext

assert(Nil ===
compactibleLog.deserialize(
new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8))))
})
}

test("deserialization log written by future version") {
withTempDir { dir =>
def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
new FakeCompactibleFileStreamLog(
version,
_fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
_defaultCompactInterval = 3, // this param does not matter here in this test case
_defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
spark,
dir.getCanonicalPath)

val writer = newFakeCompactibleFileStreamLog(version = 2)
val reader = newFakeCompactibleFileStreamLog(version = 1)
writer.add(0, Array("entry"))
val e = intercept[IllegalStateException] {
reader.get(0)
}
Seq(
"maximum supported log version is v1, but encountered v2",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

testWithUninterruptibleThread("compact") {
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
@@ -219,6 +245,7 @@
): Unit = {
withTempDir { file =>
val compactibleLog = new FakeCompactibleFileStreamLog(
FakeCompactibleFileStreamLog.VERSION,
fileCleanupDelayMs,
defaultCompactInterval,
defaultMinBatchesToRetain,
@@ -230,17 +257,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
}

object FakeCompactibleFileStreamLog {
val VERSION = "test_version"
val VERSION = 1
}

class FakeCompactibleFileStreamLog(
metadataLogVersion: Int,
_fileCleanupDelayMs: Long,
_defaultCompactInterval: Int,
_defaultMinBatchesToRetain: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[String](
FakeCompactibleFileStreamLog.VERSION,
metadataLogVersion,
sparkSession,
path
) {
@@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
action = FileStreamSinkLog.ADD_ACTION))

// scalastyle:off
val expected = s"""$VERSION
val expected = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
assert(expected === baos.toString(UTF_8.name()))
baos.reset()
sinkLog.serialize(Array(), baos)
assert(VERSION === baos.toString(UTF_8.name()))
assert(s"v$VERSION" === baos.toString(UTF_8.name()))
}
}

test("deserialize") {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
val logs = s"""$VERSION
val logs = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {

assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))

assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8))))
}
}

@@ -128,6 +128,33 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("HDFSMetadataLog: parseVersion") {
withTempDir { dir =>
val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
def assertLogFileMalformed(func: => Int): Unit = {
val e = intercept[IllegalStateException] { func }
assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version"))
}
assertLogFileMalformed { metadataLog.parseVersion("", 100) }
assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) }
assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) }
assertLogFileMalformed { metadataLog.parseVersion("10", 100) }
assertLogFileMalformed { metadataLog.parseVersion("v0", 100) }
assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) }

assert(metadataLog.parseVersion("v10", 10) === 10)
assert(metadataLog.parseVersion("v10", 100) === 10)

val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) }
Seq(
"maximum supported log version is v100, but encountered v200",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.io.File

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.test.SharedSQLContext

class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -70,6 +71,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("deserialization log written by future version") {
withTempDir { dir =>
stringToFile(new File(dir, "0"), "v99999")
val log = new OffsetSeqLog(spark, dir.getCanonicalPath)
val e = intercept[IllegalStateException] {
log.get(0)
}
Seq(
s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
assert(e.getMessage.contains(message))
}
}
}

test("read Spark 2.1.0 log format") {
val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
assert(batchId === 0)