# 自动补全联系人

In [240]:
import redis
import time
import datetime
import bisect
import uuid
import math
import json
# Shared client used by every cell below. decode_responses=True asks redis-py
# to hand back str values instead of raw bytes.
conn = redis.Redis(decode_responses=True)

In [7]:
def add_update_contact(conn, user, contact):
    """Move ``contact`` to the head of ``user``'s recent-contacts list.

    The three list operations are queued on one transactional pipeline and
    sent together.

    Args:
        conn: Redis client.
        user: Name of the user who owns the list (key ``recent:<user>``).
        contact: Contact string to record as most recent.
    """
    recent_key = 'recent:' + user
    txn = conn.pipeline(True)
    # Drop every existing copy so the contact appears only once ...
    txn.lrem(recent_key, 0, contact)
    # ... put it at the front ...
    txn.lpush(recent_key, contact)
    # ... and keep only list indexes 0 through 10.
    txn.ltrim(recent_key, 0, 10)
    txn.execute()

In [16]:
def fetch_autocomplete_list(conn, user, prefix):
    """Return the user's recent contacts that start with ``prefix``.

    Matching is case-insensitive: both the stored contact and the prefix are
    lower-cased before comparison, so ``'Con'`` and ``'con'`` give the same
    result. Contacts are returned with their original casing, in list order.

    Args:
        conn: Redis client returning ``str`` values.
        user: Name of the user who owns the list (key ``recent:<user>``).
        prefix: Text the contact must start with.

    Returns:
        list of matching contact strings (possibly empty).
    """
    # Normalize once; previously only the candidates were lower-cased, so any
    # prefix containing an uppercase letter could never match.
    needle = prefix.lower()
    return [
        candidate
        for candidate in conn.lrange('recent:' + user, 0, -1)
        if candidate.lower().startswith(needle)
    ]

In [17]:
# Record 'cont' as user1's most recent contact.
add_update_contact(conn,'user1','cont')

In [18]:
# Look up user1's recent contacts that start with 'con'.
fetch_autocomplete_list(conn,'user1','con')

['cont']

In [20]:
# Matching members are found by inserting one string that sorts just before
# the prefix and one that sorts just after it into the sorted set, then reading
# whatever lies between them.
# The alphabet is bracketed by '`' (sorts right before 'a') and '{' (sorts
# right after 'z').
validate_characters = '`abcdefghijklmnopqrstuvwxyz{'


def find_prefix_range(prefix):
    """Return the ``(start, end)`` sentinel strings that bracket ``prefix``.

    ``start`` is the prefix with its last character replaced by that
    character's predecessor in ``validate_characters`` plus ``'{'``; ``end`` is
    the prefix plus ``'}'``.
    """
    last_char = prefix[-1:]
    position = bisect.bisect_left(validate_characters, last_char)
    # Step back one character, clamping at the first entry of the alphabet.
    predecessor = validate_characters[max(position, 1) - 1]
    lower_bound = prefix[:-1] + predecessor + '{'
    upper_bound = prefix + '}'
    return lower_bound, upper_bound


In [30]:
def autocomplete_on_prefix(conn, guild, prefix):
    """Return up to 10 members of ``guild`` whose names start with ``prefix``.

    Two sentinel members bracketing the prefix are inserted into the guild's
    sorted set (``members:<guild>``); the members ranked between them are read
    and the sentinels removed inside one WATCH/MULTI/EXEC transaction.

    Args:
        conn: Redis client returning ``str`` values.
        guild: Guild name.
        prefix: Prefix to complete.

    Returns:
        list of matching member names (possibly empty).
    """
    start, end = find_prefix_range(prefix)
    # A per-call UUID keeps this caller's sentinels distinct from anyone
    # else's that may be in the set at the same time.
    identifier = str(uuid.uuid4())
    start += identifier
    end += identifier
    zset_name = 'members:' + guild

    conn.zadd(zset_name, {start: 0, end: 0})
    pipe = conn.pipeline(True)
    try:
        while True:
            try:
                pipe.watch(zset_name)
                sindex = pipe.zrank(zset_name, start)
                eindex = pipe.zrank(zset_name, end)
                # Once both sentinels are gone, the matches occupy ranks
                # sindex .. eindex-2; cap the read at 10 entries.
                erange = min(sindex + 9, eindex - 2)
                pipe.multi()

                pipe.zrem(zset_name, start, end)
                if erange >= sindex:
                    pipe.zrange(zset_name, sindex, erange)
                    items = pipe.execute()[-1]
                else:
                    # Nothing lies between the sentinels. With sindex == 0 the
                    # computed erange is -1, and ZRANGE 0 -1 would return the
                    # entire set, so skip the read altogether.
                    pipe.execute()
                    items = []
                break
            except redis.exceptions.WatchError:
                # The set changed between WATCH and EXEC; try again.
                continue
    finally:
        # Make sure a failed attempt does not leave the connection watching.
        pipe.reset()
    # Drop sentinels inserted by other concurrent callers.
    return [item for item in items if '{' not in item]

