[HUDI-2883] Refactor hive sync tool / config to use reflection and standardize configs (#4175)

- Refactor hive sync tool / config to use reflection and standardize configs

Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
4 people committed Mar 22, 2022
1 parent 9b6e138 commit 5f570ea
Showing 43 changed files with 1,523 additions and 1,219 deletions.
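
Before the per-file hunks, a condensed sketch of what "standardize configs" means in practice (distilled from the demo-script and test hunks below; this snippet is illustrative and not code from the commit): the writer-prefixed DataSourceWriteOptions.HIVE_* options give way to HoodieSyncConfig / HiveSyncConfig properties, and a HiveSyncConfig can now be built straight from those properties.

// Illustrative only — the key constants and the properties-driven constructor are the ones introduced in this PR.
TypedProperties props = new TypedProperties();
props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor");   // was DataSourceWriteOptions.HIVE_TABLE
props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");                // was DataSourceWriteOptions.HIVE_DATABASE
props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");             // was DataSourceWriteOptions.HIVE_PARTITION_FIELDS
props.setProperty(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000");           // was DataSourceWriteOptions.HIVE_URL
props.setProperty(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true");                           // was DataSourceWriteOptions.HIVE_SYNC_ENABLED
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(props);                                    // see the HiveQueryNode change below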
34 changes: 18 additions & 16 deletions docker/demo/sparksql-incremental.commands
@@ -21,6 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;

@@ -43,14 +45,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
@@ -75,14 +77,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
@@ -105,7 +105,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

public static final ConfigProperty<String> TBL_NAME = ConfigProperty
.key("hoodie.table.name")
.key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");

@@ -82,6 +82,8 @@ public class HoodieTableConfig extends HoodieConfig {

public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
public static final String HOODIE_WRITE_TABLE_NAME_KEY = "hoodie.datasource.write.table.name";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";

public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
@@ -90,7 +92,7 @@ public class HoodieTableConfig extends HoodieConfig {
+ "we can set it to limit the table name under a specific database");

public static final ConfigProperty<String> NAME = ConfigProperty
.key("hoodie.table.name")
.key(HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with Hive. Needs to be same across runs.");

@@ -92,6 +92,24 @@ public static Object loadClass(String clazz, Class<?>[] constructorArgTypes, Obj
}
}

/**
* Check if the clazz has the target constructor or not.
*
* When a {@link HoodieException} is caught from {@link #loadClass}, it is hard to tell whether the exception came
* from the instantiation's own logic or from a missing constructor.
*
* TODO: ReflectionUtils should throw a specific exception to indicate Reflection problem.
*/
public static boolean hasConstructor(String clazz, Class<?>[] constructorArgTypes) {
try {
getClass(clazz).getConstructor(constructorArgTypes);
return true;
} catch (NoSuchMethodException e) {
LOG.warn("Unable to instantiate class " + clazz, e);
return false;
}
}

/**
* Creates an instance of the given class. Constructor arg types are inferred.
*/
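
The hasConstructor helper added above is intended to let callers probe for a specific constructor before attempting reflective instantiation. A minimal sketch of such a caller, assuming the sync tool exposes the (Properties, FileSystem) constructor that the pre-refactor Kafka Connect path relied on (not code from this commit):

import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.sync.common.AbstractSyncTool;

// Sketch only: probe for the (Properties, FileSystem) constructor, then instantiate and run the tool.
static void runSyncToolIfSupported(String impl, Properties props, FileSystem fs) {
  Class<?>[] argTypes = new Class<?>[] {Properties.class, FileSystem.class};
  if (ReflectionUtils.hasConstructor(impl, argTypes)) {
    AbstractSyncTool tool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, argTypes, props, fs);
    tool.syncHoodieTable();
  }
}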
@@ -18,17 +18,19 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
* A hive query node in the DAG of operations for a workflow, used to perform a hive query with the given config.
@@ -46,13 +48,14 @@ public HiveQueryNode(DeltaConfig.Config config) {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive query node {}", this.getName());
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
HiveSyncConfig hiveSyncConfig = DataSourceUtils
.buildHiveSyncConfig(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps(),
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath,
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
TypedProperties properties = new TypedProperties();
properties.putAll(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps());
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath);
properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
hiveSyncConfig.hivePass);
@@ -18,31 +18,30 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.apache.hadoop.fs.Path;

/**
* Represents a hive sync node in the DAG of operations for a workflow. Helps to sync hoodie data to a hive table.
*/
public class HiveSyncNode extends DagNode<Boolean> {

private HiveServiceProvider hiveServiceProvider;

public HiveSyncNode(Config config) {
this.config = config;
this.hiveServiceProvider = new HiveServiceProvider(config);
}

@Override
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive sync node");
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}

public HiveServiceProvider getHiveServiceProvider() {
return hiveServiceProvider;
SyncUtilHelpers.runHoodieMetaSync(HiveSyncTool.class.getName(), new TypedProperties(executionContext.getHoodieTestSuiteWriter().getProps()),
executionContext.getHoodieTestSuiteWriter().getConfiguration(),
new Path(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath).getFileSystem(executionContext.getHoodieTestSuiteWriter().getConfiguration()),
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath, executionContext.getHoodieTestSuiteWriter().getCfg().baseFileFormat);
}
}
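
Both this node and the Kafka Connect path further down now funnel through the same helper; a standalone invocation would look roughly like the following snippet (argument order inferred from the two call sites in this diff, base path hypothetical):

// Sketch only: one-off meta sync through the shared helper introduced by this refactor.
String basePath = "/user/hive/warehouse/stock_ticks_derived_mor";  // hypothetical table base path
TypedProperties props = new TypedProperties();                     // carries the META_SYNC_* / hive_sync.* keys
Configuration hadoopConf = new Configuration();
FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
SyncUtilHelpers.runHoodieMetaSync(HiveSyncTool.class.getName(), props, hadoopConf, fs,
    basePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());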
@@ -21,6 +21,9 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.service.server.HiveServer2;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
@@ -46,12 +49,17 @@ public void startLocalHiveServiceIfNeeded(Configuration configuration) throws IO
}

public void syncToLocalHiveIfNeeded(HoodieTestSuiteWriter writer) {
HiveSyncTool hiveSyncTool;
if (this.config.isHiveLocal()) {
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync()
.syncHive(getLocalHiveServer().getHiveConf());
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), getLocalHiveServer().getHiveConf()));
} else {
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync().syncHive();
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), writer.getConfiguration()));
}
hiveSyncTool.syncHoodieTable();
}

public void stopLocalHiveServiceIfNeeded() throws IOException {
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -173,10 +174,10 @@ private static TypedProperties getProperties() {
// Make path selection test suite specific
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "table1");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr");
props.setProperty(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
props.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), "table1");
props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
props.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), TimestampBasedKeyGenerator.class.getName());

props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
@@ -32,8 +32,6 @@
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -59,7 +57,6 @@
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -269,32 +266,4 @@ public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
}

/**
* Build Hive Sync Config
* Note: This method is a temporary solution.
* Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
*/
public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = tableBasePath;
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
hiveSyncConfig.partitionValueExtractorClass =
props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
}
hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
return hiveSyncConfig;
}
}
@@ -30,22 +30,17 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -54,7 +49,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
@@ -167,43 +161,10 @@ private void syncMeta() {
if (connectConfigs.isMetaSyncEnabled()) {
Set<String> syncClientToolClasses = new HashSet<>(
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
for (String impl : syncClientToolClasses) {
impl = impl.trim();
switch (impl) {
case "org.apache.hudi.hive.HiveSyncTool":
syncHive();
break;
default:
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
Properties properties = new Properties();
properties.putAll(connectConfigs.getProps());
properties.put("basePath", tableBasePath);
AbstractSyncTool syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[] {Properties.class, FileSystem.class}, properties, fs);
syncTool.syncHoodieTable();
}
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
}
}
}

private void syncHive() {
HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
String url;
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
} else {
url = hiveSyncConfig.jdbcUrl;
}

LOG.info("Syncing target hoodie table with hive table("
+ hiveSyncConfig.tableName
+ "). Hive URL :"
+ url
+ ", basePath :" + tableBasePath);
LOG.info("Hive Sync Conf => " + hiveSyncConfig);
FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
}