Skip to content

Commit

Permalink
Elastic Search support
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Jul 7, 2013
1 parent acaece3 commit cf65ff9
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 5 deletions.
45 changes: 45 additions & 0 deletions cow/plugins/es_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from json import loads

import tornadoes
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

from cow.plugins import BasePlugin


class ESPlugin(BasePlugin):
@classmethod
def after_start(cls, application, io_loop=None, *args, **kw):
host = application.config.get('ELASTIC_SEARCH_HOST')
port = application.config.get('ELASTIC_SEARCH_PORT')

logging.info("Connecting to elastic search at %s:%d" % (host, port))

application.elastic_search = tornadoes.ESConnection(host, port, io_loop=io_loop)
application.elastic_search_host = host
application.elastic_search_port = port

@classmethod
def before_end(cls, application, *args, **kw):
if hasattr(application, 'elastic_search'):
logging.info("Disconnecting from elastic search...")
del application.elastic_search

@classmethod
def before_healthcheck(cls, application, callback, *args, **kw):
url = "http://%s:%d/_cluster/health?pretty=true" % (application.elastic_search_host, application.elastic_search_port)
client = AsyncHTTPClient(application.io_loop)
request_http = HTTPRequest(url, method="GET")
client.fetch(request=request_http, callback=callback)

@classmethod
def validate(cls, result, *args, **kw):
if result.code != 200:
logging.error("Elastic Search healthcheck failed with %s" % result.body)
return False

result = loads(result.body)
return result['status'] in ['green', 'yellow']
9 changes: 6 additions & 3 deletions cow/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,15 @@ def start(self, args=None):

server.start(int(options.workers))

self.plugin_after_start()
io_loop = tornado.ioloop.IOLoop.instance()
self.application.io_loop = io_loop

self.plugin_after_start(io_loop=io_loop)

logging.info('-- %s started listening in %s:%d --' % (server_name, options.bind, options.port))
tornado.ioloop.IOLoop.instance().start()
io_loop.start()
except KeyboardInterrupt:
self.plugin_before_end()
self.plugin_before_end(io_loop=io_loop)

logging.info('')
logging.info('-- %s closed by user interruption --' % server_name)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
'ipdb',
'coveralls',
'motor',
'brukva'
'toredis'
]

setup(
Expand Down
3 changes: 3 additions & 0 deletions tests/sandbox/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@
Config.define('REDISPORT', 7780, "Database configuration", "section")
Config.define('REDISDB', 0, "Database Configuration", "section")
Config.define('REDISPASS', None, "Database Configuration", "section")

Config.define('ELASTIC_SEARCH_HOST', 'localhost', "Search configuration", "section")
Config.define('ELASTIC_SEARCH_PORT', 9200, "Search configuration", "section")
4 changes: 3 additions & 1 deletion tests/sandbox/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from cow.server import Server
from cow.plugins.motor_plugin import MotorPlugin
from cow.plugins.redis_plugin import RedisPlugin
from cow.plugins.es_plugin import ESPlugin

from tests.sandbox.handlers.test import TestHandler

Expand All @@ -17,7 +18,8 @@ def get_handlers(self):
def get_plugins(self):
return [
MotorPlugin,
RedisPlugin
RedisPlugin,
ESPlugin
]

if __name__ == '__main__':
Expand Down

0 comments on commit cf65ff9

Please sign in to comment.