diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 15eb8d510..5d4412821 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -150,8 +150,7 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.deleteExtensionJob(jobName, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) { - validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); - ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser, + ExtensionJobList jobs = client.getExtensionJobs(extensionName, doAsUser, commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT)); result = jobs != null ? jobs.toString() : "No extension job (" + extensionName + ") found."; } else if (optionsList.contains(INSTANCES_OPT)) { diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 49392c257..7ed669f1a 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -23,6 +23,7 @@ import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstanceDependencyResult; @@ -294,6 +295,12 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St */ public abstract APIResult enumerateExtensions(); + /** + * Returns all registered jobs for an extension. + * @return + */ + public abstract ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser); + /** * * Get list of the entities. diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index aca83e3d2..25eaeb542 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -1023,6 +1023,16 @@ public APIResult enumerateExtensions() { return getResponse(APIResult.class, clientResponse); } + @Override + public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.LIST.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(SORT_ORDER, sortOrder) + .call(ExtensionOperations.LIST); + return getResponse(ExtensionJobList.class, clientResponse); + } + public APIResult unregisterExtension(final String extensionName) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.UNREGISTER.path, extensionName) @@ -1246,16 +1256,6 @@ public APIResult deleteExtensionJob(final String jobName, final String doAsUser) return getResponse(APIResult.class, clientResponse); } - public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser, - final String sortOrder) { - ClientResponse clientResponse = new ResourceBuilder() - .path(ExtensionOperations.LIST.path, extensionName) - .addQueryParam(DO_AS_OPT, doAsUser) - .addQueryParam(SORT_ORDER, sortOrder) - .call(ExtensionOperations.LIST); - return getResponse(ExtensionJobList.class, clientResponse); - } - public ExtensionInstanceList listExtensionInstance(final String jobName, final String doAsUser, final String fields, final String start, final String end, final String status, final String orderBy, final String sortOrder, diff --git a/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java b/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java index 244ea66d8..78fd5d426 100644 --- a/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java +++ b/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java @@ -21,8 +21,8 @@ import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlRootElement; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; /** * Extension job list used for marshalling / unmarshalling with REST calls. @@ -36,19 +36,23 @@ public class ExtensionJobList { int numJobs; @XmlElementWrapper(name = "jobs") - public List job; + public Map job; public ExtensionJobList() { numJobs = 0; job = null; } + public int getNumJobs() { + return numJobs; + } + public ExtensionJobList(int numJobs) { this.numJobs = numJobs; - job = new ArrayList<>(); + job = new HashMap<>(); } - public ExtensionJobList(int numJobs, List extensionJobNames) { + public ExtensionJobList(int numJobs, Map extensionJobNames) { this.numJobs = numJobs; this.job = extensionJobNames; } @@ -57,8 +61,8 @@ public ExtensionJobList(int numJobs, List extensionJobNames) { public String toString() { StringBuilder builder = new StringBuilder(); builder.append(numJobs).append("\n"); - for (String extensionJobNames : job) { - builder.append(extensionJobNames); + for (Map.Entry extensionJobs : job.entrySet()) { + builder.append(extensionJobs.getKey() + "\t" + extensionJobs.getValue() + "\n"); } return builder.toString(); } diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java index acb5cf4b6..b6ac79d5f 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java @@ -45,7 +45,7 @@ @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "), @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "), @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"), - @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select a.jobName from ExtensionJobsBean a where a.extensionName = :extensionName") + @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName") }) //RESUME CHECKSTYLE CHECK LineLengthCheck public class ExtensionJobsBean { diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 18c854076..81a4d2b6f 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -132,18 +132,18 @@ public ExtensionBean getDetail(String extensionName) { } } - public List getJobsForAnExtension(String extensionName) { + public List getJobsForAnExtension(String extensionName) { + List extensionJobs = new ArrayList<>(); EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION); query.setParameter(EXTENSION_NAME, extensionName); - List jobNames = new ArrayList<>(); try { - jobNames.addAll((List) query.getResultList()); + extensionJobs.addAll(query.getResultList()); + return extensionJobs; } finally { commitAndCloseTransaction(entityManager); } - return jobNames; } public void deleteExtension(String extensionName) { @@ -234,12 +234,14 @@ public ExtensionJobsBean getExtensionJobDetails(String jobName) { } } - List getAllExtensionJobs() { + public List getAllExtensionJobs() { + List extensionJobs = new ArrayList<>(); EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS); try { - return q.getResultList(); + extensionJobs.addAll(q.getResultList()); + return extensionJobs; } finally { commitAndCloseTransaction(entityManager); } diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index 4c494457e..45d58e0f2 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -87,7 +87,7 @@ public void testExtensionJob() { stateStore.storeExtensionJob("job1", "test2", feeds, processes, config); Assert.assertEquals(stateStore.getJobsForAnExtension("test2").size(), 1); - Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0), "job1"); + Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0).getJobName(), "job1"); Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1); Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed"); stateStore.deleteExtensionJob("job1"); diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index ab08e000c..e1b62ad57 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -37,7 +37,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -55,6 +59,8 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { private static final String CONFIG = "config"; private static final String CREATION_TIME = "creationTime"; private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; + protected static final String ASCENDING_SORT_ORDER = "asc"; + protected static final String DESCENDING_SORT_ORDER = "desc"; public static final String NAME = "name"; public static final String STATUS = "status"; @@ -105,6 +111,39 @@ public APIResult getExtensions() { } } + public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) { + + Comparator compareByJobName = new Comparator() { + @Override + public int compare(ExtensionJobsBean o1, ExtensionJobsBean o2) { + return o1.getJobName().compareToIgnoreCase(o2.getJobName()); + } + }; + + Map jobAndExtensionNames = new HashMap<>(); + List extensionJobs = null; + if (extensionName != null) { + extensionJobs = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName); + } else { + extensionJobs = ExtensionStore.getMetaStore().getAllExtensionJobs(); + } + + sortOrder = (sortOrder == null) ? ASCENDING_SORT_ORDER : sortOrder; + switch (sortOrder.toLowerCase()) { + case DESCENDING_SORT_ORDER: + Collections.sort(extensionJobs, Collections.reverseOrder(compareByJobName)); + break; + + default: + Collections.sort(extensionJobs, compareByJobName); + } + + for (ExtensionJobsBean job : extensionJobs) { + jobAndExtensionNames.put(job.getJobName(), job.getExtensionName()); + } + return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames); + } + public APIResult deleteExtensionMetadata(String extensionName) { validateExtensionName(extensionName); ExtensionStore metaStore = ExtensionStore.get(); @@ -119,7 +158,7 @@ public APIResult deleteExtensionMetadata(String extensionName) { private void canDeleteExtension(String extensionName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); - List extensionJobs = metaStore.getJobsForAnExtension(extensionName); + List extensionJobs = metaStore.getJobsForAnExtension(extensionName); if (!extensionJobs.isEmpty()) { LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName, ArrayUtils.toString(extensionJobs)); @@ -130,7 +169,7 @@ private void canDeleteExtension(String extensionName) throws FalconException { protected SortedMap> getJobEntities(ExtensionJobsBean extensionJobsBean) throws FalconException { - TreeMap> entityMap = new TreeMap<>(); + TreeMap> entityMap = new TreeMap<>(Collections.reverseOrder()); entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses()); entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds()); return entityMap; diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index efb548936..b6f405e30 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -62,7 +62,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.xml.bind.JAXBException; -import java.util.Collections; import java.util.ArrayList; import java.util.HashSet; import java.util.Arrays; @@ -84,8 +83,6 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { public static final Logger LOG = LoggerFactory.getLogger(ExtensionManagerProxy.class); - private static final String ASCENDING_SORT_ORDER = "asc"; - private static final String DESCENDING_SORT_ORDER = "desc"; private Extension extension = new Extension(); private static final String README = "README"; @@ -106,15 +103,7 @@ public ExtensionJobList getExtensionJobs( checkIfExtensionServiceIsEnabled(); checkIfExtensionExists(extensionName); try { - List jobNames = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName); - switch (sortOrder.toLowerCase()) { - case DESCENDING_SORT_ORDER: - Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER)); - break; - default: - Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER); - } - return new ExtensionJobList(jobNames.size(), jobNames); + return super.getExtensionJobs(extensionName, sortOrder, doAsUser); } catch (Throwable e) { LOG.error("Failed to get extension job list of " + extensionName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 75aeba05e..3150bbd9b 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -36,6 +36,7 @@ import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstanceDependencyResult; @@ -414,6 +415,11 @@ public APIResult enumerateExtensions() { return localExtensionManager.getExtensions(); } + @Override + public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) { + return localExtensionManager.getExtensionJobs(extensionName, sortOrder, doAsUser); + } + @Override public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index ca39ddb07..addb33359 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -27,6 +27,7 @@ import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractExtensionManager; +import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.security.CurrentUser; import java.io.IOException; @@ -202,4 +203,8 @@ APIResult getExtensionDetails(String extensionName) { public APIResult getExtensions() { return super.getExtensions(); } + + public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) { + return super.getExtensionJobs(extensionName, sortOrder, doAsUser); + } } diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index e9367d52f..2edd424ac 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -35,6 +35,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.hadoop.JailedFileSystem; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.util.DateUtil; import org.apache.hadoop.fs.Path; @@ -244,6 +245,10 @@ APIResult submitExtensionJob(String extensionName, String jobName, String config return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser); } + ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) { + return falconUnitClient.getExtensionJobs(extensionName, sortOrder, doAsUser); + } + public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser); diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 8949c4194..8030f20df 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -26,6 +26,7 @@ import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; @@ -458,6 +459,10 @@ public void testExtensionJobOperations() throws Exception { apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); assertStatus(apiResult); + + ExtensionJobList extensionJobList = getExtensionJobs(TEST_EXTENSION, null, null); + Assert.assertEquals(extensionJobList.getNumJobs(), 1); + apiResult = getExtensionJobDetails(TEST_JOB); JSONObject resultJson = new JSONObject(apiResult.getMessage()); Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);