# Bitmap 应用, 记录很多个事件的 0/1 状态


## 问题描述

假设我们要运行 1000 个任务, 每个任务都有可能成功和失败.

1. 我们想要记录每个任务的成功或失败状态
2. 并且能统计成功的任务的数量
3. 以及给定一个任务的 id, 知道这个任务是否成功
4. 以及给出所有成功或失败的任务的列表

当然这些任务是分布式的.

## 解法 1, 使用 SQL 数据库

创建一个有两列的表. primary key 是 task id, 另一列是 status, 0 表示失败, 1 表示成功.

In [50]:
!pip install -q sqlitedict
!pip install -q mpire

You should consider upgrading via the '/Users/sanhehu/.pyenv/versions/3.8.11/bin/python3.8 -m pip install --upgrade pip' command.
You should consider upgrading via the '/Users/sanhehu/.pyenv/versions/3.8.11/bin/python3.8 -m pip install --upgrade pip' command.


In [45]:
import redis
from sqlitedict import SqliteDict

# Redis server (the host name points at an AWS cache endpoint) that holds the
# status bitmap.
endpoint = "sanhe-dev.hozbo8.ng.0001.use1.cache.amazonaws.com"
r = redis.Redis(db=0, port=6379, host=endpoint)

# SQLite-backed dict playing the role of the SQL table: one row per task.
# Use ":memory:" instead of the file name to keep the table in memory.
dct = SqliteDict("status.sqlite", tablename="status", autocommit=True)


In [46]:
import math
import random
import time
from typing import Optional

# Total number of tasks to execute.
n_task = 1024

# Redis key under which the status bitmap is stored.
KEY = "status"

# Size of the bitmap in bits: n_task rounded up to a whole number of bytes.
n_bit_in_bitmap = (n_task + 7) // 8 * 8


def init():
    """
    Reset the sqlite dict and the redis bitmap before a new run.

    The sqlite dict is emptied, the redis key is deleted, and finally the
    last bit of the bitmap is written as 0.
    """
    dct.clear()
    r.delete(KEY)
    # Presumably done to pre-size the bitmap: per the Redis docs SETBIT grows
    # the string so that it can hold the given offset.
    last_bit_offset = n_bit_in_bitmap - 1
    r.setbit(KEY, last_bit_offset, 0)


def run_task(
    task_id: int,
    use_sqlite: bool = True,
    use_redis: bool = True,
):
    """
    Simulate one task that succeeds with 70% probability and record its status.

    :param task_id: id of the task; used as the bit offset in redis and, as a
        string, as the key in the sqlite dict.
    :param use_sqlite: when true, store 1 (success) or 0 (failure) in sqlite.
    :param use_redis: when true, set the task's bit to 1 on success. Nothing
        is written to redis on failure.
    """
    succeeded = random.randint(1, 100) <= 70
    if use_sqlite:
        dct[str(task_id)] = 1 if succeeded else 0
    if succeeded and use_redis:
        r.setbit(KEY, task_id, 1)


def run_all_task(
    use_sqlite: bool = True,
    use_redis: bool = True,
):
    """
    Run every task id in ``range(n_task)`` one after another in this process.

    :param use_sqlite: forwarded to ``run_task``.
    :param use_redis: forwarded to ``run_task``.
    """
    backends = {"use_sqlite": use_sqlite, "use_redis": use_redis}
    for tid in range(n_task):
        run_task(tid, **backends)


def run_all_task_multi_process(
    use_sqlite: bool = True,
    use_redis: bool = True,
):
    """
    Run every task id in ``range(n_task)`` through an mpire ``WorkerPool``.

    :param use_sqlite: forwarded to ``run_task``.
    :param use_redis: forwarded to ``run_task``.
    """
    import os
    from mpire import WorkerPool

    # One keyword-argument dict per task; mpire is expected to unpack each
    # dict into keyword arguments of run_task -- confirm against mpire docs.
    jobs = []
    for tid in range(n_task):
        jobs.append(
            {
                "task_id": tid,
                "use_sqlite": use_sqlite,
                "use_redis": use_redis,
            }
        )

    print(f"got {os.cpu_count()} core")
    with WorkerPool() as workers:
        workers.map(run_task, jobs)


def statistics_with_sqlite():
    """
    Scan every row of the sqlite dict and split the keys by status value.

    :return: ``(success, failed)``: keys whose value is truthy, and keys
        whose value is falsy. Keys are returned exactly as stored
        (``run_task`` stores them as ``str(task_id)``).
    """
    success, failed = [], []
    for task_key, status in dct.items():
        bucket = success if status else failed
        bucket.append(task_key)
    return success, failed


