Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Jan 5, 2017
2 parents 73fbf75 + 3f6b690 commit 26e3350
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws I
OUT.get().println(result);
}

private void validateColo(Set<String> optionsList) {
static void validateColo(Set<String> optionsList) {
if (optionsList.contains(FalconCLIConstants.COLO_OPT)) {
throw new FalconCLIException("Invalid argument : " + FalconCLIConstants.COLO_OPT);
}
Expand Down
48 changes: 31 additions & 17 deletions cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,44 @@
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;

import java.io.IOException;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.falcon.cli.FalconEntityCLI.validateColo;
import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT;
import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT_DESCRIPTION;

/**
* Falcon extensions Command Line Interface - wraps the RESTful API for extensions.
*/
public class FalconExtensionCLI {
public class FalconExtensionCLI extends FalconCLI{
public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);

// Extension commands
public static final String ENUMERATE_OPT = "enumerate";
public static final String DEFINITION_OPT = "definition";
public static final String DESCRIBE_OPT = "describe";
public static final String INSTANCES_OPT = "instances";
public static final String UNREGISTER_OPT = "unregister";
public static final String DETAIL_OPT = "detail";
public static final String REGISTER_OPT = "register";
public static final String ENABLE_OPT = "enable";
public static final String DISABLE_OPT = "disable";
private static final String ENUMERATE_OPT = "enumerate";
private static final String DEFINITION_OPT = "definition";
private static final String DESCRIBE_OPT = "describe";
private static final String INSTANCES_OPT = "instances";
private static final String UNREGISTER_OPT = "unregister";
private static final String DETAIL_OPT = "detail";
private static final String REGISTER_OPT = "register";
private static final String ENABLE_OPT = "enable";
private static final String DISABLE_OPT = "disable";

// Input parameters
public static final String EXTENSION_NAME_OPT = "extensionName";
public static final String JOB_NAME_OPT = "jobName";
private static final String EXTENSION_NAME_OPT = "extensionName";
private static final String JOB_NAME_OPT = "jobName";
public static final String DESCRIPTION = "description";
public static final String PATH = "path";
private static final String PATH = "path";

public FalconExtensionCLI() {
FalconExtensionCLI() throws Exception {
super();
}

public void extensionCommand(CommandLine commandLine, FalconClient client) {
void extensionCommand(CommandLine commandLine, FalconClient client) throws IOException {
Set<String> optionsList = new HashSet<>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
Expand All @@ -77,6 +83,8 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
String path = commandLine.getOptionValue(FalconCLIConstants.PATH);
String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION);
String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT);
colo = getColo(colo);

if (optionsList.contains(ENUMERATE_OPT)) {
result = client.enumerateExtensions().getMessage();
Expand Down Expand Up @@ -105,6 +113,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
validateColo(optionsList);
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
Expand All @@ -114,6 +123,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
validateColo(optionsList);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
Expand All @@ -125,7 +135,8 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
colo = getColo(colo);
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
Expand Down Expand Up @@ -170,7 +181,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
OUT.get().println(result);
}

public Options createExtensionOptions() {
Options createExtensionOptions() {
Options extensionOptions = new Options();

Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions");
Expand All @@ -192,6 +203,8 @@ public Options createExtensionOptions() {
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. This will "
+ "make the extension available for instantiation for all users.");
Option colo = new Option(COLO_OPT, true, COLO_OPT_DESCRIPTION);
colo.setRequired(false);

OptionGroup group = new OptionGroup();
group.addOption(enumerate);
Expand Down Expand Up @@ -249,6 +262,7 @@ public Options createExtensionOptions() {
extensionOptions.addOption(filePath);
extensionOptions.addOption(path);
extensionOptions.addOption(description);
extensionOptions.addOption(colo);

return extensionOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Schedules the set of entities that are part of the extension.
* @param jobName extensionJob that needs to be scheduled.
* @return APIResult stating status of scheduling the extension.
*/
public abstract APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser);

/**
* Prepares set of entities the extension has implemented and stage them to a local directory and submits and
* schedules them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,9 +1200,10 @@ public APIResult validateExtensionJob(final String extensionName, final String j
}
}

public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) {
public APIResult scheduleExtensionJob(String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SCHEDULE.path, jobName)
.addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SCHEDULE);
return getResponse(APIResult.class, clientResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
@Entity
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.entityType = :entityType")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.persistence.PersistenceConstants;
Expand Down Expand Up @@ -145,6 +146,11 @@ public void deleteExtension(String extensionName){

public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes,
byte[] config) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
boolean alreadySubmitted = false;
if (metaStore.getExtensionJobDetails(jobName) != null){
alreadySubmitted = true;
}
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
Date currentTime = new Date(System.currentTimeMillis());
extensionJobsBean.setJobName(jobName);
Expand All @@ -157,7 +163,11 @@ public void storeExtensionJob(String jobName, String extensionName, List<String>
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
entityManager.persist(extensionJobsBean);
if (alreadySubmitted) {
entityManager.merge(extensionJobsBean);
} else {
entityManager.persist(extensionJobsBean);
}
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public void testExtensionJob() {

byte[] config = new byte[0];
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);
//storing again to check for entity manager merge to let submission go forward.
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);

Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.Map;

/**
* Backlog Metric Store for entitties.
* Backlog Metric Store for entities' backlog instances.
*/
public class BacklogMetricStore {

Expand Down Expand Up @@ -70,18 +70,19 @@ public synchronized void deleteMetricInstance(String entityName, String cluster,
q.setParameter("clusterName", cluster);
q.setParameter("nominalTime", nominalTime);
q.setParameter("entityType", entityType.name());
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteEntityInstance(String entityName){
public void deleteEntityBackLogInstances(String entityName, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
q.setParameter("entityType", entityType);
try {
q.executeUpdate();
} finally {
Expand Down Expand Up @@ -110,7 +111,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (CollectionUtils.isEmpty(result)) {
return null;
}
} finally{
} finally {
entityManager.close();
}

Expand All @@ -121,7 +122,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (!backlogMetrics.containsKey(entity)) {
backlogMetrics.put(entity, new ArrayList<MetricInfo>());
}
List<MetricInfo> metrics = backlogMetrics.get(entity);
List<MetricInfo> metrics = backlogMetrics.get(entity);
MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
.format(backlogMetricBean.getNominalTime()),
backlogMetricBean.getClusterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,21 @@ public static Set<String> getAllColos() {
}

protected Set<String> getColosFromExpression(String coloExpr, String type, String entity) {
Set<String> colos;
final Set<String> applicableColos = getApplicableColos(type, entity);
return getColosFromExpression(coloExpr, applicableColos);
}

protected Set<String> getColosFromExpression(String coloExpr, String type, Entity entity) {
final Set<String> applicableColos = getApplicableColos(type, entity);
return getColosFromExpression(coloExpr, applicableColos);
}

private Set<String> getColosFromExpression(String coloExpr, Set<String> applicableColos) {
Set<String> colos;
if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) {
colos = applicableColos;
} else {
colos = new HashSet<String>(Arrays.asList(coloExpr.split(",")));
colos = new HashSet<>(Arrays.asList(coloExpr.split(",")));
if (!applicableColos.containsAll(colos)) {
throw FalconWebException.newAPIException("Given colos not applicable for entity operation");
}
Expand Down

0 comments on commit 26e3350

Please sign in to comment.