## iterator vs list

In [3]:
from sys import getsizeof
from typing import Iterator

from pyleak import no_task_leaks, no_event_loop_blocking, no_thread_leaks
from pyleak.base import LeakAction

# Compare the shallow size of a lazy iterator with the list materialized from it.
# Alternative lazy source (generator expression instead of a range iterator):
# foo: Iterator[int] = (x for x in range(1_000_000))
foo: Iterator[int] = iter(range(1_000_000))
# NOTE: list(foo) consumes the iterator, so `foo` is exhausted from here on.
bar: list[int] = list(foo)

# sys.getsizeof is shallow: it measures the container object itself, not the
# int objects the list refers to.
print("iterator:", getsizeof(foo), "bytes")
print("list:", getsizeof(bar), "bytes")


iterator: 32 bytes
list: 8000056 bytes


## httpx.AsyncClient reuse

Benchmark two cases: creating a new client in each iteration vs. reusing one client for all iterations.

In [4]:
from httpx import AsyncClient

import time
from typing import Callable, Awaitable


async def benchmark_async(func: Callable[[], Awaitable], n_iter: int = 5) -> float:
    """Await ``func()`` ``n_iter`` times in a row and return the total elapsed seconds.

    The calls are strictly sequential (each one is awaited before the next
    starts) and the duration is measured with ``time.perf_counter``.
    """
    t_begin = time.perf_counter()
    for _round in range(n_iter):
        await func()
    t_end = time.perf_counter()
    return t_end - t_begin


async def request_with_new_client():
    """Send one GET request through an ``AsyncClient`` created just for this call.

    The client is constructed, entered with ``async with``, used for a single
    request and exited again; the response is discarded.
    """
    print("1) Creating new client in each request...")
    async with AsyncClient() as client:
        await client.get('https://getman.cn/mock/route/to/demo')


async def request_with_reused_client(client: AsyncClient):
    """Send one GET request through the caller-supplied ``client``.

    The client is neither created nor exited here; its lifetime belongs to
    the caller. The response is discarded.
    """
    print("2) Reusing client...")
    await client.get('https://getman.cn/mock/route/to/demo')


async def run_benchmarks():
    """Time both request strategies with ``benchmark_async`` and print the comparison.

    First the "new client per request" strategy is timed, then the
    "one shared client" strategy, and finally both durations plus their
    relative difference (in percent) are printed.
    """
    # Strategy 1: every request builds its own AsyncClient.
    new_client_time = await benchmark_async(request_with_new_client)

    # Strategy 2: a single AsyncClient stays open for all requests.
    async with AsyncClient() as client:

        async def reuse_shared_client():
            # Zero-argument adapter so benchmark_async can call it directly.
            await request_with_reused_client(client)

        reused_client_time = await benchmark_async(reuse_shared_client)

    print(f"New client per request: {new_client_time:.3f}s")
    print(f"Reused client: {reused_client_time:.3f}s")
    print(f"Performance difference: {(new_client_time / reused_client_time - 1) * 100:.1f}%")


# Top-level await: valid in a notebook/IPython cell; a plain script would need asyncio.run(...).
await run_benchmarks()


1) Creating new client in each request...
1) Creating new client in each request...
1) Creating new client in each request...
1) Creating new client in each request...
1) Creating new client in each request...
2) Reusing client...
2) Reusing client...
2) Reusing client...
2) Reusing client...
2) Reusing client...
New client per request: 10.287s
Reused client: 2.686s
Performance difference: 283.0%


## dataclass / pydantic instead of dict

### performance comparison

- 100万次创建对比
- 100万次访问属性对比


In [5]:
from dataclasses import dataclass
from typing import Dict
import timeit
import random


# Define test structures
@dataclass
class Person:
    """Typed record used as the dataclass side of the dict-vs-dataclass benchmark."""

    name: str
    age: int
    city: str


def create_dict() -> Dict:
    """Build a sample person record as a plain dict.

    Keys are ``name``, ``age`` and ``city`` (in that order); ``age`` is a
    random integer between 20 and 80 inclusive.
    """
    record = dict(name="John", age=random.randint(20, 80), city="New York")
    return record


def create_dataclass() -> Person:
    """Build a sample ``Person`` with fixed name/city and a random age in 20..80 inclusive."""
    age = random.randint(20, 80)
    return Person(name="John", age=age, city="New York")


