Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
FALCON-2199 Delete API support for extension job (user extension)
Browse files Browse the repository at this point in the history
This pull request is dependent on #331

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #333 from sandeepSamudrala/FALCON-2199
  • Loading branch information
sandeepSamudrala authored and Pallavi Rao committed Jan 2, 2017
1 parent 4f42dc1 commit 42d3794
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 62 deletions.
4 changes: 1 addition & 3 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class ExtensionHandler {
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String NAME = "name";
private static final String NAME = "extensionName";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";

Expand Down Expand Up @@ -185,8 +185,6 @@ static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
for (File innerFile : files) {
if (innerFile.isFile()) {
urls.add(innerFile.toURI().toURL());
} else {
urls.addAll(getFilesInPath(file.toURI().toURL()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);
Expand All @@ -216,7 +215,6 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);
Expand All @@ -227,10 +225,15 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);

/**
* Deletes the entities that are part of the extension job and then deleted the job from the DB.
* @param jobName name of the extension job that needs to be deleted.
* @return APIResult status of the deletion query.
*/
public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);
/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
Expand All @@ -33,7 +37,11 @@
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* A base class for managing Extension Operations.
Expand Down Expand Up @@ -106,6 +114,28 @@ public APIResult deleteExtensionMetadata(String extensionName){
}
}

protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException, IOException {
TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
List<String> processes = extensionJobsBean.getProcesses();
List<String> feeds = extensionJobsBean.getFeeds();
entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
return entityMap;
}

private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
List<Entity> entities = new ArrayList<>();
for (String entityName : entityNames) {
try {
entities.add(EntityUtil.getEntity(entityType, entityName));
} catch (EntityNotRegisteredException e) {
LOG.error("Entity {} not found during deletion nothing to do", entityName);
}
}
return entities;
}

private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.channel.Channel;
Expand All @@ -36,6 +39,9 @@
import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos;
import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG;

