In [None]:
%pip install eth-account==0.13.7 eth-abi==5.2.0 web3==7.11.0 requests==2.32.3 pandas numpy

In [None]:
import hashlib
import hmac
import json
import math
import os
import time
from datetime import datetime
from decimal import Decimal, ROUND_DOWN, ROUND_HALF_EVEN
from typing import Any, Dict, List, Optional, Union, Protocol
from urllib.parse import urlencode

import numpy as np
import pandas as pd
import requests
from eth_abi import encode
from eth_account import Account
from eth_account.messages import encode_defunct
from web3 import Web3

# ==================
# Configuration
# ==================

class APIConfig:
    """Connection settings for the API client.

    Attributes:
        base_url: Root URL that endpoint paths are appended to.
        timeout: Timeout handed to each HTTP request.
        recv_window: Value sent as the ``recvWindow`` request parameter.
    """

    def __init__(
        self,
        base_url: str = "https://fapi.asterdex.com",
        timeout: int = 30,
        recv_window: int = 5000
    ):
        self.base_url, self.timeout, self.recv_window = base_url, timeout, recv_window


# ==================
# Exceptions
# ==================

class AsterAPIError(Exception):
    """Raised when a response body carries a negative Aster error code."""


class AsterNetworkError(Exception):
    """Raised for transport failures, non-200 HTTP statuses and unparsable responses."""


# ============================
# Authentication & Client
# ============================

class AuthProtocol(Protocol):
    """Structural interface for the signer objects handed to the HTTP client."""
    def sign_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return the request parameters with the scheme's signature fields added."""
        ...


