Controller for static bw.
JosepSampe committed Sep 29, 2017
1 parent 54be2b0 commit 8551e3c
Showing 8 changed files with 140 additions and 80 deletions.
1 change: 1 addition & 0 deletions api/api/actors/consumer.py
@@ -3,6 +3,7 @@
import logging
import pika

logging.getLogger("pika").propagate = False
logger = logging.getLogger(__name__)


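Aside (not part of this commit's diff): the new propagate = False line keeps pika's log records from bubbling up to the application's root logger. A minimal, standalone sketch of the effect:

```python
import logging

# basicConfig() attaches a stream handler to the root logger.
logging.basicConfig(level=logging.DEBUG, format='%(name)s: %(message)s')

# Records emitted by the "pika" logger stop propagating towards the root
# logger, so pika's connection chatter no longer reaches its handler.
logging.getLogger("pika").propagate = False

logging.getLogger("pika").info("heartbeat frame received")          # not printed by the root handler
logging.getLogger("api.actors.consumer").info("message consumed")   # printed
```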
59 changes: 52 additions & 7 deletions api/api/common.py
@@ -1,13 +1,6 @@
import calendar
import logging
import os
import sys
import time

from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client
import redis
from django.conf import settings
from django.core.management.color import color_style
from django.http import HttpResponse
@@ -16,6 +9,15 @@
from pyactor.context import set_context, create_host
from swiftclient import client as swift_client

import errno
import hashlib
import calendar
import logging
import redis
import os
import sys
import time

logger = logging.getLogger(__name__)
host = None
NODE_STATUS_THRESHOLD = 15 # seconds
@@ -205,3 +207,46 @@ def create_local_host():
pass

return host


def make_sure_path_exists(path):
try:
os.makedirs(path)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise


def save_file(file_, path=''):
"""
Helper to save a file
"""
filename = file_.name
file_path = os.path.join(path, filename)
if os.path.isfile(file_path):
os.remove(file_path)
fd = open(str(path) + "/" + str(filename), 'wb')
for chunk in file_.chunks():
fd.write(chunk)
fd.close()
return str(path) + "/" + str(filename)


def delete_file(filename, path):
"""
Helper to delete a file
"""
file_path = os.path.join(path, filename)
if os.path.isfile(file_path):
os.remove(file_path)
pyc = file_path+"c"
if os.path.isfile(pyc):
os.remove(pyc)


def md5(fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
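For orientation (not part of the diff), a hedged sketch of how the helpers now living in api/api/common.py fit together; the directory, file name, and the ContentFile stand-in are invented for illustration:

```python
# Illustrative only: ContentFile stands in for the Django UploadedFile that
# the views receive (both expose .name and .chunks()); paths are made up.
from django.core.files.base import ContentFile
from api.common import make_sure_path_exists, save_file, delete_file, md5

target_dir = '/tmp/controllers'
make_sure_path_exists(target_dir)              # no-op if the directory already exists

uploaded = ContentFile(b'class FakeController(object): pass', name='fake_controller.py')
path = save_file(uploaded, target_dir)         # -> '/tmp/controllers/fake_controller.py'
print(md5(path))                               # MD5 hex digest of the stored file

delete_file('fake_controller.py', target_dir)  # also removes fake_controller.pyc if present
```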
3 changes: 0 additions & 3 deletions api/api/startup.py
@@ -9,9 +9,6 @@ def run():
are stopped, so we need to ensure the correct values in redis.
"""

# Add source directories to sys path
sys.path.insert(0, settings.CONTROLLERS_DIR)

r = redis.Redis(connection_pool=settings.REDIS_CON_POOL)

# Workload metric definitions
29 changes: 16 additions & 13 deletions api/controllers/actors/abstract_controller.py
@@ -1,10 +1,12 @@
from redis.exceptions import RedisError
from pyactor.exceptions import NotFoundError
from django.conf import settings
import logging
import Queue
import pika
import redis

logging.getLogger("pika").propagate = False
logger = logging.getLogger(__name__)


@@ -13,8 +15,6 @@ class AbstractController(object):
_ask = ['get_target']
_tell = ['update', 'run', 'stop_actor', 'notify']

metrics = []

def __init__(self):
self.rmq_user = settings.RABBITMQ_USERNAME
self.rmq_pass = settings.RABBITMQ_PASSWORD
@@ -28,15 +28,19 @@ def __init__(self):
try:
self.redis = redis.Redis(connection_pool=settings.REDIS_CON_POOL)
except RedisError:
logger.info('"Error connecting with Redis DB"')
logger.error('"Error connecting with Redis DB"')

self.metrics = dict()
self.metric_data = Queue.Queue()
self.rmq_messages = Queue.Queue()

def _subscribe_metrics(self):
for metric in self.metrics:
metric_actor = self.host.lookup(metric)
metric_actor.attach(self.proxy)
try:
for metric in self.metrics:
metric_actor = self.host.lookup(metric)
metric_actor.attach(self.proxy)
except NotFoundError as e:
logger.error(e)

def _connect_rmq(self):
parameters = pika.ConnectionParameters(host=self.rmq_host,
@@ -58,7 +62,7 @@ def _send_message_rmq(self, routing_key, message):
self._channel.basic_publish(**params)
except Exception as e:
logger.error(e.message)
self.__disconnect_rmq()
self._disconnect_rmq()

def _init_consum(self, queue, routing_key):
try:
@@ -71,7 +75,7 @@ def notify(self, body):
def notify(self, body):
"""
Method called from the consumer to indicate the value consumed from the
rabbitmq queue. After receive the value, this value is communicated to
RabbitMQ queue. After receiving the value, it is communicated to
all the observers subscribed to this metric.
"""
self.rmq_messages.put(body)
@@ -84,7 +88,7 @@ def get_target(self):

def update(self, metric_name, metric_data):
"""
Method called from the Swift Metric to indicate the new metric dada
Method called from the Swift Metric to indicate the new metric data
"""
self.compute_data(metric_data)

@@ -105,7 +109,6 @@ def stop_actor(self):
for metric in self.metrics:
metric_actor = self.host.lookup(metric)
metric_actor.detach(self.id, self.get_target())
# self._disconnect_rmq()
self.host.stop_actor(self.id)
except Exception as e:
logger.error(str(e.message))
except NotFoundError as e:
logger.error(e)
self.host.stop_actor(self.id)
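To illustrate how the reworked base class is meant to be extended (not part of this commit; the metric name and the reaction below are invented), a minimal concrete controller could look like this:

```python
import logging

from controllers.actors.abstract_controller import AbstractController

logger = logging.getLogger(__name__)


class ExampleController(AbstractController):
    """Toy controller that observes a single (invented) metric."""

    def __init__(self):
        super(ExampleController, self).__init__()
        # Keys are the metric actor names that _subscribe_metrics() looks up
        # in the actor host; the values are placeholders here.
        self.metrics = {'example_metric': None}

    def compute_data(self, metric_data):
        # Called through update()/notify() for every new sample.
        logger.info("New metric sample: %s", metric_data)
```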
33 changes: 21 additions & 12 deletions api/controllers/views.py
@@ -14,8 +14,7 @@
import os

from api.common import to_json_bools, JSONResponse, get_redis_connection, \
create_local_host, controller_actors
from filters.views import save_file, make_sure_path_exists
create_local_host, controller_actors, make_sure_path_exists, save_file, delete_file

logger = logging.getLogger(__name__)

@@ -76,10 +75,18 @@ def controller_detail(request, controller_id):
return JSONResponse("Data updated", status=status.HTTP_201_CREATED)
except DataError:
return JSONResponse("Error updating data", status=status.HTTP_400_BAD_REQUEST)
except ValueError:
return JSONResponse("Error starting controller", status=status.HTTP_400_BAD_REQUEST)

elif request.method == 'DELETE':
stop_controller(controller_id)
r.delete("controller:" + str(controller_id))
try:
controller = r.hgetall('controller:' + str(controller_id))
delete_file(controller['controller_name'], settings.CONTROLLERS_DIR)
r.delete("controller:" + str(controller_id))
except:
return JSONResponse("Error deleting controller", status=status.HTTP_400_BAD_REQUEST)

# If this is the last controller, the counter is reset
keys = r.keys('controller:*')
if not keys:
@@ -107,9 +114,9 @@ def post(self, request):
return JSONResponse("Invalid format or empty request", status=status.HTTP_400_BAD_REQUEST)

controller_id = r.incr("controllers:id")

try:
data['id'] = controller_id

file_obj = request.FILES['file']

make_sure_path_exists(settings.CONTROLLERS_DIR)
@@ -126,8 +133,9 @@

except DataError:
return JSONResponse("Error to save the object", status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
print e
except ValueError:
return JSONResponse("Error starting/stoping controller", status=status.HTTP_400_BAD_REQUEST)
except Exception:
return JSONResponse("Error uploading file", status=status.HTTP_400_BAD_REQUEST)

def get(self, request, controller_id):
@@ -158,21 +166,22 @@ def get(self, request, controller_id):

def start_controller(controller_id, controller_name, controller_class_name):
host = create_local_host()

controller_location = os.path.join(controller_name, controller_class_name)
try:
if controller_id not in controller_actors:
controller_actors[controller_id] = host.spawn(controller_name, controller_location)
controller_actors[controller_id].run()
logger.info("Controller, Started controller actor: "+controller_location)
except Exception as e:
logger.error(e.message)
except:
raise ValueError


def stop_controller(controller_id):
if controller_id in controller_actors:
try:
controller_actors[controller_id].stop_actor()
except Exception as e:
print e.message
del controller_actors[controller_id]
logger.info("Controller, Stopped controller actor: " + str(controller_id))
del controller_actors[controller_id]
logger.info("Controller, Stopped controller actor: " + str(controller_id))
except:
raise ValueError
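Design note: both helpers now raise ValueError instead of printing, so the views above can translate actor failures into an HTTP 400 response. A condensed sketch of that contract (the controller id, module, and class names below are invented):

```python
# Hypothetical caller; mirrors how controller_detail() and the upload view
# react to the ValueError raised by the two helpers above.
try:
    start_controller('1', 'fake_controller', 'FakeController')
except ValueError:
    logger.error("Controller 1 could not be started")   # the views return status 400 here

try:
    stop_controller('1')
except ValueError:
    logger.error("Controller 1 could not be stopped")
```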
33 changes: 2 additions & 31 deletions api/filters/views.py
@@ -12,14 +12,13 @@
from swiftclient import client as swift_client
from swiftclient.exceptions import ClientException
from operator import itemgetter
import errno
import hashlib
import json
import logging
import mimetypes
import os

from api.common import rsync_dir_with_nodes, to_json_bools, JSONResponse, get_redis_connection, get_token_connection
from api.common import rsync_dir_with_nodes, to_json_bools, JSONResponse, \
get_redis_connection, get_token_connection, make_sure_path_exists, save_file, md5
from api.exceptions import SwiftClientError, StorletNotFoundException, FileSynchronizationException

logger = logging.getLogger(__name__)
@@ -541,34 +540,6 @@ def unset_filter(r, target, filter_data, token):
r.hdel("pipeline:" + str(target), key)


def make_sure_path_exists(path):
try:
os.makedirs(path)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise


def save_file(file_, path=''):
"""
Little helper to save a file
"""
filename = file_.name
fd = open(str(path) + "/" + str(filename), 'wb')
for chunk in file_.chunks():
fd.write(chunk)
fd.close()
return str(path) + "/" + str(filename)


def md5(fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()


#
# DSL Mappings
#
3 changes: 1 addition & 2 deletions api/metrics/views.py
@@ -15,10 +15,9 @@
import os

from api.common import to_json_bools, JSONResponse, get_redis_connection, \
rsync_dir_with_nodes, create_local_host, metric_actors
rsync_dir_with_nodes, create_local_host, metric_actors, make_sure_path_exists, save_file

from api.exceptions import FileSynchronizationException
from filters.views import save_file, make_sure_path_exists


logger = logging.getLogger(__name__)
