Skip to content

Commit

Permalink
Merge pull request #12 from jaredbischof/master
Browse files Browse the repository at this point in the history
Mostly moving arg parsing around and adding to the base subsystem class.
  • Loading branch information
jaredbischof committed Mar 21, 2013
2 parents f958a12 + 2e198bc commit 8462967
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 171 deletions.
93 changes: 21 additions & 72 deletions MCP
@@ -1,13 +1,12 @@
#!/soft/packages/python/2.6/bin/python

import argparse
import os
import inspect
import json
import sys, shlex, socket
import getpass, commands, subprocess
import sys
import re

# Getting paths
# Getting path to MCP
MCP_path = os.path.realpath(__file__)
matchObj = re.match(r'(^.*\/)', MCP_path)
MCP_dir = matchObj.group(0)
Expand All @@ -19,76 +18,26 @@ sys.path.insert(0, MCP_dir+"mcp_modules/")
json_conf_file = open(MCP_dir+'conf/conf.json')
json_conf = json.load(json_conf_file)

__doc__ = """
MG-RAST Control Program (MCP)
SERVICE \tUSER@HOST\t\t\tACTIONS
"""

service_name_to_module = {}
service_action_to_req_user = {}
for module_name in json_conf["global"]["services"]:
module_name = module_name.strip()
module = __import__(module_name)
service_name_to_module[module_name] = module
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and name == module_name and hasattr(obj, 'actions'):
userhost = ""
if 'user' in json_conf[module_name] and 'host' in json_conf[module_name]:
userhost = json_conf[module_name]['user'] + '@' + json_conf[module_name]['host']
for action in obj.actions.split(','):
service_action_to_req_user[name + " " + action.strip()] = userhost

if userhost == '':
userhost = 'ANY '

__doc__ += name + " \t" + userhost + " \t" + obj.actions + "\n"

def main(args):
usage = "\nUsage: MCP service action\n" + __doc__

# Checking number of inputs.
if len(sys.argv) < 3:
sys.stderr.write(usage)
sys.stderr.write("\n")
else:
service = sys.argv[1]
action = sys.argv[2]

# Checking that user has entered a valid service-action pair.
service_action = service + " " + action
if service_action not in service_action_to_req_user:
sys.stderr.write("\nERROR: service-action '" + service_action + "' not available\n" + usage)
sys.stderr.write("\n")
return 0

# Checking that the command is being called by the correct user@hostname. Otherwise, pass
# the command onto the appropriate machine as the appropriate user via ssh. For the time
# being we're just going to call 'sudo -s', but this will change later.
user = getpass.getuser()
host = socket.gethostname()
if service_action_to_req_user[service_action] != "" and (user + "@" + host) != service_action_to_req_user[service_action]:
print "Handing off command to run as: " + service_action_to_req_user[service_action]
array = commands.getstatusoutput("sudo -s ssh " + service_action_to_req_user[service_action] + " " + MCP_path + " " + " ".join(map(str,sys.argv[1:])))
print array[1]
return 0

# Retrieve the appropriate "service" python class
myclass = getattr(service_name_to_module[service], service)
def main():
parser = argparse.ArgumentParser(description='MG-RAST Control Program (MCP)')
parser.add_argument('subsystem', metavar='subsystem',
help="An MG-RAST subsystem. The current list of subsystems includes: '" +
"', '".join(json_conf["global"]["subsystems"])+"'")
parser.add_argument('params', metavar='params', nargs=argparse.REMAINDER,
help='Additional arguments should specify an action followed by action parameters.')
args = parser.parse_args()
subsystem = args.subsystem
params = args.params

if subsystem in json_conf["global"]["subsystems"]:
# Retrieve the appropriate "subsystem" python module and class (module and class names should be identical)
module = __import__(subsystem)
myclass = getattr(module, subsystem)
# Create the python class instance dynamically
myservice = myclass(MCP_dir)

# Retrieve the appropriate "action" python method
myaction = getattr(myservice, action)
# Run the python method
if(len(sys.argv) == 3):
myaction()
elif(len(sys.argv) == 4):
myaction(sys.argv[3])
elif(len(sys.argv) == 5):
myaction(sys.argv[3], sys.argv[4])
mysubsystem = myclass(MCP_path)
mysubsystem.run(params)

return 0

if __name__ == "__main__":
sys.exit( main(sys.argv) )
sys.exit( main() )
35 changes: 17 additions & 18 deletions mcp_modules/mcp_api.py
@@ -1,34 +1,30 @@
import glob, json, os, sys, time
import glob, json, os, sys
from subsystem import subsystem

