Skip to content

Commit

Permalink
Adapted to new PyActor version, Dynamic policies
Browse files Browse the repository at this point in the history
  • Loading branch information
JosepSampe committed Oct 25, 2017
1 parent fbb3bf3 commit 531bd0e
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 85 deletions.
1 change: 0 additions & 1 deletion api/controllers/actors/abstract_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pyactor.exceptions import NotFoundError
from django.conf import settings
import logging
import Queue
import pika
import redis

Expand Down
13 changes: 7 additions & 6 deletions api/metrics/actors/swift_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class SwiftMetric(object):
for each observer in that tenant is subscribed. In this way, the metric
actor only sends the necessary information to each observer.
"""
_tell = ['get_value', 'attach', 'detach', 'notify', 'start_consuming', 'stop_consuming', 'init_consum', 'stop_actor']
_tell = ['attach', 'detach', 'notify', 'start_consuming', 'stop_consuming']
_ask = ['init_consum', 'stop_actor']
_ref = ['attach']

def __init__(self, metric_id, routing_key):
Expand Down Expand Up @@ -101,10 +102,10 @@ def init_consum(self):
"type": "integer"})

self.consumer = self.host.spawn(self.id + "_consumer", settings.CONSUMER_MODULE,
[self.queue, self.routing_key, self.proxy])
self.queue, self.routing_key, self.proxy)
self.start_consuming()
except Exception, e:
print e
except Exception as e:
raise ValueError(e.msg)

def stop_actor(self):
"""
Expand All @@ -116,15 +117,15 @@ def stop_actor(self):
for tenant in self._observers:
for observer in self._observers[tenant].values():
observer.stop_actor()
self.redis.hset(observer.get_id(), 'alive', 'False')
self.redis.hset(observer.get_id(), 'status', 'Stopped')

self.redis.delete("metric:" + self.name)
self.stop_consuming()
self.host.stop_actor(self.id)

except Exception as e:
logger.error(str(e))
print e
raise e

def start_consuming(self):
"""
Expand Down
14 changes: 9 additions & 5 deletions api/metrics/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,22 @@ def start_metric(actor_id):
if actor_id not in metric_actors:
logger.info("Metric, Starting workload metric actor: " + str(actor_id))
metric_actors[actor_id] = host.spawn(actor_id, settings.METRIC_MODULE,
[actor_id, "metric." + actor_id])
actor_id, "metric." + actor_id)
metric_actors[actor_id].init_consum()
except Exception:
except Exception as e:
logger.error("Metric, Error starting workload metric actor: " + str(actor_id))
raise Exception
raise e


def stop_metric(actor_id):
if actor_id in metric_actors:
logger.info("Metric, Stopping workload metric actor: " + str(actor_id))
metric_actors[actor_id].stop_actor()
del metric_actors[actor_id]
try:
metric_actors[actor_id].stop_actor()
del metric_actors[actor_id]
except Exception as e:
logger.error("Metric, Error stopping workload metric actor: " + str(actor_id))
raise e


@csrf_exempt
Expand Down
37 changes: 26 additions & 11 deletions api/policies/actors/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import redis
import requests
import os
from policies.dsl_parser import parse_condition

from api.settings import MANAGEMENT_ACCOUNT, MANAGEMENT_ADMIN_USERNAME, \
MANAGEMENT_ADMIN_PASSWORD, KEYSTONE_ADMIN_URL, REDIS_HOST, REDIS_PORT, REDIS_DATABASE
Expand All @@ -23,8 +24,8 @@ class Rule(object):
Rule actor executes an Action that it is also defined in the policy. Once
the rule executed the action, this actor is destroyed.
"""
_ask = ['get_target']
_tell = ['update', 'start_rule', 'stop_actor']
_ask = ['get_target', 'start_rule']
_tell = ['update', 'stop_actor']

