In [1]:
import os
from dotenv import load_dotenv
import psycopg2
from psycopg2.extras import RealDictCursor
import pandas as pd

# Populate the process environment via python-dotenv before reading settings
load_dotenv()

# Connection string for the Supabase PostgreSQL database (None when the
# environment variable is not set)
SUPABASE_CONNECTION_STRING = os.environ.get("SUPABASE_CONNECTION_STRING")

# Create connection function
def get_db_connection():
    """
    Create and return a new connection to the Supabase PostgreSQL database.

    The connection string is taken from the module-level
    SUPABASE_CONNECTION_STRING. The caller owns the returned connection and
    is responsible for closing it.

    Returns:
        The connection object returned by psycopg2.connect()

    Raises:
        RuntimeError: If SUPABASE_CONNECTION_STRING is None (variable not set)
    """
    # Fail early with an actionable message: passing None on to
    # psycopg2.connect() produces an error that never names the missing variable.
    if SUPABASE_CONNECTION_STRING is None:
        raise RuntimeError(
            "SUPABASE_CONNECTION_STRING is not set; define it in your .env file "
            "or environment before connecting."
        )
    return psycopg2.connect(SUPABASE_CONNECTION_STRING)

# Smoke test: open one connection, report the outcome, then close it again
try:
    conn = get_db_connection()
    print("[OK] Connected to Supabase successfully!")
    conn.close()
except Exception as exc:
    # Report the problem instead of raising so the cell still finishes
    print(f"[ERROR] Connection failed: {exc}")


[OK] Connected to Supabase successfully!


# JWT Token Generation for API Testing

This section generates JWT access tokens (HS256, signed with `JWT_SECRET`) so that the bidding simulation later in the notebook can call API endpoints that require authentication.

In [8]:
import jwt
from datetime import datetime, timedelta, timezone
import os
from dotenv import load_dotenv

# Load environment variables (repeated here, presumably so this cell can be run
# on its own); generate_access_token() below reads JWT_SECRET via os.getenv()
load_dotenv()

def generate_access_token(user_id, role, remember_me=False):
    """
    Generate a signed JWT access token for API testing.

    The payload carries 'user_id', 'role' and an 'exp' (expiry) claim and is
    signed with HS256 using the JWT_SECRET environment variable.

    Args:
        user_id: User ID stored in the 'user_id' claim
        role: User role stored in the 'role' claim
        remember_me: If True, token expires in 3 days, otherwise 1 day

    Returns:
        JWT token as a str
    """
    lifetime = timedelta(days=3 if remember_me else 1)
    payload = {
        'user_id': user_id,
        'role': role,
        # Timezone-aware UTC expiry timestamp
        'exp': datetime.now(timezone.utc) + lifetime,
    }

    # Get secret from environment or use the placeholder default for testing.
    # NOTE(review): a token signed with the placeholder is presumably rejected
    # by a server configured with a different secret - confirm JWT_SECRET is set.
    secret = os.getenv('JWT_SECRET', 'your-secret-key-here')

    token = jwt.encode(payload, secret, algorithm='HS256')

    # PyJWT 1.x returns bytes from encode() while PyJWT 2.x returns str.
    # Normalise so callers always get the documented string type.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token

In [3]:
def get_all_users():
    """
    Fetch every row of the users table as a pandas DataFrame.

    Returns:
        DataFrame built from the rows returned by "SELECT * FROM users"
        (an empty DataFrame when the query returns no rows)
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cursor.execute("SELECT * FROM users")
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query fails
        conn.close()

    return pd.DataFrame(rows)


In [4]:
def get_all_products():
    """
    Fetch every row of the products table as a pandas DataFrame.

    Returns:
        DataFrame built from the rows returned by "SELECT * FROM products"
        (an empty DataFrame when the query returns no rows)
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cursor.execute("SELECT * FROM products")
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query fails
        conn.close()

    return pd.DataFrame(rows)