# Benchmark creation
# Each timeit call runs its statement 1,000,000 times and returns the total
# seconds; globals=globals() lets the statement string resolve the names
# defined in this module (create_dict, create_dataclass).
dict_creation = timeit.timeit(
    'create_dict()',
    globals=globals(),
    number=1_000_000
)

dataclass_creation = timeit.timeit(
    'create_dataclass()',
    globals=globals(),
    number=1_000_000
)

print(f"Dict creation: {dict_creation:.3f}s")
print(f"Dataclass creation: {dataclass_creation:.3f}s")
# Relative extra time of dataclass creation compared with dict creation, in percent.
print(f"Performance difference: {(dataclass_creation / dict_creation - 1) * 100:.1f}%")

# Benchmark attribute access
# d and dc must keep these names: the timed statement strings below refer to them.
d = create_dict()
dc = create_dataclass()

dict_access = timeit.timeit(
    'd["name"]',
    globals=globals(),
    number=1_000_000
)

dataclass_access = timeit.timeit(
    'dc.name',
    globals=globals(),
    number=1_000_000
)

print(f"\nDict attribute access: {dict_access:.3f}s")
print(f"Dataclass attribute access: {dataclass_access:.3f}s")
# A negative percentage means the dataclass access was faster than the dict access.
print(f"Performance difference: {(dataclass_access / dict_access - 1) * 100:.1f}%")


Dict creation: 0.271s
Dataclass creation: 0.474s
Performance difference: 74.7%

Dict attribute access: 0.014s
Dataclass attribute access: 0.010s
Performance difference: -30.0%


### 可维护性对比

``` python
person = {
    "name": "John",
    "age": 30
}

# If the input data lacks a key, the lookup below can raise KeyError
def get_styled_name(name: str) -> str:
    return name.title()
# One-off, free-standing helper function
get_styled_name(person["name"]) # possible error: KeyError: 'name'


@dataclass
class Person:
    name: str
    age: int

    @property
    def styled_name(self) -> str:
        return self.name.title()

# If input data is missing, the error message is explicit
person = Person("John", 30)
# The inline property avoids a free-standing helper function
person.styled_name  # no error
```

### 结论

- dict 在创建方面表现更好，属性访问方面 dataclass 表现更好
- dataclass 相比 dict 能提高代码的可读性和可维护性


## 异常捕获

### 避免捕获异常基类 `Exception`


In [6]:
# Bad example - catch every exception
try:
    value = int("abc")  # may raise ValueError
    value2 = 1 / 0  # may raise ZeroDivisionError
except Exception as e:
    # Problem: every error is handled the same way, so a conversion error
    # cannot be told apart from a division-by-zero error
    print(f"发生错误: {e}")

# Good example - catch specific exception types
try:
    value = int("abc")  # may raise ValueError
    value2 = 1 / 0  # may raise ZeroDivisionError
except ValueError as e:
    # Handle the conversion error specifically
    print(f"无法将字符串转换为整数: {e}")
except ZeroDivisionError as e:
    # Handle the division-by-zero error specifically
    print(f"不能除以零: {e}")


发生错误: invalid literal for int() with base 10: 'abc'
无法将字符串转换为整数: invalid literal for int() with base 10: 'abc'


### 打印错误堆栈

下面用一个嵌套函数的示例来展示缺少堆栈信息的问题

In [7]:
import logging


def process_data(data):
    """Run ``complex_calculation`` on ``data``; on failure log only the error message.

    This is deliberately the "bad practice" half of the demo: the handler
    logs the exception text without any traceback, and the function then
    returns ``None``.
    """
    try:
        return complex_calculation(data)
    except Exception as e:
        # Anti-pattern kept on purpose - message only, no stack trace.
        logging.error(f"Error: {e}")
        return None


def complex_calculation(data):
    """Stand-in for a complicated computation: divides ``data`` by zero, then doubles.

    For numeric input the division raises ``ZeroDivisionError`` before the
    doubling step can run.
    """
    # Placeholder for the real logic; the division by zero is the intended failure.
    return (data / 0) * 2


def main():
    """Drive the demo: call ``process_data(10)`` and log its return value at INFO level."""
    # Exercise the function under test and report whatever it hands back.
    logging.info("Final result: %s", process_data(10))


# Run the demo; the recorded output shows only the one-line error message.
main()

ERROR:root:Error: division by zero


这种输出存在以下问题:

1. 无法知道错误发生在哪个函数中
2. 无法知道错误发生在代码的哪一行
3. 无法追踪错误的调用链路

