Skip to content

Commit

Permalink
AMBARI-4862. Hive execute JAR needs to be installed on HDFS. (Siddharth Wagle via mahadev)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahadev Konar committed Feb 27, 2014
1 parent 0ee9b81 commit 9acbc10
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 28 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ def action_run(self):
path = self.resource.path path = self.resource.path
dest_dir = self.resource.dest_dir dest_dir = self.resource.dest_dir
kinnit_if_needed = self.resource.kinnit_if_needed kinnit_if_needed = self.resource.kinnit_if_needed
stub_path = self.resource.stub_path
owner = self.resource.owner owner = self.resource.owner
group = self.resource.group group = self.resource.group
mode = self.resource.mode mode = self.resource.mode
hdfs_usr=self.resource.hdfs_user
hadoop_conf_path = self.resource.hadoop_conf_dir hadoop_conf_path = self.resource.hadoop_conf_dir


copy_cmd = format("fs -copyFromLocal {path} {dest_dir}") copy_cmd = format("fs -copyFromLocal {path} {dest_dir}")
Expand All @@ -39,7 +39,7 @@ def action_run(self):
ExecuteHadoop(copy_cmd, ExecuteHadoop(copy_cmd,
not_if=unless_cmd, not_if=unless_cmd,
user=owner, user=owner,
conf_dir=hadoop_conf_path, conf_dir=hadoop_conf_path
) )


if not owner: if not owner:
Expand All @@ -54,14 +54,15 @@ def action_run(self):
chown_cmd = format("fs -chown {chown} {dest_dir}") chown_cmd = format("fs -chown {chown} {dest_dir}")


ExecuteHadoop(chown_cmd, ExecuteHadoop(chown_cmd,
user=owner, user=hdfs_usr,
conf_dir=hadoop_conf_path) conf_dir=hadoop_conf_path)
pass pass


if mode: if mode:
chmod_cmd = format('fs -chmod {mode} {dest_dir}') dir_mode = oct(mode)
chmod_cmd = format('fs -chmod {dir_mode} {dest_dir}')


ExecuteHadoop(chmod_cmd, ExecuteHadoop(chmod_cmd,
user=owner, user=hdfs_usr,
conf_dir=hadoop_conf_path) conf_dir=hadoop_conf_path)
pass pass
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ class CopyFromLocal(Resource):
action = ForcedListArgument(default="run") action = ForcedListArgument(default="run")


path = ResourceArgument(default=lambda obj: obj.name) path = ResourceArgument(default=lambda obj: obj.name)
dest_dir = ResourceArgument() dest_dir = ResourceArgument(required=True)
owner = ResourceArgument() owner = ResourceArgument(required=True)
group = ResourceArgument() group = ResourceArgument()
mode = ResourceArgument() mode = ResourceArgument()
kinnit_if_needed = ResourceArgument(default='') kinnit_if_needed = ResourceArgument(default='')
hadoop_conf_dir = ResourceArgument(default='/etc/hadoop/conf') hadoop_conf_dir = ResourceArgument(default='/etc/hadoop/conf')
hdfs_user = ResourceArgument(default='hdfs')


actions = Resource.actions + ["run"] actions = Resource.actions + ["run"]
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,17 +30,16 @@ def test_run_default_args(self, execute_hadoop_mock):
owner='user1', owner='user1',
dest_dir='/apps/test/', dest_dir='/apps/test/',
kinnit_if_needed='', kinnit_if_needed='',
stub_path='/tmp/test_stub', hdfs_user='hdfs'
ignore_on_failure=True
) )
self.assertEqual(execute_hadoop_mock.call_count, 2) self.assertEqual(execute_hadoop_mock.call_count, 2)
call_arg_list = execute_hadoop_mock.call_args_list call_arg_list = execute_hadoop_mock.call_args_list
self.assertEqual('fs -copyFromLocal /user/testdir/*.files /apps/test/', self.assertEqual('fs -copyFromLocal /user/testdir/*.files /apps/test/',
call_arg_list[0][0][0].command) call_arg_list[0][0][0].command)
self.assertEquals({'not_if': ' hadoop fs -ls /tmp/test_stub >/dev/null 2>&1', 'ignore_failures': False, 'user': 'user1', 'conf_dir': '/etc/hadoop/conf'}, self.assertEquals({'not_if': ' hadoop fs -ls /apps/test/ >/dev/null 2>&1', 'user': 'user1', 'conf_dir': '/etc/hadoop/conf'},
call_arg_list[0][0][0].arguments) call_arg_list[0][0][0].arguments)
self.assertEquals('fs -chown user1 /apps/test/', call_arg_list[1][0][0].command) self.assertEquals('fs -chown user1 /apps/test/', call_arg_list[1][0][0].command)
self.assertEquals({'user': 'user1', 'conf_dir': '/etc/hadoop/conf'}, call_arg_list[1][0][0].arguments) self.assertEquals({'user': 'hdfs', 'conf_dir': '/etc/hadoop/conf'}, call_arg_list[1][0][0].arguments)




