Skip to content

Commit

Permalink
COntroller Actor start and stop
Browse files Browse the repository at this point in the history
  • Loading branch information
JosepSampe committed Sep 28, 2017
1 parent 4cc0b8f commit 54be2b0
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
Empty file added api/api/__init__.py
Empty file.
16 changes: 10 additions & 6 deletions api/controllers/actors/abstract_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ def _disconnect_rmq(self):
pass

def _send_message_rmq(self, routing_key, message):
self._channel.basic_publish(exchange=self.rmq_exchange,
routing_key=routing_key,
body=str(message))
self._connect_rmq()
params = dict(exchange=self.rmq_exchange, routing_key=routing_key, body=str(message))
try:
self._channel.basic_publish(**params)
except Exception as e:
logger.error(e.message)
self.__disconnect_rmq()

def _init_consum(self, queue, routing_key):
try:
Expand Down Expand Up @@ -89,7 +93,7 @@ def run(self):
Entry Method
"""
self._subscribe_metrics()
self._connect_rmq()
# self._connect_rmq()

def stop_actor(self):
"""
Expand All @@ -100,8 +104,8 @@ def stop_actor(self):
if self.metrics:
for metric in self.metrics:
metric_actor = self.host.lookup(metric)
metric_actor.detach(self.proxy, self.get_target())
self._disconnect_rmq()
metric_actor.detach(self.id, self.get_target())
# self._disconnect_rmq()
self.host.stop_actor(self.id)
except Exception as e:
logger.error(str(e.message))
12 changes: 7 additions & 5 deletions api/metrics/actors/swift_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def attach(self, observer):
key will be the tenant assigned in the observer, and the value will be
the PyActor proxy to connect to the observer.
:param observer: The PyActor proxy of the oberver rule that calls this method.
:param observer: The PyActor proxy of the observer rule that calls this method.
:type observer: **any** PyActor Proxy type
"""

Expand All @@ -72,15 +72,17 @@ def attach(self, observer):
def detach(self, observer, target):
"""
Asynchronous method. This method allows to be called remotely.
It is called from observers in order to unsubscribe from this workload
It is called from observers in order to unsuscribe from this workload
metric.
:param observer: The PyActor actor id of the oberver rule that calls this method.
:param observer: The PyActor actor id of the observer rule that calls this method.
:type observer: String
"""
logger.info('Metric, Detaching observer: ' + str(observer))
try:
del self._observers[target][observer]
if len(self._observers[target]) == 0:
del self._observers[target]
logger.info('Metric, observer detached: ' + str(observer))
except KeyError:
pass

Expand Down Expand Up @@ -195,7 +197,7 @@ def _aggregate_and_send_info(self):
for observer in self._observers[target].values():
observer.update(self.name, aggregate[target])

if "ALL" in self._observers:
if "ALL" in self._observers and len(metric_list) > 0:
for observer in self._observers["ALL"].values():
observer.update(self.name, metric_list)

Expand Down
1 change: 1 addition & 0 deletions api/policies/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ def deploy_dynamic_policy(r, rule_string, parsed_rule):
"policy_location": policy_location,
"alive": True})


#
# Bandwidth SLO's
#
Expand Down
20 changes: 13 additions & 7 deletions controller_samples/static_bandwidth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ def compute_data(self, metric_data):

for metric in metric_data:
host = metric['host']
project = metric['project']
project_id = metric['project_id']
method = metric['method']
# storage_policy = metric['storage_policy']
calculated_bw = metric['value']
bw = str(round(calculated_bw, 1))
storage_policy = metric['storage_policy']

print host, project, method, calculated_bw

#assignation = os.path.join(project, method, storage_policy, bw)
#self._send_message_rmq(host, assignation)
if project_id in bw_slos and storage_policy in bw_slos[project_id]:
bw = bw_slos[project_id][storage_policy]




#bw = str(round(calculated_bw, 1))

bw = bw_slos[project_id][storage_policy]

assignation = os.path.join(project_id, method, storage_policy, bw)
self._send_message_rmq(host, assignation)

0 comments on commit 54be2b0

Please sign in to comment.