Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardparis committed Jul 4, 2016
2 parents d437475 + a6c15ca commit 3f0dcec
Show file tree
Hide file tree
Showing 28 changed files with 536 additions and 396 deletions.
12 changes: 7 additions & 5 deletions dynamic_policies/consumer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pika
import logging
logging.basicConfig()
from threading import Thread

logging.basicConfig()


class Consumer(object):
_sync = {}
Expand All @@ -26,14 +27,15 @@ def __init__(self, host, port, username, password, exchange, queue, routing_key,
print 'routing_key', routing_key
if routing_key:
self._channel.queue_bind(exchange=exchange,
queue=queue,
routing_key=routing_key)
queue=queue,
routing_key=routing_key)

self.consumer = self._channel.basic_consume(self.callback,
queue=queue,
no_ack=True)
queue=queue,
no_ack=True)
else:
print "You must entry a routing key"

def callback(self, ch, method, properties, body):
self.obj.notify(body)

Expand Down
84 changes: 42 additions & 42 deletions dynamic_policies/dsl_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,60 @@
# By default, PyParsing treats \n as whitespace and ignores it
# In our grammer, \n is significant, so tell PyParsing not to ignore it
# ParserElement.setDefaultWhitespaceChars(" \t")
"""
rule ::= "FOR Tenant WHEN"+property +"[< > = <= >=]+X+"DO"+action

condition ::= property +"[< > = <= >=]+X
condition_list ::= condition
| condition_list AND condition
| condition_list OR condition
FOR Tenant WHEN"+ condition_list +"DO"+action
# rule ::= "FOR Tenant WHEN"+property +"[< > = <= >=]+X+"DO"+action
#
# condition ::= property +"[< > = <= >=]+X
# condition_list ::= condition
# | condition_list AND condition
# | condition_list OR condition
# FOR Tenant WHEN"+ condition_list +"DO"+action
#
# FOR Tenant WHEN"+ condition AND condition AND condition OR condition etc.+"DO"+action
#
# TODO: Parse = TRUE or = False or condicion number. Check to convert to float or convert to boolean.

FOR Tenant WHEN"+ condition AND condition AND condition OR condition etc.+"DO"+action
TODO: Parse = TRUE or = False or condicion number. Check to convert to float or convert to boolean.
"""

def get_redis_connection():
return redis.Redis(connection_pool=settings.REDIS_CON_POOL)


def parse_group_tenants(tokens):
r = get_redis_connection()
data = r.lrange(tokens[0], 0, -1)
return data


def parse(input_string):
#TODO Raise an exception if not metrics or not action registred
#TODO Raise an exception if group of tenants don't exists.
# TODO Raise an exception if not metrics or not action registred
# TODO Raise an exception if group of tenants don't exists.

r = get_redis_connection()

#Support words to construct the grammar.
# Support words to construct the grammar.
word = Word(alphas)
when = Suppress(Literal("WHEN"))
literal_for = Suppress(Literal("FOR"))
boolean_condition = oneOf("AND OR")

#Condition part
param = Word(alphanums+"_")+ Suppress(Literal("=")) + Word(alphanums+"_")
# Condition part
param = Word(alphanums+"_") + Suppress(Literal("=")) + Word(alphanums+"_")
metrics_workload = r.keys("metric:*")
services = map(lambda x: "".join(x.split(":")[1]), metrics_workload)
services_options = oneOf(services)
operand = oneOf("< > == != <= >=")
operand = oneOf("< > == != <= >=")
number = Regex(r"[+-]?\d+(:?\.\d*)?(:?[eE][+-]?\d+)?")
condition = Group(services_options + operand("operand") + number("limit_value"))
condition_list = operatorPrecedence(condition,[
("AND", 2, opAssoc.LEFT, ),
("OR", 2, opAssoc.LEFT, ),
])


