HIVE-24187: Handle _files creation for HA config with same nameservic… #1515

Closed
wants to merge 4 commits
Changes from 1 commit
8 changes: 8 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -522,6 +522,14 @@ public static enum ConfVars {
REPLCMINTERVAL("hive.repl.cm.interval","3600s",
new TimeValidator(TimeUnit.SECONDS),
"Inteval for cmroot cleanup thread."),
REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE("hive.repl.ha.datapath.replace.remote.nameservice", false,
"When HDFS is HA enabled and both source and target clusters are configured with same nameservice names," +
"enable this flag and provide a "),
Review comment (Contributor): sentence is incomplete

REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME("hive.repl.ha.datapath.replace.remote.nameservice.name", null,
"When HDFS is HA enabled and both source and target clusters are configured with same nameservice names, " +
"enable the flag 'hive.repl.ha.datapath.replace.remote.nameservice' and provide a virtual name " +
" representing nameservice on the remote cluster (should be different from nameservice on local" +
" cluster)"),
REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/${system:user.name}/repl/functions/",
"Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 10000,
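For reference only, a minimal sketch (not part of this change) of how the two new properties could be set programmatically on a HiveConf before triggering a dump; the tests further down build the equivalent settings into a WITH clause, and "nsRemote" is just an assumed virtual name for the remote cluster's nameservice.

  HiveConf conf = new HiveConf();
  // Hypothetical values: both clusters share the same real nameservice, and
  // "nsRemote" is the virtual name chosen to address the remote cluster.
  conf.setBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE, true);
  conf.setVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME, "nsRemote");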
@@ -77,6 +77,7 @@
import static org.junit.Assert.assertTrue;

public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
private static final String NS_REMOTE = "nsRemote";
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
@@ -1604,6 +1605,104 @@ public void testRangerReplication() throws Throwable {
.verifyResults(new String[] {"1", "2"});
}

@Test
public void testHdfsNamespaceLazyCopy() throws Throwable {
List<String> clause = getHdfsNamespaceClause();
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName, clause);

try{
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}

@Test
public void testHdfsNamespaceLazyCopyIncr() throws Throwable {
List<String> clause = getHdfsNamespaceClause();
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName);

replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});

primary.run("use " + primaryDbName)
.run("insert into table1 values (3)")
.run("insert into table1 values (4)")
.dump(primaryDbName, clause);
try{
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}

@Test
public void testHdfsNamespaceWithDataCopy() throws Throwable {
Review comment (Contributor): nameservice

List<String> clause = getHdfsNamespaceClause();
//NS replacement parameters has no effect when data is also copied to staging
clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"});

primary.run("use " + primaryDbName)
.run("insert into table1 values (3)")
.run("insert into table1 values (4)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2", "3", "4"});
}

@Test
public void testCreateFunctionWithHdfsNamespace() throws Throwable {
Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
List<String> clause = getHdfsNamespaceClause();
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc1 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf1HdfsPath.toString() + "'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, clause);
try{
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}

@Test
public void testRangerReplicationRetryExhausted() throws Throwable {
List<String> clause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "'='true'",
@@ -1963,4 +2062,12 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
FileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
}

private List<String> getHdfsNamespaceClause() {
Review comment (Contributor): replace with nameservice

List<String> withClause = new ArrayList<>();
withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE.varname + "'='true'");
withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname + "'='"
+ NS_REMOTE + "'");
return withClause;
}
}
@@ -118,7 +118,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -102,7 +102,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -148,7 +148,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -166,7 +166,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -209,7 +209,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics", ex);
}
return errorCode;
@@ -147,7 +147,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
38 changes: 38 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -23,9 +23,11 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -49,6 +51,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +76,40 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, H
writeOutput(listValues, outputFile, hiveConf, false);
}

/**
* Given a ReplChangeManger's encoded uri, replaces the namespace and returns the modified encoded uri.
*/
public static String replaceNameSpaceInEncodedURI(String cmEncodedURI, HiveConf hiveConf) throws SemanticException {
Review comment (Contributor): replace name service?

String newNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname);
if (StringUtils.isEmpty(newNS)) {
throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
.format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid "
+ newNS == null ? "null" : newNS, ReplUtils.REPL_HIVE_SERVICE));
}
String[] decodedURISplits = ReplChangeManager.decodeFileUri(cmEncodedURI);
// replace both data path and repl cm root path and construct new URI. Checksum and subDir will be same as old.
String modifiedURI = ReplChangeManager.encodeFileUri(replaceHost(decodedURISplits[0], newNS), decodedURISplits[1],
replaceHost(decodedURISplits[2], newNS), decodedURISplits[3]);
LOG.debug("Modified encoded uri {}, to {} ", cmEncodedURI, modifiedURI);
return modifiedURI;
}

private static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
if (StringUtils.isEmpty(originalURIStr)) {
return originalURIStr;
}
URI origUri = URI.create(originalURIStr);
try {
return new URI(origUri.getScheme(),
origUri.getUserInfo(), newHost, origUri.getPort(),
origUri.getPath(), origUri.getQuery(),
origUri.getFragment()).toString();
} catch (URISyntaxException ex) {
throw new SemanticException(ex);
}
}
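A worked example of the rewrite performed by replaceNameSpaceInEncodedURI above, with illustrative values only: cmEncodedURI and hiveConf are assumed to be in scope, "nsRemote" is the assumed replacement nameservice, and "#" is assumed to be the fragment separator. Only the authority of the data path and of the cm root path changes; the checksum and sub-directory fields pass through unchanged.

  // Assumed input, in the fileuri#checksum#cmrooturi#subdirs format:
  //   hdfs://ns1/warehouse/db.db/t1/000000_0#<checksum>#hdfs://ns1/cmroot#
  // Expected result:
  //   hdfs://nsRemote/warehouse/db.db/t1/000000_0#<checksum>#hdfs://nsRemote/cmroot#
  String rewritten = Utils.replaceNameSpaceInEncodedURI(cmEncodedURI, hiveConf);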


public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
throws SemanticException {
Retryable retryable = Retryable.builder()
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.slf4j.Logger;
@@ -97,11 +98,15 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc
}

protected void writeEncodedDumpFiles(Context withinContext, Iterable<String> files, Path dataPath)
throws IOException {
throws IOException, SemanticException {
boolean replaceNSInHACase = withinContext.hiveConf.getBoolVar(
HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE);
// encoded filename/checksum of files, write into _files
try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
for (String file : files) {
fileListWriter.write(file);
String encodedFilePath = replaceNSInHACase ? Utils.replaceNameSpaceInEncodedURI(file, withinContext.hiveConf):
file;
fileListWriter.write(encodedFilePath);
fileListWriter.newLine();
}
}
@@ -59,7 +59,7 @@ CommitTxnMessage eventMessage(String stringRepresentation) {

private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
Path dataPath)
throws IOException, LoginException, MetaException, HiveFatalException {
throws IOException, LoginException, MetaException, HiveFatalException, SemanticException {
boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
if (copyAtLoad) {
// encoded filename/checksum of files, write into _files
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -32,13 +33,13 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.Retry;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -48,7 +49,6 @@
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -186,7 +186,7 @@ private void validateSrcPathListExists() throws IOException, LoginException {
* The data export here is a list of files either in table/partition that are written to the _files
* in the exportRootDataDir provided.
*/
void exportFilesAsList() throws SemanticException, IOException, LoginException {
void exportFilesAsList() throws SemanticException {
if (dataPathList.isEmpty()) {
return;
}
@@ -222,7 +222,7 @@ void exportFilesAsList() throws SemanticException, IOException, LoginException {
}

private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
throws IOException {
throws IOException, SemanticException {
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
// Write files inside the sub-directory.
@@ -252,10 +252,14 @@ private String encodedSubDir(String encodedParentDirs, Path subDir) {
}

private String encodedUri(FileStatus fileStatus, String encodedSubDir)
throws IOException {
throws IOException, SemanticException {
ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
Path currentDataFilePath = fileStatus.getPath();
String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
String cmEncodedURIL = replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
return org.apache.hadoop.hive.ql.parse.repl.dump.Utils.replaceNameSpaceInEncodedURI(cmEncodedURIL, hiveConf);
}
return cmEncodedURIL;
}
}
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
@@ -67,6 +68,9 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
.encodeFileUri(qualifiedUri.toString(), checkSum, null);
if (copyAtLoad) {
if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
encodedSrcUri = Utils.replaceNameSpaceInEncodedURI(encodedSrcUri, hiveConf);
}
resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
} else {
Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
@@ -424,6 +424,20 @@ public String encodeFileUri(String fileUriStr, String fileChecksum, String encod
return encodedUri;
}

public static String encodeFileUri(String fileUriStr, String fileChecksum, String cmroot, String encodedSubDir) {
String encodedUri = fileUriStr;
if ((fileChecksum != null) && (cmroot != null)) {
Review comment (Contributor): empty check not needed?

encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum + URI_FRAGMENT_SEPARATOR + cmroot;
} else {
encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
Review comment (Contributor): why do we have 2 URI_FRAGMENT_SEPARATOR

}
encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + ((encodedSubDir != null) ? encodedSubDir : "");
if (LOG.isDebugEnabled()) {
Review comment (Contributor): Do we need this check?

LOG.debug("Encoded URI: " + encodedUri);
}
return encodedUri;
}

/***
* Split uri with fragment into file uri, subdirs, checksum and source cmroot uri.
* Currently using fileuri#checksum#cmrooturi#subdirs as the format.
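To make the encoding concrete, a small sketch with made-up values (assuming URI_FRAGMENT_SEPARATOR is "#", matching the fileuri#checksum#cmrooturi#subdirs format described above) of how the new four-argument overload composes an encoded URI and how decodeFileUri splits it back:

  String encoded = ReplChangeManager.encodeFileUri(
      "hdfs://ns1/warehouse/db.db/t1/000000_0",  // data file URI
      "<checksum>",                              // placeholder checksum
      "hdfs://ns1/cmroot",                       // source cm root URI
      null);                                     // no encoded sub-directories
  // encoded is "hdfs://ns1/warehouse/db.db/t1/000000_0#<checksum>#hdfs://ns1/cmroot#"
  String[] parts = ReplChangeManager.decodeFileUri(encoded);
  // parts[0] = file URI, parts[1] = checksum, parts[2] = cm root URI, parts[3] = sub-directories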