In [584]:
import sqlite3
import random
from datetime import datetime, timedelta

conn = sqlite3.connect(':memory:')
conn.execute("PRAGMA foreign_keys = 1")
cursor = conn.cursor()

In [585]:
# 创建表结构
cursor.executescript('''
CREATE TABLE users (
    user_id TEXT PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    password TEXT NOT NULL CHECK(length(password) >= 8)
);

CREATE TABLE trips (
    trip_id INTEGER PRIMARY KEY,
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL CHECK(end_time > start_time)
);

CREATE TABLE cities (
    city_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    country TEXT NOT NULL,
    longitude DECIMAL(9,6) NOT NULL,
    latitude DECIMAL(9,6) NOT NULL
);

CREATE TABLE locations (
    location_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    address TEXT,
    type TEXT CHECK(type IN ('attraction','restaurant','transport')),
    city_id INTEGER NOT NULL REFERENCES cities(city_id)
);
                     
CREATE TABLE footprints (
    footprint_id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT,
    image_url TEXT,
    created_at DATETIME NOT NULL,
    user_id TEXT NOT NULL REFERENCES users(user_id),
    location_id INTEGER NOT NULL REFERENCES locations(location_id)
);

CREATE TABLE comments (
    comment_id INTEGER,
    footprint_id INTEGER,
    content TEXT NOT NULL,
    created_at DATETIME NOT NULL,
    user_id TEXT NOT NULL REFERENCES users(user_id),
    parent_comment_id INTEGER,
    PRIMARY KEY (comment_id, footprint_id),
    FOREIGN KEY (footprint_id) REFERENCES footprints(footprint_id),
    FOREIGN KEY (parent_comment_id, footprint_id) 
        REFERENCES comments(comment_id, footprint_id)
);

CREATE TABLE user_favorites (
    user_id TEXT REFERENCES users(user_id),
    footprint_id INTEGER REFERENCES footprints(footprint_id),
    favorited_at DATETIME NOT NULL,
    PRIMARY KEY (user_id, footprint_id)
);

CREATE TABLE trip_participants (
    user_id TEXT REFERENCES users(user_id),
    trip_id INTEGER REFERENCES trips(trip_id),
    role TEXT CHECK(role IN ('creator','participant')),
    PRIMARY KEY (user_id, trip_id)
);

CREATE TABLE trip_cities (
    trip_id INTEGER REFERENCES trips(trip_id),
    city_id INTEGER REFERENCES cities(city_id),
    PRIMARY KEY (trip_id, city_id)
);
''')

# 提交事务
conn.commit()

In [586]:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
print("已创建表:", [table[0] for table in cursor.fetchall()])

已创建表: ['users', 'trips', 'cities', 'locations', 'footprints', 'comments', 'user_favorites', 'trip_participants', 'trip_cities']


In [587]:
# 业务功能实现
def get_user_footprints(user_id):
    query = '''
    SELECT * FROM footprints 
    WHERE user_id = ?
    ORDER BY created_at DESC
    '''
    cursor.execute(query, (user_id,))
    return cursor.fetchall()

def add_comment(footprint_id, user_id, content, parent_id=None):
    # 获取下一个评论ID
    cursor.execute('''
    SELECT MAX(comment_id) FROM comments 
    WHERE footprint_id = ?
    ''', (footprint_id,))
    max_id = cursor.fetchone()[0]
    next_id = max_id + 1 if max_id is not None else 1
    
    try:
        cursor.execute('''
        INSERT INTO comments 
        VALUES (?, ?, ?, datetime('now'), ?, ?)
        ''', (next_id, footprint_id, content, user_id, parent_id))
        conn.commit()
        return True
    except Exception as e:
        print(f"Error adding comment: {str(e)}")
        conn.rollback()
        return False

In [588]:
# 数据生成工具函数
def generate_users(num):
    """生成示例用户数据"""
    return [
        (f'U{100+i}', f'User{i}', f'Passw0rd!{i}')
        for i in range(num)
    ]

