In [24]:
#!/usr/bin/env python
import logging
import time
import os
from binance.um_futures import UMFutures
from binance.error import ClientError
from dotenv import load_dotenv
import pandas as pd

# Route INFO-level (and higher) records through the root logger.
logging.basicConfig(level=logging.INFO)

# Load environment variables from the local "API.env" file; the API key and
# secret are read from the environment in a later cell.
load_dotenv(dotenv_path="API.env")

True

In [25]:
# Read the Binance credentials from the environment (filled in by the
# load_dotenv call on "API.env").
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
if not api_key or not api_secret:
    logging.error("API Key and Secret must be set in the .env file")
    # `exit` is an interactive-shell helper injected by `site`/IPython and is
    # not guaranteed to exist or to halt execution inside a program or a
    # notebook cell; raising SystemExit stops here with exit status 1.
    raise SystemExit(1)

In [26]:
# Strategy parameters
symbol = "BTCUSDT"
quantity_ratio = 0.5      # share of the USDT balance committed to one trade
short_ma_period = 20      # window length of the fast moving average
long_ma_period = 60       # window length of the slow moving average
take_profit_ratio = 0.3   # TP: 0.3%
stop_loss_ratio = 0.3     # SL: -0.3%

# Futures client bound to the Binance futures testnet endpoint
client = UMFutures(
    key=api_key,
    secret=api_secret,
    base_url="https://testnet.binancefuture.com",
)

def get_balance():
    """Return the USDT balance reported by ``client.balance()`` as a float.

    The first entry whose ``"asset"`` is ``"USDT"`` wins; when no such entry
    exists the function returns ``0.0``.
    """
    usdt_entries = (entry for entry in client.balance() if entry["asset"] == "USDT")
    first_match = next(usdt_entries, None)
    if first_match is None:
        return 0.0
    return float(first_match["balance"])

def get_latest_price(symbol):
    """Return the ``"price"`` field of ``client.ticker_price`` for ``symbol`` as a float."""
    return float(client.ticker_price(symbol=symbol)["price"])

def get_ma_signal(symbol):
    """Classify the latest moving-average crossover for ``symbol``.

    Fetches ``long_ma_period + 1`` one-minute klines, computes the rolling
    means of the close price over ``short_ma_period`` and ``long_ma_period``
    rows, and compares the last two rows.

    Returns:
        "BUY"  when the short MA was below the long MA on the previous row
               and is above it on the latest row,
        "SELL" when it was above and is now below,
        "HOLD" otherwise.
    """
    kline_columns = [
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore",
    ]
    raw_klines = client.klines(symbol=symbol, interval="1m", limit=long_ma_period + 1)
    closes = pd.DataFrame(raw_klines, columns=kline_columns)["close"].astype(float)

    fast = closes.rolling(window=short_ma_period).mean()
    slow = closes.rolling(window=long_ma_period).mean()

    prev_fast, prev_slow = fast.iloc[-2], slow.iloc[-2]
    last_fast, last_slow = fast.iloc[-1], slow.iloc[-1]

    if prev_fast < prev_slow and last_fast > last_slow:
        return "BUY"  # long
    if prev_fast > prev_slow and last_fast < last_slow:
        return "SELL"  # short
    return "HOLD"