@patch("resource_management.libraries.providers.execute_hadoop.ExecuteHadoopProvider") @patch("resource_management.libraries.providers.execute_hadoop.ExecuteHadoopProvider")
Expand All @@ -52,16 +51,15 @@ def test_run_with_chmod(self, execute_hadoop_mock):
group='hdfs', group='hdfs',
dest_dir='/apps/test/', dest_dir='/apps/test/',
kinnit_if_needed='', kinnit_if_needed='',
stub_path='/tmp/test_stub', hdfs_user='hdfs'
ignore_on_failure=False
) )
self.assertEqual(execute_hadoop_mock.call_count, 3) self.assertEqual(execute_hadoop_mock.call_count, 3)
call_arg_list = execute_hadoop_mock.call_args_list call_arg_list = execute_hadoop_mock.call_args_list
self.assertEqual('fs -copyFromLocal /user/testdir/*.files /apps/test/', self.assertEqual('fs -copyFromLocal /user/testdir/*.files /apps/test/',
call_arg_list[0][0][0].command) call_arg_list[0][0][0].command)
self.assertEquals({'not_if': ' hadoop fs -ls /tmp/test_stub >/dev/null 2>&1', 'ignore_failures': False, 'user': 'user1', 'conf_dir': '/etc/hadoop/conf'}, self.assertEquals({'not_if': ' hadoop fs -ls /apps/test/ >/dev/null 2>&1', 'user': 'user1', 'conf_dir': '/etc/hadoop/conf'},
call_arg_list[0][0][0].arguments) call_arg_list[0][0][0].arguments)
self.assertEquals('fs -chown user1:hdfs /apps/test/', call_arg_list[1][0][0].command) self.assertEquals('fs -chown user1:hdfs /apps/test/', call_arg_list[1][0][0].command)
self.assertEquals({'user': 'user1', 'conf_dir': '/etc/hadoop/conf'}, call_arg_list[1][0][0].arguments) self.assertEquals({'user': 'hdfs', 'conf_dir': '/etc/hadoop/conf'}, call_arg_list[1][0][0].arguments)




Original file line number Original file line Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def start(self, env):
env.set_params(params) env.set_params(params)
self.configure(env) # FOR SECURITY self.configure(env) # FOR SECURITY
self.install_tez_jars(params) # Put tez jars in hdfs self.install_tez_jars(params) # Put tez jars in hdfs
self.install_hive_exec_jar(params) # Put hive exec jar in hdfs
hive_service( 'hiveserver2', hive_service( 'hiveserver2',
action = 'start' action = 'start'
) )
Expand All @@ -59,6 +60,41 @@ def status(self, env):
# Recursively check all existing gmetad pid files # Recursively check all existing gmetad pid files
check_process_status(pid_file) check_process_status(pid_file)


def install_hive_exec_jar(self, params):
hdfs_path_prefix = 'hdfs://'
if params.tez_lib_uris:
hdfs_path = params.hive_exec_hdfs_path

if hdfs_path.strip().find(hdfs_path_prefix, 0) != -1:
hdfs_path = hdfs_path.replace(hdfs_path_prefix, '')
pass

params.HdfsDirectory(hdfs_path,
action="create",
owner=params.hive_user,
mode=0755
)

if params.security_enabled:
kinit_if_needed = format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_user};")
else:
kinit_if_needed = ""

if kinit_if_needed:
Execute(kinit_if_needed,
user=params.tez_user,
path='/bin'
)

CopyFromLocal(params.hive_exec_jar_path,
mode=0655,
owner=params.hive_user,
dest_dir=hdfs_path,
kinnit_if_needed=kinit_if_needed,
hdfs_user=params.hdfs_user
)
pass