#For tenant or group of tenants
# For tenant or group of tenants
group_id = Word(nums)
container = Group(Literal("CONTAINER")("type") + Suppress(":") + Combine(Word(alphanums) + Literal("/") + Word(alphanums+"_-")))
obj = Group(Literal("OBJECT")("type") + Suppress(":") + Combine(Word(alphanums)+Literal("/")+ Word(alphanums+"_-")+Literal("/")+ Word(alphanums+"_-.")))
tenant = Group(Literal("TENANT")("type")+ Suppress(":") + Combine(Word(alphanums)))
obj = Group(Literal("OBJECT")("type") + Suppress(":") + Combine(Word(alphanums)+Literal("/") + Word(alphanums+"_-")+Literal("/") + Word(alphanums+"_-.")))
tenant = Group(Literal("TENANT")("type") + Suppress(":") + Combine(Word(alphanums)))
tenant_group = Combine(Literal("G:") + group_id)

tenant_group_list = tenant_group + ZeroOrMore(Suppress("AND")+tenant_group)
Expand All @@ -69,7 +69,7 @@ def parse(input_string):
target = Group(delimitedList(tenant) ^ delimitedList(obj) ^ delimitedList(container) ^ delimitedList(tenant_group))
# Group(tenant_list ^ tenant_group_list ^ container_list ^ obj_list)

#Action part
# Action part
action = oneOf("SET DELETE")
sfilters_list = r.keys("filter:*")
sfilter = map(lambda x: "".join(x.split(":")[1]), sfilters_list)
Expand All @@ -78,52 +78,51 @@ def parse(input_string):
do = Suppress(Literal("DO"))
params_list = delimitedList(param)
server_execution = oneOf("PROXY OBJECT")
#TRANSCIENT
# TRANSIENT
transient = Literal("TRANSIENT")
action = Group(action("action") + oneOf(sfilter)("filter") + Optional(with_params + params_list("params") + \
Optional(Suppress("ON")+server_execution("server_execution"))) + Optional(transient("transient")))
action = Group(action("action") + oneOf(sfilter)("filter") +
Optional(with_params + params_list("params") + Optional(Suppress("ON")+server_execution("server_execution"))) +
Optional(transient("transient")))

action_list = Group(delimitedList(action))

#Object types
operand_object = oneOf("< > == != <= >=")
# Object types
operand_object = oneOf("< > == != <= >=")
object_parameter = oneOf("OBJECT_TYPE OBJECT_SIZE")
object_type = Group(Literal("OBJECT_TYPE")("type") + Literal("=") + word(alphanums)("object_value"))("object_type")
object_size = Group(Literal("OBJECT_SIZE")("type") + operand_object("operand") + number("object_value"))("object_size")
object_list = Group(object_type ^ object_size ^ object_type +","+ object_size ^ object_size +","+ object_type)
object_list = Group(object_type ^ object_size ^ object_type + "," + object_size ^ object_size + "," + object_type)
to = Suppress("TO")

#Functions post-parsed
convertToDict = lambda tokens : dict(zip(*[iter(tokens)]*2))
remove_repeted_elements = lambda tokens : [list(set(tokens[0]))]
# Functions post-parsed
convert_to_dict = lambda tokens: dict(zip(*[iter(tokens)]*2))
remove_repeted_elements = lambda tokens: [list(set(tokens[0]))]

params_list.setParseAction(convertToDict)
params_list.setParseAction(convert_to_dict)
target.setParseAction(remove_repeted_elements)
tenant_group.setParseAction(parse_group_tenants)

# Final rule structure
rule_parse = literal_for + target("target") + \
Optional(when + condition_list("condition_list")) + do + \
action_list("action_list") + Optional(to + object_list("object_list"))

#Final rule structure
rule_parse = literal_for + target("target") + Optional(when +\
condition_list("condition_list")) + do + action_list("action_list") +\
Optional(to + object_list("object_list"))

#Parse the rule
# Parse the rule
parsed_rule = rule_parse.parseString(input_string)

