From e300218dc619eb00bc66cea3fb32e5c88daeb519 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Tue, 13 Dec 2016 17:14:11 +0530 Subject: [PATCH 1/6] WIP --- .../apache/falcon/persistence/ExtensionBean.java | 14 ++++++++++++++ .../falcon/persistence/ExtensionJobsBean.java | 14 +++++++++++++- .../falcon/extensions/jdbc/ExtensionMetaStore.java | 6 ++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java index 2cade5b28..fbeaefb77 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java @@ -74,6 +74,11 @@ public class ExtensionBean { @Column(name = "creation_time") private Date creationTime; + @Basic + @NotNull + @Column(name = "extension_owner") + private String extensionOwner; + public ExtensionType getExtensionType() { return extensionType; } @@ -114,4 +119,13 @@ public String getDescription() { public void setDescription(String description) { this.description = description; } + + public String getExtensionOwner() { + return extensionOwner; + } + + public void setExtensionOwner(String extensionOwner) { + this.extensionOwner = extensionOwner; + } + } diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java index 2dc66f878..05cd4f6de 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java @@ -86,6 +86,11 @@ public class ExtensionJobsBean { @Column(name = "last_updated_time") private Date lastUpdatedTime; + @Basic + @NotNull + @Column(name = "extension_owner") + private String extensionOwner; + public String getJobName() { return jobName; } @@ -102,7 +107,6 @@ public void setCreationTime(Date creationTime) { this.creationTime = creationTime; } - public byte[] getConfig() { return config; } @@ -142,4 +146,12 @@ public List getProcesses() { public void setProcesses(List processes) { this.processes = processes.toArray(new String[processes.size()]); } + + public String getExtensionOwner() { + return extensionOwner; + } + + public void setExtensionOwner(String extensionOwner) { + this.extensionOwner = extensionOwner; + } } diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 456c97c82..a755766e2 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -42,13 +42,14 @@ private EntityManager getEntityManager() { } public void storeExtensionBean(String extensionName, String location, ExtensionType extensionType, - String description){ + String description, String extensionOwner){ ExtensionBean extensionBean = new ExtensionBean(); extensionBean.setLocation(location); extensionBean.setExtensionName(extensionName); extensionBean.setExtensionType(extensionType); extensionBean.setCreationTime(new Date(System.currentTimeMillis())); extensionBean.setDescription(description); + extensionBean.setExtensionOwner(extensionOwner); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); @@ -123,7 +124,7 @@ public void deleteExtension(String extensionName){ } public void storeExtensionJob(String jobName, String extensionName, List feeds, List processes, - byte[] config) { + byte[] config, String extensionOwner) { ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean(); Date currentTime = new Date(System.currentTimeMillis()); extensionJobsBean.setJobName(jobName); @@ -133,6 +134,7 @@ public void storeExtensionJob(String jobName, String extensionName, List extensionJobsBean.setProcesses(processes); extensionJobsBean.setConfig(config); extensionJobsBean.setLastUpdatedTime(currentTime); + extensionJobsBean.setExtensionOwner(extensionOwner); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); From 7bb7b139d2d4ee1abc7fc8aa4b929057edf35127 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Wed, 14 Dec 2016 14:09:57 +0530 Subject: [PATCH 2/6] FALCON-2207 ACL check while deleting extensions --- .../falcon/persistence/ExtensionJobsBean.java | 13 ----- .../extensions/jdbc/ExtensionMetaStore.java | 3 +- .../extensions/store/ExtensionStore.java | 22 ++++--- .../jdbc/ExtensionMetaStoreTest.java | 5 +- .../extensions/store/ExtensionStoreTest.java | 58 ++++++++++++++----- .../resource/extensions/ExtensionManager.java | 5 +- 6 files changed, 63 insertions(+), 43 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java index 05cd4f6de..15a4dac26 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java @@ -86,11 +86,6 @@ public class ExtensionJobsBean { @Column(name = "last_updated_time") private Date lastUpdatedTime; - @Basic - @NotNull - @Column(name = "extension_owner") - private String extensionOwner; - public String getJobName() { return jobName; } @@ -146,12 +141,4 @@ public List getProcesses() { public void setProcesses(List processes) { this.processes = processes.toArray(new String[processes.size()]); } - - public String getExtensionOwner() { - return extensionOwner; - } - - public void setExtensionOwner(String extensionOwner) { - this.extensionOwner = extensionOwner; - } } diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index a755766e2..661850057 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -124,7 +124,7 @@ public void deleteExtension(String extensionName){ } public void storeExtensionJob(String jobName, String extensionName, List feeds, List processes, - byte[] config, String extensionOwner) { + byte[] config) { ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean(); Date currentTime = new Date(System.currentTimeMillis()); extensionJobsBean.setJobName(jobName); @@ -134,7 +134,6 @@ public void storeExtensionJob(String jobName, String extensionName, List extensionJobsBean.setProcesses(processes); extensionJobsBean.setConfig(config); extensionJobsBean.setLastUpdatedTime(currentTime); - extensionJobsBean.setExtensionOwner(extensionOwner); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 76196b77b..b3057f3e3 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -26,6 +26,7 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -103,7 +104,8 @@ private void initializeDbTable() { String description = getShortDescription(extension); String recipeName = extension; String location = storePath.toString() + '/' + extension; - metaStore.storeExtensionBean(recipeName, location, extensionType, description); + String extensionOwner = CurrentUser.getUser(); + metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner); } } catch (FalconException e){ LOG.error("Exception in ExtensionMetaStore:", e); @@ -251,21 +253,25 @@ public List getExtensions() throws StoreAccessException { } return extensionList; } - public String deleteExtension(final String extensionName) throws ValidationException{ + + public String deleteExtension(final String extensionName, String currentUser) throws ValidationException, + FalconException{ ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED : ExtensionType.CUSTOM; if (extensionType.equals(ExtensionType.TRUSTED)){ throw new ValidationException(extensionName + " is trusted cannot be deleted."); - } - if (metaStore.checkIfExtensionExists(extensionName)) { + } else if (!metaStore.checkIfExtensionExists(extensionName)){ + throw new FalconException("Extension:" + extensionName + " is not registered with Falcon."); + } else if (!metaStore.getDetail(extensionName).getExtensionOwner().equals(currentUser)) { + throw new FalconException("User: " + currentUser + " is not allowed to delete extension: " + extensionName); + } else { metaStore.deleteExtension(extensionName); return "Deleted extension:" + extensionName; - } else { - return "Extension:" + extensionName + " is not registered with Falcon."; } } - public String registerExtension(final String extensionName, final String path, final String description) + public String registerExtension(final String extensionName, final String path, final String description, + String extensionOwner) throws URISyntaxException, FalconException { Configuration conf = new Configuration(); URI uri = new URI(path); @@ -306,7 +312,7 @@ public boolean accept(Path file){ } if (!metaStore.checkIfExtensionExists(extensionName)){ - metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description); + metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description, extensionOwner); }else{ throw new ValidationException(extensionName + " already exists."); } diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index d96fc1f74..60c5ea92b 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -21,6 +21,7 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.store.AbstractTestExtensionStore; import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.conf.Configuration; @@ -60,7 +61,7 @@ public void init() { @Test public void testExtension(){ //insert - stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description"); + stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description", "falcon"); Assert.assertEquals(stateStore.getAllExtensions().size(), 1); //check data @@ -73,7 +74,7 @@ public void testExtension(){ @Test public void testExtensionJob() { - stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description"); + stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description", "falcon"); List processes = new ArrayList<>(); processes.add("testProcess"); List feeds = new ArrayList<>(); diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index 50c9b7f38..efa72e61a 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -25,6 +25,7 @@ import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; import org.apache.falcon.hadoop.JailedFileSystem; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; @@ -104,32 +105,57 @@ public void clean() { } } - @Test public void testRegisterExtension() throws IOException, URISyntaxException, FalconException{ - createLibs(); - createReadmeAndJar(); - createMETA(); + String extensionPath = EXTENSION_PATH + "testRegister"; + createLibs(extensionPath); + createReadmeAndJar(extensionPath); + createMETA(extensionPath); store = ExtensionStore.get(); - store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc"); + store.registerExtension("test", STORAGE_URL + extensionPath, "test desc", "falcon"); ExtensionMetaStore metaStore = new ExtensionMetaStore(); Assert.assertEquals(metaStore.getAllExtensions().size(), 1); } @Test(expectedExceptions=ValidationException.class) public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException{ + String extensionPath = EXTENSION_PATH + "testRegister"; + store = ExtensionStore.get(); + createLibs(extensionPath); + store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc", "falcon"); + } + + @Test + public void testDeleteExtension() throws IOException, URISyntaxException, FalconException{ + String extensionPath = EXTENSION_PATH + "testDelete"; + createLibs(extensionPath); + createReadmeAndJar(extensionPath); + createMETA(extensionPath); + store = ExtensionStore.get(); + store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falcon"); + store.deleteExtension("toBeDeleted", "falcon"); + ExtensionMetaStore metaStore = new ExtensionMetaStore(); + Assert.assertEquals(metaStore.getAllExtensions().size(), 0); + } + + @Test(expectedExceptions=FalconException.class) + public void testFailureDeleteExtension() throws IOException, URISyntaxException, FalconException{ + String extensionPath = EXTENSION_PATH + "testACLOnDelete"; + createLibs(extensionPath); + createReadmeAndJar(extensionPath); + createMETA(extensionPath); store = ExtensionStore.get(); - createLibs(); - store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc"); + store.registerExtension("ACLFailure", STORAGE_URL + extensionPath, "test desc", "oozie"); + store.deleteExtension("ACLFailure", "falcon"); } - private void createMETA() throws IOException{ - Path path = new Path(EXTENSION_PATH + "/META"); + private void createMETA(String extensionPath) throws IOException{ + Path path = new Path(extensionPath + "/META"); if (fs.exists(path)){ fs.delete(path, true); } fs.mkdirs(path); - path = new Path(EXTENSION_PATH + "/META/test.properties"); + path = new Path(extensionPath + "/META/test.properties"); OutputStream os = fs.create(path); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); br.write("Hello World"); @@ -141,23 +167,23 @@ private void createMETA() throws IOException{ br.close(); } - private void createLibs() throws IOException{ - Path path = new Path(EXTENSION_PATH); + private void createLibs(String extensionPath) throws IOException{ + Path path = new Path(extensionPath); if (fs.exists(path)){ fs.delete(path, true); } fs.mkdirs(path); - path = new Path(EXTENSION_PATH + "/libs//libs/build"); + path = new Path(extensionPath + "/libs//libs/build"); fs.mkdirs(path); } - private void createReadmeAndJar() throws IOException{ - Path path = new Path(EXTENSION_PATH + "/README"); + private void createReadmeAndJar(String extensionPath) throws IOException{ + Path path = new Path(extensionPath + "/README"); if (fs.exists(path)){ fs.delete(path, true); } fs.create(path); - path = new Path(EXTENSION_PATH + "/libs/build/test.jar"); + path = new Path(extensionPath + "/libs/build/test.jar"); OutputStream os = fs.create(path); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); br.write("Hello World"); diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 79e36918b..f0b04bfa5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -41,6 +41,7 @@ import org.apache.falcon.resource.ExtensionInstanceList; import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.Services; import org.apache.falcon.util.DeploymentUtil; import org.codehaus.jettison.json.JSONArray; @@ -493,7 +494,7 @@ public String deleteExtensionMetadata( checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { - return ExtensionStore.get().deleteExtension(extensionName); + return ExtensionStore.get().deleteExtension(extensionName, CurrentUser.getUser()); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -510,7 +511,7 @@ public String registerExtensionMetadata( checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { - return ExtensionStore.get().registerExtension(extensionName, path, description); + return ExtensionStore.get().registerExtension(extensionName, path, description, CurrentUser.getUser()); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } From 3149eefdce58b81163b688fb6b7f45d6d5379ec3 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Wed, 14 Dec 2016 14:40:21 +0530 Subject: [PATCH 3/6] FALCON-2207 ACL check while deleting extensions, code review changes --- .../org/apache/falcon/extensions/store/ExtensionStore.java | 3 +-- .../apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java | 1 - .../org/apache/falcon/extensions/store/ExtensionStoreTest.java | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index b3057f3e3..89ffb2966 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -254,8 +254,7 @@ public List getExtensions() throws StoreAccessException { return extensionList; } - public String deleteExtension(final String extensionName, String currentUser) throws ValidationException, - FalconException{ + public String deleteExtension(final String extensionName, String currentUser) throws FalconException{ ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED : ExtensionType.CUSTOM; if (extensionType.equals(ExtensionType.TRUSTED)){ diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index 60c5ea92b..a4f46d4b0 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -21,7 +21,6 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.store.AbstractTestExtensionStore; import org.apache.falcon.persistence.ExtensionBean; -import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.conf.Configuration; diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index efa72e61a..3084da3e2 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -25,7 +25,6 @@ import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; import org.apache.falcon.hadoop.JailedFileSystem; -import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; From 089094c3a3389ccbb023e3e3db6e2bb6e3f4f333 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Wed, 14 Dec 2016 16:15:59 +0530 Subject: [PATCH 4/6] FALCON-2207 ACL check while deleting extensions, code review changes --- .../src/main/java/org/apache/falcon/util/HdfsClassLoader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java index bacc09212..23fc7f7e3 100644 --- a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java +++ b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java @@ -44,7 +44,7 @@ public class HdfsClassLoader extends URLClassLoader { private static final Logger LOG = LoggerFactory.getLogger(HdfsClassLoader.class); - private static Map classLoaderCache = new ConcurrentHashMap(); + private static Map classLoaderCache = new ConcurrentHashMap<>(); private static final Object LOCK = new Object(); public static ClassLoader load(final String name, final List jarHdfsPath) throws IOException { From c5ddc4acc2ecb44450699e443ed04482d2e30ccd Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Wed, 14 Dec 2016 16:19:08 +0530 Subject: [PATCH 5/6] FALCON-2207 backmerge --- .gitignore | 3 + .../apache/falcon/cli/FalconExtensionCLI.java | 16 +- .../org/apache/falcon/ExtensionHandler.java | 29 +++ .../falcon/client/AbstractFalconClient.java | 11 +- .../apache/falcon/client/FalconClient.java | 86 ++++--- .../entity/parser/ProcessEntityParser.java | 39 +-- common/src/main/resources/startup.properties | 12 +- .../extensions/jdbc/ExtensionMetaStore.java | 18 ++ .../jdbc/ExtensionMetaStoreTest.java | 1 + .../resource/extensions/ExtensionManager.java | 223 ++++++++++++++---- src/conf/startup.properties | 11 +- src/main/assemblies/distributed-package.xml | 10 - .../apache/falcon/unit/FalconUnitClient.java | 35 ++- .../falcon/unit/LocalExtensionManager.java | 21 +- .../falcon/unit/FalconUnitTestBase.java | 4 + .../apache/falcon/unit/TestFalconUnit.java | 4 + 16 files changed, 384 insertions(+), 139 deletions(-) diff --git a/.gitignore b/.gitignore index f23a34df6..b6733d702 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,6 @@ logs falcon-ui/dist falcon-ui/node falcon-ui/node_modules + +#Resources jar +unit/src/test/resources/falcon-extensions-*-tests.jar diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 9b88abee3..c8c66bfe1 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -89,10 +89,16 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) { } else if (optionsList.contains(UNREGISTER_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.unregisterExtension(extensionName); - }else if (optionsList.contains(DETAIL_OPT)) { - validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); - result = client.getExtensionDetail(extensionName); - result = prettyPrintJson(result); + } else if (optionsList.contains(DETAIL_OPT)) { + if (optionsList.contains(JOB_NAME_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + result = client.getExtensionJobDetails(jobName); + result = prettyPrintJson(result); + } else { + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + result = client.getExtensionDetail(extensionName); + result = prettyPrintJson(result); + } } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(jobName, JOB_NAME_OPT); @@ -113,7 +119,7 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) { } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); - result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage(); + result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) { validateRequiredParameter(jobName, JOB_NAME_OPT); result = client.scheduleExtensionJob(jobName, doAsUser).getMessage(); diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java index c18a7cc5d..8168b23c7 100644 --- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,8 @@ import java.util.List; import java.util.ServiceLoader; +import static org.apache.falcon.client.FalconClient.OUT; + /** * Handler class that is responsible for preparing Extension entities. */ @@ -50,6 +54,8 @@ public final class ExtensionHandler { public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class); private static final String UTF_8 = CharEncoding.UTF_8; private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir")); + private static final String LOCATION = "location"; + private static final String TYPE = "type"; public List getEntities(ClassLoader extensionClassloader, String extensionName, String jobName, InputStream configStream) throws IOException, FalconException { @@ -186,4 +192,27 @@ public static List getFilesInPath(URL fileURL) throws MalformedURLException urls.add(fileURL); return urls; } + + + public static String getExtensionLocation(String extensionName, JSONObject extensionDetailJson) { + String extensionBuildPath; + try { + extensionBuildPath = extensionDetailJson.get(LOCATION).toString(); + } catch (JSONException e) { + OUT.get().print("Error. " + extensionName + " not found "); + throw new FalconCLIException("Failed to get extension type for the given extension"); + } + return extensionBuildPath; + } + + public static String getExtensionType(String extensionName, JSONObject extensionDetailJson) { + String extensionType; + try { + extensionType = extensionDetailJson.get(TYPE).toString(); + } catch (JSONException e) { + OUT.get().print("Error. " + extensionName + " not found "); + throw new FalconCLIException("Failed to get extension type for the given extension"); + } + return extensionType; + } } diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index e9a10fd37..fc6bc14f4 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -197,7 +197,7 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath, public abstract String unregisterExtension(String extensionName); /** - * Prepare set of entities the extension has implemented and stage them to a local directory and submit them too. + * Prepares set of entities the extension has implemented and stage them to a local directory and submit them too. * @param extensionName extension which is available in the store. * @param jobName name to be used in all the extension entities' tagging that are built as part of * loadAndPrepare. @@ -209,7 +209,7 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam String doAsUser); /** - * Prepare set of entities the extension has implemented and stage them to a local directory and submits and + * Prepares set of entities the extension has implemented and stage them to a local directory and submits and * schedules them. * @param extensionName extension which is available in the store. * @param jobName name to be used in all the extension entities' tagging that are built as part of @@ -221,6 +221,13 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, String doAsUser); + /** + * Prepares set of entities the extension has implemented to validate the extension job. + * @param jobName job name of the extension job. + * @return + */ + public abstract String getExtensionJobDetails(final String jobName); + /** * * Get list of the entities. diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 9820686e4..0ccbe4846 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -23,8 +23,6 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.client.urlconnection.HTTPSProperties; -import com.sun.jersey.core.header.FormDataContentDisposition; -import com.sun.jersey.multipart.FormDataBodyPart; import com.sun.jersey.multipart.FormDataMultiPart; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -141,7 +139,9 @@ public boolean verify(String hostname, SSLSession sslSession) { return true; } }; - private static final String TAG_SEPARATOR = ","; + private static final String FEEDS = "feeds"; + private static final String PROCESSES = "processes"; + private static final String CONFIG = "config"; private final WebResource service; private final AuthenticatedURL.Token authenticationToken; @@ -357,6 +357,7 @@ protected static enum ExtensionOperations { DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML), UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_PLAIN), DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON), + JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON), REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_PLAIN); private String path; @@ -1029,6 +1030,12 @@ public String getExtensionDetail(final String extensionName) { return getResponse(String.class, getExtensionDetailResponse(extensionName)); } + public String getExtensionJobDetails(final String jobName) { + ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName) + .call(ExtensionOperations.JOB_DETAILS); + return getResponse(String.class, clientResponse); + } + public ClientResponse getExtensionDetailResponse(final String extensionName) { return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName) .call(ExtensionOperations.DETAIL); @@ -1073,16 +1080,16 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName, List entities = validateExtensionAndGetEntities(extensionName, jobName, configStream); FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); - for (Entity entity : entities) { - if (EntityType.FEED.equals(entity.getEntityType())) { - formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE); - } else if (EntityType.PROCESS.equals(entity.getEntityType())) { - formDataMultiPart.field("processes", entity, MediaType.APPLICATION_XML_TYPE); + if (entities != null && !entities.isEmpty()) { + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + formDataMultiPart.field(FEEDS, entity, MediaType.APPLICATION_XML_TYPE); + } else if (EntityType.PROCESS.equals(entity.getEntityType())) { + formDataMultiPart.field(PROCESSES, entity, MediaType.APPLICATION_XML_TYPE); + } } } - - formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(), configStream, - MediaType.APPLICATION_OCTET_STREAM_TYPE)); + formDataMultiPart.field(CONFIG, configStream, MediaType.APPLICATION_OCTET_STREAM_TYPE); try { formDataMultiPart.close(); } catch (IOException e) { @@ -1094,30 +1101,28 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName, private List validateExtensionAndGetEntities(String extensionName, String jobName, InputStream configStream) { - ClientResponse clientResponse = getExtensionDetailResponse(extensionName); - List entities = getEntities(extensionName, jobName, configStream, clientResponse); + JSONObject extensionDetailJson = getExtensionDetailJson(extensionName); + String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson); + String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson); + List entities = getEntities(extensionName, jobName, configStream, extensionType, + extensionBuildLocation); return entities; } - private List getEntities(String extensionName, String jobName, InputStream configStream, - ClientResponse clientResponse) { - JSONObject responseJson; - try { - responseJson = new JSONObject(clientResponse.getEntity(String.class)); - } catch (JSONException e) { - OUT.get().print("Submit failed. Failed to get details for the given extension"); - throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); - } - String extensionType; - String extensionBuildLocation; + private JSONObject getExtensionDetailJson(String extensionName) { + ClientResponse clientResponse = getExtensionDetailResponse(extensionName); + JSONObject extensionDetailJson; try { - extensionType = responseJson.get("type").toString(); - extensionBuildLocation = responseJson.get("location").toString(); + extensionDetailJson = new JSONObject(clientResponse.getEntity(String.class)); } catch (JSONException e) { - OUT.get().print("Error. " + extensionName + " not found "); - throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); + OUT.get().print("Failed to get details for the given extension"); + throw new FalconCLIException("Failed to get details for the given extension"); } + return extensionDetailJson; + } + private List getEntities(String extensionName, String jobName, InputStream configStream, + String extensionType, String extensionBuildLocation) { List entities = null; if (!extensionType.equals(ExtensionType.CUSTOM.name())) { try { @@ -1125,11 +1130,11 @@ private List getEntities(String extensionName, String jobName, InputStre extensionBuildLocation); } catch (Exception e) { OUT.get().println("Error in building the extension"); - throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); + throw new FalconCLIException("Failed to prepare entities for the given extension"); } if (entities == null || entities.isEmpty()) { OUT.get().println("No entities got built"); - throw new FalconCLIException("Submit failed. Failed to get details for the given extension"); + throw new FalconCLIException("Failed to prepare entities for the given extension"); } } return entities; @@ -1154,13 +1159,20 @@ public APIResult updateExtensionJob(final String extensionName, final String fil return getResponse(APIResult.class, clientResponse); } - public APIResult validateExtensionJob(final String extensionName, final String filePath, final String doAsUser) { - InputStream entityStream = getServletInputStream(filePath); - ClientResponse clientResponse = new ResourceBuilder() - .path(ExtensionOperations.VALIDATE.path, extensionName) - .addQueryParam(DO_AS_OPT, doAsUser) - .call(ExtensionOperations.VALIDATE, entityStream); - return getResponse(APIResult.class, clientResponse); + public APIResult validateExtensionJob(final String extensionName, final String jobName, + final String configPath, final String doAsUser) { + String extensionType = ExtensionHandler.getExtensionType(extensionName, getExtensionDetailJson(extensionName)); + InputStream configStream = getServletInputStream(configPath); + if (ExtensionType.TRUSTED.name().equalsIgnoreCase(extensionType)) { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.VALIDATE.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.VALIDATE, configStream); + return getResponse(APIResult.class, clientResponse); + } else { + validateExtensionAndGetEntities(extensionName, jobName, configStream); + return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully"); + } } public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) { diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java index 38fa3aece..b97775268 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java @@ -74,6 +74,10 @@ public ProcessEntityParser() { @Override public void validate(Process process) throws FalconException { + validate(process, true); + } + + public void validate(Process process, boolean checkDependentFeeds) throws FalconException { if (process.getTimezone() == null) { process.setTimezone(TimeZone.getTimeZone("UTC")); } @@ -106,24 +110,27 @@ public void validate(Process process) throws FalconException { validateHDFSPaths(process, clusterName); validateProperties(process); - if (process.getInputs() != null) { - for (Input input : process.getInputs().getInputs()) { - validateEntityExists(EntityType.FEED, input.getFeed()); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); - CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); - CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName); - CrossEntityValidations.validateInstanceRange(process, input, feed); - validateInputPartition(input, feed); - validateOptionalInputsForTableStorage(feed, input); + if (checkDependentFeeds) { + if (process.getInputs() != null) { + for (Input input : process.getInputs().getInputs()) { + validateEntityExists(EntityType.FEED, input.getFeed()); + Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); + CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); + CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName); + CrossEntityValidations.validateInstanceRange(process, input, feed); + validateInputPartition(input, feed); + validateOptionalInputsForTableStorage(feed, input); + } } - } - if (process.getOutputs() != null) { - for (Output output : process.getOutputs().getOutputs()) { - validateEntityExists(EntityType.FEED, output.getFeed()); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); - CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); - CrossEntityValidations.validateInstance(process, output, feed); + + if (process.getOutputs() != null) { + for (Output output : process.getOutputs().getOutputs()) { + validateEntityExists(EntityType.FEED, output.getFeed()); + Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); + CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); + CrossEntityValidations.validateInstance(process, output, feed); + } } } } diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index f91f3b696..01386f124 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -35,7 +35,6 @@ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.service.FalconJPAService,\ - org.apache.falcon.extensions.ExtensionService,\ org.apache.falcon.service.EntitySLAMonitoringService,\ org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.entity.store.ConfigurationStore,\ @@ -45,6 +44,17 @@ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ org.apache.falcon.service.ProxyUserService + + +##Add if you want to use Metadata Service +## Also enable all the properties which starts with falcon.graph prefix +#org.apache.falcon.metadata.MetadataMappingService,\ + +##Add if you want to use Trusted or User Extensions +## In case of distributed Mode enable ExtensionService only on Prism +## It should come after FalconJPAService in application services +#org.apache.falcon.extensions.ExtensionService,\ + ##Add if you want to send data to graphite # org.apache.falcon.metrics.MetricNotificationService\ ## Add if you want to use Falcon Azure integration ## diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 661850057..a32b5aef4 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -155,6 +155,24 @@ public void deleteExtensionJob(String jobName) { } } + public ExtensionJobsBean getExtensionJobDetails(String jobName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query query = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION_JOB); + query.setParameter(JOB_NAME, jobName); + List jobsBeanList; + try { + jobsBeanList = query.getResultList(); + } finally { + commitAndCloseTransaction(entityManager); + } + if (jobsBeanList != null && !jobsBeanList.isEmpty()) { + return jobsBeanList.get(0); + } else { + return null; + } + } + public List getAllExtensionJobs() { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index a4f46d4b0..26dcdf52a 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -83,6 +83,7 @@ public void testExtensionJob() { stateStore.storeExtensionJob("job1", "test2", feeds, processes, config); Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1); + Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed"); stateStore.deleteExtensionJob("job1"); Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 0); } diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index f0b04bfa5..dbd4ad184 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -18,16 +18,20 @@ package org.apache.falcon.resource.extensions; +import com.sun.jersey.multipart.FormDataBodyPart; import com.sun.jersey.multipart.FormDataParam; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.parser.ProcessEntityParser; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.Extension; import org.apache.falcon.extensions.ExtensionProperties; import org.apache.falcon.extensions.ExtensionService; @@ -35,6 +39,7 @@ import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; import org.apache.falcon.resource.EntityList; @@ -44,6 +49,7 @@ import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.Services; import org.apache.falcon.util.DeploymentUtil; +import org.apache.hadoop.security.authorize.AuthorizationException; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -80,19 +86,29 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class); - public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; - public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; - public static final String ASCENDING_SORT_ORDER = "asc"; - public static final String DESCENDING_SORT_ORDER = "desc"; + private static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; + private static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; + private static final String ASCENDING_SORT_ORDER = "asc"; + private static final String DESCENDING_SORT_ORDER = "desc"; private Extension extension = new Extension(); + private static final String EXTENSION_RESULTS = "extensions"; private static final String TOTAL_RESULTS = "totalResults"; private static final String README = "README"; - private static final String EXTENSION_NAME = "name"; + private static final String NAME = "name"; private static final String EXTENSION_TYPE = "type"; private static final String EXTENSION_DESC = "description"; - public static final String EXTENSION_LOCATION = "location"; + private static final String EXTENSION_LOCATION = "location"; + private static final String JOB_NAME = "jobName"; + + private static final String EXTENSION_NAME = "extensionName"; + private static final String FEEDS = "feeds"; + private static final String PROCESSES = "processes"; + private static final String CONFIG = "config"; + private static final String CREATION_TIME = "creationTime"; + private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; + private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json"; //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -295,38 +311,65 @@ public APIResult delete(@PathParam("job-name") String jobName, @POST @Path("submit/{extension-name}") - @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA, + MediaType.APPLICATION_OCTET_STREAM}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult submit( @PathParam("extension-name") String extensionName, @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser, @QueryParam("jobName") String jobName, - @FormDataParam("entities") List entities, + @FormDataParam("processes") List processForms, + @FormDataParam("feeds") List feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + Map> entityMap; + try { - entities = getEntityList(extensionName, entities, config); - submitEntities(extensionName, doAsUser, jobName, entities, config); + entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); + submitEntities(extensionName, doAsUser, jobName, entityMap, config); } catch (FalconException | IOException e) { - LOG.error("Error when submitting extension job: ", e); + LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } - private void validateEntities(List entities) throws FalconException { - for (Entity entity : entities) { - if (!EntityType.FEED.equals(entity.getEntityType()) && !EntityType.PROCESS.equals(entity.getEntityType())) { - LOG.error("Cluster entity is not allowed for submission via submitEntities: {}", entity.getName()); - throw new FalconException("Cluster entity is not allowed for submission in extensions submission"); + private Map> getEntityList(String extensionName, String jobName, + List feedForms, + List processForms, InputStream config) + throws FalconException, IOException{ + List processes = getProcesses(processForms); + List feeds = getFeeds(feedForms); + ExtensionType extensionType = getExtensionType(extensionName); + List entities; + Map> entityMap = new HashMap<>(); + if (ExtensionType.TRUSTED.equals(extensionType)) { + entities = generateEntities(extensionName, config); + List trustedFeeds = new ArrayList<>(); + List trustedProcesses = new ArrayList<>(); + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + trustedFeeds.add(entity); + } else { + trustedProcesses.add(entity); + } } - super.validate(entity); + entityMap.put(EntityType.PROCESS, trustedProcesses); + entityMap.put(EntityType.FEED, trustedFeeds); + return entityMap; + } else { + EntityUtil.applyTags(extensionName, jobName, processes); + EntityUtil.applyTags(extensionName, jobName, feeds); + entityMap.put(EntityType.PROCESS, processes); + entityMap.put(EntityType.FEED, feeds); + return entityMap; } } + private ExtensionType getExtensionType(String extensionName) { - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionBean extensionDetails = metaStore.getDetail(extensionName); return extensionDetails.getExtensionType(); } @@ -340,50 +383,90 @@ public APIResult submitAndSchedule( @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser, @QueryParam("jobName") String jobName, - @FormDataParam("entities") List entities, + @FormDataParam("processes") List processForms, + @FormDataParam("feeds") List feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + Map> entityMap; try { - entities = getEntityList(extensionName, entities, config); - submitEntities(extensionName, doAsUser, jobName, entities, config); - for (Entity entity : entities) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); - } + entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); + submitEntities(extensionName, doAsUser, jobName, entityMap, config); + scheduleEntities(entityMap); } catch (FalconException | IOException e) { - LOG.error("Error when submitting extension job: ", e); + LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - protected void submitEntities(String extensionName, String doAsUser, String jobName, List entities, - InputStream configStream) throws FalconException, IOException { - validateEntities(entities); - List feeds = new ArrayList<>(); - List processes = new ArrayList<>(); - for (Entity entity : entities) { - submitInternal(entity, doAsUser); - if (EntityType.FEED.equals(entity.getEntityType())) { - feeds.add(entity.getName()); - } else if (EntityType.PROCESS.equals(entity.getEntityType())) { - processes.add(entity.getName()); + private List getFeeds(List feedForms) { + List feeds = new ArrayList<>(); + if (feedForms != null && !feedForms.isEmpty()) { + for (FormDataBodyPart formDataBodyPart : feedForms) { + feeds.add(formDataBodyPart.getValueAs(Feed.class)); } } - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + return feeds; + } + + private List getProcesses(List processForms) { + List processes = new ArrayList<>(); + if (processForms != null && !processForms.isEmpty()) { + for (FormDataBodyPart formDataBodyPart : processForms) { + processes.add(formDataBodyPart.getValueAs(Process.class)); + } + } + return processes; + } + + protected void submitEntities(String extensionName, String doAsUser, String jobName, + Map> entityMap, InputStream configStream) + throws FalconException, IOException { + List feeds = entityMap.get(EntityType.FEED); + List processes = entityMap.get(EntityType.PROCESS); + validateFeeds(feeds); + validateProcesses(processes); + List feedNames = new ArrayList<>(); + List processNames = new ArrayList<>(); + for (Entity feed : feeds) { + submitInternal(feed, doAsUser); + feedNames.add(feed.getName()); + } + for (Entity process: processes) { + submitInternal(process, doAsUser); + processNames.add(process.getName()); + } + + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); byte[] configBytes = null; if (configStream != null) { configBytes = IOUtils.toByteArray(configStream); } - metaStore.storeExtensionJob(jobName, extensionName, feeds, processes, configBytes); + metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); } - private List getEntityList(String extensionName, List entities, InputStream config) - throws FalconException, IOException { - ExtensionType extensionType = getExtensionType(extensionName); - if (ExtensionType.TRUSTED.equals(extensionType)) { - entities = generateEntities(extensionName, config); + protected void scheduleEntities(Map> entityMap) throws FalconException, + AuthorizationException { + for (Object feed: entityMap.get(EntityType.FEED)) { + scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null); + } + for (Object process: entityMap.get(EntityType.PROCESS)) { + scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null, null); + } + } + + + private void validateFeeds(List feeds) throws FalconException { + for (Entity feed : feeds) { + super.validate(feed); + } + } + + private void validateProcesses(List processes) throws FalconException { + ProcessEntityParser processEntityParser = new ProcessEntityParser(); + for (Entity process : processes) { + processEntityParser.validate((Process)process, false); } - return entities; } @POST @@ -416,6 +499,10 @@ public APIResult validate( @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); + ExtensionType extensionType = getExtensionType(extensionName); + if (!ExtensionType.TRUSTED.equals(extensionType)) { + throw FalconWebException.newAPIException("Extension validation is supported only for trusted extensions"); + } try { List entities = generateEntities(extensionName, request.getInputStream()); for (Entity entity : entities) { @@ -474,16 +561,27 @@ public String getExtensionDescription( @GET @Path("detail/{extension-name}") @Produces({MediaType.APPLICATION_JSON}) - public Response getDetail(@PathParam("extension-name") String extensionName){ + public Response getDetail(@PathParam("extension-name") String extensionName) { checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { - return Response.ok(buildDetailResult(extensionName)).build(); + return Response.ok(buildExtensionDetailResult(extensionName)).build(); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } } + @GET + @Path("extensionJobDetails/{job-name}") + @Produces({MediaType.APPLICATION_JSON}) + public String getExtensionJobDetail(@PathParam("job-name") String jobName) { + checkIfExtensionServiceIsEnabled(); + try { + return buildExtensionJobDetailResult(jobName).toString(); + } catch (FalconException e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } @POST @Path("unregister/{extension-name}") @@ -507,7 +605,7 @@ public String deleteExtensionMetadata( public String registerExtensionMetadata( @PathParam("extension-name") String extensionName, @QueryParam("path") String path, - @QueryParam("description") String description){ + @QueryParam("description") String description) { checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { @@ -541,13 +639,13 @@ private static void validateExtensionName(final String extensionName) { private static JSONArray buildEnumerateResult() throws FalconException { JSONArray results = new JSONArray(); - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); List extensionBeanList = metaStore.getAllExtensions(); for (ExtensionBean extensionBean : extensionBeanList) { JSONObject resultObject = new JSONObject(); try { - resultObject.put(EXTENSION_NAME, extensionBean.getExtensionName().toLowerCase()); + resultObject.put(NAME, extensionBean.getExtensionName().toLowerCase()); resultObject.put(EXTENSION_TYPE, extensionBean.getExtensionType()); resultObject.put(EXTENSION_DESC, extensionBean.getDescription()); resultObject.put(EXTENSION_LOCATION, extensionBean.getLocation()); @@ -574,8 +672,29 @@ private List generateEntities(String extensionName, InputStream configSt return entities; } - private JSONObject buildDetailResult(final String extensionName) throws FalconException { - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName); + if (jobsBean == null) { + throw new ValidationException("Job name not found:" + jobName); + } + JSONObject detailsObject = new JSONObject(); + try { + detailsObject.put(JOB_NAME, jobsBean.getJobName()); + detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName()); + detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), ",")); + detailsObject.put(PROCESSES, StringUtils.join(jobsBean.getProcesses(), ",")); + detailsObject.put(CONFIG, jobsBean.getConfig()); + detailsObject.put(CREATION_TIME, jobsBean.getCreationTime()); + detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime()); + } catch (JSONException e) { + LOG.error("Exception while building extension jon details for job {}", jobName, e); + } + return detailsObject; + } + + private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); if (!metaStore.checkIfExtensionExists(extensionName)){ throw new ValidationException("No extension resources found for " + extensionName); @@ -584,7 +703,7 @@ private JSONObject buildDetailResult(final String extensionName) throws FalconEx ExtensionBean bean = metaStore.getDetail(extensionName); JSONObject resultObject = new JSONObject(); try { - resultObject.put(EXTENSION_NAME, bean.getExtensionName()); + resultObject.put(NAME, bean.getExtensionName()); resultObject.put(EXTENSION_TYPE, bean.getExtensionType()); resultObject.put(EXTENSION_DESC, bean.getDescription()); resultObject.put(EXTENSION_LOCATION, bean.getLocation()); diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 901c3a9a2..67347cd98 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -43,16 +43,23 @@ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.service.FalconJPAService,\ - org.apache.falcon.extensions.ExtensionService,\ org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ org.apache.falcon.rerun.service.LateRunService,\ - org.apache.falcon.metadata.MetadataMappingService,\ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ org.apache.falcon.service.ProxyUserService +##Add if you want to use Metadata Service +## Also enable all the properties which starts with falcon.graph prefix +#org.apache.falcon.metadata.MetadataMappingService,\ + +##Add if you want to use Trusted or User Extensions +## In case of distributed Mode enable ExtensionService only on Prism +## It should come after FalconJPAService in application services +#org.apache.falcon.extensions.ExtensionService,\ + ##For feed SLA monitoring enable these two # org.apache.falcon.service.FalconJPAService,\ # org.apache.falcon.service.EntitySLAMonitoringService,\ diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml index 09665dc61..4683a8123 100644 --- a/src/main/assemblies/distributed-package.xml +++ b/src/main/assemblies/distributed-package.xml @@ -247,16 +247,6 @@ 0755 - - ../ - extensions/mirroring - - */** - - 0770 - 0770 - - ../examples/app examples/app diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 00c2ad192..c9e1d4c59 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -58,6 +58,7 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -284,21 +285,38 @@ public APIResult submitExtensionJob(String extensionName, String jobName, String InputStream configStream = getServletInputStream(configPath); try { - List entities = getEntities(extensionName, jobName, configStream); - return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream, entities); + Map> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); + return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream, entityMap); } catch (FalconException | IOException e) { throw new FalconCLIException("Failed in submitting extension job " + jobName); } } + private Map> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) { + List entities = getEntities(extensionName, jobName, configStream); + List feeds = new ArrayList<>(); + List processes = new ArrayList<>(); + for (Entity entity : entities) { + if (EntityType.FEED.equals(entity.getEntityType())) { + feeds.add(entity); + } else if (EntityType.PROCESS.equals(entity.getEntityType())) { + processes.add(entity); + } + } + Map> entityMap = new HashMap<>(); + entityMap.put(EntityType.PROCESS, processes); + entityMap.put(EntityType.FEED, feeds); + return entityMap; + } + private List getEntities(String extensionName, String jobName, InputStream configStream) { - String packagePath = ExtensionStore.get().getMetaStore().getDetail(extensionName).getLocation(); + String packagePath = ExtensionStore.getMetaStore().getDetail(extensionName).getLocation(); List entities; try { entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, packagePath); } catch (FalconException | IOException e) { - throw new FalconCLIException("Failed in generating entties" + jobName); + throw new FalconCLIException("Failed in generating entities" + jobName); } return entities; } @@ -308,14 +326,19 @@ public APIResult submitAndScheduleExtensionJob(String extensionName, String jobN String doAsUser) { InputStream configStream = getServletInputStream(configPath); try { - List entities = getEntities(extensionName, jobName, configStream); + Map> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); return localExtensionManager.submitAndSchedulableExtensionJob(extensionName, jobName, configStream, - entities); + entityMap); } catch (FalconException | IOException e) { throw new FalconCLIException("Failed in submitting extension job " + jobName); } } + @Override + public String getExtensionJobDetails(final String jobName) { + return localExtensionManager.getExtensionJobDetails(jobName); + } + @Override public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 5d2710c5b..da486db87 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -20,12 +20,14 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.extensions.ExtensionManager; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; /** * A proxy implementation of the extension operations in local mode. @@ -33,18 +35,17 @@ public class LocalExtensionManager extends ExtensionManager { public LocalExtensionManager() {} - public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, List entities) - throws FalconException, IOException { - submitEntities(extensionName, null, jobName, entities, config); + public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, + Map> entityMap) throws FalconException, IOException { + submitEntities(extensionName, null, jobName, entityMap, config); return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream config, - List entities) throws FalconException, IOException { - submitEntities(extensionName, null, jobName, entities, config); - for (Entity entity : entities) { - scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); - } + Map> entityMap) + throws FalconException, IOException { + submitEntities(extensionName, null, jobName, entityMap, config); + scheduleEntities(entityMap); return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } @@ -57,4 +58,8 @@ public String unRegisterExtension(String extensionName) { return super.deleteExtensionMetadata(extensionName); } + public String getExtensionJobDetails(String jobName){ + return super.getExtensionJobDetail(jobName); + } + } diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 4ed716101..6f7174719 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -224,6 +224,10 @@ public String registerExtension(String extensionName, String packagePath, String return falconUnitClient.registerExtension(extensionName, packagePath, description); } + public String getExtensionJobDetails(String jobName) { + return falconUnitClient.getExtensionJobDetails(jobName); + } + public String unregisterExtension(String extensionName) { return falconUnitClient.unregisterExtension(extensionName); } diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 3555b22f4..b7a6e399c 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -32,6 +32,7 @@ import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.codehaus.jettison.json.JSONObject; import org.testng.Assert; import org.testng.annotations.Test; @@ -426,6 +427,9 @@ public void testSubmitAndScheduleExtensionJob() throws Exception { copyExtensionJar(packageBuildLib); APIResult apiResult = submitAndScheduleExtensionJob("testExtension", "testJob", null, null); assertStatus(apiResult); + result = getExtensionJobDetails("testJob"); + JSONObject resultJson = new JSONObject(result); + Assert.assertEquals(resultJson.get("extensionName"), "testExtension"); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); From aca55dcd3dd3548bc9169068fdfca237da687f32 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Thu, 15 Dec 2016 13:36:46 +0530 Subject: [PATCH 6/6] FALCON-2207 indentation changes and review comments changes --- .../falcon/persistence/ExtensionBean.java | 1 - .../extensions/jdbc/ExtensionMetaStore.java | 2 +- .../extensions/store/ExtensionStore.java | 35 ++++++++-------- .../jdbc/ExtensionMetaStoreTest.java | 6 ++- .../extensions/store/ExtensionStoreTest.java | 41 +++++++++---------- 5 files changed, 42 insertions(+), 43 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java index fbeaefb77..79cfe1658 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java @@ -95,7 +95,6 @@ public void setCreationTime(Date creationTime) { this.creationTime = creationTime; } - public String getExtensionName() { return extensionName; } diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index a32b5aef4..fd4d860a5 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -42,7 +42,7 @@ private EntityManager getEntityManager() { } public void storeExtensionBean(String extensionName, String location, ExtensionType extensionType, - String description, String extensionOwner){ + String description, String extensionOwner) { ExtensionBean extensionBean = new ExtensionBean(); extensionBean.setLocation(location); extensionBean.setExtensionName(extensionName); diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 89ffb2966..df63779f6 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -95,7 +95,7 @@ private ExtensionStore() { } private void initializeDbTable() { - try{ + try { metaStore.deleteExtensionsOfType(ExtensionType.TRUSTED); List extensions = getExtensions(); for (String extension : extensions) { @@ -107,7 +107,7 @@ private void initializeDbTable() { String extensionOwner = CurrentUser.getUser(); metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner); } - } catch (FalconException e){ + } catch (FalconException e) { LOG.error("Exception in ExtensionMetaStore:", e); throw new RuntimeException(e); } @@ -254,12 +254,12 @@ public List getExtensions() throws StoreAccessException { return extensionList; } - public String deleteExtension(final String extensionName, String currentUser) throws FalconException{ + public String deleteExtension(final String extensionName, String currentUser) throws FalconException { ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED : ExtensionType.CUSTOM; - if (extensionType.equals(ExtensionType.TRUSTED)){ + if (extensionType.equals(ExtensionType.TRUSTED)) { throw new ValidationException(extensionName + " is trusted cannot be deleted."); - } else if (!metaStore.checkIfExtensionExists(extensionName)){ + } else if (!metaStore.checkIfExtensionExists(extensionName)) { throw new FalconException("Extension:" + extensionName + " is not registered with Falcon."); } else if (!metaStore.getDetail(extensionName).getExtensionOwner().equals(currentUser)) { throw new FalconException("User: " + currentUser + " is not allowed to delete extension: " + extensionName); @@ -270,49 +270,48 @@ public String deleteExtension(final String extensionName, String currentUser) th } public String registerExtension(final String extensionName, final String path, final String description, - String extensionOwner) - throws URISyntaxException, FalconException { + String extensionOwner) throws URISyntaxException, FalconException { Configuration conf = new Configuration(); URI uri = new URI(path); conf.set("fs.defaultFS", uri.getScheme() + "://" + uri.getAuthority()); - FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri); + FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri); try { fileSystem.listStatus(new Path(uri.getPath() + "/README")); - } catch (IOException e){ + } catch (IOException e) { LOG.error("Exception in registering Extension:{}", extensionName, e); throw new ValidationException("README file is not present in the " + path); } - PathFilter filter=new PathFilter(){ - public boolean accept(Path file){ + PathFilter filter = new PathFilter() { + public boolean accept(Path file) { return file.getName().endsWith(".jar"); } }; FileStatus[] jarStatus; try { jarStatus = fileSystem.listStatus(new Path(uri.getPath(), "libs/build"), filter); - if (jarStatus.length <=0) { + if (jarStatus.length <= 0) { throw new ValidationException("Jars are not present in the " + uri.getPath() + "/libs/build."); } - } catch (IOException e){ + } catch (IOException e) { LOG.error("Exception in registering Extension:{}", extensionName, e); throw new ValidationException("Jars are not present in the " + uri.getPath() + "/libs/build."); } FileStatus[] propStatus; - try{ + try { propStatus = fileSystem.listStatus(new Path(uri.getPath() + "/META")); - if (propStatus.length <=0){ + if (propStatus.length <= 0) { throw new ValidationException("No properties file is not present in the " + uri.getPath() + "/META" + " structure."); } - } catch (IOException e){ + } catch (IOException e) { LOG.error("Exception in registering Extension:{}", extensionName, e); throw new ValidationException("Directory is not present in the " + uri.getPath() + "/META" + " structure."); } - if (!metaStore.checkIfExtensionExists(extensionName)){ + if (!metaStore.checkIfExtensionExists(extensionName)) { metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description, extensionOwner); - }else{ + } else { throw new ValidationException(extensionName + " already exists."); } return "Extension :" + extensionName + " registered successfully."; diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index 26dcdf52a..1688abb1c 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -60,7 +60,8 @@ public void init() { @Test public void testExtension(){ //insert - stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description", "falcon"); + stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description", + "falconUser"); Assert.assertEquals(stateStore.getAllExtensions().size(), 1); //check data @@ -73,7 +74,8 @@ public void testExtension(){ @Test public void testExtensionJob() { - stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description", "falcon"); + stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description", + "falconUser"); List processes = new ArrayList<>(); processes.add("testProcess"); List feeds = new ArrayList<>(); diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index 3084da3e2..773fea2cf 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -44,7 +44,7 @@ import java.util.Map; /** - * Tests for extension store. + * Tests for extension store. */ public class ExtensionStoreTest extends AbstractTestExtensionStore { private static Map resourcesMap; @@ -105,52 +105,52 @@ public void clean() { } @Test - public void testRegisterExtension() throws IOException, URISyntaxException, FalconException{ + public void testRegisterExtension() throws IOException, URISyntaxException, FalconException { String extensionPath = EXTENSION_PATH + "testRegister"; createLibs(extensionPath); createReadmeAndJar(extensionPath); createMETA(extensionPath); store = ExtensionStore.get(); - store.registerExtension("test", STORAGE_URL + extensionPath, "test desc", "falcon"); + store.registerExtension("test", STORAGE_URL + extensionPath, "test desc", "falconUser"); ExtensionMetaStore metaStore = new ExtensionMetaStore(); Assert.assertEquals(metaStore.getAllExtensions().size(), 1); } - @Test(expectedExceptions=ValidationException.class) - public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException{ + @Test(expectedExceptions = ValidationException.class) + public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException { String extensionPath = EXTENSION_PATH + "testRegister"; store = ExtensionStore.get(); createLibs(extensionPath); - store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc", "falcon"); + store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc", "falconUser"); } @Test - public void testDeleteExtension() throws IOException, URISyntaxException, FalconException{ + public void testDeleteExtension() throws IOException, URISyntaxException, FalconException { String extensionPath = EXTENSION_PATH + "testDelete"; createLibs(extensionPath); createReadmeAndJar(extensionPath); createMETA(extensionPath); store = ExtensionStore.get(); - store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falcon"); - store.deleteExtension("toBeDeleted", "falcon"); + store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falconUser"); + store.deleteExtension("toBeDeleted", "falconUser"); ExtensionMetaStore metaStore = new ExtensionMetaStore(); Assert.assertEquals(metaStore.getAllExtensions().size(), 0); } - @Test(expectedExceptions=FalconException.class) - public void testFailureDeleteExtension() throws IOException, URISyntaxException, FalconException{ + @Test(expectedExceptions = FalconException.class) + public void testFailureDeleteExtension() throws IOException, URISyntaxException, FalconException { String extensionPath = EXTENSION_PATH + "testACLOnDelete"; createLibs(extensionPath); createReadmeAndJar(extensionPath); createMETA(extensionPath); store = ExtensionStore.get(); - store.registerExtension("ACLFailure", STORAGE_URL + extensionPath, "test desc", "oozie"); - store.deleteExtension("ACLFailure", "falcon"); + store.registerExtension("ACLFailure", STORAGE_URL + extensionPath, "test desc", "oozieUser"); + store.deleteExtension("ACLFailure", "falconUser"); } - private void createMETA(String extensionPath) throws IOException{ + private void createMETA(String extensionPath) throws IOException { Path path = new Path(extensionPath + "/META"); - if (fs.exists(path)){ + if (fs.exists(path)) { fs.delete(path, true); } fs.mkdirs(path); @@ -158,7 +158,7 @@ private void createMETA(String extensionPath) throws IOException{ OutputStream os = fs.create(path); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); br.write("Hello World"); - if (fs.exists(path)){ + if (fs.exists(path)) { fs.delete(path, true); } br.write("test properties"); @@ -166,9 +166,9 @@ private void createMETA(String extensionPath) throws IOException{ br.close(); } - private void createLibs(String extensionPath) throws IOException{ + private void createLibs(String extensionPath) throws IOException { Path path = new Path(extensionPath); - if (fs.exists(path)){ + if (fs.exists(path)) { fs.delete(path, true); } fs.mkdirs(path); @@ -176,9 +176,9 @@ private void createLibs(String extensionPath) throws IOException{ fs.mkdirs(path); } - private void createReadmeAndJar(String extensionPath) throws IOException{ + private void createReadmeAndJar(String extensionPath) throws IOException { Path path = new Path(extensionPath + "/README"); - if (fs.exists(path)){ + if (fs.exists(path)) { fs.delete(path, true); } fs.create(path); @@ -202,6 +202,5 @@ private void clearDB() { em.close(); } } - }