# 资源池

在一个项目中我们会遇到很多需要管理、分配的资源，比如像是线程、进程的管理，硬件资源的管理等。解决这个问题的一种常见方法，就是将同类资源都关联到一个资源池中，并在需要使用资源时向资源池发送请求以获取资源，在资源使用完后将其归还至资源池。可以使用 Redis 来构建资源池，在需要时使用资源时向资源池发送请求以获取资源，在资源使用完后将其归还至资源池。

资源池实际上就是一个记录资源的集合，所以可以使用 Redis 集合来实现它，其中将资源添加到资源池的工作可以通过 SADD 命令来实现，而从资源池中移除资源的工作可以通过 SREM 命令来实现。

除了需要使用一个集合来记录可用的资源，还需要使用另一个集合来记录已被占用的资源：每当用户执行获取操作从资源池中获取一项资源的时候，程序将使用 SMOVE 命令把获取的资源从可用资源集合移至已占用资源集合，并且为了保证每项资源都能被公平地获取，选取可用资源的工作将由 SRANDMEMBER 命令来完成

举个例子，我们可以对 Web 应用的数据库连接（conn1, conn2, ..., conn50）放在 Redis 连接池中。为什么这样做？比如假设一个服务能创建 50 个数据库连接，现在有多个服务，如果每个服务都独立创建 50 个连接，会超过数据库限制，导致连接失败或数据库崩溃。而且每个服务独立管理连接，可能导致连接未被充分利用，也有可能会有频繁的创建和销毁。

通过统一的数据库连接池，总连接数可控，不会超过数据库限制，而且服务用完连接后归还，供其他服务使用，减少空闲连接浪费。

如何存“数据库连接”？数据库驱动，如 JDBC 会为每个连接分配一个内部 ID，在 Redis 中，可以将这个 ID 作为字符串存储，例如 conn1, conn2, ..., conn50。（不具体研究了，应该类似 Java 的线程池原理）

还有比如一个分布式计算系统有 10 台计算节点（node1, node2, ..., node10），需要动态分配给任务使用，用完后归还，这种场景也会用到。

In [1]:
from redis import WatchError

def available_key(pool_name):
    return "ResourcePool:{0}:available".format(pool_name)

def occupied_key(pool_name):
    return "ResourcePool:{0}:occupied".format(pool_name)

class ResourcePool:

    def __init__(self, client, pool_name):
        """
        基于给定的资源池名字创建出相应的资源池对象。
        """
        self.client = client
        self.available_set = available_key(pool_name)
        self.occupied_set = occupied_key(pool_name)

    def associate(self, resource):
        """
        将指定资源关联到资源池中。
        返回True表示关联成功，返回False表示资源已存在，关联失败。
        返回None则表示关联过程中操作失败，需要重新尝试。
        """
        tx = self.client.pipeline()
        try:
            # 监视两个集合，观察它们是否在操作中途变化
            tx.watch(self.available_set, self.occupied_set)
            # 检查给定资源是否存在于两个集合之中
            if tx.sismember(self.available_set, resource) or tx.sismember(self.occupied_set, resource):
                # 资源已存在，放弃添加
                tx.unwatch()
                return False
            else:
                # 资源未存在，尝试添加
                tx.multi()
                tx.sadd(self.available_set, resource)
                return tx.execute()[0]==1  # 添加是否成功？
        except WatchError:
            # 操作过程中集合键发生了变化，需要重试
            pass
        finally:
            tx.reset()

    def disassociate(self, resource):
        """
        将指定资源从资源池中移除。
        移除成功返回True，因资源不存在而导致移除失败返回False。
        """
        # 使用事务同时向两个集合发出SREM命令
        # 当资源存在于池中时，其中一个集合将返回1作为SREM命令的结果
        tx = self.client.pipeline()
        tx.srem(self.available_set, resource)
        tx.srem(self.occupied_set, resource)
        ret1, ret2 = tx.execute()
        return ret1==1 or ret2==1

    def acquire(self):
        """
        尝试从资源池中获取并返回可用的资源。
        返回None表示资源池为空，或者获取操作过程中失败。
        """
        tx = self.client.pipeline()
        try:
            # 监视两个集合，观察它们是否在操作中途变化
            tx.watch(self.available_set, self.occupied_set)
            # 尝试从可用集合中随机获取一项资源
            # SRANDMEMBER available_set
            resource = tx.srandmember(self.available_set)
            if resource is not None:
                # 将资源从可用集合移动到已占用集合
                tx.multi()
                # SMOVE source destination member
                tx.smove(self.available_set, self.occupied_set, resource)
                smove_ret = tx.execute()[0]
                if smove_ret == 1:
                    return resource
        except WatchError:
            # 操作过程中集合键发生了变化，需要重试
            pass
        finally:
            tx.reset()

    def release(self, resource):
        """
        将给定的一项已被占用的资源回归至资源池。
        回归成功时返回True，因资源不属于资源池而导致回归失败时返回False。
        """
        # 将资源从已占用集合移动至可用集合
        return self.client.smove(self.occupied_set, self.available_set, resource)

    def available_count(self):
        """
        返回资源池中目前可用的资源数量。
        """
        return self.client.scard(self.available_set)

    def occupied_count(self):
        """
        返回资源池中目前已被占用的资源数量。
        """
        return self.client.scard(self.occupied_set)

    def total_count(self):
        """
        返回资源池中（包括可用和已占用）资源的总数量。
        """
        tx = self.client.pipeline()
        tx.scard(self.available_set)
        tx.scard(self.occupied_set)
        available_count, occupied_count = tx.execute()
        return available_count + occupied_count

    def is_available(self, resource):
        """
        检查指定资源是否可用，是的话返回True，否则返回False。
        """
        return self.client.sismember(self.available_set, resource)==1

    def is_occupied(self, resource):
        """
        检查给定资源是否已被占用，是的话返回True，否则返回False。
        """
        return self.client.sismember(self.occupied_set, resource)==1

    def has(self, resource):
        """
        检查给定资源是否存在于资源池（包括可用或已占用）。
        """
        tx = self.client.pipeline()
        tx.sismember(self.available_set, resource)
        tx.sismember(self.occupied_set, resource)
        is_available, is_occupied = tx.execute()
        return is_available or is_occupied