Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
FALCON-2259 Unregister an extension only if no extension jobs are dep…
Browse files Browse the repository at this point in the history
…endant on the extension
  • Loading branch information
sandeepSamudrala committed Jan 17, 2017
1 parent 7e16263 commit a70f5a9
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ private PersistenceConstants(){
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";

public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";

public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
Expand All @@ -80,5 +80,6 @@ private PersistenceConstants(){
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ public ExtensionBean getDetail(String extensionName) {
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
try {
return (List<ExtensionJobsBean>)query.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteExtension(String extensionName){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -379,6 +380,19 @@ public boolean isExtensionStoreInitialized() {
return (storePath != null);
}

public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (null != extensionJobs && !extensionJobs.isEmpty()){
List<String> extensionJobNames = new ArrayList<>();
for (ExtensionJobsBean extensionJobsBean: extensionJobs) {
extensionJobNames.add(extensionJobsBean.getJobName());
}
return extensionJobNames;
} else {
return null;
}
}

public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.falcon.resource;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
Expand Down Expand Up @@ -99,13 +100,21 @@ public APIResult getExtensions() {
}
}

public APIResult deleteExtensionMetadata(String extensionName){
public APIResult deleteExtensionMetadata(String extensionName) throws FalconException {
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName,
CurrentUser.getUser()));
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
return new APIResult(APIResult.Status.SUCCEEDED, deleteExtension(extensionName));
}

private String deleteExtension(String extensionName) throws FalconException {
ExtensionStore metaStore = ExtensionStore.get();
List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (null == extensionJobs || extensionJobs.isEmpty()) {
return metaStore.deleteExtension(extensionName, CurrentUser.getUser());
} else {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
ArrayUtils.toString(extensionJobs));
throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances"
+ " are dependent on the extension" + ArrayUtils.toString(extensionJobs));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,8 @@ public APIResult deleteExtensionMetadata(
checkIfExtensionServiceIsEnabled();
try {
return super.deleteExtensionMetadata(extensionName);
} catch (FalconException e){
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ public APIResult registerExtension(String extensionName, String packagePath, Str

@Override
public APIResult unregisterExtension(String extensionName) {
return localExtensionManager.unRegisterExtension(extensionName);
try {
return localExtensionManager.unRegisterExtension(extensionName);
} catch (FalconException e) {
throw new FalconCLIException("Failed in unRegistering the exnteison"+ e.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ APIResult registerExtensionMetadata(String extensionName, String packagePath, St
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
}

APIResult unRegisterExtension(String extensionName) {
APIResult unRegisterExtension(String extensionName) throws FalconException {
return super.deleteExtensionMetadata(extensionName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,29 +218,29 @@ private Map<String, String> updateColoAndCluster(String colo, String cluster, Ma
return props;
}

public String registerExtension(String extensionName, String packagePath, String description)
APIResult registerExtension(String extensionName, String packagePath, String description)
throws IOException, FalconException {

return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
return falconUnitClient.registerExtension(extensionName, packagePath, description);
}

public String disableExtension(String extensionName) {
String disableExtension(String extensionName) {
return falconUnitClient.disableExtension(extensionName).getMessage();
}

public String enableExtension(String extensionName) {
String enableExtension(String extensionName) {
return falconUnitClient.enableExtension(extensionName).getMessage();
}

public String getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
APIResult getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName);
}

public String unregisterExtension(String extensionName) {
return falconUnitClient.unregisterExtension(extensionName).getMessage();
APIResult unregisterExtension(String extensionName) {
return falconUnitClient.unregisterExtension(extensionName);
}

public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
}

Expand Down
40 changes: 21 additions & 19 deletions unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Process;
Expand Down Expand Up @@ -245,7 +246,7 @@ private void submitClusterAndFeeds() throws IOException {
assertStatus(result);
}

public void setDummyProperty(Process process) {
private void setDummyProperty(Process process) {
Property property = new Property();
property.setName("dummy");
property.setValue("dummy");
Expand Down Expand Up @@ -424,13 +425,12 @@ public void testRegisterAndUnregisterExtension() throws Exception {
clearDB();
submitCluster();
createExtensionPackage();

String result = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
APIResult apiResult = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
, "testExtension");
Assert.assertEquals(result, "Extension :testExtension registered successfully.");

result = unregisterExtension("testExtension");
Assert.assertEquals(result, "Deleted extension:testExtension");
assertStatus(apiResult);
apiResult = unregisterExtension("testExtension");
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "Deleted extension:testExtension");
}

@Test
Expand All @@ -441,8 +441,8 @@ public void testExtensionJobOperations() throws Exception {
createDir(PROCESS_APP_PATH);
fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml"));
String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
Assert.assertEquals(result, "Extension :testExtension registered successfully.");
APIResult apiResult = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
assertStatus(apiResult);

disableExtension(TEST_EXTENSION);
createDir(PROCESS_APP_PATH);
Expand All @@ -457,10 +457,10 @@ public void testExtensionJobOperations() throws Exception {
}
enableExtension(TEST_EXTENSION);

APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
result = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(result);
apiResult = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(apiResult.getMessage());
Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);
Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testPipeline");
Expand All @@ -482,7 +482,7 @@ public void testExtensionJobOperations() throws Exception {
apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null);
assertStatus(apiResult);

String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString();
String processes = new JSONObject(getExtensionJobDetails(TEST_JOB).getMessage()).get("processes").toString();
Assert.assertEquals(processes, "sample");
process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testSample");
Expand All @@ -491,6 +491,12 @@ public void testExtensionJobOperations() throws Exception {
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");

try {
unregisterExtension(TEST_EXTENSION);
Assert.fail("Should have thrown a FalconCLIException");
} catch (FalconCLIException e) {
//Do nothing. Exception expected as there are dependent extension jobs and so extension cannot be deleted.
}
apiResult = deleteExtensionJob(TEST_JOB, null);
assertStatus(apiResult);
try {
Expand All @@ -506,14 +512,10 @@ public void testExtensionJobOperations() throws Exception {
Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob");
//Do nothing. Exception Expected.
}
apiResult = unregisterExtension(TEST_EXTENSION);
assertStatus(apiResult);
}

@Test
public void testExtensionJobSuspendAndResume() throws Exception {

}


private void copyExtensionJar(String destDirPath) throws IOException {
File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath());
for (File file : dir.listFiles()) {
Expand Down

0 comments on commit a70f5a9

Please sign in to comment.