HIVE-24187: Handle _files creation for HA config with same nameservic… #1515

Closed
8 changes: 8 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -522,6 +522,14 @@ public static enum ConfVars {
REPLCMINTERVAL("hive.repl.cm.interval","3600s",
new TimeValidator(TimeUnit.SECONDS),
"Inteval for cmroot cleanup thread."),
REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE("hive.repl.ha.datapath.replace.remote.nameservice", false,
"When HDFS is HA enabled and both source and target clusters are configured with the same nameservice name, " +
"enable this flag and provide a new, unique logical name to represent the remote cluster's nameservice " +
"using the config 'hive.repl.ha.datapath.replace.remote.nameservice.name'."),
REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME("hive.repl.ha.datapath.replace.remote.nameservice.name", null,
"When HDFS is HA enabled and both source and target clusters are configured with the same nameservice name, " +
"use this config to provide a unique logical name for the nameservice on the remote cluster (it must " +
"differ from the nameservice name on the local cluster)."),
REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/${system:user.name}/repl/functions/",
"Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 10000,
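For context, a minimal sketch of how these two settings could be supplied programmatically (illustrative only; the tests below pass them through the dump/load WITH-clause instead, and the nameservice name "nsRemote" is just an example):

import org.apache.hadoop.hive.conf.HiveConf;

HiveConf conf = new HiveConf();
// Turn on nameservice replacement for data paths written during dump.
conf.setBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE, true);
// Unique logical name for the remote cluster's nameservice; must differ from the local one.
conf.setVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME, "nsRemote");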
@@ -77,6 +77,7 @@
import static org.junit.Assert.assertTrue;

public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
private static final String NS_REMOTE = "nsRemote";
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
@@ -1604,6 +1605,122 @@ public void testRangerReplication() throws Throwable {
.verifyResults(new String[] {"1", "2"});
}

@Test
public void testHdfsNamespaceLazyCopy() throws Throwable {
List<String> clause = getHdfsNameserviceClause();
clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='true'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i int)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.run("create external table ext_table1 (id int)")
.run("insert into ext_table1 values (3)")
.run("insert into ext_table1 values (4)")
.dump(primaryDbName, clause);

try {
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}

@Test
public void testHdfsNamespaceLazyCopyIncr() throws Throwable {
List<String> clause = new ArrayList<>();
clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='true'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.run("create external table ext_table1 (id int)")
.run("insert into ext_table1 values (3)")
.run("insert into ext_table1 values (4)")
.dump(primaryDbName);

replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1", "ext_table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"})
.run("select * from ext_table1")
.verifyResults(new String[] {"3", "4"});

clause.addAll(getHdfsNameserviceClause());
primary.run("use " + primaryDbName)
.run("insert into table1 values (5)")
.run("insert into ext_table1 values (6)")
.dump(primaryDbName, clause);
try {
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}

@Test
public void testHdfsNamespaceWithDataCopy() throws Throwable {
List<String> clause = getHdfsNameserviceClause();
// NS replacement parameters have no effect when data is also copied to staging.
clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='false'");
primary.run("use " + primaryDbName)
.run("create table acid_table (key int, value int) partitioned by (load_date date) " +
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i String)")
.run("insert into table1 values (1)")
.run("insert into table1 values (2)")
.run("create external table ext_table1 (id int)")
.run("insert into ext_table1 values (3)")
.run("insert into ext_table1 values (4)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1", "ext_table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2"})
.run("select * from ext_table1")
.verifyResults(new String[] {"3", "4"});

primary.run("use " + primaryDbName)
.run("insert into table1 values (5)")
.run("insert into ext_table1 values (6)")
.dump(primaryDbName, clause);
replica.load(replicatedDbName, primaryDbName, clause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"acid_table", "table1", "ext_table1"})
.run("select * from table1")
.verifyResults(new String[] {"1", "2", "5"})
.run("select * from ext_table1")
.verifyResults(new String[] {"3", "4", "6"});
}

@Test
public void testCreateFunctionWithHdfsNamespace() throws Throwable {
Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
List<String> clause = getHdfsNameserviceClause();
primary.run("CREATE FUNCTION " + primaryDbName
+ ".idFunc1 as 'IdentityStringUDF' "
+ "using jar '" + identityUdf1HdfsPath.toString() + "'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, clause);
try {
replica.load(replicatedDbName, primaryDbName, clause);
Assert.fail("Expected the UnknownHostException to be thrown.");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote"));
}
}

@Test
public void testRangerReplicationRetryExhausted() throws Throwable {
List<String> clause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "'='true'",
@@ -1963,4 +2080,12 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
FileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
}

private List<String> getHdfsNameserviceClause() {
List<String> withClause = new ArrayList<>();
withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE.varname + "'='true'");
withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname + "'='"
+ NS_REMOTE + "'");
return withClause;
}
}
@@ -118,7 +118,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -102,7 +102,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -148,7 +148,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -166,7 +166,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
@@ -209,7 +209,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics", ex);
}
return errorCode;
@@ -147,7 +147,7 @@ public int execute() {
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (SemanticException ex) {
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
38 changes: 38 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -23,9 +23,11 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -49,6 +51,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +76,40 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, H
writeOutput(listValues, outputFile, hiveConf, false);
}

/**
* Given a ReplChangeManager-encoded URI, replaces the nameservice and returns the modified encoded URI.
*/
public static String replaceNameserviceInEncodedURI(String cmEncodedURI, HiveConf hiveConf) throws SemanticException {
String newNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname);
if (StringUtils.isEmpty(newNS)) {
throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
.format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid: "
+ (newNS == null ? "null" : newNS), ReplUtils.REPL_HIVE_SERVICE));
}
String[] decodedURISplits = ReplChangeManager.decodeFileUri(cmEncodedURI);
// Replace the nameservice in both the data path and the repl CM root path, then re-encode.
// Checksum and subDir remain unchanged.
String modifiedURI = ReplChangeManager.encodeFileUri(replaceHost(decodedURISplits[0], newNS), decodedURISplits[1],
replaceHost(decodedURISplits[2], newNS), decodedURISplits[3]);
LOG.debug("Modified encoded URI {} to {}", cmEncodedURI, modifiedURI);
return modifiedURI;
}

private static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
if (StringUtils.isEmpty(originalURIStr)) {
return originalURIStr;
}
URI origUri = URI.create(originalURIStr);
try {
return new URI(origUri.getScheme(),
origUri.getUserInfo(), newHost, origUri.getPort(),
origUri.getPath(), origUri.getQuery(),
origUri.getFragment()).toString();
} catch (URISyntaxException ex) {
throw new SemanticException(ex);
}
}


public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
throws SemanticException {
Retryable retryable = Retryable.builder()
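As a sanity check on the java.net.URI-based host swap in replaceHost() above, a minimal standalone sketch (the nameservice names are illustrative):

import java.net.URI;

public class NameserviceSwapSketch {
  public static void main(String[] args) throws Exception {
    URI orig = URI.create("hdfs://nsLocal/warehouse/db.db/t1/000000_0");
    // Rebuild the URI with only the authority's host replaced, as replaceHost() does.
    URI swapped = new URI(orig.getScheme(), orig.getUserInfo(), "nsRemote",
        orig.getPort(), orig.getPath(), orig.getQuery(), orig.getFragment());
    System.out.println(swapped); // prints hdfs://nsRemote/warehouse/db.db/t1/000000_0
  }
}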
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.slf4j.Logger;
@@ -97,11 +98,15 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc
}

protected void writeEncodedDumpFiles(Context withinContext, Iterable<String> files, Path dataPath)
throws IOException {
throws IOException, SemanticException {
boolean replaceNSInHACase = withinContext.hiveConf.getBoolVar(
HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE);
// encoded filename/checksum of files, write into _files
try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
for (String file : files) {
fileListWriter.write(file);
String encodedFilePath = replaceNSInHACase
? Utils.replaceNameserviceInEncodedURI(file, withinContext.hiveConf) : file;
fileListWriter.write(encodedFilePath);
fileListWriter.newLine();
}
}
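For intuition, each line written to _files above is a ReplChangeManager-encoded URI bundling the data path, checksum, CM root path, and sub-directory (the same four fields split apart in replaceNameserviceInEncodedURI). A hedged illustration of the rewrite, with an assumed '#'-separated layout that this PR does not spell out:

// Assumed layout, for illustration only:
// before: hdfs://ns/warehouse/db.db/t1/000000_0#<checksum>#hdfs://ns/cmroot#<subDir>
// after:  hdfs://nsRemote/warehouse/db.db/t1/000000_0#<checksum>#hdfs://nsRemote/cmroot#<subDir>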
@@ -59,7 +59,7 @@ CommitTxnMessage eventMessage(String stringRepresentation) {

private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
Path dataPath)
throws IOException, LoginException, MetaException, HiveFatalException {
throws IOException, LoginException, MetaException, HiveFatalException, SemanticException {
boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
if (copyAtLoad) {
// encoded filename/checksum of files, write into _files
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.io;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -32,12 +31,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.Retry;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -48,7 +44,6 @@
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -186,7 +181,7 @@ private void validateSrcPathListExists() throws IOException, LoginException {
* The data export here is a list of files, from either a table or a partition, that are written to the _files
* file in the provided exportRootDataDir.
*/
void exportFilesAsList() throws SemanticException, IOException, LoginException {
void exportFilesAsList() throws SemanticException {
if (dataPathList.isEmpty()) {
return;
}
@@ -222,7 +217,7 @@ void exportFilesAsList() throws SemanticException, IOException, LoginException {
}

private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
throws IOException {
throws IOException, SemanticException {
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
// Write files inside the sub-directory.
@@ -252,10 +247,14 @@ private String encodedSubDir(String encodedParentDirs, Path subDir) {
}

private String encodedUri(FileStatus fileStatus, String encodedSubDir)
throws IOException {
throws IOException, SemanticException {
ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
Path currentDataFilePath = fileStatus.getPath();
String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
String cmEncodedURI = replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
return org.apache.hadoop.hive.ql.parse.repl.dump.Utils.replaceNameserviceInEncodedURI(cmEncodedURI, hiveConf);
}
return cmEncodedURI;
}
}
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
@@ -67,6 +68,9 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
.encodeFileUri(qualifiedUri.toString(), checkSum, null);
if (copyAtLoad) {
if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
encodedSrcUri = Utils.replaceNameserviceInEncodedURI(encodedSrcUri, hiveConf);
}
resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
} else {
Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());