Skip to content

Commit

Permalink
[HUDI-4584] Cleaning up Spark utilities (apache#6351)
Browse files Browse the repository at this point in the history
Cleans up Spark utilities and removes duplication
  • Loading branch information
Alexey Kudinkin authored and fengjian committed Apr 5, 2023
1 parent 6e017f3 commit 94c0a95
Show file tree
Hide file tree
Showing 26 changed files with 407 additions and 423 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ protected static class SparkRowConverter {
private final SparkRowSerDe rowSerDe;

SparkRowConverter(StructType schema) {
this.rowSerDe = HoodieSparkUtils.getDeserializer(schema);
this.rowSerDe = HoodieSparkUtils.getCatalystRowSerDe(schema);
this.avroConverter = AvroConversionUtils.createConverterToAvro(schema, STRUCT_NAME, NAMESPACE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.apache.hudi.table.upgrade;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.SparkKeyGenUtils;

/**
* Spark upgrade and downgrade helper.
Expand All @@ -47,6 +47,6 @@ public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext contex

@Override
public String getPartitionColumns(HoodieWriteConfig config) {
return HoodieSparkUtils.getPartitionColumns(config.getProps());
return SparkKeyGenUtils.getPartitionColumns(config.getProps());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
package org.apache.hudi

import org.apache.avro.Schema.Type
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

Expand Down Expand Up @@ -90,8 +89,7 @@ object AvroConversionUtils {
@Deprecated
def createConverterToRow(sourceAvroSchema: Schema,
targetSqlType: StructType): GenericRecord => Row = {
val encoder = RowEncoder.apply(targetSqlType).resolveAndBind()
val serde = sparkAdapter.createSparkRowSerDe(encoder)
val serde = sparkAdapter.createSparkRowSerDe(targetSqlType)
val converter = AvroConversionUtils.createAvroToInternalRowConverter(sourceAvroSchema, targetSqlType)

avro => converter.apply(avro).map(serde.deserializeRow).get
Expand All @@ -104,8 +102,7 @@ object AvroConversionUtils {
def createConverterToAvro(sourceSqlType: StructType,
structName: String,
recordNamespace: String): Row => GenericRecord = {
val encoder = RowEncoder.apply(sourceSqlType).resolveAndBind()
val serde = sparkAdapter.createSparkRowSerDe(encoder)
val serde = sparkAdapter.createSparkRowSerDe(sourceSqlType)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
val (nullable, _) = resolveAvroTypeNullability(avroSchema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,13 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import java.util.Properties

import org.apache.hudi.avro.HoodieAvroUtils

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -72,63 +58,6 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
}))
}

/**
* This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
* [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
*/
def isGlobPath(pattern: Path): Boolean = {
pattern.toString.exists("{}[]*?\\".toSet.contains)
}

/**
* This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like
* skipping meta paths.
*/
def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
// find base path to assist in skipping meta paths
var basePath = pattern.getParent
while (basePath.getName.equals("*")) {
basePath = basePath.getParent
}

Option(fs.globStatus(pattern)).map { statuses => {
val nonMetaStatuses = statuses.filterNot(entry => {
// skip all entries in meta path
var leafPath = entry.getPath
// walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped
while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
leafPath = leafPath.getParent
}
leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
})
nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}
}.getOrElse(Seq.empty[Path])
}

/**
* This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
* [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
*/
def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
}

/**
* Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths
* which match the glob pattern. Otherwise, returns original path
*
* @param paths List of absolute or globbed paths
* @param fs File system
* @return list of absolute file paths
*/
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
paths.flatMap(path => {
val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
globPathIfNecessary(fs, qualified)
})
}

/**
* @deprecated please use other overload [[createRdd]]
*/
Expand Down Expand Up @@ -182,142 +111,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
}
}

def getDeserializer(structType: StructType) : SparkRowSerDe = {
val encoder = RowEncoder.apply(structType).resolveAndBind()
sparkAdapter.createSparkRowSerDe(encoder)
}

/**
* Convert Filters to Catalyst Expressions and joined by And. If convert success return an
* Non-Empty Option[Expression],or else return None.
*/
def convertToCatalystExpressions(filters: Seq[Filter],
tableSchema: StructType): Seq[Option[Expression]] = {
filters.map(convertToCatalystExpression(_, tableSchema))
}


/**
* Convert Filters to Catalyst Expressions and joined by And. If convert success return an
* Non-Empty Option[Expression],or else return None.
*/
def convertToCatalystExpression(filters: Array[Filter],
tableSchema: StructType): Option[Expression] = {
val expressions = convertToCatalystExpressions(filters, tableSchema)
if (expressions.forall(p => p.isDefined)) {
if (expressions.isEmpty) {
None
} else if (expressions.length == 1) {
expressions.head
} else {
Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And))
}
} else {
None
}
}