def generate_cities():
    """生成示例城市数据"""
    return [
        (1, 'Paris', 'France', 48.8566, 2.3522),
        (2, 'London', 'UK', 51.5074, -0.1278),
        (3, 'Tokyo', 'Japan', 35.6762, 139.6503),
        (4, 'New York', 'USA', 40.7128, -74.0060),
        (5, 'Beijing', 'China', 39.9042, 116.4074)
    ]

def generate_locations():
    """生成示例地点数据"""
    return [
        (1, 'Eiffel Tower', 'Champ de Mars', 'attraction', 1),
        (2, 'Louvre Museum', 'Rue de Rivoli', 'attraction', 1),
        (3, 'Big Ben', 'Westminster', 'attraction', 2),
        (4, 'Tower Bridge', 'Tower Bridge Rd', 'attraction', 2),
        (5, 'Shibuya Crossing', 'Shibuya', 'attraction', 3)
    ]

def insert_data(table, data):
    try:
        cursor.executemany(f'INSERT INTO {table} VALUES ({",".join("?"*len(data[0]))})', data)
        conn.commit()
        print(f"Inserted {len(data)} records into {table}")
    except sqlite3.IntegrityError as e:
        print(f"Integrity error: {str(e)}")

insert_data('users', generate_users(5))
insert_data('cities', generate_cities())
insert_data('locations', generate_locations())

Inserted 5 records into users
Inserted 5 records into cities
Inserted 5 records into locations


In [None]:
class DatabaseManager:
    def __init__(self, conn):
        self.conn = conn
        self.cursor = conn.cursor()
        self.trip_cnt = 0
        self.footprint_cnt = 0

    def get_user_footprints(self, user_id):
        query = '''
        SELECT * FROM footprints 
        WHERE user_id = ?
        ORDER BY created_at DESC
        '''
        self.cursor.execute(query, (user_id,))
        return self.cursor.fetchall()
    
    def get_user_favorites(self, user_id):
        query = '''
        SELECT * 
        FROM footprints fp
        WHERE footprint_id IN (
            SELECT footprint_id
            FROM user_favorites
            WHERE user_id == ?
        )
        '''
        self.cursor.execute(query, (user_id,))
        return self.cursor.fetchall()

    def add_comment(self, footprint_id, user_id, content, parent_id=None):
        # 获取下一个评论ID
        self.cursor.execute('''
        SELECT MAX(comment_id) FROM comments 
        WHERE footprint_id = ?
        ''', (footprint_id,))
        max_id = self.cursor.fetchone()[0]
        next_id = max_id + 1 if max_id is not None else 1
        
        try:
            self.cursor.execute('''
            INSERT INTO comments 
            VALUES (?, ?, ?, datetime('now'), ?, ?)
            ''', (next_id, footprint_id, content, user_id, parent_id))
            self.conn.commit()
            return True
        except Exception as e:
            print(f"Error adding comment: {str(e)}")
            self.conn.rollback()
            return False
    
    # 用户管理
    def create_user(self, user_id, username, password):
        """创建新用户"""
        try:
            self.cursor.execute('''
                INSERT INTO users VALUES (?, ?, ?)
            ''', (user_id, username, password))
            self.conn.commit()
            return True
        except sqlite3.IntegrityError as e:
            print(f"创建用户失败: {str(e)}")
            return False
    
    def delete_user(self, user_id):
        """删除用户（级联删除相关数据）"""
        try:
            self.cursor.execute('DELETE FROM users WHERE user_id = ?', (user_id,))
            self.conn.commit()
            return self.cursor.rowcount > 0
        except sqlite3.Error as e:
            print(f"删除用户失败: {str(e)}")
            return False
    
    # 足迹管理
    def create_footprint(self, user_id, title, content, location_id):
        """创建新足迹（关联所有参与行程）"""
        self.footprint_cnt += 1
        footprint_id = self.footprint_cnt
        try:        
            # 插入足迹基础数据
            self.cursor.execute('''
                INSERT INTO footprints 
                VALUES (?, ?, ?, ?, datetime('now'), ?, ?)
            ''', (footprint_id, title, content, f'img_{footprint_id}.jpg', user_id, location_id))
        
            
            self.conn.commit()
            return footprint_id
        except sqlite3.Error as e:
            print(f"创建足迹失败: {str(e)}")
            return None
        
    # 行程管理
    def create_trip(self, user_id, days=7, identity="participant"):
        """创建新行程"""
        self.trip_cnt += 1
        trip_id = self.trip_cnt
        try:
            start = datetime.now() - timedelta(days=random.randint(1,30))
            end = start + timedelta(days=days)
            
            self.cursor.execute('''
                INSERT INTO trips VALUES (?, ?, ?)
            ''', (trip_id, start.isoformat(), end.isoformat()))
            
            
            self.cursor.execute('''
                INSERT INTO trip_participants VALUES (?, ?, ?)
            ''', (user_id, trip_id, identity))
            
            self.conn.commit()
            return trip_id
        except sqlite3.Error as e:
            print(f"创建行程失败: {str(e)}")
            return None
    
    # 高级查询
    def get_user_statistics(self, user_id):
        """获取用户统计信息"""
        stats = {}
        
        # 足迹数量
        self.cursor.execute('''
            SELECT COUNT(*) FROM footprints WHERE user_id = ?
        ''', (user_id,))
        stats['footprint_count'] = self.cursor.fetchone()[0]

        # 所有足迹
        stats['footprints'] = self.get_user_footprints(user_id)
        
        # 收藏数量
        self.cursor.execute('''
            SELECT COUNT(*) FROM user_favorites WHERE user_id = ?
        ''', (user_id,))
        stats['favorite_count'] = self.cursor.fetchone()[0]

        # 所有收藏
        stats['favorites'] = self.get_user_favorites(user_id)
        
        return stats

