In [2]:
import os
import random
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, text

# Database connection
# The URL is read from the SALES_BOT_DATABASE_URL environment variable when it
# is set, so credentials do not have to live in the source. The literal below
# is kept only as a backward-compatible default.
# NOTE(review): the default embeds a plaintext password - rotate it and remove
# the fallback once the environment variable is configured everywhere.
engine = create_engine(
    os.environ.get(
        'SALES_BOT_DATABASE_URL',
        'postgresql+psycopg2://postgres:Ilialimonov05@localhost:5432/tableau_project',
    )
)

# Global variable to track invoice number.
# Stays None until load_reference_data() initialises it; generate_random_sale()
# increments it after every generated sale.
next_invoice_number = None

# Load existing data to understand patterns
def load_reference_data():
    """Load reference data and initialise the module-level invoice counter.

    Reads a random sample of up to 1000 rows from ``sales`` (used by
    ``generate_random_sale`` for product, price and quantity patterns) and
    every ``customer_id`` from ``customers``.

    ``next_invoice_number`` is set to one past the highest purely numeric
    invoice stored in ``sales``. Using the maximum rather than the row with
    the latest ``invoice_date`` matters because generated sales get a random
    time of day, so the newest-dated row does not necessarily carry the
    highest invoice number; seeding from it could reuse numbers after a
    restart. Non-numeric invoices are ignored by the query instead of
    resetting the counter.

    Returns:
        tuple: ``(sales_df, customers_df)`` as pandas DataFrames.
    """
    global next_invoice_number

    # Starting point when the table holds no numeric invoice to continue from.
    fallback_invoice_number = 538172

    # Get random 1000 samples for price/quantity patterns
    sales_df = pd.read_sql("""
        SELECT * FROM sales
        ORDER BY RANDOM()
        LIMIT 1000
    """, engine)

    # Highest numeric invoice. The WHERE clause (PostgreSQL regex match)
    # filters rows before aggregation, so the BIGINT cast only ever sees
    # all-digit values. The inner cast to TEXT makes the query work whether
    # the column is stored as text or as a number.
    highest_invoice = pd.read_sql("""
        SELECT MAX(CAST(CAST(invoice AS TEXT) AS BIGINT)) AS max_invoice
        FROM sales
        WHERE CAST(invoice AS TEXT) ~ '^[0-9]+$'
    """, engine)

    max_invoice = None
    if len(highest_invoice) > 0:
        max_invoice = highest_invoice.iloc[0]['max_invoice']

    # MAX() over zero rows yields NULL, which pandas surfaces as None/NaN.
    if pd.isna(max_invoice):
        next_invoice_number = fallback_invoice_number
    else:
        next_invoice_number = int(max_invoice) + 1

    customers_df = pd.read_sql("SELECT customer_id FROM customers", engine)

    print(f"üìã Starting invoice numbers from: {next_invoice_number}")

    return sales_df, customers_df