/**
* Convert Filter to Catalyst Expression. If convert success return an Non-Empty
* Option[Expression],or else return None.
*/
def convertToCatalystExpression(filter: Filter, tableSchema: StructType): Option[Expression] = {
Option(
filter match {
case EqualTo(attribute, value) =>
org.apache.spark.sql.catalyst.expressions.EqualTo(toAttribute(attribute, tableSchema), Literal.create(value))
case EqualNullSafe(attribute, value) =>
org.apache.spark.sql.catalyst.expressions.EqualNullSafe(toAttribute(attribute, tableSchema), Literal.create(value))
case GreaterThan(attribute, value) =>
org.apache.spark.sql.catalyst.expressions.GreaterThan(toAttribute(attribute, tableSchema), Literal.create(value))
case GreaterThanOrEqual(attribute, value) =>
org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual(toAttribute(attribute, tableSchema), Literal.create(value))
case LessThan(attribute, value) =>
org.apache.spark.sql.catalyst.expressions.LessThan(toAttribute(attribute, tableSchema), Literal.create(value))
case LessThanOrEqual(attribute, value) =>
org.apache.spark.sql.catalyst.expressions.LessThanOrEqual(toAttribute(attribute, tableSchema), Literal.create(value))
case In(attribute, values) =>
val attrExp = toAttribute(attribute, tableSchema)
val valuesExp = values.map(v => Literal.create(v))
org.apache.spark.sql.catalyst.expressions.In(attrExp, valuesExp)
case IsNull(attribute) =>
org.apache.spark.sql.catalyst.expressions.IsNull(toAttribute(attribute, tableSchema))
case IsNotNull(attribute) =>
org.apache.spark.sql.catalyst.expressions.IsNotNull(toAttribute(attribute, tableSchema))
case And(left, right) =>
val leftExp = convertToCatalystExpression(left, tableSchema)
val rightExp = convertToCatalystExpression(right, tableSchema)
if (leftExp.isEmpty || rightExp.isEmpty) {
null
} else {
org.apache.spark.sql.catalyst.expressions.And(leftExp.get, rightExp.get)
}
case Or(left, right) =>
val leftExp = convertToCatalystExpression(left, tableSchema)
val rightExp = convertToCatalystExpression(right, tableSchema)
if (leftExp.isEmpty || rightExp.isEmpty) {
null
} else {
org.apache.spark.sql.catalyst.expressions.Or(leftExp.get, rightExp.get)
}
case Not(child) =>
val childExp = convertToCatalystExpression(child, tableSchema)
if (childExp.isEmpty) {
null
} else {
org.apache.spark.sql.catalyst.expressions.Not(childExp.get)
}
case StringStartsWith(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"$value%")
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case StringEndsWith(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"%$value")
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case StringContains(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"%$value%")
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case _ => null
}
)
}

/**
* @param properties config properties
* @return partition columns
*/
def getPartitionColumns(properties: Properties): String = {
val props = new TypedProperties(properties)
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
getPartitionColumns(keyGenerator, props)
}

/**
* @param keyGen key generator
* @return partition columns
*/
def getPartitionColumns(keyGen: KeyGenerator, typedProperties: TypedProperties): String = {
keyGen match {
// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
// is: "field_name: field_type", we extract the field_name from the partition path field.
case c: BaseKeyGenerator
if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
c.getPartitionPathFields.asScala.map(pathField =>
pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
.mkString(",")

case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
case _ => typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
}
}

private def toAttribute(columnName: String, tableSchema: StructType): AttributeReference = {
val field = tableSchema.find(p => p.name == columnName)
assert(field.isDefined, s"Cannot find column: $columnName, Table Columns are: " +
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
sparkAdapter.createSparkRowSerDe(structType)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient

/**
* TODO convert to Java, move to hudi-common
*/
object PathUtils {

/**
* This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
* [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
*/
def isGlobPath(pattern: Path): Boolean = {
pattern.toString.exists("{}[]*?\\".toSet.contains)
}

/**
* This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like
* skipping meta paths.
*/
def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
// find base path to assist in skipping meta paths
var basePath = pattern.getParent
while (basePath.getName.equals("*")) {
basePath = basePath.getParent
}

Option(fs.globStatus(pattern)).map { statuses => {
val nonMetaStatuses = statuses.filterNot(entry => {
// skip all entries in meta path
var leafPath = entry.getPath
// walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped
while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
leafPath = leafPath.getParent
}
leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
})
nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}
}.getOrElse(Seq.empty[Path])
}

/**
* This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
* [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
*/
def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
}

/**
* Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths
* which match the glob pattern. Otherwise, returns original path
*
* @param paths List of absolute or globbed paths
* @param fs File system
* @return list of absolute file paths
*/
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
paths.flatMap(path => {
val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
globPathIfNecessary(fs, qualified)
})
}
}
Loading

0 comments on commit 94c0a95

Please sign in to comment.