diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
index d9a82a7d04c..341017c33b6 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
@@ -7,9 +7,9 @@
     {
       "name": "namenode_process",
       "service": "HDFS",
-      "enabled": true,
-      "interval": 6,
       "component": "NAMENODE",
+      "interval": 6,
+      "enabled": true,
       "label": "NameNode process",
       "source": {
         "reporting": {
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index 23bfbc55042..a7b43c75eb5 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -74,6 +74,32 @@ class HdfsResourceJar:
   while execute does all the expensive creating/deleting work executing the jar with the json as parameter.
   """
   def action_delayed(self, action_name, main_resource):
+    dfs_type = main_resource.resource.dfs_type
+
+    if main_resource.resource.nameservices is None: # all nameservices
+      nameservices = namenode_ha_utils.get_nameservices(main_resource.resource.hdfs_site)
+    else:
+      nameservices = main_resource.resource.nameservices
+
+    # non-federated cluster
+    if not nameservices:
+      self.action_delayed_for_nameservice(None, action_name, main_resource)
+    else:
+      for nameservice in nameservices:
+        try:
+          if not dfs_type:
+            raise Fail("<serviceType> for fileSystem service should be set in metainfo.xml")
+          nameservice = dfs_type.lower() + "://" + nameservice
+
+          self.action_delayed_for_nameservice(nameservice, action_name, main_resource)
+        except namenode_ha_utils.NoActiveNamenodeException as ex:
+          # one of the nameservices can be down (during initial start, for example); no need to worry for a federated cluster
+          if len(nameservices) > 1:
+            Logger.exception("Cannot run HdfsResource for nameservice {0}. Due to no active namenode present".format(nameservice))
+          else:
+            raise
+
+  def action_delayed_for_nameservice(self, nameservice, action_name, main_resource):
     resource = {}
     env = Environment.get_instance()
     if not 'hdfs_files' in env.config:
@@ -90,6 +116,8 @@ def action_delayed(self, action_name, main_resource):
       elif getattr(main_resource.resource, field_name):
         resource[json_field_name] = getattr(main_resource.resource, field_name)
 
+    resource['nameservice'] = nameservice
+
     # Add resource to create
     env.config['hdfs_files'].append(resource)
 
@@ -159,9 +187,9 @@ def __init__(self, hdfs_site, nameservice, run_user, security_enabled, logoutput
     self.logoutput = logoutput
 
   @staticmethod
-  def is_webhdfs_available(is_webhdfs_enabled, default_fs):
+  def is_webhdfs_available(is_webhdfs_enabled, dfs_type):
     # only hdfs seems to support webHDFS
-    return (is_webhdfs_enabled and default_fs.startswith("hdfs"))
+    return (is_webhdfs_enabled and dfs_type == 'HDFS')
 
   def run_command(self, *args, **kwargs):
     """
@@ -562,11 +590,17 @@ def _fill_directories_list(self, target, results):
 class HdfsResourceProvider(Provider):
   def __init__(self, resource):
     super(HdfsResourceProvider,self).__init__(resource)
+
+    self.assert_parameter_is_set('dfs_type')
     self.fsType = getattr(resource, 'dfs_type')
+
     self.ignored_resources_list = HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file)
-    if self.fsType != 'HCFS':
+
+    if self.fsType == 'HDFS':
       self.assert_parameter_is_set('hdfs_site')
       self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled']
+    else:
+      self.webhdfs_enabled = False
 
   @staticmethod
   def parse_path(path):
@@ -629,9 +663,7 @@ def action_execute(self):
     self.get_hdfs_resource_executor().action_execute(self)
 
   def get_hdfs_resource_executor(self):
-    if self.fsType == 'HCFS':
-      return HdfsResourceJar()
-    elif WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.resource.default_fs):
+    if WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.fsType):
       return HdfsResourceWebHDFS()
     else:
       return HdfsResourceJar()
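
For illustration, the federation fan-out added above reduces to the following standalone sketch. This is not the module code; fan_out and its arguments are hypothetical stand-ins showing how each configured nameservice is qualified with the lowercased dfs_type scheme (e.g. hdfs://ns1), while a non-federated cluster falls through to a single pass against the default filesystem (None):

    def fan_out(nameservices, dfs_type):
        # Yield one nameservice URI per configured nameservice (illustrative).
        if not nameservices:
            # non-federated cluster: one pass against the default filesystem
            yield None
            return
        for ns in nameservices:
            if not dfs_type:
                raise ValueError("serviceType must be set in metainfo.xml")
            # e.g. "HDFS".lower() + "://" + "ns1" -> "hdfs://ns1"
            yield dfs_type.lower() + "://" + ns

    # list(fan_out(["ns1", "ns2"], "HDFS")) == ["hdfs://ns1", "hdfs://ns2"]
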
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml
index 19b378b7227..f26eee9f933 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml
@@ -21,6 +21,7 @@
     <service>
       <name>HDFS</name>
       <displayName>HDFS</displayName>
+      <serviceType>HDFS</serviceType>
       <comment>Apache Hadoop Distributed File System</comment>
       <version>2.1.0.2.0</version>
diff --git a/ambari-server/src/main/resources/common-services/OOZIE/4.0.0.2.0/package/scripts/oozie_service.py b/ambari-server/src/main/resources/common-services/OOZIE/4.0.0.2.0/package/scripts/oozie_service.py
index 29d6f4befce..612bb29bfe4 100644
--- a/ambari-server/src/main/resources/common-services/OOZIE/4.0.0.2.0/package/scripts/oozie_service.py
+++ b/ambari-server/src/main/resources/common-services/OOZIE/4.0.0.2.0/package/scripts/oozie_service.py
@@ -139,7 +139,7 @@ def oozie_service(action = 'start', upgrade_type=None):
         params.HdfsResource(None, action="execute")
         hdfs_share_dir_exists = True # skip time-expensive hadoop fs -ls check
-      elif WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
+      elif WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
         # check with webhdfs is much faster than executing hadoop fs -ls.
         util = WebHDFSUtil(params.hdfs_site, nameservice, params.oozie_user, params.security_enabled)
         list_status = util.run_command(params.hdfs_share_dir, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
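
The same is_webhdfs_available switch recurs in the Livy, Livy2 and ResourceManager scripts below. A plausible reading of why the signature changed, sketched under the assumption that a federated cluster points fs.defaultFS at a viewfs:// mount table: the old prefix test on default_fs would then report WebHDFS as unavailable even though the underlying filesystems are HDFS, whereas keying on dfs_type does not depend on the default-FS scheme.

    # Old vs. new check, extracted for comparison (illustrative, not the module itself):
    def is_webhdfs_available_old(is_webhdfs_enabled, default_fs):
        return is_webhdfs_enabled and default_fs.startswith("hdfs")

    def is_webhdfs_available_new(is_webhdfs_enabled, dfs_type):
        return is_webhdfs_enabled and dfs_type == 'HDFS'

    assert not is_webhdfs_available_old(True, "viewfs://cluster")  # false negative on federation
    assert is_webhdfs_available_new(True, "HDFS")                  # keyed on service type instead
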
diff --git a/ambari-server/src/main/resources/common-services/SPARK/1.2.1/package/scripts/livy_server.py b/ambari-server/src/main/resources/common-services/SPARK/1.2.1/package/scripts/livy_server.py
index 7c858c5a149..3e457748d19 100644
--- a/ambari-server/src/main/resources/common-services/SPARK/1.2.1/package/scripts/livy_server.py
+++ b/ambari-server/src/main/resources/common-services/SPARK/1.2.1/package/scripts/livy_server.py
@@ -114,7 +114,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
       nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
       nameservice = None if not nameservices else nameservices[-1]
 
-      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
+      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
         # check with webhdfs is much faster than executing hdfs dfs -test
         util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
         list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
diff --git a/ambari-server/src/main/resources/common-services/SPARK2/2.0.0/package/scripts/livy2_server.py b/ambari-server/src/main/resources/common-services/SPARK2/2.0.0/package/scripts/livy2_server.py
index 492fd67bfb3..27b1d253f8c 100644
--- a/ambari-server/src/main/resources/common-services/SPARK2/2.0.0/package/scripts/livy2_server.py
+++ b/ambari-server/src/main/resources/common-services/SPARK2/2.0.0/package/scripts/livy2_server.py
@@ -113,7 +113,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
       nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
       nameservice = None if not nameservices else nameservices[-1]
 
-      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
+      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
         # check with webhdfs is much faster than executing hdfs dfs -test
         util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
         list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
index 601ced80b4a..99ad69f2695 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
@@ -230,7 +230,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
       nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
       nameservice = None if not nameservices else nameservices[-1]
 
-      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
+      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
        # check with webhdfs is much faster than executing hdfs dfs -test
         util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
         list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
diff --git a/ambari-server/src/main/resources/stack-hooks/before-START/files/fast-hdfs-resource.jar b/ambari-server/src/main/resources/stack-hooks/before-START/files/fast-hdfs-resource.jar
index 6c993bf7568..b8f633fd24e 100644
Binary files a/ambari-server/src/main/resources/stack-hooks/before-START/files/fast-hdfs-resource.jar and b/ambari-server/src/main/resources/stack-hooks/before-START/files/fast-hdfs-resource.jar differ
diff --git a/ambari-server/src/main/resources/stack-hooks/before-START/scripts/shared_initialization.py b/ambari-server/src/main/resources/stack-hooks/before-START/scripts/shared_initialization.py
index 541de9cc310..c26265a4de7 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-START/scripts/shared_initialization.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-START/scripts/shared_initialization.py
@@ -65,7 +65,7 @@ def setup_hadoop():
     # if WebHDFS is not enabled we need this jar to create hadoop folders and copy tarballs to HDFS.
     if params.sysprep_skip_copy_fast_jar_hdfs:
       print "Skipping copying of fast-hdfs-resource.jar as host is sys prepped"
-    elif params.dfs_type == 'HCFS' or not WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
+    elif params.dfs_type == 'HCFS' or not WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
       # for source-code of jar goto contrib/fast-hdfs-resource
       File(format("{ambari_libs_dir}/fast-hdfs-resource.jar"),
            mode=0644,
diff --git a/ambari-server/src/test/python/stacks/2.0.6/hooks/before-START/test_before_start.py b/ambari-server/src/test/python/stacks/2.0.6/hooks/before-START/test_before_start.py
index 8e20d17abd6..6329ee4d3b0 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/hooks/before-START/test_before_start.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/hooks/before-START/test_before_start.py
@@ -60,6 +60,10 @@ def test_hook_default(self):
                               create_parents = True,
                               cd_access = 'a',
                               )
+    self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
+                              content = StaticFile('fast-hdfs-resource.jar'),
+                              mode = 0644,
+                              )
     self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
                               content = Template('commons-logging.properties.j2'),
                               owner = 'hdfs',
@@ -137,6 +141,10 @@ def test_hook_secured(self):
                               create_parents = True,
                               cd_access = 'a',
                               )
+    self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
+                              content = StaticFile('fast-hdfs-resource.jar'),
+                              mode = 0644,
+                              )
     self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
                               content = Template('commons-logging.properties.j2'),
                               owner = 'root',
@@ -219,6 +227,10 @@ def test_hook_default_hdfs(self):
                               create_parents = True,
                               cd_access = 'a',
                               )
+    self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
+                              content = StaticFile('fast-hdfs-resource.jar'),
+                              mode = 0644,
+                              )
     self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
                               content = Template('commons-logging.properties.j2'),
                               owner = 'hdfs',
@@ -303,6 +315,10 @@ def test_hook_refresh_topology_custom_directories(self):
                               create_parents = True,
                               cd_access = 'a',
                               )
+    self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
+                              content = StaticFile('fast-hdfs-resource.jar'),
+                              mode = 0644,
+                              )
     self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
                               content = Template('commons-logging.properties.j2'),
                               owner = 'hdfs',
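
Bridging the two halves of the patch: the Python side now stamps each delayed resource with its nameservice (resource['nameservice'] = nameservice above), and the Java changes below consume that field. A hand-written example of what entries in the JSON handed to fast-hdfs-resource.jar might look like; target/action/nameservice appear in this patch, the remaining field names and the paths are illustrative:

    import json

    hdfs_files = [
        {"target": "/user/oozie/share", "type": "directory", "action": "create",
         "nameservice": "hdfs://ns1"},
        {"target": "/app-logs", "type": "directory", "action": "create",
         "nameservice": "hdfs://ns2"},
        # None -> the jar falls back to fs.defaultFS for this entry
        {"target": "/tmp", "type": "directory", "action": "create",
         "nameservice": None},
    ]
    print(json.dumps(hdfs_files, indent=2))
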
diff --git a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
index 9cbfab26246..5c7cbdac655 100644
--- a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
+++ b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
@@ -44,6 +44,7 @@ public class Resource {
   private String owner;
   private String group;
   private String mode;
+  private String nameservice;
   private boolean recursiveChown;
   private boolean recursiveChmod;
   private boolean changePermissionforParents;
@@ -105,6 +106,14 @@ public void setMode(String mode) {
     this.mode = mode;
   }
 
+  public String getNameservice() {
+    return nameservice;
+  }
+
+  public void setNameservice(String nameservice) {
+    this.nameservice = nameservice;
+  }
+
   public boolean isRecursiveChown() {
     return recursiveChown;
   }
diff --git a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
index 98119b0047e..9cf0a734358 100644
--- a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
+++ b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
@@ -22,6 +22,10 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -52,52 +56,88 @@ public static void main(String[] args)
 
     Gson gson = new Gson();
     Resource[] resources = null;
-    FileSystem dfs = null;
+    Map<String, FileSystem> fileSystemNameToInstance = new HashMap<String, FileSystem>();
+    Map<String, List<Resource>> fileSystemToResource = new HashMap<String, List<Resource>>();
 
-    try {
-      Configuration conf = new Configuration();
-      dfs = FileSystem.get(conf);
+    try {
       // 3 - Load data from JSON
       resources = (Resource[]) gson.fromJson(new FileReader(jsonFilePath),
           Resource[].class);
-
-      // 4 - Connect to HDFS
-      System.out.println("Using filesystem uri: " + FileSystem.getDefaultUri(conf).toString());
-      dfs.initialize(FileSystem.getDefaultUri(conf), conf);
+      Configuration conf = new Configuration();
+      FileSystem dfs = null;
+
+      // Creating connections
       for (Resource resource : resources) {
-        System.out.println("Creating: " + resource);
+        String nameservice = resource.getNameservice();
+
+        if(!fileSystemNameToInstance.containsKey(nameservice)) {
+          URI fileSystemUrl;
+          if(nameservice == null) {
+            fileSystemUrl = FileSystem.getDefaultUri(conf);
+          } else {
+            fileSystemUrl = new URI(nameservice);
+          }
 
-        Resource.checkResourceParameters(resource, dfs);
+          dfs = FileSystem.get(fileSystemUrl, conf);
 
-        Path pathHadoop = null;
+          // 4 - Connect to DFS
+          System.out.println("Initializing filesystem uri: " + fileSystemUrl);
+          dfs.initialize(fileSystemUrl, conf);
 
-        if (resource.getAction().equals("download")) {
-          pathHadoop = new Path(resource.getSource());
+          fileSystemNameToInstance.put(nameservice, dfs);
         }
-        else {
-          String path = resource.getTarget();
-          pathHadoop = new Path(path);
-          if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
-            System.out.println(
-                String.format("Skipping the operation for not managed DFS directory %s since immutable_paths contains it.", path)
-            );
-            continue;
-          }
+
+        if(!fileSystemToResource.containsKey(nameservice)) {
+          fileSystemToResource.put(nameservice, new ArrayList<Resource>());
         }
+        fileSystemToResource.get(nameservice).add(resource);
+      }
 
-        if (resource.getAction().equals("create")) {
-          // 5 - Create
-          Resource.createResource(resource, dfs, pathHadoop);
-          Resource.setMode(resource, dfs, pathHadoop);
-          Resource.setOwner(resource, dfs, pathHadoop);
-        } else if (resource.getAction().equals("delete")) {
-          // 6 - Delete
-          dfs.delete(pathHadoop, true);
-        } else if (resource.getAction().equals("download")) {
-          // 7 - Download
-          dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
+      for (Map.Entry<String, List<Resource>> entry : fileSystemToResource.entrySet()) {
+        String nameservice = entry.getKey();
+        List<Resource> resourcesNameservice = entry.getValue();
+
+        for(Resource resource: resourcesNameservice) {
+          if (nameservice != null) {
+            System.out.println("Creating: " + resource + " in " + nameservice);
+          } else {
+            System.out.println("Creating: " + resource + " in default filesystem");
+          }
+
+          dfs = fileSystemNameToInstance.get(nameservice);
+
+          Resource.checkResourceParameters(resource, dfs);
+
+          Path pathHadoop = null;
+
+          if (resource.getAction().equals("download")) {
+            pathHadoop = new Path(resource.getSource());
+          } else {
+            String path = resource.getTarget();
+            pathHadoop = new Path(path);
+            if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
+              System.out.println(
+                  String.format("Skipping the operation for not managed DFS directory %s since immutable_paths contains it.", path)
+              );
+              continue;
+            }
+          }
+
+          if (resource.getAction().equals("create")) {
+            // 5 - Create
+            Resource.createResource(resource, dfs, pathHadoop);
+            Resource.setMode(resource, dfs, pathHadoop);
+            Resource.setOwner(resource, dfs, pathHadoop);
+          } else if (resource.getAction().equals("delete")) {
+            // 6 - Delete
+            dfs.delete(pathHadoop, true);
+          } else if (resource.getAction().equals("download")) {
+            // 7 - Download
+            dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
+          }
         }
       }
     }
@@ -106,7 +146,9 @@ public static void main(String[] args)
       e.printStackTrace();
     }
     finally {
-      dfs.close();
+      for(FileSystem dfs : fileSystemNameToInstance.values()) {
+        dfs.close();
+      }
     }
 
     System.out.println("All resources created.");
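
Restating the new Runner control flow as a compact sketch (Python for consistency with the earlier examples; get_fs_for and fs.apply are hypothetical stand-ins for FileSystem.get and the create/delete/download dispatch): one connection is opened per distinct nameservice, resources are processed group by group, and every opened handle is closed in the finally block.

    def process(resources, get_fs_for, default_uri):
        fs_by_ns = {}    # nameservice -> open filesystem handle
        grouped = {}     # nameservice -> [resource, ...]
        try:
            for res in resources:
                ns = res.get("nameservice")  # may be None (default filesystem)
                if ns not in fs_by_ns:
                    # one connection per nameservice, reused for all its resources
                    fs_by_ns[ns] = get_fs_for(ns if ns is not None else default_uri)
                grouped.setdefault(ns, []).append(res)

            for ns, group in grouped.items():
                for res in group:
                    fs_by_ns[ns].apply(res)  # create / delete / download
        finally:
            for fs in fs_by_ns.values():
                fs.close()
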