Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
307 lines (254 sloc) 11.4 KB
# Copyright (c) 2008-2010, Michael Gorven, Stefano Rivera
# Released under terms of the MIT/X/Expat Licence. See COPYING for details.
import re
from datetime import datetime, timedelta
from random import choice
import logging
import ibid
from ibid.config import IntOption, ListOption, DictOption
from ibid.plugins import Processor, handler
from ibid.plugins.identity import identify
class Addressed(Processor):
priority = -1500
addressed = False
names = ListOption('names', 'Names to respond to', [ibid.config['botname']])
verbs = ListOption('verbs', u'Verbs to ignore', ('is', 'has', 'was', 'might', 'may', 'would', 'will', "isn't", "hasn't", "wasn't", "wouldn't", "won't", 'can', "can't", 'did', "didn't", 'said', 'says', 'should', "shouldn't", 'does', "doesn't"))
def setup(self):
names = '|'.join(re.escape(x) for x in self.names)
verbs = '|'.join(re.escape(x) for x in self.verbs)
self.patterns = [
re.compile(r'^\s*(?P<nick>%s)' % names
+ r'(?:\s*[:;.?>!,-]+\s+|\s+|\s*[,:]\s*)(?P<body>.*)',
re.I | re.DOTALL),
# "hello there, bot"-style addressing. But we want to be sure that
# there wasn't normal addressing too:
re.compile(r'^(?:\S+:.*|(?P<body>.*),\s*(?P<nick>%s))[\s?!.]*$' % names,
re.I | re.DOTALL)
]
self.verb_pattern = re.compile(r'^(?:%s)\s+(?:%s)\s+' % (names, verbs),
re.I | re.DOTALL)
@handler
def handle_addressed(self, event):
if 'addressed' not in event:
event.addressed = False
if self.verb_pattern.match(event.message['stripped']):
return
for pattern in self.patterns:
matches = pattern.search(event.message['stripped'])
if matches and matches.group('nick'):
new_message = matches.group('body')
event.addressed = matches.group('nick')
event.message['clean'] = new_message
event.message['deaddressed'] = \
pattern.search(event.message['raw']).group('body')
class Strip(Processor):
priority = -1600
addressed = False
event_types = (u'message', u'action', u'notice')
pattern = re.compile(r'^\s*(.*?)\s*[?!.]*\s*$', re.DOTALL)
@handler
def handle_strip(self, event):
if isinstance(event.message, basestring):
event.message = {'raw': event.message, 'deaddressed': event.message,}
event.message['clean'] = event.message['stripped'] \
= self.pattern.search(event.message['raw']).group(1)
class Ignore(Processor):
priority = -1500
addressed = False
event_types = (u'message', u'action', u'notice', u'invite')
nicks = ListOption('ignore', 'List of nicks to ignore', [])
@handler
def handle_ignore(self, event):
for who in self.nicks:
if event.sender['nick'] == who:
event.processed = True
class IgnorePublic(Processor):
priority = -1490
@handler
def ignore_public(self, event):
if event.public and not ibid.auth.authorise(event, u'publicresponse'):
event.addresponse(
u"Sorry, I'm not allowed to talk to you in public. "
'Ask me by private message.'
)
class Address(Processor):
priority = 1600
processed = True
addressed = False
event_types = (u'message', u'action', u'notice', u'state', u'invite')
acknowledgements = ListOption('acknowledgements', 'Responses for positive acknowledgements',
(u'Okay', u'Sure', u'Done', u'Righto', u'Alrighty', u'Yessir'))
refusals = ListOption('refusals', 'Responses for negative acknowledgements',
(u'No', u"I won't", u"Shan't", u"I'm sorry, but I can't do that"))
@handler
def address(self, event):
for response in event.responses:
if isinstance(response['reply'], bool):
if response:
response['reply'] = choice(self.acknowledgements)
else:
response['reply'] = choice(self.refusals)
if (response.get('address', False)
and not response.get('action', False)
and not response.get('notice', False)
and event.public):
response['reply'] = ('%s: %s' % (
event.sender['nick'], response['reply']))
class Timestamp(Processor):
priority = -1900
def process(self, event):
event.time = datetime.utcnow()
class Complain(Processor):
priority = 950
processed = True
event_types = (u'message', u'action', u'invite')
complaints = DictOption('complaints', 'Complaint responses', {
'nonsense': (
u'Huh?', u'Sorry...',
u'Excuse me?', u'*blink*', u'What?',
),
'notauthed': (
u"I'm not your bitch", u"Just do it yourself",
u"I'm not going to listen to you", u"You're not the boss of me",
),
'exception': (
u"I'm not feeling too well", u"That didn't go down very well. Burp.",
u"That didn't seem to agree with me",
),
'network': (
u'The tubes are clogged!', u"I can't reach that site",
u"That site seems to be down",
),
})
@handler
def complain(self, event):
if 'complain' in event and not event.responses:
event.addresponse(choice(self.complaints[event.complain]))
elif event.processed:
return
else:
event.addresponse(choice(self.complaints['nonsense']))
class RateLimit(Processor):
priority = -1000
event_types = (u'message', u'action', u'notice')
limit_time = IntOption('limit_time', 'Time period over which to measure messages', 10)
limit_messages = IntOption('limit_messages', 'Number of messages to allow during the time period', 5)
messages = {}
@handler
def ratelimit(self, event):
if event.identity not in self.messages:
self.messages[event.identity] = [event.time]
else:
self.messages[event.identity].append(event.time)
self.messages[event.identity] = filter(
lambda x: event.time - x < timedelta(seconds=self.limit_time),
self.messages[event.identity])
if len(self.messages[event.identity]) > self.limit_messages:
if event.public:
event.addresponse(u'Geez, give me some time to think!', address=False)
else:
event.processed = True
class Format(Processor):
priority = 2000
def _truncate(self, line, length):
if length is not None:
eline = line.encode('utf-8')
if len(eline) > length:
# horizontal ellipsis = 3 utf-8 bytes
return eline[:length-3].decode('utf-8', 'ignore') \
+ u'\N{horizontal ellipsis}'
return line
def process(self, event):
filtered = []
for response in event.responses:
source = response['source'].lower()
supports = ibid.sources[source].supports
maxlen = ibid.sources[source].truncation_point(response, event)
if response.get('action', False) and 'action' not in supports:
response['reply'] = u'*%s*' % response['reply']
conflate = response.get('conflate', True)
# Expand response into multiple single-line responses:
if (not conflate and 'multiline' not in supports):
for line in response['reply'].split('\n'):
r = {'reply': self._truncate(line, maxlen)}
for k in response.iterkeys():
if k not in ('reply'):
r[k] = response[k]
filtered.append(r)
# Expand response into multiple multi-line responses:
elif (not conflate and 'multiline' in supports
and maxlen is not None):
message = response['reply']
while len(message.encode('utf-8')) > maxlen:
splitpoint = len(message.encode('utf-8')[:maxlen] \
.decode('utf-8', 'ignore'))
parts = [message[:splitpoint].rstrip(),
message[splitpoint:].lstrip()]
for sep in u'\n.;:, ':
if sep in u'\n ':
search = message[:splitpoint+1]
else:
search = message[:splitpoint]
if sep in search:
splitpoint = search.rindex(sep)
parts = [message[:splitpoint+1].rstrip(),
message[splitpoint+1:]]
break
r = {'reply': parts[0]}
for k in response.iterkeys():
if k not in ('reply'):
r[k] = response[k]
filtered.append(r)
message = parts[1]
response['reply'] = message
filtered.append(response)
else:
line = response['reply']
# Remove any characters that make no sense on IRC-like sources:
if 'multiline' not in supports:
line = line.expandtabs(1) \
.replace('\n', conflate == True
and u' ' or conflate or u'')
response['reply'] = self._truncate(line, maxlen)
filtered.append(response)
event.responses = filtered
class UnicodeWarning(Processor):
priority = 1950
def setup(self):
self.log = logging.getLogger('plugins.unicode')
def process(self, object):
if isinstance(object, dict):
for value in object.values():
self.process(value)
elif isinstance(object, list):
for value in object:
self.process(value)
elif isinstance(object, str):
self.log.warning(u'Found a non-unicode string: %r', object)
class ChannelTracker(Processor):
priority = -1550
addressed = False
event_types = (u'state', u'source')
@handler
def track(self, event):
if event.type == u'source':
if event.status == u'disconnected':
ibid.channels.pop(event.source, None)
elif event.status == u'left':
ibid.channels[event.source].pop(event.channel, None)
elif event.public:
if event.state == u'online' and hasattr(event, 'othername'):
oldid = identify(event.session, event.source, event.othername)
for channel in ibid.channels[event.source].values():
if oldid in channel:
channel.remove(oldid)
channel.add(event.identity)
elif event.state == u'online':
ibid.channels[event.source][event.channel].add(event.identity)
elif event.state == u'offline' and not hasattr(event, 'othername'):
if event.channel:
ibid.channels[event.source][event.channel].remove(event.identity)
else:
for channel in ibid.channels[event.source].values():
channel.discard(event.identity)
# vi: set et sta sw=4 ts=4: