From 887fd1307f3a7ee27387edfdd051df458271169d Mon Sep 17 00:00:00 2001
From: Sean Myers
Date: Thu, 5 Jul 2012 18:07:35 -0700
Subject: [PATCH] Fixed a graph bug that would classify slaves and masters
 together

---
 Makefile               |  4 ++--
 emrio/EMRio.py         | 12 ++++++++----
 emrio/ec2_cost.py      |  2 +-
 emrio/job_handler.py   |  5 +++--
 emrio/optimizer.py     | 28 ++++++++++++++++++++++++----
 emrio/simulate_jobs.py | 43 +++++++++++++-----------------------------
 tests/test_optimize.py | 39 +++++++++++++++++++++++++++++---------
 7 files changed, 82 insertions(+), 51 deletions(-)

diff --git a/Makefile b/Makefile
index 8526f3f..1d24dc0 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-PYTHON=PYTHONPATH="$(shell pwd)" unit2 discover
+UNIT2=PYTHONPATH="$(shell pwd)" unit2
 
 test:
-	$(PYTHON) -v -s tests -t .
+	$(UNIT2) discover -v -s tests -t .
diff --git a/emrio/EMRio.py b/emrio/EMRio.py
index ee6d423..7549d38 100644
--- a/emrio/EMRio.py
+++ b/emrio/EMRio.py
@@ -30,14 +30,15 @@ def main(args):
     option_parser = make_option_parser()
     options, args = option_parser.parse_args(args)
 
-    logging.basicConfig(level=logging.INFO)
     if options.verbose:
         logging.basicConfig(level=logging.DEBUG)
-    logging.info('Getting job flows...')
+    else:
+        logging.basicConfig(level=logging.INFO)
+    logging.info('Getting job flows from Amazon...')
     if options.dump:
+        logging.info("Dumping job flow history into %s", options.dump)
         write_job_flow_history(options.dump)
         return
-    logging.disable(True)
 
     job_flows = get_job_flows(options)
     logging.info('Finding optimal instance pool (this may take a minute or two)')
@@ -46,7 +47,7 @@ def main(args):
         pool)
     output_statistics(optimal_logged_hours, pool, demand_logged_hours)
 
-    logging.debug('Making graphs...')
+    logging.info('Making graphs...')
     if options.graph == 'instance_usage':
         instance_usage_graph(job_flows, pool)
     elif options.graph == 'total_usage':
@@ -247,6 +248,8 @@ def get_owned_reserved_instances():
             }
         }
     """
+    # Silence boto's INFO and DEBUG output while talking to EC2.
+    logging.disable(logging.INFO)
     ec2_conn = boto.connect_ec2()
     boto_reserved_instances = ec2_conn.get_all_reserved_instances()
     ec2_conn.close()
@@ -256,6 +259,7 @@ def get_owned_reserved_instances():
         instance_type = reserved_instance.instance_type
         purchased_reserved_instances[utilization_class][instance_type] += (
             reserved_instance.instance_count)
 
+    logging.disable(logging.NOTSET)
     return purchased_reserved_instances
 
diff --git a/emrio/ec2_cost.py b/emrio/ec2_cost.py
index 52fbd35..5c8182b 100644
--- a/emrio/ec2_cost.py
+++ b/emrio/ec2_cost.py
@@ -105,7 +105,7 @@ def init_empty_all_instance_types(self):
         """
         empty_logged_hours = {}
         for utilization_class in self.ALL_UTILIZATION_PRIORITIES:
-            empty_logged_hours[utilization_class] = {}
+            empty_logged_hours[utilization_class] = defaultdict(int)
         return empty_logged_hours
 
     def init_reserve_counts(self, pool, instance_name):
diff --git a/emrio/job_handler.py b/emrio/job_handler.py
index 4b55d10..4455f30 100644
--- a/emrio/job_handler.py
+++ b/emrio/job_handler.py
@@ -213,14 +213,15 @@ def describe_all_job_flows(emr_conn, states=None, jobflow_ids=None,
     while True:
         if created_before and created_after and created_before < created_after:
             break
-        logging.disabled = True
+        # Silence boto's log output (ERROR and below) while polling the EMR API.
+        logging.disable(logging.ERROR)
         try:
             results = emr_conn.describe_jobflows(
                 states=states, jobflow_ids=jobflow_ids,
                 created_after=created_after, created_before=created_before)
         except boto.exception.BotoServerError, ex:
             if 'ValidationError' in ex.body:
-                logging.disabled = False
+                logging.disable(logging.NOTSET)
                 break
             else:
                 raise
diff --git a/emrio/optimizer.py b/emrio/optimizer.py
index 5526b6d..1b0fac4 100644
--- a/emrio/optimizer.py
+++ b/emrio/optimizer.py
@@ -1,11 +1,13 @@
 """The optimizer module holds all the functions relating to creating the best
 instance pool that yields the least cost over an interval of time.
 """
+import copy
 import logging
 from math import ceil
 
 from simulate_jobs import Simulator
 
+
 class Optimizer(object):
     def __init__(self, job_flows, EC2, job_flows_interval=None):
         self.EC2 = EC2
@@ -20,7 +22,7 @@ def run(self, pre_existing_pool=None):
         """Take all the max_instance counts, then use that to hill climb to find
         the most cost efficient instance cost
 
-        Returns: 
+        Returns:
             optimal_pool: dict of the best pool of instances to be used.
         """
         if pre_existing_pool is None:
@@ -53,23 +55,26 @@ def optimize_reserve_pool(self, instance_type, pool):
         convert_to_yearly_estimated_hours(logged_hours, self.job_flows_interval)
         current_min_cost, _ = self.EC2.calculate_cost(logged_hours, pool)
         current_cost = current_min_cost
+        delta_reserved_hours = (
+            self.delta_reserved_instance_hours_generator(instance_type, pool))
 
         while previous_cost >= current_cost:
             current_simulation_costs = self.EC2.init_reserve_costs(float('inf'))
-
             # Add a single instance to each utilization type, and record the costs.
             # Choose the minimum cost utilization type.
+            logging.debug("Simulation hours added %d", delta_reserved_hours.next())
             for utilization_class in pool:
                 # Reset the min instances to the best values.
                 for current_util in pool:
                     pool[current_util][instance_type] = current_min_instances[current_util]
+
                 pool[utilization_class][instance_type] = (
                     current_min_instances[utilization_class] + 1)
                 logged_hours = simulator.run()
+
                 convert_to_yearly_estimated_hours(logged_hours, self.job_flows_interval)
                 cost, _ = self.EC2.calculate_cost(logged_hours, pool)
                 current_simulation_costs[utilization_class] = cost
-
             previous_cost = current_cost
             current_cost = min(current_simulation_costs.values())
             min_util_level = None
@@ -94,6 +99,22 @@ def optimize_reserve_pool(self, instance_type, pool):
             pool[utilization_class][instance_type] = (
                 current_min_instances[utilization_class])
 
+    def delta_reserved_instance_hours_generator(self, instance_type, pool):
+        """Yields the change in logged reserved hours as instances are added."""
+        starter_pool = copy.deepcopy(pool)
+        assert(len(self.EC2.RESERVE_PRIORITIES) > 0)
+        highest_util = self.EC2.RESERVE_PRIORITIES[0]
+        iterative_simulator = Simulator(self.job_flows, starter_pool)
+        previous_logged_hours = iterative_simulator.run()
+        previous_hours = previous_logged_hours[highest_util][instance_type]
+
+        while True:
+            starter_pool[highest_util][instance_type] += 1
+            current_logged_hours = iterative_simulator.run()
+            current_hours = current_logged_hours[highest_util][instance_type]
+            yield (current_hours - previous_hours)
+            previous_hours = current_hours
+
 
 def convert_to_yearly_estimated_hours(logged_hours, interval):
     """Takes a min and max time and will convert to the amount of hours estimated
@@ -123,5 +144,4 @@ def convert_to_yearly_estimated_hours(logged_hours, interval):
     for utilization_class in logged_hours:
         for machine in logged_hours[utilization_class]:
             logged_hours[utilization_class][machine] = (
                 ceil(logged_hours[utilization_class][machine] * conversion_rate))
-
diff --git a/emrio/simulate_jobs.py b/emrio/simulate_jobs.py
index c4730a9..99993c3 100644
--- a/emrio/simulate_jobs.py
+++ b/emrio/simulate_jobs.py
@@ -27,8 +27,8 @@
 JOB_ID is the id for the job running. The rest is setup like logs and pool.
 It is used to keep track of what the job is currently using in instances.
 """
-
 import datetime
+from collections import defaultdict
 
 from config import EC2
 from heapq import heapify, heappop
@@ -77,7 +77,6 @@ def run(self):
 
         pool_used = EC2.init_empty_all_instance_types()
         jobs_running = {}
 
-        # Start simulating events.
         for time, event_type, job in [heappop(job_event_timeline)
                 for i in range(len(job_event_timeline))]:
@@ -100,7 +99,6 @@ def run(self):
             elif event_type is END:
                 self.log_hours(logged_hours, jobs_running, job_id)
                 self.remove_job(jobs_running, pool_used, job)
-
             self.notify_observers(time, event_type, job, logged_hours, pool_used)
 
         return logged_hours
@@ -145,7 +143,7 @@ def setup_job_event_timeline(self):
 
     def attach_log_hours_observer(self, observer):
         self.log_observers.append(observer)
-    
+
    def attach_pool_use_observer(self, observer):
         self.use_pool_observers.append(observer)
 
@@ -176,8 +174,8 @@ def log_hours(self, logged_hours, jobs, job_id):
         job_id -- type: String
             use: To index into jobs to get the amount of instances it is using.
         """
-        for utilization_class in jobs.get(job_id, None):
-            for instance_type in jobs[job_id].get(utilization_class, None):
+        for utilization_class in jobs.get(job_id, {}):
+            for instance_type in jobs[job_id].get(utilization_class, {}):
                 logged_hours[utilization_class][instance_type] = (
                     logged_hours[utilization_class].get(instance_type, 0) +
                     jobs[job_id][utilization_class][instance_type])
@@ -220,8 +218,8 @@ def allocate_job(self, jobs, pool_used, job):
 
             # Record job data for use in logging later.
             jobs[job_id][utilization_class] = jobs[job_id].get(
-                utilization_class, {})
-            jobs[job_id][utilization_class][instance_type] = instances_utilized
+                utilization_class, defaultdict(int))
+            jobs[job_id][utilization_class][instance_type] += instances_utilized
 
             if instances_needed == 0:
                 break
@@ -244,13 +242,13 @@ def remove_job(self, jobs, pool_used, job):
         job_id = job.get('jobflowid')
 
         # Remove all the pool used by the instance then delete the job.
-        for utilization_class in jobs.get(job_id, None).keys():
+        for utilization_class in jobs.get(job_id, {}).keys():
 
-            for instance_type in jobs[job_id].get(utilization_class, None).keys():
+            for instance_type in jobs[job_id].get(utilization_class, {}).keys():
                 pool_used[utilization_class][instance_type] = (
                     pool_used[utilization_class].get(instance_type, 0) -
                     jobs[job_id][utilization_class][instance_type])
-            if pool_used[utilization_class][instance_type] is 0:
+            if pool_used[utilization_class][instance_type] == 0:
                 del pool_used[utilization_class][instance_type]
 
         del jobs[job_id]
@@ -272,19 +270,7 @@ def rearrange_instances(self, jobs, pool_used, job):
             pool_used: May rearrange the instance usage if there is a better
                 combination
             jobs: rearranges job instances.
         """
-        job_id = job.get('jobflowid')
-
-        # Remove the jobs currently used instances then allocate the job over again.
-        for utilization_class in jobs.get(job_id, None).keys():
-
-            for instance_type in jobs[job_id].get(utilization_class, None).keys():
-                pool_used[utilization_class][instance_type] = (
-                    pool_used[utilization_class].get(instance_type, 0) -
-                    jobs[job_id][utilization_class][instance_type]
-                )
-                if pool_used[utilization_class][instance_type] is 0:
-                    del pool_used[utilization_class][instance_type]
-
+        self.remove_job(jobs, pool_used, job)
         self.allocate_job(jobs, pool_used, job)
 
@@ -298,9 +284,10 @@ def _calculate_space_left(self, amt_used, utilization_class, instance_type):
         else:
             return float('inf')
 
+
 class SimulationObserver(object):
     """Used to record information during each step of the simulation.
-    
+
     You can attach a SimulationObserver to a Simulator if you want information
     about each event. For example, the graph module uses the SimulationObserver
     to record the instances used during each event of the simulator so that it can
@@ -309,10 +296,10 @@ class SimulationObserver(object):
 
     def __init__(self, hour_graph, recorder):
         self.hour_graph = hour_graph
         self.recorder = recorder
-    
+
     def update(self, time, node_type, job, data):
         """Records data usage for each time node in the priority queue. The logger
-        is called twice in a single event. So this records the state of the 
+        is called twice in a single event. So this records the state of the
         simulator before and after some event
 
@@ -343,5 +330,3 @@ def update(self, time, node_type, job, data):
                     self.recorder[utilization_class][instance_type] = []
                 total += data[utilization_class].get(instance_type, 0)
                 self.recorder[utilization_class][instance_type].append(total)
-
-
diff --git a/tests/test_optimize.py b/tests/test_optimize.py
index bd9895d..1830c9c 100644
--- a/tests/test_optimize.py
+++ b/tests/test_optimize.py
@@ -29,8 +29,21 @@
 DEFAULT_LOG[MEDIUM_UTIL][INSTANCE_NAME] = 100
 
-def create_parallel_jobs(amount, start_time=BASETIME,
-        end_time=(BASETIME + HEAVY_INTERVAL), start_count=0):
+def create_parallel_jobs(amount,
+                         start_time=BASETIME,
+                         end_time=(BASETIME + HEAVY_INTERVAL),
+                         start_count=0):
+    """Creates pseudo-jobs that will run in parallel with each other.
+
+    Args:
+        amount: Number of jobs to run in parallel.
+
+        start_time: a datetime all the jobs start at.
+
+        end_time: a datetime all the jobs end at.
+
+        start_count: the number to start assigning job ids from.
+    """
     jobs = []
     for i in range(amount):
         test_job = create_test_job(INSTANCE_NAME, BASE_INSTANCES,
@@ -39,10 +52,20 @@ def create_parallel_jobs(amount,
     return jobs
 
 
-def create_test_job(instance_name, count, j_id, start_time=BASETIME,
-        end_time=(BASETIME + HEAVY_INTERVAL)):
-    job1 = {'instancegroups': create_test_instancegroup(instance_name, count),
-        'jobflowid': j_id, 'startdatetime': start_time, 'enddatetime': end_time}
+def create_test_job(instance_name,
+                    instance_count,
+                    j_id,
+                    start_time=BASETIME,
+                    end_time=(BASETIME + HEAVY_INTERVAL)):
+    """Creates a simple job dict object with the given instance
+    group, id, and start and end times.
+    """
+    job1 = {
+        'instancegroups': create_test_instancegroup(instance_name, instance_count),
+        'jobflowid': j_id,
+        'startdatetime': start_time,
+        'enddatetime': end_time}
+
     return job1
 
 
@@ -96,8 +119,6 @@ def test_downgraded_pool(self):
         self.assertEquals(optimized[HEAVY_UTIL], heavy_util)
         self.assertEquals(optimized[MEDIUM_UTIL], medium_util)
 
-
-
     def test_heavy_util(self):
         """This should just create a set of JOB_AMOUNT and then
         all will be assigned to heavy_util since default interval for jobs
@@ -211,7 +232,7 @@ def test_interval_converter_two_months(self):
 
     def test_interval_converter_two_years(self):
         """Since we only want logs for one year, this should
-        conver the hours to half the original hours."""
+        convert the hours to half the original hours."""
         logs = copy.deepcopy(DEFAULT_LOG)
         logs_after = copy.deepcopy(DEFAULT_LOG)
         logs_after[MEDIUM_UTIL][INSTANCE_NAME] = ceil(
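
A note on the logging.disable() / logging.disable(logging.NOTSET) pairing used in
EMRio.py and job_handler.py above: logging.disable() keeps only its most recent
level, so stacking several calls narrows rather than widens what is silenced, and
if an exception skips the NOTSET call, logging stays muted for the rest of the
process. A minimal sketch of the same pattern as a reusable context manager;
silence_logging and the call under it are illustrative names, not part of this
patch:

import logging
from contextlib import contextmanager

@contextmanager
def silence_logging(level=logging.ERROR):
    """Suppress log records at `level` and below for the duration of the block."""
    logging.disable(level)
    try:
        yield
    finally:
        # Restore logging even if the block raises.
        logging.disable(logging.NOTSET)

# Usage sketch: mute boto chatter only while calling AWS.
with silence_logging(logging.ERROR):
    results = emr_conn.describe_jobflows()  # hypothetical call site

Wrapped this way, get_owned_reserved_instances() and describe_all_job_flows()
could share one silencing code path instead of each repeating the
disable/NOTSET pair by hand.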