In [5]:
def bid_amount_array(product_id, num_steps=4):
    """
    Generate the list of valid bid amounts for a product.

    Reads the product's current_price and step_price from the products table
    and returns the next `num_steps` price levels above the current price.

    Args:
        product_id: Product ID
        num_steps: Number of increments above the current price to generate
            (default: 4)

    Returns:
        [current_price + 1*step_price, ..., current_price + num_steps*step_price],
        or an empty list when the product does not exist or either price is NULL
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            # Get current_price and step_price from products table
            cursor.execute(
                "SELECT current_price, step_price FROM products WHERE product_id = %s",
                (product_id,),
            )
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query fails
        conn.close()

    if not row:
        return []

    current_price = row['current_price']
    step_price = row['step_price']

    # A NULL price cannot be used in arithmetic; report "no valid bids" instead
    # of raising TypeError so callers can skip the product.
    if current_price is None or step_price is None:
        return []

    # Valid bid amounts: current_price + x*step_price for x = 1..num_steps
    return [current_price + (x * step_price) for x in range(1, num_steps + 1)]



In [6]:
import random

def get_random_users(product_id, count=5):
    """
    Get random users for testing bidding on a product.

    Args:
        product_id: Product ID to check existence
        count: Number of random users to return (default: 5)

    Returns:
        List of user rows (user_id, username, email, role). Fewer than `count`
        rows are returned when the users table holds fewer rows than requested.
        None if the product doesn't exist.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            # Check if product exists
            cursor.execute(
                "SELECT product_id FROM products WHERE product_id = %s",
                (product_id,),
            )
            product = cursor.fetchone()

            # Only load the users when there is a product to bid on
            all_users = None
            if product:
                cursor.execute("SELECT user_id, username, email, role FROM users")
                all_users = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when a query fails
        conn.close()

    if not product:
        print(f"[ERROR] Product ID {product_id} does not exist!")
        return None

    if len(all_users) < count:
        print(f"[WARNING] Only {len(all_users)} users available, returning all")
        return list(all_users)

    # Randomly select users (without replacement)
    return random.sample(all_users, count)


In [7]:
import time
import requests

# ==============================================================================
# Bot Auction Bidding Simulation
# ==============================================================================

def run_simulation_for_product(product_id, api_url="http://localhost:5000/api/bid/play",
                               request_timeout=10):
    """Run bidding simulation for a specific product

    Picks up to 5 random users, creates a JWT access token for each of them and
    then sends a random number (5-10) of bids to the bidding API, pausing
    between requests.

    Args:
        product_id: ID of the product to bid on
        api_url: Bid endpoint that receives the POST requests
            (default: the local development server)
        request_timeout: Seconds to wait for each HTTP request before it is
            counted as a failed bid (default: 10)

    Returns:
        Dict with 'product_id', 'successful', 'failed' and 'total' counts, or
        None when no users could be selected for bidding
    """
    print(f"\n{'='*60}")
    print(f"Starting Bot Auction Simulation for Product {product_id}")
    print(f"{'='*60}\n")

    # Get random users; None means the product is missing, an empty list means
    # the product exists but there is nobody to bid with
    users = get_random_users(product_id, count=5)
    if users is None:
        print("[ERROR] Cannot proceed - product does not exist")
        return None
    if not users:
        print("[ERROR] Cannot proceed - no users available for bidding")
        return None

    # Generate one JWT token per selected user
    user_tokens = {
        user['user_id']: generate_access_token(user['user_id'], user['role'], remember_me=False)
        for user in users
    }

    # Random number of bids (5-10)
    num_bids = random.randint(5, 10)

    # Simulate bidding
    successful_bids = 0
    failed_bids = 0

    for i in range(num_bids):
        # Random select user
        user = random.choice(users)
        user_id = user['user_id']
        username = user['username']

        # Get valid bid amounts for this product (refresh each time, since
        # earlier bids may have moved the current price)
        valid_bids = bid_amount_array(product_id)
        if not valid_bids:
            print("[WARNING] No valid bids available, skipping...")
            continue

        max_price = random.choice(valid_bids)

        # The API is sent the token as an 'accessToken' cookie
        cookies = {'accessToken': user_tokens[user_id]}
        payload = {
            'product_id': product_id,
            'max_price': max_price
        }

        print(f"[{i+1}/{num_bids}] User: {username} (ID: {user_id}) - Bidding: {max_price:,.0f} VND")

        try:
            # Without a timeout an unresponsive server would block the
            # simulation forever
            response = requests.post(
                api_url, cookies=cookies, json=payload, timeout=request_timeout
            )

            if response.status_code == 200:
                result = response.json()
                print(f"        [SUCCESS] {result.get('message', 'Bid placed')}")
                successful_bids += 1
            else:
                print(f"        [FAILED] Status {response.status_code} - {response.text[:100]}")
                failed_bids += 1
        except Exception as e:
            # Best effort: a network or decoding problem counts as a failed bid
            print(f"        [ERROR] {str(e)[:100]}")
            failed_bids += 1

        # Add delay between requests (1.5-3 seconds)
        delay = random.uniform(1.5, 3.0)
        time.sleep(delay)

    # Summary
    print(f"\n{'='*60}")
    print(f"Simulation Complete for Product {product_id}!")
    print(f"{'='*60}")
    print(f"Successful bids: {successful_bids}")
    print(f"Failed bids: {failed_bids}")
    print(f"Success rate: {(successful_bids/num_bids)*100:.1f}%")
    print(f"{'='*60}\n")

    return {
        'product_id': product_id,
        'successful': successful_bids,
        'failed': failed_bids,
        'total': num_bids
    }

