-
Notifications
You must be signed in to change notification settings - Fork 10
/
hdfscluster.py
85 lines (71 loc) · 2.96 KB
/
hdfscluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import shlex
import socket
import time
from typing import Dict, Optional

import paramiko
from paramiko.buffered_pipe import PipeTimeout
class HDFSCluster:
    """Run HDFS administration commands on a cluster over SSH.

    One SSH connection is opened to an entry host in ``__init__``. Commands
    aimed at another node are executed by invoking ``ssh <node>`` on the
    entry host through that same connection.
    """

    # Class-level defaults so the attributes exist even on a bare instance.
    _client = None
    _client_host = None

    def __init__(self, host: str, username: str = "emr-user", password: Optional[str] = None):
        """Connect to ``host`` over SSH as ``username``.

        Args:
            host: Entry host that the SSH connection is opened to.
            username: SSH login name.
            password: SSH password, passed through to paramiko's ``connect``.
        """
        self._client = paramiko.SSHClient()
        self._client_host = host
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known hosts for production use.
        self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._client.connect(host, username=username, password=password)

    def close(self) -> None:
        """Close the underlying SSH connection, if one was created."""
        if self._client is not None:
            self._client.close()

    def __enter__(self) -> "HDFSCluster":
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the SSH connection when leaving the ``with`` block."""
        self.close()

    @staticmethod
    def _drain_lines(stream) -> str:
        """Collect the lines still readable from ``stream``.

        Stops at end of stream (``readline`` returning an empty value) or at
        the first ``socket.timeout``, whichever comes first.
        """
        collected = []
        while True:
            try:
                line = stream.readline()
            except socket.timeout:
                break
            if not line:
                # Empty result means EOF; without this check the loop would
                # never terminate once the stream is exhausted.
                break
            if isinstance(line, bytes):
                line = line.decode("utf-8", errors="replace")
            # readline() keeps the line terminator, so none is added here.
            collected.append(line)
        return "".join(collected)

    def exec_command(self, command: str, host: Optional[str] = None, timeout: int = 5) -> Dict:
        """Run a shell command and return its output.

        Args:
            command: Shell command line to run.
            host: Node to run on. ``None`` or the connected host runs the
                command directly; any other value runs it via ``ssh <host>``
                from the connected host. A ``:port`` suffix is ignored.
            timeout: Channel read timeout in seconds.

        Returns:
            A dict with keys ``"stdout"``, ``"stderr"`` and ``"exitStatus"``.
            ``"exitStatus"`` is 999 when reading timed out before the exit
            status was obtained.
        """
        # Strip the port before comparing so "host:8020" is recognised as the
        # connected host instead of triggering an ssh hop to itself.
        target = None if host is None else host.split(":", 1)[0]
        if target is None or target == self._client_host:
            remote_command = command
        else:
            # shlex.quote keeps quotes inside `command` from terminating the
            # wrapper early; plain commands yield the same '...' form as before.
            remote_command = "ssh " + shlex.quote(target) + " " + shlex.quote(command)

        _stdin, stdout, stderr = self._client.exec_command(remote_command)
        # stdout and stderr come from the same exec_command call; the timeout
        # is set once through stdout's channel.
        stdout.channel.settimeout(timeout)
        retcode = 999
        try:
            output_stdout = stdout.read().decode("utf-8", errors="replace")
            output_stderr = stderr.read().decode("utf-8", errors="replace")
            retcode = stdout.channel.recv_exit_status()
        except socket.timeout:
            # NOTE(review): data consumed by the interrupted read() may not be
            # recoverable here - confirm against paramiko's BufferedFile.
            output_stdout = self._drain_lines(stdout)
            output_stderr = self._drain_lines(stderr)
        finally:
            # Release the channel even when the command is still running.
            stdout.channel.close()
        return {"stdout": output_stdout, "stderr": output_stderr, "exitStatus": retcode}

    def get_namenodes(self) -> str:
        """Return the stdout of ``hdfs haadmin -getAllServiceState``."""
        res = self.exec_command("hdfs haadmin -getAllServiceState")
        return res['stdout']

    def hdfs_touchz(self) -> Dict:
        """Create ``test-file`` in HDFS and return the full command result dict."""
        return self.exec_command("hdfs dfs -touchz test-file")

    # NOTE: disabled helper kept for reference. If re-enabled, `path` must be
    # shell-quoted before being appended to the command line.
    #def hdfs_cat(self, path: str) -> str:
    #    """Read the file in HDFS and delete the test file."""
    #    res = self.exec_command("hdfs dfs -cat " + path)
    #    self.exec_command("hdfs dfs -rm " + path)
    #    return res['stdout']

    def namenode_log(self, host: str) -> str:
        """Return the tail of the namenode log on ``host``.

        Runs ``tail -n 30`` on ``hadoop-hdfs-namenode-*.log`` under
        ``/mnt/disk1/log/hadoop-hdfs`` and returns at most the last 2000
        characters of its stdout.
        """
        res = self.exec_command("cd /mnt/disk1/log/hadoop-hdfs && tail -n 30 hadoop-hdfs-namenode-*.log", host)
        return res['stdout'][-2000:]

    def get_local_disk_free(self, host: str) -> str:
        """Return the stdout of ``df`` run on ``host``."""
        res = self.exec_command("df", host)
        return res['stdout']
if __name__ == "__main__":
    # Manual smoke test: connect to the entry host and create the HDFS test
    # file, then print the resulting command dict.
    cluster = HDFSCluster("47.93.25.211")
    touch_result = cluster.hdfs_touchz()
    print(touch_result)