Skip to content

Commit

Permalink
fix new issues
Browse files Browse the repository at this point in the history
  • Loading branch information
xiarixiaoyao committed Apr 1, 2022
1 parent f431089 commit a85abe9
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ object HoodieSparkUtils extends SparkAdapterSupport {

def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"

def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"

def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
Expand Down
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@
<artifactId>avro</artifactId>
</dependency>

<!-- caffeine -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
Expand All @@ -35,7 +36,6 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -158,7 +158,7 @@ class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = if (SPARK_VERSION.startsWith("3.1.3")) {
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
Spark312HoodieParquetFileFormat.createParquetFilters(
parquetSchema,
pushDownDate,
Expand Down

0 comments on commit a85abe9

Please sign in to comment.