Skip to content

Commit

Permalink
Fixing #5
Browse files Browse the repository at this point in the history
* Made the reconnection a little smarter
* Converted all print statements to log and set up some (very) basic logging.
  • Loading branch information
Ben Belchak committed Oct 4, 2011
1 parent addeb8b commit e9976d9
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 34 deletions.
52 changes: 30 additions & 22 deletions campy/campy.py
Expand Up @@ -22,23 +22,24 @@
#
import threading
import re
import sys
import signal
import time
from twisted.internet.defer import Deferred
import streaming
import Queue
import logging

import settings
from pinder.campfire import Campfire
from twisted.internet import reactor

log = logging.getLogger(__name__)

class PluginThread(threading.Thread):
def __init__(self, plugins, client):
def __init__(self, campy):
threading.Thread.__init__(self)
self.plugins = plugins
self.client = client
self.plugins = campy.plugins
self.client = campy.client
self.kill_received = False
self.log = logging.getLogger(__name__)

def run(self):
while not self.kill_received:
Expand All @@ -48,7 +49,7 @@ def run(self):
time.sleep(1)
continue

print "%s: Handling %s" % (self.getName(), message)
log.debug("%s: Handling %s", self.getName(), message)
if message:
for plugin in self.plugins:
try:
Expand All @@ -68,18 +69,19 @@ def run(self):
message, speaker)
except Exception:
pass
print "%s shutting down!" % self.getName()
log.info("%s shutting down!", self.getName())


class CampyStreamConsumer(streaming.MessageReceiver):
def __init__(self, plugins, cf_client):
self.plugins = plugins
self.client = cf_client
def __init__(self, campy):
self.plugins = campy.plugins
self.client = campy.client
super(CampyStreamConsumer, self).__init__()

def connectionFailed(self, why):
print "Couldn't connect:", why
log.fatal("Couldn't connect: %s", why)
reactor.stop()
campy.die()

def messageReceived(self, message):
message_pool.put(message)
Expand All @@ -101,7 +103,7 @@ def __init__(self):
self.plugins.append(plugin_obj())

for room in settings.CAMPFIRE_ROOMS:
print "Joining %s" % room
log.debug("Joining %s" % room)
room = self.client.find_room_by_name(room)
if room:
self.rooms.append(room)
Expand All @@ -111,8 +113,8 @@ def __init__(self):
def listen(self):
for room in self.rooms:
username, password = room._connector.get_credentials()
streaming.start(username, password, room.id,
CampyStreamConsumer(self.plugins, self.client))
streaming.start(username, password, room,
CampyStreamConsumer(self))
reactor.addSystemEventTrigger('before', 'shutdown', self.die)
reactor.run()

Expand All @@ -123,6 +125,14 @@ def die(self):
room.speak("Goodbye!")
if settings.LEAVE_ON_EXIT:
room.leave()
kill_threads(get_threads())

def get_threads():
return [t.join(1) for t in threads if t is not None and t.isAlive()]

def kill_threads(threads):
for t in threads:
t.kill_received = True


if __name__ == "__main__":
Expand All @@ -131,7 +141,7 @@ def die(self):
threads = []
campy_started = False
for x in xrange(settings.NUM_THREADS):
thread = PluginThread(campy.plugins, campy.client)
thread = PluginThread(campy)
threads.append(thread)
thread.daemon = True
# I had to add this sleep statement here to get the
Expand All @@ -141,14 +151,12 @@ def die(self):

while len(threads) > 0:
try:
threads = [t.join(1) for t in threads if t is not None and t.isAlive()]
threads = get_threads()
if not campy_started:
campy_started = True
print "Campy has started!"
log.info("Campy has started!")
campy.listen()
print "Shutting down..."
log.info("Shutting down...")
except KeyboardInterrupt:
print threads
for t in threads:
t.kill_received = True
kill_threads(threads)

2 changes: 1 addition & 1 deletion campy/plugins/pivotal_tracker.py
Expand Up @@ -97,7 +97,7 @@ def handle_message(self, campfire, room, message, speaker):
story.GetStoryId()))
return