In [23]:
def join_guild(conn, guild, user):
    """Add ``user`` to the sorted set ``members:<guild>`` with score 0."""
    members_key = 'members:' + guild
    new_member = {user: 0}
    conn.zadd(members_key, new_member)
    
def leave_guild(conn, guild, user):
    """Remove ``user`` from the sorted set ``members:<guild>``."""
    members_key = 'members:' + guild
    conn.zrem(members_key, user)

In [26]:
# NOTE(review): 'use2' may be a typo for 'user2' — confirm against the
# autocomplete output shown further down.
join_guild(conn, 'guild1', 'use2')

# Dump every member of the guild's sorted set together with its score.
conn.zrange('members:guild1',0,-1, withscores=True)

In [32]:
# Ask for guild1 members whose names start with 'user'.
autocomplete_on_prefix(conn, 'guild1', 'user')

['user1', 'user2']

# 锁

In [107]:
def acquire_lock(conn, lockname, acquire_timeout=10):
    """Try to take the lock ``lock:<lockname>``, retrying until a deadline.

    Args:
        conn: Redis client.
        lockname: Lock name (without the ``lock:`` prefix).
        acquire_timeout: Seconds to keep retrying.

    Returns:
        The random token stored as the lock value on success, ``False`` if the
        deadline passed first. The lock key is written without an expiry.
    """
    token = str(uuid.uuid4())
    deadline = time.time() + acquire_timeout

    while True:
        if time.time() >= deadline:
            return False
        # SETNX only succeeds when the key does not exist yet.
        if conn.setnx('lock:' + lockname, token):
            return token
        time.sleep(.001)

def release_lock(conn, lockname, identifier):
    """Release ``lock:<lockname>`` only if it still holds ``identifier``.

    The value check and the delete run under WATCH/MULTI/EXEC so that a lock
    re-acquired by someone else in between is never deleted.

    Args:
        conn: Redis client. The equality check compares the stored value with
            ``identifier`` directly, so the client must return ``str`` values.
        lockname: Lock name (without the ``lock:`` prefix).
        identifier: Token returned by the acquire function.

    Returns:
        ``True`` if the lock was deleted; ``False`` if it is held by someone
        else, no longer exists, or a Redis error prevented the release.
    """
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname

    try:
        while True:
            try:
                pipe.watch(lockname)
                if pipe.get(lockname) == identifier:
                    pipe.multi()
                    pipe.delete(lockname)
                    pipe.execute()
                    return True
                # Not ours (or already gone): nothing to delete.
                pipe.unwatch()
                return False
            except redis.exceptions.WatchError:
                # The key changed between WATCH and EXEC; re-check ownership.
                continue
            except redis.exceptions.RedisError:
                # Keep the original best-effort contract: other Redis failures
                # are reported as "not released" rather than raised.
                return False
    finally:
        # Never leave the pipeline's connection in a watching state.
        pipe.reset()

