### 从env文件读取数据，并给BaseSetting类实例赋值

In [1]:
from pydantic_settings import BaseSettings
from pydantic import Field
from dotenv import load_dotenv
import asyncio
import redis.asyncio as redis
# from pymongo import AsyncMongoClient
from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie, Document
import mysql.connector.aio
import os

In [2]:
# load_dotenv(dotenv_path="./configs/.env")

In [2]:
class Settings(BaseSettings):
    # # DATABASE_URL: str = "mysql+pymysql://user:password@localhost/blog_db"
    MYSQL_URI: str = Field()
    MONGODB_URI: str = Field()
    REDIS_URI: str = Field()
    
    # mysql_uri: str = Field()
    # mongodb_uri: str=Field()
    # redis_uri: str=Field()
    
    # ALGORITHM: str = Field(default="HS256")
    # ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    
    class Config:
        env_file = "./configs/.env"
        env_file_encoding = "utf-8"
        # extra = "allow"

settings = Settings()  # type: ignore
print(f"Mysql_URI: {settings.MYSQL_URI}")
print(f"Mongodb_URI:{settings.MONGODB_URI}")
print(f"Redis_URI: {settings.REDIS_URI}")
# print(f"Mysql_URI: {settings.mysql_uri}")
# print(f"Mongodb_URI:{settings.mongodb_uri}")
# print(f"Redis_URI: {settings.redis_uri}")

Mysql_URI: mysql+aiomysql://root:123456@localhost/lmxy
Mongodb_URI:mongodb://localhost:27017
Redis_URI: redis://localhost


In [5]:

class ConfigSettings(BaseSettings):
    redis_host: str = Field()
    redis_port: int = Field()
    redis_db: int = Field()
    redis_uri: str = Field()
    
    mongodb_uri: str = Field()
    mongodb_db: str = Field()
    
    mysql_host: str = Field()
    mysql_port: int = Field()
    mysql_user: str = Field()
    mysql_password: str = Field()
    mysql_database: str = Field()
    mysql_uri: str=Field()
    
    class Config:
        # env_file = ".env"
        env_file = "./configs/.env"
        env_file_encoding = "utf-8"
        # extra = 'allow'

# 实例化 BaseSettings
settings = ConfigSettings() # type: ignore
print("Settings loaded:")
print(f"Redis: {settings.redis_host}:{settings.redis_port}/{settings.redis_db}")
print(f"MongoDB: {settings.mongodb_uri}/{settings.mongodb_db}")
print(f"MySQL: {settings.mysql_user}@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_database}")

Settings loaded:
Redis: localhost:6379/0
MongoDB: mongodb://localhost:27017/lmxy
MySQL: root@localhost:3306/lmxy


#### 测试redis连接

In [4]:
async def test_redis():
    client = redis.Redis(
        host=settings.redis_host,
        port=settings.redis_port,
        db=settings.redis_db,
        decode_responses=True
    )
    try:
        await client.ping()
        print("Redis connection successful")
        # 测试写入和读取
        await client.set("test_key", "test_value")
        value = await client.get("test_key")
        print(f"Redis test_key: {value}")
    finally:
        await client.aclose()

# 运行测试
await test_redis()

Redis connection successful
Redis test_key: test_value


#### 测试mongodb连接

In [5]:
settings.mongodb_db

'test'

In [None]:
class TestDocument(Document):
    name: str
    
    class Settings:
        collection = "test_collection"

async def test_mongodb():
    client = AsyncIOMotorClient(settings.mongodb_uri)
    try:
        # 初始化 Beanie
        await init_beanie(database=client[settings.mongodb_db], document_models=[TestDocument])
        print("MongoDB connection successful")
        
        # 测试插入和查询
        doc = TestDocument(name="Test User")
        await doc.insert()
        result = await TestDocument.find_one(TestDocument.name == "Test User")
        if result:
            print(f"MongoDB test document: {result.model_dump()}")
        else:
            print("No document found with the specified query.")
    finally:
        client.close()

# 运行测试
await test_mongodb()

MongoDB connection successful
MongoDB test document: {'id': ObjectId('6846474c0714fdf8efe46738'), 'name': 'Test User'}


C:\Users\Tong\AppData\Local\Temp\ipykernel_17984\2506033081.py:18: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  print(f"MongoDB test document: {result.dict()}")


#### 测试mysql连接

In [None]:
async def test_mysql():
    config = {
        "host": settings.mysql_host,
        "port": settings.mysql_port,
        "user": settings.mysql_user,
        "password": settings.mysql_password,
        "database": settings.mysql_database
    }
    cnx = await mysql.connector.aio.connect(**config)
    try:
        async with await cnx.cursor() as cur:
            await cur.execute("SELECT VERSION()")
            version = await cur.fetchone()
            if version:
                print(f"MySQL connection successful, version: {version[0]}")
    finally:
        await cnx.close()

# 运行测试
await test_mysql()

### 尝试mongodb数据库，motor异步驱动

motor日后不再更新，但是官方推荐的pymongo刚出，和fastapi集成不好，有问题，先用motor

In [5]:
from pymongo import AsyncMongoClient

In [7]:
client = AsyncMongoClient("mongodb://localhost:27017")
db = client["test"]
print("Connected to Mongodb")
print(db.name)
await client.close()

Connected to Mongodb
test


### redis数据库

aioredis异步驱动被遗弃，用redis库，现已集成异步操作

In [2]:
import redis.asyncio as redis

In [4]:
pool = redis.ConnectionPool.from_url("redis://localhost")
client=redis.Redis.from_pool(pool)
allkey = await client.keys("*")
print(allkey)
await client.aclose()

[b'name1', b'hello', b'user', b'name2', b'times']


### mysql数据库，aiomysql异步驱动，目前先用该库

- 创建表模型
  - 用SQLModel类创建
- 创建引擎
  - 有uri链接，连上数据库服务器
- 创建表
  - 用SQLModel的方法，把SQLModel的表模型创建
- 创建会话
  - 通过引擎，创建会话，管理链接，命令接口

In [5]:
from sqlalchemy.ext.asyncio import create_async_engine,AsyncSession,async_sessionmaker
from sqlmodel import SQLModel,Field


In [16]:
# class test(SQLModel,table=True):
#     __table_args__={"extend_existing":True}
#     name: str=Field()
#     score: int=Field()

In [None]:
DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/blog_db"
async_engine = create_async_engine(DATABASE_URL, echo=True)
async with async_engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)
            print()
AsyncSessionLocal = async_sessionmaker( bind=async_engine, class_=AsyncSession,expire_on_commit=False)

# async with AsyncSessionLocal() as session:


### pathlib 新型操作系统文件和路径

In [12]:
from pathlib import Path 

In [11]:
test = Path("test.txt")
code = test.write_text("Hello World!")
code , test.read_text()

(12, 'Hello World!')

In [13]:
trytest = Path("./tests/try")

In [15]:
str(trytest)

'tests\\try'