
Fixed little graph bug that would classify slaves and masters together
Sean Myers committed Jul 6, 2012
1 parent f34c2d3 commit 887fd13
Showing 7 changed files with 83 additions and 51 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -1,4 +1,4 @@
-PYTHON=PYTHONPATH="$(shell pwd)" unit2 discover
+UNIT2=PYTHONPATH="$(shell pwd)" unit2

 test:
-	$(PYTHON) -v -s tests -t .
+	$(UNIT2) discover -v -s tests -t .
12 changes: 8 additions & 4 deletions emrio/EMRio.py
@@ -30,14 +30,15 @@
 def main(args):
     option_parser = make_option_parser()
     options, args = option_parser.parse_args(args)
-    logging.basicConfig(level=logging.INFO)
     if options.verbose:
         logging.basicConfig(level=logging.DEBUG)
-    logging.info('Getting job flows...')
+    else:
+        logging.basicConfig(level=logging.INFO)
+    logging.info('Getting job flows from Amazon...')
     if options.dump:
         logging.info("Dumping job flow history into %s", options.dump)
         write_job_flow_history(options.dump)
         return
-    logging.disable(True)
     job_flows = get_job_flows(options)

     logging.info('Finding optimal instance pool (this may take a minute or two)')
@@ -46,7 +47,7 @@ def main(args):
         pool)
     output_statistics(optimal_logged_hours, pool, demand_logged_hours)

-    logging.debug('Making graphs...')
+    logging.info('Making graphs...')
     if options.graph == 'instance_usage':
         instance_usage_graph(job_flows, pool)
     elif options.graph == 'total_usage':
@@ -247,6 +248,8 @@ def get_owned_reserved_instances():
             }
         }
     """
+    logging.disable(logging.INFO)
+    logging.disable(logging.DEBUG)
     ec2_conn = boto.connect_ec2()
     boto_reserved_instances = ec2_conn.get_all_reserved_instances()
     ec2_conn.close()
@@ -256,6 +259,7 @@ def get_owned_reserved_instances():
         instance_type = reserved_instance.instance_type
         purchased_reserved_instances[utilization_class][instance_type] += (
             reserved_instance.instance_count)
+    logging.disable(logging.NOTSET)
     return purchased_reserved_instances
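The paired `logging.disable` calls above temporarily silence boto's log output while the EC2 API request runs, and `logging.disable(logging.NOTSET)` restores it afterwards. Since `logging.disable(level)` suppresses every message at or below `level`, the extra `logging.disable(logging.DEBUG)` call is already covered by the INFO one. A minimal sketch of the pattern (the `fetch` callable is hypothetical):

    import logging

    def fetch_quietly(fetch):
        """Call fetch() with log noise muted, then restore logging."""
        logging.disable(logging.INFO)  # suppresses INFO, DEBUG, and below
        try:
            return fetch()
        finally:
            logging.disable(logging.NOTSET)  # lifts the suppression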
2 changes: 1 addition & 1 deletion emrio/ec2_cost.py
@@ -105,7 +105,7 @@ def init_empty_all_instance_types(self):
         """
         empty_logged_hours = {}
         for utilization_class in self.ALL_UTILIZATION_PRIORITIES:
-            empty_logged_hours[utilization_class] = {}
+            empty_logged_hours[utilization_class] = defaultdict(int)
         return empty_logged_hours

     def init_reserve_counts(self, pool, instance_name):
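Swapping the inner dict for `defaultdict(int)` lets callers accumulate hours with `+=` without first checking whether the instance type key exists; missing keys start at 0. For example:

    from collections import defaultdict

    logged_hours = defaultdict(int)
    logged_hours['m1.small'] += 3  # no KeyError: a missing key starts at 0
    logged_hours['m1.small'] += 2
    print(logged_hours['m1.small'])  # -> 5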
6 changes: 4 additions & 2 deletions emrio/job_handler.py
@@ -213,14 +213,16 @@ def describe_all_job_flows(emr_conn, states=None, jobflow_ids=None,
     while True:
         if created_before and created_after and created_before < created_after:
             break
-        logging.disabled = True
+        logging.disable(logging.DEBUG)
+        logging.disable(logging.ERROR)
+        logging.disable(logging.INFO)
         try:
             results = emr_conn.describe_jobflows(
                 states=states, jobflow_ids=jobflow_ids,
                 created_after=created_after, created_before=created_before)
         except boto.exception.BotoServerError, ex:
             if 'ValidationError' in ex.body:
-                logging.disabled = False
+                logging.disable(logging.NOTSET)
                 break
             else:
                 raise
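This hunk is a genuine bug fix, not a style change: assigning `logging.disabled = True` merely creates a stray attribute on the `logging` module and silences nothing, whereas `logging.disable(level)` is the real API. (The single `logging.disable(logging.ERROR)` call would in fact cover the DEBUG and INFO calls too, since it mutes everything at or below ERROR.) A quick demonstration:

    import logging
    logging.basicConfig(level=logging.DEBUG)

    logging.disabled = True          # no effect: just a new module attribute
    logging.debug('still printed')

    logging.disable(logging.ERROR)   # the supported API: mutes ERROR and below
    logging.debug('suppressed')
    logging.error('also suppressed')

    logging.disable(logging.NOTSET)  # restore normal logging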
28 changes: 24 additions & 4 deletions emrio/optimizer.py
@@ -1,11 +1,13 @@
 """The optimizer module holds all the functions relating to creating the
 best instance pool that yields the least cost over an interval of time.
 """
+import copy
+import logging
 from math import ceil

 from simulate_jobs import Simulator


 class Optimizer(object):
     def __init__(self, job_flows, EC2, job_flows_interval=None):
         self.EC2 = EC2
@@ -20,7 +22,7 @@ def run(self, pre_existing_pool=None):
         """Take all the max_instance counts, then use that to hill climb to
         find the most cost efficient instance cost
         Returns:
             optimal_pool: dict of the best pool of instances to be used.
         """
         if pre_existing_pool is None:
@@ -53,23 +55,26 @@ def optimize_reserve_pool(self, instance_type, pool):
         convert_to_yearly_estimated_hours(logged_hours, self.job_flows_interval)
         current_min_cost, _ = self.EC2.calculate_cost(logged_hours, pool)
         current_cost = current_min_cost
+        delta_reserved_hours = (
+            self.delta_reserved_instance_hours_generator(instance_type, pool))

         while previous_cost >= current_cost:
             current_simulation_costs = self.EC2.init_reserve_costs(float('inf'))

             # Add a single instance to each utilization type, and record the costs.
             # Choose the minimum cost utilization type.
+            logging.debug("Simulation hours added %d", delta_reserved_hours.next())
             for utilization_class in pool:
                 # Reset the min instances to the best values.
                 for current_util in pool:
                     pool[current_util][instance_type] = current_min_instances[current_util]

                 pool[utilization_class][instance_type] = (
                     current_min_instances[utilization_class] + 1)
                 logged_hours = simulator.run()

                 convert_to_yearly_estimated_hours(logged_hours, self.job_flows_interval)
                 cost, _ = self.EC2.calculate_cost(logged_hours, pool)
                 current_simulation_costs[utilization_class] = cost

             previous_cost = current_cost
             current_cost = min(current_simulation_costs.values())
             min_util_level = None
@@ -94,6 +99,22 @@ def optimize_reserve_pool(self, instance_type, pool):
             pool[utilization_class][instance_type] = (
                 current_min_instances[utilization_class])

+    def delta_reserved_instance_hours_generator(self, instance_type, pool):
+
+        starter_pool = copy.deepcopy(pool)
+        assert(len(self.EC2.RESERVE_PRIORITIES) > 0)
+        highest_util = self.EC2.RESERVE_PRIORITIES[0]
+        iterative_simulator = Simulator(self.job_flows, starter_pool)
+        previous_logged_hours = iterative_simulator.run()
+        previous_hours = previous_logged_hours[highest_util][instance_type]
+
+        while True:
+            starter_pool[highest_util][instance_type] += 1
+            current_logged_hours = iterative_simulator.run()
+            current_hours = current_logged_hours[highest_util][instance_type]
+            yield (current_hours - previous_hours)
+            previous_hours = current_hours

 def convert_to_yearly_estimated_hours(logged_hours, interval):
     """Takes a min and max time and will convert to the amount of hours estimated

@@ -123,4 +144,3 @@ def convert_to_yearly_estimated_hours(logged_hours, interval):
         for machine in logged_hours[utilization_class]:
             logged_hours[utilization_class][machine] = (
                 ceil(logged_hours[utilization_class][machine] * conversion_rate))
-
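The new generator feeds the hill climber a running measure of how many extra reserved hours each additional reserved instance actually soaks up: it deep-copies the pool, bumps the highest-priority utilization class by one instance per step, re-runs the simulation, and yields the difference in logged hours. A stripped-down sketch of the idea, assuming a `simulate(pool)` callable that returns logged hours keyed by utilization class and instance type (these names are illustrative, not the module's API):

    import copy

    def delta_hours(simulate, pool, util, instance_type):
        """Yield the marginal hours gained per added reserved instance."""
        trial_pool = copy.deepcopy(pool)  # never mutate the caller's pool
        previous = simulate(trial_pool)[util][instance_type]
        while True:
            trial_pool[util][instance_type] += 1  # one more reserved instance
            current = simulate(trial_pool)[util][instance_type]
            yield current - previous
            previous = current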
43 changes: 14 additions & 29 deletions emrio/simulate_jobs.py
@@ -27,8 +27,8 @@
 JOB_ID is the id for the job running. The rest is setup like logs and pool.
 It is used to keep track of what the job is currently using in instances.
 """
-
 import datetime
+from collections import defaultdict
 from config import EC2
 from heapq import heapify, heappop
@@ -77,7 +77,6 @@ def run(self):
         pool_used = EC2.init_empty_all_instance_types()

         jobs_running = {}
-
         # Start simulating events.
         for time, event_type, job in [heappop(job_event_timeline)
                 for i in range(len(job_event_timeline))]:
@@ -100,7 +99,6 @@ def run(self):
             elif event_type is END:
                 self.log_hours(logged_hours, jobs_running, job_id)
                 self.remove_job(jobs_running, pool_used, job)
-
             self.notify_observers(time, event_type, job, logged_hours, pool_used)
         return logged_hours

@@ -145,7 +143,7 @@ def setup_job_event_timeline(self):
     def attach_log_hours_observer(self, observer):
         self.log_observers.append(observer)

     def attach_pool_use_observer(self, observer):
         self.use_pool_observers.append(observer)
@@ -176,8 +174,8 @@ def log_hours(self, logged_hours, jobs, job_id):
             job_id -- type: String
                 use: To index into jobs to get the amount of instances it is using.
         """
-        for utilization_class in jobs.get(job_id, None):
-            for instance_type in jobs[job_id].get(utilization_class, None):
+        for utilization_class in jobs.get(job_id, {}):
+            for instance_type in jobs[job_id].get(utilization_class, {}):
                 logged_hours[utilization_class][instance_type] = (
                     logged_hours[utilization_class].get(instance_type, 0)
                     + jobs[job_id][utilization_class][instance_type])
@@ -220,8 +218,8 @@ def allocate_job(self, jobs, pool_used, job):

             # Record job data for use in logging later.
             jobs[job_id][utilization_class] = jobs[job_id].get(
-                utilization_class, {})
-            jobs[job_id][utilization_class][instance_type] = instances_utilized
+                utilization_class, defaultdict(int))
+            jobs[job_id][utilization_class][instance_type] += instances_utilized

             if instances_needed == 0:
                 break
@@ -244,13 +242,13 @@ def remove_job(self, jobs, pool_used, job):
         job_id = job.get('jobflowid')

         # Remove all the pool used by the instance then delete the job.
-        for utilization_class in jobs.get(job_id, None).keys():
+        for utilization_class in jobs.get(job_id, {}).keys():

-            for instance_type in jobs[job_id].get(utilization_class, None).keys():
+            for instance_type in jobs[job_id].get(utilization_class, {}).keys():
                 pool_used[utilization_class][instance_type] = (
                     pool_used[utilization_class].get(instance_type, 0) -
                     jobs[job_id][utilization_class][instance_type])
-                if pool_used[utilization_class][instance_type] is 0:
+                if pool_used[utilization_class][instance_type] == 0:
                     del pool_used[utilization_class][instance_type]
         del jobs[job_id]
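The `None`-to-`{}` default swap in these loops is the crash fix: `dict.get(key, None)` hands `None` to the `for` loop and raises a `TypeError` when a job id is missing, while an empty dict simply iterates zero times. The `is 0` to `== 0` change matters as well: `is` tests object identity, which only happens to work for small integers because of CPython's integer caching. For instance:

    jobs = {}
    for utilization_class in jobs.get('j-MISSING', {}):
        pass  # never entered: an empty dict iterates zero times
    # jobs.get('j-MISSING', None) would raise:
    # TypeError: 'NoneType' object is not iterable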

@@ -272,19 +270,7 @@ def rearrange_instances(self, jobs, pool_used, job):
         pool_used: May rearrange the instance usage if there is a better combination
         jobs: rearranges job instances.
         """
-        job_id = job.get('jobflowid')
-
-        # Remove the jobs currently used instances then allocate the job over again.
-        for utilization_class in jobs.get(job_id, None).keys():
-
-            for instance_type in jobs[job_id].get(utilization_class, None).keys():
-                pool_used[utilization_class][instance_type] = (
-                    pool_used[utilization_class].get(instance_type, 0) -
-                    jobs[job_id][utilization_class][instance_type]
-                )
-                if pool_used[utilization_class][instance_type] is 0:
-                    del pool_used[utilization_class][instance_type]
-
+        self.remove_job(jobs, pool_used, job)
         self.allocate_job(jobs, pool_used, job)

     def _calculate_space_left(self, amt_used, utilization_class, instance_type):
@@ -298,9 +284,10 @@ def _calculate_space_left(self, amt_used, utilization_class, instance_type):
         else:
             return float('inf')

+
 class SimulationObserver(object):
     """Used to record information during each step of the simulation.
     You can attach a SimulationObserver to a Simulator if you want information
     about each event. For example, the graph module uses the SimulationObserver
     to record the instances used during each event of the simulator so that it can
@@ -309,10 +296,10 @@ class SimulationObserver(object):
     def __init__(self, hour_graph, recorder):
         self.hour_graph = hour_graph
         self.recorder = recorder

     def update(self, time, node_type, job, data):
         """Records data usage for each time node in the priority queue. The logger
         is called twice in a single event. So this records the state of the
         simulator before and after some event
         Args:
@@ -343,5 +330,3 @@ def update(self, time, node_type, job, data):
                     self.recorder[utilization_class][instance_type] = []
                 total += data[utilization_class].get(instance_type, 0)
                 self.recorder[utilization_class][instance_type].append(total)
-
-
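The attach/notify methods above implement a plain observer pattern: the simulator calls `update` on every attached observer at each event, which is how the graphing code samples instance usage over time without the simulator knowing anything about graphs. The shape, reduced to a sketch (names simplified from the real classes):

    class Simulator(object):
        def __init__(self):
            self.observers = []

        def attach(self, observer):
            self.observers.append(observer)

        def notify(self, time, event_type, job, data):
            for observer in self.observers:  # e.g. a SimulationObserver
                observer.update(time, event_type, job, data)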
39 changes: 30 additions & 9 deletions tests/test_optimize.py
@@ -29,8 +29,21 @@
 DEFAULT_LOG[MEDIUM_UTIL][INSTANCE_NAME] = 100


-def create_parallel_jobs(amount, start_time=BASETIME,
-        end_time=(BASETIME + HEAVY_INTERVAL), start_count=0):
+def create_parallel_jobs(amount,
+        start_time=BASETIME,
+        end_time=(BASETIME + HEAVY_INTERVAL),
+        start_count=0):
+    """Creates pseudo-jobs that will run in parallel with each other.
+    Args:
+        amount: Amount of jobs to run in parallel.
+        start_time: a datetime all the jobs start at.
+        end_time: a datetime all the jobs end at.
+        start_count: job id's
+    """
     jobs = []
     for i in range(amount):
         test_job = create_test_job(INSTANCE_NAME, BASE_INSTANCES,
@@ -39,10 +52,20 @@ def create_parallel_jobs(amount, start_time=BASETIME,
     return jobs


-def create_test_job(instance_name, count, j_id, start_time=BASETIME,
-        end_time=(BASETIME + HEAVY_INTERVAL)):
-    job1 = {'instancegroups': create_test_instancegroup(instance_name, count),
-        'jobflowid': j_id, 'startdatetime': start_time, 'enddatetime': end_time}
+def create_test_job(instance_name,
+        instance_count,
+        j_id,
+        start_time=BASETIME,
+        end_time=(BASETIME + HEAVY_INTERVAL)):
+    """Creates a simple job dict object
+    """
+    job1 = {
+        'instancegroups': create_test_instancegroup(instance_name, instance_count),
+        'jobflowid': j_id,
+        'startdatetime': start_time,
+        'enddatetime': end_time}

     return job1

@@ -96,8 +119,6 @@ def test_downgraded_pool(self):
         self.assertEquals(optimized[HEAVY_UTIL], heavy_util)
         self.assertEquals(optimized[MEDIUM_UTIL], medium_util)

-
-
     def test_heavy_util(self):
         """This should just create a set of JOB_AMOUNT and then all
         will be assigned to heavy_util since default interval for jobs
@@ -211,7 +232,7 @@ def test_interval_converter_two_months(self):

     def test_interval_converter_two_years(self):
         """Since we only want logs for one year, this should
-        conver the hours to half the original hours."""
+        convert the hours to half the original hours."""
         logs = copy.deepcopy(DEFAULT_LOG)
         logs_after = copy.deepcopy(DEFAULT_LOG)
         logs_after[MEDIUM_UTIL][INSTANCE_NAME] = ceil(
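A hypothetical use of the refactored test helpers above, inside this test module (counts and ids are illustrative):

    # Ten pseudo-jobs sharing one time window; ids continue from start_count.
    jobs = create_parallel_jobs(10, start_count=0)
    assert len(jobs) == 10

    # A single job dict built directly.
    job = create_test_job(INSTANCE_NAME, BASE_INSTANCES, 'j-test')
    assert job['jobflowid'] == 'j-test'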