In [8]:
def process_data_with_traceback(data):
    """Run ``complex_calculation`` on ``data``; on any exception, log it with its traceback and return ``None``."""
    try:
        return complex_calculation(data)
    except Exception as e:
        # Good practice - log the full stack trace
        logging.error("Error occurred", exc_info=True)
        # Alternative: logging.exception("Error occurred")
        return None


def complex_calculation(data):
    """Demo computation that divides ``data`` by zero, so for numeric input the doubling step is never reached."""
    intermediate = data / 0
    return intermediate * 2


def main():
    """Run the traceback demo and log what ``process_data_with_traceback(10)`` returns."""
    result = process_data_with_traceback(10)
    # The value needs a %s placeholder: logging merges extra arguments into
    # the message with the % operator, so an argument without a placeholder
    # triggers a formatting error as soon as INFO records are emitted.
    logging.info("Final result: %s", result)


# Run the demo; the error is logged together with its traceback.
main()

ERROR:root:Error occurred
Traceback (most recent call last):
  File "C:\Users\xiaojiezhi-jk\AppData\Local\Temp\ipykernel_1856\3113272483.py", line 3, in process_data_with_traceback
    return complex_calculation(data)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\xiaojiezhi-jk\AppData\Local\Temp\ipykernel_1856\3113272483.py", line 12, in complex_calculation
    intermediate = data / 0
                   ~~~~~^~~
ZeroDivisionError: division by zero



通过堆栈信息, 我们可以:
1. 立即定位到错误发生在 complex_calculation 函数的第12行
2. 了解到错误类型是 ZeroDivisionError
3. 清楚地看到函数调用链: main -> process_data_with_traceback -> complex_calculation

## 异步代码中的同步代码

In [9]:
import asyncio
import time


async def sync_block():
    """Coroutine whose body is purely synchronous: print, ``time.sleep(1)``, print.

    There is no ``await`` inside, so nothing here hands control back to the
    event loop while the sleep is in progress.
    """
    print("sync block started...")
    time.sleep(1)
    print("sync block finished.")


async def async_block():
    """Coroutine that prints, awaits ``asyncio.sleep(1)``, and prints again."""
    print("async block started...")
    await asyncio.sleep(1)
    print("async block finished.")


async def run_blocks():
    """Gather five ``sync_block`` coroutines, then five ``async_block`` coroutines.

    Each batch is passed to ``asyncio.gather`` and its wall-clock duration
    (measured with ``time.time``) is printed once the batch finishes.
    """
    # Batch 1: coroutines that call the blocking time.sleep.
    batch_started = time.time()
    print(">>> Running sync blocks, started")
    blocking_coros = [sync_block() for _ in range(5)]
    await asyncio.gather(*blocking_coros)
    print("Running sync blocks, finished, elapsed: %s" % (time.time() - batch_started))

    # Batch 2: coroutines that await asyncio.sleep.
    print(">>> Running async blocks, started")
    batch_started = time.time()
    cooperative_coros = [async_block() for _ in range(5)]
    await asyncio.gather(*cooperative_coros)
    print("Running async blocks, finished, elapsed: %s" % (time.time() - batch_started))


# Top-level await: valid in a notebook/IPython cell; a plain script would need asyncio.run(...).
await run_blocks()


>>> Running sync blocks, started
sync block started...
sync block finished.
sync block started...
sync block finished.
sync block started...
sync block finished.
sync block started...
sync block finished.
sync block started...
sync block finished.
Running sync blocks, finished, elapsed: 5.004634380340576
>>> Running async blocks, started
async block started...
async block started...
async block started...
async block started...
async block started...
async block finished.
async block finished.
async block finished.
async block finished.
async block finished.
Running async blocks, finished, elapsed: 1.0110499858856201


### 使用 pyleak 检测

