DRILL-4653.json - Malformed JSON should not stop the entire query from progressing #518

Closed
wants to merge 9 commits into from
@@ -135,6 +135,9 @@ public interface ExecConstants {
BooleanValidator JSON_EXTENDED_TYPES = new BooleanValidator("store.json.extended_types", false);
BooleanValidator JSON_WRITER_UGLIFY = new BooleanValidator("store.json.writer.uglify", false);
BooleanValidator JSON_WRITER_SKIPNULLFIELDS = new BooleanValidator("store.json.writer.skip_null_fields", true);
String JSON_READER_SKIP_MALFORMED_RECORDS_FLAG = "store.json.reader.skip_malformed_records";

Can you change this to 'skip_invalid_records' so that the name is somewhat consistent with the similar future option in DRILL-3764? In the future, the JSON option would likely be subsumed by the new global option.

BooleanValidator JSON_SKIP_MALFORMED_RECORDS_VALIDATOR = new BooleanValidator(JSON_READER_SKIP_MALFORMED_RECORDS_FLAG, false);


DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator(
"store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE, 100.0);
@@ -101,6 +101,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.JSON_WRITER_UGLIFY,
ExecConstants.JSON_WRITER_SKIPNULLFIELDS,
ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR,
ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR,
ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
@@ -64,6 +64,8 @@ public class JSONRecordReader extends AbstractRecordReader {
private final boolean enableAllTextMode;
private final boolean readNumbersAsDouble;
private final boolean unionEnabled;
private int parseErrorCount;

This should be long instead of int, since parseErrorCount is cumulative; in the worst case it could be as large as runningRecordCount.
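
To illustrate the concern, here is a minimal standalone sketch of what happens when a cumulative int counter is incremented past Integer.MAX_VALUE, compared with a long. The class and method names (CounterWidthSketch, bumpInt, bumpLong) are hypothetical and not part of the PR or of Drill.

```java
// Hypothetical standalone sketch; CounterWidthSketch is not part of Drill.
final class CounterWidthSketch {

  // Increment an int counter once; wraps to a negative value at Integer.MAX_VALUE.
  static int bumpInt(int count) {
    return count + 1;
  }

  // Increment a long counter once; has room well beyond the int range.
  static long bumpLong(long count) {
    return count + 1;
  }

  public static void main(String[] args) {
    System.out.println(bumpInt(Integer.MAX_VALUE));   // prints -2147483648
    System.out.println(bumpLong(Integer.MAX_VALUE));  // prints 2147483648
  }
}
```

Java does not raise an error on int overflow, so a wrapped counter would silently show up as a negative parseErrorCount in toString().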

private final boolean skipMalformedJSONRecords;

/**
* Create a JSON Record Reader that uses a file based input stream.
@@ -114,6 +116,7 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp
this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
setColumns(columns);
}

@@ -122,7 +125,8 @@ public String toString() {
return super.toString()
+ "[hadoopPath = " + hadoopPath
+ ", recordCount = " + recordCount
+ ", runningRecordCount = " + runningRecordCount + ", ...]";
+ ", parseErrorCount = " + parseErrorCount
+ ", runningRecordCount = " + runningRecordCount + ", ...]";
}

@Override
@@ -189,26 +193,33 @@ private long currentRecordNumberInFile() {
public int next() {
writer.allocate();
writer.reset();

recordCount = 0;
ReadState write = null;
// Stopwatch p = new Stopwatch().start();
try{
outside: while(recordCount < DEFAULT_ROWS_PER_BATCH) {
writer.setPosition(recordCount);
write = jsonReader.write(writer);

if(write == ReadState.WRITE_SUCCEED) {
// try

Remove these commented-out lines.

// {
outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){
try{
writer.setPosition(recordCount);

Seems this is still using an indent of 4. We use 2 spaces (see https://drill.apache.org/docs/apache-drill-contribution-guidelines/ and scroll down to Step 2). Did it pass the mvn command-line build without checkstyle violations?

Contributor Author

Aman,
mvn checkstyle:checkstyle did not report any errors before my last
check-in. I have changed the code to use 2 spaces for indentation.

(Sent by email in reply to Aman Sinha's comment of Thu, Jun 16, 2016 at 2:22 PM on exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java.)

write = jsonReader.write(writer);
if(write == ReadState.WRITE_SUCCEED) {
// logger.debug("Wrote record.");
recordCount++;
}else{
recordCount++;
}else{
// logger.debug("Exiting.");
break outside;
}

break outside;
}
}
catch(Exception ex)

Minor style convention: can you put the catch() on the previous line, next to the closing brace of the try block?

{
if(skipMalformedJSONRecords == false){
handleAndRaise("Error parsing JSON", ex);
}
++parseErrorCount;
}
}

jsonReader.ensureAtLeastOneField(writer);
jsonReader.ensureAtLeastOneField(writer);

writer.setValueCount(recordCount);
// p.stop();
@@ -217,11 +228,11 @@ public int next() {
updateRunningCount();
return recordCount;

} catch (final Exception e) {
handleAndRaise("Error parsing JSON", e);
}
// } catch (final Exception e) {
// handleAndRaise("Error parsing JSON", e);
// }
// this is never reached
return 0;
//return 0;

Uncomment this (best practice, since the function has a return type).
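
Putting the comments on next() together, the loop might be shaped roughly as follows. This is a standalone sketch, not the PR's code: SkipMalformedSketch, readBatch, and parseRecord are hypothetical names, and Integer.parseInt stands in for jsonReader.write(writer) so the example runs without Drill. It only illustrates the control flow: 2-space indentation, the catch on the same line as the closing brace, a long error counter, and a rethrow when skipping is disabled.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; none of these names exist in Drill.
final class SkipMalformedSketch {
  private final boolean skipMalformedRecords;
  private long parseErrorCount; // long, because the count is cumulative

  SkipMalformedSketch(boolean skipMalformedRecords) {
    this.skipMalformedRecords = skipMalformedRecords;
  }

  // Stand-in for parsing one JSON record (assumption: Integer.parseInt
  // plays the role of the real parser and throws on bad input).
  private static int parseRecord(String raw) {
    return Integer.parseInt(raw.trim());
  }

  // Returns the number of records parsed successfully in this batch.
  int readBatch(List<String> rawRecords) {
    int recordCount = 0;
    for (String raw : rawRecords) {
      try {
        parseRecord(raw);
        recordCount++;
      } catch (NumberFormatException ex) {
        if (!skipMalformedRecords) {
          // Skipping disabled: fail the whole batch.
          throw new IllegalStateException("Error parsing record: " + raw, ex);
        }
        ++parseErrorCount; // skipping enabled: count the error and continue
      }
    }
    return recordCount;
  }

  long getParseErrorCount() {
    return parseErrorCount;
  }

  public static void main(String[] args) {
    SkipMalformedSketch reader = new SkipMalformedSketch(true);
    List<String> input = Arrays.asList("2010", "oops", "2001", "6005");
    System.out.println(reader.readBatch(input));        // prints 3
    System.out.println(reader.getParseErrorCount());    // prints 1
  }
}
```

The sketch treats each list element as one record, so there is nothing to resynchronize after a failure. A streaming JSON reader would also have to position the parser at the start of the next record before continuing, which is not shown here.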

}

private void updateRunningCount() {
@@ -20,6 +20,7 @@
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.ExecConstants;
import org.junit.Test;
import org.junit.Assert;

@@ -179,4 +180,27 @@ public void testNestedFilter() throws Exception {
.sqlBaselineQuery(baselineQuery)
.go();
}

@Test // See DRILL-4653
public void testSkippingInvalidJSONRecords() throws Exception {

For both of these tests, could you please use the testBuilder() framework? This is the recommended way to write unit tests; you can see an example in one of the other tests in this file.

String set = "alter session set `" + ExecConstants.JSON_READER_SKIP_MALFORMED_RECORDS_FLAG+ "` = true";
testNoResult(set);
test("select count(*) from cp.`jsoninput/DRILL-4653.json`");
set = "alter session set `" + ExecConstants.JSON_READER_SKIP_MALFORMED_RECORDS_FLAG+ "` = false";
testNoResult(set);
}

@Test // See DRILL-4653
public void testNotSkippingInvalidJSONRecords() throws Exception {
try
{
test("select count(*) from cp.`jsoninput/DRILL-4653.json`");
}
catch(Exception ex)
{
// do nothing just return
return;
}
throw new Exception("testNotSkippingInvalidJSONRecords");
}
}
40 changes: 40 additions & 0 deletions exec/java-exec/src/test/resources/jsoninput/DRILL-4653.json
@@ -0,0 +1,40 @@
{ "integer" : 2010,
"float" : 17.4,
"x": {
"y": "kevin",
"z": "paul"
},
"z": [
{"orange" : "yellow" , "pink": "red"},
{"pink" : "purple" }
],
"l": [4,2],
"rl": [ [2,1], [4,6] ]
}
{ "integer : -2002,
"float" : -1.2
}
{ "integer" : 2001,
"float" : 1.2,
"x": {
"y": "bill",
"z": "peter"
},
"z": [
{"pink" : "lilac" }
],
"l": [4,2],
"rl": [ [2,1], [4,6] ]
}
{ "integer" : 6005,
"float" : 1.2,
"x": {
"y": "mike",
"z": "mary"
},
"z": [
{"orange" : "stucco" }
],
"l": [4,2],
"rl": [ [2,1], [4,6] ]
}