AMBARI-23943. Hiveserver2 fails to start on viewFS enabled cluster #1365

Merged: 1 commit, May 24, 2018
@@ -7,9 +7,9 @@
{
"name": "namenode_process",
"service": "HDFS",
"enabled": true,
"interval": 6,
"component": "NAMENODE",
"interval": 6,
"enabled": true,
"label": "NameNode process",
"source": {
"reporting": {
@@ -74,6 +74,32 @@ class HdfsResourceJar:
while execute does all the expensive creating/deleting work executing the jar with the json as parameter.
"""
def action_delayed(self, action_name, main_resource):
dfs_type = main_resource.resource.dfs_type

if main_resource.resource.nameservices is None: # all nameservices
nameservices = namenode_ha_utils.get_nameservices(main_resource.resource.hdfs_site)
else:
nameservices = main_resource.resource.nameservices

# non-federated cluster
if not nameservices:
self.action_delayed_for_nameservice(None, action_name, main_resource)
else:
for nameservice in nameservices:
try:
if not dfs_type:
raise Fail("<serviceType> for fileSystem service should be set in metainfo.xml")
nameservice = dfs_type.lower() + "://" + nameservice

self.action_delayed_for_nameservice(nameservice, action_name, main_resource)
except namenode_ha_utils.NoActiveNamenodeException as ex:
# one of the nameservices can be down (during initial start, for example); no need to worry on a federated cluster
if len(nameservices) > 1:
Logger.exception("Cannot run HdfsResource for nameservice {0} due to no active namenode present".format(nameservice))
else:
raise

def action_delayed_for_nameservice(self, nameservice, action_name, main_resource):
resource = {}
env = Environment.get_instance()
if not 'hdfs_files' in env.config:
@@ -90,6 +116,8 @@ def action_delayed(self, action_name, main_resource):
elif getattr(main_resource.resource, field_name):
resource[json_field_name] = getattr(main_resource.resource, field_name)

resource['nameservice'] = nameservice

# Add resource to create
env.config['hdfs_files'].append(resource)
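Note (not part of the patch): the dispatch above is the heart of the Python-side change. action_delayed resolves the nameservices (all of them when none are passed in), prefixes each with the scheme derived from dfs_type (populated from the new <serviceType> tag in metainfo.xml), delegates to action_delayed_for_nameservice, and tolerates a nameservice without an active NameNode on a federated cluster. Each queued resource now also records which nameservice it targets. A minimal standalone sketch of that pattern follows; the function names and the resource dict are simplified stand-ins, not Ambari's actual HdfsResourceJar API.

    # Illustrative sketch only; simplified names, not Ambari's real classes.
    class NoActiveNamenodeException(Exception):
        pass

    def action_delayed(action_name, queued, nameservices, dfs_type):
        """Queue one delayed action per nameservice, prefixing each with the filesystem scheme."""
        if not nameservices:                                  # non-federated cluster
            action_delayed_for_nameservice(None, action_name, queued)
            return
        for ns in nameservices:
            if not dfs_type:
                raise ValueError("<serviceType> for the fileSystem service must be set in metainfo.xml")
            qualified = dfs_type.lower() + "://" + ns         # e.g. "hdfs://ns1"
            try:
                action_delayed_for_nameservice(qualified, action_name, queued)
            except NoActiveNamenodeException:
                if len(nameservices) > 1:                     # federated: one nameservice may be down
                    print("Skipping nameservice %s: no active NameNode" % qualified)
                else:
                    raise

    def action_delayed_for_nameservice(nameservice, action_name, queued):
        # Stand-in for building the real resource dict appended to env.config['hdfs_files'].
        queued.append({"target": "/tmp/example", "action": action_name, "nameservice": nameservice})

    queued = []
    action_delayed("create", queued, ["ns1", "ns2"], "HDFS")
    print(queued)    # each queued resource now carries its "nameservice"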

@@ -159,9 +187,9 @@ def __init__(self, hdfs_site, nameservice, run_user, security_enabled, logoutput
self.logoutput = logoutput

@staticmethod
def is_webhdfs_available(is_webhdfs_enabled, default_fs):
def is_webhdfs_available(is_webhdfs_enabled, dfs_type):
# only hdfs seems to support webHDFS
return (is_webhdfs_enabled and default_fs.startswith("hdfs"))
return (is_webhdfs_enabled and dfs_type == 'HDFS')
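Note (not part of the patch): this signature change is the crux of the viewFS fix. On a viewFS-enabled cluster fs.defaultFS is a viewfs:// URI, so the old default_fs.startswith("hdfs") test reported WebHDFS as unavailable even though the underlying service is still HDFS, and every call fell back to the slower jar path. Keying on dfs_type keeps WebHDFS usable. A small standalone comparison of the two predicates (illustrative, not the actual class method):

    def is_webhdfs_available_old(is_webhdfs_enabled, default_fs):
        # pre-patch check, keyed on the default filesystem URI
        return is_webhdfs_enabled and default_fs.startswith("hdfs")

    def is_webhdfs_available_new(is_webhdfs_enabled, dfs_type):
        # patched check, keyed on the <serviceType> value from metainfo.xml
        return is_webhdfs_enabled and dfs_type == 'HDFS'

    # viewFS cluster: fs.defaultFS is viewfs:// but the service type is still HDFS
    print(is_webhdfs_available_old(True, "viewfs://cluster"))   # False: WebHDFS wrongly ruled out
    print(is_webhdfs_available_new(True, "HDFS"))               # True: WebHDFS stays available

The same predicate now also drives get_hdfs_resource_executor below and the fast-hdfs-resource.jar copy decision in setup_hadoop.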

def run_command(self, *args, **kwargs):
"""
@@ -562,11 +590,17 @@ def _fill_directories_list(self, target, results):
class HdfsResourceProvider(Provider):
def __init__(self, resource):
super(HdfsResourceProvider,self).__init__(resource)

self.assert_parameter_is_set('dfs_type')
self.fsType = getattr(resource, 'dfs_type')

self.ignored_resources_list = HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file)
if self.fsType != 'HCFS':

if self.fsType == 'HDFS':
self.assert_parameter_is_set('hdfs_site')
self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled']
else:
self.webhdfs_enabled = False

@staticmethod
def parse_path(path):
@@ -629,9 +663,7 @@ def action_execute(self):
self.get_hdfs_resource_executor().action_execute(self)

def get_hdfs_resource_executor(self):
if self.fsType == 'HCFS':
return HdfsResourceJar()
elif WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.resource.default_fs):
if WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.fsType):
return HdfsResourceWebHDFS()
else:
return HdfsResourceJar()
@@ -21,6 +21,7 @@
<service>
<name>HDFS</name>
<displayName>HDFS</displayName>
<serviceType>HDFS</serviceType> <!-- This tag is used only for the main fileSystem service. It sets the filesystem scheme for Ambari. -->
<comment>Apache Hadoop Distributed File System</comment>
<version>2.1.0.2.0</version>

@@ -139,7 +139,7 @@ def oozie_service(action = 'start', upgrade_type=None):
params.HdfsResource(None, action="execute")

hdfs_share_dir_exists = True # skip time-expensive hadoop fs -ls check
elif WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
elif WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hadoop fs -ls.
util = WebHDFSUtil(params.hdfs_site, nameservice, params.oozie_user, params.security_enabled)
list_status = util.run_command(params.hdfs_share_dir, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
@@ -114,7 +114,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
nameservice = None if not nameservices else nameservices[-1]

if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hdfs dfs -test
util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
@@ -113,7 +113,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
nameservice = None if not nameservices else nameservices[-1]

if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hdfs dfs -test
util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
@@ -230,7 +230,7 @@ def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
nameservices = namenode_ha_utils.get_nameservices(params.hdfs_site)
nameservice = None if not nameservices else nameservices[-1]

if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# check with webhdfs is much faster than executing hdfs dfs -test
util = WebHDFSUtil(params.hdfs_site, nameservice, params.hdfs_user, params.security_enabled)
list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
Binary file not shown.
@@ -65,7 +65,7 @@ def setup_hadoop():
# if WebHDFS is not enabled we need this jar to create hadoop folders and copy tarballs to HDFS.
if params.sysprep_skip_copy_fast_jar_hdfs:
print "Skipping copying of fast-hdfs-resource.jar as host is sys prepped"
elif params.dfs_type == 'HCFS' or not WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
elif params.dfs_type == 'HCFS' or not WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.dfs_type):
# for source-code of jar goto contrib/fast-hdfs-resource
File(format("{ambari_libs_dir}/fast-hdfs-resource.jar"),
mode=0644,
@@ -60,6 +60,10 @@ def test_hook_default(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'hdfs',
@@ -137,6 +141,10 @@ def test_hook_secured(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'root',
@@ -219,6 +227,10 @@ def test_hook_default_hdfs(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'hdfs',
@@ -303,6 +315,10 @@ def test_hook_refresh_topology_custom_directories(self):
create_parents = True,
cd_access = 'a',
)
self.assertResourceCalled('File', '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar',
content = StaticFile('fast-hdfs-resource.jar'),
mode = 0644,
)
self.assertResourceCalled('File', '/etc/hadoop/conf/commons-logging.properties',
content = Template('commons-logging.properties.j2'),
owner = 'hdfs',
@@ -44,6 +44,7 @@ public class Resource {
private String owner;
private String group;
private String mode;
private String nameservice;
private boolean recursiveChown;
private boolean recursiveChmod;
private boolean changePermissionforParents;
@@ -105,6 +106,14 @@ public void setMode(String mode) {
this.mode = mode;
}

public String getNameservice() {
return nameservice;
}

public void setNameservice(String nameservice) {
this.nameservice = nameservice;
}

public boolean isRecursiveChown() {
return recursiveChown;
}
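Note (not part of the patch): Resource is the Gson model that fast-hdfs-resource.jar deserializes from the JSON file written by the Python provider (each entry is appended to env.config['hdfs_files'] above); the new nameservice getter/setter carries the extra field set in action_delayed_for_nameservice. A hedged sketch of one such JSON entry, using only field names visible in this diff and made-up values:

    import json

    # Illustrative entry as written into the hdfs_files JSON; the values are invented for the example.
    resource = {
        "action": "create",
        "target": "/user/hive/warehouse",
        "owner": "hive",
        "group": "hadoop",
        "mode": "0770",
        "nameservice": "hdfs://ns1",   # new field: which filesystem this resource targets (null = default fs)
    }
    print(json.dumps([resource], indent=2))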
@@ -22,6 +22,10 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -52,52 +56,88 @@ public static void main(String[] args)

Gson gson = new Gson();
Resource[] resources = null;
FileSystem dfs = null;
Map<String, FileSystem> fileSystemNameToInstance = new HashMap<String, FileSystem>();
Map<String, List<Resource>> fileSystemToResource = new HashMap<String, List<Resource>>();

try {
Configuration conf = new Configuration();
dfs = FileSystem.get(conf);

try {
// 3 - Load data from JSON
resources = (Resource[]) gson.fromJson(new FileReader(jsonFilePath),
Resource[].class);

// 4 - Connect to HDFS
System.out.println("Using filesystem uri: " + FileSystem.getDefaultUri(conf).toString());
dfs.initialize(FileSystem.getDefaultUri(conf), conf);

Configuration conf = new Configuration();
FileSystem dfs = null;

// Creating connections
for (Resource resource : resources) {
System.out.println("Creating: " + resource);
String nameservice = resource.getNameservice();

if(!fileSystemNameToInstance.containsKey(nameservice)) {
URI fileSystemUrl;
if(nameservice == null) {
fileSystemUrl = FileSystem.getDefaultUri(conf);
} else {
fileSystemUrl = new URI(nameservice);
}

Resource.checkResourceParameters(resource, dfs);
dfs = FileSystem.get(fileSystemUrl, conf);

Path pathHadoop = null;
// 4 - Connect to DFS
System.out.println("Initializing filesystem uri: " + fileSystemUrl);
dfs.initialize(fileSystemUrl, conf);

if (resource.getAction().equals("download")) {
pathHadoop = new Path(resource.getSource());
fileSystemNameToInstance.put(nameservice, dfs);
}
else {
String path = resource.getTarget();
pathHadoop = new Path(path);
if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
System.out.println(
String.format("Skipping the operation for not managed DFS directory %s since immutable_paths contains it.", path)
);
continue;
}

if(!fileSystemToResource.containsKey(nameservice)) {
fileSystemToResource.put(nameservice, new ArrayList<Resource>());
}
fileSystemToResource.get(nameservice).add(resource);
}

if (resource.getAction().equals("create")) {
// 5 - Create
Resource.createResource(resource, dfs, pathHadoop);
Resource.setMode(resource, dfs, pathHadoop);
Resource.setOwner(resource, dfs, pathHadoop);
} else if (resource.getAction().equals("delete")) {
// 6 - Delete
dfs.delete(pathHadoop, true);
} else if (resource.getAction().equals("download")) {
// 7 - Download
dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
//for (Resource resource : resources) {
for (Map.Entry<String, List<Resource>> entry : fileSystemToResource.entrySet()) {
String nameservice = entry.getKey();
List<Resource> resourcesNameservice = entry.getValue();

for(Resource resource: resourcesNameservice) {
if (nameservice != null) {
System.out.println("Creating: " + resource + " in " + nameservice);
} else {
System.out.println("Creating: " + resource + " in default filesystem");
}

dfs = fileSystemNameToInstance.get(nameservice);

Resource.checkResourceParameters(resource, dfs);

Path pathHadoop = null;

if (resource.getAction().equals("download")) {
pathHadoop = new Path(resource.getSource());
} else {
String path = resource.getTarget();
pathHadoop = new Path(path);
if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
System.out.println(
String.format("Skipping the operation for not managed DFS directory %s since immutable_paths contains it.", path)
);
continue;
}
}

if (resource.getAction().equals("create")) {
// 5 - Create
Resource.createResource(resource, dfs, pathHadoop);
Resource.setMode(resource, dfs, pathHadoop);
Resource.setOwner(resource, dfs, pathHadoop);
} else if (resource.getAction().equals("delete")) {
// 6 - Delete
dfs.delete(pathHadoop, true);
} else if (resource.getAction().equals("download")) {
// 7 - Download
dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
}
}
}
}
@@ -106,7 +146,9 @@
e.printStackTrace();
}
finally {
dfs.close();
for(FileSystem dfs:fileSystemNameToInstance.values()) {
dfs.close();
}
}

System.out.println("All resources created.");