In [122]:
def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
    """Buy ``itemid`` from ``sellerid`` for ``buyerid`` under the market lock.

    Args:
        conn: Redis client.
        buyerid: Id of the buying user (hash ``users:<buyerid>``).
        itemid: Id of the item; it is listed in ``market:`` as
            ``<itemid>.<sellerid>`` with its price as the score.
        sellerid: Id of the selling user (hash ``users:<sellerid>``).

    Returns:
        ``True`` when the purchase went through, ``False`` when the market
        lock could not be acquired, ``None`` when the item is not listed, the
        buyer has no ``funds`` field, or the buyer cannot afford the price.
    """
    buyer = 'users:%s' % buyerid
    seller = 'users:%s' % sellerid
    item = '%s.%s' % (itemid, sellerid)
    inventory = 'inventory:%s' % buyerid

    locked = acquire_lock_with_timeout(conn, 'market:')
    if not locked:
        print('获取锁失败')
        return False

    pipe = conn.pipeline(True)
    try:
        pipe.zscore('market:', item)
        pipe.hget(buyer, 'funds')
        price, funds = pipe.execute()
        # Check for missing values BEFORE converting: int(None) would raise
        # TypeError and the "not listed" case could never return None.
        if price is None or funds is None:
            return None
        price, funds = int(price), int(funds)
        if price > funds:
            return None

        pipe.hincrby(seller, 'funds', price)
        pipe.hincrby(buyer, 'funds', -price)
        pipe.sadd(inventory, itemid)
        pipe.zrem('market:', item)
        pipe.execute()
        return True
    finally:
        # Always give the lock back, whichever way the purchase ended.
        release_lock(conn, 'market:', locked)

In [124]:
# List every market entry with its score (purchase_item_with_lock reads the
# score as the price).
conn.zrange('market:', 0, -1, withscores=True)

[]

In [125]:
# Show the set of items stored under user 2's inventory key.
conn.smembers('inventory:2')

{'itemD', 'itemE', 'itemF', 'itemG', 'itemH', 'itemL'}

In [126]:
# Show the set of items stored under user 1's inventory key.
conn.smembers('inventory:1')

{'itemG', 'itemH'}

In [127]:
# Show every field of user 1's hash (includes the 'funds' balance).
conn.hgetall('users:1')

{'name': 'jim', 'funds': '140'}

In [128]:
# Show every field of user 2's hash (includes the 'funds' balance).
conn.hgetall('users:2')

{'name': 'tom', 'funds': '3'}

In [123]:
# Buyer 2 purchases 'itemL' from seller 1.
purchase_item_with_lock(conn, 2, 'itemL',1)

True

In [121]:
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
    """Try to take ``lock:<lockname>`` and give it an expiry.

    Args:
        conn: Redis client.
        lockname: Lock name (without the ``lock:`` prefix).
        acquire_timeout: Seconds to keep retrying before giving up.
        lock_timeout: Lifetime of the lock in seconds (rounded up to a whole
            second).

    Returns:
        The random token stored as the lock value on success, ``False`` if the
        lock could not be taken within ``acquire_timeout``.
    """
    identifier = str(uuid.uuid4())
    lockname = 'lock:' + lockname
    lock_timeout = int(math.ceil(lock_timeout))

    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx(lockname, identifier):
            conn.expire(lockname, lock_timeout)
            return identifier
        # Repair a lock left without any expiry (its holder died between
        # SETNX and EXPIRE). TTL reports -1 for "exists, no expiry"; older
        # clients reported None. A plain truthiness test is wrong here: it
        # never fires for -1, and it DOES fire for 0, which would renew a lock
        # that is just about to expire.
        if conn.ttl(lockname) in (None, -1):
            conn.expire(lockname, lock_timeout)
        time.sleep(.001)
    return False
    

In [120]:
# Inspect the token currently stored in the market lock key, if any.
conn.get('lock:market:')

# 计数信号量

In [129]:
def acquire_semaphore(conn, semname, limit, timeout=10):
    """Try to take one of ``limit`` slots of the counting semaphore.

    Holders live in the sorted set ``semname``, scored by acquisition time.

    Args:
        conn: Redis client.
        semname: Key of the semaphore sorted set.
        limit: Maximum number of simultaneous holders.
        timeout: Age in seconds after which a holder entry is discarded.

    Returns:
        A random token identifying this holder, or ``None`` if no slot was
        available.
    """
    token = str(uuid.uuid4())
    started = time.time()

    txn = conn.pipeline(True)
    # Evict holders older than ``timeout``, add ourselves, then ask for our rank.
    txn.zremrangebyscore(semname, '-inf', started - timeout)
    txn.zadd(semname, {token: started})
    txn.zrank(semname, token)
    rank = txn.execute()[-1]

    if rank >= limit:
        # Too far down the ranking: back out again.
        conn.zrem(semname, token)
        return None
    return token

def release_semaphore(conn, semname, identifier):
    """Remove ``identifier`` from the semaphore set and return ZREM's reply."""
    removed = conn.zrem(semname, identifier)
    return removed

