Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 7, 2016
2 parents f96a084 + c79e5e4 commit e0ad358
Show file tree
Hide file tree
Showing 27 changed files with 500 additions and 127 deletions.
Expand Up @@ -92,8 +92,10 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
}else if (optionsList.contains(DETAIL_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDetail(extensionName);
result = prettyPrintJson(result);
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
Expand All @@ -103,7 +105,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
Expand Down
5 changes: 5 additions & 0 deletions client/pom.xml
Expand Up @@ -81,6 +81,11 @@
<artifactId>jersey-json</artifactId>
</dependency>

<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Expand Up @@ -113,8 +113,8 @@ private static void stageEntities(List<Entity> entities, String stagePath) {
type = entity.getEntityType();
OutputStream out;
try {
entityFile = new File(stagePath + File.separator + entity.getEntityType().toString() + "_"
+ URLEncoder.encode(entity.getName(), UTF_8));
entityFile = new File(new Path(stagePath + File.separator + entity.getEntityType().toString() + "_"
+ URLEncoder.encode(entity.getName(), UTF_8)).toUri().toURL().getPath());
if (!entityFile.createNewFile()) {
LOG.debug("Not able to stage the entities in the tmp path");
return;
Expand Down
Expand Up @@ -33,6 +33,8 @@
import org.apache.falcon.resource.TriageResult;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -178,6 +180,22 @@ public abstract APIResult getStatus(EntityType entityType, String entityName, St
public abstract APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser,
String properties);

/**
* Registers an extension.
* @param extensionName extensionName of the extension.
* @param packagePath Package location for the extension.
* @param description description of the extension.
* @return Result of the registerExtension command.
*/
public abstract String registerExtension(String extensionName, String packagePath, String description);

/**
*
* @param extensionName extensionName that needs to be unregistered
* @return Result of the unregisterExtension operation
*/
public abstract String unregisterExtension(String extensionName);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
Expand All @@ -190,6 +208,19 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submits and
* schedules them.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
*
* Get list of the entities.
Expand Down Expand Up @@ -468,6 +499,26 @@ protected InputStream getServletInputStream(String clusters, String sourceCluste
return (buffer.length() == 0) ? null : stream;
}

/**
* Converts a InputStream into ServletInputStream.
*
* @param filePath - Path of file to stream
* @return ServletInputStream
*/
protected InputStream getServletInputStream(String filePath) {

if (filePath == null) {
return null;
}
InputStream stream;
try {
stream = new FileInputStream(filePath);
} catch (FileNotFoundException e) {
throw new FalconCLIException("File not found:", e);
}
return stream;
}

public abstract SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName,
String start, String end, String colo);

Expand Down
137 changes: 80 additions & 57 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Expand Up @@ -23,13 +23,14 @@
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataMultiPart;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.ExtensionHandler;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.DateValidator;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
Expand Down Expand Up @@ -64,8 +65,6 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -124,6 +123,7 @@ public class FalconClient extends AbstractFalconClient {


public static final String DO_AS_OPT = "doAs";
public static final String JOB_NAME_OPT = "jobName";
public static final String ENTITIES_OPT = "entities";
/**
* Name of the HTTP cookie used for the authentication token between the client and the server.
Expand Down Expand Up @@ -747,26 +747,6 @@ public String getDimensionRelations(String dimensionType, String dimensionName,
return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null, doAsUser);
}

/**
* Converts a InputStream into ServletInputStream.
*
* @param filePath - Path of file to stream
* @return ServletInputStream
*/
private InputStream getServletInputStream(String filePath) {

if (filePath == null) {
return null;
}
InputStream stream;
try {
stream = new FileInputStream(filePath);
} catch (FileNotFoundException e) {
throw new FalconCLIException("File not found:", e);
}
return stream;
}

private <T> T getResponse(Class<T> clazz, ClientResponse clientResponse) {
printClientResponse(clientResponse);
checkIfSuccessful(clientResponse);
Expand Down Expand Up @@ -872,6 +852,12 @@ public ClientResponse call(ExtensionOperations operation, InputStream entityStre
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
}

public ClientResponse call(ExtensionOperations submit, FormDataMultiPart formDataMultiPart) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(submit.mimeType).type(MediaType.MULTIPART_FORM_DATA)
.method(submit.method, ClientResponse.class, formDataMultiPart);
}
}

public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) {
Expand Down Expand Up @@ -1040,14 +1026,17 @@ public String unregisterExtension(final String extensionName) {
}

public String getExtensionDetail(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName)
return getResponse(String.class, getExtensionDetailResponse(extensionName));
}

public ClientResponse getExtensionDetailResponse(final String extensionName) {
return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName)
.call(ExtensionOperations.DETAIL);
return getResponse(String.class, clientResponse);
}

public String registerExtension(final String extensionName, final String path, final String description) {
public String registerExtension(final String extensionName, final String packagePath, final String description) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, path)
.path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, packagePath)
.addQueryParam(FalconCLIConstants.DESCRIPTION, description)
.call(ExtensionOperations.REGISTER);
return getResponse(String.class, clientResponse);
Expand All @@ -1070,55 +1059,89 @@ public String getExtensionDescription(final String extensionName) {
@Override
public APIResult submitExtensionJob(final String extensionName, final String jobName, final String configPath,
final String doAsUser) {
FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DETAIL.path)
.call(ExtensionOperations.DETAIL);
JSONObject responseJson = clientResponse.getEntity(JSONObject.class);
ExtensionType extensionType;
.path(ExtensionOperations.SUBMIT.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.addQueryParam(JOB_NAME_OPT, jobName)
.call(ExtensionOperations.SUBMIT, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}

private FormDataMultiPart getEntitiesForm(String extensionName, String jobName, String configPath) {
InputStream configStream = getServletInputStream(configPath);
List<Entity> entities = validateExtensionAndGetEntities(extensionName, jobName, configStream);
FormDataMultiPart formDataMultiPart = new FormDataMultiPart();

for (Entity entity : entities) {
if (EntityType.FEED.equals(entity.getEntityType())) {
formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE);
} else if (EntityType.PROCESS.equals(entity.getEntityType())) {
formDataMultiPart.field("processes", entity, MediaType.APPLICATION_XML_TYPE);
}
}

formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(), configStream,
MediaType.APPLICATION_OCTET_STREAM_TYPE));
try {
formDataMultiPart.close();
} catch (IOException e) {
OUT.get().print("Submit failed. Failed to submit entities");
throw new FalconCLIException("Submit failed. Failed to submit entities", e);
}
return formDataMultiPart;
}