In [27]:
def place_order(side, quantity, entry_price):
    """Open a market position and attach take-profit / stop-loss orders.

    Three orders are sent through the module-level ``client`` for the
    module-level ``symbol``: the MARKET entry, a TAKE_PROFIT_MARKET order and
    a STOP_MARKET order, the latter two with ``closePosition=True``.

    ``take_profit_ratio`` and ``stop_loss_ratio`` are percentages of the entry
    price (0.3 means 0.3%). For a long the take-profit sits above the entry
    and the stop-loss below; for a short the directions are mirrored.

    Args:
        side: "BUY" to open a long; any other value is treated as a short
            (callers pass "SELL").
        quantity: order size passed to the MARKET order.
        entry_price: reference price used to derive the TP/SL trigger prices.

    Raises:
        Whatever ``client.new_order`` raises. If the TP or SL order fails, the
        entry order has already been sent.
    """
    is_long = side == "BUY"
    position_side = "LONG" if is_long else "SHORT"
    close_side = "SELL" if is_long else "BUY"

    # Main order
    # NOTE(review): positionSide LONG/SHORT presumably requires the account to
    # be in hedge mode - confirm against the exchange documentation.
    order = client.new_order(
        symbol=symbol,
        side=side,
        type="MARKET",
        quantity=quantity,
        positionSide=position_side,
    )
    logging.info(f"ORDER PLACED: {order}")

    # Calculate TP/SL: the ratios are percentages, so convert them to
    # fractional offsets around the entry price instead of multiplying the
    # price by the raw ratio (which would put the triggers ~70% away).
    tp_offset = take_profit_ratio / 100
    sl_offset = stop_loss_ratio / 100
    if is_long:
        take_profit_price = entry_price * (1 + tp_offset)
        stop_loss_price = entry_price * (1 - sl_offset)
    else:
        take_profit_price = entry_price * (1 - tp_offset)
        stop_loss_price = entry_price * (1 + sl_offset)

    # NOTE(review): trigger prices are rounded to 2 decimals; confirm this
    # matches the symbol's price tick size.
    take_profit_trigger = round(take_profit_price, 2)
    stop_loss_trigger = round(stop_loss_price, 2)

    # Setting TP
    client.new_order(
        symbol=symbol,
        side=close_side,
        type="TAKE_PROFIT_MARKET",
        stopPrice=take_profit_trigger,
        closePosition=True,
        positionSide=position_side,
        workingType="CONTRACT_PRICE",
    )

    # Setting SL
    client.new_order(
        symbol=symbol,
        side=close_side,
        type="STOP_MARKET",
        stopPrice=stop_loss_trigger,
        closePosition=True,
        positionSide=position_side,
        workingType="CONTRACT_PRICE",
    )

    logging.info(f"TP: {take_profit_trigger}, SL: {stop_loss_trigger}")

In [28]:
def calculate_profit(symbol):
    """Report the unrealized PnL of the first open position on ``symbol``.

    Walks the entries returned by ``client.position_information`` and picks
    the first one whose ``positionAmt`` is non-zero; its details are logged.

    Returns:
        ``(pnl, mark_price)`` as floats for that position, or ``(0, 0)`` when
        no entry has a non-zero amount.
    """
    for position in client.position_information(symbol=symbol):
        amount = float(position["positionAmt"])
        if amount == 0:
            continue  # flat entry, nothing to report

        entry_price = float(position["entryPrice"])
        mark_price = float(position["markPrice"])
        side = "LONG" if amount > 0 else "SHORT"  # sign of the amount gives the direction
        pnl = float(position["unRealizedProfit"])
        logging.info(f"SIDE: {side} ENTRY_PRICE: {entry_price} MARK_PRICE: {mark_price} PNL: {pnl}")
        return pnl, mark_price
    return 0, 0

In [29]:
def main():
    """Poll for a moving-average crossover, trade once on the first signal, then stop.

    Each iteration asks ``get_ma_signal`` for a signal. On "HOLD" it waits
    10 seconds and polls again. On "BUY"/"SELL" it sizes the order from the
    USDT balance and ``quantity_ratio``, places it via ``place_order``, waits
    10 seconds, logs the position PnL and balance, and returns.

    ``ClientError`` and any other ``Exception`` are logged and the loop
    resumes after a 5-second pause.
    """
    while True:
        try:
            signal = get_ma_signal(symbol)
            logging.info(f"SIGNAL: {signal}")

            if signal not in ("BUY", "SELL"):
                logging.info("waiting for next signal...")
                time.sleep(10)
                continue

            usdt_balance = get_balance()
            latest_price = get_latest_price(symbol)
            # Order size: the configured share of the balance, converted to
            # base-asset units at the latest ticker price (3 decimals).
            notional = usdt_balance * quantity_ratio
            quantity = round(notional / latest_price, 3)

            # Market order; the latest ticker price is the TP/SL reference.
            place_order(signal, quantity, latest_price)

            # Pause before reading the position back.
            time.sleep(10)

            # Calculate PNL
            pnl, now_price = calculate_profit(symbol)
            final_balance = get_balance()

            logging.info(f"LATEST_PRICE: {latest_price} | NOW_PRICE: {now_price} | PNL: {pnl} | FINAL_BLANCE: {final_balance} USDT")

            return  # a single trade per run
        except ClientError as err:
            logging.error(f"API Error: {err}")
            time.sleep(5)
        except Exception as err:
            logging.error(f"Unexpected Error: {err}")
            time.sleep(5)

# Start the trading loop only when this code runs as the main module
# (direct script execution or a notebook cell), not when it is imported.
if __name__ == "__main__":
    main()

INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等待下一個交叉訊號...
INFO:root:目前訊號: HOLD
INFO:root:等

KeyboardInterrupt: 