In [23]:
import sqlite3
import os
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from datetime import datetime

In [24]:
# 异常类
class DuplicateKeyError(Exception):
    """当 Email 已经存在于系统中时抛出"""
    pass

class RecordNotFound(Exception):
    """当无法通过 ID 或 Email 找到记录时抛出"""
    pass

class InvalidOperationError(Exception): 
    """当业务流转违反规则时抛出"""
    def __init__(self, message, current_status=None):
        super().__init__(message)
        self.current_status = current_status  # 存储状态以便后续审计

In [25]:
# 数据类
@dataclass
class Customer:
    name: str
    email: str
    id: Optional[int] = None
    created_at: Optional[str] = None

@dataclass
class Loan:
    customer_id: int
    principal_cents: int
    interest_rate: float
    term_months: int
    status: str = "pending"
    id: Optional[int] = None
    applied_at: Optional[str] = None
    approved_at: Optional[str] = None

@dataclass
class Repayment:
    loan_id: int
    amount_cents: int
    id: Optional[int] = None
    paid_at: Optional[str] = None

In [26]:
# App类
class CreditApp:
    def __init__(self, db_path: str = "credit.db"):
        """
        初始化信贷应用：
        1. 检索/创建数据库文件
        2. 开启外键约束
        3. 自动执行建表逻辑（幂等操作）
        """
        self.db_path = db_path
        
        # 1. 建立连接（如果文件不存在，sqlite3 会自动创建）
        self.conn = sqlite3.connect(self.db_path)
        
        # 允许通过列名访问结果 (例如 row['name'])
        self.conn.row_factory = sqlite3.Row
        
        # 2. 配置环境：启用外键约束 (SQLite 默认关闭)
        self.conn.execute("PRAGMA foreign_keys = ON;")
        
        # 3. 初始化表结构
        self._create_tables()

    def _create_tables(self):
        """内部方法：执行初始建表 SQL"""
        # 使用 with self.conn 保证事务原子性，即使建表失败也会回滚
        with self.conn:
            # 创建客户表
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS customers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT NOT NULL UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            
            # 创建贷款表
            # 注意：status 约束在这里直接固化在数据库层
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS loans (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    customer_id INTEGER NOT NULL,
                    principal_cents INTEGER NOT NULL CHECK (principal_cents > 0),
                    interest_rate REAL NOT NULL CHECK (interest_rate BETWEEN 0 AND 1),
                    term_months INTEGER NOT NULL CHECK (term_months > 0),
                    status TEXT NOT NULL CHECK (status IN ('pending','approved','rejected','closed')),
                    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    approved_at TIMESTAMP,
                    FOREIGN KEY (customer_id) REFERENCES customers(id) ON DELETE CASCADE
                );
            """)
            
            # 创建还款流水表
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS repayments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    loan_id INTEGER NOT NULL,
                    amount_cents INTEGER NOT NULL CHECK (amount_cents > 0),
                    paid_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (loan_id) REFERENCES loans(id) ON DELETE CASCADE
                );
            """)

    def add_customer(self, c: Customer) -> None:
        """
        将 Customer 对象持久化到数据库中。
        如果 email 违反 UNIQUE 约束，抛出 DuplicateKeyError。
        """
        try:
            # 使用事务上下文，保证操作的原子性
            with self.conn:
                self.conn.execute(
                    "INSERT INTO customers (name, email) VALUES (?, ?)",
                    (c.name, c.email)
                )
        except sqlite3.IntegrityError as e:
            # SQLite 抛出 IntegrityError 通常意味着违反了 UNIQUE 或 CHECK 约束
            # 我们根据错误消息进一步判断是否为 email 重复
            if "UNIQUE constraint failed: customers.email" in str(e):
                raise DuplicateKeyError(f"邮箱 '{c.email}' 已存在，无法重复注册。") from e
            # 如果是其他完整性错误，则原样抛出
            raise e

    def apply_loan(self, email: str, principal_cents: int, interest_rate: float, term_months: int) -> int:
        """
        根据客户 Email 申请贷款。
        1. 查找客户 ID
        2. 插入状态为 'pending' 的贷款记录
        3. 返回生成的贷款 ID
        """
        # 1. 查找客户 ID
        cursor = self.conn.execute("SELECT id FROM customers WHERE email = ?", (email,))
        row = cursor.fetchone()
        
        if not row:
            raise RecordNotFound(f"未找到邮箱为 '{email}' 的客户，请先开户。")
        
        customer_id = row["id"]
        
        # 2. 执行插入
        with self.conn:
            cursor = self.conn.execute(
                """
                INSERT INTO loans (customer_id, principal_cents, interest_rate, term_months, status)
                VALUES (?, ?, ?, ?, 'pending')
                """,
                (customer_id, principal_cents, interest_rate, term_months)
            )
            # 获取自动递增产生的 ID
            loan_id = cursor.lastrowid
            
        return loan_id
    
    def approve_loan(self, loan_id: int, approve: bool) -> None:
        """
        审批贷款。仅限 pending 状态。
        更新 status 并记录审批时间。
        """
        # 1. 查询当前贷款状态
        cursor = self.conn.execute("SELECT status FROM loans WHERE id = ?", (loan_id,))
        row = cursor.fetchone()
        
        if not row:
            raise RecordNotFound(f"未找到 ID 为 {loan_id} 的贷款记录。")
        
        current_status = row["status"]
        
        # 2. 状态校验：只有 pending 状态可以审批
        if current_status != 'pending':
            raise InvalidOperationError(
                f"贷款 {loan_id} 当前状态为 '{current_status}'，无法重复审批。",
                current_status=current_status
            )
        
        # 3. 确定新状态
        new_status = 'approved' if approve else 'rejected'
        
        # 4. 执行更新
        with self.conn:
            self.conn.execute(
                """
                UPDATE loans 
                SET status = ?, approved_at = CURRENT_TIMESTAMP 
                WHERE id = ?
                """,
                (new_status, loan_id)
            )
    
    def record_repayment(self, loan_id: int, amount_cents: int) -> None:
        """
        登记还款。仅限 approved 状态。
        若累计还款达到本金，则结清贷款。
        """
        # 1. 检查贷款状态
        cursor = self.conn.execute("SELECT status, principal_cents FROM loans WHERE id = ?", (loan_id,))
        loan = cursor.fetchone()
        
        if not loan:
            raise RecordNotFound(f"贷款 ID {loan_id} 不存在。")
        if loan["status"] != 'approved':
            raise InvalidOperationError(f"贷款 {loan_id} 状态为 {loan['status']}，无法登记还款。")

        # 2. 开启事务处理：记录还款并检查结清
        with self.conn:
            # 插入还款流水
            self.conn.execute(
                "INSERT INTO repayments (loan_id, amount_cents) VALUES (?, ?)",
                (loan_id, amount_cents)
            )
            
            # 计算当前总还款额
            cursor = self.conn.execute(
                "SELECT SUM(amount_cents) as total_paid FROM repayments WHERE loan_id = ?",
                (loan_id,)
            )
            total_paid = cursor.fetchone()["total_paid"] or 0
            
            # 3. 结清判定
            if total_paid >= loan["principal_cents"]:
                self.conn.execute(
                    "UPDATE loans SET status = 'closed' WHERE id = ?",
                    (loan_id,)
                )

    def customer_balance(self, email: str) -> tuple[int, int]:
        """
        返回指定客户的 (总借款本金, 总已还金额)。
        """
        # 使用 JOIN 一次性查出两个总额
        # COALESCE 用于处理 NULL（例如没有还款记录时显示 0）
        sql = """
            SELECT 
                COALESCE(SUM(l.principal_cents), 0) as total_principal,
                COALESCE(SUM(r.amount_cents), 0) as total_repaid
            FROM customers c
            LEFT JOIN loans l ON c.id = l.customer_id
            LEFT JOIN repayments r ON l.id = r.loan_id
            WHERE c.email = ?
        """
        cursor = self.conn.execute(sql, (email,))
        row = cursor.fetchone()
        
        # 注意：如果 email 不存在，由于是 LEFT JOIN，仍会返回 (0, 0)
        # 我们可以先验证客户是否存在来决定是否抛出 RecordNotFound
        check = self.conn.execute("SELECT 1 FROM customers WHERE email = ?", (email,)).fetchone()
        if not check:
            raise RecordNotFound(f"未找到客户: {email}")

        return (row["total_principal"], row["total_repaid"])

    def overdue_loans(self, days: int = 30) -> list[tuple]:
        """
        列出逾期贷款。
        逾期定义：距离最后一次还款（或审批通过日）已超过指定天数，且未结清。
        """
        # SQL 逻辑：
        # 1. 使用 LEFT JOIN 包含那些一次都还没还过的贷款
        # 2. GROUP BY l.id 以便计算每个贷款的还款总和与最后还款时间
        # 3. HAVING 子句过滤天数差
        sql = """
            SELECT 
                l.id as loan_id,
                c.name as customer_name,
                (l.principal_cents - COALESCE(SUM(r.amount_cents), 0)) as outstanding_cents,
                COALESCE(MAX(r.paid_at), l.approved_at) as last_activity
            FROM loans l
            JOIN customers c ON l.customer_id = c.id
            LEFT JOIN repayments r ON l.id = r.loan_id
            WHERE l.status = 'approved'
            GROUP BY l.id
            HAVING (julianday('now') - julianday(last_activity)) > ?
        """
        
        cursor = self.conn.execute(sql, (days,))
        results = []
        for row in cursor.fetchall():
            results.append((
                row["loan_id"],
                row["customer_name"],
                row["outstanding_cents"],
                row["last_activity"]
            ))
        return results

    def portfolio_summary(self) -> dict:
        """
        生成业务大盘汇总报告。
        """
        # 1. 统计总客户数 (明确指定从 customers 表统计)
        total_customers = self.conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]

        # 2. 统计贷款相关基础指标：活跃数、平均利率、平均本金
        # 我们只统计 status 为 'approved' 的贷款，这通常更符合“资产组合”的管理逻辑
        loan_stats = self.conn.execute("""
            SELECT 
                COUNT(*) as active_count,
                AVG(interest_rate) as avg_rate,
                AVG(principal_cents) / 100.0 as avg_principal_cny
            FROM loans 
            WHERE status = 'approved'
        """).fetchone()

        active_count = loan_stats["active_count"] or 0
        avg_rate = loan_stats["avg_rate"] or 0
        avg_principal = loan_stats["avg_principal_cny"] or 0

        # 3. 计算不良贷款数 (逾期 > 90天)
        default_sql = """
            SELECT COUNT(*) FROM (
                SELECT l.id
                FROM loans l
                LEFT JOIN repayments r ON l.id = r.loan_id
                WHERE l.status = 'approved'
                GROUP BY l.id
                HAVING (julianday('now') - julianday(COALESCE(MAX(r.paid_at), l.approved_at))) > 90
            )
        """
        default_loans_count = self.conn.execute(default_sql).fetchone()[0]

        # 4. 计算不良率 (NPL Ratio)
        default_ratio = 0.0
        if active_count > 0:
            default_ratio = default_loans_count / active_count

        return {
            "total_customers": total_customers,
            "active_loans": active_count,
            "avg_interest_rate": round(avg_rate, 4),
            "avg_principal_cny": round(avg_principal, 2),
            "default_ratio": round(default_ratio, 4)
        }

    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()

