Skip to content

Commit

Permalink
[HUDI-3730] Improve meta sync class design and hierarchies (#5854)
Browse files Browse the repository at this point in the history
* [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
* Implements class design proposed in RFC-55

Co-authored-by: jian.feng <fengjian428@gmial.com>
Co-authored-by: jian.feng <jian.feng@shopee.com>
  • Loading branch information
3 people committed Jul 3, 2022
1 parent c00ea84 commit c0e1587
Show file tree
Hide file tree
Showing 86 changed files with 2,964 additions and 2,864 deletions.
18 changes: 9 additions & 9 deletions docker/demo/sparksql-incremental.commands
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -47,10 +47,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
Expand Down Expand Up @@ -79,10 +79,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.Partition;

import com.amazonaws.services.glue.AWSGlue;
Expand Down Expand Up @@ -50,10 +50,6 @@
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
Expand All @@ -69,8 +65,12 @@

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
Expand All @@ -79,18 +79,18 @@
*
* @Experimental
*/
public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
private final AWSGlue awsGlue;
private final String databaseName;

public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
super(syncConfig, hadoopConf, fs);
public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
this.awsGlue = AWSGlueClientBuilder.standard().build();
this.databaseName = syncConfig.databaseName;
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
}

@Override
Expand Down Expand Up @@ -126,7 +126,7 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
StorageDescriptor sd = table.getStorageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
Expand Down Expand Up @@ -160,7 +160,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
StorageDescriptor sd = table.getStorageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
sd.setLocation(fullPartitionPath);
PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
Expand Down Expand Up @@ -204,12 +204,12 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
}

@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
public void updateTableSchema(String tableName, MessageType newSchema) {
// ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
boolean cascade = syncConfig.partitionFields.size() > 0;
boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
String keyType = getPartitionKeyType(newSchemaMap, key);
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
Expand Down Expand Up @@ -237,21 +237,6 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
}
}

@Override
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
}

@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}

@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}

@Override
public void createTable(String tableName,
MessageType storageSchema,
Expand All @@ -265,26 +250,26 @@ public void createTable(String tableName,
}
CreateTableRequest request = new CreateTableRequest();
Map<String, String> params = new HashMap<>();
if (!syncConfig.createManagedTable) {
if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
params.put("EXTERNAL", "TRUE");
}
params.putAll(tableProperties);

try {
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
for (String key : mapSchema.keySet()) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
// In Glue, the full schema should exclude the partition keys
if (!syncConfig.partitionFields.contains(key)) {
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
schemaWithoutPartitionKeys.add(column);
}
}

// now create the schema partition
List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
String keyType = getPartitionKeyType(mapSchema, partitionKey);
return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
Expand All @@ -293,7 +278,7 @@ public void createTable(String tableName,
serdeProperties.put("serialization.format", "1");
storageDescriptor
.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
.withLocation(s3aToS3(syncConfig.basePath))
.withLocation(s3aToS3(getBasePath()))
.withInputFormat(inputFormatClass)
.withOutputFormat(outputFormatClass)
.withColumns(schemaWithoutPartitionKeys);
Expand All @@ -320,7 +305,7 @@ public void createTable(String tableName,
}

@Override
public Map<String, String> getTableSchema(String tableName) {
public Map<String, String> getMetastoreSchema(String tableName) {
try {
// GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
Expand All @@ -340,11 +325,6 @@ public Map<String, String> getTableSchema(String tableName) {
}
}

@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}

@Override
public boolean tableExists(String tableName) {
GetTableRequest request = new GetTableRequest()
Expand Down Expand Up @@ -412,11 +392,11 @@ public void close() {

@Override
public void updateLastCommitTimeSynced(String tableName) {
if (!activeTimeline.lastInstant().isPresent()) {
if (!getActiveTimeline().lastInstant().isPresent()) {
LOG.warn("No commit in active timeline.");
return;
}
final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp();
final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp();
try {
updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,44 @@

package org.apache.hudi.aws.sync;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.Properties;

/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
* to enable querying via Glue ETLs, Athena etc.
*
* <p>
* Extends HiveSyncTool since most logic is similar to Hive syncing,
* expect using a different client {@link AWSGlueCatalogSyncClient} that implements
* the necessary functionality using Glue APIs.
*
* @Experimental
*/
public class AwsGlueCatalogSyncTool extends HiveSyncTool {

public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, new HiveConf(conf, HiveConf.class), fs);
}
public class AWSGlueCatalogSyncTool extends HiveSyncTool {

public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
super(hiveSyncConfig, hiveConf, fs);
public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
}

@Override
protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
}

public static void main(String[] args) {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
JCommander cmd = JCommander.newBuilder().addObject(params).build();
cmd.parse(args);
if (params.isHelp()) {
cmd.usage();
System.exit(1);
System.exit(0);
}
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
new AWSGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -29,6 +30,7 @@
import java.io.Serializable;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
Expand Down Expand Up @@ -133,6 +135,14 @@ public <T> String getString(ConfigProperty<T> configProperty) {
return rawValue.map(Object::toString).orElse(null);
}

public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
return getSplitStrings(configProperty, ",");
}

public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
return StringUtils.split(getString(configProperty), delimiter);
}

public String getString(String key) {
return props.getProperty(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public TypedProperties(Properties defaults) {
}
}

public void setPropertyIfNonNull(String key, Object value) {
if (value != null) {
setProperty(key, value.toString());
}
}

@Override
public String getProperty(String key) {
Object oval = super.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.util;

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -65,6 +66,18 @@ public static String join(final String[] array, final String separator) {
return org.apache.hadoop.util.StringUtils.join(separator, array);
}

/**
* Wrapper of {@link java.lang.String#join(CharSequence, Iterable)}.
*
* Allow return {@code null} when {@code Iterable} is {@code null}.
*/
public static String join(CharSequence delimiter, Iterable<? extends CharSequence> elements) {
if (elements == null) {
return null;
}
return String.join(delimiter, elements);
}

public static String toHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder(bytes.length * 2);
for (byte b : bytes) {
Expand All @@ -77,6 +90,9 @@ public static boolean isNullOrEmpty(String str) {
return str == null || str.length() == 0;
}

public static boolean nonEmpty(String str) {
return !isNullOrEmpty(str);
}

/**
* Returns the given string if it is non-null; the empty string otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand All @@ -45,6 +46,14 @@ public void testStringJoin() {
assertNotEquals(null, StringUtils.join(STRINGS));
}

@Test
public void testStringJoinWithJavaImpl() {
assertNull(StringUtils.join(",", null));
assertEquals("", String.join(",", Collections.singletonList("")));
assertEquals(",", String.join(",", Arrays.asList("", "")));
assertEquals("a,", String.join(",", Arrays.asList("a", "")));
}

@Test
public void testStringNullToEmpty() {
String str = "This is a test";
Expand Down
Loading

0 comments on commit c0e1587

Please sign in to comment.