address comments
huaxingao committed Nov 14, 2021
1 parent 0e655a8 commit 4fb313b
Showing 5 changed files with 15 additions and 15 deletions.
@@ -168,6 +168,11 @@ object AggregatePushDownUtils {
       aggregation: Aggregation,
       partitionValues: InternalRow): InternalRow = {
     val groupByColNames = aggregation.groupByColumns.map(_.fieldNames.head)
+    assert(groupByColNames.length == partitionSchema.length &&
+      groupByColNames.length == partitionValues.numFields, "The number of group by columns " +
+      s"${groupByColNames.length} should be the same as partition schema length " +
+      s"${partitionSchema.length} and the number of fields ${partitionValues.numFields} " +
+      s"in partitionValues")
     var reorderedPartColValues = Array.empty[Any]
     if (!partitionSchema.names.sameElements(groupByColNames)) {
       groupByColNames.foreach { col =>
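
The first hunk is truncated at the start of the reordering loop. As context, here is a minimal standalone sketch of what the reorder amounts to; the method name and the get-by-index access pattern are assumptions for illustration, not code from this commit:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType

// Rebuild the partition-values row in group-by order: look up each group-by
// column's index in the partition schema and copy its value across.
def reorderSketch(
    partitionSchema: StructType,
    groupByColNames: Seq[String],
    partitionValues: InternalRow): InternalRow = {
  if (partitionSchema.names.sameElements(groupByColNames)) {
    partitionValues // already aligned, nothing to do
  } else {
    val values = groupByColNames.map { col =>
      val index = partitionSchema.names.indexOf(col)
      partitionValues.get(index, partitionSchema(index).dataType)
    }
    new GenericInternalRow(values.toArray[Any])
  }
}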
@@ -499,7 +499,9 @@ object OrcUtils extends Logging {
         (0 until schemaWithoutGroupBy.length).toArray)
       val resultRow = orcValuesDeserializer.deserializeFromValues(aggORCValues)
       if (aggregation.groupByColumns.nonEmpty) {
-        new JoinedRow(partitionValues, resultRow)
+        val reOrderedPartitionValues = AggregatePushDownUtils.reOrderPartitionCol(
+          partitionSchema, aggregation, partitionValues)
+        new JoinedRow(reOrderedPartitionValues, resultRow)
       } else {
         resultRow
       }
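
In both utils, JoinedRow presents the reordered partition-values row and the aggregate-result row as a single concatenated InternalRow without copying either side. A self-contained illustration with made-up values (one group-by column, one pushed-down MAX result):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.unsafe.types.UTF8String

val partValues: InternalRow = new GenericInternalRow(Array[Any](UTF8String.fromString("us-east")))
val aggValues: InternalRow = new GenericInternalRow(Array[Any](42L))

// Fields of the left row come first: field 0 = "us-east", field 1 = 42L.
val joined = new JoinedRow(partValues, aggValues)
assert(joined.numFields == 2)
assert(joined.getLong(1) == 42L)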
@@ -203,7 +203,9 @@ object ParquetUtils {
     }
 
     if (aggregation.groupByColumns.nonEmpty) {
-      new JoinedRow(partitionValues, converter.currentRecord)
+      val reorderedPartitionValues = AggregatePushDownUtils.reOrderPartitionCol(
+        partitionSchema, aggregation, partitionValues)
+      new JoinedRow(reorderedPartitionValues, converter.currentRecord)
     } else {
       converter.currentRecord
     }
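
The reorder matters because the pushed-down Aggregation lists its group-by columns in the query's GROUP BY order, while file.partitionValues follows the partition schema order; if the two disagree, joining them unadjusted would attach each value to the wrong column. A hypothetical end-to-end trigger (table, column names, and the config flag are assumptions, not from this commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  // Assumed flag enabling Parquet aggregate push down in this era of Spark.
  .config("spark.sql.parquet.aggregatePushdown", "true")
  .getOrCreate()

// Table partitioned by (p1, p2) -- the partition schema order.
spark.sql("CREATE TABLE t (c INT, p1 STRING, p2 STRING) USING parquet PARTITIONED BY (p1, p2)")
spark.sql("INSERT INTO t VALUES (1, 'a', 'x'), (2, 'b', 'y')")

// GROUP BY names the partition columns in the opposite order, so the pushed-down
// aggregation carries groupByColumns = [p2, p1] and the per-file partition values,
// which arrive in (p1, p2) order, must be reordered before joining with MAX(c).
spark.sql("SELECT p2, p1, MAX(c) FROM t GROUP BY p2, p1").show()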
@@ -190,11 +190,9 @@ case class OrcPartitionReaderFactory(
       private var hasNext = true
       private lazy val row: InternalRow = {
         Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-          val partitionValues = AggregatePushDownUtils.reOrderPartitionCol(
-            partitionSchema, aggregation.get, file.partitionValues)
           OrcUtils.createAggInternalRowFromFooter(
             reader, filePath.toString, dataSchema, partitionSchema, aggregation.get,
-            readDataSchema, partitionValues)
+            readDataSchema, file.partitionValues)
         }
       }
 
@@ -220,11 +218,9 @@ case class OrcPartitionReaderFactory(
       private var hasNext = true
       private lazy val batch: ColumnarBatch = {
         Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-          val partitionValues = AggregatePushDownUtils.reOrderPartitionCol(
-            partitionSchema, aggregation.get, file.partitionValues)
           val row = OrcUtils.createAggInternalRowFromFooter(
             reader, filePath.toString, dataSchema, partitionSchema, aggregation.get,
-            readDataSchema, partitionValues)
+            readDataSchema, file.partitionValues)
           AggregatePushDownUtils.convertAggregatesRowToBatch(row, readDataSchema, offHeap = false)
         }
       }
@@ -135,12 +135,9 @@ case class ParquetPartitionReaderFactory(
       private lazy val row: InternalRow = {
         val footer = getFooter(file)
 
-        val partitionValues = AggregatePushDownUtils.reOrderPartitionCol(
-          partitionSchema, aggregation.get, file.partitionValues)
-
         if (footer != null && footer.getBlocks.size > 0) {
           ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath, dataSchema,
-            partitionSchema, aggregation.get, readDataSchema, partitionValues,
+            partitionSchema, aggregation.get, readDataSchema, file.partitionValues,
             getDatetimeRebaseMode(footer.getFileMetaData))
         } else {
           null
@@ -182,10 +179,8 @@ case class ParquetPartitionReaderFactory(
       private val batch: ColumnarBatch = {
         val footer = getFooter(file)
         if (footer != null && footer.getBlocks.size > 0) {
-          val partitionValues = AggregatePushDownUtils.reOrderPartitionCol(
-            partitionSchema, aggregation.get, file.partitionValues)
           val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath,
-            dataSchema, partitionSchema, aggregation.get, readDataSchema, partitionValues,
+            dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues,
             getDatetimeRebaseMode(footer.getFileMetaData))
           AggregatePushDownUtils.convertAggregatesRowToBatch(
             row, readDataSchema, enableOffHeapColumnVector && Option(TaskContext.get()).isDefined)
