Skip to content

Commit

Permalink
do not block sin server from starting without kafka.
Browse files Browse the repository at this point in the history
  • Loading branch information
wonlay committed Dec 5, 2011
1 parent 9411588 commit 097a5fe
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
24 changes: 10 additions & 14 deletions app/content_store/views.py
Expand Up @@ -16,15 +16,14 @@
from django.http import HttpResponseGone
from django.http import HttpResponseNotAllowed
from django.http import HttpResponseServerError
import kafka

from twisted.internet import task, reactor

from decorators import login_required, api_key_required
from utils import enum, generate_api_key, get_local_pub_ip
from utils import ClusterLayout
from utils.ClusterLayout import Rectangle, Label, SvgPlotter
from utils import validator
from utils import kafka_send, validator

from content_store.models import ContentStore, StoreConfig
from cluster.models import Group, Node, Membership
Expand All @@ -37,9 +36,6 @@
print "sudo easy_install ./"
sys.exit(1)

kafkaHost = settings.KAFKA_HOST
kafkaPort = int(settings.KAFKA_PORT)
kafkaProducer = kafka.KafkaProducer(kafkaHost, kafkaPort)
validators = {}

@login_required
Expand All @@ -64,8 +60,8 @@ def openStore(request,store_name):

resp.update({
'ok' : True,
'kafkaHost' : kafkaHost,
'kafkaPort' : kafkaPort,
'kafkaHost' : settings.KAFKA_HOST,
'kafkaPort' : settings.KAFKA_PORT,
})
return HttpResponse(json.dumps(resp, ensure_ascii=False, cls=DateTimeAwareJSONEncoder))

Expand Down Expand Up @@ -109,8 +105,8 @@ def newStore(request,store_name):
resp = store.to_map(True)
resp.update({
'ok' : True,
'kafkaHost' : kafkaHost,
'kafkaPort' : kafkaPort,
'kafkaHost' : settings.KAFKA_HOST,
'kafkaPort' : settings.KAFKA_PORT,
})
return HttpResponse(json.dumps(resp, ensure_ascii=False, cls=DateTimeAwareJSONEncoder))

Expand Down Expand Up @@ -521,7 +517,7 @@ def addDocs(request,store_name):
str = json.dumps(doc).encode('utf-8')
messages.append(str)
if messages:
kafkaProducer.send(messages, store.unique_name.encode('utf-8'))
kafka_send(messages, store.unique_name.encode('utf-8'))
resp = {'ok':True,'numPosted':len(messages)}
return HttpResponse(json.dumps(resp))
except ValueError:
Expand Down Expand Up @@ -577,7 +573,7 @@ def updateDoc(request,store_name):
for k,v in jsonDoc.items():
existingDoc[k]=v

kafkaProducer.send([json.dumps(existingDoc).encode('utf-8')], store.unique_name.encode('utf-8'))
kafka_send([json.dumps(existingDoc).encode('utf-8')], store.unique_name.encode('utf-8'))
resp = {'ok': True,'numPosted':1}
return HttpResponse(json.dumps(resp))
except ValueError:
Expand Down Expand Up @@ -658,8 +654,8 @@ def _fix_url(files):
'store' : store,
'index' : index,
'webapp' : webapp,
'kafka_host' : kafkaHost,
'kafka_port' : kafkaPort,
'kafka_host' : settings.KAFKA_HOST,
'kafka_port' : settings.KAFKA_PORT,
'zookeeper_url' : settings.ZOOKEEPER_URL,
})

Expand Down Expand Up @@ -827,7 +823,7 @@ def delDocs(request, store_name):
for id in ids:
delDoc = {'id':id,'isDeleted':True}
delObjs.append(json.dumps(delDoc).encode('utf-8'))
kafkaProducer.send(delObjs,store.unique_name.encode('utf-8'))
kafka_send(delObjs,store.unique_name.encode('utf-8'))
resp = {'ok': True,'numDeleted':len(delObjs)}
return HttpResponse(json.dumps(resp))
except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions app/kafka.py
Expand Up @@ -46,7 +46,7 @@ def __init__(self, host, port):
self.port = port
self.connect()

def connect(self, retry=0):
def connect(self, retry = 0):
self.connection = socket.socket()
while retry >= 0:
try:
Expand All @@ -66,7 +66,7 @@ def send(self, messages, topic, partition = 0):
except socket.error, msg:
if msg.errno == errno.EPIPE:
# disconnected, reconnecting...
self.connect(retry=3)
self.connect(retry = 3)
self.send(messages, topic, partition)

if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion app/settings.py
Expand Up @@ -45,7 +45,7 @@

KAFKA_HOST = 'localhost'

KAFKA_PORT = '9092'
KAFKA_PORT = 9092

CACHES = {
'default': {
Expand Down
13 changes: 12 additions & 1 deletion app/utils/__init__.py
@@ -1,4 +1,6 @@
import commands, base64, hashlib, random, re, socket, time
import commands, base64, hashlib, kafka, random, re, socket, time

from django.conf import settings

def totimestamp(dt):
return time.mktime(dt.timetuple()) + dt.microsecond/1e6
Expand Down Expand Up @@ -32,3 +34,12 @@ def is_current_host(host, me=None):
return True
else:
return False

kafka_producer = None
def kafka_send(*args, **kwargs):
global kafka_producer
if kafka_producer is None:
kafka_producer = kafka.KafkaProducer(settings.KAFKA_HOST, int(settings.KAFKA_PORT))

kafka_producer.send(*args, **kwargs)

0 comments on commit 097a5fe

Please sign in to comment.