In [1]:
import redis
import random
import time
import json
import threading

# Configuration
VALKEY_HOST = 'localhost'      # Host where Valkey server is running
VALKEY_PORT = 6379             # Default Valkey/Redis port
CHANNEL = 'trades'             # Channel to publish trading batches

# Connect to Valkey server (acts as the publisher client)
client = redis.Redis(host=VALKEY_HOST, port=VALKEY_PORT)

# Predefined lists for simulated trade generation
cryptos = ['BTC', 'ETH', 'SOL', 'ADA', 'XRP']       # Traded assets
actions = ['buy', 'sell']                           # Trade directions
traders = ['Asteroid', 'Exoplanet']                 # Simulated trader identities
frequency = 20                                      # Seconds between sending full round of trader batches

print(f"📤 Publishing batches to '{CHANNEL}'... (Ctrl+C to stop)")

def send_batch(trader: str):
    """
    Builds and publishes a JSON-encoded trade batch for a given trader.
    
    - Randomly delays sending (to simulate network or thinking time)
    - Includes 1 to 2 trade actions
    - Publishes the batch to the Valkey Pub/Sub channel
    """
    time.sleep(random.uniform(0.5, 3.0))  # Introduce delay per trader batch for realism

    batch = {
        "timestamp": int(time.time()),  # UNIX timestamp for batch
        "trader": trader,
        "actions": [
            {
                "crypto": random.choice(cryptos),
                "action": random.choice(actions),
                "quantity": round(random.uniform(0.01, 10.0), 2)  # Random quantity: 0.01 to 10.00
            }
            for _ in range(random.randint(1, 2))  # Each batch contains 1 or 2 actions
        ]
    }

    # Serialize to JSON and publish
    payload = json.dumps(batch)
    client.publish(CHANNEL, payload)

    print(f"🆕 Published batch from {trader}: {payload}")


try:
    while True:
        threads = []

        # Start a thread per trader to simulate parallel batch generation
        for trader in traders:
            t = threading.Thread(target=send_batch, args=(trader,), daemon=True)
            t.start()
            threads.append(t)

        # Ensure all trader threads finish before next round
        for t in threads:
            t.join()

        # Sleep between batch cycles (one full round of all traders)
        time.sleep(frequency)

except KeyboardInterrupt:
    # Allow clean exit on Ctrl+C
    print("\n❌ Stopped publisher.")


📤 Publishing batches to 'trades'... (Ctrl+C to stop)
🆕 Published batch from Asteroid: {"timestamp": 1750927018, "trader": "Asteroid", "actions": [{"crypto": "XRP", "action": "buy", "quantity": 4.27}, {"crypto": "SOL", "action": "buy", "quantity": 0.17}]}
🆕 Published batch from Exoplanet: {"timestamp": 1750927019, "trader": "Exoplanet", "actions": [{"crypto": "ETH", "action": "sell", "quantity": 7.77}]}
🆕 Published batch from Asteroid: {"timestamp": 1750927040, "trader": "Asteroid", "actions": [{"crypto": "ETH", "action": "buy", "quantity": 3.77}]}
🆕 Published batch from Exoplanet: {"timestamp": 1750927042, "trader": "Exoplanet", "actions": [{"crypto": "XRP", "action": "buy", "quantity": 6.39}]}
🆕 Published batch from Asteroid: {"timestamp": 1750927063, "trader": "Asteroid", "actions": [{"crypto": "SOL", "action": "sell", "quantity": 0.42}]}
🆕 Published batch from Exoplanet: {"timestamp": 1750927063, "trader": "Exoplanet", "actions": [{"crypto": "ETH", "action": "sell", "quantity": 3.74