def statistics_with_redis(
    raw: Optional[bytes] = None,
    n_bits: Optional[int] = None,
):
    """
    Split task ids into success / failed lists by decoding the status bitmap.

    Redis numbers bits starting at the most significant bit of the first
    byte (``SETBIT key 0 1`` produces the byte ``0x80``). Each byte therefore
    has to be expanded to exactly eight characters, most significant bit
    first. ``bin(byte)[2:]`` must not be used for this: it drops leading
    zeros, so any byte below ``0x80`` yields fewer than eight characters and
    shifts every following task id.

    :param raw: bitmap bytes to decode. When omitted, the value stored under
        ``KEY`` is fetched from redis.
    :param n_bits: number of leading bits that represent real tasks. Defaults
        to ``n_task``. Bits past this count are byte padding and are ignored;
        if the bitmap is shorter, the missing bits count as 0.
    :return: ``(success, failed)``, two lists of int task ids in ascending
        order.
    """
    if raw is None:
        raw = r.get(KEY)
    if raw is None:
        # The key does not exist: no bit was ever set.
        raw = b""
    if n_bits is None:
        n_bits = n_task

    # "08b" keeps leading zeros so that every byte contributes 8 positions.
    bits = "".join(format(byte, "08b") for byte in raw)
    # Drop padding bits and treat bits beyond the stored string as 0, which
    # is how redis itself reads out-of-range offsets.
    bits = bits[:n_bits].ljust(n_bits, "0")

    success = []
    failed = []
    for task_id, bit in enumerate(bits):
        if bit == "1":
            success.append(task_id)
        else:
            failed.append(task_id)
    return success, failed

In [47]:
# Run all tasks; the flags choose whether sqlite and/or redis record the status.
# Note from the author: sqlite does not support the multi-threaded mode and will deadlock.
# Talking to redis from a single machine is sequential and therefore slow; in
# production many clients run concurrently, so it would be comparatively faster.
init()
st = time.time()
run_all_task(use_sqlite=True, use_redis=False)
# run_all_task_multi_process(use_sqlite=False, use_redis=True)
elapsed = time.time() - st
print(f"elapsed = {elapsed:.6f}")

elapsed = 1.265563


In [49]:
# Compute the statistics from sqlite, time the scan, and print the result so
# it can be compared with the redis result.
st = time.time()
success, failed = statistics_with_sqlite()
elapsed = time.time() - st
print(f"elapsed = {elapsed:.6f}")
print(f"n success = {len(success)}")
print(f"n failed = {len(failed)}")
print(f"success: {success}")
print(f"failed: {failed}")

elapsed = 0.009024
n success = 717
n failed = 307
success: ['0', '2', '3', '6', '7', '8', '9', '10', '12', '13', '14', '16', '17', '18', '19', '20', '21', '24', '25', '26', '29', '31', '32', '33', '34', '37', '39', '40', '41', '42', '44', '45', '47', '48', '49', '50', '51', '52', '53', '57', '58', '60', '62', '65', '66', '68', '72', '73', '74', '75', '76', '78', '79', '80', '81', '83', '84', '86', '87', '88', '89', '91', '92', '94', '97', '99', '100', '101', '102', '103', '104', '105', '107', '108', '109', '110', '111', '116', '117', '118', '119', '121', '123', '125', '127', '128', '129', '130', '131', '132', '133', '136', '137', '138', '139', '140', '142', '143', '144', '145', '146', '150', '151', '152', '153', '154', '155', '157', '158', '159', '160', '161', '162', '163', '165', '166', '167', '168', '170', '171', '173', '174', '175', '179', '180', '181', '182', '183', '184', '186', '187', '188', '189', '190', '191', '194', '195', '196', '197', '198', '199', '203', '204', '206', '207'

In [44]:
# Compute the statistics from redis, time the decoding, and print the result
# so it can be compared with the sqlite result.
st = time.time()
success, failed = statistics_with_redis()
elapsed = time.time() - st
print(f"elapsed = {elapsed:.6f}")
print(f"n success = {len(success)}")
# The failed count is derived from n_task rather than from len(failed).
print(f"n failed = {n_task - len(success)}")
print(f"success: {success}")
print(f"failed: {failed}")

elapsed = 0.050739
n success = 743
n failed = 281
success: [0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 29, 31, 32, 33, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 47, 48, 49, 51, 54, 55, 56, 58, 59, 60, 62, 63, 65, 66, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 84, 86, 87, 88, 89, 91, 92, 93, 94, 95, 96, 97, 98, 99, 101, 102, 103, 107, 109, 111, 112, 113, 116, 117, 119, 120, 122, 123, 126, 127, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 153, 154, 155, 156, 157, 158, 159, 162, 165, 166, 167, 168, 169, 173, 174, 175, 176, 178, 179, 181, 182, 183, 184, 188, 189, 190, 191, 192, 194, 195, 197, 198, 200, 202, 203, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 228, 229, 231, 232, 233, 234, 235, 236, 238, 239, 240, 242, 244, 245, 246, 247, 248, 250, 251, 252, 255, 256, 259, 260, 261, 262, 263, 264, 265, 267, 268,