# ==============================================================================
# Main Menu
# ==============================================================================

# Interactive entry point: ask which mode to run, then start the simulation(s)
print("\n" + "="*60)
print("Bot Auction Bidding Simulation")
print("="*60)
print("Select mode:")
print("  1. Simulate for a specific product ID")
print("  2. Simulate for 10 random products")
print("="*60)

mode = input("Enter your choice (1 or 2): ").strip()

if mode == "1":
    # Single product mode
    product_id_input = input("Enter product_id: ")
    try:
        product_id = int(product_id_input)
    except ValueError:
        # Report bad input instead of ending the cell with a traceback
        print(f"[ERROR] Invalid product_id: {product_id_input!r} (expected an integer)")
    else:
        run_simulation_for_product(product_id)

elif mode == "2":
    # Multiple products mode
    print("\n[INFO] Fetching 10 random products...")

    # Get 10 random products whose end_time is still in the future
    query = """
        SELECT product_id, product_name, current_price, start_price, step_price 
        FROM products 
        WHERE end_time > NOW()
        ORDER BY RANDOM() 
        LIMIT 10
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cursor.execute(query)
            products = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query fails
        conn.close()

    if not products:
        print("[ERROR] No products found in database")
    else:
        print(f"[OK] Found {len(products)} products\n")

        # Display products
        for idx, prod in enumerate(products, 1):
            print(f"{idx}. Product {prod['product_id']}: {prod['product_name']} - Current: {prod['current_price']:,.0f} VND")

        # Run simulation for each product
        results = []
        for idx, prod in enumerate(products, 1):
            print(f"\n{'#'*60}")
            print(f"Processing {idx}/{len(products)}: Product {prod['product_id']}")
            print(f"{'#'*60}")

            result = run_simulation_for_product(prod['product_id'])
            if result:
                results.append(result)

            # Delay between products (not needed after the last one)
            if idx < len(products):
                time.sleep(2)

        # Final summary
        print("\n" + "="*60)
        print("FINAL SUMMARY - ALL PRODUCTS")
        print("="*60)
        total_success = sum(r['successful'] for r in results)
        total_failed = sum(r['failed'] for r in results)
        total_bids = sum(r['total'] for r in results)

        print(f"Total products simulated: {len(results)}")
        print(f"Total bids placed: {total_bids}")
        print(f"Total successful: {total_success}")
        print(f"Total failed: {total_failed}")
        # results is empty when every simulation returned None, so guard the
        # division against a zero bid count
        if total_bids:
            print(f"Overall success rate: {(total_success/total_bids)*100:.1f}%")
        else:
            print("Overall success rate: n/a (no bids were placed)")
        print("="*60 + "\n")

else:
    print("[ERROR] Invalid choice. Please select 1 or 2.")



Bot Auction Bidding Simulation
Select mode:
  1. Simulate for a specific product ID
  2. Simulate for 10 random products

Starting Bot Auction Simulation for Product 37



  'exp': datetime.utcnow() + timedelta(days=3 if remember_me else 1)


[1/7] User: Tien Thanh (ID: 77) - Bidding: 4,360,000 VND
        [SUCCESS] Bid placed
[2/7] User: wnguyen (ID: 42) - Bidding: 3,920,000 VND
        [SUCCESS] Bid placed
[3/7] User: Nguyễn Thanh Tiến (ID: 17) - Bidding: 4,800,000 VND
        [SUCCESS] Bid placed
[4/7] User: dangjane (ID: 60) - Bidding: 5,240,000 VND
        [SUCCESS] Bid placed
[5/7] User: dangjane (ID: 60) - Bidding: 7,000,000 VND
        [SUCCESS] Bid placed
[6/7] User: Tien Thanh (ID: 77) - Bidding: 6,560,000 VND
        [SUCCESS] Bid placed
[7/7] User: Tien Thanh (ID: 77) - Bidding: 7,000,000 VND
        [SUCCESS] Bid placed

Simulation Complete for Product 37!
Successful bids: 7
Failed bids: 0
Success rate: 100.0%

