Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal-zz committed Jan 23, 2017
2 parents 778c579 + c675568 commit 9cd8c17
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 181 deletions.
Expand Up @@ -152,10 +152,7 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
} else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT),
commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT),
commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT));
commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT));
result = jobs != null ? jobs.toString() : "No extension job (" + extensionName + ") found.";
} else if (optionsList.contains(INSTANCES_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
Expand Down
Expand Up @@ -1247,15 +1247,11 @@ public APIResult deleteExtensionJob(final String jobName, final String doAsUser)
}

public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser,
final String sortOrder, final String offset,
final String numResults, final String fields) {
final String sortOrder) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.LIST.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(FIELDS, fields)
.addQueryParam(SORT_ORDER, sortOrder)
.addQueryParam(OFFSET, offset)
.addQueryParam(NUM_RESULTS, numResults)
.call(ExtensionOperations.LIST);
return getResponse(ExtensionJobList.class, clientResponse);
}
Expand Down
Expand Up @@ -33,10 +33,10 @@
public class ExtensionJobList {

@XmlElement
public int numJobs;
int numJobs;

@XmlElementWrapper(name = "jobs")
public List<JobElement> job;
public List<String> job;

public ExtensionJobList() {
numJobs = 0;
Expand All @@ -45,54 +45,21 @@ public ExtensionJobList() {

public ExtensionJobList(int numJobs) {
this.numJobs = numJobs;
job = new ArrayList<JobElement>();
job = new ArrayList<>();
}

public ExtensionJobList(int numJobs, List<JobElement> elements) {
public ExtensionJobList(int numJobs, List<String> extensionJobNames) {
this.numJobs = numJobs;
this.job = elements;
}

public void addJob(JobElement element) {
job.add(element);
this.job = extensionJobNames;
}

@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append(numJobs + "\n\n");
for (JobElement element : job) {
buffer.append(element.toString());
}
return buffer.toString();
}

/**
* Element for a job.
*/
public static class JobElement {
@XmlElement
public String jobName;

@XmlElement
public EntityList jobEntities;

public JobElement() {
jobName = null;
jobEntities = null;
}

public JobElement(String name, EntityList entities) {
jobName = name;
jobEntities = entities;
}

@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("Job: " + jobName + ", #. entities: ");
buffer.append(jobEntities.toString() + "\n");
return buffer.toString();
StringBuilder builder = new StringBuilder();
builder.append(numJobs).append("\n");
for (String extensionJobNames : job) {
builder.append(extensionJobNames);
}
return builder.toString();
}
}
Expand Up @@ -45,7 +45,7 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
@NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select a.jobName from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
Expand Down
4 changes: 2 additions & 2 deletions docs/src/site/twiki/falconcli/ExtensionList.twiki
Expand Up @@ -8,9 +8,9 @@ Usage:
$FALCON_HOME/bin/falcon extension -list -extensionName <<extension-name>>

Optional Args : -doAs <<user-name>>
-sortOrder <<sortOrder>> -offset <<offset-number>> -numResults <<number-of-results>> -fields <<field1,field2>>
-sortOrder <<sortOrder>>

<a href="../restapi/ExtensionList.html">Parameters and examples described here.</a>

Example:
$FALCON_HOME/bin/falcon extension -list -extensionName hdfs-mirroring -offset 0 -numResults 10
$FALCON_HOME/bin/falcon extension -list -extensionName hdfs-mirroring
45 changes: 4 additions & 41 deletions docs/src/site/twiki/restapi/ExtensionList.twiki
Expand Up @@ -10,61 +10,24 @@ List jobs generated from an extension.
---++ Parameters
* :extension-name Name of the extension.
* sortOrder <optional> Sort order by job name. Valid options: "asc" (default) and "desc".
* offset <optional> Show results from the offset. Default is 0.
* numResults <optional> Number of results to show per request. Default is 10.
* fields <optional> Output fields separated by commas. Valid options: STATUS, TAGS, PIPELINES, CLUSTERS.
* doAs <optional> Impersonate the user.

---++ Results
Total number of results and a list of jobs generated from the extension, followed by the associated entities.
Total number of results and a list of jobs implementing the given extension.

---++ Examples
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/extensions/list/billCollection?fields=status,clusters,tags
GET http://localhost:15000/api/extensions/list/billCollection
</verbatim>
---+++ Result
<verbatim>
{
"numJobs":"2",
"jobs":{
"job": [
{
"jobName": "daily-health-bill",
"jobEntities: {
"totalResults": "2",
"entity": [
{
"type":"FEED",
"name":"SampleUSHealthBill",
"status":"SUBMITTED”,
"tags":{"tag":["related=ushealthcare","department=billingDepartment","_falcon_extension_name=billCoollection","_falcon_extension_job=daily-health-bill"]},
"clusters": {"cluster":["SampleCluster1","primaryCluster”]}
},
{
"type":"PROCESS”,
"name":"SampleBillPay”,
"status":"RUNNING”,
"tags":{"tag":["related=healthcare","department=billingDepartment","_falcon_extension_name=billCoollection","_falcon_extension_job=daily-health-bill"]},
"clusters":{"cluster":"primaryCluster”}
}
]
}
},
{
"jobName": "fsa-bill",
"jobEntities": {
"totalResults": "1",
"entity":
{
"type":"PROCESS”,
"name":"FSAPay”,
"status":"RUNNING”,
"tags”:{"tag":["related=healthcare","department=billingDepartment","_falcon_extension_name=billCollection","_falcon_extension_job=fsa-bill"]},
"clusters":{"cluster":"primaryCluster”}
}
}
}
{"daily-health-bill"},
{"fsa-bill"}
]
}
}
Expand Down
Expand Up @@ -27,6 +27,7 @@

import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -74,10 +75,7 @@ public Boolean checkIfExtensionExists(String extensionName) {
} finally {
commitAndCloseTransaction(entityManager);
}
if (resultSize > 0){
return true;
}
return false;
return resultSize > 0;
}

public Boolean checkIfExtensionJobExists(String jobName) {
Expand All @@ -91,18 +89,15 @@ public Boolean checkIfExtensionJobExists(String jobName) {
} finally {
commitAndCloseTransaction(entityManager);
}
if (resultSize > 0){
return true;
}
return false;
return resultSize > 0;
}

public List<ExtensionBean> getAllExtensions() {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS);
try {
return (List<ExtensionBean>)q.getResultList();
return (List<ExtensionBean>) q.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand All @@ -113,7 +108,7 @@ public void deleteExtensionsOfType(ExtensionType extensionType) {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE);
q.setParameter(EXTENSION_TYPE, extensionType);
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand All @@ -128,7 +123,7 @@ public ExtensionBean getDetail(String extensionName) {
try {
List resultList = q.getResultList();
if (!resultList.isEmpty()) {
return (ExtensionBean)resultList.get(0);
return (ExtensionBean) resultList.get(0);
} else {
return null;
}
Expand All @@ -137,24 +132,26 @@ public ExtensionBean getDetail(String extensionName) {
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
public List<String> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
List<String> jobNames = new ArrayList<>();
try {
return (List<ExtensionJobsBean>)query.getResultList();
jobNames.addAll((List<String>) query.getResultList());
} finally {
commitAndCloseTransaction(entityManager);
}
return jobNames;
}

public void deleteExtension(String extensionName){
public void deleteExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand All @@ -165,7 +162,7 @@ public void storeExtensionJob(String jobName, String extensionName, List<String>
byte[] config) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
boolean alreadySubmitted = false;
if (metaStore.getExtensionJobDetails(jobName) != null){
if (metaStore.getExtensionJobDetails(jobName) != null) {
alreadySubmitted = true;
}
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
Expand Down Expand Up @@ -195,7 +192,7 @@ public void deleteExtensionJob(String jobName) {
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION_JOB);
query.setParameter(JOB_NAME, jobName);
try{
try {
query.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand Down Expand Up @@ -237,7 +234,7 @@ public ExtensionJobsBean getExtensionJobDetails(String jobName) {
}
}

public List<ExtensionJobsBean> getAllExtensionJobs() {
List<ExtensionJobsBean> getAllExtensionJobs() {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS);
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -383,17 +382,6 @@ public boolean isExtensionStoreInitialized() {
return (storePath != null);
}

public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
List<String> extensionJobNames = new ArrayList<>();
if (null != extensionJobs && !extensionJobs.isEmpty()) {
for (ExtensionJobsBean extensionJobsBean : extensionJobs) {
extensionJobNames.add(extensionJobsBean.getJobName());
}
}
return extensionJobNames;
}

public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class ExtensionMetaStoreTest extends AbstractTestExtensionStore {
private static ExtensionMetaStore stateStore;

@BeforeClass
public void setup() throws Exception{
public void setup() throws Exception {
initExtensionStore();
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
this.conf = dfsCluster.getConf();
Expand All @@ -58,7 +58,7 @@ public void init() {
}

@Test
public void testExtension(){
public void testExtension() {
//insert
stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description",
"falconUser");
Expand Down Expand Up @@ -86,6 +86,8 @@ public void testExtensionJob() {
//storing again to check for entity manager merge to let submission go forward.
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);

Assert.assertEquals(stateStore.getJobsForAnExtension("test2").size(), 1);
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0), "job1");
Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
stateStore.deleteExtensionJob("job1");
Expand Down

0 comments on commit 9cd8c17

Please sign in to comment.