其实就是实现各种小功能  
使用日志和记录器来收集当前系统的信息，挖掘正在使用系统的顾客的相关信息，将Redis用作配置信息的字典

# 使用Redis来监控日志

In [1]:
import logging
import redis
import time
import datetime
import bisect
import uuid
import contextlib
import json
import functools

In [2]:
conn = redis.Redis()

In [3]:
# 将各个等级转换成字符串
SEVERITY = {
    logging.DEBUG: 'debug',
    logging.INFO: 'info',
    logging.WARNING: 'warning',
    logging.ERROR: 'error',
    logging.CRITICAL: 'critical'
}
def log_recent(conn, name, massage, severity=logging.INFO, pipe=None):
    """将日志按照name和等级，放到相应的列表中，并且控制列表的长度"""
    # 当找不到相应的等级时，str(logging.INFO) 这样的方式能够转换出一个数字字符串
    severity = str(SEVERITY.get(severity, severity)).lower()
    destination = 'recent:%s:%s'%(name, severity)
    message = time.asctime() + ' ' + message
    pipe = pipe or conn.pipeline()
    pipe.lpush(destination, message)
    pipe.ltrim(destination, 0, 99)
    pipe.execute()

## 常见日志
纯粹的消息记录日志，没有办法告诉你哪些消息是重要的，哪些是不重要的  
可以根据消息出现的频率来判定消息的重要性  
其实就是做一个有序集合

In [4]:
def log_common(conn, name, message, severity=logging.INFO, timeout=5):
    severity = str(SEVERITY.get(severity, severity)).lower()
    destination = 'common:%s:%s'%(name, severity)
    start_key = destination + ':start'
    pipe = conn.pipeline()
    end = time.time() + timeout
    while time.time() < end:
        try:
            # 还可以看一个不存在的键
            pipe.watch(start_key)
            # 格林尼治时间
            now = datetime.datetime.utcnow().timetuple()
            hour_start = datetime(*now[:4]).isoformat()
            
            existing = pipe.get(start_key)
            pipe.multi()
            # 每个小时对数据进行轮换，保留前一个小时的日志信息叫做:last
            if existing and existing < hour_start:
                pipe.rename(destination, destination + ':last')
                pipe.rename(start_key, destination + ':pstart')
                pipe.set(start_key, hour_start)
            elif not existing:
                pipe.set(start_key, hour_start)
            # 对message频率+1
            pipe.zincrby(destination, message)
            log_recent(pipe, name, message, severity, pipe)
            return
        except redis.exceptions.WatchError:
            continue

# 计数器和统计数据
利用hash创建一个计数器，能够得到不同时间的计数情况，时间精度也可以随之改变  
为了对每个需要清除历史数据的计数器进行历史清除，需要有一个有序结构能让我们遍历所有计数器，此时所有值为0的有序集合是个不错的选择，因为当值都相等时，有序集合会按照成员名字的顺序进行排序，从而能够进行遍历。

## 计数器

### 对计数器进行更新

In [5]:
PRECISION = [1, 5, 60, 300, 3600, 18000, 86400]
def update_counter(conn, name, count, now=None):
    now = now or time.time()
    pipe = conn.pipeline()
    for prec in PRECISION:
        # 取得当前时间片的开始时间
        pnow = int(now / prec) * prec
        hash = '%s:%s'%(prec, name)
        # 计数器可能已经在里面了，但是加一下也没关系
        pipe.zadd('known:', hash, 0)
        pipe.incrby('count:' + hash, pnow, count)
    pipe.execute()

### 获得计数器信息

In [6]:
# 获得相关计数器的信息
def get_counter(conn, name, precision):
    hash = '%s:%s'%(precision, name)
    data = conn.hgetall('count:' + hash)
    to_return = list()
    for key, value in data.items():
        to_return.append((int(key), int(value)))
    to_return.sort()
    return to_return

### 清理计数器信息
不适用EXPIRE，正如之前的发现的，EXPIRE只能设定一个键，不能对键中的某一部分数据进行设置  
但是在自己写守护进程的时候，需要注意一下情况:
* 任何时候都会有新的计数器被添加进来
* 同一时间可能会有多个清理程序在执行
* 清理某个计数器的时间间隔，肯定大于计数器本身的时间分片间隔，不然仅仅是浪费资源
* 如果一个计数器不包含任何数据，就肯定不需要对其进行清理

