From ce3591ca038ee79e51b74e5d9a209633682d8af1 Mon Sep 17 00:00:00 2001
From: Serhii-Harnyk
Date: Thu, 8 Sep 2016 18:11:37 +0000
Subject: [PATCH] DRILL-4842: SELECT * on JSON data results in NumberFormatException

---
Review notes (ignored by git-am; not part of the commit message):
 - Line structure and generic type arguments were restored after a lossy
   extraction; indentation is reconstructed from brace structure, so apply
   with --ignore-whitespace if a context line does not match.
 - testSelectStarWithAllTextModeAndInvalidRecords now ends its builder chain
   with .go(); previously the query was built but never executed.
 - JsonReader: the new path/fieldPathWriter fields are final, the
   initializeFieldWriter parameter no longer shadows the path field, and the
   new helpers are documented. Hunk headers and the diffstat were recomputed;
   the blob ids on the "index" lines still refer to the original commit.

 .../exec/vector/complex/fn/JsonReader.java    |  91 ++++++++++++---
 .../java/org/apache/drill/TestCTASJson.java   |  21 ++--
 .../org/apache/drill/TestStarQueries.java     |   6 +-
 .../vector/complex/writer/TestJsonReader.java | 102 +++++++++++++++++-
 4 files changed, 189 insertions(+), 31 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 9848ae76819..3b8fff0e5d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,12 +17,15 @@
  */
 package org.apache.drill.exec.vector.complex.fn;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.BitSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
@@ -55,6 +58,7 @@ public class JsonReader extends BaseJsonProcessor {
   private final ListVectorOutput listOutput;
   private final boolean extended = true;
   private final boolean readNumbersAsDouble;
+  private final List<String> path = Lists.newArrayList();
 
   /**
    * Collection for tracking empty array writers during reading
@@ -62,6 +66,12 @@ public class JsonReader extends BaseJsonProcessor {
    */
   private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();
 
+  /**
+   * Collection for tracking nullable fields during reading
+   * and storing them for creating default typed vectors
+   */
+  private final Map<List<String>, MapWriter> fieldPathWriter = Maps.newHashMap();
+
   /**
    * Describes whether or not this reader can unwrap a single root array record
    * and treat it like a set of distinct records.
@@ -149,11 +159,15 @@ public void ensureAtLeastOneField(ComplexWriter writer) {
     for (int j = 0; j < fieldPathList.size(); j++) {
       BaseWriter.MapWriter fieldWriter = writerList.get(j);
       PathSegment fieldPath = fieldPathList.get(j);
-      if (emptyStatus.get(j)) {
-        if (allTextMode) {
-          fieldWriter.varChar(fieldPath.getNameSegment().getPath());
-        } else {
-          fieldWriter.integer(fieldPath.getNameSegment().getPath());
+      if (emptyStatus.get(j) && !checkNullFields(fieldPathList)) {
+        initializeFieldWriter(fieldWriter, fieldPath.getNameSegment().getPath());
+      }
+    }
+
+    if (checkNullFields(fieldPathList)) {
+      for (Map.Entry<List<String>, MapWriter> fieldPath : fieldPathWriter.entrySet()) {
+        if (fieldPath.getValue() != null) {
+          initializeFieldWriter(fieldPath.getValue(), fieldPath.getKey().get(fieldPath.getKey().size() - 1));
         }
       }
     }
@@ -170,6 +184,25 @@ public void ensureAtLeastOneField(ComplexWriter writer) {
     }
   }
 
+  /**
+   * Creates a default typed writer for the named field: varChar in
+   * all-text mode, integer otherwise.
+   */
+  private void initializeFieldWriter(MapWriter fieldWriter, String fieldName) {
+    if (allTextMode) {
+      fieldWriter.varChar(fieldName);
+    } else {
+      fieldWriter.integer(fieldName);
+    }
+  }
+
+  /**
+   * Checks that the query is a single '*' projection and that null-valued fields were recorded while reading
+   */
+  private boolean checkNullFields(List<PathSegment> fieldPathList) {
+    return (fieldPathList.size() == 1) && fieldPathList.get(0).getNameSegment().getPath().equals("*") && !fieldPathWriter.isEmpty();
+  }
+
   public void setSource(int start, int end, DrillBuf buf) throws IOException {
     setSource(DrillBufInputStream.getStream(start, end, buf));
   }
@@ -229,6 +262,7 @@ public ReadState write(ComplexWriter writer) throws IOException {
       }
     } catch (com.fasterxml.jackson.core.JsonParseException ex) {
       if (ignoreJSONParseError()) {
+        path.clear();
         if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
           return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
         } else {
@@ -367,12 +401,19 @@ private void writeData(MapWriter map, FieldSelection selection,
         t = parser.getCurrentToken();
         moveForward = true;
       }
-      if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
+
+      if (t == JsonToken.NOT_AVAILABLE) {
+        return;
+      }
+
+      if (t == JsonToken.END_OBJECT) {
+        if (path.size() > 0) {
+          path.remove(path.size() - 1);
+        }
         return;
       }
 
-      assert t == JsonToken.FIELD_NAME : String.format(
-          "Expected FIELD_NAME but got %s.", t.name());
+      assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
       final String fieldName = parser.getText();
       this.currentFieldName = fieldName;
 
@@ -388,6 +429,7 @@ private void writeData(MapWriter map, FieldSelection selection,
         break;
       case START_OBJECT:
         if (!writeMapDataIfTyped(map, fieldName)) {
+          path.add(fieldName);
           writeData(map.map(fieldName), childSelection, false);
         }
         break;
@@ -403,7 +445,7 @@ private void writeData(MapWriter map, FieldSelection selection,
         break;
       }
       case VALUE_NULL:
-        // do nothing as we don't have a type.
+        putFieldPath(fieldName, map);
         break;
       case VALUE_NUMBER_FLOAT:
         map.float8(fieldName).writeFloat8(parser.getDoubleValue());
@@ -446,12 +488,19 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
         t = parser.getCurrentToken();
         moveForward = true;
       }
-      if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
+
+      if (t == JsonToken.NOT_AVAILABLE) {
         return;
       }
 
-      assert t == JsonToken.FIELD_NAME : String.format(
-          "Expected FIELD_NAME but got %s.", t.name());
+      if (t == JsonToken.END_OBJECT) {
+        if (path.size() > 0) {
+          path.remove(path.size() - 1);
+        }
+        return;
+      }
+
+      assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
       final String fieldName = parser.getText();
       this.currentFieldName = fieldName;
 
@@ -467,6 +516,7 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
         break;
       case START_OBJECT:
         if (!writeMapDataIfTyped(map, fieldName)) {
+          path.add(fieldName);
           writeDataAllText(map.map(fieldName), childSelection, false);
         }
         break;
@@ -482,7 +532,7 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
         handleString(parser, map, fieldName);
         break;
       case VALUE_NULL:
-        // do nothing as we don't have a type.
+        putFieldPath(fieldName, map);
         break;
 
       default:
@@ -492,7 +542,20 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
       }
     }
     map.end();
