In [6]:
import time
import requests
from functools import wraps

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

In [9]:
import requests

API_URL = "http://localhost:8000/api"  # Make sure to change this to your actual API server URL.

@measure_time
def get_product_details(product_id):
    """获取产品详情"""
    response = requests.get(f"{API_URL}/products/{product_id}")
    print("\nGet Product Details:")
    print("Status Code:", response.status_code)
    if response.status_code == 200:
        print("Response:", response.json())
    else:
        print("Error:", response.text)
    return response

# 假设我们有一个产品ID为"123"，你可以替换为实际的产品ID
product_id = "993"
get_product_details(product_id)



Get Product Details:
Status Code: 200
Response: {'id': 993, 'model': 'CQRJSTPH-A', 'sku': 'CQRJSTPH-A', 'mpn': 'B0731MZCGF', 'quantity': 991, 'stock_status_id': 7, 'image_url': 'catalog/CQRobot/2.0JST C/CQRJST2.0-C-1.jpg', 'manufacturer_id': 8, 'price': 4.99, 'date_available': '2011-04-28T00:00:00', 'weight_grams': 158.0, 'viewed': 29731, 'date_added': '2017-05-01T00:00:00', 'date_modified': '2021-11-11T06:17:33'}
get_product_details took 2.11 seconds.


<Response [200]>

In [15]:
import requests

API_URL = "http://127.0.0.1:8000/api"


def test_me(access_token=None):
    """尝试获取当前用户信息"""
    headers = {}
    if access_token:
        headers['Authorization'] = f'Bearer {access_token}'
    response = requests.get(f"{API_URL}/me", headers=headers)
    print("\nGet Me:")
    print("Status Code:", response.status_code)
    print("Response:", response.json())


def register_user():
    """注册新用户"""
    data = {
        "email": "test@example1.com",
        # "password": "password123",
        "password": "newpassword456",
        "first_name": "Test",
        "last_name": "User",
        "phone_number": "1234567890"
    }
    response = requests.post(f"{API_URL}/users/register", json=data)
    print("\nRegister User:")
    print("Status Code:", response.status_code)
    print("Response:", response.json())


def login_user():
    """登录并获取访问令牌"""
    data = {
        "email": "test@example.com",
        "password": "newpassword456",
    }
    response = requests.post(f"{API_URL}/users/login", json=data)
    print("\nLogin User:")
    print("Status Code:", response.status_code)
    print("Response:", response.json())
    return response.json().get("access_token")


if __name__ == "__main__":
    # Step 1: 尝试获取当前用户信息（应该失败）
    test_me()

    # Step 2: 注册新用户
    register_user()

    # Step 3: 登录并获取访问令牌
    access_token = login_user()

    # Step 4: 使用访问令牌再次尝试获取当前用户信息（应该成功）
    test_me(access_token)



Get Me:
Status Code: 401
Response: {'detail': 'Not authenticated'}

Register User:
Status Code: 200
Response: {'message': 'User registered successfully.'}

Login User:
Status Code: 400
Response: {'detail': 'Incorrect email or password'}

Get Me:
Status Code: 401
Response: {'detail': 'Not authenticated'}


In [11]:
import requests
API_URL = "http://127.0.0.1:8000/api"
def reset_password(email, old_password, new_password):
    """重置密码"""
    data = {
        "email": email,
        "old_password": old_password,
        "new_password": new_password
    }
    response = requests.post(f"{API_URL}/users/reset_password", json=data)
    print("\nReset Password:")
    print("Status Code:", response.status_code)
    print("Response:", response.json())


if __name__ == "__main__":
    # Step 1: 尝试获取当前用户信息（应该失败）
    # test_me()

    # # Step 2: 注册新用户
    # register_user()

    # # Step 3: 登录并获取访问令牌
    # access_token = login_user()

    # # Step 4: 使用访问令牌再次尝试获取当前用户信息（应该成功）
    # test_me(access_token)

    # Step 5: 重置密码（假设已知旧密码）
    reset_password("testuser@example.com", "testpassword123", "newpassword456")



Reset Password:
Status Code: 200
Response: {'message': 'Password updated successfully.'}


In [4]:
# reset_password("test@example.com", "newpassword456", "newpassword456")

In [2]:
# import requests
# from functools import wraps
# import time

# # 定义一个装饰器来测量函数运行时间
# def measure_time(func):
#     @wraps(func)
#     def wrapper(*args, **kwargs):
#         start_time = time.time()
#         result = func(*args, **kwargs)
#         elapsed_time = time.time() - start_time
#         print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
#         return result
#     return wrapper

# @measure_time
# def test_register():
#     url = "http://localhost:8000/api/users/register"
#     payload = {
#         "email": "testuser@example.com",
#         "password": "testpassword123",
#         "first_name": "Test",
#         "last_name": "User",
#         "phone_number": "1234567890"
#     }
#     response = requests.post(url, json=payload)
#     print("Register - Status Code:", response.status_code)
#     print("Register - Response Body:", response.json())

