Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1085 Support Cluster entity updates in Falcon Server
Added basic documentation, https://issues.apache.org/jira/browse/FALCON-1937 will contain detailed documentation.

Author: bvellanki <bvellanki@hortonworks.com>

Reviewers: Venkat Ranganathan <venkat@hortonworks.com>, yzheng-hortonworks <yzheng@hortonworks.com>, peeyush b <pbishnoi@hortonworks.com>

Closes #127 from bvellanki/FALCON-1085
  • Loading branch information
bvellanki committed May 10, 2016
1 parent bb6032b commit f3ff8b27f0a77d802306f0fc9ffdff51ae6c7486
Showing 36 changed files with 817 additions and 50 deletions.
@@ -66,6 +66,8 @@ public Options createEntityOptions() {
"Submits an entity xml to Falcon");
Option update = new Option(FalconCLIConstants.UPDATE_OPT, false,
"Updates an existing entity xml");
Option updateClusterDependents = new Option(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT, false,
"Updates dependent entities of a cluster in workflow engine");
Option schedule = new Option(FalconCLIConstants.SCHEDULE_OPT, false,
"Schedules a submited entity in Falcon");
Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false,
@@ -96,6 +98,7 @@ public Options createEntityOptions() {
OptionGroup group = new OptionGroup();
group.addOption(submit);
group.addOption(update);
group.addOption(updateClusterDependents);
group.addOption(schedule);
group.addOption(suspend);
group.addOption(resume);
@@ -217,7 +220,8 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws F
}

EntityType entityTypeEnum = null;
if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
if (optionsList.contains(FalconCLIConstants.LIST_OPT)
|| optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) {
if (entityType == null) {
entityType = "";
}
@@ -255,6 +259,9 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws F
validateColo(optionsList);
validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) {
validateNotEmpty(cluster, FalconCLIConstants.CLUSTER_OPT);
result = client.updateClusterDependents(cluster, skipDryRun, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
@@ -47,6 +47,7 @@ private FalconCLIConstants(){
public static final String VERSION_OPT = "version";
public static final String SUBMIT_OPT = "submit";
public static final String UPDATE_OPT = "update";
public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents";
public static final String DELETE_OPT = "delete";
public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
public static final String VALIDATE_OPT = "validate";
@@ -236,6 +236,7 @@ protected static enum Entities {
VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML),
SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML),
UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML),
UPDATEDEPENDENTS("api/entities/updateClusterDependents/", HttpMethod.POST, MediaType.TEXT_XML),
SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML),
SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML),
SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML),
@@ -430,6 +431,14 @@ public APIResult update(String entityType, String entityName, String filePath,
return getResponse(APIResult.class, clientResponse);
}

/**
 * Calls the server-side updateClusterDependents API, which pushes updates for all
 * feed/process entities that depend on the given cluster into the workflow engine.
 *
 * @param clusterName name of the cluster whose dependents should be updated
 * @param skipDryRun  when true, the server skips the dry-run validation step
 * @param doAsUser    proxy user to execute the request as (may be null)
 * @return the API result reported by the Falcon server
 * @throws FalconCLIException if the request fails or the server reports an error
 */
public APIResult updateClusterDependents(String clusterName, Boolean skipDryRun,
                                         String doAsUser) throws FalconCLIException {
    ResourceBuilder builder = new ResourceBuilder()
            .path(Entities.UPDATEDEPENDENTS.path, clusterName)
            .addQueryParam(SKIP_DRYRUN, skipDryRun)
            .addQueryParam(DO_AS_OPT, doAsUser);
    ClientResponse clientResponse = builder.call(Entities.UPDATEDEPENDENTS);
    return getResponse(APIResult.class, clientResponse);
}