+  }
 
+  /**
+   * Puts copy of field path list to fieldPathWriter map.
+   * @param fieldName name of the field whose value was null
+   * @param map writer of the map that contains the field
+   */
+  private void putFieldPath(String fieldName, MapWriter map) {
+    path.add(fieldName);
+    if (!fieldPathWriter.containsKey(path)) {
+      List<String> fieldPath = ImmutableList.copyOf(path);
+      fieldPathWriter.put(fieldPath, map);
+    }
+    path.remove(path.size() - 1);
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
index c76892d4b74..33e85ec5e3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
@@ -19,11 +19,8 @@
 
 
 import org.apache.drill.common.util.TestTools;
-import org.apache.drill.exec.ExecConstants;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class TestCTASJson extends PlanTestBase {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestCTASJson.class);
 
@@ -38,6 +35,8 @@ public class TestCTASJson extends PlanTestBase {
   public void testctas_alltypes_map() throws Exception {
     String testName = "ctas_alltypes_map";
     test("use dfs_test.tmp");
+    test("alter session set store.json.writer.skip_null_fields = true");
+    test("alter session set store.format = 'json' ");
     test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");
 
     final String query = "select * from `" + testName + "_json` t1 ";
@@ -47,8 +46,6 @@ public void testctas_alltypes_map() throws Exception {
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + ".json")
-          .optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
-          .optionSettingQueriesForTestQuery("alter session set store.json.writer.skip_null_fields = true") // DEFAULT
           .build()
           .run();
     } finally {
@@ -66,6 +63,8 @@ public void testctas_alltypes_map() throws Exception {
   public void testctas_alltypes_map_noskip() throws Exception {
     String testName = "ctas_alltypes_map";
     test("use dfs_test.tmp");
+    test("alter session set store.json.writer.skip_null_fields = false");
+    test("alter session set store.format = 'json' ");
     test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");
 
     final String query = "select * from `" + testName + "_json` t1 ";
@@ -75,8 +74,6 @@ public void testctas_alltypes_map_noskip() throws Exception {
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + "_out.json")
-          .optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
-          .optionSettingQueriesForTestQuery("alter session set store.json.writer.skip_null_fields = false") // change from DEFAULT
           .build()
           .run();
     } finally{
@@ -95,6 +92,8 @@ public void testctas_alltypes_map_noskip() throws Exception {
   public void testctas_alltypes_repeatedmap() throws Exception {
     String testName = "ctas_alltypes_repeated_map";
     test("use dfs_test.tmp");
+    test("alter session set store.json.writer.skip_null_fields = true");
+    test("alter session set store.format = 'json' ");
     test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");
 
     final String query = "select * from `" + testName + "_json` t1 ";
@@ -104,9 +103,6 @@ public void testctas_alltypes_repeatedmap() throws Exception {
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + ".json")
-          .optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
-          .optionSettingQueriesForTestQuery(
-              "alter session set store.json.writer.skip_null_fields = true") // DEFAULT
           .build()
           .run();
     }finally{
@@ -125,6 +121,8 @@ public void testctas_alltypes_repeatedmap() throws Exception {
   public void testctas_alltypes_repeated_map_noskip() throws Exception {
     String testName = "ctas_alltypes_repeated_map";
     test("use dfs_test.tmp");
+    test("alter session set store.json.writer.skip_null_fields = false");
+    test("alter session set store.format = 'json' ");
     test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");
 
     final String query = "select * from `" + testName + "_json` t1 ";
@@ -134,9 +132,6 @@ public void testctas_alltypes_repeated_map_noskip() throws Exception {
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + "_out.json")
-          .optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
-          .optionSettingQueriesForTestQuery(
-              "alter session set store.json.writer.skip_null_fields = false") // change from DEFAULT
           .build()
           .run();
     } finally {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
index b9dcacd8d22..a3006eff1b7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
@@ -126,7 +126,7 @@ public void testSelStarOrderBy() throws Exception{
         .ordered()
         .sqlQuery(" select * from cp.`employee.json` order by last_name")
         .sqlBaselineQuery(" select employee_id, full_name,first_name,last_name,position_id,position_title,store_id," +
-            " department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role " +
+            " department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role,end_date " +
             " from cp.`employee.json` " +
             " order by last_name ")
         .build().run();
@@ -139,7 +139,7 @@ public void testSelStarOrderByLimit() throws Exception{
         .ordered()
         .sqlQuery(" select * from cp.`employee.json` order by last_name limit 2")
         .sqlBaselineQuery(" select employee_id, full_name,first_name,last_name,position_id,position_title,store_id," +
-            " department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role " +
+            " department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role,end_date " +
             " from cp.`employee.json` " +
             " order by last_name limit 2")
         .build().run();
@@ -162,7 +162,7 @@ public void testSelStarWhereOrderBy() throws Exception{
         .ordered()
         .sqlQuery("select * from cp.`employee.json` where first_name = 'James' order by last_name")
         .sqlBaselineQuery("select employee_id, full_name,first_name,last_name,position_id,position_title,store_id," +
-            " department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role " +
+            " department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role,end_date " +
             " from cp.`employee.json` " +
             " where first_name = 'James' order by last_name")
         .build().run();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 78c2c4ca271..889274718c5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.vector.complex.writer;
 
+import static org.apache.commons.io.FileUtils.deleteQuietly;
 import static org.apache.drill.TestBuilder.listOf;
 import static org.apache.drill.TestBuilder.mapOf;
 import static org.junit.Assert.assertEquals;
@@ -37,6 +38,7 @@
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.RecordBatchLoader;
@@ -721,4 +723,102 @@ public void testFlattenEmptyArrayWithUnionType() throws Exception {
       testNoResult("alter session reset `exec.enable_union_type`");
     }
   }
+
+  @Test // DRILL-4842
+  public void testSelectStarWithAllTextMode() throws Exception {
+    File path = new File(BaseTestQuery.getTempDir("json/input"));
+    path.mkdirs();
+    String pathString = path.toPath().toString();
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "tooManyNulls.json")))) {
+      for (int i = 0; i < JSONRecordReader.DEFAULT_ROWS_PER_BATCH; i++) {
+        writer.write("{ \"c1\" : null }\n");
+      }
+      writer.write("{ \"c1\" : \"Hello World\" }\n");
+    }
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "json_null_fields.json")))) {
+      writer.write("{ \"c1\" : null, \"c2\" : null, \"c3\" : null }");
+    }
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "json_nested_null_fields.json")))) {
+      writer.write("{\"c0\":{\"c11\": \"I am not NULL\", \"c1\": null}, \"c1\": \"I am not NULL\", \"c11\":null}");
+    }
+
+    try {
+      testNoResult("alter session set `store.json.all_text_mode` = true");
+      testBuilder()
+          .sqlQuery(String.format("select c1 from dfs_test.`%s/tooManyNulls.json` WHERE c1 IN ('Hello World')", pathString))
+          .unOrdered()
+          .baselineColumns("c1")
+          .baselineValues("Hello World")
+          .go();
+
+      testBuilder()
+          .sqlQuery(String.format("select * from dfs_test.`%s/tooManyNulls.json` WHERE c1 IN ('Hello World')", pathString))
+          .unOrdered()
+          .baselineColumns("c1")
+          .baselineValues("Hello World")
+          .go();
+
+      testBuilder()
+          .sqlQuery(String.format("select * from dfs_test.`%s/tooManyNulls.json` WHERE c1 is not null", pathString))
+          .unOrdered()
+          .baselineColumns("c1")
+          .baselineValues("Hello World")
+          .go();
+
+      testBuilder()
+          .sqlQuery(String.format("select * from dfs_test.`%s/json_nested_null_fields.json`", pathString))
+          .unOrdered()
+          .sqlBaselineQuery(String.format("select c0,c1,c11 from dfs_test.`%s/json_nested_null_fields.json`", pathString))
+          .go();
+
+      testNoResult("alter session set `store.json.all_text_mode` = false");
+
+      testBuilder()
+          .sqlQuery(String.format("select * from dfs_test.`%s/json_null_fields.json`", pathString))
+          .unOrdered()
+          .baselineColumns("c1", "c2", "c3")
+          .baselineValues(null, null, null)
+          .go();
+
+    } finally {
+      testNoResult("alter session reset `store.json.all_text_mode`");
+      deleteQuietly(path);
+    }
+  }
+
+  /**
+   * See DRILL-4653 and DRILL-4842
+   */
+  @Test
+  public void testSelectStarWithAllTextModeAndInvalidRecords() throws Exception {
+    File path = new File(BaseTestQuery.getTempDir("json/input"));
+    path.mkdirs();
+    String pathString = path.toPath().toString();
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "json_nested_null_fields_with_err.json")))) {
+      writer.write("{\"c11\":null, \"c2\":{\"c0\":{\"c11\": \"1\", \"c1\": {\"c0: null}}, \"c1\": \"I am not NULL\"}}" +
+          "{\"c11\":null, \"c2\":{\"c0\":{\"c11\": \"2\", \"c1\": {\"c0\": null}}, \"c1\": \"I am not NULL\"}}" +
+          "{\"c11\":null, \"c2\":{\"c0\":{\"c11\": \"3\", \"c1\": {\"c0\": null}}, \"c1\": \"I am not NULL\"}}" +
+          "{\"c11\":null, \"c2\":{\"c0\":{\"c11\": \"4\", \"c1\": {\"c0\": null}}, \"c1\": \"I am not NULL\"}}");
+    }
+
+    try {
+      testNoResult("alter session set `" + ExecConstants.JSON_ALL_TEXT_MODE + "` = true");
+
+      testNoResult("alter session set `" + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = true");
+
+      testBuilder()
+          .unOrdered()
+          .sqlQuery(String.format("select * from dfs_test.`%s/json_nested_null_fields_with_err.json` t WHERE t.c2.c0.c1.c0 IN ('Hello World')", pathString))
+          .expectsEmptyResultSet()
+          .go();
+    } finally {
+      testNoResult("alter session reset `" + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "`");
+      testNoResult("alter session reset `" + ExecConstants.JSON_ALL_TEXT_MODE + "`");
+      deleteQuietly(path);
+    }
+  }
 }