class AsterAuthHMAC:
    """Request signer that appends an HMAC-SHA256 signature keyed by the API secret."""

    def __init__(self, api_key: str, api_secret_key: str):
        self.api_key, self.api_secret = api_key, api_secret_key

    def sign_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Sign the URL-encoded form of ``params``.

        The hex digest is stored under ``'signature'`` in the same dict that was
        passed in, and that dict is returned.
        """
        secret = self.api_secret.encode('utf-8')
        payload = urlencode(params).encode('utf-8')
        params['signature'] = hmac.new(secret, payload, hashlib.sha256).hexdigest()
        return params


class AsterAuthWeb3:
    """Request signer that signs with an API wallet's private key via ``eth_account``."""

    def __init__(self, wallet_address: str, api_wallet_address: str, api_secret_key: str):
        """
        Initialize Aster authentication

        Args:
            wallet_address: Your main wallet address (the one you login with);
                sent as the ``user`` field.
            api_wallet_address: Address of the API wallet; sent as the ``signer`` field.
            api_secret_key: Private key of the API wallet, used to sign each request.
        """
        self.user = wallet_address
        self.signer = api_wallet_address
        self.private_key = api_secret_key

    def _trim_dict(self, my_dict: Dict[str, Any]) -> Dict[str, str]:
        """
        Return a copy of ``my_dict`` with every value converted to a string.

        Lists and dicts are JSON-encoded (dicts recursively trimmed first). The
        input and any containers nested inside it are left untouched, so signing
        never alters the parameters that are subsequently sent.
        """
        trimmed: Dict[str, str] = {}
        for key, value in my_dict.items():
            if isinstance(value, list):
                trimmed[key] = json.dumps([
                    json.dumps(self._trim_dict(item)) if isinstance(item, dict) else str(item)
                    for item in value
                ])
            elif isinstance(value, dict):
                trimmed[key] = json.dumps(self._trim_dict(value))
            else:
                trimmed[key] = str(value)
        return trimmed

    def _generate_signature(self, params: Dict[str, Any], nonce: int) -> str:
        """
        Generate Web3 signature for Aster API

        Args:
            params: Request parameters
            nonce: Current timestamp in microseconds

        Returns:
            Hex signature string with 0x prefix
        """
        trimmed_params = self._trim_dict(params)
        # Canonical form that gets signed: sorted keys, all spaces removed.
        json_str = json.dumps(trimmed_params, sort_keys=True).replace(' ', '').replace('\'', '"')

        encoded = encode(
            ['string', 'address', 'address', 'uint256'],
            [json_str, self.user, self.signer, nonce]
        )

        keccak_hex = Web3.keccak(encoded).hex()

        signable_msg = encode_defunct(hexstr=keccak_hex)
        signed_message = Account.sign_message(signable_message=signable_msg, private_key=self.private_key)

        return '0x' + signed_message.signature.hex()

    def sign_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Add signature parameters to request

        Args:
            params: Request parameters (updated in place)

        Returns:
            The same dict with ``nonce``, ``user``, ``signer`` and ``signature`` added
        """
        nonce = math.trunc(time.time() * 1000000)

        # The signature covers the parameters as they are before these fields are added.
        signature = self._generate_signature(params, nonce)

        params['nonce'] = nonce
        params['user'] = self.user
        params['signer'] = self.signer
        params['signature'] = signature

        return params
    
class AsterClient:
    """HTTP transport that timestamps, signs and sends requests to the configured base URL."""

    def __init__(self, auth: AuthProtocol, config: APIConfig):
        self._auth = auth
        self._config = config
        # Extra headers merged into every request; empty unless a caller sets them.
        self._headers = {}

    @staticmethod
    def _handle_response(response: requests.Response) -> Any:
        """
        Decode a response and raise on failure.

        Raises:
            AsterNetworkError: status code is not 200, or the body is not valid JSON.
            AsterAPIError: a 200 response whose JSON object has a negative ``code``.
        """
        if response.status_code != 200:
            try:
                error_detail = response.json()
            except (ValueError, AttributeError):
                # ValueError is the common base of the stdlib and simplejson decode errors.
                error_detail = {"status": response.status_code}
            raise AsterNetworkError(f"HTTP {response.status_code} error: {error_detail}")

        try:
            data = response.json()
        except ValueError:
            raise AsterNetworkError(f"Invalid JSON response: {response.text}")

        if isinstance(data, dict) and 'code' in data:
            code = data.get('code')
            if code and code < 0:
                msg = data.get('msg', 'Unknown error')
                raise AsterAPIError(f"Aster API error code {code}: {msg}")

        return data

    def _prepare_params(self, params: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Build the signed parameter dict for a request.

        Works on a copy so the caller's dict is never modified: adds ``timestamp``
        (milliseconds) and ``recvWindow``, drops ``None`` values, then signs.
        """
        prepared: Dict[str, Any] = dict(params) if params is not None else {}
        prepared['timestamp'] = int(time.time() * 1000)
        prepared['recvWindow'] = self._config.recv_window
        prepared = {k: v for k, v in prepared.items() if v is not None}

        return self._auth.sign_params(prepared)

    def _request(self, method: str, endpoint: str, **kwargs) -> Any:
        """Execute HTTP request with error handling"""
        url = f"{self._config.base_url}{endpoint}"

        if self._headers:
            # Merge into a fresh dict (client headers win) instead of mutating the caller's.
            kwargs["headers"] = {**(kwargs.get("headers") or {}), **self._headers}

        try:
            response = requests.request(
                method=method,
                url=url,
                timeout=self._config.timeout,
                **kwargs
            )
            return self._handle_response(response)
        except requests.exceptions.RequestException as e:
            raise AsterNetworkError(f"Request failed: {e}") from e

    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Execute a signed GET request; parameters travel in the query string."""
        signed_params = self._prepare_params(params)
        return self._request('GET', endpoint, params=signed_params)

    def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Any:
        """Execute a signed POST request; parameters travel form-encoded in the body."""
        signed_data = self._prepare_params(data)
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': 'PythonApp/1.0'
        }
        return self._request('POST', endpoint, data=signed_data, headers=headers)

    def delete(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Any:
        """Execute a signed DELETE request; parameters travel in the body."""
        signed_data = self._prepare_params(data)
        return self._request('DELETE', endpoint, data=signed_data)

# ==================
# Futures API
# ==================

class AsterFutures:
    """Aster Finance Futures API Client"""
    
    
    @classmethod
    def with_hmac_auth(cls, api_key: str, api_secret_key: str, config: Optional[APIConfig] = None):
        """
        Build a client that signs with HMAC (for email account API keys).

        The instance is created without running ``__init__``; it sends the API key
        in the ``X-MBX-APIKEY`` header and targets the ``/fapi/v1`` endpoints.
        """
        settings = APIConfig() if config is None else config
        signer = AsterAuthHMAC(api_key, api_secret_key)
        transport = AsterClient(signer, settings)
        transport._headers = {"X-MBX-APIKEY": api_key}

        futures = cls.__new__(cls)
        futures._config = settings
        futures._auth = signer
        futures._client = transport
        futures._exchange_info = None
        futures._symbols_info = None
        futures.API_VERSION = "/fapi/v1"
        return futures
    
    @classmethod
    def with_web3_auth(cls, wallet_address: str, api_wallet_address: str, api_secret_key: str, config: Optional[APIConfig] = None):
        """
        Build a client that signs with an API wallet (Web3 auth).

        The instance is created without running ``__init__`` and targets the
        ``/fapi/v3`` endpoints.
        """
        settings = APIConfig() if config is None else config
        signer = AsterAuthWeb3(wallet_address, api_wallet_address, api_secret_key)

        futures = cls.__new__(cls)
        futures._config = settings
        futures._auth = signer
        futures._client = AsterClient(signer, settings)
        futures._exchange_info = None
        futures._symbols_info = None
        futures.API_VERSION = "/fapi/v3"
        return futures
    
    API_VERSION = "/fapi/v3"
    
    def __init__(
        self,
        wallet_address: str,
        api_key: str,
        api_secret_key: str,
        config: Optional[APIConfig] = None
    ):
        """
        Create a Web3-authenticated Aster Futures client.

        Args:
            wallet_address: Your main wallet address (the one you login with)
            api_key: The "API key" shown on the platform; passed to the signer
                as the API wallet address
            api_secret_key: The "API secret key" shown on the platform
            config: Optional API configuration; a default one is built when omitted
        """
        self._config = config if config is not None else APIConfig(base_url="https://fapi.asterdex.com")
        self._auth = AsterAuthWeb3(wallet_address, api_key, api_secret_key)
        self._client = AsterClient(self._auth, self._config)
        # Exchange metadata is fetched on first use and then cached.
        self._exchange_info: Optional[Dict[str, Any]] = None
        self._symbols_info: Optional[Dict[str, Dict[str, Any]]] = None
    
    # ==================
    # Helper Methods
    # ==================
    
    def _ensure_exchange_info(self) -> None:
        """Load exchange info once and cache filters as dict"""
        if self._exchange_info is None:
            self._exchange_info = self.get_exchange_info()
            self._symbols_info = {}
            
            for sym in self._exchange_info['symbols']:
                filters_dict = {f['filterType']: f for f in sym['filters']}
                self._symbols_info[sym['symbol']] = filters_dict
    
    @staticmethod
    def _apply_precision(value: float, precision: int, rounding_mode: str) -> str:
        """Apply precision like Bitunix example"""
        multiplier = 10 ** precision
        scaled = value * multiplier
        scaled = int(scaled) if rounding_mode == "TRUNCATE" else round(scaled)
        return f"{scaled / multiplier:g}"
    
    def _qty_to_precision(self, symbol: str, quantity: float) -> str:
        """Format quantity to LOT_SIZE precision"""
        self._ensure_exchange_info()
        
        if symbol not in self._symbols_info:
            raise ValueError(f"Symbol {symbol} not found")
        
        lot_size = self._symbols_info[symbol]['LOT_SIZE']
        min_qty = float(lot_size['minQty'])
        step_size = float(lot_size['stepSize'])
        
        if quantity < min_qty:
            raise ValueError(f"Quantity {quantity} below minimum {min_qty}")
        
        precision = len(str(step_size).rstrip('0').split('.')[-1]) if step_size < 1 else 0
        return self._apply_precision(quantity, precision, "TRUNCATE")
    
    def _price_to_precision(self, symbol: str, price: float) -> str:
        """Format price to PRICE_FILTER precision"""
        self._ensure_exchange_info()
        
        if symbol not in self._symbols_info:
            raise ValueError(f"Symbol {symbol} not found")
        
        price_filter = self._symbols_info[symbol]['PRICE_FILTER']
        tick_size = float(price_filter['tickSize'])
        
        precision = len(str(tick_size).rstrip('0').split('.')[-1]) if tick_size < 1 else 0
        return self._apply_precision(price, precision, "ROUND")
    
    # ==================
    # Account Methods
    # ==================
    
    def get_balance(self) -> List[Dict[str, Any]]:
        """
        Get futures account balance
        
        Returns:
            List of asset balances
        """
        endpoint = f"{self.API_VERSION}/balance"
        return self._client.get(endpoint)
    
    def get_coin_balance(self, coin: str) -> float:
        """
        Get balance for a specific asset
        
        Args:
            coin: Asset symbol (e.g., 'USDT', 'BTC', 'ETH')
            
        Returns:
            Balance amount as float, or 0.0 if asset not found
        """
        balances = self.get_balance()
        coin_upper = coin.upper()
        
        coin_data = next((b for b in balances if b['asset'] == coin_upper), None)
        return float(coin_data['balance']) if coin_data else 0.0

    def get_exchange_info(self) -> Dict[str, Any]:
        """
        Get exchange information including trading rules and symbol info
        
        Returns:
            Exchange information dictionary
        """
        endpoint = "/fapi/v1/exchangeInfo"
        return self._client.get(endpoint, {})
    
    def set_leverage(self, symbol: str, leverage: int) -> Dict[str, Any]:
        """
        Change initial leverage for a symbol
        
        Args:
            symbol: Trading pair symbol
            leverage: Target leverage (1-125)
            
        Returns:
            Response data
        """
        endpoint = f"{self.API_VERSION}/leverage"
        data = {
            "symbol": symbol,
            "leverage": leverage
        }
        return self._client.post(endpoint, data)
    
    def set_margin_type(self, symbol: str, margin_type: str) -> Dict[str, Any]:
        """
        Change margin type for a symbol
        
        Args:
            symbol: Trading pair symbol
            margin_type: "ISOLATED" or "CROSSED"
            
        Returns:
            Response data
        """
        if margin_type.upper() not in ['ISOLATED', 'CROSSED']:
            raise ValueError("margin_type must be 'ISOLATED' or 'CROSSED'")
        
        endpoint = f"{self.API_VERSION}/marginType"
        data = {
            "symbol": symbol,
            "marginType": margin_type.upper()
        }
        return self._client.post(endpoint, data)
    
    # ==================
    # Trading Methods
    # ==================
    
    def place_order(
        self,
        symbol: str,
        side: str,
        order_type: str,
        quantity: Optional[float] = None,
        price: Optional[float] = None,
        position_side: str = "BOTH",
        time_in_force: Optional[str] = None,
        reduce_only: bool = False,
        close_position: bool = False,
        stop_price: Optional[float] = None,
        activation_price: Optional[float] = None,
        callback_rate: Optional[float] = None,
        working_type: Optional[str] = None,
        client_order_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Place a new order
        
        Args:
            symbol: Trading pair symbol
            side: "BUY" or "SELL"
            order_type: "LIMIT", "MARKET", "STOP", "TAKE_PROFIT", "TRAILING_STOP_MARKET", etc.
            quantity: Order quantity
            price: Order price (required for LIMIT orders)
            position_side: "BOTH", "LONG", or "SHORT"
            time_in_force: "GTC", "IOC", "FOK", "GTX"
            reduce_only: True to reduce position only
            close_position: True to close position
            stop_price: Stop price for stop orders
            activation_price: Activation price for TRAILING_STOP_MARKET orders
            callback_rate: Callback rate for TRAILING_STOP_MARKET orders (min 0.1, max 5, where 1 = 1%)
            working_type: "MARK_PRICE" or "CONTRACT_PRICE" (default "CONTRACT_PRICE")
            client_order_id: Custom order ID
            
        Returns:
            Order data
        """
        endpoint = f"{self.API_VERSION}/order"
        
        data = {
            "symbol": symbol,
            "side": side.upper(),
            "type": order_type.upper(),
            "positionSide": position_side.upper()
        }
        
        if quantity is not None:
            data['quantity'] = self._qty_to_precision(symbol, quantity)
        if price is not None:
            data['price'] = self._price_to_precision(symbol, price)
        if time_in_force:
            data['timeInForce'] = time_in_force.upper()
        elif price is not None:  
            data['timeInForce'] = 'GTC' 
        if reduce_only:
            data['reduceOnly'] = "true"
        if close_position:
            data['closePosition'] = "true"
        if stop_price is not None:
            data['stopPrice'] = self._price_to_precision(symbol, stop_price)
        if activation_price is not None:
            data['activationPrice'] = self._price_to_precision(symbol, activation_price)
        if callback_rate is not None:
            data['callbackRate'] = callback_rate
        if working_type:
            data['workingType'] = working_type.upper()
        if client_order_id:
            data['newClientOrderId'] = client_order_id
        
        return self._client.post(endpoint, data)
    
    def place_batch_orders(
        self,
        orders: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Place multiple orders in a single request (max 5 orders)

        Args:
            orders: List of order dictionaries with same parameters as place_order

        Returns:
            List of order responses (successful orders and errors)
        """
        if len(orders) > 5:
            raise ValueError("Maximum 5 orders allowed in batch")

        endpoint = f"{self.API_VERSION}/batchOrders"

        formatted_orders = []
        for order in orders:
            formatted_order = {
                "symbol": order["symbol"],
                "side": order["side"].upper(),
                "type": order["type"].upper(),
                "positionSide": order.get("positionSide", "BOTH").upper()
            }

            if "quantity" in order and order["quantity"] is not None:
                formatted_order["quantity"] = self._qty_to_precision(order["symbol"], float(order["quantity"]))
            if "price" in order and order["price"] is not None:
                formatted_order["price"] = self._price_to_precision(order["symbol"], order["price"])
            if "timeInForce" in order and order["timeInForce"] is not None:
                formatted_order["timeInForce"] = order["timeInForce"].upper()
            if order.get("reduceOnly"):
                formatted_order["reduceOnly"] = "true"
            if order.get("closePosition"):
                formatted_order["closePosition"] = "true"
            if "stopPrice" in order and order["stopPrice"] is not None:
                formatted_order["stopPrice"] = self._price_to_precision(order["symbol"], order["stopPrice"])
            if "activationPrice" in order and order["activationPrice"] is not None:
                formatted_order["activationPrice"] = self._price_to_precision(order["symbol"], order["activationPrice"])
            if "callbackRate" in order and order["callbackRate"] is not None:
                formatted_order["callbackRate"] = str(order["callbackRate"])
            if "workingType" in order:
                formatted_order["workingType"] = order["workingType"].upper()
            if "newClientOrderId" in order:
                formatted_order["newClientOrderId"] = order["newClientOrderId"]

            formatted_orders.append(formatted_order)

        data = {"batchOrders": json.dumps(formatted_orders)}
        return self._client.post(endpoint, data)

    def place_order_with_tp_sl(
        self,
        symbol: str,
        side: str,
        quantity: float,
        entry_price: Optional[float] = None,
        stop_loss_price: Optional[float] = None,
        take_profit_price: Optional[float] = None,
        position_side: str = "BOTH",
        time_in_force: Optional[str] = None,
        working_type: str = "CONTRACT_PRICE"
    ) -> Dict[str, Any]:
        """
        Place an order with optional stop loss and take profit in one batch request

        Args:
            symbol: Trading pair symbol
            side: "BUY" or "SELL"
            quantity: Order quantity
            entry_price: Entry price (if None, uses MARKET order, otherwise LIMIT)
            stop_loss_price: Stop loss price (optional)
            take_profit_price: Take profit price (optional)
            position_side: "BOTH", "LONG", or "SHORT"
            time_in_force: "GTC", "IOC", "FOK", "GTX" (for LIMIT orders, defaults to "GTC")
            working_type: "MARK_PRICE" or "CONTRACT_PRICE"

        Returns:
            Dictionary with 'entry', 'stop_loss', and 'take_profit' order responses
        """
        orders = []

        entry_order = {
            "symbol": symbol,
            "side": side,
            "positionSide": position_side,
            "quantity": quantity
        }

        if entry_price is None:
            entry_order["type"] = "MARKET"
        else:
            entry_order["type"] = "LIMIT"
            entry_order["price"] = entry_price
            entry_order["timeInForce"] = time_in_force if time_in_force is not None else "GTC"

        orders.append(entry_order)

        if stop_loss_price is not None:
            sl_side = "SELL" if side == "BUY" else "BUY"
            sl_order = {
                "symbol": symbol,
                "side": sl_side,
                "type": "STOP_MARKET",
                "positionSide": position_side,
                "stopPrice": stop_loss_price,
                "closePosition": True,
                "workingType": working_type
            }
            orders.append(sl_order)

        if take_profit_price is not None:
            tp_side = "SELL" if side == "BUY" else "BUY"
            tp_order = {
                "symbol": symbol,
                "side": tp_side,
                "type": "TAKE_PROFIT_MARKET",
                "positionSide": position_side,
                "stopPrice": take_profit_price,
                "closePosition": True,
                "workingType": working_type
            }
            orders.append(tp_order)

        results = self.place_batch_orders(orders)

        response = {}
        if len(results) >= 1:
            response["entry"] = results[0]
        if stop_loss_price is not None and len(results) >= 2:
            response["stop_loss"] = results[1]
        if take_profit_price is not None:
            if stop_loss_price is not None and len(results) >= 3:
                response["take_profit"] = results[2]
            elif stop_loss_price is None and len(results) >= 2:
                response["take_profit"] = results[1]

        return response

    def place_order_with_tsl(
        self,
        symbol: str,
        side: str,
        quantity: float,
        callback_rate: float,
        entry_price: Optional[float] = None,
        activation_price: Optional[float] = None,
        position_side: str = "BOTH",
        time_in_force: str = "GTC"
    ) -> Dict[str, Any]:
        """
        Place an order with trailing stop loss in one batch request

        Args:
            symbol: Trading pair symbol
            side: "BUY" or "SELL"
            quantity: Order quantity
            callback_rate: Callback rate for trailing stop (min 0.1, max 5, where 1 = 1%)
            entry_price: Entry price (if None, uses MARKET order, otherwise LIMIT)
            activation_price: Price to activate trailing stop (optional, defaults to entry price)
            position_side: "BOTH", "LONG", or "SHORT"
            time_in_force: "GTC", "IOC", "FOK", "GTX" (for LIMIT orders)

        Returns:
            Dictionary with 'entry' and 'trailing_stop' order responses
        """
        orders = []

        entry_order = {
            "symbol": symbol,
            "side": side,
            "positionSide": position_side,
            "quantity": quantity
        }

        if entry_price is None:
            entry_order["type"] = "MARKET"
        else:
            entry_order["type"] = "LIMIT"
            entry_order["price"] = entry_price
            entry_order["timeInForce"] = time_in_force

        orders.append(entry_order)

        tsl_side = "SELL" if side == "BUY" else "BUY"
        tsl_order = {
            "symbol": symbol,
            "side": tsl_side,
            "type": "TRAILING_STOP_MARKET",
            "positionSide": position_side,
            "callbackRate": callback_rate,
            "quantity": self._qty_to_precision(symbol, quantity),
            "reduceOnly": True
        }

        if activation_price is not None:
            tsl_order["activationPrice"] = activation_price

        orders.append(tsl_order)

        results = self.place_batch_orders(orders)

        response = {
            "entry": results[0] if len(results) >= 1 else None,
            "trailing_stop": results[1] if len(results) >= 2 else None
        }

        return response

    def cancel_order(
        self,
        symbol: str,
        order_id: Optional[int] = None,
        client_order_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Cancel an order
        
        Args:
            symbol: Trading pair symbol
            order_id: Order ID
            client_order_id: Client order ID
            
        Returns:
            Cancellation data
        """
        if not order_id and not client_order_id:
            raise ValueError("Either order_id or client_order_id must be provided")
        
        endpoint = f"{self.API_VERSION}/order"
        data = {"symbol": symbol}
        
        if order_id:
            data['orderId'] = order_id
        if client_order_id:
            data['origClientOrderId'] = client_order_id
        
        return self._client.delete(endpoint, data)
      
    def cancel_all_orders(
        self,
        symbol: str
    ) -> Dict[str, Any]:
        """
        Cancel all open orders for a symbol using the official API endpoint

        Args:
            symbol: Trading pair symbol

        Returns:
            Response with cancellation status
        """
        endpoint = f"{self.API_VERSION}/allOpenOrders"
        data = {"symbol": symbol}
        return self._client.delete(endpoint, data)


    def get_order(
        self,
        symbol: str,
        order_id: Optional[int] = None,
        client_order_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Query order
        
        Args:
            symbol: Trading pair symbol
            order_id: Order ID
            client_order_id: Client order ID
            
        Returns:
            Order data
        """
        if not order_id and not client_order_id:
            raise ValueError("Either order_id or client_order_id must be provided")
        
        endpoint = f"{self.API_VERSION}/order"
        params = {"symbol": symbol}
        
        if order_id:
            params['orderId'] = order_id
        if client_order_id:
            params['origClientOrderId'] = client_order_id
        
        return self._client.get(endpoint, params)
    
    def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get all open orders
        
        Args:
            symbol: Optional trading pair symbol
            
        Returns:
            List of open orders
        """
        endpoint = f"{self.API_VERSION}/openOrders"
        params = {}
        if symbol:
            params['symbol'] = symbol
        return self._client.get(endpoint, params)
    
    def get_position_info(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get position information
        
        Args:
            symbol: Optional trading pair symbol
            
        Returns:
            List of position data
        """
        endpoint = f"{self.API_VERSION}/positionRisk"
        params = {}
        if symbol:
            params['symbol'] = symbol
        return self._client.get(endpoint, params)

    # ==================
    # Market Data Methods
    # ==================
    
    @staticmethod
    def _convert_klines_to_dataframe(raw_data: List[List[Any]]) -> pd.DataFrame:
        """Convert raw klines to DataFrame"""
        df = pd.DataFrame(raw_data, columns=[
            'time', 'open', 'high', 'low', 'close', 'volume',
            'close_time', 'quote_volume', 'trades', 
            'taker_buy_base_vol', 'taker_buy_quote_vol', 'ignore'
        ])
        
        df['datetime'] = pd.to_datetime(df['time'].astype(np.float64), unit='ms')
        df = df.set_index('datetime').sort_index(ascending=True)
        
        numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'quote_volume']
        df[numeric_cols] = df[numeric_cols].astype(float)
        
        return df.drop(['time', 'close_time', 'ignore'], axis=1)
    
    @staticmethod
    def _to_milliseconds(dt: Union[datetime, int, None]) -> Optional[int]:
        """Convert datetime to milliseconds timestamp"""
        if dt is None:
            return None
        if isinstance(dt, int):
            return dt
        if isinstance(dt, datetime):
            return int(dt.timestamp() * 1000)
        raise ValueError(f"Invalid time format: {type(dt)}")
    
    def get_klines(
        self,
        symbol: str,
        interval: str,
        start_time: Union[datetime, int, None] = None,
        end_time: Union[datetime, int, None] = None,
        limit: int = 1500
    ) -> pd.DataFrame:
        """
        Get kline/candlestick data (OHLCV) as DataFrame
        
        Args:
            symbol: Trading pair symbol
            interval: Kline interval - 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M
            start_time: Start time as datetime object, or milliseconds timestamp
            end_time: End time as datetime object, or milliseconds timestamp
            limit: Number of results (default 1500, max 1500)
            
        Returns:
            DataFrame with datetime index and columns: open, high, low, close, volume, trades, etc.
        """
        endpoint = "/fapi/v1/klines"
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        
        start_ms = self._to_milliseconds(start_time)
        end_ms = self._to_milliseconds(end_time)
        
        if start_ms:
            params['startTime'] = start_ms
        if end_ms:
            params['endTime'] = end_ms
        
        raw_data = self._client.get(endpoint, params)
        return self._convert_klines_to_dataframe(raw_data)
        
    def get_ticker_price(self, symbol: Optional[str] = None) -> float:
        """
        Get latest price for symbol(s)
        
        Args:
            symbol: Optional trading pair symbol
            
        Returns:
            Price data (dict if symbol provided, list otherwise)
        """
        endpoint = f"{self.API_VERSION}/ticker/price"
        params = {}
        if symbol:
            params['symbol'] = symbol
        return float(self._client.get(endpoint, params)['price'])

## Authentication Methods

In [None]:
### Web3 Authentication (API Wallet) 

# aster = AsterFutures.with_web3_auth(
#     wallet_address="",
#     api_wallet_address="",
#     api_secret_key=""
# )

In [None]:
### HMAC Authentication (Email Account)

# Credentials come from environment variables so secrets never get saved in
# the notebook. Set ASTER_API_KEY and ASTER_API_SECRET before starting the kernel.
aster = AsterFutures.with_hmac_auth(
    api_key=os.environ.get("ASTER_API_KEY", ""),
    api_secret_key=os.environ.get("ASTER_API_SECRET", "")
)

In [None]:
# Fetch the futures account balances; the bare name displays the result.
balance = aster.get_balance()
balance

In [None]:
# Balance of a single asset as a float (0.0 if the asset is not listed).
usdt_balance = aster.get_coin_balance('USDT')
usdt_balance

#### Set Leverage


In [None]:
# Leverage is set per symbol.
symbol = "ETHUSDT"
leverage = 1

aster.set_leverage(symbol=symbol, leverage=leverage)


#### Set Margin Type


In [None]:
# WARNING: the account must be in single-asset mode to change the margin type.

symbol = "ETHUSDT" 
margin_type = "ISOLATED" 
# margin_type = "CROSSED" 

aster.set_margin_type(symbol=symbol, margin_type=margin_type)


#### Get current price

In [None]:
# Latest price of the symbol as a float.
symbol = "ETHUSDT"
current_price = aster.get_ticker_price(symbol=symbol)
current_price

#### Market order with TP / SL

In [None]:
symbol = "ETHUSDT"
side = "BUY"
cost = 20  # order size in quote currency; converted to a base quantity below

current_price = aster.get_ticker_price(symbol=symbol)
qty = cost / current_price

# Exit levels are expressed relative to the price just fetched.
stop_loss_price = current_price * 0.95  # 5% stop loss
take_profit_price = current_price * 1.10  # 10% take profit

# No entry_price is passed, so the entry is sent as a MARKET order.
aster.place_order_with_tp_sl(
    symbol=symbol,
    side=side,
    quantity=qty,
    stop_loss_price=stop_loss_price,
    take_profit_price=take_profit_price
)

In [None]:
# Inspect the current position for the symbol.
symbol = "ETHUSDT"
aster.get_position_info(symbol=symbol)

#### Close/Reduce position

In [None]:
symbol = "ETHUSDT"
# NOTE(review): SELL only reduces a long position; presumably a short
# (negative positionAmt) would need "BUY" here instead (confirm before use).
side = "SELL"
order_type = "MARKET"

position_info = aster.get_position_info(symbol=symbol)
# Uses the first returned entry; abs() because only the size is needed.
# qty = abs(float(position_info[0]['positionAmt'])) / 2  # alternative: close half
qty = abs(float(position_info[0]['positionAmt']))

order = aster.place_order(
    symbol=symbol,
    side=side,
    order_type=order_type,
    quantity=qty,
    reduce_only=True # WARNING: keeps this order from opening or increasing a position
)
order

#### Cancel all orders

In [None]:
# Cancels every open order on the symbol, including pending TP/SL orders.
symbol = "ETHUSDT"

aster.cancel_all_orders(symbol=symbol)

#### Batch Orders Example

In [None]:
symbol = "ETHUSDT"

cost = 20  # order size in quote currency
current_price = aster.get_ticker_price(symbol=symbol)
entry_qty = cost / current_price
partial_tp_qty = entry_qty / 2

# NOTE(review): the stop prices below are absolute values, not derived from
# current_price; presumably they must be adjusted to the market level at run time.
orders = [
    # Market entry
    {
        "symbol": symbol,
        "side": "BUY",
        "type": "MARKET",
        "quantity": entry_qty,
    },
    # Stop loss on the whole position
    {
        "symbol": symbol,
        "side": "SELL",
        "type": "STOP_MARKET",
        "stopPrice": 3700,
        "closePosition": True,
    },
    # First take profit: half of the entry quantity
    {
        "symbol": symbol,
        "side": "SELL",
        "type": "TAKE_PROFIT_MARKET",
        "stopPrice": 4300,
        "quantity": partial_tp_qty, # WARNING: explicit quantity, so only part of the position
    },
    # Second take profit: whatever is left
    {
        "symbol": symbol,
        "side": "SELL",
        "type": "TAKE_PROFIT_MARKET",
        "stopPrice": 4600,
        "closePosition": True, # WARNING: closePosition instead of a quantity
    }
]

result = aster.place_batch_orders(orders)
result

#### Market order with Trailing Stop Loss

In [None]:
symbol = "ETHUSDT"
side = "SELL" 
cost = 20  # order size in quote currency
current_price = aster.get_ticker_price(symbol=symbol)
qty = cost / current_price

callback_rate = 1.0  # 1% trailing stop
# None leaves activationPrice out of the request.
activation_price = None # current_price * 0.95 # Start trailing when 5% profit

# The trailing stop is placed on the opposite side with reduceOnly set.
result = aster.place_order_with_tsl(
    symbol=symbol,
    side=side,
    quantity=qty,
    callback_rate=callback_rate,
    activation_price=activation_price
)
result


#### Limit order

In [None]:
symbol = "ETHUSDT"
side = "BUY"
# NOTE(review): entry, stop-loss and take-profit are absolute prices; presumably
# they must be adjusted to the market level at run time.
entry_price = 3800
cost = 20  # order size in quote currency

# The quantity is sized from the current price, not from entry_price.
current_price = aster.get_ticker_price(symbol=symbol)
qty = cost / current_price

stop_loss_price = 3600
take_profit_price = 4500

order = aster.place_order_with_tp_sl(
    symbol=symbol,
    side=side,
    quantity=qty,
    entry_price=entry_price, # WARNING: passing entry_price makes the entry a LIMIT order
    stop_loss_price=stop_loss_price,
    take_profit_price=take_profit_price,
)
order


In [None]:
# Note: `order` is rebound here to the list of open orders; the next cell indexes into it.
order = aster.get_open_orders("ETHUSDT")
order

In [None]:
symbol = "ETHUSDT"
# Relies on `order` being the open-orders list from the previous cell and on it
# holding at least three entries; index 2 picks the third one.
order_id = order[2]["orderId"]

aster.cancel_order(order_id=order_id, symbol=symbol)

#### Get OHLCV data

In [None]:
symbol = "ETHUSDT"
interval = "1h"
# These datetimes are naive; get_klines converts them with datetime.timestamp(),
# which interprets naive values in the machine's local timezone.
start_time = datetime(year=2024, month=10, day=1, hour=0, minute=0)
end_time = datetime(year=2024, month=10, day=7, hour=23, minute=59)

ohlcv = aster.get_klines(
    symbol=symbol,
    interval=interval,
    start_time=start_time,
    end_time=end_time
)
ohlcv