class mcp_api(subsystem):
actions = "start, stop, restart"
actions = [ 'start', 'stop', 'restart', 'log' ]

def __init__(self, MCP_dir):
subsystem.__init__(self, MCP_dir)
self.state = { 'resource':self.__class__.__name__,
'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
'status': { 'site' : 'online' }
}
self.services = self.json_conf['global']['services']
def __init__(self, MCP_path):
subsystem.__init__(self, MCP_path)
self.state['status'] = { 'site' : 'online' }

def start(self):
files = glob.glob(self.apidir+"/*")
if len(files) != 0:
sys.stderr.write("ERROR: Cannot initialize mcp_api because API directory (" + self.apidir + ") is not empty\n")
return 0

for service in self.services:
service = service.strip();
module = __import__(service)
myclass = getattr(module, service)
myservice = myclass(self.MCP_dir)

jstate = json.dumps(myservice.get_state())
f = open(self.apidir + "/" + service, 'w')
for subsystem in self.json_conf['global']['subsystems']:
subsystem = subsystem.strip();
module = __import__(subsystem)
myclass = getattr(module, subsystem)
mysubsystem = myclass(self.MCP_dir)
jstate = json.dumps(mysubsystem.state)
f = open(self.apidir + "/" + subsystem, 'w')
f.write(jstate)

return 1

def stop(self):
for file_object in os.listdir(self.apidir):
file_object_path = os.path.join(self.apidir, file_object)
Expand All @@ -37,6 +33,9 @@ def stop(self):
else:
shutil.rmtree(file_object_path)

return 1

def restart(self):
self.stop()
self.start()
return 1
15 changes: 5 additions & 10 deletions mcp_modules/memcache.py
@@ -1,19 +1,14 @@
import time
from subsystem import subsystem

class memcache(subsystem):
actions = "clear"
actions = [ 'clear', 'log' ]

def __init__(self, MCP_dir):
subsystem.__init__(self, MCP_dir)
self.state = { 'resource':self.__class__.__name__,
'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
'updated':time.strftime("%Y-%m-%d %H:%M:%S")
}
def __init__(self, MCP_path):
subsystem.__init__(self, MCP_path)
self.memhost = self.json_conf['memcache']['memhost']

def clear(self):
print "Clearing memcache:"
sout, serr = self.run_cmd(self.MCP_dir + "bin/clear_memcache.pl " + self.memhost)
sout = self.run_cmd(self.MCP_dir + "bin/clear_memcache.pl " + self.memhost)
print "memcache cleared!"
return 0
return 1
33 changes: 4 additions & 29 deletions mcp_modules/mlog.py
@@ -1,33 +1,8 @@
import json, sys, time
import json, sys
from subsystem import subsystem

class mlog(subsystem):
actions = "set"
actions = [ 'log' ]

def __init__(self, MCP_dir):
subsystem.__init__(self, MCP_dir)
self.state = { 'resource':self.__class__.__name__,
'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
'log_levels':{}
}

for component in self.json_conf['mlog']['log_levels']:
self.state['log_levels'][component] = self.json_conf['mlog']['log_levels'][component]

