Skip to content
Permalink
Browse files
[HUDI-3567] Refactor HoodieCommonUtils to make code more reasonable (#…
  • Loading branch information
huberylee committed Mar 11, 2022
1 parent b001803 commit 56cb49485d74c28500101b81afe2cc41d90fb431
Showing 11 changed files with 706 additions and 593 deletions.
@@ -24,9 +24,9 @@ import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -70,7 +70,8 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext,
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)

val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
val (partitionFilters, dataFilters) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates(
sparkSession, filterExpressions, partitionColumns)

val filePartitions = getPartitions(partitionFilters, dataFilters)

@@ -137,17 +138,4 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext,

catalystExpressions.filter(_.isDefined).map(_.get).toArray
}

/**
* Checks whether the given expression references only partition columns
* (and involves no sub-query), i.e. whether the predicate could be evaluated
* against partition values alone.
*
* @param condition predicate to inspect
* @return true iff every attribute referenced by `condition` resolves to one of
*         `partitionColumns` and the predicate contains no sub-query
*/
private def isPartitionPredicate(condition: Expression): Boolean = {
// Session's name resolver — validates that the provided names both resolve
// to the same entity (presumably honoring the session's case-sensitivity
// setting; confirm against Spark's analyzer docs)
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver

condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
!SubqueryExpression.hasSubquery(condition)
}

}
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.immutable.Map

object HoodieCLIUtils {

  /**
   * Builds a [[SparkRDDWriteClient]] for the Hudi table located at `basePath`.
   *
   * Reads the table's metadata and Avro schema from storage, merges the
   * caller-supplied options with the table type, the session's Spark conf,
   * and Hudi's write defaults, then hands everything to
   * [[DataSourceUtils.createHoodieClient]].
   *
   * @param sparkSession active Spark session (supplies Hadoop conf and context)
   * @param basePath     base path of the Hudi table on storage
   * @param conf         extra write options to layer on top of the defaults
   * @return a write client bound to the resolved table schema and name
   */
  def createHoodieClientFromPath(sparkSession: SparkSession,
                                 basePath: String,
                                 conf: Map[String, String]): SparkRDDWriteClient[_] = {
    // Load table metadata using the session's Hadoop configuration.
    val client = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(sparkSession.sessionState.newHadoopConf())
      .build()

    // Table's Avro schema, excluding Hudi metadata fields, as a JSON string.
    val avroSchemaStr =
      new TableSchemaResolver(client).getTableAvroSchemaWithoutMetadataFields.toString

    // Caller options + table type, wrapped with Spark conf and write defaults.
    val mergedOptions =
      conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> client.getTableType.name())
    val writeParams = HoodieWriterUtils.parametersWithWriteDefaults(
      withSparkConf(sparkSession, Map.empty)(mergedOptions))

    DataSourceUtils.createHoodieClient(
      new JavaSparkContext(sparkSession.sparkContext),
      avroSchemaStr,
      basePath,
      client.getTableConfig.getTableName,
      writeParams.asJava)
  }
}

This file was deleted.

0 comments on commit 56cb494

Please sign in to comment.