In [62]:
# pip install faker

In [None]:
import pandas as pd
from faker import Faker
import random
from datetime import datetime, timedelta

# 創建 Faker 實例
fake = Faker('zh_TW')
fake_en = Faker('en_US')

# 輸出文件名
OUTPUT_FILE = "schema_data.xlsx"

file_path = "road_name.csv"

# 加載 CSV 文件
def load_road_names(file_path):
    """讀取路名 CSV 文件，返回 DataFrame，並忽略第一行"""
    return pd.read_csv(file_path, skiprows=1, names=["city", "site_id", "road"])


road_data = load_road_names(file_path)


def generate_tw_phone_number():
    area_code = "09"  # 台灣行動電話的區碼
    line_number = random.randint(10000000, 99999999)  # 8 位數的行動號碼
    return f"({area_code}){line_number:08d}"


# 生成地址函數
def generate_address(road_data):
    selected_row = road_data.sample(n=1).iloc[0]
    city = selected_row["city"]
    site_id = selected_row["site_id"]
    road = selected_row["road"]
    number = random.randint(1, 300)
    return f"{city}{site_id}{road}{number}號"

# CATEGORY
def generate_categories():
    categories = ["Concert","Musical","Exhibition","Drama","Music","Dance","Family","Sports","Others"]
    data = [{"c_id": idx + 1, "c_name": name} for idx, name in enumerate(categories)]
    return pd.DataFrame(data)

def generate_organizers(num=20):
    # 常見演唱會主辦公司風格名稱元素
    prefix_keywords = [
        "星", "天", "金", "超級", "華", "音樂", "夢想", "未來", "傳奇", "青春"
    ]
    suffix_keywords = [
        "娛樂", "音樂", "製作", "文化", "活動", "傳播", "星光", "演藝", "表演", "創意"
    ]
    data = []

    for i in range(1, num + 1):
        # 隨機生成公司名稱
        company_name = f"{random.choice(prefix_keywords)}{random.choice(suffix_keywords)}公司"
        
        data.append({
            "o_id": i,
            "o_name": company_name,  # 更真實的公司名稱
            "contact_info": generate_tw_phone_number()  # 台灣電話格式
        })

    return pd.DataFrame(data)

