[HUDI-1089] Refactor hudi-client to support multi-engine
wangxianghu committed Jul 15, 2020
1 parent b399b4a commit 13795f5
Showing 282 changed files with 6,119 additions and 4,238 deletions.
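
The API change that recurs across the hunks below: the Spark-coupled HoodieWriteClient is replaced by HoodieSparkWriteClient, constructed over a HoodieSparkEngineContext that wraps the JavaSparkContext, and the former hudi-client module is split into hudi-client-common plus engine-specific modules such as hudi-client-spark. A minimal before/after sketch of a caller, assembled from the call sites changed in this commit (jsc and basePath are assumed to be in scope, as at those call sites):

    import org.apache.hudi.client.HoodieSparkWriteClient;
    import org.apache.hudi.common.HoodieSparkEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.api.java.JavaSparkContext;

    // Build a write config the same way the CLI call sites below do.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();

    // Before this commit: the write client consumed Spark's JavaSparkContext directly.
    // HoodieWriteClient client = new HoodieWriteClient(jsc, config, false);

    // After: engine specifics sit behind an engine context; Spark is one implementation.
    HoodieSparkWriteClient client =
        new HoodieSparkWriteClient(new HoodieSparkEngineContext(jsc), config, false);
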
21 changes: 18 additions & 3 deletions hudi-cli/pom.xml
@@ -141,7 +141,12 @@
     <!-- Hoodie -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-client</artifactId>
+      <artifactId>hudi-client-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-spark</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
@@ -168,10 +173,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-client</artifactId>
+      <artifactId>hudi-client-spark</artifactId>
       <version>${project.version}</version>
-      <scope>test</scope>
       <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
@@ -236,6 +242,15 @@
     </dependency>

     <!-- Test -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
@@ -27,8 +27,7 @@
 import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.client.CompactionAdminClient.RenameOpResult;
-import org.apache.hudi.client.CompactionAdminClient.ValidationOpResult;
+import org.apache.hudi.client.BaseCompactionAdminClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -431,7 +430,7 @@ public String validateCompaction(
       if (exitCode != 0) {
         return "Failed to validate compaction for " + compactionInstant;
       }
-      List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+      List<BaseCompactionAdminClient.ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
       boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
       String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
       List<Comparable[]> rows = new ArrayList<>();
@@ -493,7 +492,7 @@ public String unscheduleCompaction(
       if (exitCode != 0) {
         return "Failed to unschedule compaction for " + compactionInstant;
       }
-      List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+      List<BaseCompactionAdminClient.RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
       output =
           getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
     } finally {
@@ -537,7 +536,7 @@ public String unscheduleCompactFile(
       if (exitCode != 0) {
         return "Failed to unschedule compaction for file " + fileId;
       }
-      List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+      List<BaseCompactionAdminClient.RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
       output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
           "unschedule file from pending compaction");
     } finally {
@@ -582,7 +581,7 @@ public String repairCompaction(
       if (exitCode != 0) {
         return "Failed to unschedule compaction for " + compactionInstant;
       }
-      List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
+      List<BaseCompactionAdminClient.RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
       output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
     } finally {
       // Delete tmp file used to serialize result
@@ -593,8 +592,8 @@ public String repairCompaction(
     return output;
   }

-  private String getRenamesToBePrinted(List<RenameOpResult> res, Integer limit, String sortByField, boolean descending,
-      boolean headerOnly, String operation) {
+  private String getRenamesToBePrinted(List<BaseCompactionAdminClient.RenameOpResult> res, Integer limit, String sortByField, boolean descending,
+      boolean headerOnly, String operation) {

     Option<Boolean> result =
         Option.fromJavaOptional(res.stream().map(r -> r.isExecuted() && r.isSuccess()).reduce(Boolean::logicalAnd));
@@ -23,7 +23,8 @@
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkWriteClient;
+import org.apache.hudi.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -162,10 +163,10 @@ public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a save
     return String.format("Savepoint \"%s\" deleted.", instantTime);
   }

-  private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
+  private static HoodieSparkWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
-    return new HoodieWriteClient(jsc, config, false);
+    return new HoodieSparkWriteClient(new HoodieSparkEngineContext(jsc), config, false);
   }

 }
@@ -20,7 +20,8 @@

 import org.apache.hudi.cli.DedupeSparkJob;
 import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkWriteClient;
+import org.apache.hudi.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -282,7 +283,7 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplica
   }

   private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception {
-    HoodieWriteClient client = createHoodieClient(jsc, basePath);
+    HoodieSparkWriteClient client = createHoodieClient(jsc, basePath);
     if (client.rollback(instantTime)) {
       LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
       return 0;
@@ -294,7 +295,7 @@ private static int rollback(JavaSparkContext jsc, String instantTime, String bas

   private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user,
       String comments, String basePath) throws Exception {
-    HoodieWriteClient client = createHoodieClient(jsc, basePath);
+    HoodieSparkWriteClient client = createHoodieClient(jsc, basePath);
     try {
       client.savepoint(commitTime, user, comments);
       LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
@@ -306,7 +307,7 @@ private static int createSavepoint(JavaSparkContext jsc, String commitTime, Stri
   }

   private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
-    HoodieWriteClient client = createHoodieClient(jsc, basePath);
+    HoodieSparkWriteClient client = createHoodieClient(jsc, basePath);
     try {
       client.restoreToSavepoint(savepointTime);
       LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
@@ -318,7 +319,7 @@ private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTim
   }

   private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
-    HoodieWriteClient client = createHoodieClient(jsc, basePath);
+    HoodieSparkWriteClient client = createHoodieClient(jsc, basePath);
     try {
       client.deleteSavepoint(savepointTime);
       LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
@@ -329,9 +330,9 @@ private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, S
     }
   }

-  private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
+  private static HoodieSparkWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
-    return new HoodieWriteClient(jsc, config);
+    return new HoodieSparkWriteClient(new HoodieSparkEngineContext(jsc), config);
   }
 }
@@ -21,7 +21,7 @@
 import org.apache.hudi.cli.HoodieCliSparkConfig;
 import org.apache.hudi.cli.commands.SparkEnvCommand;
 import org.apache.hudi.cli.commands.SparkMain;
-import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -91,7 +91,7 @@ public static JavaSparkContext initJavaSparkConf(String name, Option<String> mas
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");

-    HoodieWriteClient.registerClasses(sparkConf);
+    HoodieSparkWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
@@ -50,7 +50,7 @@ object SparkHelpers {
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

-    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
+    val writer = new HoodieParquetWriter[IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;

 import org.junit.jupiter.api.AfterEach;
@@ -90,8 +91,10 @@ public void init() throws IOException {
     // reload the timeline and get all the commits before archive
     metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();

+    hoodieTable = HoodieSparkTable.create(cfg, jsc.hadoopConfiguration());
+
     // archive
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient, hoodieTable);
     archiveLog.archiveIfRequired(hadoopConf);
   }

@@ -122,7 +125,7 @@ public void testShowArchivedCommits() {
     for (int i = 100; i < 104; i++) {
       String instant = String.valueOf(i);
       for (int j = 0; j < 3; j++) {
-        Comparable[] defaultComp = new Comparable[] {"commit", instant,
+        Comparable[] defaultComp = new Comparable[]{"commit", instant,
             HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
             HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
             HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
@@ -164,12 +167,12 @@ public void testShowCommits() throws IOException {
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("CommitType");
     for (int i = 100; i < 103; i++) {
       String instant = String.valueOf(i);
-      Comparable[] result = new Comparable[] {instant, "commit"};
+      Comparable[] result = new Comparable[]{instant, "commit"};
       rows.add(result);
       rows.add(result);
       rows.add(result);
     }
-    rows.add(new Comparable[] {"103", "commit"});
+    rows.add(new Comparable[]{"103", "commit"});
     String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, 10, false, rows);
     expected = removeNonWordAndStripSpace(expected);
     String got = removeNonWordAndStripSpace(cr.getResult().toString());
@@ -185,7 +188,7 @@ public void testShowCommits() throws IOException {
       String instant = String.valueOf(i);
       // Since HoodiePrintHelper order data by default, need to order commitMetadata
       HoodieCommitMetadata metadata = HoodieTestCommitMetadataGenerator.generateCommitMetadata(tablePath, instant);
-      Comparable[] result = new Comparable[] {
+      Comparable[] result = new Comparable[]{
           instant, "commit", HoodieTestCommitUtilities.convertAndOrderCommitMetadata(metadata)};
       rows.add(result);
       rows.add(result);
@@ -35,6 +35,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;

@@ -175,7 +176,9 @@ public void testShowArchivedCommits() throws IOException {

     // archive
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
+
+    hoodieTable = HoodieSparkTable.create(metaClient, cfg, jsc.hadoopConfiguration());
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient, hoodieTable);
     archiveLog.archiveIfRequired(jsc.hadoopConfiguration());

     CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
@@ -24,7 +24,7 @@
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
-import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkWriteClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -88,7 +88,7 @@ public void init() throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

-    try (HoodieWriteClient client = getHoodieWriteClient(config)) {
+    try (HoodieSparkWriteClient client = getHoodieWriteClient(config)) {
       // Rollback inflight commit3 and commit2
       client.rollback(commitTime3);
       client.rollback(commitTime2);
@@ -19,7 +19,7 @@
 package org.apache.hudi.cli.commands;

 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
-import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.HoodieSparkTable;

 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
@@ -39,7 +39,7 @@ public class TestUtilsCommand extends AbstractShellIntegrationTest {
    */
   @Test
   public void testLoadClass() {
-    String name = HoodieTable.class.getName();
+    String name = HoodieSparkTable.class.getName();
     CommandResult cr = getShell().executeCommand(String.format("utils loadClass --class %s", name));
     assertAll("Command runs success",
         () -> assertTrue(cr.isSuccess()),
44 changes: 44 additions & 0 deletions hudi-client/hudi-client-common/pom.xml
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>hudi-client</artifactId>
    <groupId>org.apache.hudi</groupId>
    <version>0.6.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>hudi-client-common</artifactId>
  <version>${parent.version}</version>

  <name>hudi-client-common</name>
  <packaging>jar</packaging>

  <build>
    <resources>
      <resource>
        <directory>../src/main/resources</directory>
      </resource>
      <resource>
        <directory>../src/test/resources</directory>
      </resource>
    </resources>
  </build>
</project>
@@ -19,7 +19,6 @@
 package org.apache.hudi.async;

 import org.apache.hudi.common.util.collection.Pair;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
