Skip to content

Commit

Permalink
fix merging of adjacent Kinesis shards. Remove duplicates in EMR down…
Browse files Browse the repository at this point in the history
…scale candidates list
  • Loading branch information
whummer committed Nov 16, 2016
1 parent a02afe6 commit abaa766
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 13 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ make server

## Change Log

* v0.2.6: Fix merging of adjacent Kinesis shards. Fix duplicates in EMR downscale candidates
* v0.2.5: Add SQLAlchemy and store config and monitoring data in a proper database
* v0.2.4: Support temporary STS tokens via assumed IAM roles
* v0.2.3: Use boto3 for AWS API calls
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def run(self):

setup(
name='themis-autoscaler',
version='0.2.5',
version='0.2.6',
description='Themis is an autoscaler for Elastic Map Reduce (EMR) clusters on Amazon Web Services.',
author='Atlassian and others',
maintainer='Waldemar Hummer',
Expand Down
2 changes: 1 addition & 1 deletion themis/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def get_kinesis_state(stream_id):
in: path
"""
app_config = config.get_config()
stream = resources.get_resource(SECTION_KINESIS, stream_id)
stream = resources.get_resource(SECTION_KINESIS, stream_id, reload=True)
monitoring_interval_secs = int(app_config.general.monitoring_time_window)
info = kinesis_monitoring.collect_info(stream, monitoring_interval_secs=monitoring_interval_secs)
return jsonify(info)
Expand Down
8 changes: 6 additions & 2 deletions themis/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
SECTION_EMR = 'emr'
SECTION_KINESIS = 'kinesis'

# environment variable names
ENV_THEMIS_DB_URL = 'THEMIS_DB_URL'

# set this to override config for testing
TEST_CONFIG = None

Expand Down Expand Up @@ -107,13 +110,14 @@ class GeneralConfiguration(ConfigObject):
'autoscaling_kinesis_streams': 'Comma-separated list of Kinesis stream names to auto-scale',
'scaling_loop_interval': 'Loop interval seconds',
'db_url': ('Database connection URL. ' +
'Examples: sqlite:///themis.data.db or mysql://user:pass@host:port/dbname'),
'Examples: sqlite:///themis.data.db or mysql://user:pass@host:port/dbname ' +
'This value can be initialized via the $THEMIS_DB_URL environment variable.'),
'monitoring_time_window': 'Time period (seconds) of historical monitoring data to consider for scaling'
}

def __init__(self):
self.ssh_keys = '$SSH_KEY_ETL_PROD'
self.db_url = 'sqlite:///themis.data.db'
self.db_url = os.environ.get(ENV_THEMIS_DB_URL) or 'sqlite:///themis.data.db'
self.roles_to_assume = ''
self.autoscaling_clusters = ''
self.autoscaling_kinesis_streams = ''
Expand Down
10 changes: 9 additions & 1 deletion themis/monitoring/emr_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ def do_add_stats(nodelist, result_map):
if 'presto_state' not in item:
item['presto_state'] = 'N/A'
if item['presto_state'] != aws_common.PRESTO_STATE_ACTIVE:
if result_map['active']:
LOG.info('Status of node %s is %s, setting "<nodes>.active=False"' %
(item.get('host'), item['presto_state']))
result_map['active'] = False
result_map['average']['cpu'] = 'NaN'
result_map['average']['mem'] = 'NaN'
Expand Down Expand Up @@ -354,7 +357,12 @@ def get_time_based_scaling_config(cluster_id, config=None):
return {}


def update_resources(resource_config):
def reload_resource(resource):
    """Placeholder for reloading a single EMR resource.

    Not implemented yet: the argument is ignored and None is returned.
    """
    # TODO
    return None


def update_resources(resource_config, resource=None):
    """Return `resource_config` unchanged.

    The `resource` argument is accepted but not used by this function.
    """
    unchanged = resource_config
    return unchanged


Expand Down
14 changes: 11 additions & 3 deletions themis/monitoring/kinesis_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from themis import config
import themis.model.resources_model
import themis.model.kinesis_model
import themis.monitoring.resources
from themis.model.aws_model import *
from themis.config import *
from themis.util.common import *
Expand Down Expand Up @@ -32,12 +33,19 @@ def update_config(old_config, new_config, section, resource=None):
config.CONFIG_LISTENERS.add(update_config)


def reload_resource(resource):
    """Look up a Kinesis stream again, save the result, and return it.

    `resource` may be the stream ID itself (a string) or an object that
    exposes the ID as its `id` attribute. The ID is passed to
    `retrieve_stream_details`, and whatever that returns is stored under
    the Kinesis section via `save_resource` before being returned.
    """
    if isinstance(resource, basestring):
        stream_id = resource
    else:
        stream_id = resource.id
    details = retrieve_stream_details(stream_id)
    themis.monitoring.resources.save_resource(SECTION_KINESIS, details)
    return details


def update_resources(resource_config):
for resource in resource_config:
id = resource.id
for res in resource_config:
id = res.id
enabled = config.get_value('enable_enhanced_monitoring', section=SECTION_KINESIS, resource=id)
if enabled == 'true':
resource.enhanced_monitoring = [MONITORING_METRICS_ALL]
res.enhanced_monitoring = [MONITORING_METRICS_ALL]
return resource_config


Expand Down
16 changes: 12 additions & 4 deletions themis/monitoring/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ def update_config(old_config, new_config, section, resource=None):

def update_resources(section=None):
cfg = load_resources_config()
cfg.kinesis = themis.monitoring.kinesis_monitoring.update_resources(cfg.kinesis)
cfg.emr = themis.monitoring.emr_monitoring.update_resources(cfg.emr)
if not section or section == SECTION_KINESIS:
cfg.kinesis = themis.monitoring.kinesis_monitoring.update_resources(cfg.kinesis)
if not section or section == SECTION_EMR:
cfg.emr = themis.monitoring.emr_monitoring.update_resources(cfg.emr)
save_resources_file(cfg)
return cfg


def get_resources(section=None, reload=False):
Expand All @@ -39,13 +42,18 @@ def get_resources(section=None, reload=False):
reloaded = True
config = load_resources_config()
if reloaded:
update_resources(section)
config = update_resources(section)
if not section:
return config.get_all()
return config.get(section)


def get_resource(section, resource_id):
def get_resource(section, resource_id, reload=False):
if reload:
if section == SECTION_KINESIS:
themis.monitoring.kinesis_monitoring.reload_resource(resource_id)
if section == SECTION_EMR:
themis.monitoring.emr_monitoring.reload_resource(resource_id)
section_resources = get_resources(section)
for resource in section_resources:
if resource.id == resource_id:
Expand Down
19 changes: 18 additions & 1 deletion themis/scaling/emr_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ def get_node_groups_or_preferred_markets(cluster_id, info=None, config=None):
return result


def remove_duplicates(nodes):
    """Return `nodes` with repeated entries dropped, keeping first occurrences.

    Two nodes count as duplicates when both their 'iid' and 'cid' values
    are equal; a missing key is treated as None. The input list is not
    modified and the relative order of the retained nodes is preserved.

    NOTE(review): assumes the 'iid'/'cid' values are hashable (presumably
    ID strings) -- confirm against callers.
    """
    # Track the keys already emitted so each node costs one set lookup
    # instead of a rescan of everything collected so far.
    seen = set()
    result = []
    for node in nodes:
        key = (node.get('iid'), node.get('cid'))
        if key in seen:
            continue
        seen.add(key)
        result.append(node)
    return result


def get_termination_candidates(info, config=None):
result = []
cluster_id = info['cluster_id']
Expand All @@ -70,6 +82,7 @@ def get_termination_candidates_for_market_or_group(info, preferred):
return candidates


# TODO: merge with execute_dsl_string in util/expr.py !
def execute_dsl_string(dsl_str, context, config=None):
expr_context = expr.ExprContext(context)
allnodes = expr_context.allnodes
Expand Down Expand Up @@ -123,12 +136,16 @@ def get_nodes_to_terminate(info, config=None):
if not isinstance(num_downsize, int) or num_downsize <= 0:
return []

candidates = get_termination_candidates(info, config=config)
candidates_orig = get_termination_candidates(info, config=config)
candidates = remove_duplicates(candidates_orig)
candidates = sort_nodes_by_load(candidates, desc=False)

if len(candidates) < num_downsize:
LOG.warning('Not enough candidate nodes to perform downsize operation: %s < %s' %
(len(candidates), num_downsize))
cluster_id = info['cluster_id']
preferred_list = get_node_groups_or_preferred_markets(cluster_id, info=info, config=config)
LOG.warning('Initial candidates, preferred inst. groups: %s - %s' % (candidates_orig, preferred_list))

result = []
if candidates:
Expand Down
4 changes: 4 additions & 0 deletions themis/scaling/kinesis_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ def __init__(self, shard1, shard2):
self.shard1 = shard1
self.shard2 = shard2
# TODO check if adjacent
if long(shard2.start_key) < long(shard1.start_key):
# swap shards
self.shard1 = shard2
self.shard2 = shard1

def length(self):
    """Return the sum of the `length()` results of `shard1` and `shard2`."""
    first_length = self.shard1.length()
    second_length = self.shard2.length()
    return first_length + second_length
Expand Down

0 comments on commit abaa766

Please sign in to comment.