AMBARI-23943. Hiveserver2 fails to start on viewFS enabled cluster: {hive_server2_zookeeper_namespace} is not ready yet (aonishuk)
aonishuk committed May 25, 2018
1 parent e9374c9 commit 3ea0ad1
Showing 12 changed files with 147 additions and 47 deletions.
@@ -7,9 +7,9 @@
{
"name": "namenode_process",
"service": "HDFS",
"enabled": true,
"interval": 6,
"component": "NAMENODE",
"interval": 6,
"enabled": true,
"label": "NameNode process",
"source": {
"reporting": {
@@ -74,6 +74,32 @@ class HdfsResourceJar:
while execute does all the expensive creating/deleting work executing the jar with the json as parameter.
"""
def action_delayed(self, action_name, main_resource):
dfs_type = main_resource.resource.dfs_type

if main_resource.resource.nameservices is None: # all nameservices
nameservices = namenode_ha_utils.get_nameservices(main_resource.resource.hdfs_site)
else:
nameservices = main_resource.resource.nameservices

# non-federated cluster
if not nameservices:
self.action_delayed_for_nameservice(None, action_name, main_resource)
else:
for nameservice in nameservices:
try:
if not dfs_type:
raise Fail("<serviceType> for fileSystem service should be set in metainfo.xml")
nameservice = dfs_type.lower() + "://" + nameservice

self.action_delayed_for_nameservice(nameservice, action_name, main_resource)
except namenode_ha_utils.NoActiveNamenodeException as ex:
# one of the nameservices can be down (during initial start, for example); no need to worry for a federated cluster
if len(nameservices) > 1:
Logger.exception("Cannot run HdfsResource for nameservice {0}: no active NameNode present".format(nameservice))
else:
raise

def action_delayed_for_nameservice(self, nameservice, action_name, main_resource):
resource = {}
env = Environment.get_instance()
if not 'hdfs_files' in env.config:
@@ -90,6 +116,8 @@ def action_delayed(self, action_name, main_resource):
elif getattr(main_resource.resource, field_name):
resource[json_field_name] = getattr(main_resource.resource, field_name)

resource['nameservice'] = nameservice

# Add resource to create
env.config['hdfs_files'].append(resource)
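
The loop above is the heart of the fix: when hdfs_site advertises one or more nameservices, action_delayed now queues one entry per nameservice, each prefixed with the scheme derived from the service's <serviceType>, while a non-federated cluster keeps passing None and falls back to fs.defaultFS. A minimal sketch of that URI construction, with illustrative nameservice names (not taken from the commit):

    # dfs_type comes from the <serviceType> tag; the nameservice names are examples.
    dfs_type = "HDFS"
    nameservices = ["ns1", "ns2"]   # federated cluster; use [] for a non-federated one

    if not nameservices:
        targets = [None]            # None means "use fs.defaultFS"
    else:
        targets = [dfs_type.lower() + "://" + ns for ns in nameservices]

    print(targets)                  # ['hdfs://ns1', 'hdfs://ns2']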

@@ -159,9 +187,9 @@ def __init__(self, hdfs_site, nameservice, run_user, security_enabled, logoutput
self.logoutput = logoutput

@staticmethod
def is_webhdfs_available(is_webhdfs_enabled, default_fs):
def is_webhdfs_available(is_webhdfs_enabled, dfs_type):
# only hdfs seems to support webHDFS
return (is_webhdfs_enabled and default_fs.startswith("hdfs"))
return (is_webhdfs_enabled and dfs_type == 'HDFS')

def run_command(self, *args, **kwargs):
"""
@@ -562,11 +590,17 @@ def _fill_directories_list(self, target, results):
class HdfsResourceProvider(Provider):
def __init__(self, resource):
super(HdfsResourceProvider,self).__init__(resource)

self.assert_parameter_is_set('dfs_type')
self.fsType = getattr(resource, 'dfs_type')

self.ignored_resources_list = HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file)
if self.fsType != 'HCFS':

if self.fsType == 'HDFS':
self.assert_parameter_is_set('hdfs_site')
self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled']
else:
self.webhdfs_enabled = False

@staticmethod
def parse_path(path):
Expand Down Expand Up @@ -629,9 +663,7 @@ def action_execute(self):
self.get_hdfs_resource_executor().action_execute(self)

def get_hdfs_resource_executor(self):
if self.fsType == 'HCFS':
return HdfsResourceJar()
elif WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.resource.default_fs):
if WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.fsType):
return HdfsResourceWebHDFS()
else:
return HdfsResourceJar()
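
Keying the WebHDFS check on the service type rather than on the fs.defaultFS scheme means a viewfs:// default filesystem no longer disables the WebHDFS fast path when the backing service is HDFS, and the separate HCFS branch becomes unnecessary. A rough sketch of the resulting selection logic, reusing the class names from the hunk with illustrative inputs:

    def pick_executor(dfs_type, webhdfs_enabled):
        # WebHDFS only exists for real HDFS; anything else (HCFS, or HDFS with
        # webhdfs disabled) falls back to the fast-hdfs-resource jar.
        if webhdfs_enabled and dfs_type == 'HDFS':
            return "HdfsResourceWebHDFS"
        return "HdfsResourceJar"

    print(pick_executor('HDFS', True))    # HdfsResourceWebHDFS
    print(pick_executor('HDFS', False))   # HdfsResourceJar
    print(pick_executor('HCFS', True))    # HdfsResourceJar
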
@@ -21,6 +21,7 @@
<service>
<name>HDFS</name>
<displayName>HDFS</displayName>
<serviceType>HDFS</serviceType> <!-- This tag is used only for the main fileSystem service. It sets the filesystem scheme for Ambari. -->
<comment>Apache Hadoop Distributed File System</comment>
<version>2.1.0.2.0</version>

@@ -139,7 +139,7 @@ def oozie_service(action = 'start', upgrade_type=None):
params.HdfsResource(None, action="execute")

hdfs_share_dir_exists = True # skip time-expensive hadoop fs -ls check
elif WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
elif WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hadoop fs -ls.
util = WebHDFSUtil(params.hdfs_site, nameservice, params.oozie_user, params.security_enabled)
list_status = util.run_command(params.hdfs_share_dir, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
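
The GETFILESTATUS probe issued through WebHDFSUtil is a single HTTP round trip, which is why it is preferred over spawning hadoop fs -ls. A hedged, standalone equivalent using the third-party requests package purely for illustration (host, port, and path are placeholders):

    import requests

    # GETFILESTATUS returns 404 for a missing path rather than failing outright,
    # which matches ignore_status_codes=['404'] in the hunk above.
    url = "http://nn-host:50070/webhdfs/v1/user/oozie/share?op=GETFILESTATUS&user.name=oozie"
    resp = requests.get(url)
    print("HTTP %s: %s" % (resp.status_code, "exists" if resp.status_code == 200 else "not found"))
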
@@ -114,7 +114,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
nameservice = None if not nameservices else nameservices[-1]

if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hdfs dfs -test
util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
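
This two-line selection pattern, repeated in the wait_for_dfs_directory_created hunks that follow, takes the last reported nameservice (or None on a non-federated cluster) and lets dfs_type decide whether the WebHDFS fast path applies. A small illustration of the selection with example values:

    # get_nameservices() would normally read these from hdfs-site; values are examples.
    for nameservices in ([], ["ns1"], ["ns1", "ns2"]):
        nameservice = None if not nameservices else nameservices[-1]
        print("%s -> %s" % (nameservices, nameservice))
    # []             -> None
    # ['ns1']        -> ns1
    # ['ns1', 'ns2'] -> ns2
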
@@ -113,7 +113,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
nameservice = None if not nameservices else nameservices[-1]

if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hdfs dfs -test
util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
@@ -230,7 +230,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
nameservice = None if not nameservices else nameservices[-1]

if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hdfs dfs -test
util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
Binary file not shown.
@@ -65,7 +65,7 @@ def setup_hadoop():
# if WebHDFS is not enabled we need this jar to create hadoop folders and copy tarballs to HDFS.
if params.sysprep_skip_copy_fast_jar_hdfs:
print "Skipping copying of fast-hdfs-resource.jar as host is sys prepped"
elif params.dfs_type == 'HCFS' or not WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
elif params.dfs_type == 'HCFS' or not WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# for source-code of jar goto contrib/fast-hdfs-resource
File(format("{ambari_libs_dir}/fast-hdfs-resource.jar"),
mode=0644,
@@ -60,6 +60,10 @@ def test_hook_default(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'hdfs',
@@ -137,6 +141,10 @@ def test_hook_secured(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'root',
@@ -219,6 +227,10 @@ def test_hook_default_hdfs(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'hdfs',
@@ -303,6 +315,10 @@ def test_hook_refresh_topology_custom_directories(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'hdfs',
@@ -44,6 +44,7 @@ public class Resource {
private String owner;
private String group;
private String mode;
private String nameservice;
private boolean recursiveChown;
private boolean recursiveChmod;
private boolean changePermissionforParents;
@@ -105,6 +106,14 @@ public void setMode(String mode) {
this.mode = mode;
}

public String getNameservice() {
return nameservice;
}

public void setNameservice(String nameservice) {
this.nameservice = nameservice;
}

public boolean isRecursiveChown() {
return recursiveChown;
}
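
The new nameservice field mirrors the 'nameservice' key that the Python provider now writes into every delayed entry, letting Gson route each resource to the right filesystem. A hedged sketch of what one such entry might look like before it is serialized into the JSON file handed to fast-hdfs-resource.jar (values are illustrative; only fields visible in this diff are shown):

    # One delayed-resource entry as the Python side assembles it; a list of these
    # is dumped to the JSON file that fast-hdfs-resource.jar takes as its argument.
    entry = {
        "target": "/user/hive/warehouse",   # example path
        "action": "create",
        "owner": "hive",
        "group": "hadoop",
        "mode": "0770",
        "manageIfExists": True,
        "nameservice": "hdfs://ns1",        # absent/None means the default filesystem
    }
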
@@ -22,6 +22,10 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -52,52 +56,88 @@ public static void main(String[] args)

Gson gson = new Gson();
Resource[] resources = null;
FileSystem dfs = null;
Map<String, FileSystem> fileSystemNameToInstance = new HashMap<String, FileSystem>();
Map<String, List<Resource>> fileSystemToResource = new HashMap<String, List<Resource>>();

try {
Configuration conf = new Configuration();
dfs = FileSystem.get(conf);

try {
// 3 - Load data from JSON
resources = (Resource[]) gson.fromJson(new FileReader(jsonFilePath),
Resource[].class);

// 4 - Connect to HDFS
System.out.println("Using filesystem uri: " + FileSystem.getDefaultUri(conf).toString());
dfs.initialize(FileSystem.getDefaultUri(conf), conf);

Configuration conf = new Configuration();
FileSystem dfs = null;

// Creating connections
for (Resource resource : resources) {
System.out.println("Creating: " + resource);
String nameservice = resource.getNameservice();

if(!fileSystemNameToInstance.containsKey(nameservice)) {
URI fileSystemUrl;
if(nameservice == null) {
fileSystemUrl = FileSystem.getDefaultUri(conf);
} else {
fileSystemUrl = new URI(nameservice);
}

Resource.checkResourceParameters(resource, dfs);
dfs = FileSystem.get(fileSystemUrl, conf);

Path pathHadoop = null;
// 4 - Connect to DFS
System.out.println("Initializing filesystem uri: " + fileSystemUrl);
dfs.initialize(fileSystemUrl, conf);

if (resource.getAction().equals("download")) {
pathHadoop = new Path(resource.getSource());
fileSystemNameToInstance.put(nameservice, dfs);
}
else {
String path = resource.getTarget();
pathHadoop = new Path(path);
if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
System.out.println(
String.format("Skipping the operation for not managed DFS directory %s since immutable_paths contains it.", path)
);
continue;
}

if(!fileSystemToResource.containsKey(nameservice)) {
fileSystemToResource.put(nameservice, new ArrayList<Resource>());
}
fileSystemToResource.get(nameservice).add(resource);
}

if (resource.getAction().equals("create")) {
// 5 - Create
Resource.createResource(resource, dfs, pathHadoop);
Resource.setMode(resource, dfs, pathHadoop);
Resource.setOwner(resource, dfs, pathHadoop);
} else if (resource.getAction().equals("delete")) {
// 6 - Delete
dfs.delete(pathHadoop, true);
} else if (resource.getAction().equals("download")) {
// 7 - Download
dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
//for (Resource resource : resources) {
for (Map.Entry<String, List<Resource>> entry : fileSystemToResource.entrySet()) {
String nameservice = entry.getKey();
List<Resource> resourcesNameservice = entry.getValue();

for(Resource resource: resourcesNameservice) {
if (nameservice != null) {
System.out.println("Creating: " + resource + " in " + nameservice);
} else {
System.out.println("Creating: " + resource + " in default filesystem");
}

dfs = fileSystemNameToInstance.get(nameservice);

Resource.checkResourceParameters(resource, dfs);

Path pathHadoop = null;

if (resource.getAction().equals("download")) {
pathHadoop = new Path(resource.getSource());
} else {
String path = resource.getTarget();
pathHadoop = new Path(path);
if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
System.out.println(
String.format("Skipping the operation for not managed DFS directory %s since immutable_paths contains it.", path)
);
continue;
}
}

if (resource.getAction().equals("create")) {
// 5 - Create
Resource.createResource(resource, dfs, pathHadoop);
Resource.setMode(resource, dfs, pathHadoop);
Resource.setOwner(resource, dfs, pathHadoop);
} else if (resource.getAction().equals("delete")) {
// 6 - Delete
dfs.delete(pathHadoop, true);
} else if (resource.getAction().equals("download")) {
// 7 - Download
dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
}
}
}
}
@@ -106,7 +146,9 @@ public static void main(String[] args)
e.printStackTrace();
}
finally {
dfs.close();
for(FileSystem dfs:fileSystemNameToInstance.values()) {
dfs.close();
}
}

System.out.println("All resources created.");