m = re.match("%s: pt (?P<command>start|tell) next (?P<story_type>bug|feature|chore) (?P<mine>mine)?" %
m = re.match("%s: pt (?P<command>start|tell) next (?P<story_type>bug|feature|chore)(\s+)?(?P<mine>mine)?" %
settings.CAMPFIRE_BOT_NAME, body)
if m:
filter = "type:%s state:unstarted" % m.group('story_type')
Expand Down
17 changes: 13 additions & 4 deletions campy/settings.py
Expand Up @@ -20,6 +20,12 @@
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
import os
import sys
import logging

log = logging.getLogger(__name__)

CAMPFIRE_SUBDOMAIN = '' # Subdomain you use for campfire
CAMPFIRE_BOT_NAME = 'r' # Campfire name of the bot that matches the API_KEY
CAMPFIRE_API_KEY = '' # Campfire API key for your bot's user
Expand All @@ -44,23 +50,26 @@

ZERO_CATER_URL = "http://www.zerocater.com/seatme" #default URL

LOGLEVEL = logging.DEBUG

try:
import simplejson as json
except ImportError:
import json


import os
import sys

# Load settings from the local_settings.py file
try:
from local_settings import *
except ImportError:
pass

# Set up logging
log_format = "%(levelname)s %(asctime)s %(funcName)s %(lineno)d %(message)s"
logging.basicConfig(level=logging.DEBUG, format=log_format)

def load_from_file(filename):
print "Loading settings from %s" % filename
log.debug("Loading settings from %s" % filename)
if os.path.exists(filename):
with open(filename) as f:
attrs = json.loads(f.read())
Expand Down
56 changes: 49 additions & 7 deletions campy/streaming.py
@@ -1,11 +1,36 @@
#
# Copyright (c) 2011 Ben Belchak <ben@belchak.com>
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
import base64
from twisted.internet import reactor, protocol
from twisted.internet import reactor, protocol, ssl
from twisted.protocols import basic
import logging
try:
import json
except ImportError:
import simplejson as json

log = logging.getLogger(__name__)

class StreamingParser(basic.LineReceiver):
delimiter = '\r'

Expand Down Expand Up @@ -39,6 +64,9 @@ def lineReceived(self, line):
try:
if line:
message = json.loads(line)
if message['type'] == 'KickMessage':
self.transport.loseConnection()
self.factory.connector.connect()
except:
pass
else:
Expand Down Expand Up @@ -70,20 +98,34 @@ def disconnect(self):

class StreamFactory(protocol.ReconnectingClientFactory):
maxDelay = 120
initialDelay = 3
protocol = StreamingParser
noisy = True

def __init__(self, consumer):
def __init__(self, consumer, room):
self.room = room
if isinstance(consumer, MessageReceiver):
self.consumer = consumer
else:
raise TypeError("consumer should be an instance of MessageReceiver")

def startedConnecting(self, connector):
log.debug("Started connecting to room %s." % self.room.name)

def clientConnectionLost(self, connector, reason):
log.debug("Lost connection. Reason: %s", reason)
protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

def clientConnectionFailed(self, connector, reason):
log.debug("Connection failed. Reason: %s", reason)
protocol.ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)

def make_header(self, username, password, method, uri, postdata=""):
auth_header = 'Basic ' + base64.b64encode("%s:%s" % (username, password)).strip()
header = [
"%s %s HTTP/1.1" % (method, uri),
"Authorization: %s" % auth_header,
"User-Agent: campy bot",
"User-Agent: campy bot (https://github.com/bbelchak/campy/)",
"Host: streaming.campfirenow.com",
]

Expand All @@ -96,7 +138,7 @@ def make_header(self, username, password, method, uri, postdata=""):
]
self.header = "\r\n".join(header) + "\r\n\r\n" + postdata

def start(username, password, room_id, consumer):
client = StreamFactory(consumer)
client.make_header(username, password, "GET", "/room/%s/live.json" % room_id)
reactor.connectTCP("streaming.campfirenow.com", 80, client)
def start(username, password, room, consumer):
client = StreamFactory(consumer, room)
client.make_header(username, password, "GET", "/room/%s/live.json" % room.id)
reactor.connectSSL("streaming.campfirenow.com", 443, client, ssl.ClientContextFactory())

0 comments on commit e9976d9

Please sign in to comment.