Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hudi-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.shell</groupId>
Expand Down
5 changes: 5 additions & 0 deletions hudi-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<!-- Dropwizard Metrics -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,25 @@

package org.apache.hudi.client.bootstrap;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;

import java.util.List;
import java.util.Objects;

/**
* Bootstrap Schema Provider. Schema provided in config is used. If not available, use schema from Parquet
Expand All @@ -50,7 +58,10 @@ public BootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (writeConfig.getSchema() != null) {
// Use schema specified by user if set
return Schema.parse(writeConfig.getSchema());
Schema userSchema = Schema.parse(writeConfig.getSchema());
if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
return userSchema;
}
}
return getBootstrapSourceSchema(jsc, partitions);
}
Expand All @@ -64,14 +75,26 @@ public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, L
*/
protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
return partitions.stream().flatMap(p -> p.getValue().stream())
.map(fs -> {
try {
Path filePath = FileStatusUtils.toPath(fs.getPath());
return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);
} catch (Exception ex) {
return null;
}
}).filter(x -> x != null).findAny().get();
MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
try {
Path filePath = FileStatusUtils.toPath(fs.getPath());
return ParquetUtils.readSchema(jsc.hadoopConfiguration(), filePath);
} catch (Exception ex) {
return null;
}
}).filter(Objects::nonNull).findAny()
.orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));


ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
StructType sparkSchema = converter.convert(parquetSchema);
String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
String structName = tableName + "_record";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@umehrot2 : ITTestBootstrapCommand is failing with the below exception. Adding a sanitization API to remove illegal characters from avro field names

Exception in thread "main" org.apache.avro.SchemaParseException: Illegal character in: test-table_record
    at org.apache.avro.Schema.validateName(Schema.java:1151)
    at org.apache.avro.Schema.access$200(Schema.java:81)
    at org.apache.avro.Schema$Name.<init>(Schema.java:489)
    at org.apache.avro.Schema.createRecord(Schema.java:161)
    at org.apache.avro.SchemaBuilder$RecordBuilder.fields(SchemaBuilder.java:1732)
    at org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:173)
    at org.apache.spark.sql.avro.SchemaConverters.toAvroType(SchemaConverters.scala)
    at org.apache.hudi.client.bootstrap.BootstrapSchemaProvider.getBootstrapSourceSchema(BootstrapSchemaProvider.java:97)
    at org.apache.hudi.client.bootstrap.BootstrapSchemaProvider.getBootstrapSchema(BootstrapSchemaProvider.java:66)
    at org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor.listAndProcessSourcePartitions(BootstrapCommitActionExecutor.java:288)

String recordNamespace = "hoodie." + tableName;

return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.DefaultHoodieConfig;

import java.io.File;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig {
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX = ".*";
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = BootstrapMode.METADATA_ONLY.name();

public static final String BOOTSTRAP_INDEX_CLASS_PROP = "hoodie.bootstrap.index.class";
public static final String DEFAULT_BOOTSTRAP_INDEX_CLASS = HFileBootstrapIndex.class.getName();

public HoodieBootstrapConfig(Properties props) {
super(props);
}
Expand Down Expand Up @@ -129,6 +133,8 @@ public HoodieBootstrapConfig build() {
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE),
BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE);
BootstrapMode.valueOf(props.getProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_INDEX_CLASS_PROP), BOOTSTRAP_INDEX_CLASS_PROP,
DEFAULT_BOOTSTRAP_INDEX_CLASS);
return config;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public HoodieBootstrapWriteMetadata execute() {
}
}

@Override
protected String getSchemaToStoreInCommit() {
return bootstrapSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class HoodieAvroUtils {

private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);

// As per https://avro.apache.org/docs/current/spec.html#names
private static String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
private static String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
private static String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";

// All metadata fields are optional strings.
public static final Schema METADATA_FIELD_SCHEMA =
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
Expand Down Expand Up @@ -444,4 +449,21 @@ private static boolean isLogicalTypeDate(Schema fieldSchema) {
}
return fieldSchema.getLogicalType() == LogicalTypes.date();
}

public static Schema getNullSchema() {
return Schema.create(Schema.Type.NULL);
}

