Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon into FALCON…
Browse files Browse the repository at this point in the history
…-2260
  • Loading branch information
sandeepSamudrala committed Jan 20, 2017
2 parents 8f2a8d4 + 3c01168 commit 7609139
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public List<ExtensionBean> getAllExtensions() {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS);
try {
return q.getResultList();
return (List<ExtensionBean>)q.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand All @@ -126,7 +126,24 @@ public ExtensionBean getDetail(String extensionName) {
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
q.setParameter(EXTENSION_NAME, extensionName);
try {
return (ExtensionBean)q.getSingleResult();
List resultList = q.getResultList();
if (!resultList.isEmpty()) {
return (ExtensionBean)resultList.get(0);
} else {
return null;
}
} finally {
commitAndCloseTransaction(entityManager);
}
}

public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
try {
return (List<ExtensionJobsBean>)query.getResultList();
} finally {
commitAndCloseTransaction(entityManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static ExtensionMetaStore getMetaStore() {
private static final String RESOURCES_DIR = "resources";
private static final String LIBS_DIR = "libs";

public static final String EXTENSION_STORE_URI = "extension.store.uri";
static final String EXTENSION_STORE_URI = "extension.store.uri";

private static final ExtensionStore STORE = new ExtensionStore();

Expand Down Expand Up @@ -104,10 +104,9 @@ private void initializeDbTable() {
ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extension)
? ExtensionType.TRUSTED : ExtensionType.CUSTOM;
String description = getShortDescription(extension);
String recipeName = extension;
String location = storePath.toString() + '/' + extension;
String extensionOwner = System.getProperty("user.name");
metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner);
metaStore.storeExtensionBean(extension, location, extensionType, description, extensionOwner);
}
} catch (FalconException e) {
LOG.error("Exception in ExtensionMetaStore:", e);
Expand Down Expand Up @@ -144,17 +143,20 @@ private FileSystem initializeFileSystem() {
}
}

public Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
private Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
Map<String, String> extensionFileMap = new HashMap<>();
Path extensionPath;
try {
RemoteIterator<LocatedFileStatus> fileStatusListIterator;
if (AbstractExtension.isExtensionTrusted(extensionName)){
if (AbstractExtension.isExtensionTrusted(extensionName)) {
extensionPath = new Path(storePath, extensionName.toLowerCase());
fileStatusListIterator = fs.listFiles(extensionPath, true);
}else{
} else {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (null == extensionBean) {
throw new StoreAccessException("Extension not found:" + extensionName);
}
extensionPath = new Path(extensionBean.getLocation());
FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation());
fileStatusListIterator = fileSystem.listFiles(extensionPath, true);
Expand All @@ -176,7 +178,6 @@ public Map<String, String> getExtensionArtifacts(final String extensionName) thr
}



public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException {
Map<String, String> extensionFileMap = new HashMap<>();
try {
Expand Down Expand Up @@ -244,10 +245,10 @@ public String getExtensionResource(final String resourcePath) throws FalconExcep
InputStream data;

ByteArrayOutputStream writer = new ByteArrayOutputStream();
if (resourcePath.startsWith("file")){
if (resourcePath.startsWith("file")) {
data = fs.open(resourceFile);
IOUtils.copyBytes(data, writer, fs.getConf(), true);
}else{
} else {
FileSystem fileSystem = getHdfsFileSystem(resourcePath);
data = fileSystem.open(resourceFile);
IOUtils.copyBytes(data, writer, fileSystem.getConf(), true);
Expand All @@ -258,7 +259,7 @@ public String getExtensionResource(final String resourcePath) throws FalconExcep
}
}

public List<String> getTrustedExtensions() throws StoreAccessException {
private List<String> getTrustedExtensions() throws StoreAccessException {
List<String> extensionList = new ArrayList<>();
try {
FileStatus[] fileStatuses = fs.listStatus(storePath);
Expand Down Expand Up @@ -298,17 +299,18 @@ private void assertURI(String part, String value) throws ValidationException {
throw new ValidationException(msg);
}
}

private FileSystem getHdfsFileSystem(String path) throws FalconException {
Configuration conf = new Configuration();
URI uri;
try {
uri = new URI(path);
} catch (URISyntaxException e){
} catch (URISyntaxException e) {
LOG.error("Exception : ", e);
throw new FalconException(e);
}
conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority());
return HadoopClientFactory.get().createFalconFileSystem(uri);
return HadoopClientFactory.get().createFalconFileSystem(uri);
}


Expand Down Expand Up @@ -363,7 +365,8 @@ public boolean accept(Path file) {
LOG.info("Extension :" + extensionName + " registered successfully.");
return "Extension :" + extensionName + " registered successfully.";
}
public String getResource(final String extensionName, final String resourceName) throws FalconException {

public String getResource(final String extensionName, final String resourceName) throws FalconException {
Map<String, String> resources = getExtensionArtifacts(extensionName);
if (resources.isEmpty()) {
throw new StoreAccessException("No extension resources found for " + extensionName);
Expand Down Expand Up @@ -394,7 +397,12 @@ public List<String> getJobsForAnExtension(final String extensionName) throws Fal
public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
if (metaStore.getDetail(extensionName).getStatus().equals(status)) {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw new FalconException("Extension not found:" + extensionName);
}
if (extensionBean.getStatus().equals(status)) {
throw new ValidationException(extensionName + " is already in " + status.toString() + " state.");
} else {
metaStore.updateExtensionStatus(extensionName, status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,17 @@ private JSONObject buildExtensionDetailResult(final String extensionName) throws
throw new ValidationException("No extension resources found for " + extensionName);
}

ExtensionBean bean = metaStore.getDetail(extensionName);
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw new FalconException("Extension not found:" + extensionName);
}
JSONObject resultObject = new JSONObject();
try {
resultObject.put(NAME, bean.getExtensionName());
resultObject.put(EXTENSION_TYPE, bean.getExtensionType());
resultObject.put(EXTENSION_DESC, bean.getDescription());
resultObject.put(EXTENSION_LOCATION, bean.getLocation());
resultObject.put(NAME, extensionBean.getExtensionName());
resultObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
resultObject.put(EXTENSION_DESC, extensionBean.getDescription());
resultObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
} catch (JSONException e) {
LOG.error("Exception in buildDetailResults:", e);
throw new FalconException(e);
Expand Down Expand Up @@ -230,7 +234,13 @@ private static JSONArray buildEnumerateResult() throws FalconException {

protected static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw FalconWebException.newAPIException("Extension not found:" + extensionName,
Response.Status.NOT_FOUND);
}
if (!extensionBean.getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ public APIResult getExtensionDescription(
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName, README));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down Expand Up @@ -706,6 +708,8 @@ public APIResult getExtensionDefinition(
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName,
extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void run() {
if (status.getInstances().length > 0
&& status.getInstances()[0].status == InstancesResult.
WorkflowStatus.SUCCEEDED) {
LOG.debug("Instance of nominaltime {} of entity {} has succeeded, removing "
LOG.debug("Instance of nominal time {} of entity {} has succeeded, removing "
+ "from backlog entries", nominalTimeStr, entity.getName());
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
Expand Down

0 comments on commit 7609139

Please sign in to comment.