# CUSTOMERS
def generate_customers(num=11451):
    email_providers = ["gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "icloud.com"]
    roles = ["User", "Admin"]
    role_weights = [0.99, 0.01]  # User 的概率为 99%，Admin 的概率为 1%
    
    used_emails = set()  # 用于存储已生成的唯一 email
    
    # 指定前两行数据
    data = [
        {
            "cu_id": 1,
            "cu_name": "admin",  # 自定义用户名
            "email": "admin@google.com",  # 自定义邮箱
            "phone_number": generate_tw_phone_number(),  # 自定义电话
            "address": generate_address(road_data),  # 自定义地址
            "pwd": "1234",  # 自定义密码
            "role": "Admin"  # 自定义角色
        },
        {
            "cu_id": 2,
            "cu_name": "test user",
            "email": "test_user@google.com",
            "phone_number": generate_tw_phone_number(),
            "address": generate_address(road_data),
            "pwd": "1234",
            "role": "User"
        }
    ]
    
    # 添加指定的 email 到集合
    used_emails.add("admin@google.com")
    used_emails.add("test_user@google.com")
    
    # 添加随机生成的数据
    for i in range(3, num + 3):  # 从3开始编号，避免ID冲突
        while True:
            # 生成唯一的 email
            local_part = fake.user_name()
            domain = random.choice(email_providers)
            email = f"{local_part}@{domain}"
            
            if email not in used_emails:  # 确保唯一
                used_emails.add(email)
                break  # 跳出循环
        
        data.append({
            "cu_id": i,
            "cu_name": fake.name(),
            "email": email,
            "phone_number": generate_tw_phone_number(),  
            "address": generate_address(road_data),  
            "pwd": fake.password(),
            "role": random.choices(roles, weights=role_weights, k=1)[0]  # 按权重选择角色
        })
    
    return pd.DataFrame(data)


# VENUE
def generate_venues(num=30):
    # 台灣常見場地名稱和類型
    cities = [
        "台北市", "新北市", "台中市", "高雄市", "台南市", "桃園市", 
        "基隆市", "新竹市", "嘉義市", "彰化縣", "屏東縣", "宜蘭縣"
    ]
    venue_types = ["文化中心", "體育館", "展覽館", "表演廳", "會議中心", "音樂廳"]
    road_types = ["路", "街", "巷", "大道"]
    
    capacities = [500, 1000, 2000, 5000, 10000]
    capacity_weights = [0.5, 0.3, 0.1, 0.07, 0.03]
    
    data = []
    
    for i in range(1, num + 1):
        city = random.choice(cities)  # 隨機選擇台灣縣市
        venue_type = random.choice(venue_types)  # 場地類型
        venue_name = f"{city}{venue_type}"  # 場地名稱

        # 生產台灣格式地址
        address = f"{city}{fake.street_name()}{random.choice(road_types)}{random.randint(1, 300)}號"

        data.append({
            "v_id": i,
            "v_name": venue_name,
            "address": address,
            "capacity": random.choices(capacities, weights=capacity_weights, k=1)[0],  # 按權重分配容納量
            "contact_info": generate_tw_phone_number()  # 電話格式
        })
    
    return pd.DataFrame(data)


# EVENT
def generate_events(categories, organizers, num=200):
    # 定义展演类型的示例数据
    event_types = {
        "1": ["Annual Tech Expo", "Innovators Meetup", "Coding Hackathon"],  # 技术类
        "2": ["Modern Art Showcase", "National Art Fair", "Gallery Nights"], # 艺术类
        "3": ["Jazz Festival", "Pop Concert", "Symphony Night"],             # 音乐类
        "4": ["Fitness Challenge", "Yoga Retreat", "Outdoor Marathon"],      # 健身类
        "5": ["Literary Festival", "Book Reading", "Cultural Showcase"]      # 文化类
    }

    data = []
    for i in range(1, num + 1):
        # 根据类别选择活动类型
        category_id = str(random.choice(categories["c_id"]))  # 假设 c_id 是整数，转为字符串用于查找
        type_ = random.choice(event_types.get(category_id, ["Special Event"]))

        # 生成动态标题：地点 + 时间 + 活动类型
        city = fake_en.city()
        
        # 设置年份：前两年、当前年或下一年
        year = datetime.now().year + random.choice([-2, -1, 0, 1])  # 前两年、当前年或下一年
        event_name = f"{year} {type_} in {city}"

        # 活动时间随机生成
        event_date = datetime.now() - timedelta(days=random.randint(1, 365))

        # 填充活动数据
        data.append({
            "e_id": i,
            "e_name": event_name,
            "c_id": category_id,
            "o_id": random.choice(organizers["o_id"]),
            "e_datetime": event_date,
            "e_location": city,
            "description": ""
        })
    
    return pd.DataFrame(data)


# EVENT_VENUE
def generate_event_venues(events, venues):
    data = []
    for e_id in events["e_id"]:
        v_id = random.choice(venues["v_id"])
        data.append({
            "e_id": e_id,
            "v_id": v_id,
            "arrangement": ""
        })
    return pd.DataFrame(data)

# TICKET
def generate_tickets(events, max_tickets_per_event=4):
    data = []
    t_id = 1  # 用于生成唯一的票券 ID

    for _, event in events.iterrows():
        used_types = set()  # 记录当前 e_id 已使用的票种
        num_ticket_types = random.randint(2, max_tickets_per_event)  # 每个活动的票种数

        for _ in range(num_ticket_types):
            while True:
                t_type = random.choice(["Standard", "VIP", "Early Bird", "Group"])
                if t_type not in used_types:  # 确保票种不重复
                    used_types.add(t_type)
                    break  # 找到未使用的票种，跳出循环

            # 生成票券信息
            total_quantity = random.randint(50, 500)
            remain_quantity = random.randint(0, total_quantity)  # 确保剩余数量合法
            if t_type == "VIP":
                price = random.randint(150, 300)  # VIP 票价较高
            elif t_type == "Standard":
                price = random.randint(50, 150)  # Standard 票价
            elif t_type == "Early Bird":
                price = random.randint(30, 100)  # Early Bird 票价较低
            elif t_type == "Group":
                price = random.randint(100, 200)  # Group 票价适中

            data.append({
                "t_id": t_id,
                "e_id": event["e_id"],  # 关联的 event ID
                "t_type": t_type,
                "price": price,
                "total_quantity": total_quantity,
                "remain_quantity": remain_quantity
            })
            t_id += 1  # 确保票券 ID 唯一
    
    return pd.DataFrame(data)


# ORDER
def generate_orders(customers, num=114514):
    # 仅选择角色为 "User" 的客户
    user_customers = customers[customers["role"] == "User"]
    
    data = []
    for i in range(1, num + 1):
        # 随机选择一个 "User" 角色的客户
        cu_id = random.choice(user_customers["cu_id"].values)  # Use .values to get the array of cu_id
        
        # 随机选择支付状态
        payment_status = random.choice(["Pending", "Paid", "Failed"])
        is_canceled = random.choice([True, False]) if payment_status != "Paid" else False
        
        # 生成订单数据
        data.append({
            "or_id": i,
            "cu_id": cu_id,
            "or_date": datetime.now() - timedelta(days=random.randint(1, 365)),
            "total_amount": round(random.uniform(50, 500), 2),
            "payment_status": payment_status,
            "is_canceled": is_canceled
        })
    
    return pd.DataFrame(data)


# ORDER_DETAIL
def generate_order_details(orders, tickets):
    data = []
    for or_id in orders["or_id"]:
        num_details = random.randint(1, 3)  # 每个订单包含 1-3 条详情记录
        ticket_ids = random.sample(list(tickets["t_id"]), num_details)
        quantities = [random.randint(1, 5) for _ in range(num_details)]
        
        subtotals = []
        for t_id, quantity in zip(ticket_ids, quantities):
            ticket_price = tickets.loc[tickets["t_id"] == t_id, "price"].values[0]
            subtotal = round(ticket_price * quantity, 2)  # 根据票价和数量计算小计
            subtotals.append(subtotal)

        # 确保小计的总和不超过订单金额
        total_amount = orders.loc[orders["or_id"] == or_id, "total_amount"].values[0]
        if sum(subtotals) > total_amount:
            scaling_factor = total_amount / sum(subtotals)
            subtotals = [round(sub * scaling_factor, 2) for sub in subtotals]
            subtotals[-1] += round(total_amount - sum(subtotals), 2)  # 调整最后一项确保总和一致
        
        for t_id, quantity, subtotal in zip(ticket_ids, quantities, subtotals):
            data.append({
                "or_id": or_id,
                "t_id": t_id,
                "quantity": quantity,
                "subtotal": subtotal
            })
    return pd.DataFrame(data)


# PAYMENT
def generate_payments(orders):
    data = []
    # 篩選出支付成功且未取消的訂單
    paid_orders = orders[(orders["payment_status"] == "Paid") & (orders["is_canceled"] == False)]
    
    p_id = 1
    
    for or_id in paid_orders["or_id"]:
        # 獲取該訂單的 total_amount
        total_amount = orders.loc[orders["or_id"] == or_id, "total_amount"].values[0]
        data.append({
            "p_id": p_id,  # 支付 ID 與訂單 ID 對應
            "or_id": or_id,
            "payment_method": random.choice([
                "Credit Card", "PayPal", "Bank Transfer", "Apple Pay", "Google Pay", 
                "Samsung Pay"
            ]),
            "payment_datetime": datetime.now() - timedelta(days=random.randint(1, 365)),
            "amount": total_amount  # 使用訂單的 total_amount
        })
        p_id += 1
        
    return pd.DataFrame(data)


# 將數據保存到 Excel
def save_to_excel(tables):
    with pd.ExcelWriter(OUTPUT_FILE, engine="openpyxl") as writer:
        for sheet_name, df in tables.items():
            df.to_excel(writer, index=False, sheet_name=sheet_name)
    print(f"Data has been saved to {OUTPUT_FILE}")

# 主函數
def main():
    categories = generate_categories()
    organizers = generate_organizers()
    customers = generate_customers()
    venues = generate_venues()
    events = generate_events(categories, organizers)
    event_venues = generate_event_venues(events, venues)
    tickets = generate_tickets(events)
    orders = generate_orders(customers)
    order_details = generate_order_details(orders, tickets)
    payments = generate_payments(orders)

    tables = {
        "CATEGORY": categories,
        "ORGANIZER": organizers,
        "CUSTOMER": customers,
        "VENUE": venues,
        "EVENT": events,
        "EVENT_VENUE": event_venues,
        "TICKET": tickets,
        "ORDER": orders,
        "ORDER_DETAIL": order_details,
        "PAYMENT": payments
    }

    save_to_excel(tables)

if __name__ == "__main__":
    main()


Data has been saved to schema_data.xlsx


In [14]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

# 连接到 PostgreSQL 数据库
def connect_to_postgresql():
    return psycopg2.connect(
        dbname="ticket_system",  # 替换为你的数据库名
        user="postgres",   # 替换为你的用户名
        password="1234",  # 替换为你的密码
        host="localhost",       # 或其他主机名
        port="5432"             # PostgreSQL 默认端口
    )

# 清空所有表的数据
def clear_all_tables(conn):
    try:
        with conn.cursor() as cur:
            # 获取所有表名
            cur.execute("""
                SELECT table_name FROM information_schema.tables
                WHERE table_schema = 'public';
            """)
            tables = cur.fetchall()
            
            # 遍历所有表并清空它们
            for table in tables:
                table_name = table[0]
                
                # 如果表名是保留字或特殊表名，添加双引号
                if table_name.upper() == "ORDER":
                    table_name = f'"{table_name}"'
                
                print(f"Clearing data from {table_name}...")
                cur.execute(f"TRUNCATE TABLE {table_name} CASCADE;")
            
            conn.commit()
            print("All tables cleared successfully.")
    except Exception as e:
        print(f"Error clearing tables: {e}")
        raise


# 将 DataFrame 插入 PostgreSQL
def insert_dataframe_to_table(conn, df, table_name):
    try:
        # 获取列名
        columns = list(df.columns)
        values = [tuple(x) for x in df.to_numpy()]
        
        # 如果表名是保留字或包含特殊字符，用双引号包裹
        if table_name.upper() in ["ORDER"]:  # 添加需要特别处理的表名
            table_name = f'"{table_name}"'
        
        # 创建插入 SQL
        insert_query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES %s"
        
        with conn.cursor() as cur:
            execute_values(cur, insert_query, values)
        conn.commit()
        print(f"Data inserted into {table_name} successfully.")
    except Exception as e:
        print(f"Error inserting data into table {table_name}: {e}")
        raise

# 主函数
def main():
    # 读取 Excel 文件
    data = pd.read_excel("schema_data.xlsx", sheet_name=None)  # 加载所有表
    
    # 连接到 PostgreSQL
    conn = connect_to_postgresql()
    
    try:
        # 清空所有表的数据
        clear_all_tables(conn)
        
        # 遍历 Excel 中的所有表
        for table_name, df in data.items():
            print(f"Inserting data into table: {table_name}...")
            insert_dataframe_to_table(conn, df, table_name)
    except Exception as e:
        print("An error occurred during the process:")
        print(e)
    finally:
        conn.close()

if __name__ == "__main__":
    main()


Clearing data from category...
Clearing data from event...
Clearing data from ticket...
Clearing data from order_detail...
Clearing data from organizer...
Clearing data from customer...
Clearing data from event_venue...
Clearing data from venue...
Clearing data from "ORDER"...
Clearing data from payment...
All tables cleared successfully.
Inserting data into table: CATEGORY...
Data inserted into CATEGORY successfully.
Inserting data into table: ORGANIZER...
Data inserted into ORGANIZER successfully.
Inserting data into table: CUSTOMER...
Data inserted into CUSTOMER successfully.
Inserting data into table: VENUE...
Data inserted into VENUE successfully.
Inserting data into table: EVENT...
Data inserted into EVENT successfully.
Inserting data into table: EVENT_VENUE...
Data inserted into EVENT_VENUE successfully.
Inserting data into table: TICKET...
Data inserted into TICKET successfully.
Inserting data into table: ORDER...
Data inserted into "ORDER" successfully.
Inserting data into tab

In [3]:
import psycopg2
import pandas as pd
import random

# 建立資料庫連線
conn = psycopg2.connect(
    dbname="ticket_system",  # 替换为你的数据库名
    user="postgres",   # 替换为你的用户名
    password="1234",  # 替换为你的密码
    host="localhost",       # 或其他主机名
    port="5432"             # PostgreSQL 默认端口
)
cursor = conn.cursor()

# 讀取 e_id > 200 的資料
query = "SELECT * FROM EVENT WHERE e_id > 200"
cursor.execute(query)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
events = pd.DataFrame(rows, columns=columns)  # 將結果轉為 DataFrame

def generate_tickets(events, max_tickets_per_event=4):
    data = []

    for _, event in events.iterrows():
        used_types = set()
        num_ticket_types = random.randint(2, max_tickets_per_event)

        for _ in range(num_ticket_types):
            while True:
                t_type = random.choice(["Standard", "VIP", "Early Bird", "Group"])
                if t_type not in used_types:
                    used_types.add(t_type)
                    break

            total_quantity = random.randint(50, 500)
            remain_quantity = random.randint(0, total_quantity)
            if t_type == "VIP":
                price = random.randint(150, 300)
            elif t_type == "Standard":
                price = random.randint(50, 150)
            elif t_type == "Early Bird":
                price = random.randint(30, 100)
            elif t_type == "Group":
                price = random.randint(100, 200)

            data.append({
                "e_id": event["e_id"],
                "t_type": t_type,
                "price": price,
                "total_quantity": total_quantity,
                "remain_quantity": remain_quantity
            })
    
    return pd.DataFrame(data)


# 生成票券數據
tickets = generate_tickets(events)
# 將 TICKET 資料插入資料庫
insert_query = """
    INSERT INTO TICKET (e_id, t_type, price, total_quantity, remain_quantity)
    VALUES (%s, %s, %s, %s, %s)
"""

for _, row in tickets.iterrows():
    cursor.execute(
        insert_query,
        (row['e_id'], row['t_type'], row['price'], row['total_quantity'], row['remain_quantity'])
    )

# 提交變更
conn.commit()

