Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;

/**
* Bootstrap Index Interface.
Expand Down Expand Up @@ -161,6 +161,6 @@ public abstract void appendNextPartition(String partitionPath,

public static BootstrapIndex getBootstrapIndex(HoodieTableMetaClient metaClient) {
return ((BootstrapIndex)(ReflectionUtils.loadClass(
metaClient.getTableConfig().getBootstrapIndexClass(), metaClient)));
metaClient.getTableConfig().getBootstrapIndexClass(), new Class[]{HoodieTableMetaClient.class}, metaClient)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public class HoodieTableMetaClient implements Serializable {
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();

private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
String payloadClassName) {
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
String payloadClassName) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
this.consistencyGuardConfig = consistencyGuardConfig;
this.hadoopConf = new SerializableConfiguration(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static Object loadClass(String clazz, Class<?>[] constructorArgTypes, Obj
try {
return getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new HoodieException("Unable to instantiate class ", e);
throw new HoodieException("Unable to instantiate class " + clazz, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
* InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
* input paths to incremental, snapshot paths and non-hoodie paths. This is then accessed later to
* mutate the JobConf before processing incremental mode queries and snapshot queries.
*
 * Note: We read the jobConf of a MapReduce or Spark job. The properties in the jobConf are of
 * two types: session properties and table properties from the metastore. While a session
 * property is common to all tables in a query, table properties (such as the table name, last
 * modification time and so on) are unique per table, so there is no need to check whether a
 * property belongs to the table for which this path handler was instantiated. This class is
 * written so that it can handle multiple tables and their per-table properties, but no such
 * table-level ownership check is required.
*/
public class InputPathHandler {

Expand All @@ -63,7 +71,6 @@ public InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incr
/**
* Takes in the original InputPaths and classifies each of them into incremental, snapshot and
* non-hoodie InputPaths. The logic is as follows:
*
* 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know
* 1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this
* as incremental or snapshot - We can get the table name of this inputPath from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.hudi.hadoop.utils;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -73,6 +74,7 @@ public class HoodieHiveUtils {
public static final int MAX_COMMIT_ALL = -1;
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP = "last_replication_timestamp";

public static boolean stopAtCompaction(JobContext job, String tableName) {
String compactionPropName = String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<
}

HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient);

HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

import org.apache.hadoop.fs.FileStatus;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Verifies that the Hoodie parquet input format honors the globally consistent read timestamp
 * ({@code HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP}): commits newer than the configured
 * timestamp must be filtered out of both {@code getSplits} and {@code listStatus} results.
 *
 * Extends {@link TestHoodieParquetInputFormat} to reuse its data setup; each test first runs the
 * parent test, then re-queries with a filtering timestamp applied.
 */
public class TestGloballyConsistentTimeStampFilteringInputFormat
    extends TestHoodieParquetInputFormat {

  @BeforeEach
  public void setUp() {
    // Reuse the parent's fixture (inputFormat, jobConf and the test table layout).
    super.setUp();
  }

  @Test
  public void testInputFormatLoad() throws IOException {
    super.testInputFormatLoad();

    // With the filtering timestamp set to 0 the timeline won't have any visible commits,
    // so both split computation and file listing are expected to fail.
    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "0");

    Assertions.assertThrows(HoodieIOException.class, () -> inputFormat.getSplits(jobConf, 10));
    Assertions.assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
  }

  @Test
  public void testInputFormatUpdates() throws IOException {
    super.testInputFormatUpdates();

    // Set the globally replicated timestamp to 100 so only commit 100 is read and the
    // update made at commit 200 is ignored.
    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100");

    FileStatus[] files = inputFormat.listStatus(jobConf);
    assertEquals(10, files.length);

    ensureFilesInCommit("5 files have been updated to commit 200. but should get filtered out ",
        files,"200", 0);
    ensureFilesInCommit("We should see 10 files from commit 100 ", files, "100", 10);
  }

  @Override
  public void testIncrementalSimple() throws IOException {
    // Setting the filtering timestamp to zero should not in any way alter the result of the
    // parent test, which pulls in zero files due to the incremental ts being the actual
    // commit time.
    jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "0");
    super.testIncrementalSimple();
  }

  @Override
  public void testIncrementalWithMultipleCommits() throws IOException {
    super.testIncrementalWithMultipleCommits();

    // Set the globally replicated timestamp to 400 so commits 500 and 600 do not show up.
    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "400");
    InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL);

    FileStatus[] files = inputFormat.listStatus(jobConf);

    assertEquals(
        5, files.length,"Pulling ALL commits from 100, should get us the 3 files from 400 commit, 1 file from 300 "
        + "commit and 1 file from 200 commit");
    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit",
        files, "400", 3);
    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit",
        files, "300", 1);
    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit",
        files, "200", 1);

    // Exhaustively combine the incremental start commit (idx) with the filtering timestamp (jdx):
    // when the filtering timestamp is at or before the incremental start commit nothing should be
    // visible; otherwise every returned file's commit time must lie in the window (idx, jdx].
    List<String> commits = Arrays.asList("100", "200", "300", "400", "500", "600");
    for (int idx = 0; idx < commits.size(); ++idx) {
      for (int jdx = 0; jdx < commits.size(); ++jdx) {
        InputFormatTestUtil.setupIncremental(jobConf, commits.get(idx), HoodieHiveUtils.MAX_COMMIT_ALL);
        InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, commits.get(jdx));

        files = inputFormat.listStatus(jobConf);

        if (jdx <= idx) {
          assertEquals(0, files.length,"all commits should be filtered");
        } else {
          // only commits up to the filtering timestamp are allowed
          for (FileStatus file : files) {
            String commitTs = FSUtils.getCommitTime(file.getPath().getName());
            assertTrue(commits.indexOf(commitTs) <= jdx);
            assertTrue(commits.indexOf(commitTs) > idx);
          }
        }
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@

public class TestHoodieParquetInputFormat {

private HoodieParquetInputFormat inputFormat;
private JobConf jobConf;
protected HoodieParquetInputFormat inputFormat;
protected JobConf jobConf;
private final HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
private final String baseFileExtension = baseFileFormat.getFileExtension();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -169,6 +171,21 @@ public void testInputPathHandler() throws IOException {
assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
}

@Test
public void testInputPathHandlerWithGloballyReplicatedTimeStamp() throws IOException {
JobConf jobConf = new JobConf();
jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "1");
inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray(
new Path[inputPaths.size()]), incrementalTables);
List<Path> actualPaths = inputPathHandler.getGroupedIncrementalPaths().values().stream()
.flatMap(List::stream).collect(Collectors.toList());
assertTrue(actualComparesToExpected(actualPaths, incrementalPaths));
actualPaths = inputPathHandler.getSnapshotPaths();
assertTrue(actualComparesToExpected(actualPaths, snapshotPaths));
actualPaths = inputPathHandler.getNonHoodieInputPaths();
assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
}