In [148]:
# Try to take one of at most 5 slots of the 'store' semaphore.
acquire_semaphore(conn, 'store', 5)

'5e0a176a-bfa2-4d19-9d9d-e232863c1fd8'

In [151]:
# List the current semaphore holders with their acquisition timestamps.
conn.zrange('store', 0, -1, withscores=True)

[]

In [150]:
# Give back the slot held under the token returned by acquire_semaphore.
release_semaphore(conn, 'store','5e0a176a-bfa2-4d19-9d9d-e232863c1fd8')

1

In [230]:
# Fair semaphore
def acquire_fair_semaphore(conn, semname, limit, timeout=10):
    """Try to take a slot of the fair semaphore ``semname``.

    Three keys are used: ``semname`` (holder -> acquisition time),
    ``semname:owner`` (holder -> counter value) and ``semname:counter``
    (an incrementing counter). Whether a slot was obtained is decided by the
    holder's rank in the owner set.

    Args:
        conn: Redis client.
        semname: Base key of the semaphore.
        limit: Maximum number of simultaneous holders.
        timeout: Age in seconds after which a holder entry is discarded.

    Returns:
        A random token identifying this holder, or ``None`` if no slot was
        available.
    """
    token = str(uuid.uuid4())
    owners_key = semname + ':owner'
    counter_key = semname + ':counter'

    started = time.time()
    txn = conn.pipeline(True)

    # Round trip 1: evict timed-out holders, intersect the owner set with the
    # time set (keeping the owner scores), and draw the next counter value.
    txn.zremrangebyscore(semname, '-inf', started - timeout)
    txn.zinterstore(owners_key, {owners_key: 1, semname: 0})
    txn.incr(counter_key)
    ticket = txn.execute()[-1]

    # Round trip 2: register in both sets and look up our rank by ticket.
    txn.zadd(semname, {token: started})
    txn.zadd(owners_key, {token: ticket})
    txn.zrank(owners_key, token)
    rank = txn.execute()[-1]

    if rank >= limit:
        # No slot for us: remove both registrations again.
        txn.zrem(semname, token)
        txn.zrem(owners_key, token)
        txn.execute()
        return None
    return token

def release_fair_semaphore(conn, semname, identifier):
    """Remove ``identifier`` from both fair-semaphore sets.

    Returns:
        The ZREM reply for the ``semname`` set (the first queued command).
    """
    txn = conn.pipeline()
    # Time set first, owner set second, so results[0] is the time-set reply.
    for key in (semname, semname + ':owner'):
        txn.zrem(key, identifier)
    results = txn.execute()
    return results[0]

In [231]:
# Try to take one of at most 3 slots of the fair 'store' semaphore.
acquire_fair_semaphore(conn, 'store', 3)

'ea69f278-fd04-4960-a5b7-9ae5c9c0ee25'

In [232]:
# Owner set: holder token -> counter value drawn at acquisition.
conn.zrange('store:owner',0, -1, withscores=True)

[('ea69f278-fd04-4960-a5b7-9ae5c9c0ee25', 19.0)]

In [233]:
# Current value of the fair semaphore's counter key.
conn.get('store:counter')

'19'

In [234]:
# Time set: holder token -> acquisition timestamp.
conn.zrange('store', 0, -1, withscores=True)

[('ea69f278-fd04-4960-a5b7-9ae5c9c0ee25', 1573269355.298795)]

In [221]:
# Release the fair-semaphore slot held under the given token.
release_fair_semaphore(conn,'store', 'd9bd8b0f-fcb2-4a25-b610-529f4b672e02')

1

In [226]:
def refresh_fair_semaphore(conn, semname, identifier):
    """Refresh the holder's timestamp so its slot does not time out.

    Args:
        conn: Redis client.
        semname: Base key of the fair semaphore.
        identifier: Token returned by ``acquire_fair_semaphore``.

    Returns:
        ``True`` if the slot is still held (timestamp updated), ``False`` if
        it had already been lost.
    """
    if conn.zadd(semname, {identifier: time.time()}):
        # A truthy reply means the member was newly ADDED, i.e. our entry had
        # already been evicted. Undo that insert so a lost slot is not
        # silently re-occupied. (This used to recurse into the refresh itself,
        # which left the entry in place while still reporting False.)
        release_fair_semaphore(conn, semname, identifier)
        return False
    return True

