Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 15, 2016
2 parents e0ad358 + 617d5ab commit 194f36a
Show file tree
Hide file tree
Showing 18 changed files with 462 additions and 154 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ logs
falcon-ui/dist
falcon-ui/node
falcon-ui/node_modules

#Resources jar
unit/src/test/resources/falcon-extensions-*-tests.jar
16 changes: 11 additions & 5 deletions cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,16 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
} else if (optionsList.contains(UNREGISTER_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.unregisterExtension(extensionName);
}else if (optionsList.contains(DETAIL_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDetail(extensionName);
result = prettyPrintJson(result);
} else if (optionsList.contains(DETAIL_OPT)) {
if (optionsList.contains(JOB_NAME_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.getExtensionJobDetails(jobName);
result = prettyPrintJson(result);
} else {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDetail(extensionName);
result = prettyPrintJson(result);
}
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
Expand All @@ -113,7 +119,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
Expand Down
29 changes: 29 additions & 0 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,6 +44,8 @@
import java.util.List;
import java.util.ServiceLoader;

import static org.apache.falcon.client.FalconClient.OUT;

/**
* Handler class that is responsible for preparing Extension entities.
*/
Expand All @@ -50,6 +54,8 @@ public final class ExtensionHandler {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class);
private static final String UTF_8 = CharEncoding.UTF_8;
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";

public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
InputStream configStream) throws IOException, FalconException {
Expand Down Expand Up @@ -186,4 +192,27 @@ public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException
urls.add(fileURL);
return urls;
}


public static String getExtensionLocation(String extensionName, JSONObject extensionDetailJson) {
String extensionBuildPath;
try {
extensionBuildPath = extensionDetailJson.get(LOCATION).toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Failed to get extension type for the given extension");
}
return extensionBuildPath;
}

public static String getExtensionType(String extensionName, JSONObject extensionDetailJson) {
String extensionType;
try {
extensionType = extensionDetailJson.get(TYPE).toString();
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Failed to get extension type for the given extension");
}
return extensionType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
public abstract String unregisterExtension(String extensionName);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submit them too.
* Prepares set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
Expand All @@ -209,7 +209,7 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam
String doAsUser);

/**
* Prepare set of entities the extension has implemented and stage them to a local directory and submits and
* Prepares set of entities the extension has implemented and stage them to a local directory and submits and
* schedules them.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
Expand All @@ -221,6 +221,13 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
* @return
*/
public abstract String getExtensionJobDetails(final String jobName);

/**
*
* Get list of the entities.
Expand Down Expand Up @@ -514,7 +521,7 @@ protected InputStream getServletInputStream(String filePath) {
try {
stream = new FileInputStream(filePath);
} catch (FileNotFoundException e) {
throw new FalconCLIException("File not found:", e);
throw new FalconCLIException("File not found:" + filePath, e);
}
return stream;
}
Expand Down
86 changes: 49 additions & 37 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataMultiPart;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -141,7 +139,9 @@ public boolean verify(String hostname, SSLSession sslSession) {
return true;
}
};
private static final String TAG_SEPARATOR = ",";
private static final String FEEDS = "feeds";
private static final String PROCESSES = "processes";
private static final String CONFIG = "config";
private final WebResource service;
private final AuthenticatedURL.Token authenticationToken;

Expand Down Expand Up @@ -357,6 +357,7 @@ protected static enum ExtensionOperations {
DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML),
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_PLAIN),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_PLAIN);

private String path;
Expand Down Expand Up @@ -1029,6 +1030,12 @@ public String getExtensionDetail(final String extensionName) {
return getResponse(String.class, getExtensionDetailResponse(extensionName));
}

public String getExtensionJobDetails(final String jobName) {
ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
.call(ExtensionOperations.JOB_DETAILS);
return getResponse(String.class, clientResponse);
}

public ClientResponse getExtensionDetailResponse(final String extensionName) {
return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName)
.call(ExtensionOperations.DETAIL);
Expand Down Expand Up @@ -1073,16 +1080,16 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,
List<Entity> entities = validateExtensionAndGetEntities(extensionName, jobName, configStream);
FormDataMultiPart formDataMultiPart = new FormDataMultiPart();

for (Entity entity : entities) {
if (EntityType.FEED.equals(entity.getEntityType())) {
formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE);
} else if (EntityType.PROCESS.equals(entity.getEntityType())) {
formDataMultiPart.field("processes", entity, MediaType.APPLICATION_XML_TYPE);
if (entities != null && !entities.isEmpty()) {
for (Entity entity : entities) {
if (EntityType.FEED.equals(entity.getEntityType())) {
formDataMultiPart.field(FEEDS, entity, MediaType.APPLICATION_XML_TYPE);
} else if (EntityType.PROCESS.equals(entity.getEntityType())) {
formDataMultiPart.field(PROCESSES, entity, MediaType.APPLICATION_XML_TYPE);
}
}
}

formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(), configStream,
MediaType.APPLICATION_OCTET_STREAM_TYPE));
formDataMultiPart.field(CONFIG, configStream, MediaType.APPLICATION_OCTET_STREAM_TYPE);
try {
formDataMultiPart.close();
} catch (IOException e) {
Expand All @@ -1094,42 +1101,40 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,

private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
List<Entity> entities = getEntities(extensionName, jobName, configStream, clientResponse);
JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
List<Entity> entities = getEntities(extensionName, jobName, configStream, extensionType,
extensionBuildLocation);
return entities;
}

private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
ClientResponse clientResponse) {
JSONObject responseJson;
try {
responseJson = new JSONObject(clientResponse.getEntity(String.class));
} catch (JSONException e) {
OUT.get().print("Submit failed. Failed to get details for the given extension");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
}
String extensionType;
String extensionBuildLocation;
private JSONObject getExtensionDetailJson(String extensionName) {
ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
JSONObject extensionDetailJson;
try {
extensionType = responseJson.get("type").toString();
extensionBuildLocation = responseJson.get("location").toString();
extensionDetailJson = new JSONObject(clientResponse.getEntity(String.class));
} catch (JSONException e) {
OUT.get().print("Error. " + extensionName + " not found ");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
OUT.get().print("Failed to get details for the given extension");
throw new FalconCLIException("Failed to get details for the given extension");
}
return extensionDetailJson;
}

private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
String extensionType, String extensionBuildLocation) {
List<Entity> entities = null;
if (!extensionType.equals(ExtensionType.CUSTOM.name())) {
try {
entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
extensionBuildLocation);
} catch (Exception e) {
OUT.get().println("Error in building the extension");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
throw new FalconCLIException("Failed to prepare entities for the given extension");
}
if (entities == null || entities.isEmpty()) {
OUT.get().println("No entities got built");
throw new FalconCLIException("Submit failed. Failed to get details for the given extension");
throw new FalconCLIException("Failed to prepare entities for the given extension");
}
}
return entities;
Expand All @@ -1154,13 +1159,20 @@ public APIResult updateExtensionJob(final String extensionName, final String fil
return getResponse(APIResult.class, clientResponse);
}

public APIResult validateExtensionJob(final String extensionName, final String filePath, final String doAsUser) {
InputStream entityStream = getServletInputStream(filePath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.VALIDATE.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.VALIDATE, entityStream);
return getResponse(APIResult.class, clientResponse);
public APIResult validateExtensionJob(final String extensionName, final String jobName,
final String configPath, final String doAsUser) {
String extensionType = ExtensionHandler.getExtensionType(extensionName, getExtensionDetailJson(extensionName));
InputStream configStream = getServletInputStream(configPath);
if (ExtensionType.TRUSTED.name().equalsIgnoreCase(extensionType)) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.VALIDATE.path, extensionName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.VALIDATE, configStream);
return getResponse(APIResult.class, clientResponse);
} else {
validateExtensionAndGetEntities(extensionName, jobName, configStream);
return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully");
}
}

public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*/
public class CatalogStorage extends Configured implements Storage {

private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
private static final Logger LOG = LoggerFactory.getLogger(CatalogStorage.class);

// constants to be used while preparing HCatalog partition filter query
private static final String FILTER_ST_BRACKET = "(";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public ProcessEntityParser() {

@Override
public void validate(Process process) throws FalconException {
validate(process, true);
}

public void validate(Process process, boolean checkDependentFeeds) throws FalconException {
if (process.getTimezone() == null) {
process.setTimezone(TimeZone.getTimeZone("UTC"));
}
Expand Down Expand Up @@ -106,24 +110,27 @@ public void validate(Process process) throws FalconException {
validateHDFSPaths(process, clusterName);
validateProperties(process);

if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
validateEntityExists(EntityType.FEED, input.getFeed());
Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, feed);
validateInputPartition(input, feed);
validateOptionalInputsForTableStorage(feed, input);
if (checkDependentFeeds) {
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
validateEntityExists(EntityType.FEED, input.getFeed());
Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, feed);
validateInputPartition(input, feed);
validateOptionalInputsForTableStorage(feed, input);
}
}
}

if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
validateEntityExists(EntityType.FEED, output.getFeed());
Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateInstance(process, output, feed);

if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
validateEntityExists(EntityType.FEED, output.getFeed());
Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateInstance(process, output, feed);
}
}
}
}
Expand Down

0 comments on commit 194f36a

Please sign in to comment.