# @measure_time
# def test_login():
#     url = "http://localhost:8000/api/users/login"
#     payload = {
#         "email": "testuser@example.com",
#         "password": "testpassword123"
#     }
#     response = requests.post(url, json=payload)
#     print("Login - Status Code:", response.status_code)
#     print("Login - Response Body:", response.json())
#     return response.json().get('access_token')

# @measure_time
# def test_me(access_token):
#     url = "http://localhost:8000/api/me"
#     headers = {
#         "Authorization": f"Bearer {access_token}"
#     }
#     response = requests.get(url, headers=headers)
#     print("Me - Status Code:", response.status_code)
#     print("Me - Response Body:", response.json())

# @measure_time
# def test_logout(access_token):
#     url = "http://localhost:8000/api/users/logout"
#     headers = {
#         "Authorization": f"Bearer {access_token}"
#     }
#     response = requests.post(url, headers=headers)
#     print("Logout - Status Code:", response.status_code)
#     print("Logout - Response Body:", response.json())

# @measure_time
# def test_refresh_token(access_token):
#     url = "http://localhost:8000/api/users/refresh_token"
#     headers = {
#         "Authorization": f"Bearer {access_token}"
#     }
#     response = requests.post(url, headers=headers)
#     print("Refresh Token - Status Code:", response.status_code)
#     print("Refresh Token - Response Body:", response.json())
#     return response.json().get('access_token')

# if __name__ == "__main__":
#     test_register()
#     access_token = test_login()
#     if access_token:
#         test_me(access_token)
#         test_logout(access_token)
#         # 测试刷新令牌逻辑（如果适用）
#         new_access_token = test_refresh_token(access_token)
#         if new_access_token:
#             test_me(new_access_token)


In [1]:
import requests
from functools import wraps
import time

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_register():
    url = "http://localhost:8000/api/users/register"
    payload = {
        "email": "testuser@example.com",
        "password": "testpassword123",
        "first_name": "Test",
        "last_name": "User",
        "phone_number": "1234567890"
    }
    response = requests.post(url, json=payload)
    print("Register - Status Code:", response.status_code)
    print("Register - Response Body:", response.json())

@measure_time
def test_login():
    url = "http://localhost:8000/api/users/login"
    payload = {
        "email": "testuser@example.com",
        "password": "testpassword123" # "testpassword123"
    }
    response = requests.post(url, json=payload)
    print("Login - Status Code:", response.status_code)
    print("Login - Response Body:", response.json())
    return response.json().get('access_token'), response.json().get('refresh_token')

@measure_time
def test_me(access_token):
    url = "http://localhost:8000/api/users/me"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers)
    print("Me - Status Code:", response.status_code)
    print("Me - Response Body:", response.json())


@measure_time
def test_logout(access_token):
    url = "http://localhost:8000/api/users/logout"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"  # 确保设置正确的Content-Type
    }
    # 注意：确保通过JSON请求体发送刷新令牌
    # payload = {"refresh_token": refresh_token}
    response = requests.post(url, headers=headers)  # , json=payload使用json=payload发送JSON数据
    print("Logout - Status Code:", response.status_code)
    print("Logout - Response Body:", response.json())


@measure_time
def test_refresh_token(access_token):#refresh_token
    url = "http://localhost:8000/api/users/refresh_token"
    # 注意：根据您的接口设计调整传递刷新令牌的方式
    # payload = {"refresh_token": refresh_token}
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"  # 确保设置正确的Content-Type
    }
    response = requests.post(url, headers=headers)#, json=payload
    print("Refresh Token - Status Code:", response.status_code)
    print("Refresh Token - Response Body:", response.json())
    if response.status_code == 200:
        return response.json().get('access_token')
    return None

# import time

# if __name__ == "__main__":
#     # 注册用户
#     test_register()
#     # 登录并获取令牌
#     # access_token, refresh_token = test_login()
#     access_token, _ = test_login()
#     if access_token:
#         # 使用令牌访问 /me 接口
#         test_me(access_token)
#         # 登出
#         # test_logout(access_token, refresh_token)
#         test_logout(access_token)
#         # 尝试在登出后使用刷新令牌获取新的访问令牌（应该失败）
#         print("Attempting to refresh token after logout:")
#         test_refresh_token(access_token)#refresh_token
#         # 重新登录并获取令牌
#         access_token, refresh_token = test_login()
#         if access_token:
#             # 使用新的访问令牌访问 /me 接口
#             test_me(access_token)
#             # 使用刷新令牌获取新的访问令牌（应该成功）
#             print("Attempting to refresh token after re-login:")
#             # time.sleep(10)
#             new_access_token = test_refresh_token(access_token)#refresh_token
#             if new_access_token:
#                 # 使用新的访问令牌访问 /me 接口
#                 test_me(new_access_token)


