ColumnStatsIndexSupport.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.ColumnStatsIndexSupport._
import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.model._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.function.SerializableFunction
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.BinaryUtil.toBytes
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
import scala.collection.mutable.ListBuffer
import scala.collection.parallel.mutable.ParHashMap
class ColumnStatsIndexSupport(spark: SparkSession,
tableSchema: StructType,
@transient metadataConfig: HoodieMetadataConfig,
@transient metaClient: HoodieTableMetaClient,
allowCaching: Boolean = false)
extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {
@transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
// NOTE: Since [[metadataConfig]] is transient, this value has to be extracted eagerly,
// before it is passed on to the executors
private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
private lazy val indexedColumns: Set[String] = {
val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
// Column Stats Index could index either
// - The whole table
// - Only configured columns
if (customIndexedColumns.isEmpty) {
tableSchema.fieldNames.toSet
} else {
customIndexedColumns.asScala.toSet
}
}
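// NOTE: As an illustrative sketch only (builder method names per [[HoodieMetadataConfig]]; verify
// against the Hudi version in use), restricting the index to a subset of columns would look like:
//
//   val metadataConfig = HoodieMetadataConfig.newBuilder()
//     .enable(true)
//     .withMetadataIndexColumnStats(true)
//     .withColumnStatsIndexForColumns("A,B") // only columns A and B are indexed
//     .build()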
override def getIndexName: String = ColumnStatsIndexSupport.INDEX_NAME
override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
queryFilters: Seq[Expression],
queryReferencedColumns: Seq[String],
prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
shouldPushDownFilesFilter: Boolean
): Option[Set[String]] = {
if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
// NOTE: Since executing on-cluster via the Spark API carries its own non-trivial amount of overhead,
// it's most often preferable to fetch the Column Stats Index w/in the same process (usually the driver),
// w/o resorting to on-cluster execution.
// For that we use a simple heuristic to determine whether we should read and process the CSI in-memory or
// on-cluster: the total number of rows of the expected projected portion of the index has to be below the
// threshold (of 100k records)
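// For instance (illustrative numbers only): a query referencing 2 indexed columns over a table
// with 80k files projects roughly 2 * 80k = 160k index rows, which exceeds the 100k threshold,
// so the index would be read and processed on-cluster rather than in-memory on the driver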
val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
// NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
// when loading the Column Statistics Index
val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
loadTransposed(queryReferencedColumns, readInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
}
} else {
Option.empty
}
}
override def invalidateCaches(): Unit = {
cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
cachedColumnStatsIndexViews.clear()
}
/**
* Returns true when the Column Stats Index is built and available as a standalone partition
* w/in the Metadata Table
*/
def isIndexAvailable: Boolean = {
checkState(metadataConfig.isEnabled, "Metadata Table support has to be enabled")
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
}
/**
* Loads a view of the Column Stats Index in a transposed format, where a single row coalesces every column's
* statistics for a single file, returning it as a [[DataFrame]]
*
* Please check out the scala-doc of the [[transpose]] method explaining this view in more detail
*/
def loadTransposed[T](targetColumns: Seq[String],
shouldReadInMemory: Boolean,
prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = {
cachedColumnStatsIndexViews.get(targetColumns) match {
case Some(cachedDF) =>
block(cachedDF)
case None =>
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = prunedFileNamesOpt match {
case Some(prunedFileNames) =>
val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
override def apply(r: HoodieMetadataColumnStats): java.lang.Boolean = {
prunedFileNames.contains(r.getFileName)
}
}
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory).filter(filterFunction)
case None =>
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
}
withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
val df = if (shouldReadInMemory) {
// NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
// of the transposed table in memory, facilitating execution of the subsequently chained operations
// on it locally (on the driver; all such operations are actually going to be performed by Spark's
// Optimizer)
createDataFrameFromRows(spark, transposedRows.collectAsList().asScala.toSeq, indexSchema)
} else {
val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
spark.createDataFrame(rdd, indexSchema)
}
if (allowCaching) {
cachedColumnStatsIndexViews.put(targetColumns, df)
// NOTE: Instead of collecting the rows from the index and holding them in memory, we instead rely
// on Spark as a (potentially distributed) cache managing the data lifecycle, while we simply keep
// a reference to the persisted [[DataFrame]] instance
df.persist(StorageLevel.MEMORY_ONLY)
block(df)
} else {
withPersistedDataset(df) {
block(df)
}
}
}
}
}
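// A minimal usage sketch (names are illustrative, not part of this API): fetch the transposed view
// for the columns referenced by the query and count the files whose min-value stat permits a match:
//
//   val candidateCount = columnStatsIndex.loadTransposed(Seq("A"), shouldReadInMemory = true) { df =>
//     df.filter(col(getMinColumnNameFor("A")) <= 10).count()
//   }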
/**
* Loads a view of the Column Stats Index in its raw format, returning it as a [[DataFrame]]
*
* Please check out the scala-doc of the [[transpose]] method explaining this view in more detail
*/
def load(targetColumns: Seq[String] = Seq.empty, shouldReadInMemory: Boolean = false): DataFrame = {
// NOTE: If specific columns have been provided, we can considerably trim down the amount of data fetched
// by only fetching Column Stats Index records pertaining to the requested columns.
// Otherwise, we fall back to reading the whole Column Stats Index
if (targetColumns.nonEmpty) {
loadColumnStatsIndexForColumnsInternal(targetColumns, shouldReadInMemory)
} else {
loadFullColumnStatsIndexInternal()
}
}
/**
* Transposes and converts the raw table format of the Column Stats Index representation,
* where each row/record corresponds to an individual (column, file) pair, into a table format
* where each row corresponds to a single file, with the statistics for individual columns collated
* w/in such a row:
*
* Metadata Table Column Stats Index format:
*
* <pre>
* +---------------------------+------------+------------+------------+-------------+
* | fileName | columnName | minValue | maxValue | num_nulls |
* +---------------------------+------------+------------+------------+-------------+
* | one_base_file.parquet | A | 1 | 10 | 0 |
* | another_base_file.parquet | A | -10 | 0 | 5 |
* +---------------------------+------------+------------+------------+-------------+
* </pre>
*
* Returned table format
*
* <pre>
* +---------------------------+------------+------------+-------------+
* | file | A_minValue | A_maxValue | A_nullCount |
* +---------------------------+------------+------------+-------------+
* | one_base_file.parquet | 1 | 10 | 0 |
* | another_base_file.parquet | -10 | 0 | 5 |
* +---------------------------+------------+------------+-------------+
* </pre>
*
* NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
* the query at hand might only be referencing a handful of those. As such, we collect all the
* column references from the filtering expressions, and only transpose records corresponding to the
* columns referenced in those
*
* @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
* @return reshaped table according to the format outlined above
*/
private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
// NOTE: We're sorting the columns to make sure the final index schema matches the layout
// of the transposed table
val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
// NOTE: This is a trick to avoid pulling the whole [[ColumnStatsIndexSupport]] object into the lambdas'
// closures below
val indexedColumns = this.indexedColumns
// NOTE: It's crucial to maintain the appropriate ordering of the columns
// matching the table layout: hence, we cherry-pick individual columns
// instead of simply filtering the schema down to the ones we're interested in
val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema)
// Here we perform a complex transformation that requires us to modify the layout of the rows
// of the dataset, and therefore we rely on the low-level RDD API to avoid incurring the
// encoding/decoding penalty of the [[Dataset]], since a Dataset is required to adhere to its
// schema at all times, while RDDs are not
val transposedRows: HoodieData[Row] = colStatsRecords
// NOTE: Explicit conversion is required for Scala 2.11
.filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName)))
.mapToPair(JFunction.toJavaSerializablePairFunction(r => {
if (r.getMinValue == null && r.getMaxValue == null) {
// Min/max values could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max have to be null)
// - This is a stubbed Column Stats record (used as a tombstone)
collection.Pair.of(r.getFileName, r)
} else {
val minValueWrapper = r.getMinValue
val maxValueWrapper = r.getMaxValue
checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType)
val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType)
// Update min-/max-value structs w/ unwrapped values in-place
r.setMinValue(minValue)
r.setMaxValue(maxValue)
collection.Pair.of(r.getFileName, r)
}
}))
.groupByKey()
.map(JFunction.toJavaSerializableFunction(p => {
val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq
val fileName: String = p.getKey
val valueCount: Long = columnRecordsSeq.head.getValueCount
// To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
// to align the existing column stats for an individual file with the list of expected ones for the
// whole transposed projection (a superset across all files)
val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
val alignedColStatRecordsSeq = targetIndexedColumns.map(columnRecordsMap.get)
val coalescedRowValuesSeq =
alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
case (acc, opt) =>
opt match {
case Some(colStatRecord) =>
acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
case None =>
// NOTE: This could occur in either of the following cases:
// 1. When certain columns exist in the schema but are absent in some data files due to
// schema evolution or other reasons, these columns will not be present in the column stats.
// In this case, we fill in default values by setting the min, max and null-count to null
// (this behavior is consistent with reading non-existent columns from Parquet).
// 2. When certain columns are present both in the schema and the data files,
// but the column stats are absent for these columns due to their types not supporting indexing,
// we also set these columns to default values.
//
// This approach prevents errors during data skipping and, because the filter includes an isNull check,
// these conditions will not affect the accurate return of files from data skipping.
acc ++= Seq(null, null, null)
}
}
Row(coalescedRowValuesSeq.toSeq: _*)
}))
(transposedRows, indexSchema)
}
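// NOTE: For targetIndexedColumns = Seq("A", "B"), every transposed row produced above has the shape
//   Row(fileName, valueCount, A_minValue, A_maxValue, A_nullCount, B_minValue, B_maxValue, B_nullCount)
// which lines up positionally with the schema composed by [[composeIndexSchema]]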
private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = {
val colStatsDF = {
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
// NOTE: Explicit conversion is required for Scala 2.11
val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType)
it.asScala.map(r => converter(r).orNull).asJava
}), false)
if (shouldReadInMemory) {
// NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
// of the table in memory, facilitating execution of the subsequently chained operations
// on it locally (on the driver; all such operations are actually going to be performed by Spark's
// Optimizer)
createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala.toSeq, columnStatsRecordStructType)
} else {
createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType)
}
}
colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
}
def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
// Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
// - Fetching the records from CSI by key-prefixes (encoded column names)
// - Extracting [[HoodieMetadataColumnStats]] records
// - Filtering out nulls
checkState(targetColumns.nonEmpty)
// TODO encoding should be done internally w/in HoodieBackedTableMetadata
val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
// NOTE: Explicit conversion is required for Scala 2.11
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
toScalaOption(record.getData.getInsertValue(null, null))
.map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
.orNull
}))
.filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
columnStatsRecords
}
private def loadFullColumnStatsIndexInternal(): DataFrame = {
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
val colStatsDF = spark.read.format("org.apache.hudi")
.options(metadataConfig.getProps.asScala)
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
val requiredIndexColumns =
targetColumnStatsIndexColumns.map(colName =>
col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
.select(requiredIndexColumns: _*)
}
}
object ColumnStatsIndexSupport {
val INDEX_NAME = "COLUMN_STATS"
private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper",
"BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper")
/**
* Target Column Stats Index columns which internally are mapped onto fields of the corresponding
* Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
*/
private val targetColumnStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME
)
private val columnStatsRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$)
/**
* @VisibleForTesting
*/
def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Set[String], tableSchema: StructType): (StructType, Seq[String]) = {
val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty)
val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty)
val targetIndexedColumns = targetColumnNames.filter(indexedColumns.contains(_))
val targetIndexedFields = targetIndexedColumns.map(colName => tableSchema.fields.find(f => f.name == colName).get)
(StructType(
targetIndexedFields.foldLeft(Seq(fileNameField, valueCountField)) {
case (acc, field) =>
acc ++ Seq(
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, field.dataType),
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, field.dataType),
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, LongType))
}
), targetIndexedColumns)
}
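// For example, composeIndexSchema(Seq("A"), Set("A"), tableSchema) for an IntegerType column "A"
// conceptually yields
//   StructType(fileName: String, valueCount: Long, A_minValue: Int, A_maxValue: Int, A_nullCount: Long)
// together with Seq("A") as the target indexed columns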
@inline def getMinColumnNameFor(colName: String): String =
formatColName(colName, HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
@inline def getMaxColumnNameFor(colName: String): String =
formatColName(colName, HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
@inline def getNullCountColumnNameFor(colName: String): String =
formatColName(colName, HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
@inline def getValueCountColumnNameFor: String =
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT
@inline private def formatColName(col: String, statName: String) = { // TODO add escaping for
String.format("%s_%s", col, statName)
}
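// e.g. formatColName("A", COLUMN_STATS_FIELD_MIN_VALUE) returns "A_minValue"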
@inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) =
StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty)
private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
valueWrapper match {
case w: BooleanWrapper => w.getValue
case w: IntWrapper => w.getValue
case w: LongWrapper => w.getValue
case w: FloatWrapper => w.getValue
case w: DoubleWrapper => w.getValue
case w: BytesWrapper => w.getValue
case w: StringWrapper => w.getValue
case w: DateWrapper => w.getValue
case w: DecimalWrapper => w.getValue
case w: TimeMicrosWrapper => w.getValue
case w: TimestampMicrosWrapper => w.getValue
case r: GenericData.Record if expectedAvroSchemaValues.contains(r.getSchema.getName) =>
r.get("value")
case _ => throw new UnsupportedOperationException(s"Not recognized value wrapper type (${valueWrapper.getClass.getSimpleName})")
}
}
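// NOTE: e.g. an IntWrapper persisted as {"value": 42} unwraps to the plain Int 42; the
// [[GenericData.Record]] branch covers wrappers that were deserialized generically rather than
// into their SpecificRecord classes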
val decConv = new DecimalConversion()
private def deserialize(value: Any, dataType: DataType): Any = {
dataType match {
// NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" logical-types, we're
// manually encoding corresponding values as int and long w/in the Column Stats Index and
// here we have to decode those back into corresponding logical representation.
case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
// Standard types
case StringType => value
case BooleanType => value
// Numeric types
case FloatType => value
case DoubleType => value
case LongType => value
case IntegerType => value
// NOTE: All integral types of size less than Int are encoded as Ints in MT
case ShortType => value.asInstanceOf[Int].toShort
case ByteType => value.asInstanceOf[Int].toByte
// TODO fix
case _: DecimalType =>
value match {
case buffer: ByteBuffer =>
val logicalType = DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType
decConv.fromBytes(buffer, null, logicalType)
case _ => value
}
case BinaryType =>
value match {
case b: ByteBuffer => toBytes(b)
case other => other
}
case _ =>
throw new UnsupportedOperationException(s"Data type for the statistic value is not recognized $dataType")
}
}
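// e.g. a DateType stat is persisted as the Int day-offset from the epoch, so (per the manual
// encoding described above) deserialize(18993, DateType) yields java.sql.Date for 2022-01-01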
}