# Generate random sale
def generate_random_sale(sales_df, customers_df):
    """Build one synthetic sale record modelled on the sampled sales data.

    Consumes the module-level ``next_invoice_number`` for the invoice and
    increments it afterwards.

    Args:
        sales_df: Sample of existing sales rows. When non-empty its ``price``
            and ``quantity`` columns parameterise the random draws, and its
            ``stock_code``/``description`` columns (if both present) supply
            the product. May be empty.
        customers_df: Frame with a ``customer_id`` column. May be empty.

    Returns:
        dict: keys ``invoice`` (str), ``stock_code``, ``description``,
        ``quantity`` (int >= 1), ``invoice_date`` (datetime), ``price``
        (float >= 1.0, rounded to 2 decimals), ``customer_id`` and
        ``country``.

    Raises:
        RuntimeError: if ``next_invoice_number`` has not been initialised.
    """
    global next_invoice_number

    if next_invoice_number is None:
        raise RuntimeError(
            "next_invoice_number is not initialised; "
            "call load_reference_data() first"
        )

    # Reference point: Real date when last sale in DB occurred.
    # Every real day elapsed since then advances the simulated date by one day.
    real_reference_date = datetime(2026, 1, 6)
    sim_reference_date = datetime(2010, 12, 10)

    days_elapsed = (datetime.now() - real_reference_date).days
    invoice_date = sim_reference_date + timedelta(
        days=days_elapsed,
        hours=random.randint(0, 23),
        minutes=random.randint(0, 59),
        seconds=random.randint(0, 59),
    )

    # Random customer: 10% of the time reuse a known customer, otherwise draw
    # a random 5-digit id.
    if random.random() < 0.1 and len(customers_df) > 0:
        customer_id = random.choice(customers_df['customer_id'].tolist())
    else:
        customer_id = random.randint(10000, 99999)

    # Get random stock_code and description from existing data
    has_products = (
        len(sales_df) > 0
        and 'stock_code' in sales_df.columns
        and 'description' in sales_df.columns
    )
    if has_products:
        random_product = sales_df.sample(n=1).iloc[0]
        stock_code = random_product['stock_code']
        description = random_product['description']
    else:
        stock_code = f"SKU{random.randint(10000, 99999)}"
        description = f"Product Item {random.randint(1, 100)}"

    # Random price and quantity
    if len(sales_df) > 0:
        price_mean = float(sales_df['price'].mean())
        price_std = float(sales_df['price'].std())
        # A single-row sample has an undefined (NaN) standard deviation;
        # treat it as "no spread" so the price stays at the observed mean.
        if not np.isfinite(price_std):
            price_std = 0.0
        if np.isfinite(price_mean):
            price = max(1.0, float(np.random.normal(price_mean, price_std)))
        else:
            price = 1.0

        quantity_mean = float(sales_df['quantity'].mean())
        # np.random.poisson raises ValueError for a negative or NaN rate,
        # which happens when the sample is dominated by negative quantities
        # or has no usable values; clamp so the result falls back to the
        # minimum quantity of 1.
        if not np.isfinite(quantity_mean) or quantity_mean < 0:
            quantity_mean = 0.0
        quantity = max(1, int(np.random.poisson(quantity_mean)))
    else:
        price = round(random.uniform(10, 500), 2)
        quantity = random.randint(1, 5)

    # Generate sequential invoice number
    invoice = str(next_invoice_number)
    next_invoice_number += 1

    country = "United Kingdom"

    return {
        'invoice': invoice,
        'stock_code': stock_code,
        'description': description,
        'quantity': quantity,
        'invoice_date': invoice_date,
        'price': round(price, 2),
        'customer_id': customer_id,
        'country': country,
    }