In [12]:
# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_get_addresses(access_token):
    url = "http://localhost:8000/api/users/addresses"
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.get(url, headers=headers)
    print("Get Addresses - Status Code:", response.status_code)
    print("Get Addresses - Response Body:", response.json())
    return response.json()

@measure_time
def test_add_address(access_token):
    url = "http://localhost:8000/api/users/addresses"
    headers = {"Authorization": f"Bearer {access_token}"}
    payload = {
        "address_1": "123 Elm St",
        "address_2": "Apt 4",
        "city_state": "Springfield",
        "zip": "12345",
        "country": "USA"
    }
    response = requests.post(url, json=payload, headers=headers)
    print("Add Address - Status Code:", response.status_code)
    print("Add Address - Response Body:", response.json())
    return response.json()

@measure_time
def test_get_address_detail(address_id, access_token):
    url = f"http://localhost:8000/api/users/addresses/{address_id}"
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.get(url, headers=headers)
    print("Get Address Detail - Status Code:", response.status_code)
    print("Get Address Detail - Response Body:", response.json())
    return response.json()

@measure_time
def test_update_address(address_id, access_token):
    url = f"http://localhost:8000/api/users/addresses/{address_id}"
    headers = {"Authorization": f"Bearer {access_token}"}
    payload = {
        "address_1": "123 Elm Street",
        "address_2": "Suite 4",
        "city_state": "Springfield",
        "zip": "12345",
        "country": "USA"
    }
    response = requests.patch(url, json=payload, headers=headers)
    print("Update Address - Status Code:", response.status_code)
    print("Update Address - Response Body:", response.json())
    return response.json()

@measure_time
def test_delete_address(address_id, access_token):
    url = f"http://localhost:8000/api/users/addresses/{address_id}"
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.delete(url, headers=headers)
    print("Delete Address - Status Code:", response.status_code)
    print("Delete Address - Response Body:", response.json())
    return response.status_code


if __name__ == "__main__":
    # 登录并获取令牌
    access_token, _ = test_login()
    if access_token:
        # 测试地址API
        # 验证初始地址列表为空
        test_get_addresses(access_token)
        # 添加一个地址
        address_info = test_add_address(access_token)
        address_id = address_info['id']
        # 获取并验证这个新添加的地址
        test_get_address_detail(address_id, access_token)
        # 更新这个地址
        test_update_address(address_id, access_token)
        # 删除这个地址
        test_delete_address(address_id, access_token)
        # 验证地址已删除
        test_get_addresses(access_token)