def set(self, component, level):
if component not in self.json_conf['mlog']['log_levels']:
sys.stderr.write("ERROR: '" + component + "' is not a valid logging component.\n")
return 0
else:
try:
int(level)
except ValueError:
sys.stderr.write("ERROR: '" + level + "' is not an integer value.\n")
return 0
self.state['log_levels'][component] = int(level)
self.state['updated'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
jstate = json.dumps(self.get_state())
f = open(self.apidir + "/" + self.__class__.__name__, 'w')
f.write(jstate)
return 1
def __init__(self, MCP_path):
subsystem.__init__(self, MCP_path)
27 changes: 10 additions & 17 deletions mcp_modules/queue.py
@@ -1,37 +1,30 @@
import time
from subsystem import subsystem

class queue(subsystem):
actions = "start, stop"
actions = [ 'start', 'stop', 'log' ]

def __init__(self, MCP_dir):
subsystem.__init__(self, MCP_dir)
def __init__(self, MCP_path):
subsystem.__init__(self, MCP_path)

# getting status of queues
sout, serr = self.run_cmd("/usr/local/bin/qstat -Q batch")
sout = self.run_cmd("/usr/local/bin/qstat -Q batch")
lines = sout.splitlines()
batch_status = 'online' if lines[len(lines)-1].split()[3] == 'yes' else 'offline'

sout, serr = self.run_cmd("/usr/local/bin/qstat -Q fast")
sout = self.run_cmd("/usr/local/bin/qstat -Q fast")
lines = sout.splitlines()
fast_status = 'online' if lines[len(lines)-1].split()[3] == 'yes' else 'offline'

self.state = { 'resource':self.__class__.__name__,
'updated':time.strftime("%Y-%m-%d %H:%M:%S"),
'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
'status': { 'batch': batch_status,
'fast': fast_status
}
}
self.state['status'] = { 'batch': batch_status, 'fast': fast_status }

def start(self):
print "Starting nagasaki pipeline:"
sout, serr = self.run_cmd("/usr/local/bin/qstart batch")
sout = self.run_cmd("/usr/local/bin/qstart batch")
print "nagasaki pipeline started!"
return 0
return 1

def stop(self):
print "Stopping nagasaki pipeline:"
sout, serr = self.run_cmd("/usr/local/bin/qstop batch")
sout = self.run_cmd("/usr/local/bin/qstop batch")
print "nagasaki pipeline stopped!"
return 0
return 1
86 changes: 76 additions & 10 deletions mcp_modules/subsystem.py
@@ -1,22 +1,88 @@
import argparse
import commands
import json
import shlex, subprocess
import getpass, re, socket
import shlex, shutil, subprocess
import sys, time

class subsystem (object):
def __init__(self, MCP_dir):
self.MCP_dir = MCP_dir
self.state = {}
def __init__(self, MCP_path):
self.MCP_path = MCP_path
matchObj = re.match(r'(^.*\/)', self.MCP_path)
self.MCP_dir = matchObj.group(0)

self.subsystem = self.__class__.__name__
json_conf_file = open(self.MCP_dir+'conf/conf.json')
self.json_conf = json.load(json_conf_file)
self.apidir = self.json_conf['mcp_api']['dir'] + "/" + str(self.json_conf['mcp_api']['version'])
self.log_level_max = self.json_conf['global']['log_level_max']
self.log_level_min = self.json_conf['global']['log_level_min']

self.req_host = ""
if 'req_host' in self.json_conf[self.subsystem]:
self.req_host = self.json_conf[self.subsystem]['req_host']

self.req_user = ""
if 'req_user' in self.json_conf[self.subsystem]:
self.req_user = self.json_conf[self.subsystem]['req_user']

self.state = { 'log_level':self.json_conf['global']['default_log_level'],
'subsystem':self.subsystem,
'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.subsystem
}

def run(self, params):
parser = argparse.ArgumentParser(prog='MCP ' + self.subsystem)
parser.add_argument('action', metavar='action',
help="An action for the subsystem '" + self.subsystem + "' to perform. Available actions include: '" + "', '".join(self.actions) + "'")
parser.add_argument('params', metavar='params', nargs=argparse.REMAINDER,
help="Additional arguments should specify an action followed by action parameters if required.")
args = parser.parse_args(params)
action = args.action
params = args.params

user = getpass.getuser()
host = socket.gethostname()
req_login = self.req_user + "@" + self.req_host
if self.req_host != "" and self.req_user != "" and (user + "@" + host) != req_login:
print "Handing off command to run as: " + req_login
sout = self.run_cmd("sudo -s ssh " + " ".join([req_login, self.MCP_path, self.subsystem, action]) + " " + " ".join(params))
return 1

myaction = getattr(self, action)
if params:
myaction(params)
else:
myaction()

def log(self, level):
try:
level = int(level)
except ValueError:
sys.stderr.write("ERROR: '" + level + "' is not an integer value.\n")
return 0

if level < self.log_level_min or level > self.log_level_max:
sys.stderr.write("ERROR: '" + str(level) + "' is not a valid logging level.\n")
return 0

self.state['log_level'] = level
self.state['updated'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
jstate = json.dumps(self.state)
fname = self.apidir + "/" + self.__class__.__name__
f = open(fname + ".tmp", 'w')
f.write(jstate)
shutil.move(fname + ".tmp", fname)

def get_state(self):
return self.state
return 1

def run_cmd(self, cmd_str):
cmd = shlex.split(str(cmd_str))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
raise IOError("%s\n%s"%(" ".join(cmd), stderr))
sout, serr = proc.communicate()
if proc.returncode != 0 or serr != "":
raise IOError("%s\n%s"%(" ".join(cmd), serr))
return 0

return stdout, stderr
return sout

0 comments on commit 8462967

Please sign in to comment.