
[SPARK-19949][SQL] unify bad record handling in CSV and JSON #17315

Closed
wants to merge 8 commits

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

Currently, JSON and CSV have exactly the same logic for handling bad records. This PR abstracts that logic and moves it to an upper level to reduce code duplication.

The overall idea is: we make the JSON and CSV parsers throw a BadRecordException, and the upper level, FailureSafeParser, handles bad records according to the parse mode.

Behavior changes:

  1. With PERMISSIVE mode, if the number of tokens doesn't match the schema, the CSV parser previously treated the record as legal and parsed as many tokens as possible. After this PR, we treat it as an illegal record and put the raw record string in a special column, while still parsing as many tokens as possible (see the sketch below).
  2. All logging is removed, as it is not very useful in practice.
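
For illustration, a hedged sketch of the new PERMISSIVE behavior described in item 1 (the file contents, schema, and column names are hypothetical; spark is a SparkSession):

import org.apache.spark.sql.types._

// bad.csv contains:
//   1,a
//   2        <- too few tokens for the two data columns below
val schema = new StructType()
  .add("i", IntegerType)
  .add("s", StringType)
  .add("_corrupt_record", StringType)

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("bad.csv")

// After this PR the short row is treated as illegal: the token that did
// parse is still filled in (i = 2), the missing column is null, and the
// raw line "2" is stored in _corrupt_record.
df.show()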

How was this patch tested?

existing tests

@cloud-fan
Contributor Author

cc @sameeragarwal @HyukjinKwon

@SparkQA

SparkQA commented Mar 16, 2017

Test build #74666 has finished for PR 17315 at commit adfde77.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FailureSafeParser[IN](
  • case class BadRecordException(
  • class RowWithBadRecord(var row: InternalRow, index: Int, var record: UTF8String)

@HyukjinKwon
Member

I support this idea. Let me try to take a close look by tomorrow to help.

@@ -391,9 +288,9 @@ class JacksonParser(

case token =>
// We cannot parse this token based on the given data type. So, we throw a
// SparkSQLJsonProcessingException and this exception will be caught by
// SparkSQLRuntimeException and this exception will be caught by
Member

RuntimeException ?

@@ -65,7 +65,7 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
Member

How about creating an enum, like what we are doing for SaveMode?

Contributor Author

yea this can be a good follow-up
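
A hedged sketch of what such a follow-up enum could look like (the names are illustrative, not necessarily what the follow-up implemented):

sealed trait ParseMode { def name: String }
case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
case object FailFastMode extends ParseMode { val name = "FAILFAST" }

object ParseMode {
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case "PERMISSIVE" => PermissiveMode
    case "DROPMALFORMED" => DropMalformedMode
    case "FAILFAST" => FailFastMode
    // Fall back to PERMISSIVE for unrecognized strings, matching the
    // current ParseModes behavior discussed later in this thread.
    case _ => PermissiveMode
  }
}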

@SparkQA

SparkQA commented Mar 17, 2017

Test build #74728 has started for PR 17315 at commit 10e70fe.

@HyukjinKwon
Member

(The test failure seems unrelated to the last commit. There was a related discussion about that in #16305 (comment).)

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 17, 2017

Test build #74738 has finished for PR 17315 at commit 10e70fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 17, 2017

Test build #74743 has finished for PR 17315 at commit b5aee0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) left a comment

@cloud-fan, I left my opinion and questions. I think some (or many) of them might be okay to be handled in a follow-up.

}
} else {
(row, badRecord) => row.getOrElse {
for (i <- schema.indices) resultRow.setNullAt(i)
Member

From my understanding, the last commit focuses on simplification. I like that, but I think we should maybe use while here instead, unless we are very sure the bytecode is virtually the same or more efficient on this critical path. The change would not harm the simplification much.

Member

I ran some tests with the code below to help.

object ForWhile {
  def forloop = {
    val l = Array[Int](1,2,3)
    for (i <- l) {
    }
  }

  def whileloop = {
    val arr = Array[Int](1,2,3)
    var idx = 0
    while(idx < arr.length) {
      idx += 1
    }
  }
}
Compiled from "ForWhile.scala"
public final class ForWhile {
  public static void whileloop();
    Code:
       0: getstatic     #16                 // Field ForWhile$.MODULE$:LForWhile$;
       3: invokevirtual #18                 // Method ForWhile$.whileloop:()V
       6: return

  public static void forloop();
    Code:
       0: getstatic     #16                 // Field ForWhile$.MODULE$:LForWhile$;
       3: invokevirtual #21                 // Method ForWhile$.forloop:()V
       6: return
}

Compiled from "ForWhile.scala"
public final class ForWhile$ {
  public static final ForWhile$ MODULE$;

  public static {};
    Code:
       0: new           #2                  // class ForWhile$
       3: invokespecial #12                 // Method "<init>":()V
       6: return

  public void forloop();
    Code:
       0: getstatic     #18                 // Field scala/Array$.MODULE$:Lscala/Array$;
       3: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       6: iconst_3
       7: newarray       int
       9: dup
      10: iconst_0
      11: iconst_1
      12: iastore
      13: dup
      14: iconst_1
      15: iconst_2
      16: iastore
      17: dup
      18: iconst_2
      19: iconst_3
      20: iastore
      21: invokevirtual #27                 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
      24: getstatic     #32                 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
      27: invokevirtual #36                 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
      30: invokevirtual #40                 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
      33: checkcast     #42                 // class "[I"
      36: astore_1
      37: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
      40: aload_1
      41: invokevirtual #46                 // Method scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
      44: new           #48                 // class ForWhile$$anonfun$forloop$1
      47: dup
      48: invokespecial #49                 // Method ForWhile$$anonfun$forloop$1."<init>":()V
      51: invokeinterface #55,  2           // InterfaceMethod scala/collection/mutable/ArrayOps.foreach:(Lscala/Function1;)V
      56: return

  public void whileloop();
    Code:
       0: getstatic     #18                 // Field scala/Array$.MODULE$:Lscala/Array$;
       3: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       6: iconst_3
       7: newarray       int
       9: dup
      10: iconst_0
      11: iconst_1
      12: iastore
      13: dup
      14: iconst_1
      15: iconst_2
      16: iastore
      17: dup
      18: iconst_2
      19: iconst_3
      20: iastore
      21: invokevirtual #27                 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
      24: getstatic     #32                 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
      27: invokevirtual #36                 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
      30: invokevirtual #40                 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
      33: checkcast     #42                 // class "[I"
      36: astore_1
      37: iconst_0
      38: istore_2
      39: iload_2
      40: aload_1
      41: arraylength
      42: if_icmpge     52
      45: iload_2
      46: iconst_1
      47: iadd
      48: istore_2
      49: goto          39
      52: return
}

Compiled from "ForWhile.scala"
public final class ForWhile$$anonfun$forloop$1 extends scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final void apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #21                 // Method apply$mcVI$sp:(I)V
       5: return

  public void apply$mcVI$sp(int);
    Code:
       0: return

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #32                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #34                 // Method apply:(I)V
       8: getstatic     #40                 // Field scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
      11: areturn

  public ForWhile$$anonfun$forloop$1();
    Code:
       0: aload_0
       1: invokespecial #45                 // Method scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
       4: return
}
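
In short, the javap output shows that the for version compiles to an ArrayOps.foreach call with an allocated anonymous Function1 (whose generic apply path boxes and unboxes each element), whereas the while version compiles to a plain counter loop (if_icmpge/goto) with no allocation.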

private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
if (corruptFieldIndex.isDefined) {
(row, badRecord) => {
for ((f, i) <- actualSchema.zipWithIndex) {
Member

}
}

def footer: String = {
Member

It seems the warnings are no longer being printed for either. Should we maybe consider printing shortened versions of the messages for CSV and JSON?

throw BadRecordException(
() => getCurrentInput(),
getPartialResult,
new RuntimeException("Malformed CSV record"))
@HyukjinKwon (Member) Mar 17, 2017

I see. So, BadRecordException contains the information about which record is malformed, so that users can access and check it. Did I understand correctly?

Contributor Author

yes

private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)

private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
@HyukjinKwon (Member) Mar 17, 2017

Maybe we should explain what happens within toResultRow with some comments. This might also be okay to include in the follow-up.
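
A hedged sketch of what such comments might look like, reconstructed from the snippets quoted in this review (resultRow, nullResult, corruptFieldIndex, actualSchema, and schema are assumed to be fields of FailureSafeParser):

private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
  if (corruptFieldIndex.isDefined) {
    // The output schema contains the corrupt-record column: copy over every
    // field that was successfully parsed (null when absent), then store the
    // raw record string in the corrupt-record column.
    (row, badRecord) => {
      var i = 0
      while (i < actualSchema.length) {
        val from = actualSchema(i)
        resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
        i += 1
      }
      resultRow(corruptFieldIndex.get) = badRecord()
      resultRow
    }
  } else {
    // No corrupt-record column was requested: return the parsed row as-is,
    // or an all-null row if nothing could be parsed.
    (row, _) => row.getOrElse(nullResult)
  }
}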


case class BadRecordException(
record: () => UTF8String,
partialResult: () => Option[InternalRow],
Member

I think we could leave some comments for partialResult too.
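
For illustration, a hedged sketch of the suggested documentation (the comments are illustrative, not the author's; the extends clause is assumed):

case class BadRecordException(
    // The raw input that failed to parse, wrapped in a function so the
    // record string is only materialized when needed (PERMISSIVE mode).
    record: () => UTF8String,
    // Whatever could still be parsed from the bad record (e.g. the tokens a
    // malformed CSV line did yield), used to fill the result row in
    // PERMISSIVE mode.
    partialResult: () => Option[InternalRow],
    // The underlying parse error, rethrown as-is in FAILFAST mode.
    cause: Throwable) extends Exception(cause)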

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74857 has started for PR 17315 at commit aa6736f.

error = function(e) { stop(e) }),
paste0(".*(java.lang.NumberFormatException: For input string:).*"))
s <- collect(select(df, from_json(df$col, schema2)))
expect_equal(s[[1]][[1]], NA)
Member

This also sounds like a behavior change. Could you add another test case here to trigger the exception?

Member

or a bug fix?

Contributor Author

yea it's a minor bug fix, see cloud-fan#4

I'm not sure if it's worth a ticket.

Member

uh, I see.

val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
schema,
Member

How about passing dataSchema to FailureSafeParser? It could simplify the code in FailureSafeParser.

Contributor Author

we need schema anyway; passing dataSchema only saves one line in FailureSafeParser...

import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
func: IN => Seq[InternalRow],
Member

rename it? rawParser?

resultRow
}
} else {
(row, badRecord) => row.getOrElse(nullResult)
Member

Nit: (row, badRecord) -> (row, _)

corruptFieldIndex.foreach { corrFieldIndex =>
require(schema(corrFieldIndex).dataType == StringType)
require(schema(corrFieldIndex).nullable)
}
Member

The check above seems to be missing in the new code.

Contributor Author

This is just a sanity check; it is already done in DataFrameReader.csv/json and in JsonFileFormat/CSVFileFormat.

@gatorsmile
Member

retest this please

StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
parsedOptions)
CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
Member

We do not need FailureSafeParser in this case?

Contributor Author

We do need it; the logic is in readFile.

try {
Some(convert(checkedTokens))
for (i <- requiredSchema.indices) {
Member

(It seems this one was missed. I am fine if it is new, but I am worried about the change from while to for. It might not be a big deal though.)

@HyukjinKwon
Member

Sorry, my PR got merged and caused a conflict with this one; those conflicts are all from me. Let me try to leave some more trivial comments while this is open.

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74867 has finished for PR 17315 at commit aa6736f.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

For me it looks good as targeted.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74882 has finished for PR 17315 at commit 20ac52f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74879 has finished for PR 17315 at commit 20ac52f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Iterator(toResultRow(e.partialResult(), e.record))
case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
Iterator.empty
case e: BadRecordException => throw e.cause
Member

This is FAIL_FAST_MODE, if my understanding is correct. Should we issue an error message mentioning FAILFAST, like we did before?

Is this also a behavior change? If users did not spell the mode string correctly, we previously treated it as PERMISSIVE mode; now it becomes FAILFAST mode.

Member

Do we need a test case to cover it?

Member

I just did a re-check. This part of the CSV code is kind of messy; the behavior seems arbitrary, without any clear rule. To start with, we should have test cases.

Contributor Author

ParseModes.isPermissiveMode returns true if the mode string is invalid

Member

uh... this is kind of tricky.

Contributor Author

we should add tests in a follow-up
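
For context, a hedged sketch of the mode dispatch in FailureSafeParser that this thread discusses, reconstructed from the snippets quoted above (func and toResultRow are the members quoted elsewhere in this review; the catch-all branch is only reached for FAILFAST because isPermissiveMode also accepts invalid mode strings):

def parse(input: IN): Iterator[InternalRow] = {
  try {
    // Happy path: the raw parser succeeded, so there is no bad record.
    func(input).toIterator.map(row => toResultRow(Some(row), () => null))
  } catch {
    case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
      // PERMISSIVE (and any unrecognized mode string): keep the partial
      // result and stash the raw record in the corrupt-record column.
      Iterator(toResultRow(e.partialResult(), e.record))
    case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
      // DROPMALFORMED: silently skip the bad record.
      Iterator.empty
    case e: BadRecordException =>
      // FAILFAST: surface the underlying parse error.
      throw e.cause
  }
}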

@@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
Member

Nit: dataSchema -> actualSchema, for consistency with what we did in the other place?

(row, badRecord) => {
var i = 0
while (i < actualSchema.length) {
val f = actualSchema(i)
Member

Nit: f -> from

@@ -435,14 +442,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
Member

Nit: dataSchema -> actualSchema, for consistency with what we did in the other place?

// specified in `columnNameOfCorruptRecord`. The raw input is retrieved by this method.
private def getCurrentInput(): String = tokenizer.getContext.currentParsedContent().stripLineEnd
// Retrieve the raw record string.
private def getCurrentInput(): UTF8String = {
Member

Nit: getCurrentInput() -> getCurrentInput

tokens ++ new Array[String](dataSchema.length - tokens.length)
} else {
tokens.take(dataSchema.length)
def getPartialResult(): Option[InternalRow] = {
Member

Nit: getPartialResult() -> getPartialResult

@gatorsmile
Member

LGTM except for a major comment about FAIL_FAST_MODE.

Looks great! Glad to see such a code cleanup!

@SparkQA

SparkQA commented Mar 21, 2017

Test build #74918 has finished for PR 17315 at commit adf7d33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merging to master.

@HyukjinKwon
Member

@cloud-fan, do you mind if I work on a follow-up, if you are not already doing so?

@cloud-fan
Contributor Author

@HyukjinKwon yes please go ahead!

@hhamalai

hhamalai commented Apr 6, 2017

Just to clarify: with these changes in place, should the corrupted-record field be visible when the data contains errors? For me, the behaviour is still the same as it used to be, with no corrupted-record field present in the resulting rows.

@HyukjinKwon
Member

Currently, the corrupted-record field has to be set explicitly, if I haven't missed some changes in the related code path. Please refer to the test here:

// If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records
val columnNameOfCorruptRecord = "_unparsed"
val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
val df2 = spark
.read
.option("mode", "Permissive")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.schema(schemaWithCorrField1)
.csv(testFile(valueMalformedFile))

This PR is about restructuring the logic for handling corrupt records, so ideally it does not introduce a behaviour change.
