Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
Expand Down Expand Up @@ -51,6 +52,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TABLE_NAME = "hoodie.table.name";
private static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
private static final String BASE_PATH_PROP = "hoodie.base.path";
public static final String TABLE_BASE_FILE_FORMAT = "hoodie.table.base.file.format";
private static final String DEFAULT_TABLE_BASE_FILE_FORMAT = HoodieFileFormat.PARQUET.name();
private static final String AVRO_SCHEMA = "hoodie.avro.schema";
private static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
Expand Down Expand Up @@ -142,6 +145,10 @@ public String getTableName() {
return props.getProperty(TABLE_NAME);
}

/**
 * Returns the configured base file format name for the table (the value of
 * {@code hoodie.table.base.file.format}), or {@code null} when the property has
 * not been set on this config's {@code props}.
 */
public String getTableBaseFileFormat() {
  final String baseFileFormat = props.getProperty(TABLE_BASE_FILE_FORMAT);
  return baseFileFormat;
}

/**
 * Whether writes should be auto-committed, as driven by the
 * {@code HOODIE_AUTO_COMMIT_PROP} property. A missing or unparsable value
 * yields {@code false} (the {@link Boolean#parseBoolean} contract).
 */
public Boolean shouldAutoCommit() {
  final String rawValue = props.getProperty(HOODIE_AUTO_COMMIT_PROP);
  // parseBoolean(null) is false, so an unset property means "do not auto-commit".
  return Boolean.parseBoolean(rawValue);
}
Expand Down Expand Up @@ -743,6 +750,7 @@ public HoodieWriteConfig build() {
setDefaultOnCondition(props, !props.containsKey(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP),
FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP, DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED);
setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
setDefaultOnCondition(props, !props.containsKey(TABLE_BASE_FILE_FORMAT), TABLE_BASE_FILE_FORMAT, DEFAULT_TABLE_BASE_FILE_FORMAT);

// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public static void createHoodieProperties(FileSystem fs, Path metadataFolder, Pr
// Use latest Version as default unless forced by client
properties.setProperty(HOODIE_TIMELINE_LAYOUT_VERSION, TimelineLayoutVersion.CURR_VERSION.toString());
}
if (!properties.containsKey(HOODIE_BASE_FILE_FORMAT_PROP_NAME)) {
properties.setProperty(HOODIE_BASE_FILE_FORMAT_PROP_NAME, DEFAULT_BASE_FILE_FORMAT.name());
}
properties.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
Expand Down Expand Up @@ -298,29 +299,51 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() {
return archivedTimeline;
}

/**
 * Initializes a table at the given basePath with the given table type, name, base file
 * format and archive folder, all supplied as strings.
 *
 * <p>Note: {@code tableType} and {@code tableBaseFormat} must exactly match an enum
 * constant name ({@code Enum.valueOf} is case-sensitive) or this throws
 * {@link IllegalArgumentException}.
 */
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
    String tableName, String tableBaseFormat, String archiveLogFolder, String payloadClassName) throws IOException {
  // Resolve string arguments to their enum forms, then delegate to the full overload
  // (no explicit timeline layout version).
  final HoodieTableType resolvedType = HoodieTableType.valueOf(tableType);
  final HoodieFileFormat resolvedFormat = HoodieFileFormat.valueOf(tableBaseFormat);
  return initTableType(hadoopConf, basePath, resolvedType, tableName, resolvedFormat,
      archiveLogFolder, payloadClassName, null);
}

/**
* Helper method to initialize a table, with given basePath, tableType, name, archiveFolder.
*/
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
String tableName, String archiveLogFolder, String payloadClassName) throws IOException {
return initTableType(hadoopConf, basePath, HoodieTableType.valueOf(tableType), tableName,
archiveLogFolder, payloadClassName, null);
HoodieFileFormat.PARQUET, archiveLogFolder, payloadClassName, null);
}

/**
 * Initializes the given path as a table of the given type and name, using the PARQUET
 * base file format, no archive folder and no explicit timeline layout version.
 */
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
    HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
  // Delegate with defaults: PARQUET base format, null archive folder, null layout version.
  return initTableType(
      hadoopConf, basePath, tableType, tableName, HoodieFileFormat.PARQUET, null, payloadClassName, null);
}

/**
 * Initializes a table at the given basePath with the given type, name, archive folder,
 * payload class and timeline layout version, defaulting the base file format to PARQUET.
 *
 * @param timelineLayoutVersion explicit layout version, or {@code null} to use the default
 */
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
    HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName,
    Integer timelineLayoutVersion) throws IOException {
  // Only the base file format is defaulted here; everything else is forwarded as-is.
  final HoodieFileFormat defaultFormat = HoodieFileFormat.PARQUET;
  return initTableType(hadoopConf, basePath, tableType, tableName, defaultFormat,
      archiveLogFolder, payloadClassName, timelineLayoutVersion);
}

public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType, String tableName, HoodieFileFormat fileFormat, String archiveLogFolder,
String payloadClassName, Integer timelineLayoutVersion) throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, fileFormat.name());
if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) {
properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecordPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.config.TypedProperties
Expand Down Expand Up @@ -61,6 +61,8 @@ private[hudi] object HoodieSparkSqlWriter {
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = parameters(TABLE_TYPE_OPT_KEY)

val tableBaseFormat = parameters.getOrDefault(HoodieWriteConfig.TABLE_BASE_FILE_FORMAT, HoodieFileFormat.PARQUET.name)
val operation =
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
Expand Down Expand Up @@ -122,7 +124,7 @@ private[hudi] object HoodieSparkSqlWriter {
// Create the table if not present
if (!exists) {
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
tblName.get, tableBaseFormat, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
}

// Create a HoodieWriteClient & issue the write.
Expand Down