/**
* Sanitizes Name according to Avro rule for names.
* Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names .
* @param name input name
* @return sanitized name
*/
public static String sanitizeName(String name) {
if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
}
return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.common.model;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
Expand Down Expand Up @@ -126,6 +127,18 @@ public HashMap<String, String> getFileIdAndFullPaths(String basePath) {
return fullPaths;
}

public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) {
Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : entry.getValue()) {
HoodieFileGroupId fileGroupId = new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId());
Path fullPath = new Path(basePath, stat.getPath());
fileGroupIdToFullPaths.put(fileGroupId, fullPath.toString());
}
}
return fileGroupIdToFullPaths;
}

public String toJsonString() throws IOException {
if (partitionToWriteStats.containsKey(null)) {
LOG.info("partition path is null for " + partitionToWriteStats.get(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public SparkParquetBootstrapDataProvider(TypedProperties props,
public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String sourceBasePath,
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toUri().getPath()))
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);

Dataset inputDataset = sparkSession.read().parquet(filePaths);
try {
KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
Expand Down Expand Up @@ -90,4 +91,9 @@ object AvroConversionUtils {
requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
recordBuilder.build()
}

def getAvroRecordNameAndNamespace(tableName: String): (String, String) = {
val name = HoodieAvroUtils.sanitizeName(tableName)
(s"${name}_record", s"hoodie.${name}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ object DataSourceReadOptions {
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL

val READ_PATHS_OPT_KEY = "hoodie.datasource.read.paths"

@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@Deprecated
Expand Down Expand Up @@ -138,6 +140,7 @@ object DataSourceWriteOptions {
val INSERT_OPERATION_OPT_VAL = "insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val DELETE_OPERATION_OPT_VAL = "delete"
val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap"
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL

/**
Expand Down
95 changes: 65 additions & 30 deletions hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.hudi

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
Expand Down Expand Up @@ -57,22 +58,39 @@ class DefaultSource extends RelationProvider
val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)

val path = parameters.get("path")
if (path.isEmpty) {
throw new HoodieException("'path' must be specified.")
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
if (path.isEmpty && readPathsStr.isEmpty) {
throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
}

val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration)
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs)
val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths

val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)

val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
log.info("Obtained hudi table path: " + tablePath)

val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
log.info("Is bootstrapped table => " + isBootstrappedTable)

if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
if (isBootstrappedTable) {
// Snapshot query is not supported for Bootstrapped MOR tables
log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
" Falling back to Read Optimized query.")
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
} else {
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
}
} else {
getBaseFileOnlyView(sqlContext, parameters, schema)
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
}
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
getBaseFileOnlyView(sqlContext, parameters, schema)
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
new IncrementalRelation(sqlContext, tablePath, optParams, schema)
} else {
Expand All @@ -83,8 +101,8 @@ class DefaultSource extends RelationProvider
/**
* This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
* relation here because Spark does not really make use of the relation returned, and just returns an empty
* dataset at [[SaveIntoDataSourceCommand.run()]]. This saves us the cost of creating and returning a parquet
* relation here.
* dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost
* of creating and returning a parquet relation here.
*
* TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
* That is the only case where Spark seems to actually need a relation to be returned here
Expand All @@ -101,7 +119,13 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)

if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
}

new HoodieEmptyRelation(sqlContext, df.schema)
}

Expand All @@ -120,23 +144,34 @@ class DefaultSource extends RelationProvider
override def shortName(): String = "hudi"

private def getBaseFileOnlyView(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType): BaseRelation = {
optParams: Map[String, String],
schema: StructType,
extraReadPaths: Seq[String],
isBootstrappedTable: Boolean,
globPaths: Seq[Path],
metaClient: HoodieTableMetaClient): BaseRelation = {
log.warn("Loading Base File Only View.")
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
sqlContext.sparkContext.hadoopConfiguration.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])

log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = optParams)
.resolveRelation()

if (isBootstrappedTable) {
// For bootstrapped tables, use our custom Spark relation for querying
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
} else {
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
sqlContext.sparkContext.hadoopConfiguration.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])

log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
paths = extraReadPaths,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = optParams)
.resolveRelation()
}
}
}
Loading