# Update customer metrics after new sale
def update_customer_metrics(customer_id):
    """Refresh the derived columns of one ``customers`` row from ``sales``.

    Five UPDATE statements run in order on a single connection and are
    committed together: ``ltv``, ``last_purchase``, ``first_purchase``,
    ``importance_score`` and ``risky_to_lose``. The order matters because the
    risk flag reads the ``importance_score`` and ``last_purchase`` values
    written by the earlier statements.

    Args:
        customer_id: Value bound to ``:cust_id`` in every statement.
    """
    bind_params = {"cust_id": customer_id}

    with engine.connect() as conn:
        # Lifetime value: sum of price * quantity over this customer's sales
        # rows that have a positive price and a positive quantity.
        ltv_stmt = text("""
            UPDATE customers c
            SET ltv = sub.lifetime_value
            FROM (
                SELECT
                    customer_id,
                    SUM(price * quantity) AS lifetime_value
                FROM sales
                WHERE price > 0 AND quantity > 0
                    AND customer_id = :cust_id
                GROUP BY customer_id
            ) sub
            WHERE c.customer_id = sub.customer_id
        """)

        # Most recent invoice_date of this customer.
        last_purchase_stmt = text("""
            UPDATE customers c
            SET last_purchase = sub.last_order
            FROM (
                SELECT
                    customer_id,
                    MAX(invoice_date) AS last_order
                FROM sales
                WHERE customer_id = :cust_id
                GROUP BY customer_id
            ) sub
            WHERE c.customer_id = sub.customer_id
        """)

        # Earliest invoice_date, written only while first_purchase is NULL.
        first_purchase_stmt = text("""
            UPDATE customers c
            SET first_purchase = sub.first_order
            FROM (
                SELECT
                    customer_id,
                    MIN(invoice_date) AS first_order
                FROM sales
                WHERE customer_id = :cust_id
                GROUP BY customer_id
            ) sub
            WHERE c.customer_id = sub.customer_id
                AND c.first_purchase IS NULL
        """)

        # Weighted score: 0.6 * (spend / 1000) + 0.3 * distinct invoice dates
        # + 0.1 * (recency_ratio * 10). recency_ratio is where the customer's
        # last order falls within the date span of the whole sales table;
        # NULLIF keeps a zero-length span from dividing by zero.
        importance_stmt = text("""
            WITH customer_metrics AS (
                SELECT
                    customer_id,
                    SUM(price * quantity) AS total_spend,
                    COUNT(DISTINCT invoice_date) AS order_count,
                    MAX(invoice_date) AS last_order_date,
                    MIN(invoice_date) AS first_order_date
                FROM sales
                WHERE customer_id = :cust_id
                GROUP BY customer_id
            ),
            year_bounds AS (
                SELECT
                    MIN(invoice_date) AS year_start,
                    MAX(invoice_date) AS year_end,
                    EXTRACT(EPOCH FROM (MAX(invoice_date) - MIN(invoice_date))) / 86400 AS days_in_period
                FROM sales
            ),
            importance_metrics AS (
                SELECT
                    cm.customer_id,
                    cm.total_spend,
                    cm.order_count,
                    EXTRACT(EPOCH FROM (cm.last_order_date - yb.year_start)) / 86400 / 
                        NULLIF(yb.days_in_period, 0) AS recency_ratio
                FROM customer_metrics cm
                CROSS JOIN year_bounds yb
            )
            UPDATE customers c
            SET importance_score = (
                (im.total_spend / 1000.0) * 0.6 +
                (im.order_count) * 0.3 +
                (im.recency_ratio * 10) * 0.1
            )
            FROM importance_metrics im
            WHERE c.customer_id = im.customer_id
                AND c.customer_id = :cust_id
        """)

        # Risk flag: customers are ranked into importance quartiles across the
        # whole table; each quartile has its own inactivity threshold in days
        # (45 / 75 / 120 / 180). The flag is TRUE when the gap between the
        # newest sale in the table and this customer's last_purchase exceeds
        # the threshold.
        risk_stmt = text("""
            WITH customer_segments AS (
                SELECT
                    customer_id,
                    importance_score,
                    NTILE(4) OVER (ORDER BY importance_score DESC) AS importance_quartile
                FROM customers
                WHERE importance_score IS NOT NULL
            ),
            risk_thresholds AS (
                SELECT
                    customer_id,
                    CASE importance_quartile
                        WHEN 1 THEN 45
                        WHEN 2 THEN 75
                        WHEN 3 THEN 120
                        WHEN 4 THEN 180
                    END AS days_threshold
                FROM customer_segments
                WHERE customer_id = :cust_id
            ),
            data_window AS (
                SELECT MAX(invoice_date) AS data_end_date
                FROM sales
            )
            UPDATE customers c
            SET risky_to_lose = CASE
                WHEN EXTRACT(EPOCH FROM (dw.data_end_date - c.last_purchase)) / 86400 > rt.days_threshold
                THEN TRUE
                ELSE FALSE
            END
            FROM risk_thresholds rt
            CROSS JOIN data_window dw
            WHERE c.customer_id = rt.customer_id
                AND c.customer_id = :cust_id
        """)

        ordered_statements = (
            ltv_stmt,
            last_purchase_stmt,
            first_purchase_stmt,
            importance_stmt,
            risk_stmt,
        )
        for statement in ordered_statements:
            conn.execute(statement, bind_params)

        conn.commit()

