Skip to content
This repository has been archived by the owner on Jul 29, 2020. It is now read-only.

Commit

Permalink
progress on scheduler, works primitively, needs a lot of error checki…
Browse files Browse the repository at this point in the history
…ng to be put in place
  • Loading branch information
cglewis committed Sep 16, 2013
1 parent 9c2ba96 commit 7729467
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
2 changes: 2 additions & 0 deletions hemlock/hemlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,7 @@ def process_action(self, debug, action, var_d, m_server, es):
data_action = "INSERT INTO users_tenants(user_id, tenant_id) VALUES(\""+var_d['--uuid']+"\", \""+var_d['--tenant_id']+"\")"
self.log.debug(debug, "Getting ready to perform the following SQL query: "+data_action)
elif "schedule" in action_a:
# !! TODO ensure that the same client is not added to the same schedule more than once
# client_add_schedule
if action_a[0] == "client":
data_action = "INSERT INTO schedules_clients(client_id, schedule_id) VALUES(\""+var_d['--uuid']+"\", \""+var_d['--schedule_id']+"\")"
Expand Down Expand Up @@ -1901,6 +1902,7 @@ def process_action(self, debug, action, var_d, m_server, es):
hemlock_base.update_hemlock(debug, system_uuid, server_dict)
# write
elif "schedule" in action_a:
# !! TODO ensure that the same client is not added to the same schedule more than once
# create a schedule that is associated with a client
data_action = "INSERT INTO schedules("
data_action2 = "INSERT INTO schedules_"+action_a[0]+"s("
Expand Down
57 changes: 44 additions & 13 deletions hemlock/hemlock_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import logging
import MySQLdb as mdb
import os
import signal
import sys

Expand Down Expand Up @@ -107,40 +108,70 @@ def check_schedules(self):
cur.execute("SELECT * FROM schedules")
results = cur.fetchall()
self.log.debug(self.debug, str(results))
m_server.commit()
m_server.close()

# !! TODO
# remove schedules that are not stored
jobs = self.sched.get_jobs()
for job in jobs:
test_log2 = open('scheduler.log', 'a')
test_log2.write(str(job)+"\n")
test_log2.write(str(job.name)+"\n")
test_log2.write("blah blah\n")
test_log2.write("foo "+str(job)+"\n")
test_log2.write("bar "+str(job.name)+"\n")
test_log2.close()

# read schedules that are stored
for schedule in results:
self.schedule_job_cron(self.job_work, str(schedule[1]), str(schedule[1]), str(schedule[3]), str(schedule[4]), str(schedule[5]), str(schedule[6]), str(schedule[7]))
self.schedule_job_cron(self.job_work, server_dict, str(schedule[1]), str(schedule[3]), str(schedule[4]), str(schedule[5]), str(schedule[6]), str(schedule[7]))

# !! TODO
# query to get everything in schedules
# updates schedules

def job_work(self, args):
def job_work(self, server_dict, name):
"""
Do the actual work that was scheduled at the scheduled tiem.
:param args: job arguments
:param server_dict: dictionary of server credentials
:param name: uuid of the client
"""
# DEBUG
# do actual work here
# !! TODO
# if streaming is already running and requested again, ignore
# if the job requested, regardless, is still running, skip this run, and log it
test_log2 = open('scheduler.log', 'a')
test_log2.write("test: ")
test_log2.write(str(args))
test_log2.write("\n")
test_log2.close()

# connect to the mysql server
try:
m_server = mdb.connect(server_dict['HEMLOCK_MYSQL_SERVER'],
server_dict['HEMLOCK_MYSQL_USERNAME'],
server_dict['HEMLOCK_MYSQL_PW'],
"hemlock")

self.log.debug(self.debug, "MySQL Handle: "+str(m_server))
except:
self.log.debug(self.debug, sys.exc_info()[0])
print "MySQL server failure"
sys.exit(0)

cur = m_server.cursor()
self.log.debug(self.debug, "MySQL Cursor: "+str(cur))

cur.execute("SELECT * FROM schedules_clients WHERE schedule_id = '"+name+"'")
results = cur.fetchall()
self.log.debug(self.debug, str(results))
m_server.commit()
m_server.close()

try:
for cred in server_dict:
os.environ[cred] = server_dict[cred]
except:
print "Unable to source hemmlock server credentials"

cmd = "hemlock client-run --uuid "+results[0][1]
result = os.system(cmd)

def init_schedule(self):
"""
Expand Down Expand Up @@ -168,12 +199,12 @@ def schedule_job(self, function, periodicity, start_time):
# DEBUG
self.sched.add_interval_job(function, seconds=periodicity, start_date=start_time)

def schedule_job_cron(self, function, args, name, minute, hour, day_of_month, month, day_of_week):
def schedule_job_cron(self, function, server_dict, name, minute, hour, day_of_month, month, day_of_week):
"""
Schedule a new cron job.
:param function: function to be called that does the work
:param args: arguments to pass to the function that does the work
:param server_dict: dictionary of server credentials
:param name: name of the job
:param minute: cron minute to run the job
:param hour: cron hour to run the job
Expand All @@ -182,7 +213,7 @@ def schedule_job_cron(self, function, args, name, minute, hour, day_of_month, mo
:param day_of_week: cron day_of_week to run the job
"""
# DEBUG
self.sched.add_cron_job(function, args=['foo'], name=name, minute=minute, hour=hour, day=day_of_month, month=month, day_of_week=day_of_week)
self.sched.add_cron_job(function, args=[server_dict, name], name=name, minute=minute, hour=hour, day=day_of_month, month=month, day_of_week=day_of_week)

if __name__ == "__main__":
hemlock_scheduler = Hemlock_Scheduler()
Expand Down

0 comments on commit 7729467

Please sign in to comment.