In [27]:
def run_simulation():
    # 1. 初始化应用（自动建表）
    app = CreditApp("simulation.db")
    print("=== 模拟开始：个人消费信贷系统 ===\n")

    try:
        # --- 场景 1: 客户开户 ---
        print("步骤 1: 录入新客户...")
        alice = Customer(name="Alice Wang", email="alice@example.com")
        app.add_customer(alice)
        print("OK: 客户 Alice 注册成功。")

        # --- 场景 2: 触发 DuplicateKeyError ---
        print("\n步骤 2: 尝试用相同 Email 再次注册（预期触发异常）...")
        try:
            app.add_customer(Customer(name="Alice Clone", email="alice@example.com"))
        except DuplicateKeyError as e:
            print(f"捕获预期异常: {e}")

        # --- 场景 3: 申请贷款与 RecordNotFound ---
        print("\n步骤 3: 尝试为不存在的邮箱申请贷款（预期触发异常）...")
        try:
            app.apply_loan("stranger@example.com", 100000, 0.05, 12)
        except RecordNotFound as e:
            print(f"捕获预期异常: {e}")

        # --- 场景 4: 正常申请贷款 ---
        print("\n步骤 4: Alice 申请一笔 5000 元的贷款...")
        loan_id = app.apply_loan("alice@example.com", 500000, 0.08, 24)
        print(f"OK: 贷款申请成功，ID 为: {loan_id}")

        # --- 场景 5: 触发 InvalidOperationError (提前还款) ---
        print("\n步骤 5: 在审批前尝试还款（预期触发异常）...")
        try:
            app.record_repayment(loan_id, 10000)
        except InvalidOperationError as e:
            print(f"捕获预期异常: {e}")

        # --- 场景 6: 审批贷款 ---
        print("\n步骤 6: 审批通过该贷款...")
        app.approve_loan(loan_id, approve=True)
        print(f"OK: 贷款 {loan_id} 已转为 approved 状态。")

        # --- 场景 7: 触发 InvalidOperationError (重复审批) ---
        print("\n步骤 7: 尝试对已审批的贷款再次审批（预期触发异常）...")
        try:
            app.approve_loan(loan_id, approve=False)
        except InvalidOperationError as e:
            print(f"捕获预期异常: {e}")

        # --- 场景 8: 正常还款与余额查询 ---
        print("\n步骤 8: 登记一笔 2000 元的还款...")
        app.record_repayment(loan_id, 200000)
        principal, repaid = app.customer_balance("alice@example.com")
        print(f"OK: Alice 当前账单 -> 总本金: {principal/100}元, 已还: {repaid/100}元")

        # --- 场景 9: 结清贷款 ---
        print("\n步骤 9: 偿还剩余 3000 元，观察状态自动结清...")
        app.record_repayment(loan_id, 300000)
        # 验证状态
        cursor = app.conn.execute("SELECT status FROM loans WHERE id = ?", (loan_id,))
        status = cursor.fetchone()["status"]
        print(f"OK: 贷款最终状态为: {status}")

        # --- 场景 10: 生成报表 ---
        print("\n步骤 10: 查看全盘资产报表...")
        summary = app.portfolio_summary()
        print("资产大盘:")
        for k, v in summary.items():
            print(f"  - {k}: {v}")

    finally:
        app.close()
        # 清理测试数据库文件（可选）
        os.remove("simulation.db")
        print("\n=== 模拟结束 ===")