In [227]:
# Try to refresh a token; False means the slot was no longer held.
refresh_fair_semaphore(conn,'store', 'd9bd8b0f-fcb2-4a25-b610-529f4b672e02')

False

In [235]:
def acquire_semaphore_with_lock(conn, semname, limit, timeout=10):
    """Acquire the fair semaphore while holding a short-lived lock on it.

    The lock named ``semname`` is attempted for at most 0.01 seconds.

    Returns:
        The token from ``acquire_fair_semaphore``, or ``None`` when either the
        lock or the semaphore slot could not be obtained.
    """
    lock_token = acquire_lock_with_timeout(conn, semname, acquire_timeout=.01)
    if not lock_token:
        return None
    try:
        return acquire_fair_semaphore(conn, semname, limit, timeout)
    finally:
        # Release the lock whether or not a slot was obtained.
        release_lock(conn, semname, lock_token)


In [239]:
# Take a fair-semaphore slot (limit 3) under the protection of a lock.
acquire_semaphore_with_lock(conn, 'store', 3)

# 消息拉取

In [255]:
def creat_chat(conn, sender, recipients, message, chat_id=None):
    """Create a group chat and post its first message.

    Every participant (the recipients plus the sender) is added to
    ``chat:<chat_id>`` with score 0, and the chat is added to each
    participant's ``seen:<user>`` set with score 0.

    Args:
        conn: Redis client.
        sender: User who creates the chat and sends ``message``.
        recipients: Iterable of the other participants. It is not modified.
        message: First message to post.
        chat_id: Chat id to use; when falsy, a new one is drawn from the
            ``ids:chat:`` counter.

    Returns:
        Whatever ``send_message`` returns for the first message.
    """
    chat_id = chat_id or str(conn.incr('ids:chat:', 1))
    # Build a fresh list instead of appending to the caller's argument, so the
    # caller's list does not silently grow on every call.
    members = list(recipients) + [sender]
    initial_scores = dict.fromkeys(members, 0)

    pipeline = conn.pipeline(True)
    pipeline.zadd('chat:' + chat_id, initial_scores)
    for member in members:
        pipeline.zadd('seen:' + member, {chat_id: 0})
    pipeline.execute()
    return send_message(conn, chat_id, sender, message)

In [258]:
def send_message(conn, chat_id, sender, message):
    """Append ``message`` to the chat's message set under a per-chat lock.

    The message is stored in ``msgs:<chat_id>`` as a JSON string scored by a
    message id drawn from the ``ids:<chat_id>`` counter.

    Returns:
        ``chat_id``.

    Raises:
        Exception: If the chat lock could not be acquired.
    """
    lock_name = 'chat:' + chat_id
    lock_token = acquire_lock(conn, lock_name)
    if not lock_token:
        raise Exception("Couldn't get the lock")
    try:
        message_id = conn.incr('ids:' + chat_id)
        payload = {
            'id': message_id,
            'ts': time.time(),
            'sender': sender,
            'message': message,
        }
        conn.zadd('msgs:' + chat_id, {json.dumps(payload): message_id})
    finally:
        # Release the lock even if storing the message failed.
        release_lock(conn, lock_name, lock_token)
    return chat_id

In [355]:
def fetch_pending_messages(conn, recipient):
    """Fetch messages ``recipient`` has not seen yet, across all their chats.

    For each chat with new messages, the recipient's position is advanced in
    ``chat:<id>`` and ``seen:<recipient>``, and messages scored at or below the
    lowest position in ``chat:<id>`` are trimmed from ``msgs:<id>``.

    Returns:
        A list with one entry per chat, in the order of ``seen:<recipient>``.
        Chats with new messages appear as ``(chat_id, [message_dict, ...])``.
        Chats without new messages are left as ``((chat_id, seen_id), [])``.
    """
    seen_key = 'seen:' + recipient
    seen = conn.zrange(seen_key, 0, -1, withscores=True)

    pipe = conn.pipeline(True)
    for chat_id, last_seen in seen:
        # Everything with an id above the last one this user has seen.
        pipe.zrangebyscore('msgs:' + chat_id, last_seen + 1, 'inf')
    chat_info = list(zip(seen, pipe.execute()))

    for index, ((chat_id, _), raw_messages) in enumerate(chat_info):
        if not raw_messages:
            continue
        chat_key = 'chat:' + chat_id
        # Decode in place so the list object itself is reused.
        raw_messages[:] = [json.loads(raw) for raw in raw_messages]
        newest_id = raw_messages[-1]['id']
        # Written immediately (not queued) so the lookup below sees it.
        conn.zadd(chat_key, {recipient: newest_id})

        slowest = conn.zrange(chat_key, 0, 0, withscores=True)
        pipe.zadd(seen_key, {chat_id: newest_id})
        if slowest:
            pipe.zremrangebyscore('msgs:' + chat_id, 0, slowest[0][1])
        chat_info[index] = (chat_id, raw_messages)
    pipe.execute()
    return chat_info

