From 2750f7f56c25e8940dc224256d2686b037549813 Mon Sep 17 00:00:00 2001 From: Rahul Raj Date: Wed, 14 Mar 2018 12:05:45 +0530 Subject: [PATCH] DRILL-6016 - Fix for Error reading INT96 created by Apache Spark --- .../columnreaders/ColumnReaderFactory.java | 7 ++- .../ParquetFixedWidthDictionaryReaders.java | 27 ++++++++++++ .../impl/writer/TestParquetWriter.java | 40 ++++++++++++------ ...k-generated-int96-timestamp.snappy.parquet | Bin 0 -> 2896 bytes .../testInt96DictChange/q1.tsv | 12 ------ 5 files changed, 59 insertions(+), 27 deletions(-) create mode 100644 exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet delete mode 100644 exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 09cdc5d5a35..ba5f1decf80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -156,8 +156,13 @@ static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, case DOUBLE: return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement); case FIXED_LEN_BYTE_ARRAY: - case INT96: return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + case INT96: + if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement); + } else { + return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + } default: throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index 5fbac204e1d..50330465b0f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -34,6 +34,8 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; +import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; + public class ParquetFixedWidthDictionaryReaders { static class DictionaryIntReader extends FixedByteAlignedReader { @@ -294,6 +296,31 @@ protected void readField(long recordsToReadInThisPass) { } } + static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader { + DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + try { + Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true)); + } catch ( Exception ex) { + throw ex; + } + } + } + } + static class DictionaryFloat4Reader extends FixedByteAlignedReader { DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index e3fc83314c1..c359e69b6dd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -39,7 +39,6 @@ import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.categories.UnlikelyTest; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.fn.interp.TestConstantFolding; @@ -780,17 +779,31 @@ public void testImpalaParquetBinaryAsVarBinary_DictChange() throws Exception { Test the reading of a binary field as drill timestamp where data is in dictionary _and_ non-dictionary encoded pages */ @Test - @Ignore("relies on particular time zone, works for UTC") public void testImpalaParquetBinaryAsTimeStamp_DictChange() throws Exception { try { testBuilder() - .sqlQuery("select int96_ts from dfs.`parquet/int96_dict_change` order by int96_ts") + .sqlQuery("select min(int96_ts) date_value from dfs.`parquet/int96_dict_change`") .optionSettingQueriesForTestQuery( "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) .ordered() - .csvBaselineFile("testframework/testParquetReader/testInt96DictChange/q1.tsv") - .baselineTypes(TypeProtos.MinorType.TIMESTAMP) - .baselineColumns("int96_ts") + .baselineColumns("date_value") + .baselineValues(new DateTime(convertToLocalTimestamp("1970-01-01 00:00:01.000"))) + .build().run(); + } finally { + resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + } + } + + @Test + public void testSparkParquetBinaryAsTimeStamp_DictChange() throws Exception { + try { + testBuilder() + .sqlQuery("select distinct run_date from cp.`parquet/spark-generated-int96-timestamp.snappy.parquet`") + .optionSettingQueriesForTestQuery( + "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) + .ordered() + .baselineColumns("run_date") + .baselineValues(new DateTime(convertToLocalTimestamp("2017-12-06 16:38:43.988"))) .build().run(); } finally { 
resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); @@ -842,16 +855,15 @@ public void testHiveParquetTimestampAsInt96_compare() throws Exception { Test the conversion from int96 to impala timestamp with hive data including nulls. Validate against expected values */ @Test - @Ignore("relies on particular time zone") public void testHiveParquetTimestampAsInt96_basic() throws Exception { testBuilder() - .unOrdered() - .sqlQuery("SELECT cast(convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as varchar(19)) as timestamp_field " - + "from cp.`parquet/part1/hive_all_types.parquet` ") - .baselineColumns("timestamp_field") - .baselineValues("2013-07-05 17:01:00") - .baselineValues((Object)null) - .go(); + .unOrdered() + .sqlQuery("SELECT convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as timestamp_field " + + "from cp.`parquet/part1/hive_all_types.parquet` ") + .baselineColumns("timestamp_field") + .baselineValues(new DateTime(convertToLocalTimestamp("2013-07-06 00:01:00"))) + .baselineValues((Object)null) + .go(); } @Test diff --git a/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet b/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3075cecf98071cb1a7f1037f4cad3c33df920862 GIT binary patch literal 2896 zcmbVOYj6|S6}~I&%Dd9aU4vGvKqQhPJ2sWod-r+QC>h%rki#Q}F|D0Pv9JYpDuXTC z)Cpl?%Z3oZ)Pdl<2yu9X(t%DMWRjVwUWrJ{FG1L?p-QhU}P4dA$N>N2&u%<3`-?~=p%+@5rY#I zOjR9Q79$3!s%jRtP3WrFAiA#NfXEVC5G9+~Iu)Wmg0ZIALR2Ce)@(0^34^GnSMgaI zr9|ZtKB5|`p$WYTF*TcFDkzdpEW;F6~x*;U;YpNzVMJ3B56z72nRSm+OP*g&Q zPE4WRhb>dpi13jhdXti+5v&0ev2?5#B{|_SOf_N< zHeAN)nvjsN1_=|AFl`#ddre-0QtzOMZQWXSmJ^GHMc@YD5jxCQZ3u^CPdS~ zguEu1x~bxFDnd(^i3`V(UVco{2n9VrSdG|PTV41nuVSqz%2QJ>N{Sjat!1#frP}(k zfBOi~H*F64Ag30ko0eKumj^bj&MDEL7B2r(f;1UgCk3`9R6wF?n6^ zoiULqc&M+hsj^fl+%~x8WGlUCcw0;L(T4aFj_|PEDh+HIM;}D#O!vX*%l9NkCJUlM zSdsiNBD1YDM-}9GyPCWJ5)-zW&j_v=vKwN~wZ(j~q ztcuR^)X*T3c2r(!n&NqG%{#rvdV{~&v(0y3-|f!~vooto_TKvL!AnopjhrodsrGmK zO830=Ny;%4+%J`!AAT(E-!k*1Gf{{0^nuX14wIc=zW945G%04%H>01a9u>lii@DnF zZ124Jt6|x%%-~QT!a-@v+0bxb{J{CLSMK}6nwKveAFZ6CiM@QuN7G#YpZ34asQ2En zct`bMkVL%;uS~Y?tejl9Xr>aq{TZpYTfMEs$2N80#DlLd4Hqe)jUHz7$aIF^>O7h~ zu(aalJKy*c`DP4o)#HDY#D{^(?xslO%)*o9MXw($e(96Dk-DX(sZVO9b8^$?KKv{< z{_KYOu$>4;i^?iK+c+sbLhstXyztJCYa+c13I_UKF7j?WSUh7zNAJ8Hr)LHI4-cOi zNc|$&8@%Q7@%hkE?7pGN zj;Z2}RZ~BUez_*%VY3O5zY-Pv2<8Jw<}ifuVG@%zXHk#pYp4xN2V(GLF2p9+jHaq3`% zcGk?9=znzy`_SxNS6;lbxnfZGxSj-A{@nPIucPkgc7EPqKKoAFzs7e=Mhmxk7mtR0 z5B%}X*;j?0<pjT%4Sl06&r&KsYKpPaAM%-J<{HRMYy_ZT&Thi4Sd?DPT9mgu?mI#hhlXHtYjhVttu!&=q!TIv7Zsn)V z81=eY8Y@zHEv5;jlB&fln=55iEU!m&?jjz}TINxfI+()k3gsqH z!+W?P^qO}9H;k-n97lyCMuVTf3noX?jr)sXKJ)Y}0C#g2>ocRTa#bPrB+!3Bcsi7M zuc>i=x{=xAlljpQ-_JZe#@9KV&4(WaCH})WoQFevbs7IW^#1X9dTlYk+~IsIuMwYd zIFCpyaD_rs) zG%0p4+dtJiNRhL!VE&d%=p^L(+e7}rKnb7t(Kn!EXW;rvdGv3+FT4Hc1+XQQ z>qS*^F%pzrfnuHwTwnWJ_kE225CitajU;azU8C&Z6@zF{AgGjW$gYd0I#NJ6evN&+ zt1}Y^89IB?-SO;(U&q_i-6`OgQa)rhMY`|rNJlCnUESH9=I%%=vbHUqY3Yho+`TEX zKGmK#T~`at*WKIJuY>XWo=hgSCIfx|kd8&Z$BtBN+PpdTKMWAy|9|=Wd|XH9Zfj3> zb*I`pZirxQD%17j5boWKl=OPLI3~& literal 0 HcmV?d00001 diff --git a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv deleted file mode 100644 index 91b9b015fab..00000000000 --- a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv +++ /dev/null @@ -1,12 +0,0 @@ 
-1970-01-01 00:00:01.000 -1971-01-01 00:00:01.000 -1972-01-01 00:00:01.000 -1973-01-01 00:00:01.000 -1974-01-01 00:00:01.000 -2010-01-01 00:00:01.000 -2011-01-01 00:00:01.000 -2012-01-01 00:00:01.000 -2013-01-01 00:00:01.000 -2014-01-01 00:00:01.000 -2015-01-01 00:00:01.000 -2016-01-01 00:00:01.000
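
Reviewer note: a minimal way to exercise the behaviour this patch changes, mirroring the new testSparkParquetBinaryAsTimeStamp_DictChange test, is to enable INT96-to-timestamp conversion for the session and query the checked-in Spark-generated file. This sketch assumes the SQL-level name behind ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP is store.parquet.reader.int96_as_timestamp and that the test file is reachable through the classpath (cp) workspace, as it is in the test.

    -- read INT96 values as Drill timestamps for this session
    ALTER SESSION SET `store.parquet.reader.int96_as_timestamp` = true;

    -- run_date is a dictionary-encoded INT96 column written by Spark;
    -- with the option set, the new DictionaryBinaryAsTimeStampReader returns it as TIMESTAMP
    SELECT DISTINCT run_date
    FROM cp.`parquet/spark-generated-int96-timestamp.snappy.parquet`;

With the option left at its default (false) the column is still read as VARBINARY by DictionaryFixedBinaryReader; with it enabled, the query above should return 2017-12-06 16:38:43.988, adjusted to the local time zone in the same way the test's convertToLocalTimestamp call does.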