[HUDI-3730] Improve meta sync class design and hierarchies #5854

Merged: 51 commits, Jul 3, 2022

Commits
d4cd131
[HUDI-3730] Improve meta sync class design and hierarchies (#5754)
fengjian428 Jun 12, 2022
aa60621
[HUDI-3730] Refactor metasync classes
xushiyan Jun 13, 2022
783d8bd
fix usage in hudi-sync/hudi-hive-sync
xushiyan Jun 17, 2022
f54c59f
fix usage in hudi-sync/hudi-hive-sync
xushiyan Jun 17, 2022
797bb26
renaming
xushiyan Jun 17, 2022
151cdda
renaming
xushiyan Jun 17, 2022
62a9c05
move classes
xushiyan Jun 17, 2022
a0ac3ad
fix usage in datahub sync and aws glue sync
xushiyan Jun 17, 2022
76da5d3
fix usage in gcp sync
xushiyan Jun 17, 2022
f0f8020
fix usage in adb sync
xushiyan Jun 18, 2022
a736c79
fix usage
xushiyan Jun 18, 2022
60f52cd
move back part extractors
xushiyan Jun 18, 2022
51a1a00
fix UT in adb sync
xushiyan Jun 18, 2022
ca40034
remove interfaces
xushiyan Jun 18, 2022
ec67ad5
fix import
xushiyan Jun 18, 2022
47a5af3
fix java boolean type
xushiyan Jun 18, 2022
65cd6ed
fix style
xushiyan Jun 18, 2022
69c7e25
adjust sync tool constructor
xushiyan Jun 18, 2022
97e7dd8
more fixes
xushiyan Jun 18, 2022
6cacee9
[HUDI-3730] fix TestHiveSyncContext
Jun 18, 2022
fa7c496
use setPropertyIfNonNull
xushiyan Jun 19, 2022
11d3df9
fix SyncUtilHelpers
xushiyan Jun 19, 2022
f4bd48e
fix license SparkDataSourceTableUtils
xushiyan Jun 19, 2022
5ee4227
remove unused method and test
xushiyan Jun 19, 2022
5fd03d3
fix spark datasource options NPE
xushiyan Jun 19, 2022
0ba5d62
[WIP][Hudi 3730] fix u ts (#10)
fengjian428 Jun 19, 2022
0406f1d
Hudi 3730 u tsfixand add (#11)
fengjian428 Jun 19, 2022
709ff4a
more fixes
xushiyan Jun 20, 2022
2b17c2c
more fix
xushiyan Jun 20, 2022
7c16293
temp disable TestHiveSyncGlobalCommitTool
xushiyan Jun 20, 2022
6b1f81b
use HiveSyncConfigHolder to avoid import problem in spark datasource
xushiyan Jun 20, 2022
ea40d50
fix configs
xushiyan Jun 20, 2022
bc3dff3
fix npe
xushiyan Jun 20, 2022
39c05d2
revert SerializableConfiguration and use Configuration
xushiyan Jun 20, 2022
86866ff
temp disable ITTestHoodieDataSource
xushiyan Jun 20, 2022
73fd717
fix datahub sync config
xushiyan Jun 23, 2022
0193b00
add ut in datahub sync config
xushiyan Jun 23, 2022
802015e
[HUDI-3730] fix global tools UT (#12)
fengjian428 Jun 24, 2022
982bc22
revert IT disable
xushiyan Jun 24, 2022
7c945de
fix get database name
xushiyan Jun 25, 2022
7cc5fa0
fix multiple help args issue
xushiyan Jun 25, 2022
b38bc32
fix IT
xushiyan Jun 25, 2022
acf3943
remove metasync interface
xushiyan Jun 26, 2022
6305914
[HUDI-3730] add global tools UT (#13)
fengjian428 Jun 26, 2022
9757ddf
fix style
xushiyan Jun 26, 2022
3565896
fix rebase
xushiyan Jun 30, 2022
0091988
address comments
xushiyan Jul 2, 2022
712cf9c
add a UT and docs
xushiyan Jul 2, 2022
55b7bc1
fix getStorageSchema() impl. and remove redundant docs
xushiyan Jul 3, 2022
9b072f3
remove deprecated scanTablePartitions()
xushiyan Jul 3, 2022
c95a9d1
use static imports
xushiyan Jul 3, 2022
18 changes: 9 additions & 9 deletions docker/demo/sparksql-incremental.commands
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;
@@ -47,10 +47,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
@@ -79,10 +79,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -21,8 +21,8 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.Partition;

import com.amazonaws.services.glue.AWSGlue;
@@ -50,10 +50,6 @@
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -69,8 +65,12 @@

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
@@ -79,18 +79,18 @@
*
* @Experimental
*/
public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
private final AWSGlue awsGlue;
private final String databaseName;

public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
super(syncConfig, hadoopConf, fs);
public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
this.awsGlue = AWSGlueClientBuilder.standard().build();
this.databaseName = syncConfig.databaseName;
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
}

@Override
@@ -126,7 +126,7 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
StorageDescriptor sd = table.getStorageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
StorageDescriptor sd = table.getStorageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
sd.setLocation(fullPartitionPath);
PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -204,12 +204,12 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
}

@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
public void updateTableSchema(String tableName, MessageType newSchema) {
// ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
boolean cascade = syncConfig.partitionFields.size() > 0;
boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
String keyType = getPartitionKeyType(newSchemaMap, key);
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
@@ -237,21 +237,6 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
}
}

@Override
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
}

@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}

@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}

@Override
public void createTable(String tableName,
MessageType storageSchema,
@@ -265,26 +250,26 @@ public void createTable(String tableName,
}
CreateTableRequest request = new CreateTableRequest();
Map<String, String> params = new HashMap<>();
if (!syncConfig.createManagedTable) {
if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
params.put("EXTERNAL", "TRUE");
}
params.putAll(tableProperties);

try {
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
for (String key : mapSchema.keySet()) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
// In Glue, the full schema should exclude the partition keys
if (!syncConfig.partitionFields.contains(key)) {
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
schemaWithoutPartitionKeys.add(column);
}
}

// now create the schema partition
List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
String keyType = getPartitionKeyType(mapSchema, partitionKey);
return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
@@ -293,7 +278,7 @@
serdeProperties.put("serialization.format", "1");
storageDescriptor
.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
.withLocation(s3aToS3(syncConfig.basePath))
.withLocation(s3aToS3(getBasePath()))
.withInputFormat(inputFormatClass)
.withOutputFormat(outputFormatClass)
.withColumns(schemaWithoutPartitionKeys);
@@ -320,7 +305,7 @@ public void createTable(String tableName,
}

@Override
public Map<String, String> getTableSchema(String tableName) {
public Map<String, String> getMetastoreSchema(String tableName) {
try {
// GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
@@ -340,11 +325,6 @@ public Map<String, String> getTableSchema(String tableName) {
}
}

@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}

@Override
public boolean tableExists(String tableName) {
GetTableRequest request = new GetTableRequest()
@@ -412,11 +392,11 @@ public void close() {

@Override
public void updateLastCommitTimeSynced(String tableName) {
if (!activeTimeline.lastInstant().isPresent()) {
if (!getActiveTimeline().lastInstant().isPresent()) {
LOG.warn("No commit in active timeline.");
return;
}
final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp();
final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp();
try {
updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false);
} catch (Exception e) {
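Aside: a minimal usage sketch of the refactored client API above. The single-argument constructor, tableExists(), and close() appear in this diff; the HiveSyncConfig(Properties) constructor and the META_SYNC_BASE_PATH key are assumptions for illustration, not confirmed by these hunks.

import java.util.Properties;

import org.apache.hudi.aws.sync.AWSGlueCatalogSyncClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class GlueSyncClientSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Table location and target Glue database; META_SYNC_BASE_PATH is assumed here.
    props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "s3://bucket/hudi/stock_ticks");
    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");
    HiveSyncConfig config = new HiveSyncConfig(props); // constructor assumed
    // The client now reads the database name and base path from the config object
    // instead of taking Configuration and FileSystem arguments.
    AWSGlueCatalogSyncClient client = new AWSGlueCatalogSyncClient(config);
    System.out.println(client.tableExists("stock_ticks"));
    client.close();
  }
}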
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncTool.java
@@ -18,53 +18,44 @@

package org.apache.hudi.aws.sync;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.Properties;

/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
* to enable querying via Glue ETLs, Athena etc.
*
* <p>
* Extends HiveSyncTool since most logic is similar to Hive syncing,
except that it uses a different client {@link AWSGlueCatalogSyncClient} that implements
* the necessary functionality using Glue APIs.
*
* @Experimental
*/
public class AwsGlueCatalogSyncTool extends HiveSyncTool {

public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, new HiveConf(conf, HiveConf.class), fs);
}
public class AWSGlueCatalogSyncTool extends HiveSyncTool {

public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
super(hiveSyncConfig, hiveConf, fs);
public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
}

@Override
protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
}

public static void main(String[] args) {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
JCommander cmd = JCommander.newBuilder().addObject(params).build();
cmd.parse(args);
if (params.isHelp()) {
cmd.usage();
System.exit(1);
System.exit(0);
}
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
new AWSGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
}
}
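A hedged sketch of driving the tool programmatically, mirroring the new main() above. The two-argument constructor and syncHoodieTable() come from this diff; the property keys are the HoodieSyncConfig constants used elsewhere in this PR, and META_SYNC_BASE_PATH is again an assumption.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.aws.sync.AWSGlueCatalogSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class GlueSyncToolSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "s3://bucket/hudi/stock_ticks"); // key assumed
    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");
    props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks");
    // Same constructor and entry point the CLI main() above ends with.
    new AWSGlueCatalogSyncTool(props, new Configuration()).syncHoodieTable();
  }
}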
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -29,6 +30,7 @@
import java.io.Serializable;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
@@ -133,6 +135,14 @@ public <T> String getString(ConfigProperty<T> configProperty) {
return rawValue.map(Object::toString).orElse(null);
}

public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
return getSplitStrings(configProperty, ",");
}

public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
return StringUtils.split(getString(configProperty), delimiter);
}

public String getString(String key) {
return props.getProperty(key);
}
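A quick behavior sketch of the new getSplitStrings helper. META_SYNC_PARTITION_FIELDS is the constant used in the Glue client changes above; the HoodieConfig(Properties) constructor is assumed.

import java.util.List;
import java.util.Properties;

import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class SplitStringsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "year,month,day");
    HoodieConfig config = new HoodieConfig(props); // constructor assumed
    // Splits on the default "," delimiter: prints [year, month, day]
    List<String> fields = config.getSplitStrings(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS);
    System.out.println(fields);
  }
}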
hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
@@ -49,6 +49,12 @@ public TypedProperties(Properties defaults) {
}
}

public void setPropertyIfNonNull(String key, Object value) {
if (value != null) {
setProperty(key, value.toString());
}
}

@Override
public String getProperty(String key) {
Object oval = super.get(key);
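Behavior sketch of setPropertyIfNonNull, which later commits in this PR use when copying optional sync settings; the key and values here are illustrative.

import org.apache.hudi.common.config.TypedProperties;

public class SetIfNonNullSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Skipped silently: plain setProperty(key, null) would throw a NullPointerException.
    props.setPropertyIfNonNull("hoodie.datasource.hive_sync.mode", null);
    props.setPropertyIfNonNull("hoodie.datasource.hive_sync.mode", "hms");
    System.out.println(props.getProperty("hoodie.datasource.hive_sync.mode")); // hms
  }
}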
hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -65,6 +66,18 @@ public static String join(final String[] array, final String separator) {
return org.apache.hadoop.util.StringUtils.join(separator, array);
}

/**
* Wrapper of {@link java.lang.String#join(CharSequence, Iterable)}.
*
* Allows returning {@code null} when the {@code Iterable} is {@code null}.
*/
public static String join(CharSequence delimiter, Iterable<? extends CharSequence> elements) {
if (elements == null) {
return null;
}
return String.join(delimiter, elements);
}

public static String toHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder(bytes.length * 2);
for (byte b : bytes) {
@@ -77,6 +90,9 @@ public static boolean isNullOrEmpty(String str) {
return str == null || str.length() == 0;
}

public static boolean nonEmpty(String str) {
return !isNullOrEmpty(str);
}

/**
* Returns the given string if it is non-null; the empty string otherwise.
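And a behavior sketch of the two StringUtils additions above; the null-tolerant join contrasts with String.join, which rejects a null iterable.

import java.util.Arrays;
import java.util.List;

import org.apache.hudi.common.util.StringUtils;

public class StringUtilsSketch {
  public static void main(String[] args) {
    System.out.println(StringUtils.join(",", (List<String>) null));     // null, not an NPE
    System.out.println(StringUtils.join(",", Arrays.asList("a", "b"))); // a,b
    System.out.println(StringUtils.nonEmpty(""));  // false
    System.out.println(StringUtils.nonEmpty("x")); // true
  }
}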
hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -45,6 +46,14 @@ public void testStringJoin() {
assertNotEquals(null, StringUtils.join(STRINGS));
}

@Test
public void testStringJoinWithJavaImpl() {
assertNull(StringUtils.join(",", null));
assertEquals("", String.join(",", Collections.singletonList("")));
assertEquals(",", String.join(",", Arrays.asList("", "")));
assertEquals("a,", String.join(",", Arrays.asList("a", "")));
}

@Test
public void testStringNullToEmpty() {
String str = "This is a test";