- `uv add pyleak`
- 配合使用 [pytest](https://github.com/deepankarm/pyleak?tab=readme-ov-file#add-the-plugin-to-your-pytest-configuration)

@pyleak_check.py && @test_leak.py

### 修复问题：使用线程执行同步函数

- 对于普通 Python app，使用 `asyncio.to_thread` 或者 `loop.run_in_executor` 能让 sync 函数在线程（池）中执行
- 对于 fastapi app
    - 在 api 层的函数定义直接使用 `def` 而不是 `async def`，fastapi 内部会自动转换
    - 自行封装线程池执行器 `concurrent.futures.ThreadPoolExecutor` + `loop.run_in_executor`
    - 使用 `starlette.concurrency.run_in_threadpool`



In [10]:
def sync_block():  # !! remove the `async` keyword
    """Plain (non-async) function: print, block for one second with ``time.sleep``, print."""
    print("sync block started...")
    time.sleep(1)
    print("sync block finished.")


async def run_with_wrapped_sync_block():
    """Gather five threaded ``sync_block`` calls, then five ``async_block`` coroutines.

    The blocking function is wrapped with ``asyncio.to_thread`` before being
    gathered. Each batch prints its wall-clock duration (``time.time``) when
    it finishes.
    """
    batch_started = time.time()
    print(">>> Running sync blocks, started")
    # !! each sync_block call is wrapped with asyncio.to_thread so it runs in a separate thread
    threaded_calls = [asyncio.to_thread(sync_block) for _ in range(5)]
    await asyncio.gather(*threaded_calls)
    print("Running sync blocks, finished, elapsed: %s" % (time.time() - batch_started))

    print(">>> Running async blocks, started")
    batch_started = time.time()
    cooperative_coros = [async_block() for _ in range(5)]
    await asyncio.gather(*cooperative_coros)
    print("Running async blocks, finished, elapsed: %s" % (time.time() - batch_started))


# Top-level await: valid in a notebook/IPython cell; a plain script would need asyncio.run(...).
await run_with_wrapped_sync_block()

>>> Running sync blocks, started
sync block started...
sync block started...
sync block started...
sync block started...
sync block started...
sync block finished.sync block finished.
sync block finished.
sync block finished.
sync block finished.

Running sync blocks, finished, elapsed: 1.0274722576141357
>>> Running async blocks, started
async block started...
async block started...
async block started...
async block started...
async block started...
async block finished.
async block finished.
async block finished.
async block finished.
async block finished.
Running async blocks, finished, elapsed: 0.996722936630249


## logging

### 使用 `logging.getLogger` 而不是直接使用 `logging`

直接使用 `logging.debug()` 等模块级函数相当于使用根 logger（即 `logging.getLogger()`），所有模块共用同一个 logger，无法按模块分别控制行为（级别、输出路径、格式等）。

应当使用 `logging.getLogger` 获取一个 logger，然后通过 logger 进行操作。

``` python
import logging

# Bad example - call the logging module directly
logging.basicConfig(level=logging.DEBUG)
logging.debug("This is a debug message")

# Good example - use logging.getLogger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.debug("This is a debug message")
```

### 减少冗余日志

- 根据情况打印必要的日志，而不是全部打印。
- 使用不同的日志等级，只是 debug 用的，用 debug 等级输出，在正式环境，把全局的 log level 设置为更高的级别，避免日志过多。
- 在函数或方法中，使用 `logger.info` 代替 `print` 输出信息
    1. 级别控制 - 可以根据环境动态控制日志输出
    2. 多目标输出 - 可以同时输出到文件、控制台、网络等
    3. 结构化格式 - 提供时间戳、模块名、行号等上下文信息
    4. 生产环境友好 - 便于运维监控和问题排查
    5. 更好的错误处理 - 支持堆栈跟踪和异常信息记录
    6. 模块化管理 - 不同模块可以有独立的日志配置
    7. 性能优化 - 支持延迟求值，避免不必要的字符串操作

### 日志实在太多，本地开发时怎么只看自己关注的部分？

- 注释相关代码
- 使用 logger 的配置，如 `logger.disabled = True` 等方式关闭日志输出
- 使用 [zestyping/q](https://github.com/zestyping/q) 或者 [gruns/icecream](https://github.com/gruns/icecream)

使用 q 打印日志：

In [14]:
# macos / linux: tail -f $TMPDIR/q
# windows powershell: Get-Content $env:temp\q -Wait -Tail 30
# Named logger for this module; so_many_logs also logs through the logging module directly.
logger = logging.getLogger(__name__)


def so_many_logs():
    """Emit several warning logs mixed with two ``q`` debug calls.

    Shows how values of interest can be sent through ``q`` amid noisy
    logging. Presumably the ``q`` output lands in the file tailed by the
    commands at the top of this cell - confirm against the q documentation.
    """
    logger.warning("This is a log message")
    logger.warning("This is a log message")

    # __import__('q') pulls in the q module inline, without a separate import statement.
    __import__('q').q("look at this")
    some_number = 123
    __import__('q').q(some_number)

    # These two calls go through the logging module directly rather than `logger`.
    logging.warning("This is a log message")
    logging.warning("This is a log message")


# Run the demo once.
so_many_logs()

