<a href="https://colab.research.google.com/github/collen1/collen1/blob/main/trade_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import csv
import json
import os
import ssl
import threading
import time
from datetime import datetime

import requests
import websocket

class KuCoinWebSocket:
    """Stream a KuCoin level-2 order book and log derived features to CSV.

    The client fetches a public WebSocket token over REST, opens the
    socket, subscribes to ``/market/level2:<symbol>``, maintains an
    in-memory price -> size book, and appends one feature row to a CSV file
    for every update it processes.

    Reconnection is handled by a loop inside :meth:`connect`. The WebSocket
    callbacks never call :meth:`connect` themselves: doing so nests one
    ``run_forever`` call inside another on every reconnect and eventually
    exhausts the interpreter's recursion limit.
    """

    def __init__(self):
        self.ws = None
        self.token = None
        self.endpoint = None
        self.keep_running = True
        self.reconnect_delay = 5        # seconds to wait between sessions
        self.ping_interval = 20         # seconds of silence before pinging
        self.last_message_time = time.time()
        self.debug = True
        self.symbol = "BTC-USDT"
        self.orderbook = {'bids': {}, 'asks': {}}
        self.token_retries = 3          # attempts made by get_websocket_token
        self.token_retry_delay = 2      # seconds between token attempts
        self.features_path = "features.csv"
        # Sequence number of the last REST snapshot; changes at or below it
        # are already reflected in the snapshot and must not be re-applied.
        self._snapshot_sequence = 0
        # Set by stop() so that waits end immediately instead of sleeping.
        self._stop_event = threading.Event()

    def log(self, message):
        """Print ``message`` with an ISO timestamp when ``self.debug`` is set."""
        if self.debug:
            print(f"{datetime.now().isoformat()} - {message}")

    def get_websocket_token(self):
        """Fetch a public WebSocket token and endpoint from the REST API.

        Makes up to ``self.token_retries`` attempts, pausing
        ``self.token_retry_delay`` seconds between them.

        Returns:
            True when ``self.token`` and ``self.endpoint`` were populated,
            False when every attempt failed.
        """
        for attempt in range(1, self.token_retries + 1):
            try:
                self.log(f"Fetching new token (attempt {attempt})...")
                url = "https://api.kucoin.com/api/v1/bullet-public"
                response = requests.post(url, timeout=10)
                response.raise_for_status()
                data = response.json()
                if data.get('code') != '200000':
                    raise RuntimeError(f"API Error: {data.get('msg')}")
                self.token = data["data"]["token"]
                self.endpoint = data["data"]["instanceServers"][0]["endpoint"]
                self.log(f"Token received. Endpoint: {self.endpoint}")
                return True
            except Exception as e:
                self.log(f"❌ Attempt {attempt} failed: {e}")
                # Only pause when another attempt will actually follow.
                if attempt < self.token_retries:
                    time.sleep(self.token_retry_delay)
        return False

    def initialize_orderbook(self):
        """Load a REST snapshot of the book into ``self.orderbook``.

        On any failure the error is logged and the current book is left
        untouched. Both sides are replaced together, so a parse error
        cannot leave a half-updated book.
        """
        # NOTE(review): KuCoin appears to document this v3 full-order-book
        # endpoint as requiring an API key; if unauthenticated calls are
        # rejected, the public /api/v1/market/orderbook/level2_100 snapshot
        # may be the intended one -- confirm against the API docs.
        try:
            url = f"https://api.kucoin.com/api/v3/market/orderbook/level2?symbol={self.symbol}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            payload = response.json()
            if payload.get('code') != '200000':
                raise RuntimeError(f"API Error: {payload.get('msg')}")
            data = payload['data']
            bids = {float(price): float(size) for price, size in data['bids']}
            asks = {float(price): float(size) for price, size in data['asks']}
            try:
                sequence = int(data.get('sequence', 0))
            except (TypeError, ValueError):
                sequence = 0
            self.orderbook['bids'] = bids
            self.orderbook['asks'] = asks
            self._snapshot_sequence = sequence
            self.log("Initial order book snapshot loaded")
        except Exception as e:
            self.log(f"Failed to get initial order book: {str(e)}")

    def on_message(self, ws, message):
        """Dispatch one raw WebSocket frame by its ``type`` field."""
        self.last_message_time = time.time()
        try:
            data = json.loads(message)
            msg_type = data.get("type")

            if msg_type == "welcome":
                self.log("Connection acknowledged by server")
                # A new connection means the old book has missed updates.
                self.orderbook = {'bids': {}, 'asks': {}}
                self._snapshot_sequence = 0
                # Subscribe BEFORE taking the snapshot. Updates that arrive
                # while the snapshot request is in flight wait in the socket
                # buffer and are applied afterwards; the sequence filter in
                # process_orderbook_update drops the ones the snapshot
                # already contains. The reverse order loses every update
                # published between the snapshot and the subscription.
                self.subscribe_orderbook()
                self.initialize_orderbook()

            elif msg_type == "ack":
                self.log(f"Subscription acknowledged: {data.get('id')}")

            elif msg_type == "message":
                if data.get("subject") == "trade.l2update":
                    self.process_orderbook_update(data['data'])
                else:
                    self.log(f"Unhandled message type: {data.get('subject')}")

            elif msg_type == "error":
                self.log(f"SERVER ERROR: {data}")

        except Exception as e:
            self.log(f"Message processing error: {str(e)}")

    def capture_features(self):
        """Summarise the current book as a flat feature dict.

        Returns:
            A dict with ``timestamp``, ``best_bid``, ``best_ask``,
            ``spread``, ``mid_price``, ``imbalance``, ``bid_volume`` and
            ``ask_volume``. ``spread`` and ``mid_price`` are 0 unless both
            sides of the book are populated; ``imbalance`` is 0 when the
            book holds no volume.
        """
        bids = self.orderbook['bids']
        asks = self.orderbook['asks']

        best_bid = max(bids) if bids else 0
        best_ask = min(asks) if asks else 0
        # With one side missing, ask - bid would just be +/- the other
        # side's price, which is not a spread.
        two_sided = bool(best_bid and best_ask)
        spread = best_ask - best_bid if two_sided else 0
        mid_price = (best_bid + best_ask) / 2 if two_sided else 0

        bid_volume = sum(bids.values())
        ask_volume = sum(asks.values())
        total_volume = bid_volume + ask_volume
        imbalance = (bid_volume - ask_volume) / total_volume if total_volume != 0 else 0

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'best_bid': best_bid,
            'best_ask': best_ask,
            'spread': spread,
            'mid_price': mid_price,
            'imbalance': imbalance,
            'bid_volume': bid_volume,
            'ask_volume': ask_volume
        }

    def save_feature_row(self, row):
        """Append ``row`` to ``self.features_path`` as one CSV record.

        A header line is written first when the file does not exist yet or
        is empty, so the resulting file is self-describing.
        """
        path = self.features_path
        needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
        with open(path, "a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(row.keys()))
            if needs_header:
                writer.writeheader()
            writer.writerow(row)

    def _is_stale(self, sequence):
        """Return True when a change is already covered by the snapshot."""
        try:
            return int(sequence) <= self._snapshot_sequence
        except (TypeError, ValueError):
            # An unparsable sequence cannot be ordered; apply the change.
            return False

    def _apply_changes(self, side, changes):
        """Apply ``[price, size, sequence]`` triples to one side of the book."""
        book = self.orderbook[side]
        for price, size, sequence in changes:
            price_f = float(price)
            # A price of 0 is not a real level; storing it would make the
            # best ask 0.
            if price_f == 0 or self._is_stale(sequence):
                continue
            size_f = float(size)
            if size_f == 0:
                book.pop(price_f, None)     # size 0 removes the level
            else:
                book[price_f] = size_f

    def process_orderbook_update(self, update_data):
        """Apply one ``trade.l2update`` payload, then log and persist features.

        Args:
            update_data: the message's ``data`` object; its ``changes``
                entry maps ``asks`` / ``bids`` to lists of
                ``[price, size, sequence]`` triples.
        """
        changes = update_data['changes']
        self._apply_changes('asks', changes.get('asks', []))
        self._apply_changes('bids', changes.get('bids', []))

        bids = self.orderbook['bids']
        asks = self.orderbook['asks']
        best_bid = max(bids) if bids else 0
        best_ask = min(asks) if asks else 0

        self.log(f"Bid: {best_bid} ({bids.get(best_bid, 0)}) | "
                 f"Ask: {best_ask} ({asks.get(best_ask, 0)})")

        # Capture and save features
        feature_row = self.capture_features()
        self.save_feature_row(feature_row)

    def on_error(self, ws, error):
        """Log a socket error; the loop in connect() handles reconnecting."""
        self.log(f"WebSocket Error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Log the close; the loop in connect() handles reconnecting."""
        self.log(f"Closed (Code: {close_status_code}, Reason: {close_msg})")

    def on_open(self, ws):
        """Log that the socket is open."""
        self.log("WebSocket Connected")

    def subscribe_orderbook(self):
        """Send the level-2 subscription request for ``self.symbol``."""
        sub_msg = {
            "id": int(time.time() * 1000),
            "type": "subscribe",
            "topic": f"/market/level2:{self.symbol}",
            "privateChannel": False,
            "response": True
        }
        self.ws.send(json.dumps(sub_msg))
        self.log(f"Subscribed to {self.symbol} orderbook")

    def start_ping(self):
        """Send application-level pings while the current socket is idle.

        Runs in a daemon thread, one per session. The thread is bound to
        the socket that was current when it started and exits once that
        socket is replaced or cleared, or the client is stopped.
        """
        ws = self.ws
        while self.keep_running and self.ws is ws:
            if time.time() - self.last_message_time > self.ping_interval:
                sock = getattr(ws, 'sock', None)
                if sock and sock.connected:
                    try:
                        ws.send(json.dumps({"id": str(int(time.time())), "type": "ping"}))
                        self.log("Sent ping")
                    except Exception as e:
                        self.log(f"Ping failed: {str(e)}")
                        self.schedule_reconnect()
                        break
            # Wakes early when stop() is called.
            self._stop_event.wait(1)

    def schedule_reconnect(self):
        """Request a reconnect by closing the current socket.

        Closing the socket makes ``run_forever`` return inside
        :meth:`connect`, whose loop then waits ``self.reconnect_delay``
        seconds and opens a new session. Does nothing once the client has
        been stopped.
        """
        if not self.keep_running:
            return
        ws = self.ws
        if ws is None:
            return
        try:
            ws.close()
        except Exception as e:
            self.log(f"Error while closing socket for reconnect: {e}")

    def _run_session(self):
        """Run one connection from token fetch until the socket closes."""
        self.log("🔄 Starting connection...")

        if not self.get_websocket_token():
            self.log("❌ Token fetch failed during reconnect.")
            return

        ws_url = f"{self.endpoint}?token={self.token}&connectId={int(time.time())}"
        # Log the endpoint only; the full URL carries the session token.
        self.log(f"Connecting to: {self.endpoint}")

        # Start the idle clock fresh so the ping thread does not fire on a
        # timestamp left over from the previous session.
        self.last_message_time = time.time()
        ws = websocket.WebSocketApp(
            ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws = ws

        threading.Thread(target=self.start_ping, daemon=True).start()

        try:
            # No sslopt override: the default keeps TLS certificate
            # verification enabled.
            ws.run_forever(ping_interval=15, ping_timeout=10)
        except Exception as e:
            self.log(f"❌ run_forever failed: {e}")
        finally:
            # Clearing the reference lets this session's ping thread exit.
            if self.ws is ws:
                self.ws = None

    def connect(self):
        """Connect and keep reconnecting until :meth:`stop` is called.

        Blocks the calling thread. Each pass runs one session and then
        waits ``self.reconnect_delay`` seconds before the next one.
        """
        self.keep_running = True
        self._stop_event.clear()

        while self.keep_running:
            self._run_session()
            if not self.keep_running:
                break
            self.log(f"Reconnecting in {self.reconnect_delay} sec...")
            self._stop_event.wait(self.reconnect_delay)

    def run(self):
        """Main entry point to start the WebSocket connection"""
        self.connect()

    def stop(self):
        """Clean shutdown"""
        self.keep_running = False
        self._stop_event.set()
        # Take a local reference: the session may clear self.ws concurrently.
        ws = self.ws
        if ws:
            ws.close()
        self.log("WebSocket client stopped")

if __name__ == "__main__":
    # Supervisor loop: build a fresh client and run it; on an unexpected
    # error tear it down, wait 10 seconds and start over. Ctrl-C exits.
    while True:
        # Reset each iteration so the handlers below never touch a client
        # left over from a previous pass (or an unbound name) when the
        # constructor itself raises.
        client = None
        try:
            client = KuCoinWebSocket()
            client.run()
        except KeyboardInterrupt:
            print("🛑 Manually stopped.")
            if client is not None:
                try:
                    client.stop()
                except Exception as stop_error:
                    # Shutdown is best-effort; report and exit anyway.
                    print(f"Error while stopping client: {stop_error}")
            break
        except Exception as e:
            print(f"❌ Fatal Error: {e}. Restarting in 10s...")
            if client is not None:
                try:
                    client.stop()
                except Exception as stop_error:
                    # Shutdown is best-effort; report and restart anyway.
                    print(f"Error while stopping client: {stop_error}")
            time.sleep(10)


configuration generated by an older version of XGBoost, please export the model by calling
`Booster.save_model` from that version first, then load it back in current version. See:

    https://xgboost.readthedocs.io/en/stable/tutorials/saving_model.html

for more details about differences between saving model and serializing.



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
2025-07-17T21:37:01.254474 - 📈 Predicted price will go: DOWN
2025-07-17T21:37:01.255492 - Bid: 120927.6 (0.93839681) | Ask: 118431.1 (0.09478478)
2025-07-17T21:37:01.259811 - 📈 Predicted price will go: DOWN
2025-07-17T21:37:01.260896 - Bid: 120927.6 (0.93839681) | Ask: 118431.1 (0.09478478)
2025-07-17T21:37:01.265236 - 📈 Predicted price will go: DOWN
2025-07-17T21:37:01.266193 - Bid: 120927.6 (0.93839681) | Ask: 118431.1 (0.09478478)
2025-07-17T21:37:01.270514 - 📈 Predicted price will go: DOWN
2025-07-17T21:37:01.271608 - Bid: 120927.6 (0.93839681) | Ask: 118431.1 (0.09478478)
2025-07-17T21:37:01.275832 - 📈 Predicted price will go: DOWN
2025-07-17T21:37:01.276777 - Bid: 120927.6 (0.93839681) | Ask: 118431.1 (0.09478478)
2025-07-17T21:37:01.280986 - 📈 Predicted price will go: DOWN
2025-07-17T21:37:01.281997 - Bid: 120927.6 (0.93839681) | Ask: 118431.1 (0.09478478)
2025-07-17T21:37:01.286157 - 📈 Predicted price will go: DOW

ERROR:websocket:error from callback <bound method KuCoinWebSocket.on_error of <__main__.KuCoinWebSocket object at 0x7b3da1157390>>: maximum recursion depth exceeded while calling a Python object
ERROR:websocket:error from callback <bound method KuCoinWebSocket.on_close of <__main__.KuCoinWebSocket object at 0x7b3da1157390>>: maximum recursion depth exceeded while calling a Python object
ERROR:websocket:error from callback <bound method KuCoinWebSocket.on_error of <__main__.KuCoinWebSocket object at 0x7b3da1157390>>: maximum recursion depth exceeded while calling a Python object
ERROR:websocket:error from callback <bound method KuCoinWebSocket.on_close of <__main__.KuCoinWebSocket object at 0x7b3da1157390>>: maximum recursion depth exceeded while calling a Python object
ERROR:websocket:error from callback <bound method KuCoinWebSocket.on_error of <__main__.KuCoinWebSocket object at 0x7b3da1157390>>: maximum recursion depth exceeded while calling a Python object
ERROR:websocket:error fro