#Pos-parsed validation
# Pos-parsed validation
has_condition_list = True
if not parsed_rule.condition_list:
has_condition_list = False


for action in parsed_rule.action_list:
if action.params:
filter_info = r.hgetall("filter:"+str(action.filter))
if "valid_parameters" in filter_info.keys():
params = eval(filter_info["valid_parameters"])
result = set(action.params.keys()).intersection(params.keys())
if len(result) == len(action.params.keys()):
#TODO Check params types.
# TODO Check params types.
return has_condition_list, parsed_rule
else:
raise Exception
Expand All @@ -133,7 +132,8 @@ def parse(input_string):
return has_condition_list, parsed_rule


# rules ="""FOR OBJECT:4f0279da74ef4584a29dc72c835fe2c9/2/2 AND OBJECT:4f0279da74ef4584a29dc72c835fe2c9/2/2 DO SET compression WITH bw=2 ON OBJECT, SET uonetrace WITH bw=2 ON PROXY """.splitlines()
# rules ="""FOR OBJECT:4f0279da74ef4584a29dc72c835fe2c9/2/2 AND OBJECT:4f0279da74ef4584a29dc72c835fe2c9/2/2 DO SET compression WITH bw=2 ON OBJECT,
# SET uonetrace WITH bw=2 ON PROXY """.splitlines()
# rules ="""FOR TENANT:4f0279da74ef4584a29dc72c835fe2c9, TENANT:2 DO SET compression TRANSIENT, SET compression TRANSIENT TO OBJECT_TYPE=DOCS""".splitlines()

# # rules = """\
Expand Down
64 changes: 41 additions & 23 deletions dynamic_policies/init_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import redis
import dsl_parser


def get_redis_connection():
return redis.StrictRedis(host="localhost", port=6379, db=0)


def start_actors():
r = get_redis_connection()

Expand All @@ -14,29 +16,39 @@ def start_actors():
global host
global metrics
tcpconf = ('tcp', ('127.0.0.1', 6375))
#momconf = ('mom',{'name':'metric_host','ip':'127.0.0.1','port':61613, 'namespace':'/topic/iostack'})
# momconf = ('mom',{'name':'metric_host','ip':'127.0.0.1','port':61613, 'namespace':'/topic/iostack'})
host = init_host(tcpconf)
metrics = {}
metrics["get_ops_tenant"] = host.spawn_id("get_ops_tenant", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "get_ops_tenant", "metrics.get_tenant"])
metrics["put_ops_tenant"] = host.spawn_id("put_ops_tenant", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "put_ops_tenant", "metrics.put_tenant"])

metrics["active_get_requests"] = host.spawn_id("active_get_requests", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "active_get_requests", "metrics.active_get_requests"])
metrics["active_put_requests"] = host.spawn_id("active_put_requests", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "active_put_requests", "metrics.active_put_requests"])
metrics["active_get_requests"] = host.spawn_id("active_get_requests", 'metrics.swift_metric', 'SwiftMetric',
["amq.topic", "active_get_requests", "metrics.active_get_requests"])
metrics["active_put_requests"] = host.spawn_id("active_put_requests", 'metrics.swift_metric', 'SwiftMetric',
["amq.topic", "active_put_requests", "metrics.active_put_requests"])

#metrics["head_ops_tenant"] = host.spawn_id("head_ops_tenant", 'metrics.collectd_metric', 'CollectdMetric', ["amq.topic", "head_ops_tenant", "collectd.*.groupingtail.tm.*.head_ops.#"])
# metrics["head_ops_tenant"] = host.spawn_id("head_ops_tenant", 'metrics.collectd_metric', 'CollectdMetric',
# ["amq.topic", "head_ops_tenant", "collectd.*.groupingtail.tm.*.head_ops.#"])
metrics["get_bw"] = host.spawn_id("get_bw_tenant", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "get_bw", "metrics.get_bw"])
metrics["put_bw"] = host.spawn_id("put_bw_tenant", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "put_bw", "metrics.put_bw"])