def __init__(self, policy_data, controller_server):
"""
Expand Down Expand Up @@ -62,11 +63,11 @@ def __init__(self, policy_data, controller_server):
self.object_tag = policy_data['object_tag']
self.object_type = policy_data['object_type']
self.controller_server = controller_server

self.conditions = policy_data['condition']
self.condition = policy_data['condition']
self.observers_values = dict()
self.observers_proxies = dict()
self.token = None
self.applied = False

def _get_admin_token(self):
"""
Expand Down Expand Up @@ -98,19 +99,29 @@ def start_rule(self):
**check_metrics()** which subscribes the rule to all the workload
metrics necessaries.
"""
try:
self.condition_list = parse_condition(self.condition)
except:
raise ValueError("Workload Metric not started")
logger.info("Rule, Start '" + str(self.id) + "'")
logger.info('Rule, Conditions: ' + str(self.conditions))
logger.info('Rule, Conditions: ' + str(self.condition))

self.check_metrics(self.condition_list)

# Start Condition checker
# TODO: PASRE CONDITIONS STRING
def check_metrics(self, condition_list):
"""
The check_metrics method finds in the condition list all the metrics
that it needs to check the conditions, when find some metric that it
needs, call the method add_metric.
:param condition_list: The list of all the conditions.
:type condition_list: **any** List type
"""
if not isinstance(condition_list[0], list):
self._add_metric(condition_list[0].lower())
else:
for element in condition_list:
if element is not "OR" and element is not "AND":
self.check_metrics(element)
"""

def _add_metric(self, metric_name):
"""
Expand Down Expand Up @@ -150,7 +161,7 @@ def update(self, metric_name, value):
# Check the condition of the policy if all values are setted. If the
# condition result is true, it calls the method do_action
if all(val is not None for val in self.observers_values.values()):
if self._check_conditions(self.conditions):
if self._check_conditions(self.condition_list):
self._do_action()
else:
logger.error("not all values setted" + str(self.observers_values.values()))
Expand Down Expand Up @@ -190,6 +201,10 @@ def _do_action(self):
The do_action method is called after the conditions are satisfied. So
this method is responsible to execute the action defined in the policy.
"""
if self.applied:
return
else:
self.applied = True
if not self.token:
self._get_admin_token()

Expand All @@ -198,7 +213,7 @@ def _do_action(self):
if self.action == "SET":
# TODO Review if this tenant has already deployed this filter. Not deploy the same filter more than one time.

url = os.path.join(self.controller_server, 'filters', self.target_id, "deploy", str(self.filter))
url = os.path.join('http://'+self.controller_server, 'filters', self.target_id, "deploy", str(self.filter))

data = dict()

Expand All @@ -222,7 +237,7 @@ def _do_action(self):

elif self.action == "DELETE":

url = os.path.join(self.controller_server, 'filters', self.target_id, "undeploy", str(self.filter))
url = os.path.join('http://'+self.controller_server, 'filters', self.target_id, "undeploy", str(self.filter))
response = requests.put(url, headers=headers)

if 200 <= response.status_code < 300:
Expand Down
12 changes: 7 additions & 5 deletions api/policies/dsl_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,26 @@ def parse_group_tenants(tokens):
data = r.lrange(tokens[0], 0, -1)
return data


def parse_condition(input_string):

r = get_redis_connection()

metrics_workload = r.keys("metric:*")
services = map(lambda x: "".join(x.split(":")[1]), metrics_workload)
services_options = oneOf(services)
operand = oneOf("< > == != <= >=")
number = Regex(r"[+-]?\d+(:?\.\d*)?(:?[eE][+-]?\d+)?")

condition = Group(services_options + operand("operand") + number("limit_value"))
condition_list = operatorPrecedence(condition, [
("AND", 2, opAssoc.LEFT, ),
("OR", 2, opAssoc.LEFT, ),
])
rule = condition_list('condition_list')

return rule.parseString(input_string).condition_list

return rule.parseString(input_string).condition_list.asList()


def parse(input_string):
# TODO Raise an exception if not metrics or not action registered
Expand Down
95 changes: 38 additions & 57 deletions api/policies/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def policy_list(request):

if request.method == 'PUT':
# Dynamic Policy From form
host = create_local_host()
http_host = request.META['HTTP_HOST']
data = JSONParser().parse(request)

Expand Down Expand Up @@ -158,14 +157,7 @@ def policy_list(request):
"policy_location": policy_location,
"status": 'Alive'}

if data['transient']:
rule_actors[policy_id] = host.spawn(rule_id,
settings.RULE_TRANSIENT_MODULE,
[policy_data, http_host])
else:
rule_actors[policy_id] = host.spawn(rule_id, settings.RULE_MODULE,
[policy_data, http_host])
rule_actors[policy_id].start_rule()
start_dynamic_policy_actor(policy_data, http_host)

