# coding: utf-8
import bisect
from collections import defaultdict, deque
import json
import math
import os
import time
import unittest
import uuid
import zlib
import redis
QUIT = False

# Module-level placeholders for names that the excerpted listings 6-6 and 6-7
# reference but do not define (their complete versions appear earlier in the book).
pipe = inv = item = buyer = seller = inventory = None
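# Placeholder stubs (not part of the book's listings) for helpers that the
# listings below call but that are defined elsewhere in the book. These
# minimal versions are assumptions, added only so this module is self-contained.
class EmailSendError(Exception):
    pass

def log_error(msg, *args):
    print "ERROR:", msg, args

def log_success(msg, *args):
    print "SUCCESS:", msg, args

def fetch_data_and_send_sold_email(to_send):
    # Assumed to fetch any extra data and actually deliver the email.
    pass

def find_city_by_ip_local(ip):
    # Assumed to return a (city, region, country) tuple for the IP address.
    return '', '', ''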
# Listing 6-1
# <start id="_1314_14473_8380"/>
def add_update_contact(conn, user, contact):
    ac_list = 'recent:' + user
    # Set up the atomic operation.
    pipeline = conn.pipeline(True)
    # Remove the contact from the list if it exists.
    pipeline.lrem(ac_list, contact)
    # Push the contact onto the front of the list.
    pipeline.lpush(ac_list, contact)
    # Keep only the first 100 contacts.
    pipeline.ltrim(ac_list, 0, 99)
    # Actually execute everything.
    pipeline.execute()
# <end id="_1314_14473_8380"/>


# <start id="_1314_14473_8383"/>
def remove_contact(conn, user, contact):
    conn.lrem('recent:' + user, contact)
# <end id="_1314_14473_8383"/>
# Listing 6-2
# <start id="_1314_14473_8386"/>
def fetch_autocomplete_list(conn, user, prefix):
    # Fetch the autocomplete list.
    candidates = conn.lrange('recent:' + user, 0, -1)
    matches = []
    # Check each candidate.
    for candidate in candidates:
        if candidate.lower().startswith(prefix):
            # We found a match.
            matches.append(candidate)
    # Return all of the matches.
    return matches
# <end id="_1314_14473_8386"/>
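
# A minimal usage sketch (not one of the book's listings): it assumes a local
# Redis server and the legacy redis-py 2.x client used throughout this file.
def _example_recent_contacts(conn):
    for name in ['dave', 'dana', 'carol']:
        add_update_contact(conn, 'user', name)
    # Returns the recently contacted names starting with 'da',
    # most recent first: ['dana', 'dave'].
    return fetch_autocomplete_list(conn, 'user', 'da')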
# Listing 6-3
# <start id="_1314_14473_8396"/>
# Set up our list of characters that we know about.
valid_characters = '`abcdefghijklmnopqrstuvwxyz{'

def find_prefix_range(prefix):
    # Find the position of the prefix character in our list of characters.
    posn = bisect.bisect_left(valid_characters, prefix[-1:])
    # Find the predecessor character.
    suffix = valid_characters[(posn or 1) - 1]
    # Return the range.
    return prefix[:-1] + suffix + '{', prefix + '{'
# <end id="_1314_14473_8396"/>
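
# A worked example of the range calculation above: for the prefix 'abc', the
# character that immediately precedes 'c' is 'b', so the range that brackets
# every name starting with 'abc' is ('abb{', 'abc{').
assert find_prefix_range('abc') == ('abb{', 'abc{')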
# Listing 6-4
# <start id="_1314_14473_8399"/>
def autocomplete_on_prefix(conn, guild, prefix):
    # Compute the start and end points of the range for the given prefix.
    start, end = find_prefix_range(prefix)
    identifier = str(uuid.uuid4())
    start += identifier
    end += identifier
    zset_name = 'members:' + guild

    # Add the start/end range items to the ZSET.
    conn.zadd(zset_name, start, 0, end, 0)
    pipeline = conn.pipeline(True)
    while 1:
        try:
            pipeline.watch(zset_name)
            # Find the ranks of our inserted start/end items.
            sindex = pipeline.zrank(zset_name, start)
            eindex = pipeline.zrank(zset_name, end)
            # Fetch at most 10 items from the range.
            erange = min(sindex + 9, eindex - 2)
            pipeline.multi()
            # Get the values inside our range, and clean up the
            # start/end items that we inserted.
            pipeline.zrem(zset_name, start, end)
            pipeline.zrange(zset_name, sindex, erange)
            items = pipeline.execute()[-1]
            break
        # Retry if the autocomplete ZSET was changed by another client.
        except redis.exceptions.WatchError:
            continue

    # Remove start/end items left over from other autocomplete
    # operations that may be executing at the same time.
    return [item for item in items if '{' not in item]
# <end id="_1314_14473_8399"/>
# Listing 6-5
# <start id="_1314_14473_8403"/>
def join_guild(conn, guild, user):
    conn.zadd('members:' + guild, user, 0)

def leave_guild(conn, guild, user):
    conn.zrem('members:' + guild, user)
# <end id="_1314_14473_8403"/>
#END
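
# A minimal guild-autocomplete sketch (not one of the book's listings),
# assuming a local Redis server; 'test-guild' and the names are made up.
def _example_guild_autocomplete(conn):
    for name in ['jeff', 'jenny', 'jack']:
        join_guild(conn, 'test-guild', name)
    # Returns ['jeff', 'jenny'] -- the members whose names start with 'je'.
    return autocomplete_on_prefix(conn, 'test-guild', 'je')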
# Listing 6-6
# <start id="_1314_14473_8431"/>
def list_item(conn, itemid, sellerid, price):
    #...
        # Watch for changes to the seller's inventory.
        pipe.watch(inv)
        # Verify that the item is still in the seller's inventory.
        if not pipe.sismember(inv, itemid):
            pipe.unwatch()
            return None

        # Actually list the item on the market.
        pipe.multi()
        pipe.zadd("market:", item, price)
        pipe.srem(inv, itemid)
        pipe.execute()
        return True
    #...
# <end id="_1314_14473_8431"/>
# Listing 6-7
# <start id="_1314_14473_8435"/>
def purchase_item(conn, buyerid, itemid, sellerid, lprice):
    #...
        # Watch for changes to the market and to the buyer's account information.
        pipe.watch("market:", buyer)

        # Check whether the item was sold, whether its price has changed,
        # and whether the buyer has enough money to pay for it.
        price = pipe.zscore("market:", item)
        funds = int(pipe.hget(buyer, 'funds'))
        if price != lprice or price > funds:
            pipe.unwatch()
            return None

        # Transfer the buyer's money to the seller, and the item to the buyer.
        pipe.multi()
        pipe.hincrby(seller, 'funds', int(price))
        pipe.hincrby(buyer, 'funds', int(-price))
        pipe.sadd(inventory, itemid)
        pipe.zrem("market:", item)
        pipe.execute()
        return True
    #...
# <end id="_1314_14473_8435"/>
# Listing 6-8
# <start id="_1314_14473_8641"/>
def acquire_lock(conn, lockname, acquire_timeout=10):
    # A 128-bit random identifier.
    identifier = str(uuid.uuid4())

    end = time.time() + acquire_timeout
    while time.time() < end:
        # Try to get the lock.
        if conn.setnx('lock:' + lockname, identifier):
            return identifier

        time.sleep(.001)

    return False
# <end id="_1314_14473_8641"/>
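
# A usage sketch for the lock (not one of the book's listings): hold the lock
# only for the duration of the critical section, and always release it.
def _example_with_lock(conn):
    locked = acquire_lock(conn, 'market:')
    if not locked:
        return False
    try:
        pass  # ... perform the operations that need the lock ...
    finally:
        release_lock(conn, 'market:', locked)
    return True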
# Listing 6-9
# <start id="_1314_14473_8645"/>
def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
    buyer = "users:%s" % buyerid
    seller = "users:%s" % sellerid
    item = "%s.%s" % (itemid, sellerid)
    inventory = "inventory:%s" % buyerid

    # Try to acquire the lock.
    locked = acquire_lock(conn, 'market:')
    if not locked:
        return False

    pipe = conn.pipeline(True)
    try:
        # Check whether the item is still listed and whether the buyer
        # has enough money to pay for it.
        pipe.zscore("market:", item)
        pipe.hget(buyer, 'funds')
        price, funds = pipe.execute()
        if price is None or price > int(funds):
            return None

        # Transfer the buyer's money to the seller, and the item to the buyer.
        pipe.hincrby(seller, 'funds', int(price))
        pipe.hincrby(buyer, 'funds', int(-price))
        pipe.sadd(inventory, itemid)
        pipe.zrem("market:", item)
        pipe.execute()
        return True
    finally:
        # Release the lock.
        release_lock(conn, 'market:', locked)
# <end id="_1314_14473_8645"/>
# Listing 6-10
# <start id="_1314_14473_8650"/>
def release_lock(conn, lockname, identifier):
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname

    while True:
        try:
            # Check that the process still holds the lock.
            pipe.watch(lockname)
            if pipe.get(lockname) == identifier:
                # Release the lock.
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                return True

            pipe.unwatch()
            break

        # Another client changed the lock; retry.
        except redis.exceptions.WatchError:
            pass

    # The process has lost the lock.
    return False
# <end id="_1314_14473_8650"/>
# Listing 6-11
# <start id="_1314_14473_8790"/>
def acquire_lock_with_timeout(
    conn, lockname, acquire_timeout=10, lock_timeout=10):
    # A 128-bit random identifier.
    identifier = str(uuid.uuid4())
    lockname = 'lock:' + lockname
    # Only pass integers to our EXPIRE calls.
    lock_timeout = int(math.ceil(lock_timeout))

    end = time.time() + acquire_timeout
    while time.time() < end:
        # Get the lock and set the expiration.
        if conn.setnx(lockname, identifier):
            conn.expire(lockname, lock_timeout)
            return identifier
        # Check and update the expiration time as necessary.
        elif not conn.ttl(lockname):
            conn.expire(lockname, lock_timeout)

        time.sleep(.001)

    return False
# <end id="_1314_14473_8790"/>
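
# A usage sketch (not one of the book's listings): lock_timeout bounds how long
# a crashed holder can keep the lock before Redis expires it on its own.
def _example_lock_with_timeout(conn):
    locked = acquire_lock_with_timeout(
        conn, 'market:', acquire_timeout=5, lock_timeout=10)
    if not locked:
        return False
    try:
        pass  # ... do work that finishes well within the 10 second timeout ...
    finally:
        release_lock(conn, 'market:', locked)
    return True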
# Listing 6-12
# <start id="_1314_14473_8986"/>
def acquire_semaphore(conn, semname, limit, timeout=10):
    # A 128-bit random identifier.
    identifier = str(uuid.uuid4())
    now = time.time()

    pipeline = conn.pipeline(True)
    # Time out old semaphore holders.
    pipeline.zremrangebyscore(semname, '-inf', now - timeout)
    # Try to acquire the semaphore.
    pipeline.zadd(semname, identifier, now)
    # Check to see whether we acquired it.
    pipeline.zrank(semname, identifier)
    if pipeline.execute()[-1] < limit:
        return identifier

    # We failed to acquire the semaphore; discard our identifier.
    conn.zrem(semname, identifier)
    return None
# <end id="_1314_14473_8986"/>
# Listing 6-13
# <start id="_1314_14473_8990"/>
def release_semaphore(conn, semname, identifier):
    # Returns True if the semaphore was properly released,
    # False if it had already been deleted because it timed out.
    return conn.zrem(semname, identifier)
# <end id="_1314_14473_8990"/>
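
# A usage sketch for the basic semaphore (not one of the book's listings):
# at most 5 callers may hold 'testsem' at once, for up to 10 seconds each.
def _example_semaphore(conn):
    identifier = acquire_semaphore(conn, 'testsem', 5, timeout=10)
    if identifier is None:
        return False
    try:
        pass  # ... do the rate-limited work ...
    finally:
        release_semaphore(conn, 'testsem', identifier)
    return True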
# Listing 6-14
# <start id="_1314_14473_9004"/>
def acquire_fair_semaphore(conn, semname, limit, timeout=10):
    # A 128-bit random identifier.
    identifier = str(uuid.uuid4())
    czset = semname + ':owner'
    ctr = semname + ':counter'

    now = time.time()
    pipeline = conn.pipeline(True)
    # Time out old semaphore holders.
    pipeline.zremrangebyscore(semname, '-inf', now - timeout)
    pipeline.zinterstore(czset, {czset: 1, semname: 0})

    # Increment the counter and get its value.
    pipeline.incr(ctr)
    counter = pipeline.execute()[-1]

    # Try to acquire the semaphore.
    pipeline.zadd(semname, identifier, now)
    pipeline.zadd(czset, identifier, counter)

    # Check the rank to determine whether we acquired the semaphore.
    pipeline.zrank(czset, identifier)
    if pipeline.execute()[-1] < limit:
        # We acquired the semaphore.
        return identifier

    # We failed to acquire the semaphore; clean out the bad data.
    pipeline.zrem(semname, identifier)
    pipeline.zrem(czset, identifier)
    pipeline.execute()
    return None
# <end id="_1314_14473_9004"/>
# Listing 6-15
# <start id="_1314_14473_9014"/>
def release_fair_semaphore(conn, semname, identifier):
    pipeline = conn.pipeline(True)
    pipeline.zrem(semname, identifier)
    pipeline.zrem(semname + ':owner', identifier)
    # Returns True if the semaphore was properly released,
    # False if it had already been released because it timed out.
    return pipeline.execute()[0]
# <end id="_1314_14473_9014"/>
# Listing 6-16
# <start id="_1314_14473_9022"/>
def refresh_fair_semaphore(conn, semname, identifier):
    # Update our semaphore.
    if conn.zadd(semname, identifier, time.time()):
        # We lost our semaphore; report back.
        release_fair_semaphore(conn, semname, identifier)
        return False
    # We still have our semaphore.
    return True
# <end id="_1314_14473_9022"/>
# Listing 6-17
# <start id="_1314_14473_9031"/>
def acquire_semaphore_with_lock(conn, semname, limit, timeout=10):
    identifier = acquire_lock(conn, semname, acquire_timeout=.01)
    if identifier:
        try:
            return acquire_fair_semaphore(conn, semname, limit, timeout)
        finally:
            release_lock(conn, semname, identifier)
# <end id="_1314_14473_9031"/>
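
# A usage sketch for the fair semaphore (not one of the book's listings): the
# lock wrapper above removes the race on the counter, and a long-running
# holder must refresh before the timeout expires.
def _example_fair_semaphore(conn):
    identifier = acquire_semaphore_with_lock(conn, 'testsem', 5, timeout=10)
    if not identifier:
        return False
    try:
        # ... do some work, then extend our hold before the timeout ...
        if not refresh_fair_semaphore(conn, 'testsem', identifier):
            return False  # We lost the semaphore while working.
    finally:
        release_fair_semaphore(conn, 'testsem', identifier)
    return True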
# Listing 6-18
# <start id="_1314_14473_9056"/>
def send_sold_email_via_queue(conn, seller, item, price, buyer):
    # Prepare the item to be sent.
    data = {
        'seller_id': seller,
        'item_id': item,
        'price': price,
        'buyer_id': buyer,
        'time': time.time()
    }
    # Push the item onto the queue.
    conn.rpush('queue:email', json.dumps(data))
# <end id="_1314_14473_9056"/>
# Listing 6-19
# <start id="_1314_14473_9060"/>
def process_sold_email_queue(conn):
    while not QUIT:
        # Try to get a message to send.
        packed = conn.blpop(['queue:email'], 30)
        # No message to send; try again.
        if not packed:
            continue

        # Load the packed email information.
        to_send = json.loads(packed[1])
        try:
            # Send the email using our pre-written emailing function.
            fetch_data_and_send_sold_email(to_send)
        except EmailSendError as err:
            log_error("Failed to send sold email", err, to_send)
        else:
            log_success("Sent sold email", to_send)
# <end id="_1314_14473_9060"/>
# Listing 6-20
# <start id="_1314_14473_9066"/>
def worker_watch_queue(conn, queue, callbacks):
    while not QUIT:
        # Try to get an item of work from the queue.
        packed = conn.blpop([queue], 30)
        # The queue is empty; retry.
        if not packed:
            continue

        # Unpack the work item.
        name, args = json.loads(packed[1])
        # The callback is unknown; log the error and retry.
        if name not in callbacks:
            log_error("Unknown callback %s" % name)
            continue
        # Execute the task.
        callbacks[name](*args)
# <end id="_1314_14473_9066"/>
# Listing 6-21
# <start id="_1314_14473_9074"/>
def worker_watch_queues(conn, queues, callbacks):  # The first changed line, to support priorities.
    while not QUIT:
        packed = conn.blpop(queues, 30)  # The second changed line, to support priorities.
        if not packed:
            continue

        name, args = json.loads(packed[1])
        if name not in callbacks:
            log_error("Unknown callback %s" % name)
            continue
        callbacks[name](*args)
# <end id="_1314_14473_9074"/>
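
# A worker usage sketch (not one of the book's listings): the callback mapping
# and queue names are hypothetical. BLPOP checks the queues left to right, so
# listing them highest-priority first is what implements the priority feature.
def _example_run_worker(conn):
    callbacks = {'send_sold_email': fetch_data_and_send_sold_email}
    worker_watch_queues(
        conn, ['queue:high', 'queue:medium', 'queue:low'], callbacks)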
# Listing 6-22
# <start id="_1314_14473_9094"/>
def execute_later(conn, queue, name, args, delay=0):
    # Generate a unique identifier.
    identifier = str(uuid.uuid4())
    # Prepare the item for the queue.
    item = json.dumps([identifier, queue, name, args])
    if delay > 0:
        # Delay the item.
        conn.zadd('delayed:', item, time.time() + delay)
    else:
        # Execute the item immediately.
        conn.rpush('queue:' + queue, item)
    # Return the identifier.
    return identifier
# <end id="_1314_14473_9094"/>
# Listing 6-23
# <start id="_1314_14473_9099"/>
def poll_queue(conn):
    while not QUIT:
        # Get the first item in the queue.
        item = conn.zrange('delayed:', 0, 0, withscores=True)
        # No item, or the item is still to be executed in the future.
        if not item or item[0][1] > time.time():
            time.sleep(.01)
            continue

        # Unpack the item so that we know where it should go.
        item = item[0][0]
        identifier, queue, function, args = json.loads(item)

        # Get the lock for the item so that we can move it.
        locked = acquire_lock(conn, identifier)
        # We couldn't get the lock; skip it and try again later.
        if not locked:
            continue

        # Move the item to the proper list queue.
        if conn.zrem('delayed:', item):
            conn.rpush('queue:' + queue, item)

        # Release the lock.
        release_lock(conn, identifier, locked)
# <end id="_1314_14473_9099"/>
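
# A delayed-task usage sketch (not one of the book's listings): 'tqueue' and
# 'testfn' are hypothetical names. A thread running poll_queue() moves the
# item onto 'queue:tqueue' once the 30 seconds have elapsed.
def _example_delay_task(conn):
    return execute_later(conn, 'tqueue', 'testfn', [], delay=30)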
# Listing 6-24
# <start id="_1314_14473_9124"/>
def create_chat(conn, sender, recipients, message, chat_id=None):
    # Get a new chat id.
    chat_id = chat_id or str(conn.incr('ids:chat:'))

    # Set up a dictionary of users-to-scores to add to the chat ZSET.
    recipients.append(sender)
    recipientsd = dict((r, 0) for r in recipients)

    pipeline = conn.pipeline(True)
    # Create the ZSET with the list of people participating.
    pipeline.zadd('chat:' + chat_id, **recipientsd)
    # Initialize the seen ZSETs.
    for rec in recipients:
        pipeline.zadd('seen:' + rec, chat_id, 0)
    pipeline.execute()

    # Send the message.
    return send_message(conn, chat_id, sender, message)
# <end id="_1314_14473_9124"/>
# Listing 6-25
# <start id="_1314_14473_9127"/>
def send_message(conn, chat_id, sender, message):
    identifier = acquire_lock(conn, 'chat:' + chat_id)
    if not identifier:
        raise Exception("Couldn't get the lock")
    try:
        # Prepare the message.
        mid = conn.incr('ids:' + chat_id)
        ts = time.time()
        packed = json.dumps({
            'id': mid,
            'ts': ts,
            'sender': sender,
            'message': message,
        })

        # Send the message to the chat.
        conn.zadd('msgs:' + chat_id, packed, mid)
    finally:
        release_lock(conn, 'chat:' + chat_id, identifier)
    return chat_id
# <end id="_1314_14473_9127"/>
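
# A chat usage sketch (not one of the book's listings): the user names are
# hypothetical. Create a chat, post a follow-up message, and then fetch
# whatever jenny has not yet seen.
def _example_chat(conn):
    chat_id = create_chat(conn, 'joe', ['jeff', 'jenny'], 'hello everyone')
    send_message(conn, chat_id, 'jeff', 'hi joe')
    # Returns a list of (chat_id, messages) pairs for jenny.
    return fetch_pending_messages(conn, 'jenny')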
# Listing 6-26
# <start id="_1314_14473_9132"/>
def fetch_pending_messages(conn, recipient):
    # Get the last message ids received.
    seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)

    pipeline = conn.pipeline(True)

    # Fetch all new messages.
    for chat_id, seen_id in seen:
        pipeline.zrangebyscore(
            'msgs:' + chat_id, seen_id+1, 'inf')
    # Prepare the information to return to the caller.
    chat_info = zip(seen, pipeline.execute())

    for i, ((chat_id, seen_id), messages) in enumerate(chat_info):
        if not messages:
            continue
        messages[:] = map(json.loads, messages)
        # Update the 'chat' ZSET with the most recently received message.
        seen_id = messages[-1]['id']
        conn.zadd('chat:' + chat_id, recipient, seen_id)

        # Discover messages that have been seen by all users.
        min_id = conn.zrange(
            'chat:' + chat_id, 0, 0, withscores=True)

        # Update the 'seen' ZSET.
        pipeline.zadd('seen:' + recipient, chat_id, seen_id)
        if min_id:
            # Clean out messages that have been seen by all users.
            pipeline.zremrangebyscore(
                'msgs:' + chat_id, 0, min_id[0][1])
        chat_info[i] = (chat_id, messages)
    pipeline.execute()

    return chat_info
# <end id="_1314_14473_9132"/>
# Listing 6-27
# <start id="_1314_14473_9135"/>
def join_chat(conn, chat_id, user):
    # Get the most recent message id for the chat.
    message_id = int(conn.get('ids:' + chat_id))

    pipeline = conn.pipeline(True)
    # Add the user to the chat member list.
    pipeline.zadd('chat:' + chat_id, user, message_id)
    # Add the chat to the user's seen list.
    pipeline.zadd('seen:' + user, chat_id, message_id)
    pipeline.execute()
# <end id="_1314_14473_9135"/>
# Listing 6-28
# <start id="_1314_14473_9136"/>
def leave_chat(conn, chat_id, user):
    pipeline = conn.pipeline(True)
    # Remove the user from the chat.
    pipeline.zrem('chat:' + chat_id, user)
    pipeline.zrem('seen:' + user, chat_id)
    # Find the number of remaining group members.
    pipeline.zcard('chat:' + chat_id)

    if not pipeline.execute()[-1]:
        # Delete the chat.
        pipeline.delete('msgs:' + chat_id)
        pipeline.delete('ids:' + chat_id)
        pipeline.execute()
    else:
        # Find the oldest message seen by all members.
        oldest = conn.zrange(
            'chat:' + chat_id, 0, 0, withscores=True)
        # Delete the messages that everyone has already seen.
        conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])
# <end id="_1314_14473_9136"/>
# Listing 6-29
# <start id="_1314_15044_3669"/>
# A local aggregate dictionary.
aggregates = defaultdict(lambda: defaultdict(int))

def daily_country_aggregate(conn, line):
    if line:
        line = line.split()
        # Extract the information from the log line.
        ip = line[0]
        day = line[1]
        # Find the country from the IP address.
        country = find_city_by_ip_local(ip)[2]
        # Increment our local aggregate.
        aggregates[day][country] += 1
        return

    # The day's logs are done; write our aggregates to Redis.
    for day, aggregate in aggregates.items():
        conn.zadd('daily:country:' + day, **aggregate)
        del aggregates[day]
# <end id="_1314_15044_3669"/>
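
# A processing sketch (not one of the book's listings): it assumes each line
# starts with "<ip> <day> ...". Feed every log line to the aggregate callback,
# then pass None to force the final flush to Redis.
def _example_aggregate_one_file(conn, lines):
    for line in lines:
        daily_country_aggregate(conn, line)
    daily_country_aggregate(conn, None)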
# Listing 6-30
# <start id="_1314_14473_9209"/>
def copy_logs_to_redis(conn, path, channel, count=10,
                       limit=2**30, quit_when_done=True):
    bytes_in_redis = 0
    waiting = deque()
    # Create the chat that will be used to send messages to clients.
    create_chat(conn, 'source', map(str, range(count)), '', channel)
    count = str(count)
    # Iterate over all of the log files.
    for logfile in sorted(os.listdir(path)):
        full_path = os.path.join(path, logfile)

        fsize = os.stat(full_path).st_size
        # Clean out finished files if we need more room.
        while bytes_in_redis + fsize > limit:
            cleaned = _clean(conn, channel, waiting, count)
            if cleaned:
                bytes_in_redis -= cleaned
            else:
                time.sleep(.25)

        # Upload the file to Redis.
        with open(full_path, 'rb') as inp:
            block = ' '
            while block:
                block = inp.read(2**17)
                conn.append(channel+logfile, block)

        # Notify the listeners that the file is ready.
        send_message(conn, channel, 'source', logfile)

        # Update our local information about Redis' memory use.
        bytes_in_redis += fsize
        waiting.append((logfile, fsize))

    # We are out of files; report that to the listeners.
    if quit_when_done:
        send_message(conn, channel, 'source', ':done')

    # Clean up the leftover log files when we are done.
    while waiting:
        cleaned = _clean(conn, channel, waiting, count)
        if cleaned:
            bytes_in_redis -= cleaned
        else:
            time.sleep(.25)

# How we actually clean out files from Redis.
def _clean(conn, channel, waiting, count):
    if not waiting:
        return 0
    w0 = waiting[0][0]
    if conn.get(channel + w0 + ':done') == count:
        conn.delete(channel + w0, channel + w0 + ':done')
        return waiting.popleft()[1]
    return 0
# <end id="_1314_14473_9209"/>
# Listing 6-31
# <start id="_1314_14473_9213"/>
def process_logs_from_redis(conn, id, callback):
    while 1:
        # Fetch the list of files.
        fdata = fetch_pending_messages(conn, id)

        for ch, mdata in fdata:
            for message in mdata:
                logfile = message['message']

                # No more log files to process.
                if logfile == ':done':
                    return
                elif not logfile:
                    continue

                # Choose a block reader.
                block_reader = readblocks
                if logfile.endswith('.gz'):
                    block_reader = readblocks_gz

                # Iterate over the lines.
                for line in readlines(conn, ch+logfile, block_reader):
                    # Pass each line to the callback.
                    callback(conn, line)
                # Force a flush of our aggregate caches.
                callback(conn, None)

                # Report that we are finished with the log.
                conn.incr(ch + logfile + ':done')

        if not fdata:
            time.sleep(.1)
# <end id="_1314_14473_9213"/>
# Listing 6-32
# <start id="_1314_14473_9221"/>
def readlines(conn, key, rblocks):
    out = ''
    for block in rblocks(conn, key):
        out += block
        # Find the rightmost line break if any; rfind() returns -1 on failure.
        posn = out.rfind('\n')
        # We found a line break.
        if posn >= 0:
            # Split on all of the line breaks.
            for line in out[:posn].split('\n'):
                # Yield each line to the caller.
                yield line + '\n'
            # Keep track of the trailing data.
            out = out[posn+1:]
        # We are out of data.
        if not block:
            yield out
            break
# <end id="_1314_14473_9221"/>
# Listing 6-33
# <start id="_1314_14473_9225"/>
def readblocks(conn, key, blocksize=2**17):
    lb = blocksize
    pos = 0
    # Keep fetching more data as long as we don't get a partial read.
    while lb == blocksize:
        # Fetch the block.
        block = conn.substr(key, pos, pos + blocksize - 1)
        # Prepare for the next pass.
        yield block
        lb = len(block)
        pos += lb
    yield ''
# <end id="_1314_14473_9225"/>
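
# A reading sketch (not one of the book's listings): stream the lines of an
# uploaded plain-text log straight out of a Redis string under 'key'.
def _example_count_lines(conn, key):
    count = 0
    for line in readlines(conn, key, readblocks):
        count += 1
    return count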
# Listing 6-34
# <start id="_1314_14473_9229"/>
def readblocks_gz(conn, key):
    inp = ''
    decoder = None
    # Read the raw data from Redis.
    for block in readblocks(conn, key, 2**17):
        if not decoder:
            inp += block
            try:
                # Parse the header information so that we can get to
                # the compressed data.
                if inp[:3] != "\x1f\x8b\x08":
                    raise IOError("invalid gzip data")
                i = 10
                flag = ord(inp[3])
                if flag & 4:
                    i += 2 + ord(inp[i]) + 256*ord(inp[i+1])
                if flag & 8:
                    i = inp.index('\0', i) + 1
                if flag & 16:
                    i = inp.index('\0', i) + 1
                if flag & 2:
                    i += 2

                # We haven't read the full header yet.
                if i > len(inp):
                    raise IndexError("not enough data")
            except (IndexError, ValueError):
                continue

            else:
                # We found the header; prepare the decompressor.
                block = inp[i:]
                inp = None
                decoder = zlib.decompressobj(-zlib.MAX_WBITS)
                if not block:
                    continue

        # We are out of data; yield the last chunk.
        if not block:
            yield decoder.flush()
            break

        # Yield a decompressed block to the caller.
        yield decoder.decompress(block)
# <end id="_1314_14473_9229"/>
class TestCh06(unittest.TestCase):
    def setUp(self):
        import redis
        self.conn = redis.Redis(db=15)

    def tearDown(self):
        self.conn.flushdb()
        del self.conn
        print
        print
    def test_add_update_contact(self):
        import pprint
        conn = self.conn
        conn.delete('recent:user')

        print "Let's add a few contacts..."
        for i in xrange(10):
            add_update_contact(conn, 'user', 'contact-%i-%i' % (i//3, i))
        print "Current recently contacted contacts"
        contacts = conn.lrange('recent:user', 0, -1)
        pprint.pprint(contacts)
        self.assertTrue(len(contacts) >= 10)
        print

        print "Let's pull one of the older ones up to the front"
        add_update_contact(conn, 'user', 'contact-1-4')
        contacts = conn.lrange('recent:user', 0, 2)
        print "New top-3 contacts:"
        pprint.pprint(contacts)
        self.assertEquals(contacts[0], 'contact-1-4')
        print

        print "Let's remove a contact..."
        print remove_contact(conn, 'user', 'contact-2-6')
        contacts = conn.lrange('recent:user', 0, -1)
        print "New contacts:"
        pprint.pprint(contacts)
        self.assertTrue(len(contacts) >= 9)
        print

        print "And let's finally autocomplete on "
        all = conn.lrange('recent:user', 0, -1)
        contacts = fetch_autocomplete_list(conn, 'user', 'c')
        self.assertTrue(all == contacts)

        equiv = [c for c in all if c.startswith('contact-2-')]
        contacts = fetch_autocomplete_list(conn, 'user', 'contact-2-')
        equiv.sort()
        contacts.sort()
        self.assertEquals(equiv, contacts)
        conn.delete('recent:user')
    def test_address_book_autocomplete(self):
        self.conn.delete('members:test')
        print "the start/end range of 'abc' is:", find_prefix_range('abc')
        print

        print "Let's add a few people to the guild"
        for name in ['jeff', 'jenny', 'jack', 'jennifer']:
            join_guild(self.conn, 'test', name)
        print

        print "now let's try to find users with names starting with 'je':"
        r = autocomplete_on_prefix(self.conn, 'test', 'je')
        print r
        self.assertTrue(len(r) == 3)

        print "jeff just left to join a different guild..."
        leave_guild(self.conn, 'test', 'jeff')
        r = autocomplete_on_prefix(self.conn, 'test', 'je')
        print r
        self.assertTrue(len(r) == 2)
        self.conn.delete('members:test')
    def test_distributed_locking(self):
        self.conn.delete('lock:testlock')
        print "Getting an initial lock..."
        self.assertTrue(acquire_lock_with_timeout(self.conn, 'testlock', 1, 1))
        print "Got it!"
        print "Trying to get it again without releasing the first one..."
        self.assertFalse(acquire_lock_with_timeout(self.conn, 'testlock', .01, 1))
        print "Failed to get it!"
        print

        print "Waiting for the lock to timeout..."
        time.sleep(2)
        print "Getting the lock again..."
        r = acquire_lock_with_timeout(self.conn, 'testlock', 1, 1)
        self.assertTrue(r)
        print "Got it!"
        print "Releasing the lock..."
        self.assertTrue(release_lock(self.conn, 'testlock', r))
        print "Released it..."
        print

        print "Acquiring it again..."
        self.assertTrue(acquire_lock_with_timeout(self.conn, 'testlock', 1, 1))
        print "Got it!"
        self.conn.delete('lock:testlock')
    def test_counting_semaphore(self):
        self.conn.delete('testsem', 'testsem:owner', 'testsem:counter')
        print "Getting 3 initial semaphores with a limit of 3..."
        for i in xrange(3):
            self.assertTrue(acquire_fair_semaphore(self.conn, 'testsem', 3, 1))
        print "Done!"
        print "Getting one more that should fail..."
        self.assertFalse(acquire_fair_semaphore(self.conn, 'testsem', 3, 1))
        print "Couldn't get it!"
        print

        print "Let's wait for some of them to time out"
        time.sleep(2)
        print "Can we get one?"
        r = acquire_fair_semaphore(self.conn, 'testsem', 3, 1)
        self.assertTrue(r)
        print "Got one!"
        print "Let's release it..."
        self.assertTrue(release_fair_semaphore(self.conn, 'testsem', r))
        print "Released!"
        print

        print "And let's make sure we can get 3 more!"
        for i in xrange(3):
            self.assertTrue(acquire_fair_semaphore(self.conn, 'testsem', 3, 1))
        print "We got them!"
        self.conn.delete('testsem', 'testsem:owner', 'testsem:counter')
    def test_delayed_tasks(self):
        import threading
        self.conn.delete('queue:tqueue', 'delayed:')
        print "Let's start some regular and delayed tasks..."
        for delay in [0, .5, 0, 1.5]:
            self.assertTrue(execute_later(self.conn, 'tqueue', 'testfn', [], delay))
        r = self.conn.llen('queue:tqueue')
        print "How many non-delayed tasks are there (should be 2)?", r
        self.assertEquals(r, 2)
        print

        print "Let's start up a thread to bring those delayed tasks back..."
        t = threading.Thread(target=poll_queue, args=(self.conn,))
        t.setDaemon(1)
        t.start()
        print "Started."
        print "Let's wait for those tasks to be prepared..."
        time.sleep(2)
        global QUIT
        QUIT = True
        t.join()
        r = self.conn.llen('queue:tqueue')
        print "Waiting is over, how many tasks do we have (should be 4)?", r
        self.assertEquals(r, 4)
        self.conn.delete('queue:tqueue', 'delayed:')
    def test_multi_recipient_messaging(self):
        self.conn.delete('ids:chat:', 'msgs:1', 'ids:1', 'seen:joe', 'seen:jeff', 'seen:jenny')

        print "Let's create a new chat session with some recipients..."
        chat_id = create_chat(self.conn, 'joe', ['jeff', 'jenny'], 'message 1')
        print "Now let's send a few messages..."
        for i in xrange(2, 5):
            send_message(self.conn, chat_id, 'joe', 'message %s' % i)
        print

        print "And let's get the messages that are waiting for jeff and jenny..."
        r1 = fetch_pending_messages(self.conn, 'jeff')
        r2 = fetch_pending_messages(self.conn, 'jenny')
        print "They are the same?", r1 == r2
        self.assertEquals(r1, r2)
        print "Those messages are:"
        import pprint
        pprint.pprint(r1)
        self.conn.delete('ids:chat:', 'msgs:1', 'ids:1', 'seen:joe', 'seen:jeff', 'seen:jenny')
    def test_file_distribution(self):
        import gzip, shutil, tempfile, threading
        self.conn.delete('test:temp-1.txt', 'test:temp-2.txt', 'test:temp-3.txt',
            'msgs:test:', 'seen:0', 'seen:source', 'ids:test:', 'chat:test:')

        dire = tempfile.mkdtemp()
        try:
            print "Creating some temporary 'log' files..."
            with open(dire + '/temp-1.txt', 'wb') as f:
                f.write('one line\n')
            with open(dire + '/temp-2.txt', 'wb') as f:
                f.write(10000 * 'many lines\n')
            out = gzip.GzipFile(dire + '/temp-3.txt.gz', mode='wb')
            for i in xrange(100000):
                out.write('random line %s\n' % (os.urandom(16).encode('hex'),))
            out.close()
            size = os.stat(dire + '/temp-3.txt.gz').st_size
            print "Done."
            print
            print "Starting up a thread to copy logs to redis..."
            t = threading.Thread(target=copy_logs_to_redis,
                args=(self.conn, dire, 'test:', 1, size))
            t.setDaemon(1)
            t.start()

            print "Let's pause to let some logs get copied to Redis..."
            time.sleep(.25)
            print
            print "Okay, the logs should be ready. Let's process them!"

            index = [0]
            counts = [0, 0, 0]
            def callback(conn, line):
                if line is None:
                    print "Finished with a file %s, linecount: %s" % (index[0], counts[index[0]])
                    index[0] += 1
                elif line or line.endswith('\n'):
                    counts[index[0]] += 1

            print "Files should have 1, 10000, and 100000 lines"
            process_logs_from_redis(self.conn, '0', callback)
            self.assertEquals(counts, [1, 10000, 100000])

            print
            print "Let's wait for the copy thread to finish cleaning up..."
            t.join()
            print "Done cleaning out Redis!"
        finally:
            print "Time to clean up files..."
            shutil.rmtree(dire)
            print "Cleaned out files!"
            self.conn.delete('test:temp-1.txt', 'test:temp-2.txt', 'test:temp-3.txt',
                'msgs:test:', 'seen:0', 'seen:source', 'ids:test:', 'chat:test:')
if __name__ == '__main__':
    unittest.main()