/**
* Proxy Util class to proxy entity management apis from prism to servers.
*/
class EntityProxyUtil {
private final Map<String, Channel> entityManagerChannels = new HashMap<>();
private final Map<String, Channel> configSyncChannels = new HashMap<>();
Expand Down Expand Up @@ -89,6 +95,31 @@ protected APIResult doExecute(String colo) throws FalconException {
return results;
}

Map<String, APIResult> proxyDelete(final String type, final String entityName,
final HttpServletRequest bufferedRequest) {
Map<String, APIResult> results = new HashMap<>();
results.put(FALCON_TAG, new EntityProxy(type, entityName) {
@Override
public APIResult execute() {
try {
EntityUtil.getEntity(type, entityName);
return super.execute();
} catch (EntityNotRegisteredException e) {
return new APIResult(APIResult.Status.SUCCEEDED,
entityName + "(" + type + ") doesn't exist. Nothing to do");
} catch (FalconException e) {
throw FalconWebException.newAPIException(e);
}
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
}
}.execute());
return results;
}

Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
private String currentColo = DeploymentUtil.getCurrentColo();
private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();


private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json";
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@GET
Expand Down Expand Up @@ -271,28 +270,26 @@ public APIResult resume(@PathParam("job-name") String jobName,
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult delete(@PathParam("job-name") String jobName,
@Context HttpServletRequest request,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
try {
List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
if (entities.isEmpty()) {
// return failure if the extension job doesn't exist
return new APIResult(APIResult.Status.SUCCEEDED,
"Extension job " + jobName + " doesn't exist. Nothing to delete.");
}
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
if (extensionJobsBean == null) {
// return failure if the extension job doesn't exist
return new APIResult(APIResult.Status.SUCCEEDED,
"Extension job " + jobName + " doesn't exist. Nothing to delete.");
}

for (Entity entity : entities) {
// TODO(yzheng): need to remember the entity dependency graph for clean ordered removal
canRemove(entity);
if (entity.getEntityType().isSchedulable() && !DeploymentUtil.isPrism()) {
getWorkflowEngine(entity).delete(entity);
}
configStore.remove(entity.getEntityType(), entity.getName());
}
} catch (FalconException | IOException e) {
SortedMap<EntityType, List<Entity>> entityMap;
try {
entityMap = getJobEntities(extensionJobsBean);
deleteEntities(entityMap, request);
} catch (FalconException | IOException | JAXBException e) {
LOG.error("Error when deleting extension job: " + jobName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
metaStore.deleteExtensionJob(jobName);
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
}

Expand Down Expand Up @@ -423,6 +420,21 @@ private BufferedRequest getBufferedRequest(HttpServletRequest request) {
return new BufferedRequest(request);
}

private void deleteEntities(SortedMap<EntityType, List<Entity>> entityMap, HttpServletRequest request)
throws IOException, JAXBException {
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (final Entity entity : entry.getValue()) {
final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
final String entityType = entity.getEntityType().toString();
final String entityName = entity.getName();
entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest);
if (!embeddedMode) {
super.delete(bufferedRequest, entityType, entityName, currentColo);
}
}
}
}

private void submitEntities(String extensionName, String jobName,
SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
HttpServletRequest request) throws FalconException, IOException, JAXBException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
Expand Down Expand Up @@ -222,29 +221,8 @@ public APIResult delete(
throw FalconWebException.newAPIException(e);
}
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
Map<String, APIResult> results = new HashMap<String, APIResult>();

results.put(FALCON_TAG, new EntityProxy(type, entityName) {
@Override
public APIResult execute() {
try {
EntityUtil.getEntity(type, entityName);
return super.execute();
} catch (EntityNotRegisteredException e) {
return new APIResult(APIResult.Status.SUCCEEDED,
entityName + "(" + type + ") doesn't exist. Nothing to do");
} catch (FalconException e) {
throw FalconWebException.newAPIException(e);
}
}

@Override
protected APIResult doExecute(String colo) throws FalconException {
return entityProxyUtil.getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName,
colo);
}
}.execute());

Map<String, APIResult> results = new HashMap<>();
results.putAll(entityProxyUtil.proxyDelete(type, entityName, bufferedRequest));
// delete only if deleted from everywhere
if (!embeddedMode && results.get(FALCON_TAG).getStatus() == APIResult.Status.SUCCEEDED) {
results.put(PRISM_TAG, super.delete(bufferedRequest, type, entityName, currentColo));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ public void onRemove(Entity entity) throws FalconException{
if (entity.getEntityType() != EntityType.PROCESS){
return;
}
backlogMetricStore.deleteEntityInstance(entity.getName());
entityBacklogs.remove(entity);
Process process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
for(Cluster cluster : process.getClusters().getClusters()){
dropMetric(cluster.getName(), process);
Process process = (Process) entity;
if (process.getSla() != null) {
backlogMetricStore.deleteEntityInstance(entity.getName());
entityBacklogs.remove(entity);
process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
for (Cluster cluster : process.getClusters().getClusters()) {
dropMetric(cluster.getName(), process);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ public APIResult submitAndScheduleExtensionJob(String extensionName, String jobN
}
}

@Override
public APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) {
InputStream configStream = getServletInputStream(configPath);
try {
Expand All @@ -344,10 +345,20 @@ public APIResult updateExtensionJob(String jobName, String configPath, String do
return localExtensionManager.updateExtensionJob(extensionName, jobName, configStream,
entityMap);
} catch (FalconException | IOException e) {
throw new FalconCLIException("Failed in updating the extension job " + jobName);
throw new FalconCLIException("Failed in updating the extension job:" + jobName);
}
}

@Override
public APIResult deleteExtensionJob(String jobName, String doAsUser) {
try {
return localExtensionManager.deleteExtensionJob(jobName);
} catch (FalconException | IOException e) {
throw new FalconCLIException("Failed to delete the extension job:" + jobName);
}
}


@Override
public APIResult getExtensionJobDetails(final String jobName) {
return localExtensionManager.getExtensionJobDetails(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.security.CurrentUser;
Expand Down Expand Up @@ -86,6 +88,19 @@ public APIResult submitAndSchedulableExtensionJob(String extensionName, String j
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}

public APIResult deleteExtensionJob(String jobName) throws FalconException, IOException{
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (Entity entity : entry.getValue()) {
delete(entity.getEntityType().name(), entity.getName(), null);
}
}
ExtensionStore.getMetaStore().deleteExtensionJob(jobName);
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
}

public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ APIResult updateExtensionJob(String jobName, String configPath, String doAsUser)
return falconUnitClient.updateExtensionJob(jobName, configPath, doAsUser);
}

APIResult deleteExtensionJob(String jobName, String doAsUser) {
return falconUnitClient.deleteExtensionJob(jobName, doAsUser);
}

public static String overlayParametersOverTemplate(String template,
Map<String, String> overlay) throws IOException {
Expand Down

0 comments on commit 42d3794

Please sign in to comment.