metrics["get_ops_container"] = host.spawn_id("get_ops_container", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "get_ops_container", "metrics.get_container"])
metrics["put_ops_container"] = host.spawn_id("put_ops_container", 'metrics.swift_metric', 'SwiftMetric', ["amq.topic", "put_ops_container", "metrics.put_container"])
#metrics["head_ops_container"] = host.spawn_id("head_ops_container", 'metrics.collectd_metric', 'CollectdMetric', ["amq.topic", "head_ops_container", "collectd.*.groupingtail.cm.*.head_ops.#"])
#metrics["get_bw_container"] = host.spawn_id("get_bw_container", 'metrics.collectd_metric', 'CollectdMetric', ["amq.topic", "get_bw_container", "collectd.*.groupingtail.cm.*.get_bw.#"])
#metrics["put_bw_container"] = host.spawn_id("put_bw_container", 'metrics.collectd_metric', 'CollectdMetric', ["amq.topic", "put_bw_container", "collectd.*.groupingtail.cm.*.put_bw.#"])
metrics["get_ops_container"] = host.spawn_id("get_ops_container", 'metrics.swift_metric', 'SwiftMetric',
["amq.topic", "get_ops_container", "metrics.get_container"])
metrics["put_ops_container"] = host.spawn_id("put_ops_container", 'metrics.swift_metric', 'SwiftMetric',
["amq.topic", "put_ops_container", "metrics.put_container"])

# metrics["head_ops_container"] = host.spawn_id("head_ops_container", 'metrics.collectd_metric', 'CollectdMetric',
# ["amq.topic", "head_ops_container", "collectd.*.groupingtail.cm.*.head_ops.#"])
# metrics["get_bw_container"] = host.spawn_id("get_bw_container", 'metrics.collectd_metric', 'CollectdMetric',
# ["amq.topic", "get_bw_container", "collectd.*.groupingtail.cm.*.get_bw.#"])
# metrics["put_bw_container"] = host.spawn_id("put_bw_container", 'metrics.collectd_metric', 'CollectdMetric',
# ["amq.topic", "put_bw_container", "collectd.*.groupingtail.cm.*.put_bw.#"])

# Metrics for Bandwidth differentiation
metrics["get_bw_info"] = host.spawn_id("get_bw_info", 'metrics.bw_info', 'BwInfo', ["amq.topic","get_bw_info", "bwdifferentiation.get_bw_info.#","GET"])
metrics["put_bw_info"] = host.spawn_id("put_bw_info", 'metrics.bw_info', 'BwInfo', ["amq.topic","put_bw_info", "bwdifferentiation.put_bw_info.#","PUT"])
metrics["ssync_bw_info"] = host.spawn_id("ssync_bw_info", 'metrics.bw_info_ssync', 'BwInfoSSYNC', ["amq.topic","ssync_bw_info", "bwdifferentiation.ssync_bw_info.#","SSYNC"])
metrics["get_bw_info"] = host.spawn_id("get_bw_info", 'metrics.bw_info', 'BwInfo', ["amq.topic", "get_bw_info", "bwdifferentiation.get_bw_info.#", "GET"])
metrics["put_bw_info"] = host.spawn_id("put_bw_info", 'metrics.bw_info', 'BwInfo', ["amq.topic", "put_bw_info", "bwdifferentiation.put_bw_info.#", "PUT"])
metrics["ssync_bw_info"] = host.spawn_id("ssync_bw_info", 'metrics.bw_info_ssync', 'BwInfoSSYNC',
["amq.topic", "ssync_bw_info", "bwdifferentiation.ssync_bw_info.#", "SSYNC"])

try:
for metric in metrics.values():
Expand All @@ -46,25 +58,31 @@ def start_actors():
for metric in metrics.values():
print 'metric!', metric
metric.stop_actor()

