Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1962 Extesnion related bugs
Author: Sowmya Ramesh <sramesh@hortonworks.com>

Reviewers: "Venkat <n.r.v@live.com>"

Closes #155 from sowmyaramesh/master
  • Loading branch information
sowmyaramesh committed May 20, 2016
1 parent fbd9fbe commit 823c7d1d8397be19b6fe94a7477bebcb288c94fc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 37 deletions.
@@ -53,10 +53,11 @@ public void validate(final Properties extensionProperties) throws FalconExceptio
}
}

Cluster srcCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName());
String srcClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_CLUSTER
.getName());
Cluster srcCluster = ClusterHelper.getCluster(srcClusterName);
if (srcCluster == null) {
throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()
+ " not found");
throw new FalconException("Cluster entity " + srcClusterName + " not found");
}
String srcClusterCatalogUrl = ClusterHelper.getRegistryEndPoint(srcCluster);
Configuration srcClusterConf = ClusterHelper.getConfiguration(srcCluster);
@@ -96,10 +97,11 @@ public void validate(final Properties extensionProperties) throws FalconExceptio
}

// Verify db exists on target
Cluster targetCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName());
String targetClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_CLUSTER
.getName());
Cluster targetCluster = ClusterHelper.getCluster(targetClusterName);
if (targetCluster == null) {
throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()
+ " not found");
throw new FalconException("Cluster entity " + targetClusterName + " not found");
}
String targetClusterCatalogUrl = ClusterHelper.getRegistryEndPoint(targetCluster);
Configuration targetClusterConf = ClusterHelper.getConfiguration(targetCluster);
@@ -118,15 +120,15 @@ public Properties getAdditionalProperties(final Properties extensionProperties)
String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
// Add job name as Hive DR job
additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(),
jobName + System.currentTimeMillis());
jobName);

String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
// Add required properties of cluster where job should run
additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(),
extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName()));
Cluster jobCluster = ClusterHelper.getCluster(ExtensionProperties.CLUSTER_NAME.getName());
clusterName);
Cluster jobCluster = ClusterHelper.getCluster(clusterName);
if (jobCluster == null) {
throw new FalconException("Cluster entity " + ExtensionProperties.CLUSTER_NAME.getName()
+ " not found");
throw new FalconException("Cluster entity " + clusterName + " not found");
}
additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(),
ClusterHelper.getStorageUrl(jobCluster));
@@ -139,10 +141,11 @@ public Properties getAdditionalProperties(final Properties extensionProperties)
}

// Properties for src cluster
Cluster srcCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName());
String srcClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_CLUSTER
.getName());
Cluster srcCluster = ClusterHelper.getCluster(srcClusterName);
if (srcCluster == null) {
throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()
+ " not found");
throw new FalconException("Cluster entity " + srcClusterName + " not found");
}
additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_METASTORE_URI.getName(),
ClusterHelper.getRegistryEndPoint(srcCluster));
@@ -171,10 +174,11 @@ public Properties getAdditionalProperties(final Properties extensionProperties)
}

// Properties for target cluster
Cluster targetCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName());
String targetClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_CLUSTER
.getName());
Cluster targetCluster = ClusterHelper.getCluster(targetClusterName);
if (targetCluster == null) {
throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()
+ " not found");
throw new FalconException("Cluster entity " + targetClusterName + " not found");
}
additionalProperties.put(HiveMirroringExtensionProperties.TARGET_METASTORE_URI.getName(),
ClusterHelper.getRegistryEndPoint(targetCluster));
@@ -60,10 +60,10 @@ private ExtensionProcessBuilderUtils() {
}

public static Entity createProcessFromTemplate(final String processTemplate,
final String extensionName,
final Properties extensionProperties,
final String wfPath,
final String wfLibPath) throws FalconException {
final String extensionName,
final Properties extensionProperties,
final String wfPath,
final String wfLibPath) throws FalconException {
if (StringUtils.isBlank(processTemplate) || StringUtils.isBlank(extensionName)
|| extensionProperties == null || StringUtils.isBlank(wfPath)) {
throw new FalconException("Invalid arguments passed to extension builder");
@@ -230,32 +230,26 @@ private static void bindNotificationProperties(final Notification processNotific

private static void bindACLProperties(final ACL acl,
final Properties extensionProperties) throws FalconException {
if (!SecurityUtil.isAuthorizationEnabled()) {
return;
}
String aclOwner = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_OWNER.getName());
String aclGroup = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_GROUP.getName());
String aclPermission = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_PERMISSION.getName());

String aclowner = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_OWNER.getName());
if (StringUtils.isNotEmpty(aclowner)) {
acl.setOwner(aclowner);
} else {
throw new FalconException("ACL owner extension property cannot be null or empty when authorization is "
if (SecurityUtil.isAuthorizationEnabled() && (StringUtils.isEmpty(aclOwner) || StringUtils.isEmpty(aclGroup)
|| StringUtils.isEmpty(aclPermission))) {
throw new FalconException("ACL extension properties cannot be null or empty when authorization is "
+ "enabled");
}

String aclGroup = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_GROUP.getName());
if (StringUtils.isNotEmpty(aclOwner)) {
acl.setOwner(aclOwner);
}

if (StringUtils.isNotEmpty(aclGroup)) {
acl.setGroup(aclGroup);
} else {
throw new FalconException("ACL group extension property cannot be null or empty when authorization is "
+ "enabled");
}

String aclPermission = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_PERMISSION.getName());
if (StringUtils.isNotEmpty(aclPermission)) {
acl.setPermission(aclPermission);
} else {
throw new FalconException("ACL permission extension property cannot be null or empty when authorization is "
+ "enabled");
}
}

0 comments on commit 823c7d1

Please sign in to comment.