Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Jan 25, 2017
2 parents 7de7798 + 21f0b69 commit 79e8d64
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
ExtensionJobList jobs = client.getExtensionJobs(extensionName, doAsUser,
commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT));
result = jobs != null ? jobs.toString() : "No extension job (" + extensionName + ") found.";
} else if (optionsList.contains(INSTANCES_OPT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
Expand Down Expand Up @@ -294,6 +295,12 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
*/
public abstract APIResult enumerateExtensions();

/**
* Returns all registered jobs for an extension.
* @return
*/
public abstract ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser);

/**
*
* Get list of the entities.
Expand Down
20 changes: 10 additions & 10 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,16 @@ public APIResult enumerateExtensions() {
return getResponse(APIResult.class, clientResponse);
}

@Override
public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.LIST.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(SORT_ORDER, sortOrder)
.call(ExtensionOperations.LIST);
return getResponse(ExtensionJobList.class, clientResponse);
}

public APIResult unregisterExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.UNREGISTER.path, extensionName)
Expand Down Expand Up @@ -1246,16 +1256,6 @@ public APIResult deleteExtensionJob(final String jobName, final String doAsUser)
return getResponse(APIResult.class, clientResponse);
}

public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser,
final String sortOrder) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.LIST.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(SORT_ORDER, sortOrder)
.call(ExtensionOperations.LIST);
return getResponse(ExtensionJobList.class, clientResponse);
}

public ExtensionInstanceList listExtensionInstance(final String jobName, final String doAsUser, final String fields,
final String start, final String end, final String status,
final String orderBy, final String sortOrder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;

/**
* Extension job list used for marshalling / unmarshalling with REST calls.
Expand All @@ -36,19 +36,23 @@ public class ExtensionJobList {
int numJobs;

@XmlElementWrapper(name = "jobs")
public List<String> job;
public Map<String, String> job;

public ExtensionJobList() {
numJobs = 0;
job = null;
}

public int getNumJobs() {
return numJobs;
}

public ExtensionJobList(int numJobs) {
this.numJobs = numJobs;
job = new ArrayList<>();
job = new HashMap<>();
}

public ExtensionJobList(int numJobs, List<String> extensionJobNames) {
public ExtensionJobList(int numJobs, Map<String, String> extensionJobNames) {
this.numJobs = numJobs;
this.job = extensionJobNames;
}
Expand All @@ -57,8 +61,8 @@ public ExtensionJobList(int numJobs, List<String> extensionJobNames) {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(numJobs).append("\n");
for (String extensionJobNames : job) {
builder.append(extensionJobNames);
for (Map.Entry<String, String> extensionJobs : job.entrySet()) {
builder.append(extensionJobs.getKey() + "\t" + extensionJobs.getValue() + "\n");
}
return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select a.jobName from ExtensionJobsBean a where a.extensionName = :extensionName")
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ public ExtensionBean getDetail(String extensionName) {
}
}

public List<String> getJobsForAnExtension(String extensionName) {
public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
List<ExtensionJobsBean> extensionJobs = new ArrayList<>();
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
List<String> jobNames = new ArrayList<>();
try {
jobNames.addAll((List<String>) query.getResultList());
extensionJobs.addAll(query.getResultList());
return extensionJobs;
} finally {
commitAndCloseTransaction(entityManager);
}
return jobNames;
}

public void deleteExtension(String extensionName) {
Expand Down Expand Up @@ -234,12 +234,14 @@ public ExtensionJobsBean getExtensionJobDetails(String jobName) {
}
}

List<ExtensionJobsBean> getAllExtensionJobs() {
public List<ExtensionJobsBean> getAllExtensionJobs() {
List<ExtensionJobsBean> extensionJobs = new ArrayList<>();
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS);
try {
return q.getResultList();
extensionJobs.addAll(q.getResultList());
return extensionJobs;
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testExtensionJob() {
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);

Assert.assertEquals(stateStore.getJobsForAnExtension("test2").size(), 1);
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0), "job1");
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0).getJobName(), "job1");
Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
stateStore.deleteExtensionJob("job1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

Expand All @@ -55,6 +59,8 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";
protected static final String ASCENDING_SORT_ORDER = "asc";
protected static final String DESCENDING_SORT_ORDER = "desc";

public static final String NAME = "name";
public static final String STATUS = "status";
Expand Down Expand Up @@ -105,6 +111,39 @@ public APIResult getExtensions() {
}
}

public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {

Comparator<ExtensionJobsBean> compareByJobName = new Comparator<ExtensionJobsBean>() {
@Override
public int compare(ExtensionJobsBean o1, ExtensionJobsBean o2) {
return o1.getJobName().compareToIgnoreCase(o2.getJobName());
}
};

Map<String, String> jobAndExtensionNames = new HashMap<>();
List<ExtensionJobsBean> extensionJobs = null;
if (extensionName != null) {
extensionJobs = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
} else {
extensionJobs = ExtensionStore.getMetaStore().getAllExtensionJobs();
}

sortOrder = (sortOrder == null) ? ASCENDING_SORT_ORDER : sortOrder;
switch (sortOrder.toLowerCase()) {
case DESCENDING_SORT_ORDER:
Collections.sort(extensionJobs, Collections.reverseOrder(compareByJobName));
break;

default:
Collections.sort(extensionJobs, compareByJobName);
}

for (ExtensionJobsBean job : extensionJobs) {
jobAndExtensionNames.put(job.getJobName(), job.getExtensionName());
}
return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames);
}

public APIResult deleteExtensionMetadata(String extensionName) {
validateExtensionName(extensionName);
ExtensionStore metaStore = ExtensionStore.get();
Expand All @@ -119,7 +158,7 @@ public APIResult deleteExtensionMetadata(String extensionName) {

private void canDeleteExtension(String extensionName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (!extensionJobs.isEmpty()) {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
ArrayUtils.toString(extensionJobs));
Expand All @@ -130,7 +169,7 @@ private void canDeleteExtension(String extensionName) throws FalconException {

protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
throws FalconException {
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
TreeMap<EntityType, List<String>> entityMap = new TreeMap<>(Collections.<EntityType>reverseOrder());
entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
return entityMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
Expand All @@ -84,8 +83,6 @@
public class ExtensionManagerProxy extends AbstractExtensionManager {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionManagerProxy.class);

private static final String ASCENDING_SORT_ORDER = "asc";
private static final String DESCENDING_SORT_ORDER = "desc";
private Extension extension = new Extension();
private static final String README = "README";

Expand All @@ -106,15 +103,7 @@ public ExtensionJobList getExtensionJobs(
checkIfExtensionServiceIsEnabled();
checkIfExtensionExists(extensionName);
try {
List<String> jobNames = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
switch (sortOrder.toLowerCase()) {
case DESCENDING_SORT_ORDER:
Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
break;
default:
Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER);
}
return new ExtensionJobList(jobNames.size(), jobNames);
return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
} catch (Throwable e) {
LOG.error("Failed to get extension job list of " + extensionName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
Expand Down Expand Up @@ -414,6 +415,11 @@ public APIResult enumerateExtensions() {
return localExtensionManager.getExtensions();
}

@Override
public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
return localExtensionManager.getExtensionJobs(extensionName, sortOrder, doAsUser);
}

@Override
public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords,
String filterBy, String filterTags, String orderBy, String sortOrder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.security.CurrentUser;

import java.io.IOException;
Expand Down Expand Up @@ -202,4 +203,8 @@ APIResult getExtensionDetails(String extensionName) {
public APIResult getExtensions() {
return super.getExtensions();
}

public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.hadoop.JailedFileSystem;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.util.DateUtil;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -244,6 +245,10 @@ APIResult submitExtensionJob(String extensionName, String jobName, String config
return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
}

ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
return falconUnitClient.getExtensionJobs(extensionName, sortOrder, doAsUser);
}

public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser) {
return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser);
Expand Down
5 changes: 5 additions & 0 deletions unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
Expand Down Expand Up @@ -458,6 +459,10 @@ public void testExtensionJobOperations() throws Exception {

apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);

ExtensionJobList extensionJobList = getExtensionJobs(TEST_EXTENSION, null, null);
Assert.assertEquals(extensionJobList.getNumJobs(), 1);

apiResult = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(apiResult.getMessage());
Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);
Expand Down

0 comments on commit 79e8d64

Please sign in to comment.