def install_tez_jars(self, params): def install_tez_jars(self, params):
destination_hdfs_dirs = get_tez_hdfs_dir_paths(params.tez_lib_uris) destination_hdfs_dirs = get_tez_hdfs_dir_paths(params.tez_lib_uris)


Expand All @@ -83,6 +119,7 @@ def install_tez_jars(self, params):
user=params.tez_user, user=params.tez_user,
path='/bin' path='/bin'
) )
pass


app_dir_path = None app_dir_path = None
lib_dir_path = None lib_dir_path = None
Expand All @@ -102,7 +139,8 @@ def install_tez_jars(self, params):
mode=0655, mode=0655,
owner=params.tez_user, owner=params.tez_user,
dest_dir=app_dir_path, dest_dir=app_dir_path,
kinnit_if_needed=kinit_if_needed kinnit_if_needed=kinit_if_needed,
hdfs_user=params.hdfs_user
) )
pass pass


Expand All @@ -112,6 +150,7 @@ def install_tez_jars(self, params):
owner=params.tez_user, owner=params.tez_user,
dest_dir=lib_dir_path, dest_dir=lib_dir_path,
kinnit_if_needed=kinit_if_needed, kinnit_if_needed=kinit_if_needed,
hdfs_user=params.hdfs_user
) )
pass pass


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -155,9 +155,11 @@
tez_lib_uris = default("/configurations/tez-site/tez.lib.uris", None) tez_lib_uris = default("/configurations/tez-site/tez.lib.uris", None)
tez_local_api_jars = '/usr/lib/tez/tez*.jar' tez_local_api_jars = '/usr/lib/tez/tez*.jar'
tez_local_lib_jars = '/usr/lib/tez/lib/*.jar' tez_local_lib_jars = '/usr/lib/tez/lib/*.jar'
tez_stub_path = '/tmp/tez_jars_copied'
tez_user = 'tez' tez_user = 'tez'


hive_exec_jar_path = '/usr/lib/hive/lib/hive-exec.jar'
hive_exec_hdfs_path = default('/configurations/hive-site/hive.jar.directory', '/apps/hive/install')

import functools import functools
#create partial functions with common arguments for every HdfsDirectory call #create partial functions with common arguments for every HdfsDirectory call
#to create hdfs directory we need to call params.HdfsDirectory in code #to create hdfs directory we need to call params.HdfsDirectory in code
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -88,19 +88,22 @@ def webhcat():
owner=params.webhcat_user, owner=params.webhcat_user,
mode=0755, mode=0755,
dest_dir=format("{webhcat_apps_dir}/hadoop-streaming.jar"), dest_dir=format("{webhcat_apps_dir}/hadoop-streaming.jar"),
kinnit_if_needed=kinit_if_needed kinnit_if_needed=kinit_if_needed,
hdfs_user=params.hdfs_user
) )


CopyFromLocal('/usr/share/HDP-webhcat/pig.tar.gz', CopyFromLocal('/usr/share/HDP-webhcat/pig.tar.gz',
owner=params.webhcat_user, owner=params.webhcat_user,
mode=0755, mode=0755,
dest_dir=format("{webhcat_apps_dir}/pig.tar.gz"), dest_dir=format("{webhcat_apps_dir}/pig.tar.gz"),
kinnit_if_needed=kinit_if_needed kinnit_if_needed=kinit_if_needed,
hdfs_user=params.hdfs_user
) )


CopyFromLocal('/usr/share/HDP-webhcat/hive.tar.gz', CopyFromLocal('/usr/share/HDP-webhcat/hive.tar.gz',
owner=params.webhcat_user, owner=params.webhcat_user,
mode=0755, mode=0755,
dest_dir=format("{webhcat_apps_dir}/hive.tar.gz"), dest_dir=format("{webhcat_apps_dir}/hive.tar.gz"),
kinnit_if_needed=kinit_if_needed kinnit_if_needed=kinit_if_needed,
hdfs_user=params.hdfs_user
) )
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -102,16 +102,38 @@ def test_start_default(self):
mode=0655, mode=0655,
owner='tez', owner='tez',
dest_dir='/apps/tez/', dest_dir='/apps/tez/',
kinnit_if_needed='' kinnit_if_needed='',
hdfs_user='hdfs'
) )