In [28]:
if __name__ == "__main__":
    # 确保数据库文件名统一
    app = CreditApp("credit_test.db")
    
    print("--- 1) 新增两位客户 ---")
    try:
        alice = Customer(name="Alice Wang", email="alice@example.com")
        bob = Customer(name="Bob Li", email="bob@example.com")
        app.add_customer(alice)
        app.add_customer(bob)
        print("成功: Alice 和 Bob 已开户。")
    except DuplicateKeyError as e:
        print(f"提示: 客户可能已存在 ({e})")

    print("\n--- 2) 各自申请贷款并审批 ---")
    # Alice 借 5000 元，8% 利率
    loan_id_alice = app.apply_loan("alice@example.com", 500000, 0.08, 12)
    # Bob 借 3000 元，10% 利率
    loan_id_bob = app.apply_loan("bob@example.com", 300000, 0.10, 6)
    
    app.approve_loan(loan_id_alice, approve=True)
    app.approve_loan(loan_id_bob, approve=True)
    print(f"成功: 贷款 ID {loan_id_alice} (Alice) 和 ID {loan_id_bob} (Bob) 已审批通过。")

    print("\n--- 3) 录入还款并故意触发重复审批异常 ---")
    # Alice 还款 2000 元
    app.record_repayment(loan_id_alice, 200000)
    print(f"成功: Alice 偿还了 2000 元。")

    try:
        print("尝试重复审批 Alice 的贷款...")
        app.approve_loan(loan_id_alice, approve=True)
    except InvalidOperationError as e:
        print(f"预期异常触发成功: {e}")

    print("\n--- 4) 打印报表与查询结果 ---")
    
    # 查询 Alice 的余额
    bal_a = app.customer_balance("alice@example.com")
    print(f"Alice 账户余额 -> 总借款: {bal_a[0]/100}元, 总已还: {bal_a[1]/100}元")

    # 查询逾期贷款 (假设逾期天数为 0 以便观察当前所有活跃贷款)
    overdues = app.overdue_loans(days=-1) # 使用 -1 天强制列出所有未结清且有活动记录的
    print(f"当前未结清贷款列表: {overdues}")

    # 打印全盘报表
    summary = app.portfolio_summary()
    print("\n=== 业务全盘报表 (Portfolio Summary) ===")
    print(f"  总客户数:       {summary['total_customers']}")
    print(f"  活跃贷款数:     {summary['active_loans']}")
    print(f"  平均利率:       {summary['avg_interest_rate'] * 100:.2f}%")
    print(f"  平均本金:       {summary['avg_principal_cny']:.2f} 元")
    print(f"  不良率 (NPL):   {summary['default_ratio'] * 100:.2f}%")
    
    app.close()

--- 1) 新增两位客户 ---
成功: Alice 和 Bob 已开户。

--- 2) 各自申请贷款并审批 ---
成功: 贷款 ID 1 (Alice) 和 ID 2 (Bob) 已审批通过。

--- 3) 录入还款并故意触发重复审批异常 ---
成功: Alice 偿还了 2000 元。
尝试重复审批 Alice 的贷款...
预期异常触发成功: 贷款 1 当前状态为 'approved'，无法重复审批。

--- 4) 打印报表与查询结果 ---
Alice 账户余额 -> 总借款: 5000.0元, 总已还: 2000.0元
当前未结清贷款列表: [(1, 'Alice Wang', 300000, '2026-01-03 06:30:10'), (2, 'Bob Li', 300000, '2026-01-03 06:30:10')]

=== 业务全盘报表 (Portfolio Summary) ===
  总客户数:       2
  活跃贷款数:     2
  平均利率:       9.00%
  平均本金:       4000.00 元
  不良率 (NPL):   0.00%
