From e29e4788978458bc0d12a8d6c2030789cedaaa11 Mon Sep 17 00:00:00 2001 From: downdawn <1436759077@qq.com> Date: Tue, 18 Nov 2025 15:24:45 +0800 Subject: [PATCH 1/3] feat: Add support distributed deployment for Snowflake Auto-assign node IDs via Redis to support multi-instance deployment and avoid ID conflicts --- backend/core/conf.py | 9 ++ backend/core/registrar.py | 7 ++ backend/utils/snowflake.py | 196 +++++++++++++++++++++++++++++++++++-- 3 files changed, 203 insertions(+), 9 deletions(-) diff --git a/backend/core/conf.py b/backend/core/conf.py index 6f22cb643..b0c844c7c 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -52,6 +52,15 @@ class Settings(BaseSettings): # Redis REDIS_TIMEOUT: int = 5 + # .env Snowflake + SNOWFLAKE_CLUSTER_ID: int | None = None + SNOWFLAKE_NODE_ID: int | None = None + + # Snowflake + SNOWFLAKE_REDIS_PREFIX: str = 'fba:snowflake' + SNOWFLAKE_HEARTBEAT_INTERVAL: int = 30 # 心跳间隔(秒) + SNOWFLAKE_NODE_TTL: int = 60 # 节点存活时间(秒) + # .env Token TOKEN_SECRET_KEY: str # 密钥 secrets.token_urlsafe(32) diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 1e20e5004..38d6cbfe5 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -34,6 +34,7 @@ from backend.utils.health_check import ensure_unique_route_names, http_limit_callback from backend.utils.openapi import simplify_operation_ids from backend.utils.serializers import MsgSpecJSONResponse +from backend.utils.snowflake import snowflake @asynccontextmanager @@ -57,11 +58,17 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: http_callback=http_limit_callback, ) + # 初始化 snowflake 节点 + await snowflake.initialize() + # 创建操作日志任务 create_task(OperaLogMiddleware.consumer()) yield + # 释放 snowflake 节点 + await snowflake.shutdown() + # 关闭 redis 连接 await redis_client.aclose() diff --git a/backend/utils/snowflake.py b/backend/utils/snowflake.py index e91883ce9..23830e16a 100644 --- a/backend/utils/snowflake.py +++ b/backend/utils/snowflake.py @@ -1,10 +1,14 @@ +import asyncio +import os import time from dataclasses import dataclass from backend.common.dataclasses import SnowflakeInfo from backend.common.exception import errors +from backend.common.log import log from backend.core.conf import settings +from backend.database.redis import RedisCli @dataclass(frozen=True) @@ -35,31 +39,199 @@ class SnowflakeConfig: DEFAULT_SEQUENCE: int = 0 +class SnowflakeNodeManager: + """雪花算法节点管理器,负责从 Redis 分配和管理节点 ID""" + + def __init__(self, redis_client: RedisCli) -> None: + """ + 初始化节点管理器 + + :param redis_client: Redis 客户端实例 + """ + self.redis_client = redis_client + self.prefix = settings.SNOWFLAKE_REDIS_PREFIX + self.cluster_id: int | None = None + self.node_id: int | None = None + self.heartbeat_task: asyncio.Task | None = None + + async def acquire_node_id(self) -> tuple[int, int]: + """ + 从 Redis 获取可用的 cluster_id 和 node_id + + :return: (cluster_id, node_id) + """ + # 查找所有已占用的节点 + occupied_nodes = set() + pattern = f'{self.prefix}:nodes:*' + async for key in self.redis_client.scan_iter(match=pattern): + # 解析 key: {prefix}:nodes:{cluster_id}:{node_id} + parts = key.split(':') + if len(parts) >= 5: + try: + cluster_id = int(parts[-2]) + node_id = int(parts[-1]) + occupied_nodes.add((cluster_id, node_id)) + except ValueError: + continue + + # 查找最小可用的 ID 组合 + for cluster_id in range(SnowflakeConfig.MAX_DATACENTER_ID + 1): + for node_id in range(SnowflakeConfig.MAX_WORKER_ID + 1): + # 尝试注册这个节点 + if (cluster_id, node_id) not in occupied_nodes and await self.register_node(cluster_id, node_id): + return cluster_id, node_id + + raise errors.ServerError(msg='无可用的雪花算法节点 ID,所有 ID 已被占用') + + async def register_node(self, cluster_id: int, node_id: int) -> bool: + """ + 注册节点并设置 TTL + + :param cluster_id: 集群 ID + :param node_id: 节点 ID + :return: 注册成功返回 True,失败返回 False + """ + key = f'{self.prefix}:nodes:{cluster_id}:{node_id}' + # 使用 SETNX 原子操作,只有 key 不存在时才设置 + # 存储进程信息用于调试 + value = f'pid:{os.getpid()}' + success = await self.redis_client.set(key, value, nx=True, ex=settings.SNOWFLAKE_NODE_TTL) + return bool(success) + + async def release_node(self, cluster_id: int, node_id: int) -> None: + """ + 释放节点 ID + + :param cluster_id: 集群 ID + :param node_id: 节点 ID + """ + key = f'{self.prefix}:nodes:{cluster_id}:{node_id}' + await self.redis_client.delete(key) + + async def heartbeat(self, cluster_id: int, node_id: int) -> None: + """ + 心跳续期任务 + + :param cluster_id: 集群 ID + :param node_id: 节点 ID + """ + key = f'{self.prefix}:nodes:{cluster_id}:{node_id}' + try: + while True: + await asyncio.sleep(settings.SNOWFLAKE_HEARTBEAT_INTERVAL) + try: + # 续期 TTL + await self.redis_client.expire(key, settings.SNOWFLAKE_NODE_TTL) + log.debug(f'雪花算法节点心跳: cluster_id={cluster_id}, node_id={node_id}') + except Exception as e: + log.error(f'雪花算法节点心跳失败: {e}') + except asyncio.CancelledError: + log.info(f'雪花算法节点心跳任务取消: cluster_id={cluster_id}, node_id={node_id}') + + async def start_heartbeat(self, cluster_id: int, node_id: int) -> None: + """ + 启动心跳任务 + + :param cluster_id: 集群 ID + :param node_id: 节点 ID + """ + self.cluster_id = cluster_id + self.node_id = node_id + self.heartbeat_task = asyncio.create_task(self.heartbeat(cluster_id, node_id)) + + async def stop_heartbeat(self) -> None: + """停止心跳任务""" + if self.heartbeat_task: + self.heartbeat_task.cancel() + try: + await self.heartbeat_task + except asyncio.CancelledError: + pass + self.heartbeat_task = None + + class Snowflake: """雪花算法类""" def __init__( self, - cluster_id: int = SnowflakeConfig.DEFAULT_DATACENTER_ID, - node_id: int = SnowflakeConfig.DEFAULT_WORKER_ID, + cluster_id: int | None = None, + node_id: int | None = None, sequence: int = SnowflakeConfig.DEFAULT_SEQUENCE, ) -> None: """ 初始化雪花算法生成器 - :param cluster_id: 集群 ID (0-31) - :param node_id: 节点 ID (0-31) + :param cluster_id: 集群 ID (0-31),None 表示自动分配 + :param node_id: 节点 ID (0-31),None 表示自动分配 :param sequence: 起始序列号 """ - if cluster_id < 0 or cluster_id > SnowflakeConfig.MAX_DATACENTER_ID: - raise errors.RequestError(msg=f'集群编号必须在 0-{SnowflakeConfig.MAX_DATACENTER_ID} 之间') - if node_id < 0 or node_id > SnowflakeConfig.MAX_WORKER_ID: - raise errors.RequestError(msg=f'节点编号必须在 0-{SnowflakeConfig.MAX_WORKER_ID} 之间') - self.node_id = node_id self.cluster_id = cluster_id self.sequence = sequence self.last_timestamp = -1 + self.node_manager: SnowflakeNodeManager | None = None + self._initialized = False + self._auto_allocated = False # 标记是否是自动分配的 ID + + async def initialize(self) -> None: + """ + 初始化雪花算法,从环境变量或 Redis 获取节点 ID + """ + if self._initialized: + return + + # 优先从环境变量读取配置 + env_cluster_id = settings.SNOWFLAKE_CLUSTER_ID + env_node_id = settings.SNOWFLAKE_NODE_ID + + if env_cluster_id is not None and env_node_id is not None: + # 使用环境变量配置 + self.cluster_id = env_cluster_id + self.node_id = env_node_id + log.info(f'✅ 雪花算法使用环境变量配置: cluster_id={self.cluster_id}, node_id={self.node_id}') + elif self.cluster_id is not None and self.node_id is not None: + # 使用初始化时传入的配置 + log.info(f'✅ 雪花算法使用手动配置: cluster_id={self.cluster_id}, node_id={self.node_id}') + else: + # 从 Redis 自动分配 + from backend.database.redis import redis_client + + self.node_manager = SnowflakeNodeManager(redis_client) + self.cluster_id, self.node_id = await self.node_manager.acquire_node_id() + self._auto_allocated = True + log.info( + f'✅ 雪花算法从 Redis 自动分配: cluster_id={self.cluster_id}, node_id={self.node_id}, pid={os.getpid()}' + ) + + # 启动心跳任务 + await self.node_manager.start_heartbeat(self.cluster_id, self.node_id) + + # 验证 ID 范围 + if self.cluster_id < 0 or self.cluster_id > SnowflakeConfig.MAX_DATACENTER_ID: + raise errors.RequestError(msg=f'集群编号必须在 0-{SnowflakeConfig.MAX_DATACENTER_ID} 之间') + if self.node_id < 0 or self.node_id > SnowflakeConfig.MAX_WORKER_ID: + raise errors.RequestError(msg=f'节点编号必须在 0-{SnowflakeConfig.MAX_WORKER_ID} 之间') + + self._initialized = True + + async def shutdown(self) -> None: + """ + 关闭雪花算法,释放节点 ID + """ + if not self._initialized: + return + + if self.node_manager and self._auto_allocated: + # 停止心跳 + await self.node_manager.stop_heartbeat() + + # 释放节点 + if self.cluster_id is not None and self.node_id is not None: + await self.node_manager.release_node(self.cluster_id, self.node_id) + log.info(f'✅ 雪花算法节点已释放: cluster_id={self.cluster_id}, node_id={self.node_id}') + + self._initialized = False @staticmethod def _current_millis() -> int: @@ -81,6 +253,12 @@ def _next_millis(self, last_timestamp: int) -> int: def generate(self) -> int: """生成雪花 ID""" + if not self._initialized: + raise errors.ServerError(msg='雪花算法未初始化,请先调用 initialize() 方法') + + if self.cluster_id is None or self.node_id is None: + raise errors.ServerError(msg='雪花算法节点 ID 未设置') + timestamp = self._current_millis() if timestamp < self.last_timestamp: From 44bd6b11d458d745021ab9450fe8ed6de9d92fb3 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Tue, 18 Nov 2025 18:01:50 +0800 Subject: [PATCH 2/3] Update the algorithm implementation --- backend/common/dataclasses.py | 4 +- backend/core/conf.py | 8 +- backend/core/registrar.py | 2 +- backend/utils/snowflake.py | 346 ++++++++++++++-------------------- 4 files changed, 144 insertions(+), 216 deletions(-) diff --git a/backend/common/dataclasses.py b/backend/common/dataclasses.py index b838e9ba3..389be6857 100644 --- a/backend/common/dataclasses.py +++ b/backend/common/dataclasses.py @@ -70,6 +70,6 @@ class UploadUrl: class SnowflakeInfo: timestamp: int datetime: str - cluster_id: int - node_id: int + datacenter_id: int + worker_id: int sequence: int diff --git a/backend/core/conf.py b/backend/core/conf.py index b0c844c7c..b093cdb82 100644 --- a/backend/core/conf.py +++ b/backend/core/conf.py @@ -53,13 +53,13 @@ class Settings(BaseSettings): REDIS_TIMEOUT: int = 5 # .env Snowflake - SNOWFLAKE_CLUSTER_ID: int | None = None - SNOWFLAKE_NODE_ID: int | None = None + SNOWFLAKE_DATACENTER_ID: int | None = None + SNOWFLAKE_WORKER_ID: int | None = None # Snowflake SNOWFLAKE_REDIS_PREFIX: str = 'fba:snowflake' - SNOWFLAKE_HEARTBEAT_INTERVAL: int = 30 # 心跳间隔(秒) - SNOWFLAKE_NODE_TTL: int = 60 # 节点存活时间(秒) + SNOWFLAKE_HEARTBEAT_INTERVAL_SECONDS: int = 30 + SNOWFLAKE_NODE_TTL_SECONDS: int = 60 # .env Token TOKEN_SECRET_KEY: str # 密钥 secrets.token_urlsafe(32) diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 38d6cbfe5..dd2408a93 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -59,7 +59,7 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: ) # 初始化 snowflake 节点 - await snowflake.initialize() + await snowflake.init() # 创建操作日志任务 create_task(OperaLogMiddleware.consumer()) diff --git a/backend/utils/snowflake.py b/backend/utils/snowflake.py index 23830e16a..f0935b569 100644 --- a/backend/utils/snowflake.py +++ b/backend/utils/snowflake.py @@ -1,5 +1,7 @@ import asyncio +import datetime import os +import threading import time from dataclasses import dataclass @@ -8,12 +10,13 @@ from backend.common.exception import errors from backend.common.log import log from backend.core.conf import settings -from backend.database.redis import RedisCli +from backend.database.redis import redis_client +from backend.utils.timezone import timezone @dataclass(frozen=True) class SnowflakeConfig: - """雪花算法配置类""" + """雪花算法配置类,采用 Twitter 原版 Snowflake 64 位 ID 位分配配置(通用标准)""" # 位分配 WORKER_ID_BITS: int = 5 @@ -33,255 +36,180 @@ class SnowflakeConfig: # 元年时间戳 EPOCH: int = 1262275200000 - # 默认值 - DEFAULT_DATACENTER_ID: int = 1 - DEFAULT_WORKER_ID: int = 0 - DEFAULT_SEQUENCE: int = 0 + # 时钟回拨容忍阈值,应对 NTP 自动同步引起的正常回跳(非标准) + CLOCK_BACKWARD_TOLERANCE_MS: int = 10_000 class SnowflakeNodeManager: """雪花算法节点管理器,负责从 Redis 分配和管理节点 ID""" - def __init__(self, redis_client: RedisCli) -> None: - """ - 初始化节点管理器 - - :param redis_client: Redis 客户端实例 - """ - self.redis_client = redis_client - self.prefix = settings.SNOWFLAKE_REDIS_PREFIX - self.cluster_id: int | None = None - self.node_id: int | None = None - self.heartbeat_task: asyncio.Task | None = None + def __init__(self) -> None: + """初始化节点管理器""" + self.datacenter_id: int | None = None + self.worker_id: int | None = None + self.node_redis_prefix: str = f'{settings.SNOWFLAKE_REDIS_PREFIX}:nodes' + self._heartbeat_task: asyncio.Task | None = None async def acquire_node_id(self) -> tuple[int, int]: - """ - 从 Redis 获取可用的 cluster_id 和 node_id - - :return: (cluster_id, node_id) - """ - # 查找所有已占用的节点 + """从 Redis 获取可用的 datacenter_id 和 worker_id""" occupied_nodes = set() - pattern = f'{self.prefix}:nodes:*' - async for key in self.redis_client.scan_iter(match=pattern): - # 解析 key: {prefix}:nodes:{cluster_id}:{node_id} + async for key in redis_client.scan_iter(match=f'{self.node_redis_prefix}:*'): parts = key.split(':') if len(parts) >= 5: try: - cluster_id = int(parts[-2]) - node_id = int(parts[-1]) - occupied_nodes.add((cluster_id, node_id)) + datacenter_id = int(parts[-2]) + worker_id = int(parts[-1]) + occupied_nodes.add((datacenter_id, worker_id)) except ValueError: continue - # 查找最小可用的 ID 组合 - for cluster_id in range(SnowflakeConfig.MAX_DATACENTER_ID + 1): - for node_id in range(SnowflakeConfig.MAX_WORKER_ID + 1): - # 尝试注册这个节点 - if (cluster_id, node_id) not in occupied_nodes and await self.register_node(cluster_id, node_id): - return cluster_id, node_id + # 顺序查找第一个可用的 ID 组合 + for datacenter_id in range(SnowflakeConfig.MAX_DATACENTER_ID + 1): + for worker_id in range(SnowflakeConfig.MAX_WORKER_ID + 1): + if (datacenter_id, worker_id) not in occupied_nodes and await self._register(datacenter_id, worker_id): + return datacenter_id, worker_id - raise errors.ServerError(msg='无可用的雪花算法节点 ID,所有 ID 已被占用') + raise errors.ServerError(msg='无可用的雪花算法节点,节点已耗尽') - async def register_node(self, cluster_id: int, node_id: int) -> bool: - """ - 注册节点并设置 TTL - - :param cluster_id: 集群 ID - :param node_id: 节点 ID - :return: 注册成功返回 True,失败返回 False - """ - key = f'{self.prefix}:nodes:{cluster_id}:{node_id}' - # 使用 SETNX 原子操作,只有 key 不存在时才设置 - # 存储进程信息用于调试 - value = f'pid:{os.getpid()}' - success = await self.redis_client.set(key, value, nx=True, ex=settings.SNOWFLAKE_NODE_TTL) - return bool(success) - - async def release_node(self, cluster_id: int, node_id: int) -> None: - """ - 释放节点 ID + async def _register(self, datacenter_id: int, worker_id: int) -> bool: + key = f'{self.node_redis_prefix}:{datacenter_id}:{worker_id}' + value = f'pid:{os.getpid()}-ts:{timezone.now().timestamp()}' + return await redis_client.set(key, value, nx=True, ex=settings.SNOWFLAKE_NODE_TTL_SECONDS) - :param cluster_id: 集群 ID - :param node_id: 节点 ID - """ - key = f'{self.prefix}:nodes:{cluster_id}:{node_id}' - await self.redis_client.delete(key) + async def start_heartbeat(self, datacenter_id: int, worker_id: int) -> None: + """启动节点心跳""" + self.datacenter_id = datacenter_id + self.worker_id = worker_id - async def heartbeat(self, cluster_id: int, node_id: int) -> None: - """ - 心跳续期任务 - - :param cluster_id: 集群 ID - :param node_id: 节点 ID - """ - key = f'{self.prefix}:nodes:{cluster_id}:{node_id}' - try: + async def heartbeat() -> None: + key = f'{self.node_redis_prefix}:{datacenter_id}:{worker_id}' while True: - await asyncio.sleep(settings.SNOWFLAKE_HEARTBEAT_INTERVAL) + await asyncio.sleep(settings.SNOWFLAKE_HEARTBEAT_INTERVAL_SECONDS) try: - # 续期 TTL - await self.redis_client.expire(key, settings.SNOWFLAKE_NODE_TTL) - log.debug(f'雪花算法节点心跳: cluster_id={cluster_id}, node_id={node_id}') + await redis_client.expire(key, settings.SNOWFLAKE_NODE_TTL_SECONDS) + log.debug(f'雪花算法节点心跳任务开始:datacenter_id={datacenter_id}, worker_id={worker_id}') except Exception as e: - log.error(f'雪花算法节点心跳失败: {e}') - except asyncio.CancelledError: - log.info(f'雪花算法节点心跳任务取消: cluster_id={cluster_id}, node_id={node_id}') + log.error(f'雪花算法节点心跳任务失败:{e}') - async def start_heartbeat(self, cluster_id: int, node_id: int) -> None: - """ - 启动心跳任务 + self._heartbeat_task = asyncio.create_task(heartbeat()) - :param cluster_id: 集群 ID - :param node_id: 节点 ID - """ - self.cluster_id = cluster_id - self.node_id = node_id - self.heartbeat_task = asyncio.create_task(self.heartbeat(cluster_id, node_id)) - - async def stop_heartbeat(self) -> None: - """停止心跳任务""" - if self.heartbeat_task: - self.heartbeat_task.cancel() + async def release(self) -> None: + """释放节点""" + if self._heartbeat_task: + self._heartbeat_task.cancel() try: - await self.heartbeat_task + await self._heartbeat_task except asyncio.CancelledError: - pass - self.heartbeat_task = None + log.debug(f'雪花算法节点心跳任务释放:datacenter_id={self.datacenter_id}, worker_id={self.worker_id}') + + if self.datacenter_id is not None and self.worker_id is not None: + key = f'{self.node_redis_prefix}:{self.datacenter_id}:{self.worker_id}' + await redis_client.delete(key) class Snowflake: """雪花算法类""" - def __init__( - self, - cluster_id: int | None = None, - node_id: int | None = None, - sequence: int = SnowflakeConfig.DEFAULT_SEQUENCE, - ) -> None: - """ - 初始化雪花算法生成器 + def __init__(self) -> None: + """初始化雪花算法""" + self.datacenter_id: int | None = None + self.worker_id: int | None = None + self.sequence: int = 0 + self.last_timestamp: int = -1 - :param cluster_id: 集群 ID (0-31),None 表示自动分配 - :param node_id: 节点 ID (0-31),None 表示自动分配 - :param sequence: 起始序列号 - """ - self.node_id = node_id - self.cluster_id = cluster_id - self.sequence = sequence - self.last_timestamp = -1 - self.node_manager: SnowflakeNodeManager | None = None + self._lock = threading.Lock() self._initialized = False - self._auto_allocated = False # 标记是否是自动分配的 ID + self._node_manager: SnowflakeNodeManager | None = None + self._auto_allocated = False # 标记是否由 Redis 自动分配 ID - async def initialize(self) -> None: - """ - 初始化雪花算法,从环境变量或 Redis 获取节点 ID - """ + async def init(self) -> None: + """初始化雪花算法""" if self._initialized: return - # 优先从环境变量读取配置 - env_cluster_id = settings.SNOWFLAKE_CLUSTER_ID - env_node_id = settings.SNOWFLAKE_NODE_ID - - if env_cluster_id is not None and env_node_id is not None: - # 使用环境变量配置 - self.cluster_id = env_cluster_id - self.node_id = env_node_id - log.info(f'✅ 雪花算法使用环境变量配置: cluster_id={self.cluster_id}, node_id={self.node_id}') - elif self.cluster_id is not None and self.node_id is not None: - # 使用初始化时传入的配置 - log.info(f'✅ 雪花算法使用手动配置: cluster_id={self.cluster_id}, node_id={self.node_id}') - else: - # 从 Redis 自动分配 - from backend.database.redis import redis_client - - self.node_manager = SnowflakeNodeManager(redis_client) - self.cluster_id, self.node_id = await self.node_manager.acquire_node_id() - self._auto_allocated = True - log.info( - f'✅ 雪花算法从 Redis 自动分配: cluster_id={self.cluster_id}, node_id={self.node_id}, pid={os.getpid()}' - ) - - # 启动心跳任务 - await self.node_manager.start_heartbeat(self.cluster_id, self.node_id) - - # 验证 ID 范围 - if self.cluster_id < 0 or self.cluster_id > SnowflakeConfig.MAX_DATACENTER_ID: - raise errors.RequestError(msg=f'集群编号必须在 0-{SnowflakeConfig.MAX_DATACENTER_ID} 之间') - if self.node_id < 0 or self.node_id > SnowflakeConfig.MAX_WORKER_ID: - raise errors.RequestError(msg=f'节点编号必须在 0-{SnowflakeConfig.MAX_WORKER_ID} 之间') - - self._initialized = True + with self._lock: + if self._initialized: + return + + # 环境变量固定分配 + if settings.SNOWFLAKE_DATACENTER_ID is not None and settings.SNOWFLAKE_WORKER_ID is not None: + self.datacenter_id = settings.SNOWFLAKE_DATACENTER_ID + self.worker_id = settings.SNOWFLAKE_WORKER_ID + log.debug( + f'雪花算法使用环境变量固定节点:datacenter_id={self.datacenter_id}, worker_id={self.worker_id}' + ) + else: + # Redis 动态分配 + self._node_manager = SnowflakeNodeManager() + self.datacenter_id, self.worker_id = await self._node_manager.acquire_node_id() + self._auto_allocated = True + await self._node_manager.start_heartbeat(self.datacenter_id, self.worker_id) + log.debug( + f'雪花算法使用 Redis 动态分配节点:datacenter_id={self.datacenter_id}, worker_id={self.worker_id}' + ) + + # 严格校验范围 + if not (0 <= self.datacenter_id <= SnowflakeConfig.MAX_DATACENTER_ID): + raise errors.RequestError(msg=f'雪花算法 datacenter_id 必须在 0~{SnowflakeConfig.MAX_DATACENTER_ID}') + if not (0 <= self.worker_id <= SnowflakeConfig.MAX_WORKER_ID): + raise errors.RequestError(msg=f'雪花算法 worker_id 必须在 0~{SnowflakeConfig.MAX_WORKER_ID}') + + self._initialized = True async def shutdown(self) -> None: - """ - 关闭雪花算法,释放节点 ID - """ - if not self._initialized: - return - - if self.node_manager and self._auto_allocated: - # 停止心跳 - await self.node_manager.stop_heartbeat() - - # 释放节点 - if self.cluster_id is not None and self.node_id is not None: - await self.node_manager.release_node(self.cluster_id, self.node_id) - log.info(f'✅ 雪花算法节点已释放: cluster_id={self.cluster_id}, node_id={self.node_id}') - - self._initialized = False + """释放 Redis 节点""" + if self._node_manager and self._auto_allocated: + await self._node_manager.release() @staticmethod - def _current_millis() -> int: - """返回当前毫秒时间戳""" - return int(time.time() * 1000) - - def _next_millis(self, last_timestamp: int) -> int: - """ - 等待至下一毫秒 + def _current_ms() -> int: + return int(timezone.now().timestamp() * 1000) - :param last_timestamp: 上次生成 ID 的时间戳 - :return: - """ - timestamp = self._current_millis() - while timestamp <= last_timestamp: - time.sleep((last_timestamp - timestamp + 1) / 1000.0) - timestamp = self._current_millis() - return timestamp + def _till_next_ms(self, last_timestamp: int) -> int: + """等待直到下一毫秒""" + ts = self._current_ms() + while ts <= last_timestamp: + time.sleep(0.0001) + ts = self._current_ms() + return ts def generate(self) -> int: """生成雪花 ID""" if not self._initialized: - raise errors.ServerError(msg='雪花算法未初始化,请先调用 initialize() 方法') - - if self.cluster_id is None or self.node_id is None: - raise errors.ServerError(msg='雪花算法节点 ID 未设置') - - timestamp = self._current_millis() - - if timestamp < self.last_timestamp: - raise errors.ServerError(msg=f'系统时间倒退,拒绝生成 ID 直到 {self.last_timestamp}') - - if timestamp == self.last_timestamp: - self.sequence = (self.sequence + 1) & SnowflakeConfig.SEQUENCE_MASK - if self.sequence == 0: - timestamp = self._next_millis(self.last_timestamp) - else: - self.sequence = 0 - - self.last_timestamp = timestamp - - return ( - ((timestamp - SnowflakeConfig.EPOCH) << SnowflakeConfig.TIMESTAMP_LEFT_SHIFT) - | (self.cluster_id << SnowflakeConfig.DATACENTER_ID_SHIFT) - | (self.node_id << SnowflakeConfig.WORKER_ID_SHIFT) - | self.sequence - ) + raise errors.ServerError(msg='雪花 ID 生成失败,雪花算法未初始化') + + with self._lock: + timestamp = self._current_ms() + + # 时钟回拨处理 + if timestamp < self.last_timestamp: + back_ms = self.last_timestamp - timestamp + if back_ms <= SnowflakeConfig.CLOCK_BACKWARD_TOLERANCE_MS: + log.warning(f'检测到时钟回拨 {back_ms} ms,等待恢复...') + timestamp = self._till_next_ms(self.last_timestamp) + else: + raise errors.ServerError(msg=f'时钟回拨超过 {back_ms} ms,雪花 ID 生成失败,请立即联系系统管理员') + + # 同毫秒内序列号递增 + if timestamp == self.last_timestamp: + self.sequence = (self.sequence + 1) & SnowflakeConfig.SEQUENCE_MASK + if self.sequence == 0: + timestamp = self._till_next_ms(self.last_timestamp) + else: + self.sequence = 0 + + self.last_timestamp = timestamp + + # 组合 64 位 ID + return ( + ((timestamp - SnowflakeConfig.EPOCH) << SnowflakeConfig.TIMESTAMP_LEFT_SHIFT) + | (self.datacenter_id << SnowflakeConfig.DATACENTER_ID_SHIFT) + | (self.worker_id << SnowflakeConfig.WORKER_ID_SHIFT) + | self.sequence + ) @staticmethod - def parse_id(snowflake_id: int) -> SnowflakeInfo: + def parse(snowflake_id: int) -> SnowflakeInfo: """ 解析雪花 ID,获取其包含的详细信息 @@ -289,15 +217,15 @@ def parse_id(snowflake_id: int) -> SnowflakeInfo: :return: """ timestamp = (snowflake_id >> SnowflakeConfig.TIMESTAMP_LEFT_SHIFT) + SnowflakeConfig.EPOCH - cluster_id = (snowflake_id >> SnowflakeConfig.DATACENTER_ID_SHIFT) & SnowflakeConfig.MAX_DATACENTER_ID - node_id = (snowflake_id >> SnowflakeConfig.WORKER_ID_SHIFT) & SnowflakeConfig.MAX_WORKER_ID + datacenter_id = (snowflake_id >> SnowflakeConfig.DATACENTER_ID_SHIFT) & SnowflakeConfig.MAX_DATACENTER_ID + worker_id = (snowflake_id >> SnowflakeConfig.WORKER_ID_SHIFT) & SnowflakeConfig.MAX_WORKER_ID sequence = snowflake_id & SnowflakeConfig.SEQUENCE_MASK return SnowflakeInfo( timestamp=timestamp, - datetime=time.strftime(settings.DATETIME_FORMAT, time.localtime(timestamp / 1000)), - cluster_id=cluster_id, - node_id=node_id, + datetime=timezone.to_str(datetime.datetime.fromtimestamp(timestamp / 1000, timezone.tz_info)), + datacenter_id=datacenter_id, + worker_id=worker_id, sequence=sequence, ) From f541d99b9c5c5d14868f24997b711621de0b2b13 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Wed, 19 Nov 2025 12:19:41 +0800 Subject: [PATCH 3/3] Remove duplicate codes and update error messages --- backend/utils/snowflake.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/backend/utils/snowflake.py b/backend/utils/snowflake.py index f0935b569..267bd8cde 100644 --- a/backend/utils/snowflake.py +++ b/backend/utils/snowflake.py @@ -128,9 +128,6 @@ async def init(self) -> None: return with self._lock: - if self._initialized: - return - # 环境变量固定分配 if settings.SNOWFLAKE_DATACENTER_ID is not None and settings.SNOWFLAKE_WORKER_ID is not None: self.datacenter_id = settings.SNOWFLAKE_DATACENTER_ID @@ -138,6 +135,11 @@ async def init(self) -> None: log.debug( f'雪花算法使用环境变量固定节点:datacenter_id={self.datacenter_id}, worker_id={self.worker_id}' ) + elif (settings.SNOWFLAKE_DATACENTER_ID is not None and settings.SNOWFLAKE_WORKER_ID is None) or ( + settings.SNOWFLAKE_DATACENTER_ID is None and settings.SNOWFLAKE_WORKER_ID is not None + ): + log.error('雪花算法 datacenter_id 和 worker_id 配置错误,只允许同时非 None 或同时为 None') + raise errors.ServerError(msg='雪花算法配置失败,请联系系统管理员') else: # Redis 动态分配 self._node_manager = SnowflakeNodeManager() @@ -150,9 +152,11 @@ async def init(self) -> None: # 严格校验范围 if not (0 <= self.datacenter_id <= SnowflakeConfig.MAX_DATACENTER_ID): - raise errors.RequestError(msg=f'雪花算法 datacenter_id 必须在 0~{SnowflakeConfig.MAX_DATACENTER_ID}') + log.error(f'雪花算法 datacenter_id 配置失败,必须在 0~{SnowflakeConfig.MAX_DATACENTER_ID} 之间') + raise errors.ServerError(msg='雪花算法数据中心配置失败,请联系系统管理员') if not (0 <= self.worker_id <= SnowflakeConfig.MAX_WORKER_ID): - raise errors.RequestError(msg=f'雪花算法 worker_id 必须在 0~{SnowflakeConfig.MAX_WORKER_ID}') + log.error(f'雪花算法 worker_id 配置失败,必须在 0~{SnowflakeConfig.MAX_WORKER_ID} 之间') + raise errors.ServerError(msg='雪花算法工作机器配置失败,请联系系统管理员') self._initialized = True @@ -188,7 +192,7 @@ def generate(self) -> int: log.warning(f'检测到时钟回拨 {back_ms} ms,等待恢复...') timestamp = self._till_next_ms(self.last_timestamp) else: - raise errors.ServerError(msg=f'时钟回拨超过 {back_ms} ms,雪花 ID 生成失败,请立即联系系统管理员') + raise errors.ServerError(msg=f'雪花 ID 生成失败,时钟回拨超过 {back_ms} ms,请立即联系系统管理员') # 同毫秒内序列号递增 if timestamp == self.last_timestamp: