[HUDI-2757] Implement Hudi AWS Glue sync (#5076)
xushiyan committed Mar 28, 2022
1 parent 4ed84b2 commit 6ccbae4
Showing 25 changed files with 1,151 additions and 204 deletions.
33 changes: 33 additions & 0 deletions hudi-aws/pom.xml
@@ -40,6 +40,11 @@
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Logging -->
<dependency>
@@ -75,6 +80,28 @@
<version>${dynamodb.lockclient.version}</version>
</dependency>

<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>

<!-- AWS SDK -->
<dependency>
<groupId>com.amazonaws</groupId>
@@ -103,6 +130,12 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-glue -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<!-- Test -->
<dependency>

Large diffs are not rendered by default.

70 changes: 70 additions & 0 deletions hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

import com.beust.jcommander.JCommander;
import jdk.jfr.Experimental;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
* to enable querying via Glue ETLs, Athena etc.
*
* Extends HiveSyncTool since most logic is similar to Hive syncing,
* except that it uses a different client {@link AWSGlueCatalogSyncClient}, which implements
* the necessary functionality using Glue APIs.
*/
@Experimental
public class AwsGlueCatalogSyncTool extends HiveSyncTool {

public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, new HiveConf(conf, HiveConf.class), fs);
}

private AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
super(hiveSyncConfig, hiveConf, fs);
}

@Override
protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
}

public static void main(String[] args) {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
}
}
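As a usage note, the tool can be run standalone much like HiveSyncTool; the sketch below simply mirrors the main() above. The flag names are assumed to be the usual HiveSyncConfig JCommander parameters, and the database, table, and bucket values are placeholders rather than anything defined in this commit.

import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;

public class GlueSyncExample {
  public static void main(String[] args) {
    // Placeholder arguments; --database, --table, --base-path and --partitioned-by
    // are assumed HiveSyncConfig flags (verify against HiveSyncConfig in your version).
    String[] syncArgs = {
        "--database", "default",
        "--table", "my_hudi_table",
        "--base-path", "s3a://my-bucket/warehouse/my_hudi_table",
        "--partitioned-by", "datestr"
    };
    // Parses the flags into a HiveSyncConfig and syncs the table to the Glue Data Catalog.
    AwsGlueCatalogSyncTool.main(syncArgs);
  }
}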
33 changes: 33 additions & 0 deletions hudi-aws/src/main/java/org/apache/hudi/aws/sync/HoodieGlueSyncException.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.aws.sync;

import org.apache.hudi.hive.HoodieHiveSyncException;

public class HoodieGlueSyncException extends HoodieHiveSyncException {

public HoodieGlueSyncException(String message) {
super(message);
}

public HoodieGlueSyncException(String message, Throwable t) {
super(message, t);
}
}
27 changes: 27 additions & 0 deletions hudi-aws/src/main/java/org/apache/hudi/aws/utils/S3Utils.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.aws.utils;

public final class S3Utils {

public static String s3aToS3(String s3aUrl) {
return s3aUrl.replaceFirst("(?i)^s3a://", "s3://");
}
}
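Hudi normally reads and writes S3 data through Hadoop's s3a:// filesystem, while Glue and Athena record table and partition locations with the s3:// scheme, so a helper like this would normalize location URLs before they are written to the catalog (an inference here, since the Glue client diff is not rendered above). A minimal usage sketch:

import org.apache.hudi.aws.utils.S3Utils;

public class S3UtilsDemo {
  public static void main(String[] args) {
    // Only the scheme prefix changes; the bucket and key are left untouched.
    String glueLocation = S3Utils.s3aToS3("s3a://my-bucket/warehouse/my_hudi_table");
    System.out.println(glueLocation); // prints s3://my-bucket/warehouse/my_hudi_table
  }
}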
hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -22,6 +22,7 @@

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,12 +33,21 @@
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CollectionUtils {

public static final Properties EMPTY_PROPERTIES = new Properties();

public static boolean isNullOrEmpty(Collection<?> c) {
return Objects.isNull(c) || c.isEmpty();
}

public static boolean nonEmpty(Collection<?> c) {
return !isNullOrEmpty(c);
}

/**
* Combines provided arrays into one
*/
@@ -105,6 +115,21 @@ public static <E> List<E> diff(List<E> one, List<E> another) {
return diff;
}

public static <E> Stream<List<E>> batchesAsStream(List<E> list, int batchSize) {
ValidationUtils.checkArgument(batchSize > 0, "batch size must be positive.");
int total = list.size();
if (total <= 0) {
return Stream.empty();
}
int numFullBatches = (total - 1) / batchSize;
return IntStream.range(0, numFullBatches + 1).mapToObj(
n -> list.subList(n * batchSize, n == numFullBatches ? total : (n + 1) * batchSize));
}

public static <E> List<List<E>> batches(List<E> list, int batchSize) {
return batchesAsStream(list, batchSize).collect(Collectors.toList());
}

/**
* Determines whether two iterators contain equal elements in the same order. More specifically,
* this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same
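The new batching helpers are presumably there so the Glue client can split catalog calls into request-sized chunks; AWS Glue's batch APIs cap the number of partitions per request (for example, BatchCreatePartition accepts at most 100). The client diff is not rendered above, so the exact wiring is an assumption. A self-contained sketch of the chunking behavior:

import org.apache.hudi.common.util.CollectionUtils;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchesDemo {
  public static void main(String[] args) {
    // 250 hypothetical partition paths, split into batches of at most 100.
    List<String> partitions = IntStream.range(0, 250)
        .mapToObj(i -> "datestr=2022-03-" + i)
        .collect(Collectors.toList());

    List<List<String>> batches = CollectionUtils.batches(partitions, 100);
    System.out.println(batches.size());        // 3
    System.out.println(batches.get(2).size()); // 50 (the final, partial batch)
  }
}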
34 changes: 34 additions & 0 deletions hudi-common/src/main/java/org/apache/hudi/common/util/MapUtils.java
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.util;

import java.util.Map;
import java.util.Objects;

public class MapUtils {

public static boolean isNullOrEmpty(Map<?, ?> m) {
return Objects.isNull(m) || m.isEmpty();
}

public static boolean nonEmpty(Map<?, ?> m) {
return !isNullOrEmpty(m);
}
}
56 changes: 56 additions & 0 deletions hudi-common/src/test/java/org/apache/hudi/common/util/TestCollectionUtils.java
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.util;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.hudi.common.util.CollectionUtils.batches;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class TestCollectionUtils {

@Test
void getBatchesFromList() {
assertThrows(IllegalArgumentException.class, () -> {
batches(Collections.emptyList(), -1);
});

assertThrows(IllegalArgumentException.class, () -> {
batches(Collections.emptyList(), 0);
});

assertEquals(Collections.emptyList(), batches(Collections.emptyList(), 1));

List<List<Integer>> intsBatches1 = batches(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
assertEquals(2, intsBatches1.size());
assertEquals(Arrays.asList(1, 2, 3), intsBatches1.get(0));
assertEquals(Arrays.asList(4, 5, 6), intsBatches1.get(1));

List<List<Integer>> intsBatches2 = batches(Arrays.asList(1, 2, 3, 4, 5, 6), 5);
assertEquals(2, intsBatches2.size());
assertEquals(Arrays.asList(1, 2, 3, 4, 5), intsBatches2.get(0));
assertEquals(Collections.singletonList(6), intsBatches2.get(1));
}
}
@@ -163,7 +163,9 @@ private void syncMeta() {
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
for (String impl : syncClientToolClasses) {
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
// TODO kafka connect config needs to support setting base file format
String baseFileFormat = connectConfigs.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, baseFileFormat);
}
}
}
@@ -562,6 +562,7 @@ object HoodieSparkSqlWriter {

if (metaSyncEnabled) {
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
val properties = new TypedProperties()
properties.putAll(hoodieConfig.getProps)
properties.put(HiveSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
@@ -572,7 +573,7 @@
hiveConf.addResource(fs.getConf)

syncClientToolClassSet.foreach(impl => {
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue)
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, baseFileFormat)
})
}
true
@@ -114,7 +114,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+ " of type " + hoodieDLAClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
boolean tableExists = hoodieDLAClient.tableExists(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieDLAClient.getDataSchema();
// Sync schema if needed