Commit
fix new issues
xiarixiaoyao committed Apr 1, 2022
1 parent 71fcd14 commit c7c98b0
Showing 7 changed files with 16 additions and 8 deletions.
@@ -61,6 +61,8 @@ object HoodieSparkUtils extends SparkAdapterSupport {

def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"

def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"

def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
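The new gteqSpark3_1_3 guard follows the existing pattern of comparing the SPARK_VERSION string lexicographically. A minimal sketch of how such a check behaves on a few hypothetical version strings (illustrative values only, not part of the commit):

```scala
// Lexicographic string comparison, as used by HoodieSparkUtils.gteqSpark3_1_3.
// The version strings below are illustrative.
val onSpark312 = "3.1.2" >= "3.1.3"  // false: 3.1.2 predates the behaviour being guarded
val onSpark313 = "3.1.3" >= "3.1.3"  // true
val onSpark314 = "3.1.4" >= "3.1.3"  // true: later 3.1.x patch releases also satisfy the guard
```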
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
@@ -117,6 +117,12 @@
<artifactId>avro</artifactId>
</dependency>

<!-- caffeine -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
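The caffeine dependency gives hudi-common a bounded in-memory cache implementation, presumably backing caches such as the InternalSchemaCache referenced further down in this commit. A minimal, hypothetical Scala sketch of the Caffeine API this brings in (cache name, key and value types, and size are illustrative, not taken from this commit):

```scala
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

// Hypothetical bounded cache; the key/value types and size are illustrative only.
val schemaJsonCache: Cache[String, String] =
  Caffeine.newBuilder()
    .maximumSize(1000)                       // bound the number of cached entries
    .build[String, String]()

schemaJsonCache.put("tablePath@commitTime", """{"type":"record"}""")
val hit: Option[String] = Option(schemaJsonCache.getIfPresent("tablePath@commitTime"))
```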
@@ -75,7 +75,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationForInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
)

new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
@@ -90,7 +90,7 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
* @param tablePath hoodie table base path.
* @param validCommits valid commits, used to validate all legal history schema files and return the latest one.
*/
def getConfigurationForInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
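For orientation, the renamed helper writes the serialized internal schema plus lookup hints into a forked Hadoop Configuration, and a reader can later pull them back out. A small illustrative sketch of the read side (hedged: only the configuration keys set above are assumed; the helper function below is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.client.utils.SparkInternalSchemaConverter

// Hypothetical reader-side helper: returns the schema JSON attached by
// getConfigurationWithInternalSchema, or None if nothing was set.
def readInternalSchemaJson(conf: Configuration): Option[String] =
  Option(conf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)).filter(_.nonEmpty)
```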
@@ -80,7 +80,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationForInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
)

val requiredSchemaParquetReader = createBaseFileReader(
@@ -92,7 +92,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationForInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
)

val hoodieTableState = getTableState
@@ -79,7 +79,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationForInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
)

val requiredSchemaParquetReader = createBaseFileReader(
@@ -91,7 +91,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationForInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
)

val tableState = getTableState
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
@@ -35,7 +36,6 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -158,7 +158,7 @@ class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = if (SPARK_VERSION.startsWith("3.1.3")) {
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
Spark312HoodieParquetFileFormat.createParquetFilters(
parquetSchema,
pushDownDate,
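Switching from SPARK_VERSION.startsWith("3.1.3") to the shared HoodieSparkUtils.gteqSpark3_1_3 keeps the version check in one place and, unlike the prefix test, also matches later 3.1.x patch releases. A small illustrative comparison (the version string is hypothetical):

```scala
// On a hypothetical Spark 3.1.4 runtime:
val version  = "3.1.4"
val oldGuard = version.startsWith("3.1.3") // false: the prefix test only matches 3.1.3 builds
val newGuard = version >= "3.1.3"          // true: what HoodieSparkUtils.gteqSpark3_1_3 evaluates
```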