# Insert sale and update customer
def insert_sale(sale):
    """Insert one sale, creating its customer if needed, then refresh metrics.

    The customer lookup/creation and the ``sales`` insert share a single
    transaction, so a failed sale insert can no longer leave behind a
    customer row that has no sales. Customer metrics are recalculated only
    after that transaction has committed, because
    ``update_customer_metrics`` reads ``sales`` through its own connection.

    Args:
        sale: Mapping as produced by ``generate_random_sale``; must contain
            ``customer_id``, ``invoice_date``, ``invoice``, ``description``,
            ``price`` and ``quantity``, and its keys become the column names
            of the inserted ``sales`` row.
    """
    customer_id = sale['customer_id']
    created_customer = False

    # engine.begin() commits on normal exit and rolls back on any exception.
    with engine.begin() as conn:
        # Check if customer exists, if not create them
        existing = conn.execute(
            text("SELECT customer_id FROM customers WHERE customer_id = :cust_id"),
            {"cust_id": customer_id}
        ).fetchone()
        if existing is None:
            conn.execute(
                text("""
                    INSERT INTO customers (customer_id, first_purchase)
                    VALUES (:cust_id, :first_purchase)
                """),
                {"cust_id": customer_id, "first_purchase": sale['invoice_date']}
            )
            created_customer = True

        # Insert the sale on the same connection so it joins the open
        # transaction instead of being committed independently.
        pd.DataFrame([sale]).to_sql('sales', conn, if_exists='append', index=False)

    # Report the new customer only once the row is actually committed.
    if created_customer:
        print(f"üÜï Created new customer: {customer_id}")

    # Update customer metrics
    update_customer_metrics(customer_id)

    print(f"‚úÖ Invoice {sale['invoice']}: Customer {sale['customer_id']}, {sale['description']}, ${sale['price']} x {sale['quantity']} = ${sale['price'] * sale['quantity']:.2f}")

# Main bot loop
def run_sales_bot(min_interval=5, max_interval=30):
    """Insert randomly generated sales forever, pausing between each one.

    Loads the reference data once, then repeatedly generates a sale, inserts
    it and sleeps for a random whole number of seconds in
    ``[min_interval, max_interval]``. Stops cleanly on Ctrl+C.

    Args:
        min_interval: Shortest pause between sales, in seconds (>= 0).
        max_interval: Longest pause between sales, in seconds
            (>= ``min_interval``).

    Raises:
        ValueError: if ``min_interval`` is negative or greater than
            ``max_interval``.
    """
    # Validate before touching the database: random.randint would otherwise
    # reject an inverted range (and time.sleep a negative delay) only after
    # the first sale had already been written.
    if min_interval < 0 or min_interval > max_interval:
        raise ValueError(
            f"Invalid interval: min_interval={min_interval}, "
            f"max_interval={max_interval}; expected 0 <= min_interval <= max_interval"
        )

    print("ü§ñ Sales Bot Starting...")
    print(f"üìä Generating sales every {min_interval}-{max_interval} seconds")
    print("Press Ctrl+C to stop\n")

    sales_df, customers_df = load_reference_data()

    try:
        while True:
            sale = generate_random_sale(sales_df, customers_df)
            insert_sale(sale)

            wait_time = random.randint(min_interval, max_interval)
            print(f"‚è≥ Waiting {wait_time} seconds until next sale...\n")
            time.sleep(wait_time)

    except KeyboardInterrupt:
        print("\nüõë Sales Bot Stopped")
        print(f"üìã Next invoice number would be: {next_invoice_number}")

# Entry point: start the bot only when this code is executed directly (not
# when it is imported), pausing 10-60 seconds between generated sales.
if __name__ == "__main__":
    run_sales_bot(min_interval=10, max_interval=60)


ü§ñ Sales Bot Starting...
üìä Generating sales every 10-60 seconds
Press Ctrl+C to stop

üìã Starting invoice numbers from: 538370
üÜï Created new customer: 86897
‚úÖ Invoice 538370: Customer 86897, LADYBIRD + BEE RAFFIA FOOD COVER, $1.0 x 6 = $6.00
‚è≥ Waiting 30 seconds until next sale...

üÜï Created new customer: 41796
‚úÖ Invoice 538371: Customer 41796, FLAG OF ST GEORGE CAR FLAG, $10.62 x 9 = $95.58
‚è≥ Waiting 30 seconds until next sale...


üõë Sales Bot Stopped
üìã Next invoice number would be: 538372