In [7]:
QUIT = False
SAMPLE_COUNT = 120
def clean_counters(conn):
    pipe = conn.pipeline(True)
    # 记录清理次数
    passes = 0
    while not QUIT:
        start = time.time()
        index = 0
        while index < conn.zcard('known:'):
            hash = conn.zrange('known:', index, index)
            index += 1
            if not hash:
                break
            hash = hash[0]
            # 取得计数器的精度
            prec = int(hash.partition(':')[0])
            # 取得计数器的清理频率
            bprec = int(prec // 60) or 2
            if passes % bprec:
                # 因为passes定义了一分钟做一次，所以由此可以确定某一个计数器在这一轮pass中是否需要清理
                continue
            
            hkey = 'count:'+ hash
            cutoff = time.time() - SAMPLE_COUNT * prec
            samples = map(int, conn.hkeys(hkey))
            # 计数器的键其实是没有被排序过的
            smaples.sort()
            # 通过bisect来确定要被移除的
            remove = bisect.bisect_right(samples, cutoff)
            
            if remove:
                conn.hdel(hkey, *samples[:remove])
                # 这个散列可能已经被清空了，因为清掉了所有数据
                if remove == len(samples):
                    try:
                        pipe.watch(hkey)
                        if not pipe.hlen(hkey):
                            pipe.multi()
                            pipe.zrem('known:', hash)
                            pipe.execute()
                            index -= 1
                        else:
                            pipe.unwatch()
                    except redis.exceptions.WatchError:
                        pass
            passes += 1
            duration = min(int(time.time() - start) + 1, 60)
            time.sleep(max(60 - duration), 1)

## 使用Redis存储统计数据
统计信息的存储其实可以是有序集合，虽然存储的是一个个信息，但是有序集合有助于在与其他有序集合做交集并集的时候的聚合计算

In [8]:
def update_stats(conn, context, type, value, timeout=5):
    # 用于存储的统计量
    destination = 'stats:%s:%s'%(context, type) 
    # 处理当前这一个小时的数据和上一个小时的数据。
    start_key = destination + ':start'
    pipe = conn.pipeline(True)
    end = time.time() + timeout
    while time.time() < end:
        try:
            pipe.watch(start_key) 
            now = datetime.utcnow().timetuple() 
            hour_start = datetime(*now[:4]).isoformat() 

            existing = pipe.get(start_key)
            pipe.multi()
            if existing and existing < hour_start:
                pipe.rename(destination, destination + ':last') 
                pipe.rename(start_key, destination + ':pstart') 
                pipe.set(start_key, hour_start)
            tkey1 = str(uuid.uuid4())
            tkey2 = str(uuid.uuid4())
            # 将值添加到临时键里面。
            pipe.zadd(tkey1, 'min', value)
            pipe.zadd(tkey2, 'max', value)                     
            # 使用合适聚合函数MIN和MAX，
            # 对存储统计数据的键和两个临时键进行并集计算。
            pipe.zunionstore(destination,                     
                [destination, tkey1], aggregate='min')          
            pipe.zunionstore(destination,                      
                [destination, tkey2], aggregate='max')        

            # 删除临时键。
            pipe.delete(tkey1, tkey2)                           
            # 对有序集合中的样本数量、值的和、值的平方之和三个成员进行更新。
            pipe.zincrby(destination, 'count')                  
            pipe.zincrby(destination, 'sum', value)            
            pipe.zincrby(destination, 'sumsq', value*value)    

            # 返回基本的计数信息，以便函数调用者在有需要时做进一步的处理。
            return pipe.execute()[-3:]                        
        except redis.exceptions.WatchError:
            # 如果新的一个小时已经开始，并且旧的数据已经被归档，那么进行重试。
            continue                                           

In [9]:
def get_status(conn, context, type):
    key = 'status:%s:%s'%(context, type)
    data = dict(conn.zrange(key, 0, -1, withscores=True))
    # 额外算出平均值
    data['average'] = data['sum'] / data['count']
    # 计算标准差的第一个步骤
    numerator = data['sumsq'] - data['sum'] ** 2 / data['count']
    data['stddev'] = (numerator / (data['count'] - 1 or 1)) ** .5
    return data

### 简化统计数据的记录与发现(*)

利用装饰器直接实现计时功能并将最慢的100个页面存储在数据结构内

In [10]:
# 将这个Python生成器用作上下文管理器。
@contextlib.contextmanager                                            
def access_time(conn, context):
    # 记录代码块执行前的时间。
    start = time.time()                                               
    # 运行被包裹的代码块。
    yield                                                              

    # 计算代码块的执行时长。
    delta = time.time() - start                                        
    # 更新这一上下文的统计数据。
    stats = update_stats(conn, context, 'AccessTime', delta)           
    # 计算页面的平均访问时长。
    average = stats[1] / stats[0]                                      

    pipe = conn.pipeline(True)
    # 将页面的平均访问时长添加到记录最慢访问时间的有序集合里面。
    pipe.zadd('slowest:AccessTime', context, average)                 
    # AccessTime有序集合只会保留最慢的100条记录。
    pipe.zremrangebyrank('slowest:AccessTime', 0, -101)                
    pipe.execute()

contextlib.contextmanager: 包装一个生成器带 yield， yield之前的语句在真正函数运行之前完成，之后的函数在函数运行之后完成，有点合并两个装饰器的感觉

In [11]:
@contextlib.contextmanager
def foo():
    start = time.time()
    yield
    delta = int(time.time() - start)
    print(delta)

In [12]:
with foo():
    time.sleep(1)

1


In [13]:
def process_view(conn, callback):             
    # 计算并记录访问时长的上下文管理器就是这样包围代码块的。
    with access_time(conn, request.path):     
        # 当上下文管理器中的yield语句被执行时，这个语句就会被执行。
        return callback()           

# 查找IP所属城市以及国家
一系列用于分析和载入IP所属地数据库的函数  
之所以用redis不用数据库，是因为redis在查找速度上更具优势  
需要用到两张表: 根据IP地址查找城市ID， 通过ID查找城市的详细信息

In [14]:
def ip_to_score(ip_address):
    score = 0
    for v in ip_address.split('.'):
        score = score * 256 + int(v, 10)
    return score

In [15]:
def import_ips_to_redis(conn, filename):
    csv_file = csv.reader(open(filename, 'rb'))
    for count, row in enumerate(csv_file):
        # 按需将IP地址转换为分值。
        start_ip = row[0] if row else ''             
        if 'i' in start_ip.lower():
            continue
        if '.' in start_ip:                            
            start_ip = ip_to_score(start_ip)           
        elif start_ip.isdigit():                       
            start_ip = int(start_ip, 10)               
        else:
            # 略过文件的第一行以及格式不正确的条目。
            continue                                  

        # 构建唯一城市ID。
        city_id = row[2] + '_' + str(count)            
        # 将城市ID及其对应的IP地址分值添加到有序集合里面。
        # 这里会比在数据库查询好很多，因为有有序集合，所以用 score 就行
        conn.zadd('ip2cityid:', city_id, start_ip)

In [16]:
def import_cities_to_redis(conn, filename):  
    for row in csv.reader(open(filename, 'rb')):
        if len(row) < 4 or not row[0].isdigit():
            continue
        row = [i.decode('latin-1') for i in row]
        # 准备好需要添加到散列里面的信息。
        city_id = row[0]                          
        country = row[1]                           
        region = row[2]                            
        city = row[3]                             
        # 将城市信息添加到Redis里面。
        conn.hset('cityid2city:', city_id, 
            json.dumps([city, region, country])) 

In [17]:
def find_city_by_ip(conn, ip_address):
    # 将IP地址转换为分值以便执行ZREVRANGEBYSCORE命令。
    if isinstance(ip_address, str):                        
        ip_address = ip_to_score(ip_address)               

    # 查找唯一城市ID。
    city_id = conn.zrevrangebyscore(                       
        'ip2cityid:', ip_address, 0, start=0, num=1)       

    if not city_id:
        return None

    # 将唯一城市ID转换为普通城市ID。
    city_id = city_id[0].partition('_')[0]                 
    # 从散列里面取出城市信息。
    return json.loads(conn.hget('cityid2city:', city_id))  


# 服务的发现与配置
就可以将很多配置信息转移到redis当中，通过访问redis来更改相关配置信息  
其实就是充当一个信息共享的平台，普通数据库其实也能做到，但是普通数据库只存储一个值得话总觉得怪怪的，配置文件毕竟大部分都是dict  
好处就是当更改配置信息时，服务不需要重启

应用举例：获取是否正在维护这个状态

In [18]:
LAST_CHECKED = None
IS_UNDER_MAINTENANCE = False

def is_under_maintenance(conn):
    # 将两个变量设置为全局变量以便在之后对它们进行写入。
    global LAST_CHECKED, IS_UNDER_MAINTENANCE
    # 距离上次检查是否已经超过1秒钟？
    if LAST_CHECKED < time.time() - 1:          
        # 更新最后检查时间。
        LAST_CHECKED = time.time()              
        # 检查系统是否正在进行维护。
        IS_UNDER_MAINTENANCE = bool(            
            conn.get('is-under-maintenance'))   
    # 返回一个布尔值，用于表示系统是否正在进行维护。
    return IS_UNDER_MAINTENANCE

## 为每个应用程序组件分别配置一个Redis服务器
同时可以用一个redis来记录所有其他redis服务器的相关信息

In [19]:
def set_config(conn, type, component, config):
    conn.set(
        'config:%s:%s'%(type, component),
        json.dumps(config))

In [20]:
CONFIGS = {}
CHECKED = {}

def get_config(conn, type, component, wait=1):
    key = 'config:%s:%s'%(type, component)

    # 检查是否需要对这个组件的配置信息进行更新。
    if CHECKED.get(key) < time.time() - wait:     
        # 有需要对配置进行更新，记录最后一次检查这个连接的时间。
        CHECKED[key] = time.time() 
        # 取得Redis存储的组件配置。
        config = json.loads(conn.get(key) or '{}')    
        # 将潜在的Unicode关键字参数转换为字符串关键字参数。
        config = dict((str(k), config[k]) for k in config)
        # 取得组件正在使用的配置。
        old_config = CONFIGS.get(key)                  

        # 如果两个配置并不相同……
        if config != old_config:                    
            # ……那么对组件的配置进行更新。
            CONFIGS[key] = config                     
    return CONFIGS.get(key)

## 自动Redis连接管理
利用装饰器来做连接redis的操作

In [21]:
REDIS_CONNECTIONS = {}

# 将应用组件的名字传递给装饰器。
# 这个装饰器还是带参数的
def redis_connection(component, wait=1):
    # 因为函数每次被调用都需要获取这个配置键，所以我们干脆把它缓存起来。
    key = 'config:redis:' + component                       
    # 包装器接受一个函数作为参数，并使用另一个函数来包裹这个函数。
    def wrapper(function):                              
        # 将被包裹函数里的一些有用的元数据复制到配置处理器。
        @functools.wraps(function)                      
        # 创建负责管理连接信息的函数。
        def call(*args, **kwargs):                      
            # 如果有旧配置存在，那么获取它。
            old_config = CONFIGS.get(key, object())     
            # 如果有新配置存在，那么获取它。
            _config = get_config(                   
                config_connection, 'redis', component, wait)

            config = {}
            # 对配置进行处理并将其用于创建Redis连接。
            for k, v in _config.iteritems():            
                config[k.encode('utf-8')] = v               

            # 如果新旧配置并不相同，那么创建新的连接。
            if config != old_config:                        
                REDIS_CONNECTIONS[key] = redis.Redis(**config) 

            # 将Redis连接以及其他匹配的参数传递给被包裹函数，然后调用函数并返回执行结果。
            return function(                                    
                REDIS_CONNECTIONS.get(key), *args, **kwargs)
        # 返回被包裹的函数。
        return call                                         
    # 返回用于包裹Redis函数的包装器。
    return wrapper                            