Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-2057 HiveDR not working with multiple users and same DB
Author: bvellanki <bvellanki@hortonworks.com>

Reviewers: "yzheng-hortonworks <yzheng@hortonworks.com>, Sowmya Ramesh <sramesh@hortonworks.com>, Venkat Ranganathan <venkat@hortonworks.com>"

Closes #203 from bvellanki/FALCON-2057

(cherry picked from commit d0bc188)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>
  • Loading branch information
bvellanki committed Jul 1, 2016
1 parent 0222d39 commit 1fabb3f40bc642bb11583dae9ef6eaf12f0861d4
Showing 1 changed file with 20 additions and 3 deletions.
@@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -71,6 +72,8 @@ private void init(FileSystem targetFileSystem) throws IOException {
Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
FileUtils.validatePath(fileSystem, basePath);

// Current limitation is that only users who belong to DRStatusStore.storeGroup can submit HiveDR jobs.
// BaseDir for status store is created with permissions 770 so that all eligible users can access statusStore.
Path storePath = new Path(DEFAULT_STORE_PATH);
if (!fileSystem.exists(storePath)) {
if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
@@ -163,10 +166,11 @@ public void deleteReplicationStatus(String jobName, String database) throws Hive
private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
String database) throws HiveReplicationException{
DBReplicationStatus dbReplicationStatus = null;
Path statusDbDirPath = getStatusDbDirPath(database);
Path statusDirPath = getStatusDirPath(database, jobName);

// check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
// if yes, use md5 of the same for dir names. prefer to use actual db names for readability.

try {
if (fileSystem.exists(statusDirPath)) {
dbReplicationStatus = readStatusFile(statusDirPath);
@@ -176,6 +180,15 @@ private DBReplicationStatus getDbReplicationStatus(String source, String target,
ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
database, null, ReplicationStatus.Status.INIT, -1);
dbReplicationStatus = new DBReplicationStatus(initDbStatus);

// Create parent dir first with default status store permissions. FALCON-2057
if (!fileSystem.exists(statusDbDirPath)) {
if (!FileSystem.mkdirs(fileSystem, statusDbDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDbDirPath.toString();
LOG.error(error);
throw new HiveReplicationException(error);
}
}
if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDirPath.toString();
LOG.error(error);
@@ -197,7 +210,11 @@ private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) {
}

public Path getStatusDirPath(String database, String jobName) {
return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
return new Path(getStatusDbDirPath(database), jobName);
}

public Path getStatusDbDirPath(String dbName) {
return new Path(new Path(BASE_DEFAULT_STORE_PATH), dbName.toLowerCase());
}

private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
@@ -271,7 +288,7 @@ private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplic
public void checkForReplicationConflict(String newSource, String jobName,
String database, String table) throws HiveReplicationException {
try {
Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator + "latest.json");
FileStatus[] files = fileSystem.globStatus(globPath);
for(FileStatus file : files) {
DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(

0 comments on commit 1fabb3f

Please sign in to comment.