self.assertResourceCalled('CopyFromLocal', '/usr/lib/tez/lib/*.jar', self.assertResourceCalled('CopyFromLocal', '/usr/lib/tez/lib/*.jar',
mode=0655, mode=0655,
owner='tez', owner='tez',
dest_dir='/apps/tez/lib/', dest_dir='/apps/tez/lib/',
kinnit_if_needed='' kinnit_if_needed='',
hdfs_user='hdfs'
) )


self.assertResourceCalled('HdfsDirectory', '/apps/hive/install',
security_enabled = False,
mode = 0755,
owner = 'hive',
keytab = UnknownConfigurationMock(),
conf_dir = '/etc/hadoop/conf',
hdfs_user = 'hdfs',
kinit_path_local = '/usr/bin/kinit',
action = ['create']
)

self.assertResourceCalled('CopyFromLocal', '/usr/lib/hive/lib/hive-exec.jar',
mode=0655,
owner='hive',
dest_dir='/apps/hive/install',
kinnit_if_needed='',
hdfs_user='hdfs'
)


self.assertResourceCalled('Execute', 'env JAVA_HOME=/usr/jdk64/jdk1.7.0_45 /tmp/start_hiveserver2_script /var/log/hive/hive-server2.out /var/log/hive/hive-server2.log /var/run/hive/hive-server.pid /etc/hive/conf.server', self.assertResourceCalled('Execute', 'env JAVA_HOME=/usr/jdk64/jdk1.7.0_45 /tmp/start_hiveserver2_script /var/log/hive/hive-server2.out /var/log/hive/hive-server2.log /var/run/hive/hive-server.pid /etc/hive/conf.server',
not_if = 'ls /var/run/hive/hive-server.pid >/dev/null 2>&1 && ps `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1', not_if = 'ls /var/run/hive/hive-server.pid >/dev/null 2>&1 && ps `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1',
user = 'hive' user = 'hive'
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -158,19 +158,22 @@ def assert_configure_default(self):
owner='hcat', owner='hcat',
mode=0755, mode=0755,
dest_dir=format('/apps/webhcat/hadoop-streaming.jar'), dest_dir=format('/apps/webhcat/hadoop-streaming.jar'),
kinnit_if_needed='' kinnit_if_needed='',
hdfs_user='hdfs'
) )
self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/pig.tar.gz', self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/pig.tar.gz',
owner='hcat', owner='hcat',
mode=0755, mode=0755,
dest_dir=format('/apps/webhcat/pig.tar.gz'), dest_dir=format('/apps/webhcat/pig.tar.gz'),
kinnit_if_needed='' kinnit_if_needed='',
hdfs_user='hdfs'
) )
self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/hive.tar.gz', self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/hive.tar.gz',
owner='hcat', owner='hcat',
mode=0755, mode=0755,
dest_dir=format('/apps/webhcat/hive.tar.gz'), dest_dir=format('/apps/webhcat/hive.tar.gz'),
kinnit_if_needed='' kinnit_if_needed='',
hdfs_user='hdfs'
) )


def assert_configure_secured(self): def assert_configure_secured(self):
Expand Down Expand Up @@ -237,17 +240,20 @@ def assert_configure_secured(self):
owner='hcat', owner='hcat',
mode=0755, mode=0755,
dest_dir=format('/apps/webhcat/hadoop-streaming.jar'), dest_dir=format('/apps/webhcat/hadoop-streaming.jar'),
kinnit_if_needed='/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;' kinnit_if_needed='/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;',
hdfs_user='hdfs'
) )
self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/pig.tar.gz', self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/pig.tar.gz',
owner='hcat', owner='hcat',
mode=0755, mode=0755,
dest_dir=format('/apps/webhcat/pig.tar.gz'), dest_dir=format('/apps/webhcat/pig.tar.gz'),
kinnit_if_needed='/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;' kinnit_if_needed='/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;',
hdfs_user='hdfs'
) )
self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/hive.tar.gz', self.assertResourceCalled('CopyFromLocal', '/usr/share/HDP-webhcat/hive.tar.gz',
owner='hcat', owner='hcat',
mode=0755, mode=0755,
dest_dir=format('/apps/webhcat/hive.tar.gz'), dest_dir=format('/apps/webhcat/hive.tar.gz'),
kinnit_if_needed='/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;' kinnit_if_needed='/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;',
hdfs_user='hdfs'
) )

0 comments on commit 9acbc10

Please sign in to comment.