private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
List<Entity> entities = getEntities(extensionName, jobName, configStream, clientResponse);
return entities;
}

private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
ClientResponse clientResponse) {
JSONObject responseJson;
try {
responseJson = new JSONObject(clientResponse.getEntity(String.class));
} catch (JSONException e) {
OUT.get().print("Submit failed. Failed to get details for the given extension");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
String extensionType;
String extensionBuildLocation;
try {
JSONObject extensionDetailsJson = new JSONObject(responseJson.get("detail").toString());
extensionType = ExtensionType.valueOf(extensionDetailsJson.get("type").toString().toUpperCase());
extensionBuildLocation = extensionDetailsJson.get("location").toString();
extensionType = responseJson.get("type").toString();
extensionBuildLocation = responseJson.get("location").toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
return null;
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
InputStream configStream = getServletInputStream(configPath);

List<Entity> entities;
if (extensionType.equals(ExtensionType.CUSTOM)) {
List<Entity> entities = null;
if (!extensionType.equals(ExtensionType.CUSTOM.name())) {
try {
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, extensionBuildLocation);
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
extensionBuildLocation);
} catch (Exception e) {
OUT.get().println("Error in building the extension");
return null;
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
if (entities == null || entities.isEmpty()) {
OUT.get().println("No entities got built");
return null;
}
try {
EntityUtil.applyTags(extensionName, jobName, entities);
} catch (FalconException e) {
OUT.get().println("Error in applying tags to generated entities");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
}

clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUBMIT.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUBMIT, configStream);
return getResponse(APIResult.class, clientResponse);
return entities;
}

public APIResult submitAndScheduleExtensionJob(final String extensionName, final String filePath,
final String doAsUser) {
InputStream entityStream = getServletInputStream(filePath);
public APIResult submitAndScheduleExtensionJob(final String extensionName, final String jobName,
final String configPath, final String doAsUser) {
FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entityStream);
.call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}

Expand Down
Expand Up @@ -27,7 +27,7 @@ public enum ExtensionType {

private final String text;

private ExtensionType(final String text) {
ExtensionType(final String text) {
this.text = text;
}
@Override
Expand Down
Expand Up @@ -63,7 +63,13 @@ public Boolean checkIfExtensionExists(String extensionName) {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
if (q.getResultList().size() > 0){
int resultSize = 0;
try {
resultSize = q.getResultList().size();
} finally {
commitAndCloseTransaction(entityManager);
}
if (resultSize > 0){
return true;
}
return false;
Expand Down

0 comments on commit e0ad358

Please sign in to comment.