Skip to content

Commit

Permalink
[HUDI-25] Optimize HoodieInputformat.listStatus() for faster Hive inc…
Browse files Browse the repository at this point in the history
…remental queries on Hoodie

    Summary:
    - The InputPathHandler class classifies inputPaths into incremental, non-incremental, and non-Hoodie paths.
    - Incremental queries leverage HoodieCommitMetadata to find the affected partitions and list only those partitions, as opposed to listing all partitions.
    - listStatus() processes each category separately
  • Loading branch information
Bhavani Sudha Saktheeswaran committed Jan 8, 2020
1 parent 480fc78 commit d09eacd
Show file tree
Hide file tree
Showing 6 changed files with 601 additions and 118 deletions.
20 changes: 12 additions & 8 deletions hudi-hadoop-mr/pom.xml
Expand Up @@ -50,18 +50,10 @@
</dependency>

<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
Expand Down Expand Up @@ -91,6 +83,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
Expand Down
Expand Up @@ -18,6 +18,12 @@

package org.apache.hudi.hadoop;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
Expand All @@ -36,6 +42,7 @@ public class HoodieHiveUtil {
public static final int DEFAULT_MAX_COMMITS = 1;
public static final int MAX_COMMIT_ALL = -1;
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
// Compiled regex (a Pattern, despite the "_STRING" suffix) for configuration keys of the form
// "hoodie.<name>.consume.mode"; capture group 1 is the "<name>" segment between the two literals.
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");

public static Integer readMaxCommits(JobContext job, String tableName) {
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
Expand Down Expand Up @@ -67,4 +74,22 @@ public static Path getNthParent(Path path, int n) {
}
return parent;
}

/**
 * Returns the names of the tables configured for incremental consumption.
 *
 * <p>Every job configuration entry whose key matches {@code HOODIE_CONSUME_MODE_PATTERN_STRING}
 * is examined. Entries whose trimmed value equals {@code INCREMENTAL_SCAN_MODE} contribute the
 * text captured by group 1 of the pattern (the segment between {@code hoodie.} and
 * {@code .consume.mode} in the key).
 *
 * @param job job context whose configuration is inspected
 * @return the captured names; an empty list, never {@code null}, when no entry qualifies
 */
public static List<String> getIncrementalTableNames(JobContext job) {
  Map<String, String> tablesModeMap = job.getConfiguration()
      .getValByRegex(HOODIE_CONSUME_MODE_PATTERN_STRING.pattern());
  // Collectors.toList() always produces a list (possibly empty), so no null fallback is required.
  return tablesModeMap.entrySet().stream()
      .filter(entry -> entry.getValue().trim().equals(INCREMENTAL_SCAN_MODE))
      .map(entry -> HOODIE_CONSUME_MODE_PATTERN_STRING.matcher(entry.getKey()))
      // find() both tests the key and positions the matcher so group(1) can be read next.
      .filter(Matcher::find)
      .map(matcher -> matcher.group(1))
      .collect(Collectors.toList());
}
}

0 comments on commit d09eacd

Please sign in to comment.