# 使用示例
db = DatabaseManager(conn)

In [590]:
# 生成测试数据
# 创建10个用户
for i in range(10, 20):
    db.create_user(f'U{i}', f'TestUser{i}', f'P@ssw0rd{i}')

# 为每个用户创建行程
for user_id in [f'U{i}' for i in range(100,105)]:
    ident = "creator" if random.randint(0, 5) == 0 else "participant"
    trip_id = db.create_trip(user_id, identity=ident)
    if trip_id:
        print(f"用户 {user_id} 作为 {ident} 参与了行程 {trip_id}")

# 生成足迹数据
locations = [row[0] for row in cursor.execute('SELECT location_id FROM locations').fetchall()]
for user_id in [f'U{i}' for i in range(100,105)]:
    for _ in range(random.randint(1,2)):
        title = f"{random.choice(['探索', '发现', '体验'])}{random.choice(['文化', '美食', '历史'])}"
        location_id = random.choice(locations)
        fp_id = db.create_footprint(
            user_id=user_id,
            title=title,
            content=f"这是关于{title}的详细内容...",
            location_id=location_id
        )
        if fp_id:
            location_name = [row[0] for row in 
                         cursor.execute('SELECT name FROM locations WHERE location_id == ?', (location_id,)).fetchall()][0]
            print(f"用户 {user_id} 创建了足迹 {fp_id}, 标题 {title}, 位于 {location_name}")

# 生成收藏关系
footprints = [row[0] for row in cursor.execute('SELECT footprint_id FROM footprints').fetchall()]
for user_id in [f'U{i}' for i in range(100,105)]:
    favorites = random.sample(footprints, k=random.randint(1, 3))
    for fp_id in favorites:
        try:
            cursor.execute('''
                INSERT INTO user_favorites 
                VALUES (?, ?, datetime('now'))
            ''', (user_id, fp_id))
            print(f"用户 {user_id} 收藏了足迹 {fp_id}")
        except sqlite3.Error as e:
            print(f"收藏添加失败, {e}")