Login - Status Code: 200
Login - Response Body: {'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMzQ1MzY0MH0.mf3lCaAmQaSPzAjzSrPqo2JvIjWTog8KrJ_7CgQxGuo', 'token_type': 'bearer', 'expires_in': 180}
test_login took 2.11 seconds.
Get Addresses - Status Code: 200
Get Addresses - Response Body: []
test_get_addresses took 2.06 seconds.
Add Address - Status Code: 201
Add Address - Response Body: {'address_1': '123 Elm St', 'address_2': 'Apt 4', 'address_3': None, 'city_state': 'Springfield', 'zip': '12345', 'country': 'USA', 'id': 1}
test_add_address took 2.07 seconds.
Get Address Detail - Status Code: 200
Get Address Detail - Response Body: {'address_1': '123 Elm St', 'address_2': 'Apt 4', 'address_3': None, 'city_state': 'Springfield', 'zip': '12345', 'country': 'USA', 'id': 1}
test_get_address_detail took 2.05 seconds.
Update Address - Status Code: 200
Update Address - Response Body: {'address_1': '123 Elm Street', 'address_2': 'Su

In [8]:
{"Authorization": f"Bearer {access_token}"}

{'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMzQ1MjQyMn0.eqRAvpTqA8CqjRKCtXVNMVdKxIkwnG9TcZVWqcwEyAk'}

In [10]:
test_get_addresses("")

Get Addresses - Status Code: 401
Get Addresses - Response Body: {'detail': 'Token is invalid'}
test_get_addresses took 2.08 seconds.


{'detail': 'Token is invalid'}

In [21]:
@measure_time
def test_view_cart_items(access_token):
    url = "http://localhost:8000/api/cart/"
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.get(url, headers=headers)
    print("View Cart Items - Status Code:", response.status_code)
    print("View Cart Items - Response Body:", response.json())

@measure_time
def test_view_cart_item_details(access_token, product_id):
    url = f"http://localhost:8000/api/cart/{product_id}"
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.get(url, headers=headers)
    print("View Cart Item Details - Status Code:", response.status_code)
    print("View Cart Item Details - Response Body:", response.json())

@measure_time
def test_update_cart_item(access_token, product_id, new_amount):
    url = f"http://localhost:8000/api/cart/{product_id}"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    payload = {"amount": new_amount}
    response = requests.patch(url, headers=headers, json=payload)
    print("Update Cart Item - Status Code:", response.status_code)
    print("Update Cart Item - Response Body:", response.json())

# Example of using these test functions:
if __name__ == "__main__":
    test_register()
    access_token, _ = test_login()
    if access_token:
        test_me(access_token)
        test_logout(access_token)

        # Re-login to perform cart operations
        access_token, _ = test_login()
        if access_token:
            # Assuming you have a product_id, here using a placeholder
            product_id = 1  # This should be replaced with an actual product_id
            test_view_cart_items(access_token)
            test_view_cart_item_details(access_token, product_id)
            # Test updating cart item, assuming new_amount as an example
            new_amount = 5
            test_update_cart_item(access_token, product_id, new_amount)
            test_view_cart_items(access_token)
            test_logout(access_token)


Register - Status Code: 400
Register - Response Body: {'detail': 'Email already registered'}
test_register took 2.04 seconds.
Login - Status Code: 200
Login - Response Body: {'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMjkyMDY2MH0.SonUqt2TJJpXfxVSqiwM7Nb_9k0oFynp0l-Cj66qnYw', 'token_type': 'bearer', 'expires_in': 180}
test_login took 2.10 seconds.
Me - Status Code: 200
Me - Response Body: {'email': 'testuser@example.com', 'first_name': 'Test', 'last_name': 'User', 'phone_number': '1234567890'}
test_me took 2.05 seconds.
Logout - Status Code: 200
Logout - Response Body: {'message': 'You have been logged out.'}
test_logout took 2.10 seconds.
Login - Status Code: 200
Login - Response Body: {'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMjkyMDY2N30._9jGvlR-VAG7rbCaQGYD-dfCiLgZaqMZBzF1DQ1BKyw', 'token_type': 'bearer', 'expires_in': 180}
test_login took 2.11 seconds.
View

View Cart Items - Status Code: 200
View Cart Items - Response Body: [{'product_id': 1, 'amount': 5}, {'product_id': 2, 'amount': 3}]
test_view_cart_items took 2.06 seconds.


In [11]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# from database import Base  # 确保这里正确导入你的 Base 类
from database.models import User, Cart, Product

DATABASE_URL = "sqlite:///./test.db"  # 或使用其他数据库连接字符串

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建所有表（如果尚未创建）
Base.metadata.create_all(bind=engine)

def add_products():
    db = SessionLocal()
    try:
        # 创建一些产品实例
        product1 = Product(
            name="Laptop",
            description="High-performance laptop",
            price=1000,
            currency_id=1,  # 假设货币ID已存在
            quantity=10,
            weight_grams=2000,
            image_path="/images/laptop.png"
        )
        product2 = Product(
            name="Smartphone",
            description="Latest model smartphone",
            price=700,
            currency_id=1,  # 同上
            quantity=15,
            weight_grams=300,
            image_path="/images/smartphone.png"
        )
        
        # 添加到会话并提交
        db.add(product1)
        db.add(product2)
        db.commit()
        print("Products added successfully.")
    except Exception as e:
        print(f"Error adding products: {e}")
        db.rollback()
    finally:
        db.close()

if __name__ == "__main__":
    add_products()


Products added successfully.


In [2]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database.models import Product, Base

DATABASE_URL = "sqlite:///./test.db"  # 或使用其他数据库连接字符串

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def print_products():
    db = SessionLocal()
    try:
        # 查询所有产品
        products = db.query(Product).all()
        print("Listing all products:")
        for product in products:
            print(f"ID: {product.id}, Name: {product.mpn}, Description: {product.stock_status_id}, "
                  f"Price: {product.price}, Quantity: {product.quantity}, "
                  f"Weight: {product.weight_grams}, Image Path: {product.image_url}")
    except Exception as e:
        print(f"Error retrieving products: {e}")
    finally:
        db.close()

if __name__ == "__main__":
    print_products()


Listing all products:
ID: 993, Name: B0731MZCGF, Description: 7, Price: 4.9900, Quantity: 991, Weight: 158.00000000, Image Path: catalog/CQRobot/2.0JST C/CQRJST2.0-C-1.jpg
ID: 1005, Name: B01DUHZAT0, Description: 7, Price: 12.9900, Quantity: 912, Weight: 128.00000000, Image Path: catalog/CQRobot/CQRDMX512/DMX Shield-1.jpg
ID: 1012, Name: B07D35VJNQ, Description: 7, Price: 1.5800, Quantity: 91, Weight: 68.00000000, Image Path: catalog/0704/B01ENQY4WG/B01ENQY4WG-1.jpg
ID: 1031, Name: B01LVVI1XI, Description: 7, Price: 6.9900, Quantity: 392, Weight: 78.00000000, Image Path: catalog/CQRobot/RS232 to RS485/AngelDT-9000DE-1.JPG
ID: 1047, Name: B0738NLFTG, Description: 7, Price: 3.9900, Quantity: 922, Weight: 78.00000000, Image Path: catalog/Speaker /FBA/8O-3W/JST PH/CQRANQI0007US-1.jpg
ID: 1048, Name: B0731NHS9R, Description: 7, Price: 4.8300, Quantity: 983, Weight: 150.00000000, Image Path: catalog/CQRobot/JST XH 2P-4P/JST XH 2P-4P-1.jpg
ID: 1068, Name: , Description: 6, Price: 82.8700, Qua

In [18]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database.models import Cart, Base, User, Product  # 确保引入了Cart, User和Product模型

DATABASE_URL = "sqlite:///./test.db"  # 或使用其他数据库连接字符串

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def print_cart_items():
    db = SessionLocal()
    try:
        # 查询所有购物车条目
        cart_items = db.query(Cart).all()
        print("Listing all cart items:")
        for item in cart_items:
            # 这里假设User和Product模型与Cart关联
            user = db.query(User).filter(User.id == item.user_id).first()
            product = db.query(Product).filter(Product.id == item.product_id).first()
            print(f"Cart ID: {item.id}, User: {user.email if user else 'Unknown'}, "
                  f"Product: {product.name if product else 'Unknown'}, "
                  f"Quantity: {item.quantity}")
    except Exception as e:
        print(f"Error retrieving cart items: {e}")
    finally:
        db.close()

if __name__ == "__main__":
    print_cart_items()


Listing all cart items:
Cart ID: 1, User: john.doe@example.com, Product: Laptop, Quantity: 2
Cart ID: 2, User: john.doe@example.com, Product: Smartphone, Quantity: 3
Cart ID: 3, User: testuser@example.com, Product: Laptop, Quantity: 2
Cart ID: 4, User: testuser@example.com, Product: Smartphone, Quantity: 3


In [17]:
# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# from database.models import Base, User, Product, Cart
# from sqlalchemy.exc import IntegrityError
# from datetime import datetime

# DATABASE_URL = "sqlite:///./test.db"  # 或使用其他数据库连接字符串

# engine = create_engine(DATABASE_URL)
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base.metadata.create_all(bind=engine)  # 确保所有表都已创建

# def add_user_and_cart_items():
#     db = SessionLocal()
#     try:
#         # 创建一个新用户
#         new_user = User(
#             email="john.doe@example.com",
#             hashed_password="hashed_password_example",
#             disabled=False
#         )
#         db.add(new_user)
#         db.commit() #

#         # 假设产品ID为1和2的产品已经存在于数据库中
#         product1 = db.query(Product).filter_by(id=1).first()
#         product2 = db.query(Product).filter_by(id=2).first()

#         # 为用户添加购物车条目
#         cart_item1 = Cart(
#             user_id=new_user.id,
#             product_id=product1.id,
#             quantity=2
#         )
#         cart_item2 = Cart(
#             user_id=new_user.id,
#             product_id=product2.id,
#             quantity=3
#         )
        
#         db.add(cart_item1)
#         db.add(cart_item2)
#         db.commit()
#         print("Cart items added successfully for the user.")
#     except IntegrityError as e:
#         db.rollback()
#         print(f"Error: {e}")
#     except Exception as e:
#         db.rollback()
#         print(f"An error occurred: {e}")
#     finally:
#         db.close()

# if __name__ == "__main__":
#     add_user_and_cart_items()

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database.models import Base, User, Product, Cart
from sqlalchemy.exc import IntegrityError
from datetime import datetime

DATABASE_URL = "sqlite:///./test.db"  # 或使用其他数据库连接字符串

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base.metadata.create_all(bind=engine)  # 确保所有表都已创建

def add_cart_items_to_existing_user():
    db = SessionLocal()
    try:
        # 查询已存在的用户
        existing_user = db.query(User).filter_by(email="testuser@example.com").first()
        if not existing_user:
            print("No user found with the specified email.")
            return  # 如果用户不存在则退出函数

        # 假设产品ID为1和2的产品已经存在于数据库中
        product1 = db.query(Product).filter_by(id=1).first()
        product2 = db.query(Product).filter_by(id=2).first()

        # 为用户添加购物车条目
        cart_item1 = Cart(
            user_id=existing_user.id,
            product_id=product1.id,
            quantity=2
        )
        cart_item2 = Cart(
            user_id=existing_user.id,
            product_id=product2.id,
            quantity=3
        )
        
        db.add(cart_item1)
        db.add(cart_item2)
        db.commit()
        print("Cart items added successfully for the existing user.")
    except IntegrityError as e:
        db.rollback()
        print(f"Error: {e}")
    except Exception as e:
        db.rollback()
        print(f"An error occurred: {e}")
    finally:
        db.close()

if __name__ == "__main__":
    add_cart_items_to_existing_user()


Cart items added successfully for the existing user.


In [22]:
import requests
from functools import wraps

# 测试API的基本URL
BASE_URL = "http://localhost:8000/api"

def measure_time(func):
    """
    装饰器，用于测量函数执行时间
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        import time
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} took {end_time - start_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_register_user(email, password, first_name=None, last_name=None, phone_number=None):
    """
    测试注册用户接口
    """
    url = f"{BASE_URL}/users/register"
    payload = {
        "email": email,
        "password": password,
        "first_name": first_name,
        "last_name": last_name,
        "phone_number": phone_number
    }
    response = requests.post(url, json=payload)
    print("Register User Response:", response.json())
    return response

@measure_time
def test_login_user(email, password):
    """
    测试登录用户接口，返回token
    """
    url = f"{BASE_URL}/users/login"
    data = {
        "email": email,
        "password": password
    }
    response = requests.post(url, json=data)
    print("Login User Response:", response.json())
    return response.json()["access_token"]

# 测试用例
email = ""#test2@example.com
password = "test1234"#

# 运行测试
test_register_user(email, password)#, "Test", "User", "+123456789"

Register User Response: {'detail': [{'type': 'value_error', 'loc': ['body', 'email'], 'msg': 'value is not a valid email address: The email address is not valid. It must have exactly one @-sign.', 'input': '', 'ctx': {'reason': 'The email address is not valid. It must have exactly one @-sign.'}}]}
test_register_user took 2.04 seconds.


<Response [422]>

In [5]:
token = test_login_user(email, password)

Login User Response: {'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0MUBleGFtcGxlLmNvbSIsImV4cCI6MTcxMDc3NjE3Mn0.uE_Xwh3H9RbtN4ufkh_SqzMd6NjG8nO652ZuJFt2poY', 'token_type': 'bearer', 'expires_in': 1800}
test_login_user took 2.12 seconds.


In [6]:
@measure_time
def test_get_current_user(token):
    """
    测试获取当前用户信息接口
    """
    url = f"{BASE_URL}/me"
    headers = {
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    print("Current User Info:", response.json())

test_get_current_user(token)

Current User Info: {'email': 'test1@example.com', 'first_name': '', 'last_name': '', 'phone_number': '', 'disabled': False}
test_get_current_user took 2.05 seconds.


In [4]:
import requests
from functools import wraps
import time

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_register():
    url = "http://localhost:8000/api/users/register"
    payload = {
        "username": "testuser",
        "password": "testpassword123"
    }
    response = requests.post(url, json=payload)
    print("Register - Status Code:", response.status_code)
    print("Register - Response Body:", response.json())

@measure_time
def test_token():
    url = "http://localhost:8000/api/login"
    payload = {
        "username": "testuser",
        "password": "testpassword123"
    }
    response = requests.post(url, data=payload)
    print("Token - Status Code:", response.status_code)
    print("Token - Response Body:", response.json())
    return response.json().get('access_token')

@measure_time
def test_me(access_token):
    url = "http://localhost:8000/api/me"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers)
    print("Me - Status Code:", response.status_code)
    print("Me - Response Body:", response.json())

if __name__ == "__main__":
    # 注册用户
    test_register()
    # 获取令牌
    token = test_token()
    if token:
        # 使用令牌访问 /me 接口
        test_me(token)


Register - Status Code: 422
Register - Response Body: {'detail': [{'type': 'missing', 'loc': ['query', 'local_kw'], 'msg': 'Field required', 'input': None, 'url': 'https://errors.pydantic.dev/2.6/v/missing'}]}
test_register took 2.02 seconds.
Token - Status Code: 404
Token - Response Body: {'detail': 'Not Found'}
test_token took 2.05 seconds.


In [2]:
import requests
from functools import wraps
import time

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_token():
    url = "http://localhost:8000/api/token"
    payload = {
        "username": "user1",
        "password": "password123"
    }
    response = requests.post(url, data=payload)
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

# 首先运行 test_register() 确保用户已经注册
# test_register()
# 然后测试 /token 接口
test_token()


Status Code: 404
Response Body: {'detail': 'Not Found'}
test_token took 2.04 seconds.


In [25]:
import time
import requests
from functools import wraps

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_register():
    url = "http://127.0.0.1:8000/api/aaa"
    # headers = {"Content-Type": "application/json"}  # 指定内容类型为JSON
    # payload = {
    #     "username": "user1@11.com",
    #     "password": "password123"
    # }
    response = requests.get(url)#, json=payload, headers=headers
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

test_register()


Status Code: 200
Response Body: {'message': 'User registered successfully.'}
test_register took 0.01 seconds.


In [36]:
import time
import requests
from functools import wraps

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper

@measure_time
def test_register():
    url = "http://127.0.0.1:8000/api/users/register"
    payload = {
        "email": "user1@11.com",
        "password": "password123"
    }
    response = requests.post(url, json=payload)
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

test_register()


Status Code: 422
Response Body: {'detail': [{'type': 'missing', 'loc': ['query', 'local_kw'], 'msg': 'Field required', 'input': None, 'url': 'https://errors.pydantic.dev/2.6/v/missing'}]}
test_register took 0.03 seconds.


In [29]:
import time
import requests
from functools import wraps

# 定义一个装饰器来测量函数运行时间
def measure_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds.")
        return result
    return wrapper


import requests
# 使用装饰器
@measure_time
def test_register():
    url = "http://localhost:8000/api/users/register"
    payload = {
        "email": "user1@11.com",
        "password": "password123"
    }
    response = requests.post(url, json=payload)
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

test_register()

Status Code: 200
Response Body: {'message': 'User registered successfully.'}
test_register took 2.16 seconds.


Status Code: 400
Response Body: {'detail': 'Username already registered'}
test_register took 2.10 seconds.


In [7]:
import requests

@measure_time
def test_login():
    url = "http://localhost:8000/api/users/login"
    payload = {
        "username": "user1",
        "password": "password123"
    }
    response = requests.post(url, data=payload)
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

test_login()

Status Code: 422
Response Body: {'detail': [{'type': 'missing', 'loc': ['query', 'local_kw'], 'msg': 'Field required', 'input': None, 'url': 'https://errors.pydantic.dev/2.6/v/missing'}]}
test_login took 2.04 seconds.


In [5]:
import requests

@measure_time
def test_login():
    url = "http://localhost:8000/api/me"
    payload = {
        "username": "user1",
        "password": "password123"
    }
    response = requests.post(url, data=payload)
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

test_login()

Status Code: 405
Response Body: {'detail': 'Method Not Allowed'}
test_login took 2.03 seconds.


In [12]:
import requests

def test_get_product(product_id):
    url = f"http://localhost:8000/api/products/product/{product_id}"
    response = requests.get(url)
    print("Status Code:", response.status_code)
    if response.status_code == 200:
        print("Response Body:", response.json())
    else:
        print("Error:", response.text)

# Replace '1' with the actual product_id you wish to test
test_get_product(1099)

Status Code: 200
Response Body: {'id': 1099, 'model': 'CQRJSTEL-A', 'quantity': 999, 'image_url': 'catalog/CQRobot/JST EL/JST EL 2P-1.jpg', 'price': 6.16, 'name': 'JST EL - 2 Pin Connector Kit', 'description': '&lt;p&gt;&lt;font face=&quot;Tahoma&quot;&gt;With the development of electronic products, a wide range of connectors are present in electronic equipment. Currently, malfunctions caused by poor connectors account for more than 37% of all equipment failures. The JST connector plays the role of conducting current and connecting signal in electronic equipment. Connectors are easier for professional division of labor, parts replacement, troubleshooting and quick assembly. They are widely used in various equipments because of their stronger and more reliable features.&lt;/font&gt;&lt;/p&gt;&lt;p&gt;&lt;font face=&quot;Tahoma&quot;&gt;&lt;b&gt;FEATURES&lt;/b&gt;&lt;/font&gt;&lt;/p&gt;&lt;ul&gt;&lt;li&gt;&lt;font face=&quot;Tahoma&quot;&gt;50 Pieces 4.5mm pitch JST - EL 2-Pin housing ma

In [10]:
import requests

def test_get_popular():
    url = f"http://localhost:8000/api/products/popular"
    response = requests.get(url)
    print("Status Code:", response.status_code)
    if response.status_code == 200:
        print("Response Body:", response.json())
    else:
        print("Error:", response.text)

# Replace '1' with the actual product_id you wish to test
test_get_popular()

Status Code: 200
Response Body: [{'id': 1120, 'img_url': 'catalog/CQRobot/MCP23017/MCP23017-1.JPG', 'name': 'Ocean: MCP23017 IO Expansion Board for Raspberry Pi, Micro:bit, Arduino and STM32.', 'model': 'CQRMCP23017A'}, {'id': 1005, 'img_url': 'catalog/CQRobot/CQRDMX512/DMX Shield-1.jpg', 'name': 'DMX Shield for Arduino', 'model': 'AngelDFR0260US'}, {'id': 993, 'img_url': 'catalog/CQRobot/2.0JST C/CQRJST2.0-C-1.jpg', 'name': 'JST PH - 2 / 3 / 4 Pin Connector Kit', 'model': 'CQRJSTPH-A'}, {'id': 1047, 'img_url': 'catalog/Speaker /FBA/8O-3W/JST PH/CQRANQI0007US-1.jpg', 'name': 'Passive Speaker 8Ω 3W, JST-PH2.0 Interface.', 'model': 'CQRLB8O3W-A'}, {'id': 1031, 'img_url': 'catalog/CQRobot/RS232 to RS485/AngelDT-9000DE-1.JPG', 'name': 'Passive RS232 to RS485 Converter Adapter', 'model': 'AngelDT-9000DE'}, {'id': 1073, 'img_url': 'catalog/CQRobot/CQRCK701/CQRCK701-A.jpg', 'name': 'USB to RS422 or RS485 Serial Port Converter Adapter Cable with FTDI FT232 Chip', 'model': 'CQRCK701'}, {'id': 1

In [2]:
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)


class AccTokenMapping(Base):
    __tablename__ = 'acc_token_mapping'

    access_token = Column(String, primary_key=True, index=True)
    refresh_token = Column(String, nullable=False)
    exp_at = Column(Integer, nullable=False)  # 使用Integer来存储时间戳

    def __repr__(self):
        return f"<TokenMapping(access_token='{self.access_token}', refresh_token='{self.refresh_token}', exp_at={self.exp_at})>"

# 数据库连接设置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_all_users():
    db = SessionLocal()
    try:
        # 查询所有用户记录
        users = db.query(AccTokenMapping).all()
        for user in users:
            print(f"access_token: {user.access_token}, refresh_token: {user.refresh_token},exp_at: {user.exp_at}")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        db.close()

if __name__ == "__main__":
    get_all_users()


access_token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMjQwOTk2Mn0.x4YQN0_uhJ7-c2Vqw8TXzhti5E0Wk1H7x3Qm3d6YQ4g, refresh_token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMzAwMzk2Mn0.7G5KlwByiCKHpblMZwIFxIeKib6taSyDa5F6xAMsNHc,exp_at: 1713003962
access_token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMjQwOTk3MH0.HYkQxRl2drHv4ZWVQlFZQpQek6GTIgWyoFNosGjjTko, refresh_token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlckBleGFtcGxlLmNvbSIsImV4cCI6MTcxMzAwMzk3MH0.BZDXuJZpwsU440O8NwlIlMd5xw69nc_FSDgxBNldlHc,exp_at: 1713003970


  Base = declarative_base()


In [5]:
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    disabled = Column(Boolean, default=False)
    info = relationship("UserInfo", back_populates="user", uselist=False)

class UserInfo(Base):
    __tablename__ = "user_info"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey('users.id'), unique=True)
    first_name = Column(String)
    last_name = Column(String)
    phone_number = Column(String)
    user = relationship("User", back_populates="info")

# Database connection settings
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_all_user_info():
    db = SessionLocal()
    try:
        # Query all user_info records
        user_infos = db.query(UserInfo).all()
        for user_info in user_infos:
            print(f"UserInfo ID: {user_info.id}, User_id: {user_info.user_id},First Name: {user_info.first_name}, Last Name: {user_info.last_name}, Phone Number: {user_info.phone_number}")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        db.close()

# Running the function to print user_info table entries
get_all_user_info()


UserInfo ID: 1, User_id: 1,First Name: mikky, Last Name: my, Phone Number: 12345648


  Base = declarative_base()


In [5]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import time  # 引入time处理时间戳

Base = declarative_base()

# 你的User和UserInfo模型定义省略...

class TokenBlacklist(Base):
    __tablename__ = "token_blacklist"
    id = Column(Integer, primary_key=True, autoincrement=True)
    token = Column(String, nullable=False, unique=True)
    expires_at = Column(Integer, nullable=False)  # 使用整数存储失效时间戳

    def __repr__(self):
        return f"<TokenBlacklist(token='{self.token}', expires_at={self.expires_at})>"

# Database connection settings
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_all_token_blacklist():
    db = SessionLocal()
    try:
        # 查询TokenBlacklist表中的所有记录
        token_blacklists = db.query(TokenBlacklist).all()
        for token_blacklist in token_blacklists:
            # 将expires_at从时间戳转换为可读的日期时间格式进行打印
            expires_datetime = datetime.fromtimestamp(token_blacklist.expires_at, timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
            print(f"Token ID: {token_blacklist.id}, Token: {token_blacklist.token}, Expires At: {expires_datetime}")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        db.close()

# 运行函数以打印TokenBlacklist表中的条目
get_all_token_blacklist()


Error occurred: 'TokenBlacklist' object has no attribute 'created_at'


  Base = declarative_base()


In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from your_model_file import Base, User  # 确保从你的模型文件导入Base和User

# 数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(bind=engine)

# 清空User表
def clear_user_table():
    # 创建会话
    db = SessionLocal()
    try:
        # 删除User表中的所有记录
        db.query(User).delete()
        # 提交更改
        db.commit()
        print("User table cleared.")
    except Exception as e:
        # 如果发生错误，则回滚
        db.rollback()
        print(f"Error occurred: {e}")
    finally:
        # 关闭会话
        db.close()

if __name__ == "__main__":
    clear_user_table()
