Skip to content
This repository has been archived by the owner on Jan 14, 2020. It is now read-only.

Commit

Permalink
Add documentation to cluster.py
Browse files Browse the repository at this point in the history
Added documentation and improved some of the Kafka methods
  • Loading branch information
ZacBlanco committed Aug 15, 2016
1 parent f8dd921 commit 97fafa1
Showing 1 changed file with 177 additions and 6 deletions.
183 changes: 177 additions & 6 deletions demo_app/cluster.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
'''This file houses utility functions for interacting with the cluster and portions of the web application. Namely the Websockets server and Echo app implementation reside here.
'''
# First thing to do...
# Import the demo_utils module :)
import sys, os
Expand All @@ -11,7 +15,6 @@
from demo_utils.shell import Shell
from demo_utils.ambari import Ambari
from demo_utils import config, generator, logs
#from geventwebsocket import WebSocketServer, WebSocketApplication, Resource
from gevent import monkey; monkey.patch_all()
from ws4py.websocket import WebSocket
from ws4py.server.geventserver import WSGIServer, WebSocketWSGIHandler, WSGIServer
Expand All @@ -22,11 +25,36 @@
logger = logs.Logger('CLUSTER.py').getLogger()

def kerberize():
    '''Kerberize the cluster via the setup script under ``configuration/kerberos``.

    Untested. Can take 10-15 minutes. Based on the script found at
    https://github.com/crazyadmins/useful-scripts/tree/master/ambari
    Before running on a cluster, check ``configuration/kerberos/ambari.props``
    to make sure the proper values are present, or the script will fail.

    Args:
        N/A
    Returns:
        N/A
    '''
    shell = Shell()
    kerberos_script = config.get_path('kerberos/setup_kerberos.sh')
    shell.run('bash ' + kerberos_script)

def create_demo_kafka_topic():
'''Creates a kafka topic for the demo if it doesn't already exist.
The caveat here in using this is that Kafka must be installed on the same machine as the demo, and thus the same machine as Ambari as well. The function will try to start the Kafka service through Ambari and then once the service is started it will use the location of the Kafka topics script to create the topic.
The name for the topic is specified in ``global.conf``.
Args:
N/A
Returns:
bool: True if the creation is successful. False otherwise.
'''
conf = config.read_config('global.conf')
am_conf = conf['AMBARI']
amc = Ambari(am_conf['username'], am_conf['password'], am_conf['proto'], am_conf['server'], am_conf['port']);
Expand All @@ -37,16 +65,28 @@ def create_demo_kafka_topic():
sh = Shell()
topics_script = conf['DEMO']['kafka_topics_script']
zk = conf['DEMO']['zk_connection']
topic_name = conf['DEMO']['topic_name']
topic_name = conf['DEMO']['kafka_topic_name']
logger.info('Attempting to create new Kafka Topic')
out = sh.run(topics_script + ' --create --zookeeper ' + zk + ' --replication-factor 1 --partitions 1 --topic ' + topic_name)

logger.debug(str(out))
if len(out[1]) == 0:
return True
else:
return False

def get_kafka_topics():
'''List the kafka topics on the current installation.
Requires that Kafka is installed on the same machine and Ambari is up and running. Will start the service and use the Kafka scripts to list out all of the topics.
Args:
N/A
Returns:
list: [0] will contain the list of all the topics in a string, typically separated by newlines. [1] will contain any errors when retrieving the topics.
'''
conf = config.read_config('global.conf')
am_conf = conf['AMBARI']
amc = Ambari(am_conf['username'], am_conf['password'], am_conf['proto'], am_conf['server'], am_conf['port']);
Expand All @@ -60,9 +100,13 @@ def get_kafka_topics():
logger.info('Attempting to create new Kafka Topic')
out = sh.run(topics_script + ' --list --zookeeper ' + zk)

return out

return ['', 'Could not get start Kafka Broker']
if len(out[1]) == 0:
topics = out[0]
topics = topics.strip().split('\n')
logger.info('Kafka topics output: ' + str(topics))
return topics

return ['', 'Unable to get topics. Could not start Kafka Broker']


class ThreadedGenerator(threading.Thread):
Expand Down Expand Up @@ -136,14 +180,49 @@ def __init__(self, schema, bps, outputs, http_data_pool_size=1000):
self.exports['HTTP'] = False

def export_kafka(self, data):
    '''Publish a single generated record to Kafka.

    Uses the kafka-python producer held on this instance; the broker URL and
    topic name are read from ``global.conf`` at construction time.

    Args:
        data (dict/object): A JSON-serializable record from the generator to
            send to Kafka.
    Returns:
        N/A
    '''
    payload = json.dumps(data).encode('utf-8')
    self.kafka_producer.send(self.kafka_topic, payload)

def export_file(self, data):
    '''Append one record from the generator to a file **in CSV format**.

    The target path is found in ``global.conf``. No header line is written and
    there is no file rotation; all data is appended to a single file. A new
    generator run essentially 'wipes out' the file, so copy the data elsewhere
    before stopping/restarting the generator.

    Args:
        data (dict/object): Record whose values are written out as one CSV row
            for easier ingestion into other places like Hive or Spark.
    Returns:
        N/A
    '''
    row = ', '.join(str(data[key]) for key in data.keys()) + '\n'
    with open(self.export_filename, 'a') as export_target:
        export_target.write(row)