In [321]:
def join_chat(conn, chat_id, user):
    """Add ``user`` to an existing chat, starting at the latest message id.

    The user's position in ``chat:<chat_id>`` and the chat's entry in
    ``seen:<user>`` are both set to the current value of ``ids:<chat_id>``,
    so only messages sent after joining are pending for the user.

    Args:
        conn: Redis client.
        chat_id: Id of the chat to join.
        user: Name of the joining user.
    """
    latest = conn.get('ids:' + chat_id)
    # The counter key does not exist until the first message is sent; treat
    # that as id 0 instead of crashing on int(None).
    message_id = int(latest) if latest is not None else 0

    pipe = conn.pipeline(True)
    pipe.zadd('chat:' + chat_id, {user: message_id})
    pipe.zadd('seen:' + user, {chat_id: message_id})
    pipe.execute()

In [324]:
def leave_chat(conn, chat_id, user):
    """Remove ``user`` from a chat and clean up what is no longer needed.

    If the chat has no members left, its message set and id counter are
    deleted. Otherwise messages that every remaining member has already seen
    are trimmed from ``msgs:<chat_id>``.

    Args:
        conn: Redis client.
        chat_id: Id of the chat to leave.
        user: Name of the leaving user.
    """
    pipe = conn.pipeline(True)
    pipe.zrem('chat:' + chat_id, user)
    pipe.zrem('seen:' + user, chat_id)
    pipe.zcard('chat:' + chat_id)
    if not pipe.execute()[-1]:
        # Last member left: nothing about this chat needs to be kept.
        pipe.delete('msgs:' + chat_id)
        pipe.delete('ids:' + chat_id)
        pipe.execute()
    else:
        oldest = conn.zrange('chat:' + chat_id, 0, 0, withscores=True)
        # Guard against the chat being emptied by someone else in between.
        if oldest:
            # Trim MESSAGES up to the lowest seen-id. This must target
            # 'msgs:', not 'chat:' — trimming the member set by score would
            # delete the remaining members themselves.
            conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])

In [358]:
# Create chat 'chat1' with user1 as sender and no other recipients, posting
# 'message1' as its first message.
creat_chat(conn, 'user1', [],'message1', 'chat1')
# Each join sets the user's position to the chat's current message id.
join_chat(conn, 'chat1','user1')
join_chat(conn, 'chat1','user2')
join_chat(conn, 'chat1','user3')

In [359]:
# Members of chat1 with the id of the last message each has seen.
conn.zrange('chat:chat1', 0, -1, withscores=True)

[('user1', 3.0), ('user2', 3.0), ('user3', 3.0)]

In [360]:
# Chats user1 belongs to, with the last message id seen in each.
conn.zrange('seen:user1',0,-1, withscores=True)

[('chat1', 3.0)]

In [361]:
# Chats user2 belongs to, with the last message id seen in each.
conn.zrange('seen:user2',0,-1,withscores=True)

[('chat1', 3.0)]

In [365]:
# user2 posts 'message2' to chat1.
send_message(conn,'chat1','user2','message2')

'chat1'

In [370]:
# Stored messages of chat1: JSON payload -> message id.
conn.zrange('msgs:chat1', 0, -1, withscores=True)

[('{"id": 4, "ts": 1573375480.7585511, "sender": "user2", "message": "message2"}',
  4.0)]

In [368]:
# Fetch everything user3 has not seen yet and advance their position.
fetch_pending_messages(conn, 'user3')

[('chat1',
  [{'id': 4,
    'ts': 1573375480.7585511,
    'sender': 'user2',
    'message': 'message2'}])]

In [371]:
# Remove user2 from chat1.
leave_chat(conn,'chat1','user2')