FALCON-1974 Cluster update: Allow superuser to update bundle/coord of dependent entities

Author: bvellanki <bvellanki@hortonworks.com>

Reviewers: Ying Zheng <yzheng@hortonworks.com>

Closes #151 from bvellanki/FALCON-1974
bvellanki committed May 20, 2016
1 parent e488775 commit fcd066a0a04da49b6ed507f832059b61faa66a79
Showing 3 changed files with 99 additions and 70 deletions.
@@ -1274,7 +1274,7 @@ public String update(Entity oldEntity, Entity newEntity,
                 LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
                         newEntity.toShortString(), cluster, bundle.getId());
                 result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle,
-                        CurrentUser.getUser(), skipDryRun)).append("\n");
+                        bundle.getUser(), skipDryRun)).append("\n");
                 LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster,
                         bundle.getId());
             }
@@ -1434,34 +1434,39 @@ private Frequency getDelay(Feed entity, CoordinatorJob coord) {
     }
 
     private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
-                                  String user, Boolean skipDryRun) throws FalconException {
+        String user, Boolean skipDryRun) throws FalconException {
+        String currentUser = CurrentUser.getUser();
+        switchUser(user);
+
         String clusterName = cluster.getName();
 
         Date effectiveTime = getEffectiveTime(cluster, newEntity);
         LOG.info("Effective time " + effectiveTime);
-
-        //Validate that new entity can be scheduled
-        dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun);
-
-        boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
-
-        //Set end times for old coords
-        updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity);
-
-        //schedule new entity
-        String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
-        BundleJob newBundle = null;
-        if (newJobId != null) {
-            newBundle = getBundleInfo(clusterName, newJobId);
-        }
-
-        //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now
-        //Also suspend new bundle
-        if (suspended) {
-            doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName());
+        try {
+            //Validate that new entity can be scheduled
+            dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun);
+
+            boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
+
+            //Set end times for old coords
+            updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity);
+            //schedule new entity
+            String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime);
+            BundleJob newBundle = null;
+            if (newJobId != null) {
+                newBundle = getBundleInfo(clusterName, newJobId);
+            }
+
+            //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now
+            //Also suspend new bundle
+            if (suspended) {
+                doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName());
+            }
+            return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle);
+        } finally {
+            // Switch back to current user in case of exception.
+            switchUser(currentUser);
         }
-
-        return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle);
     }
 
     private Date getEffectiveTime(Cluster cluster, Entity newEntity) {
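For context, the two hunks above change who the Oozie calls run as: update() now hands updateInternal the bundle owner (bundle.getUser()) rather than the caller, and updateInternal impersonates that owner for the entire update instead of only for scheduling. This is what lets a superuser update bundles and coordinators submitted by other users. A minimal standalone sketch of the pattern, with names taken from the diff and the try body paraphrased in comments:

    // Remember the caller, act as the bundle owner for the whole update,
    // and always restore the caller, even if the update throws.
    String currentUser = CurrentUser.getUser();  // e.g. the superuser issuing the update
    switchUser(user);                            // 'user' is bundle.getUser(), the entity owner
    try {
        // dry-run the new definition, end-date the old coordinators,
        // and schedule the replacement bundle, all as the owner
    } finally {
        switchUser(currentUser);                 // switch back on success or failure
    }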
@@ -1484,27 +1489,19 @@ private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime,
         }
     }
 
-    private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate, String user)
-        throws FalconException {
+    private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate) throws FalconException {
         Entity clone = entity.copy();
-
-        String currentUser = CurrentUser.getUser();
-        switchUser(user);
-        try {
-            EntityUtil.setStartDate(clone, cluster.getName(), startDate);
-            Path buildPath = EntityUtil.getNewStagingPath(cluster, clone);
-            OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
-            Properties properties = builder.build(cluster, buildPath);
-            if (properties != null) {
-                LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(),
-                        properties);
-                return scheduleEntity(cluster.getName(), properties, entity);
-            } else {
-                LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
-                return null;
-            }
-        } finally {
-            switchUser(currentUser);
+        EntityUtil.setStartDate(clone, cluster.getName(), startDate);
+        Path buildPath = EntityUtil.getNewStagingPath(cluster, clone);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
+        Properties properties = builder.build(cluster, buildPath);
+        if (properties != null) {
+            LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(),
+                    properties);
+            return scheduleEntity(cluster.getName(), properties, entity);
+        } else {
+            LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
+            return null;
         }
     }

@@ -375,6 +375,7 @@ protected APIResult update(Entity newEntity, String type, String entityName, Boo
     public APIResult updateClusterDependents(String clusterName, String colo, Boolean skipDryRun) {
         checkColo(colo);
         try {
+            verifySuperUser();
             Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
             verifySafemodeOperation(cluster, EntityUtil.ENTITY_OPERATION.UPDATE_CLUSTER_DEPENDENTS);
             int clusterVersion = cluster.getVersion();
@@ -390,54 +391,72 @@ public APIResult updateClusterDependents(String clusterName, String colo, Boolea
                 Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first);
                 switch (entity.getEntityType()) {
                 case FEED:
-                    Clusters feedClusters = ((Feed)entity).getClusters();
-                    List<org.apache.falcon.entity.v0.feed.Cluster> updatedFeedClusters =
-                            new ArrayList<org.apache.falcon.entity.v0.feed.Cluster>();
+                    Feed newFeedEntity = (Feed) entity.copy();
+                    Clusters feedClusters = newFeedEntity.getClusters();
                     if (feedClusters != null) {
+                        boolean requireUpdate = false;
                         for(org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters.getClusters()) {
                             if (feedCluster.getName().equals(clusterName)
                                     && feedCluster.getVersion() != clusterVersion) {
                                 // update feed cluster entity
                                 feedCluster.setVersion(clusterVersion);
+                                requireUpdate = true;
                             }
-                            updatedFeedClusters.add(feedCluster);
                         }
-                        ((Feed)entity).getClusters().getClusters().clear();
-                        ((Feed)entity).getClusters().getClusters().addAll(updatedFeedClusters);
-                        result.append(update(entity, entity.getEntityType().name(),
-                                entity.getName(), skipDryRun).getMessage());
+                        if (requireUpdate) {
+                            result.append(getWorkflowEngine(entity).update(entity, newFeedEntity,
+                                    cluster.getName(), skipDryRun));
+                            updateEntityInConfigStore(entity, newFeedEntity);
+                        }
                     }
                     break;
                 case PROCESS:
-                    org.apache.falcon.entity.v0.process.Clusters processClusters = ((Process)entity).getClusters();
-                    List<org.apache.falcon.entity.v0.process.Cluster> updatedProcClusters =
-                            new ArrayList<org.apache.falcon.entity.v0.process.Cluster>();
+                    Process newProcessEntity = (Process) entity.copy();
+                    org.apache.falcon.entity.v0.process.Clusters processClusters = newProcessEntity.getClusters();
                     if (processClusters != null) {
+                        boolean requireUpdate = false;
                         for(org.apache.falcon.entity.v0.process.Cluster procCluster : processClusters.getClusters()) {
                             if (procCluster.getName().equals(clusterName)
                                     && procCluster.getVersion() != clusterVersion) {
                                 // update process cluster entity
                                 procCluster.setVersion(clusterVersion);
+                                requireUpdate = true;
                             }
-                            updatedProcClusters.add(procCluster);
                         }
-                        ((Process)entity).getClusters().getClusters().clear();
-                        ((Process)entity).getClusters().getClusters().addAll(updatedProcClusters);
-                        result.append(update(entity, entity.getEntityType().name(),
-                                entity.getName(), skipDryRun).getMessage());
+                        if (requireUpdate) {
+                            result.append(getWorkflowEngine(entity).update(entity, newProcessEntity,
+                                    cluster.getName(), skipDryRun));
+                            updateEntityInConfigStore(entity, newProcessEntity);
+                        }
                     }
                     break;
                 default:
                     break;
                 }
             }
             return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
-        } catch (FalconException e) {
+        } catch (Exception e) {
             LOG.error("Update failed", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
     }
 
+    private void updateEntityInConfigStore(Entity oldEntity, Entity newEntity) {
+        List<Entity> tokenList = new ArrayList<>();
+        try {
+            configStore.initiateUpdate(newEntity);
+            obtainEntityLocks(oldEntity, "update", tokenList);
+            configStore.update(newEntity.getEntityType(), newEntity);
+        } catch (Throwable e) {
+            LOG.error("Update failed", e);
+            throw FalconWebException.newAPIException(e);
+        } finally {
+            ConfigurationStore.get().cleanupUpdateInit();
+            releaseEntityLocks(oldEntity.getName(), tokenList);
+        }
+    }
+
     private void obtainEntityLocks(Entity entity, String command, List<Entity> tokenList)
             throws FalconException {
         //first obtain lock for the entity for which update is issued.
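Both cases in the hunk above follow the same shape: copy the dependent entity before touching it, bump the cluster version on the copy, and only if a version actually changed, update the Oozie side and then persist the copy. A condensed sketch of that flow for the feed case (bumpClusterVersion is a hypothetical helper standing in for the inline loop, not part of the patch):

    Feed newFeedEntity = (Feed) entity.copy();  // never mutate the live ConfigStore object
    // hypothetical helper: returns true if any matching cluster's version was bumped
    boolean requireUpdate = bumpClusterVersion(newFeedEntity, clusterName, clusterVersion);
    if (requireUpdate) {
        // rewrite the Oozie bundle/coord for the new definition ...
        result.append(getWorkflowEngine(entity).update(entity, newFeedEntity,
                cluster.getName(), skipDryRun));
        // ... then swap the new definition into the config store under entity locks
        updateEntityInConfigStore(entity, newFeedEntity);
    }

Copying first also gives the workflow engine a clean old-vs-new pair to diff, and lets updateEntityInConfigStore lock against the unmodified old entity.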
@@ -483,12 +502,7 @@ private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconExc
         }
 
         if (oldEntity.getEntityType() == EntityType.CLUSTER) {
-            final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
-            DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider();
-            if (!authorizationProvider.isSuperUser(authenticatedUGI)) {
-                throw new FalconException("Permission denied : "
-                        + "Cluster entity update can only be performed by superuser.");
-            }
+            verifySuperUser();
         }
 
         String[] props = oldEntity.getEntityType().getImmutableProperties();
@@ -529,8 +543,15 @@ protected Entity submitInternal(InputStream inputStream, String type, String doA
     }
 
     protected void verifySafemodeOperation(Entity entity, EntityUtil.ENTITY_OPERATION operation) {
-        // if Falcon not in safemode, return
+        // if Falcon not in safemode, allow everything except cluster update
         if (!StartupProperties.isServerInSafeMode()) {
+            if (operation.equals(EntityUtil.ENTITY_OPERATION.UPDATE)
+                    && entity.getEntityType().equals(EntityType.CLUSTER)) {
+                LOG.error("Entity operation {} is only allowed on cluster entities during safemode",
+                        operation.name());
+                throw FalconWebException.newAPIException("Entity operation " + operation.name()
+                        + " is only allowed on cluster entities during safemode");
+            }
            return;
         }
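The double negative above is easy to misread, so here is the resulting behavior of verifySafemodeOperation spelled out (a summary, not code from the patch):

    // server NOT in safemode, operation UPDATE on a CLUSTER entity -> APIException
    //     (cluster definitions may only be updated while Falcon is in safemode)
    // server NOT in safemode, any other operation                  -> allowed (returns early)
    // server IN safemode                                           -> falls through to the
    //     existing safemode checks later in the method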

@@ -1354,4 +1375,13 @@ private boolean containsIgnoreCase(List<String> strList, String str) {
         }
         return false;
     }
+
+    private void verifySuperUser() throws FalconException, IOException {
+        final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
+        DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider();
+        if (!authorizationProvider.isSuperUser(authenticatedUGI)) {
+            throw new FalconException("Permission denied : "
+                    + "Cluster entity update can only be performed by superuser.");
+        }
+    }
 }
@@ -72,16 +72,18 @@ public void testUpdateClusterCommands() throws Exception {
         filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0);
 
         // update cluster outside safemode, it should fail
         filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
         Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
                 + filePath + " -name " + overlay.get("cluster")), -1);
 
-        // Update cluster here and test that it works
+        // Update cluster after setting safemode and test that it works
+        initSafemode();
         filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
         Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
                 + filePath + " -name " + overlay.get("cluster")), 0);
+        clearSafemode();
 
-        // Try to update dependent entities
+        // Try to update dependent entities, it should succeed
         Assert.assertEquals(executeWithURL("entity -updateClusterDependents -cluster "
                 + overlay.get("cluster") + " -skipDryRun "), 0);
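The dependents update exercised at the end of the test corresponds to a plain CLI invocation; assuming the standard falcon launcher script, a superuser would run something like:

    falcon entity -updateClusterDependents -cluster <cluster-name> -skipDryRun

(The argument string is exactly what executeWithURL passes above; only the launcher and the placeholder cluster name are assumed.)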