conn.commit()

用户 U100 作为 creator 参与了行程 1
用户 U101 作为 participant 参与了行程 2
用户 U102 作为 participant 参与了行程 3
用户 U103 作为 participant 参与了行程 4
用户 U104 作为 participant 参与了行程 5
用户 U100 创建了足迹 1, 标题 体验美食, 位于 Tower Bridge
用户 U101 创建了足迹 2, 标题 体验文化, 位于 Eiffel Tower
用户 U101 创建了足迹 3, 标题 发现美食, 位于 Big Ben
用户 U102 创建了足迹 4, 标题 发现美食, 位于 Shibuya Crossing
用户 U102 创建了足迹 5, 标题 探索历史, 位于 Tower Bridge
用户 U103 创建了足迹 6, 标题 体验文化, 位于 Eiffel Tower
用户 U104 创建了足迹 7, 标题 体验历史, 位于 Tower Bridge
用户 U104 创建了足迹 8, 标题 体验文化, 位于 Eiffel Tower
用户 U100 收藏了足迹 6
用户 U100 收藏了足迹 3
用户 U101 收藏了足迹 3
用户 U102 收藏了足迹 4
用户 U102 收藏了足迹 5
用户 U103 收藏了足迹 1
用户 U103 收藏了足迹 4
用户 U104 收藏了足迹 4
用户 U104 收藏了足迹 6


In [591]:
# 示例查询
print("\n=== 用户统计信息示例 ===")
sample_user = 'U100'
stats = db.get_user_statistics(sample_user)
print(f"用户 {sample_user} 的统计：")
print(f"- 足迹数量：{stats['footprint_count']}")
print(f"- 收藏数量：{stats['favorite_count']}")
print(f"- 所有足迹：")
for fp in stats['footprints']:
    print(fp)
print(f"- 所有收藏：")
for fa in stats['favorites']:
    print(fa)

print("\n=== 各城市足迹数量 ===")
cursor.execute('''
    SELECT c.name, COUNT(f.footprint_id) 
    FROM cities c
    LEFT JOIN locations loc ON c.city_id = loc.city_id
    LEFT JOIN footprints f ON loc.location_id = f.location_id
    GROUP BY c.name
''')
for row in cursor.fetchall():
    print(f"{row[0]}: {row[1]} 个足迹")

print("\n=== 事务示例：完整发布流程 ===")
try:
    # 开始事务
    conn.execute('BEGIN TRANSACTION')
    
    # 创建用户
    db.create_user('U999', 'NewUser', 'ValidPass123')
    
    # 创建行程
    trip_id = db.create_trip('U999', days=5)
    
    # 创建足迹
    fp_id = db.create_footprint('U999', '测试足迹', '事务测试内容', 1)
    
    # 提交事务
    conn.commit()
    print("事务提交成功！")
except Exception as e:
    conn.rollback()
    print(f"事务回滚：{str(e)}")


=== 用户统计信息示例 ===
用户 U100 的统计：
- 足迹数量：1
- 收藏数量：2
- 所有足迹：
(1, '体验美食', '这是关于体验美食的详细内容...', 'img_1.jpg', '2025-04-04 10:32:28', 'U100', 4)
- 所有收藏：
(3, '发现美食', '这是关于发现美食的详细内容...', 'img_3.jpg', '2025-04-04 10:32:28', 'U101', 3)
(6, '体验文化', '这是关于体验文化的详细内容...', 'img_6.jpg', '2025-04-04 10:32:28', 'U103', 1)

=== 各城市足迹数量 ===
Beijing: 0 个足迹
London: 4 个足迹
New York: 0 个足迹
Paris: 3 个足迹
Tokyo: 1 个足迹

=== 事务示例：完整发布流程 ===
事务提交成功！