def export_http(self, data):
'''Export data and POST to an Http endpoint.
Data is 'pooled' before being sent in order to save resources and overhead on requests. The default pool value is 1000 records. This means for every 1000 pieces of data, one request will be made. The data is stored as JSON in the body of the request.
The caveat here is that if you stop the data generator the remaining data in the pool will not be sent.
Args:
data (dict/object): A piece of data to POST. If the data is still below the pool size we add the data in to the data 'pool' and wait for more data to come in. When the threshold is reached a request with all of the data is sent.
Returns:
N/A
'''
if len(self.http_data_pool) >= self.http_data_pool_size:
logger.info('POSTing Data Pool')
logger.info('Data Pool Size: ' + str(len(self.http_data_pool)))
Expand All @@ -154,6 +233,11 @@ def export_http(self, data):
self.http_data_pool.append(data)

def run(self):
'''Run method for the thread implementation. Runs until the thread is killed.
Args:
N/A
'''
while self.flag:
bytes = 0
lines = 0
Expand All @@ -175,33 +259,80 @@ def run(self):
time.sleep(0.1)

def stop(self):
    '''Signal the generator thread to shut down.

    Clears ``self.flag`` so the ``run`` loop exits on its next check and the
    thread finishes.

    Args:
        N/A
    '''
    self.flag = False


class WSEcho(WebSocket):
    '''The WebSocket handler for the WebSocket application.

    Defines the three callbacks ws4py requires of a handler:

    - opened
    - closed
    - received_message

    The owning application object is reached through
    ``self.environ['ws4py.app']`` and holds the list of connected clients.
    '''

    # Shared logger name for every handler instance.
    app_name = 'WebsocketApplication'

    def opened(self):
        '''Defines behavior for when a new client connects to the server.

        Adds the client to the application's client list (accessed through
        ``environ``) so broadcasts know who to send messages to.
        '''
        self.log = logs.Logger(WSEcho.app_name).getLogger()
        app = self.environ['ws4py.app']
        # 'self not in' is the idiomatic spelling of 'not self in';
        # guard prevents registering the same socket twice.
        if self not in app.clients:
            app.clients.append(self)
            self.log.info('websocket app: client connected')

        self.log.debug('Clients: ' + str(app.clients))

    def closed(self, code, reason=None):
        '''Defines behavior for when a client disconnects from the server.

        Removes the client from the application's client list if present.

        Args:
            code (int): The WebSocket close code.
            reason (str): Optional close reason sent by the peer.
        '''
        self.log = logs.Logger(WSEcho.app_name).getLogger()
        # pop() also clears the app reference from this socket's environ.
        app = self.environ.pop('ws4py.app')

        if self in app.clients:
            self.log.info('websocket app: client disconnected')
            app.clients.remove(self)

        self.log.debug('Clients: ' + str(app.clients))

    def received_message(self, message):
        '''Defines behavior for when a client sends a message to the server.

        Clients aren't expected to send us data, so the message is only logged.

        Args:
            message: The ws4py message object received from the client.
        '''
        self.log = logs.Logger(WSEcho.app_name).getLogger()
        self.log.info('websocket app: Message: ' + str(message))



class WSDemoApp(object):
'''This is the Websocket Demo Application.
Here we define the initialization of the server. We also determine how to handle a request to the websocket server. In this case we don't have different routes on the websocket server other than the root path ``/``.
Here we also define other functions such as ``broadcast`` for when we want to send clients information
Args:
N/A
Returns:
N/A
'''

def __init__(self):
self.log = logs.Logger('WebSocketHandler').getLogger()
Expand All @@ -210,11 +341,27 @@ def __init__(self):
self.clients = []

def __call__(self, environ, start_response):
    '''WSGI entry point — defines behavior when a new request is received.

    Only the root path ``/`` is served as a websocket; the application object
    is handed to the handler through ``environ['ws4py.app']``. Any other path
    receives an explicit 404 response — the previous implementation fell
    through and implicitly returned ``None``, which is not a valid WSGI
    return value and made the server raise on non-root requests.

    Args:
        environ (dict): The WSGI environment for this request.
        start_response (callable): The WSGI response-start callback.
    Returns:
        iterable: The websocket upgrade response for ``/``, or a 404 body.
    '''
    if environ['PATH_INFO'] == '/':
        environ['ws4py.app'] = self
        return self.ws(environ, start_response)
    # Explicit 404 instead of returning None (invalid per PEP 3333).
    start_response('404 Not Found', [('Content-Type', 'text/plain')])
    return [b'Not Found']

def broadcast(self, message):
'''Broadcast a message to all server clients.
Args:
message (str): The string to broadcast to every client
Returns:
N/A
'''
for client in self.clients:
try:
client.send(message)
Expand All @@ -225,6 +372,12 @@ def broadcast(self, message):


class WSDemoServer(threading.Thread):
'''A threaded wrapper around the websockets server so that we can run the Flask and Websockets in parallel together.
Args:
port (int): The port number for the Websockets server to run on.
'''

def __init__(self, port):
threading.Thread.__init__(self)
Expand All @@ -236,13 +389,31 @@ def __init__(self, port):
pass

def run(self):
    '''Thread body: starts the websockets server.

    Blocks inside ``serve_forever`` until the server is stopped, so this
    method runs for the lifetime of the thread.

    Args:
        N/A
    Returns:
        N/A
    '''
    self.log.info('Starting websockets server')
    self.server.serve_forever()

def broadcast(self, data):
    '''Send a message to every connected websocket client.

    Thin wrapper around the server application's ``broadcast`` method so it
    can be easily accessed from the flask application.

    Args:
        data (str): A string message to send to the clients.
    Returns:
        N/A
    '''
    application = self.server.application
    application.broadcast(data)

def stop(self):
    '''Stops the underlying websockets server (unblocks ``serve_forever``).'''
    self.server.stop()


Expand Down

0 comments on commit 97fafa1

Please sign in to comment.