try:
r.hmset(rule_id, policy_data)
Expand Down Expand Up @@ -317,68 +309,45 @@ def deploy_static_policy(request, r, parsed_rule):
#
# Dynamic Policies
#
def load_policies():
try:
r = get_redis_connection()
except RedisError:
return JSONResponse('Error connecting with DB', status=500)

dynamic_policies = r.keys("policy:*")

if dynamic_policies:
logger.info("Starting dynamic rules stored in redis")

host = create_local_host()
for policy in dynamic_policies:
policy_data = r.hgetall(policy)

if policy_data['alive'] == 'True':
_, rule_parsed = dsl_parser.parse(policy_data['policy_description'])
target = rule_parsed.target[0][1] # Tenant ID or tenant+container
for action_info in rule_parsed.action_list:
if action_info.transient:
logger.info("Transient rule: " + policy_data['policy_description'])
rule_actors[policy] = host.spawn(str(policy),
settings.RULE_TRANSIENT_MODULE,
[rule_parsed, action_info, target])
rule_actors[policy].start_rule()
else:
logger.info("Rule: "+policy_data['policy_description'])
rule_actors[policy] = host.spawn(str(policy),
settings.RULE_MODULE,
[rule_parsed, action_info, target])
rule_actors[policy].start_rule()


@csrf_exempt
def dynamic_policy_detail(request, policy_id):
"""
Delete a dynamic policy.
"""

http_host = request.META['HTTP_HOST']
try:
r = get_redis_connection()
except RedisError:
return JSONResponse('Error connecting with DB', status=500)

key = 'policy:' + str(policy_id)

if request.method == 'PUT':
data = JSONParser().parse(request)
try:
if data['status'] == 'Stopped':
policy_id = int(policy_id)
if policy_id in rule_actors:
rule_actors[policy_id].stop_actor()
del rule_actors[policy_id]
else:
policy_data = r.hgetall(key)
try:
start_dynamic_policy_actor(policy_data, http_host)
except Exception as e:
return JSONResponse(str(e), status=400)

r.hmset(key, data)
return JSONResponse("Data updated", status=201)
except DataError:
return JSONResponse("Error updating data", status=400)

elif request.method == 'DELETE':
create_local_host()

try:
policy_id = int(policy_id)
if policy_id in rule_actors:
rule_actors[int(policy_id)].stop_actor()
del rule_actors[int(policy_id)]
rule_actors[policy_id].stop_actor()
del rule_actors[policy_id]
except:
logger.info("Error stopping the rule actor: "+str(policy_id))

Expand All @@ -393,7 +362,6 @@ def dynamic_policy_detail(request, policy_id):


def deploy_dynamic_policy(r, rule_string, parsed_rule, http_host):
host = create_local_host()
rules_to_parse = dict()
project = None
container = None
Expand Down Expand Up @@ -483,17 +451,30 @@ def deploy_dynamic_policy(r, rule_string, parsed_rule, http_host):
"policy_location": policy_location,
"status": 'Alive'}

if action_info.transient:
rule_actors[policy_id] = host.spawn(rule_id, settings.RULE_TRANSIENT_MODULE,
[policy_data, http_host])
else:
rule_actors[policy_id] = host.spawn(rule_id, settings.RULE_MODULE,
[policy_data, http_host])
rule_actors[policy_id].start_rule()
start_dynamic_policy_actor(policy_data, http_host)

# Add policy into Redis
r.hmset('policy:' + str(policy_id), policy_data)


def start_dynamic_policy_actor(policy_data, http_host):
to_json_bools(policy_data, 'transient')
host = create_local_host()
transient = policy_data["transient"]
policy_id = int(policy_data["id"])
rule_id = 'policy:' + str(policy_id)
if transient:
rule_actors[policy_id] = host.spawn(rule_id, settings.RULE_TRANSIENT_MODULE, policy_data, http_host)
else:
rule_actors[policy_id] = host.spawn(rule_id, settings.RULE_MODULE, policy_data, http_host)
try:
rule_actors[policy_id].start_rule()
except Exception as e:
rule_actors[policy_id].stop_actor()
del rule_actors[policy_id]
raise ValueError("An error occurred starting the policy actor: "+str(e))


#
# Bandwidth SLO's
#
Expand Down

0 comments on commit 531bd0e

Please sign in to comment.