rules = {}
#rules["get_bw"] = host.spawn_id("abstract_enforcement_algorithm_get", 'rules.min_slo_tenant_global_share_spare_bw', 'MinTenantSLOGlobalSpareBWShare', ["abstract_enforcement_algorithm_get","GET"])
rules["get_bw"] = host.spawn_id("abstract_enforcement_algorithm_get", 'rules.simple_proportional_bandwidth', 'SimpleProportionalBandwidthPerTenant', ["abstract_enforcement_algorithm_get","GET"])
# rules["get_bw"] = host.spawn_id("abstract_enforcement_algorithm_get", 'rules.min_slo_tenant_global_share_spare_bw', 'MinTenantSLOGlobalSpareBWShare',
# ["abstract_enforcement_algorithm_get","GET"])
rules["get_bw"] = host.spawn_id("abstract_enforcement_algorithm_get", 'rules.simple_proportional_bandwidth', 'SimpleProportionalBandwidthPerTenant',
["abstract_enforcement_algorithm_get", "GET"])
rules["get_bw"].run("get_bw_info")

#rules["put_bw"] = host.spawn_id("abstract_enforcement_algorithm_put", 'rules.min_slo_tenant_global_share_spare_bw', 'MinTenantSLOGlobalSpareBWShare', ["abstract_enforcement_algorithm_put","PUT"])
rules["put_bw"] = host.spawn_id("abstract_enforcement_algorithm_put", 'rules.simple_proportional_bandwidth', 'SimpleProportionalBandwidthPerTenant', ["abstract_enforcement_algorithm_put","PUT"])
# rules["put_bw"] = host.spawn_id("abstract_enforcement_algorithm_put", 'rules.min_slo_tenant_global_share_spare_bw', 'MinTenantSLOGlobalSpareBWShare',
# ["abstract_enforcement_algorithm_put","PUT"])
rules["put_bw"] = host.spawn_id("abstract_enforcement_algorithm_put", 'rules.simple_proportional_bandwidth', 'SimpleProportionalBandwidthPerTenant',
["abstract_enforcement_algorithm_put", "PUT"])
rules["put_bw"].run("put_bw_info")

rules["ssync_bw"] = host.spawn_id("abstract_enforcement_algorithm_ssync", 'rules.simple_proportional_replication_bandwidth', 'SimpleProportionalReplicationBandwidth', ["abstract_enforcement_algorithm_ssync","SSYNC"])
rules["ssync_bw"] = host.spawn_id("abstract_enforcement_algorithm_ssync", 'rules.simple_proportional_replication_bandwidth',
'SimpleProportionalReplicationBandwidth', ["abstract_enforcement_algorithm_ssync", "SSYNC"])
rules["ssync_bw"].run("ssync_bw_info")

start_redis_rules(host, rules)

return host



def start_redis_rules(host, rules):
''' START DYNAMIC POLICIES STORED IN REDIS, IF ANY '''
# START DYNAMIC POLICIES STORED IN REDIS, IF ANY
r = get_redis_connection()
dynamic_policies = r.keys("policy:*")

Expand All @@ -77,7 +95,7 @@ def start_redis_rules(host, rules):
if policy_data['alive'] == 'True':
_, rule_parsed = dsl_parser.parse(policy_data['policy'])

target = rule_parsed.target[0][1] # Tenant ID or tenant+container
target = rule_parsed.target[0][1] # Tenant ID or tenant+container

for action_info in rule_parsed.action_list:
if action_info.transient:
Expand All @@ -88,12 +106,12 @@ def start_redis_rules(host, rules):
print 'Rule:', policy_data['policy']
rules[policy] = host.spawn_id(str(policy), 'rule', 'Rule', [rule_parsed, action_info, target, host])
rules[policy].start_rule()



def main():
print "-- Starting workload metric actors --"
start_controller('pyactive_thread')
serve_forever(start_actors)

if __name__ == "__main__":
main()

0 comments on commit 3f0dcec

Please sign in to comment.