DRILL-4653: Malformed JSON should not stop the entire query from progressing #518

Closed
wants to merge 9 commits
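The patch adds two session options, `store.json.reader.skip_invalid_records` and `store.json.reader.print_skipped_invalid_record_number`, both defaulting to false. A minimal sketch of turning them on over JDBC — the connection URL and queried file are assumptions for illustration, not part of this PR:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SkipInvalidRecordsDemo {
  public static void main(String[] args) throws Exception {
    // Embedded/local Drillbit; adjust the URL for a real cluster.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
         Statement stmt = conn.createStatement()) {
      // Both options ship disabled, preserving fail-fast behavior by default.
      stmt.execute("ALTER SESSION SET `store.json.reader.skip_invalid_records` = true");
      stmt.execute("ALTER SESSION SET `store.json.reader.print_skipped_invalid_record_number` = true");
      // Malformed records in the file are now skipped instead of failing the query.
      try (ResultSet rs = stmt.executeQuery("SELECT * FROM dfs.`/tmp/donuts.json`")) {
        while (rs.next()) {
          System.out.println(rs.getString(1));
        }
      }
    }
  }
}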
@@ -135,7 +135,10 @@ public interface ExecConstants {
BooleanValidator JSON_EXTENDED_TYPES = new BooleanValidator("store.json.extended_types", false);
BooleanValidator JSON_WRITER_UGLIFY = new BooleanValidator("store.json.writer.uglify", false);
BooleanValidator JSON_WRITER_SKIPNULLFIELDS = new BooleanValidator("store.json.writer.skip_null_fields", true);

String JSON_READER_SKIP_INVALID_RECORDS_FLAG = "store.json.reader.skip_invalid_records";
BooleanValidator JSON_SKIP_MALFORMED_RECORDS_VALIDATOR = new BooleanValidator(JSON_READER_SKIP_INVALID_RECORDS_FLAG, false);
String JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG = "store.json.reader.print_skipped_invalid_record_number";
BooleanValidator JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR = new BooleanValidator(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG, false);
DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator(
"store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE, 100.0);

@@ -26,6 +26,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.apache.commons.collections.IteratorUtils;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.common.map.CaseInsensitiveMap;
@@ -35,6 +36,7 @@
import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
@@ -101,6 +103,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoCloseable {
ExecConstants.JSON_WRITER_UGLIFY,
ExecConstants.JSON_WRITER_SKIPNULLFIELDS,
ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR,
ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR,
ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR,
ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
@@ -20,16 +20,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -64,6 +63,10 @@ public class JSONRecordReader extends AbstractRecordReader {
private final boolean enableAllTextMode;
private final boolean readNumbersAsDouble;
private final boolean unionEnabled;
private long parseErrorCount;
private final boolean skipMalformedJSONRecords;
private final boolean printSkippedMalformedJSONRecordLineNumber;
// Last state returned by jsonReader.write(); kept as a field so that a parse
// error at end-of-input in one batch stops subsequent calls to next().
private ReadState write = null;

/**
* Create a JSON Record Reader that uses a file based input stream.
@@ -109,11 +112,12 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp

this.fileSystem = fileSystem;
this.fragmentContext = fragmentContext;

// only enable all text mode if we aren't using embedded content mode.
this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
this.printSkippedMalformedJSONRecordLineNumber = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
setColumns(columns);
}

@@ -122,7 +126,8 @@ public String toString() {
return super.toString()
+ "[hadoopPath = " + hadoopPath
+ ", recordCount = " + recordCount
+ ", runningRecordCount = " + runningRecordCount + ", ...]";
+ ", parseErrorCount = " + parseErrorCount
+ ", runningRecordCount = " + runningRecordCount + ", ...]";
}

@Override
@@ -154,6 +159,7 @@ private void setupParser() throws IOException {
} else {
jsonReader.setSource(embeddedContent);
}
jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords);
}

protected void handleAndRaise(String suffix, Exception e) throws UserException {
@@ -189,39 +195,43 @@ private long currentRecordNumberInFile() {
public int next() {
  writer.allocate();
  writer.reset();
  recordCount = 0;
  parseErrorCount = 0;
  // A parse error at end-of-input in a previous batch means there is
  // nothing left to read.
  if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
    return recordCount;
  }
  outside: while (recordCount < DEFAULT_ROWS_PER_BATCH) {
    try {
      writer.setPosition(recordCount);
      write = jsonReader.write(writer);
      if (write == ReadState.WRITE_SUCCEED) {
        recordCount++;
      } else if (write == ReadState.JSON_RECORD_PARSE_ERROR
          || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
        // Malformed record: either fail the query (the default) or skip it.
        if (!skipMalformedJSONRecords) {
          handleAndRaise("Error parsing JSON",
              new Exception(hadoopPath.getName() + " : line number " + (recordCount + 1)));
        }
        ++parseErrorCount;
        if (printSkippedMalformedJSONRecordLineNumber) {
          logger.debug("Error parsing JSON in " + hadoopPath.getName()
              + " : line number " + (recordCount + parseErrorCount));
        }
        if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
          break outside;
        }
      } else {
        break outside;
      }
    } catch (IOException ex) {
      handleAndRaise("Error parsing JSON", ex);
    }
  }
  jsonReader.ensureAtLeastOneField(writer);
  writer.setValueCount(recordCount);
  updateRunningCount();
  return recordCount;
}

private void updateRunningCount() {
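End to end, the new options are exercised through SQL. A minimal test sketch, assuming Drill's BaseTestQuery harness and a hypothetical classpath resource jsoninput/drill4653/file.json containing one malformed record (resource name and contents are illustrative; the PR's actual tests may differ):

import org.apache.drill.BaseTestQuery;
import org.junit.Test;

public class TestSkipInvalidJSONRecords extends BaseTestQuery {

  @Test
  public void testSkipsMalformedRecord() throws Exception {
    // Opt in; by default a malformed record still fails the whole query.
    testNoResult("ALTER SESSION SET `store.json.reader.skip_invalid_records` = true");
    testNoResult("ALTER SESSION SET `store.json.reader.print_skipped_invalid_record_number` = true");
    // Succeeds despite the bad record; without the option it would raise
    // a DATA_READ error pointing at the offending line.
    test("SELECT count(*) FROM cp.`jsoninput/drill4653/file.json`");
    // Restore defaults so other tests keep fail-fast behavior.
    testNoResult("ALTER SESSION SET `store.json.reader.skip_invalid_records` = false");
    testNoResult("ALTER SESSION SET `store.json.reader.print_skipped_invalid_record_number` = false");
  }
}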
@@ -30,6 +30,8 @@ public interface JsonProcessor {

public static enum ReadState {
  END_OF_STREAM,
  // Hit a parse error mid-stream; the reader may skip the malformed
  // record and keep going.
  JSON_RECORD_PARSE_ERROR,

[Review comment — Contributor] Would be helpful to add a comment to describe the meaning of these new states.

  // Hit a parse error at end-of-input; nothing further can be read.
  JSON_RECORD_PARSE_EOF_ERROR,
  WRITE_SUCCEED
}

@@ -50,4 +52,7 @@ public UserException.Builder getExceptionWithContext(Throwable exception,
String msg,
Object... args);

public boolean ignoreJSONParseError();

public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
}
@@ -25,20 +25,38 @@
import org.apache.drill.exec.store.easy.json.JsonProcessor;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TreeTraversingParser;
import com.google.common.base.Preconditions;

import org.apache.drill.common.exceptions.UserException;

public abstract class BaseJsonProcessor implements JsonProcessor {

private static final ObjectMapper MAPPER = new ObjectMapper()
    .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
    .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);

private static final String JACKSON_PARSER_EOF_FILE_MSG = "Unexpected end-of-input:";

public static enum JsonExceptionProcessingState {
END_OF_STREAM, PROC_SUCCEED
}

protected JsonParser parser;
protected DrillBuf workBuf;
protected JsonToken lastSeenJsonToken = null;
boolean ignoreJSONParseErrors = false; // defaults to false; set from store.json.reader.skip_invalid_records

public boolean ignoreJSONParseError() {
return ignoreJSONParseErrors;
}

public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors) {
this.ignoreJSONParseErrors = ignoreJSONParseErrors;
}

public BaseJsonProcessor(DrillBuf workBuf) {
workBuf = Preconditions.checkNotNull(workBuf);
@@ -55,27 +73,52 @@ public void setSource(JsonNode node) {
}

@Override
public UserException.Builder getExceptionWithContext(
    UserException.Builder exceptionBuilder, String field, String msg,
    Object... args) {
  if (msg != null) {
    exceptionBuilder.message(msg, args);
  }
  if (field != null) {
    exceptionBuilder.pushContext("Field ", field);
  }
  exceptionBuilder.pushContext("Column ", parser.getCurrentLocation().getColumnNr() + 1)
      .pushContext("Line ", parser.getCurrentLocation().getLineNr());
  return exceptionBuilder;
}

@Override
public UserException.Builder getExceptionWithContext(Throwable e,
    String field, String msg, Object... args) {
UserException.Builder exceptionBuilder = UserException.dataReadError(e);
return getExceptionWithContext(exceptionBuilder, field, msg, args);
}

/*
 * DRILL-4653: invoked after a JsonParseException. Consumes tokens until the
 * parser reaches the start of the next JSON record ('{'), then returns
 * PROC_SUCCEED. Returns END_OF_STREAM if end-of-input is hit first (a
 * malformed trailing record may have no closing '}').
 */

protected JsonExceptionProcessingState processJSONException()
    throws IOException {
  while (!parser.isClosed()) {
    try {
      JsonToken currentToken = parser.nextToken();
      // A '{' following the end of the previous record (or the start of
      // input) marks the beginning of the next record: recovery succeeded.
      if (currentToken == JsonToken.START_OBJECT
          && (lastSeenJsonToken == JsonToken.END_OBJECT || lastSeenJsonToken == null)) {
        lastSeenJsonToken = currentToken;
        break;
      }
      lastSeenJsonToken = currentToken;
    } catch (com.fasterxml.jackson.core.JsonParseException ex1) {
      if (ex1.getOriginalMessage().startsWith(JACKSON_PARSER_EOF_FILE_MSG)) {
        return JsonExceptionProcessingState.END_OF_STREAM;
      }
      continue;
    }
  }
  return JsonExceptionProcessingState.PROC_SUCCEED;
}
}
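To see the recovery idea in isolation, here is a self-contained sketch using only Jackson — not Drill code. The input string, class name, and retry cap are inventions for illustration, and whether Jackson can actually advance past a given syntax error is best-effort; the patch relies on that same behavior.

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class SkipToNextRecordDemo {

  // Best-effort recovery: pull tokens until the next top-level '{' or EOF.
  // The retry cap is a safety net added for this demo; the patch itself
  // simply loops until the parser closes.
  static boolean recoverToNextRecord(JsonParser parser) throws java.io.IOException {
    int attempts = 0;
    while (!parser.isClosed() && attempts++ < 1000) {
      try {
        JsonToken token = parser.nextToken();
        if (token == null) {
          return false; // clean end of input
        }
        if (token == JsonToken.START_OBJECT) {
          return true; // positioned at the next record
        }
      } catch (JsonParseException e) {
        if (e.getOriginalMessage().startsWith("Unexpected end-of-input")) {
          return false; // malformed record ran off the end
        }
        // Otherwise keep pulling tokens past the bad input.
      }
    }
    return false;
  }

  public static void main(String[] args) throws Exception {
    // The middle record is malformed; the first and last should still count.
    String json = "{\"a\": 1}\n{\"a\": !!}\n{\"a\": 3}";
    JsonParser parser = new JsonFactory().createParser(json);
    int records = 0;
    int skipped = 0;
    while (!parser.isClosed()) {
      try {
        JsonToken token = parser.nextToken();
        if (token == null) {
          break;
        }
        if (token == JsonToken.START_OBJECT) {
          parser.skipChildren(); // consume the rest of this record
          records++;
        }
      } catch (JsonParseException e) {
        skipped++;
        if (!recoverToNextRecord(parser)) {
          break;
        }
        parser.skipChildren(); // consume the recovered record
        records++;
      }
    }
    System.out.println(records + " records read, " + skipped + " parse errors skipped");
  }
}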
@@ -20,8 +20,11 @@
import java.io.IOException;

import com.fasterxml.jackson.core.JsonToken;

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.store.easy.json.JsonProcessor;

import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;

public class CountingJsonReader extends BaseJsonProcessor {
@@ -32,14 +35,39 @@ public CountingJsonReader(DrillBuf workBuf) {

@Override
public ReadState write(BaseWriter.ComplexWriter writer) throws IOException {
  try {
    // Reuse the token left over from error recovery, if any.
    JsonToken token = lastSeenJsonToken;
    if (token == null || token == JsonToken.END_OBJECT) {
      token = parser.nextToken();
    }
    lastSeenJsonToken = null;
    if (!parser.hasCurrentToken()) {
      return ReadState.END_OF_STREAM;
    } else if (token != JsonToken.START_OBJECT) {
      // Thrown as a JsonParseException (rather than IllegalStateException)
      // so the skip-invalid-records path below can handle it.
      throw new com.fasterxml.jackson.core.JsonParseException(parser,
          String.format("Cannot read from the middle of a record. Current token was %s", token));
    }
    writer.rootAsMap().bit("count").writeBit(1);
    parser.skipChildren();
  } catch (com.fasterxml.jackson.core.JsonParseException ex) {
    if (ignoreJSONParseError()) {
      if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
        return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
      } else {
        return ReadState.JSON_RECORD_PARSE_ERROR;
      }
    } else {
      throw ex;
    }
  }
  return ReadState.WRITE_SUCCEED;
}
