HIVE-16866 : existing available UDF is used in TestReplicationScenariosAcrossInstances#testDropFunctionIncrementalReplication
anishek committed Jun 9, 2017
1 parent 46e5ea9 commit e42d159
Showing 6 changed files with 43 additions and 47 deletions.
@@ -219,7 +219,7 @@ public String next() {
FileStatus file = files[i];
i++;
return ReplChangeManager.encodeFileUri(file.getPath().toString(),
-ReplChangeManager.getChksumString(file.getPath(), fs));
+ReplChangeManager.checksumFor(file.getPath(), fs));
} catch (IOException e) {
throw new RuntimeException(e);
}
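
For context, the encodeFileUri/checksumFor pair above is what produces the checksum-tagged file URIs handed to the replication copy tasks. The sketch below is illustrative only and not part of this commit; it assumes the encoding is the bare file URI with the checksum appended after ReplChangeManager's "#" fragment separator, and that a null checksum (possible on file systems without CRC support) simply yields the plain URI.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.ReplChangeManager;

public class EncodedUriSketch {
  // Mirrors the loop above for a single file: checksum it, then tag its URI.
  static String encodedUriFor(Path file, FileSystem fs) throws IOException {
    String checksum = ReplChangeManager.checksumFor(file, fs); // may be null on non-HDFS file systems
    return ReplChangeManager.encodeFileUri(file.toUri().toString(), checksum);
  }
}
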
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -156,15 +156,15 @@ public void testRecyclePartTable() throws Exception {

Path part1Path = new Path(warehouse.getDefaultPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part");
createFile(part1Path, "p1");
-String path1Chksum = ReplChangeManager.getChksumString(part1Path, fs);
+String path1Chksum = ReplChangeManager.checksumFor(part1Path, fs);

Path part2Path = new Path(warehouse.getDefaultPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part");
createFile(part2Path, "p2");
-String path2Chksum = ReplChangeManager.getChksumString(part2Path, fs);
+String path2Chksum = ReplChangeManager.checksumFor(part2Path, fs);

Path part3Path = new Path(warehouse.getDefaultPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part");
createFile(part3Path, "p3");
-String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs);
+String path3Chksum = ReplChangeManager.checksumFor(part3Path, fs);

assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
@@ -226,15 +226,15 @@ public void testRecycleNonPartTable() throws Exception {

Path filePath1 = new Path(warehouse.getDefaultTablePath(db, tblName), "part1");
createFile(filePath1, "f1");
-String fileChksum1 = ReplChangeManager.getChksumString(filePath1, fs);
+String fileChksum1 = ReplChangeManager.checksumFor(filePath1, fs);

Path filePath2 = new Path(warehouse.getDefaultTablePath(db, tblName), "part2");
createFile(filePath2, "f2");
-String fileChksum2 = ReplChangeManager.getChksumString(filePath2, fs);
+String fileChksum2 = ReplChangeManager.checksumFor(filePath2, fs);

Path filePath3 = new Path(warehouse.getDefaultTablePath(db, tblName), "part3");
createFile(filePath3, "f3");
-String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs);
+String fileChksum3 = ReplChangeManager.checksumFor(filePath3, fs);

assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
@@ -272,26 +272,26 @@ public void testClearer() throws Exception {
fs.mkdirs(dirTbl1);
Path part11 = new Path(dirTbl1, "part1");
createFile(part11, "testClearer11");
-String fileChksum11 = ReplChangeManager.getChksumString(part11, fs);
+String fileChksum11 = ReplChangeManager.checksumFor(part11, fs);
Path part12 = new Path(dirTbl1, "part2");
createFile(part12, "testClearer12");
-String fileChksum12 = ReplChangeManager.getChksumString(part12, fs);
+String fileChksum12 = ReplChangeManager.checksumFor(part12, fs);
Path dirTbl2 = new Path(dirDb, "tbl2");
fs.mkdirs(dirTbl2);
Path part21 = new Path(dirTbl2, "part1");
createFile(part21, "testClearer21");
-String fileChksum21 = ReplChangeManager.getChksumString(part21, fs);
+String fileChksum21 = ReplChangeManager.checksumFor(part21, fs);
Path part22 = new Path(dirTbl2, "part2");
createFile(part22, "testClearer22");
-String fileChksum22 = ReplChangeManager.getChksumString(part22, fs);
+String fileChksum22 = ReplChangeManager.checksumFor(part22, fs);
Path dirTbl3 = new Path(dirDb, "tbl3");
fs.mkdirs(dirTbl3);
Path part31 = new Path(dirTbl3, "part1");
createFile(part31, "testClearer31");
-String fileChksum31 = ReplChangeManager.getChksumString(part31, fs);
+String fileChksum31 = ReplChangeManager.checksumFor(part31, fs);
Path part32 = new Path(dirTbl3, "part2");
createFile(part32, "testClearer32");
-String fileChksum32 = ReplChangeManager.getChksumString(part32, fs);
+String fileChksum32 = ReplChangeManager.checksumFor(part32, fs);

ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false);
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false);
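
The checksums captured before each recycle() call above are what let the test locate the recycled copies afterwards. Below is a hedged sketch of that follow-up check, assumed rather than shown in this hunk; it relies on the test class sharing ReplChangeManager's package (recycle() and getCMPath() are package-private after this change), and the helper name is hypothetical.

// Hypothetical helper for the assumed assertion pattern (same package as ReplChangeManager,
// static import of org.junit.Assert.assertTrue as in the test above).
private static void assertCopiedToCmRoot(FileSystem fs, HiveConf hiveConf, String checksum)
    throws Exception {
  // After recycle(), a copy keyed by the file's checksum is expected under cmroot.
  Path cmCopy = ReplChangeManager.getCMPath(hiveConf, checksum);
  assertTrue(fs.exists(cmCopy));
}
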
@@ -107,8 +107,8 @@ public void testCreateFunctionIncrementalReplication() throws Throwable {
@Test
public void testDropFunctionIncrementalReplication() throws Throwable {
primary.run("CREATE FUNCTION " + primaryDbName
+ ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+ "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+ ".testFunction as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
replica.load(replicatedDbName, bootStrapDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
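
Only the CREATE side of the test changes here; the rest of testDropFunctionIncrementalReplication lies outside this hunk. The sketch below shows, in hedged form, how such a test typically continues; the lastReplicationId field name and the final verification step are assumptions, not quotes from the test.

primary.run("DROP FUNCTION " + primaryDbName + ".testFunction");
WarehouseInstance.Tuple incrementalDump =
    primary.dump(primaryDbName, bootStrapDump.lastReplicationId); // field name assumed
replica.load(replicatedDbName, incrementalDump.dumpLocation)
    .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
// ...followed by a check that "testFunction" is no longer listed on the replica.
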
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -54,9 +54,9 @@ public class ReplChangeManager {
private String msGroup;
private FileSystem fs;

-public static final String ORIG_LOC_TAG = "user.original-loc";
-public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
-public static final String URI_FRAGMENT_SEPARATOR = "#";
+private static final String ORIG_LOC_TAG = "user.original-loc";
+static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+private static final String URI_FRAGMENT_SEPARATOR = "#";

public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException {
if (instance == null) {
@@ -65,7 +65,7 @@ public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaExcept
return instance;
}

-ReplChangeManager(HiveConf hiveConf) throws MetaException {
+private ReplChangeManager(HiveConf hiveConf) throws MetaException {
try {
if (!inited) {
if (hiveConf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
@@ -109,7 +109,7 @@ void addFile(Path path) throws MetaException {
if (fs.isDirectory(path)) {
throw new IllegalArgumentException(path + " cannot be a directory");
}
-Path cmPath = getCMPath(hiveConf, getChksumString(path, fs));
+Path cmPath = getCMPath(hiveConf, checksumFor(path, fs));
boolean copySuccessful = FileUtils
.copy(path.getFileSystem(hiveConf), path, cmPath.getFileSystem(hiveConf), cmPath, false,
false, hiveConf);
@@ -134,10 +134,10 @@ void addFile(Path path) throws MetaException {
* recursively move files inside directory to cmroot. Note the table must be managed table
* @param path a single file or directory
* @param ifPurge if the file should skip Trash when delete
-* @return
+* @return int
* @throws MetaException
*/
-public int recycle(Path path, boolean ifPurge) throws MetaException {
+int recycle(Path path, boolean ifPurge) throws MetaException {
if (!enabled) {
return 0;
}
@@ -151,7 +151,7 @@ public int recycle(Path path, boolean ifPurge) throws MetaException {
count += recycle(file.getPath(), ifPurge);
}
} else {
-Path cmPath = getCMPath(hiveConf, getChksumString(path, fs));
+Path cmPath = getCMPath(hiveConf, checksumFor(path, fs));

if (LOG.isDebugEnabled()) {
LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
@@ -207,7 +207,7 @@ public int recycle(Path path, boolean ifPurge) throws MetaException {
}

// Get checksum of a file
-static public String getChksumString(Path path, FileSystem fs) throws IOException {
+static public String checksumFor(Path path, FileSystem fs) throws IOException {
// TODO: fs checksum only available on hdfs, need to
// find a solution for other fs (eg, local fs, s3, etc)
String checksumString = null;
@@ -228,13 +228,10 @@ static public void setCmRoot(Path cmRoot) {
* to a deterministic location of cmroot. So user can retrieve the file back
* with the original location plus checksum.
* @param conf
-* @param checkSum checksum of the file, can be retrieved by {@link getCksumString}
-* @return
-* @throws IOException
-* @throws MetaException
+* @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
+* @return Path
*/
-static Path getCMPath(Configuration conf, String checkSum)
-throws IOException, MetaException {
+static Path getCMPath(Configuration conf, String checkSum) throws IOException, MetaException {
String newFileName = checkSum;
int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
@@ -250,28 +247,27 @@ static Path getCMPath(Configuration conf, String checkSum)
* Get original file specified by src and chksumString. If the file exists and checksum
* matches, return the file; otherwise, use chksumString to retrieve it from cmroot
* @param src Original file location
-* @param chksumString Checksum of the original file
-* @param conf
+* @param checksumString Checksum of the original file
+* @param hiveConf
* @return Corresponding FileStatus object
* @throws MetaException
*/
-static public FileStatus getFileStatus(Path src, String chksumString,
-HiveConf conf) throws MetaException {
+static public FileStatus getFileStatus(Path src, String checksumString,
+HiveConf hiveConf) throws MetaException {
try {
-FileSystem srcFs = src.getFileSystem(conf);
-if (chksumString == null) {
+FileSystem srcFs = src.getFileSystem(hiveConf);
+if (checksumString == null) {
return srcFs.getFileStatus(src);
}

if (!srcFs.exists(src)) {
-return srcFs.getFileStatus(getCMPath(conf, chksumString));
+return srcFs.getFileStatus(getCMPath(hiveConf, checksumString));
}

-String currentChksumString = getChksumString(src, srcFs);
-if (currentChksumString == null || chksumString.equals(currentChksumString)) {
+String currentChecksumString = checksumFor(src, srcFs);
+if (currentChecksumString == null || checksumString.equals(currentChecksumString)) {
return srcFs.getFileStatus(src);
} else {
-return srcFs.getFileStatus(getCMPath(conf, chksumString));
+return srcFs.getFileStatus(getCMPath(hiveConf, checksumString));
}
} catch (IOException e) {
throw new MetaException(StringUtils.stringifyException(e));
@@ -372,7 +368,7 @@ public void run() {
}

// Schedule CMClearer thread. Will be invoked by metastore
-public static void scheduleCMClearer(HiveConf hiveConf) {
+static void scheduleCMClearer(HiveConf hiveConf) {
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.REPLCMENABLED)) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder()
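
The javadoc changes above describe the public retrieval path: getFileStatus(src, checksum, conf) returns the original file when it still exists with a matching checksum, and otherwise falls back to the checksum-keyed copy under cmroot. The following is a hedged usage sketch, not part of this commit; the "#"-splitting mirrors the URI_FRAGMENT_SEPARATOR encoding assumed earlier and should be verified against encodeFileUri.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class CmResolveSketch {
  // Resolve a "path#checksum" URI produced at dump time back to a live FileStatus.
  static FileStatus resolve(String encodedUri, HiveConf conf) throws MetaException {
    String[] parts = encodedUri.split("#", 2);
    Path src = new Path(parts[0]);
    String checksum = parts.length == 2 ? parts[1] : null; // no fragment -> no cmroot fallback
    return ReplChangeManager.getFileStatus(src, checksum, conf);
  }
}
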
4 changes: 2 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -167,7 +167,7 @@ protected int execute(DriverContext driverContext) {
}else{
LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
-String chksumString = ReplChangeManager.getChksumString(oneSrc.getPath(), actualSrcFs);
+String chksumString = ReplChangeManager.checksumFor(oneSrc.getPath(), actualSrcFs);
listBW.write(ReplChangeManager.encodeFileUri
(oneSrc.getPath().toUri().toString(), chksumString) + "\n");
}
@@ -55,7 +55,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
if ("hdfs".equals(inputPath.toUri().getScheme())) {
FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
-String checkSum = ReplChangeManager.getChksumString(qualifiedUri, fileSystem);
+String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem);
String newFileUri = ReplChangeManager.encodeFileUri(qualifiedUri.toString(), checkSum);
resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
} else {