@Override
public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun,
String doAsUser, String properties) throws FalconCLIException {
@@ -75,6 +75,7 @@
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:string" name="description"/>
<xs:attribute type="xs:string" name="colo" use="required"/>
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="locations">
<xs:annotation>
@@ -76,6 +76,7 @@
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:string" name="colo" use="required"/>
<xs:attribute type="xs:string" name="description"/>
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
<xs:attribute type="datasource-type" name="type" use="required">
<xs:annotation>
<xs:documentation>
@@ -263,7 +264,7 @@
<xs:complexType name="ACL">
<xs:annotation>
<xs:documentation>
Access control list for this cluster.
Access control list for this Entity.
owner is the Owner of this entity.
group is the one which has access to read - not used at this time.
permission is not enforced at this time
@@ -129,6 +129,7 @@
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:string" name="description"/>
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="cluster">
<xs:annotation>
@@ -168,6 +169,7 @@
<xs:attribute type="cluster-type" name="type" use="optional"/>
<xs:attribute type="xs:string" name="partition" use="optional"/>
<xs:attribute type="frequency-type" name="delay" use="optional" />
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="partitions">
<xs:annotation>
@@ -188,6 +188,7 @@
<xs:element type="ACL" name="ACL" minOccurs="0"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>

<xs:simpleType name="IDENTIFIER">
@@ -219,6 +220,7 @@
<xs:element type="validity" name="validity"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>

<xs:complexType name="validity">
@@ -32,6 +32,8 @@
import org.apache.falcon.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
@@ -45,7 +47,7 @@ public final class ClusterHelper {
public static final String WORKINGDIR = "working";
public static final String NO_USER_BROKER_URL = "NA";
public static final String EMPTY_DIR_NAME = "EMPTY_DIR_DONT_DELETE";

private static final Logger LOG = LoggerFactory.getLogger(ClusterHelper.class);

private ClusterHelper() {
}
@@ -123,6 +125,9 @@ public static String getMessageBrokerImplClass(Cluster cluster) {
}

public static Interface getInterface(Cluster cluster, Interfacetype type) {
if (cluster.getInterfaces() == null) {
return null;
}
for (Interface interf : cluster.getInterfaces().getInterfaces()) {
if (interf.getType() == type) {
return interf;
@@ -143,6 +148,9 @@ private static String getNormalizedUrl(Cluster cluster, Interfacetype type) {


public static Location getLocation(Cluster cluster, ClusterLocationType clusterLocationType) {
if (cluster.getLocations() == null) {
return null;
}
for (Location loc : cluster.getLocations().getLocations()) {
if (loc.getName().equals(clusterLocationType)) {
return loc;
@@ -211,4 +219,44 @@ public static String getEmptyDir(Cluster cluster) {
return getStorageUrl(cluster) + getLocation(cluster, ClusterLocationType.STAGING).getPath()
+ "/" + EMPTY_DIR_NAME;
}

/**
 * Checks whether the endpoint of a given interface type matches between two
 * cluster definitions. Endpoints match when both are blank (absent), or when
 * both are non-blank and equal ignoring case.
 *
 * @param oldEntity     the previously stored cluster definition
 * @param newEntity     the updated cluster definition
 * @param interfaceType the interface type whose endpoints are compared
 * @return true if the endpoints are equivalent, false otherwise
 */
public static boolean matchInterface(final Cluster oldEntity, final Cluster newEntity,
                                     final Interfacetype interfaceType) {
    Interface oldInterface = getInterface(oldEntity, interfaceType);
    Interface newInterface = getInterface(newEntity, interfaceType);
    String oldEndpoint = (oldInterface == null) ? null : oldInterface.getEndpoint();
    String newEndpoint = (newInterface == null) ? null : newInterface.getEndpoint();
    // Fix: the first placeholder is the interface type, not a cluster name as the
    // previous message claimed.
    LOG.debug("Verifying if Interfaces match for interface type {} : Old - {}, New - {}",
            interfaceType.name(), oldEndpoint, newEndpoint);
    // equalsIgnoreCase(null) returns false, so a non-blank old endpoint never
    // matches a missing new one.
    return StringUtils.isBlank(oldEndpoint) && StringUtils.isBlank(newEndpoint)
            || StringUtils.isNotBlank(oldEndpoint) && oldEndpoint.equalsIgnoreCase(newEndpoint);
}

/**
 * Checks whether the path of a given location type matches between two cluster
 * definitions. Paths match when both are blank (absent), or when both are
 * non-blank and equal ignoring case.
 *
 * @param oldEntity    the previously stored cluster definition
 * @param newEntity    the updated cluster definition
 * @param locationType the location type whose paths are compared
 * @return true if the location paths are equivalent, false otherwise
 */
public static boolean matchLocations(final Cluster oldEntity, final Cluster newEntity,
                                     final ClusterLocationType locationType) {
    Location previous = getLocation(oldEntity, locationType);
    Location current = getLocation(newEntity, locationType);
    String previousPath = null;
    if (previous != null) {
        previousPath = previous.getPath();
    }
    String currentPath = null;
    if (current != null) {
        currentPath = current.getPath();
    }
    LOG.debug("Verifying if Locations match {} : Old - {}, New - {}",
            locationType.name(), previousPath, currentPath);
    if (StringUtils.isBlank(previousPath)) {
        // Both absent/blank counts as a match.
        return StringUtils.isBlank(currentPath);
    }
    return previousPath.equalsIgnoreCase(currentPath);
}

/**
 * Returns true when both cluster definitions carry exactly the same set of
 * name/value properties (order-insensitive map equality).
 *
 * @param oldEntity the previously stored cluster definition
 * @param newEntity the updated cluster definition
 * @return true if the property maps are equal, false otherwise
 */
public static boolean matchProperties(final Cluster oldEntity, final Cluster newEntity) {
    return getClusterProperties(oldEntity).equals(getClusterProperties(newEntity));
}

/**
 * Flattens the cluster's properties element into a name-to-value map.
 * Returns an empty map when the cluster declares no properties element.
 *
 * @param cluster the cluster definition to read properties from
 * @return mutable map of property name to property value (possibly empty)
 */
private static Map<String, String> getClusterProperties(final Cluster cluster) {
    Map<String, String> props = new HashMap<String, String>();
    if (cluster.getProperties() == null) {
        return props;
    }
    for (Property property : cluster.getProperties().getProperties()) {
        props.put(property.getName(), property.getValue());
    }
    return props;
}
}
@@ -79,7 +79,8 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException
if (oldEntity.getEntityType() != EntityType.CLUSTER) {
return;
}
throw new FalconException("change shouldn't be supported on cluster!");
onRemove(oldEntity);
onAdd(newEntity);
}

@Override
@@ -35,6 +35,7 @@
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.datasource.DatasourceType;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.ClusterType;
@@ -130,6 +131,7 @@ public short getPriority() {
public enum ENTITY_OPERATION {
SUBMIT,
UPDATE,
UPDATE_CLUSTER_DEPENDENTS,
SCHEDULE,
SUBMIT_AND_SCHEDULE,
DELETE,
@@ -706,6 +708,40 @@ public static Retry getRetry(Entity entity) throws FalconException {
}
}

/**
 * Reads the version attribute from a versioned entity (feed, process, cluster
 * or datasource).
 *
 * @param entity the entity whose version is requested
 * @return the entity's version number
 * @throws FalconException if the entity type does not carry a version
 */
public static Integer getVersion(final Entity entity) throws FalconException {
    EntityType entityType = entity.getEntityType();
    if (entityType == EntityType.FEED) {
        return ((Feed) entity).getVersion();
    }
    if (entityType == EntityType.PROCESS) {
        return ((Process) entity).getVersion();
    }
    if (entityType == EntityType.CLUSTER) {
        return ((Cluster) entity).getVersion();
    }
    if (entityType == EntityType.DATASOURCE) {
        return ((Datasource) entity).getVersion();
    }
    throw new FalconException("Invalid entity type:" + entityType);
}

/**
 * Writes the version attribute on a versioned entity (feed, process, cluster
 * or datasource).
 *
 * @param entity  the entity to update
 * @param version the version number to set
 * @throws FalconException if the entity type does not carry a version
 */
public static void setVersion(Entity entity, final Integer version) throws FalconException {
    EntityType entityType = entity.getEntityType();
    if (entityType == EntityType.FEED) {
        ((Feed) entity).setVersion(version);
    } else if (entityType == EntityType.PROCESS) {
        ((Process) entity).setVersion(version);
    } else if (entityType == EntityType.CLUSTER) {
        ((Cluster) entity).setVersion(version);
    } else if (entityType == EntityType.DATASOURCE) {
        ((Datasource) entity).setVersion(version);
    } else {
        throw new FalconException("Invalid entity type:" + entityType);
    }
}

//Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
//Each entity update creates a new staging path
//Base staging path is the base path for all staging dirs
@@ -1123,4 +1159,37 @@ public static String evaluateDependentPath(String feedPath, Date instanceTime) {
return instancePath;
}

/**
* Returns true if entity is dependent on cluster, else returns false.
* @param entity
* @param clusterName
* @return
*/
/**
 * Returns true if the entity is the named cluster itself, or is a feed/process
 * that references the named cluster; false otherwise (including for entity
 * types that cannot reference a cluster). Name comparison is case-insensitive.
 *
 * @param entity      the entity to inspect
 * @param clusterName the cluster name to look for
 * @return true if the entity depends on the cluster
 */
public static boolean isEntityDependentOnCluster(Entity entity, String clusterName) {
    switch (entity.getEntityType()) {
    case CLUSTER:
        return entity.getName().equalsIgnoreCase(clusterName);

    case FEED:
        Feed feed = (Feed) entity;
        // Guard against a missing clusters element, consistent with the null
        // checks in ClusterHelper.getInterface/getLocation.
        if (feed.getClusters() == null) {
            return false;
        }
        for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
            if (cluster.getName().equalsIgnoreCase(clusterName)) {
                return true;
            }
        }
        break;

    case PROCESS:
        Process process = (Process) entity;
        if (process.getClusters() == null) {
            return false;
        }
        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
            if (cluster.getName().equalsIgnoreCase(clusterName)) {
                return true;
            }
        }
        break;

    default:
        // Other entity types (e.g. datasource) do not reference clusters.
        break;
    }
    return false;
}

}
@@ -97,6 +97,14 @@ public void validate(Feed feed) throws FalconException {
cluster.getValidity().setEnd(DateUtil.NEVER);
}

// set Cluster version
int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
throw new ValidationException("Feed should not set cluster to a version that does not exist");
} else {
cluster.setVersion(clusterVersion);
}

validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
cluster.getName());
validateClusterHasRegistry(feed, cluster);
@@ -91,6 +91,14 @@ public void validate(Process process) throws FalconException {
cluster.getValidity().setEnd(DateUtil.NEVER);
}

// set Cluster version
int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
throw new ValidationException("Process should not set cluster to a version that does not exist");
} else {
cluster.setVersion(clusterVersion);
}

validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
validateHDFSPaths(process, clusterName);
validateProperties(process);
@@ -20,12 +20,15 @@

import org.apache.commons.codec.CharEncoding;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileStatus;
@@ -242,9 +245,10 @@ public synchronized void publish(EntityType type, Entity entity) throws FalconEx
private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException {
try {
if (get(type, entity.getName()) != null) {
persist(type, entity);
ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
Entity oldEntity = entityMap.get(entity.getName());
updateVersion(oldEntity, entity);
persist(type, entity);
onChange(oldEntity, entity);
entityMap.put(entity.getName(), entity);
} else {
@@ -256,6 +260,18 @@ private synchronized void updateInternal(EntityType type, Entity entity) throws
AUDIT.info(type + "/" + entity.getName() + " is replaced into config store");
}

/**
 * Assigns the version on the incoming entity from the previously stored one.
 * A cluster's version is bumped only when the change forces dependent
 * feeds/processes to be updated; other entities are bumped whenever their
 * definition actually changed. In all other cases the stored version is
 * carried forward so a re-submit does not silently reset it.
 *
 * @param oldEntity the entity currently in the config store
 * @param newEntity the incoming replacement entity (version is set in place)
 * @throws FalconException if the entity type does not carry a version
 */
private void updateVersion(Entity oldEntity, Entity newEntity) throws FalconException {
    Integer oldVersion = EntityUtil.getVersion(oldEntity);
    if (oldEntity.getEntityType().equals(EntityType.CLUSTER)) {
        // Increase version number for cluster only if dependent feeds/processes
        // need to be updated; otherwise keep the stored version.
        if (UpdateHelper.isClusterEntityUpdated((Cluster) oldEntity, (Cluster) newEntity)) {
            EntityUtil.setVersion(newEntity, oldVersion + 1);
        } else {
            EntityUtil.setVersion(newEntity, oldVersion);
        }
    } else if (!EntityUtil.equals(oldEntity, newEntity)) {
        // Fix: the comment promised an increment on change but the code reused
        // the old version verbatim; bump it so a real change is observable.
        EntityUtil.setVersion(newEntity, oldVersion + 1);
    } else {
        // Unchanged entity keeps its stored version.
        EntityUtil.setVersion(newEntity, oldVersion);
    }
}

public synchronized void update(EntityType type, Entity entity) throws FalconException {
if (updatesInProgress.get() == entity) {
try {

0 comments on commit f3ff8b2

Please sign in to comment.