private boolean actualComparesToExpected(List<Path> actualPaths, List<Path> expectedPaths) {
if (actualPaths.size() != expectedPaths.size()) {
return false;
Expand Down
10 changes: 10 additions & 0 deletions hudi-sync/hudi-hive-sync/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

<properties>
<main.basedir>${project.parent.basedir}</main.basedir>

<jetty.version>7.6.0.v20120127</jetty.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -148,6 +150,14 @@
<scope>test</scope>
</dependency>

<!-- Needed for running HiveServer for Tests -->
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<scope>test</scope>
<version>${jetty.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
69 changes: 69 additions & 0 deletions hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A tool to sync the hudi table to hive from different clusters. Similar to HiveSyncTool but syncs it to more
# than one hive cluster (currently a local and a remote cluster). The common timestamp that was synced is stored
# as a new table property. This is most useful when we want to ensure consistent reads across different hive
# clusters. If that is not a requirement then it is better to run HiveSyncTool separately.
# Note:
# The tool tries to be transactional but does not guarantee it. If the sync fails midway on one cluster it will
# try to roll back the committed timestamp from already successful syncs on other clusters, but that can also fail.
# The tool does not roll back any synced partitions, only the timestamp.

# Print a message to stderr and exit with the given status code (default 1).
function error_exit {
    echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way.
    exit "${2:-1}" ## Return a code specified by $2 or 1 by default.
}

if [ -z "${HADOOP_HOME}" ]; then
  error_exit "Please make sure the environment variable HADOOP_HOME is setup"
fi

if [ -z "${HIVE_HOME}" ]; then
  error_exit "Please make sure the environment variable HIVE_HOME is setup"
fi

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
#Ensure we pick the right jar even for hive11 builds
HUDI_HIVE_UBER_JAR=`ls -c $DIR/../packaging/hudi-hive-bundle/target/hudi-hive-*.jar | grep -v source | head -1`

if [ -z "$HADOOP_CONF_DIR" ]; then
  echo "setting hadoop conf dir"
  HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
fi

## Include only specific packages from HIVE_HOME/lib to avoid version mismatches
HIVE_EXEC=`ls ${HIVE_HOME}/lib/hive-exec-*.jar | tr '\n' ':'`
HIVE_SERVICE=`ls ${HIVE_HOME}/lib/hive-service-*.jar | grep -v rpc | tr '\n' ':'`
HIVE_METASTORE=`ls ${HIVE_HOME}/lib/hive-metastore-*.jar | tr '\n' ':'`
# Prefer the jdbc driver jar(s) excluding the storage-handler jar; fall back to
# every hive-jdbc jar only when the filtered match comes up empty. (The broad
# match must be the fallback — if it were primary and empty, the narrower
# filtered match could never succeed either.)
HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
if [ -z "${HIVE_JDBC}" ]; then
  HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | tr '\n' ':'`
fi
HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
HIVE_NUCLEUS=`ls ${HIVE_HOME}/lib/datanucleus*.jar | tr '\n' ':'`
HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON:$HIVE_NUCLEUS

HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*

if ! [ -z "$HIVE_CONF_DIR" ]; then
  error_exit "Don't set HIVE_CONF_DIR; use config xml file"
fi

# NOTE: ${HIVE_HOME}/lib/* — the separator slash was missing in the original,
# which produced a broken classpath entry like "/opt/hivelib/*".
echo "Running Command : java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}/lib/* org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool $@"
java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}/lib/* org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
public Boolean decodePartition = false;

  // NOTE(review): subclasses that add fields should override/extend this copy method to carry them over.
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public class HiveSyncTool extends AbstractSyncTool {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";

private final HiveSyncConfig cfg;
private HoodieHiveClient hoodieHiveClient = null;
private String snapshotTableName = null;
private Option<String> roTableName = null;
protected final HiveSyncConfig cfg;
protected HoodieHiveClient hoodieHiveClient = null;
protected String snapshotTableName = null;
protected Option<String> roTableName = null;

public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(configuration.getAllProperties(), fs);
Expand Down Expand Up @@ -127,8 +127,8 @@ public void syncHoodieTable() {
}
}
}

private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
boolean readAsOptimized) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
+ " of type " + hoodieHiveClient.getTableType());
Expand Down
Loading