In [None]:
import websocket
import pandas as pd
import numpy as np
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
import threading
import time
from datetime import datetime, timedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import clear_output
import plotly.io as pio

# Initialize DataFrames
merged_df = pd.DataFrame(columns=['volume_true', 'volume_false', 'vdiff','liq_volume'])
merged_df_all = pd.DataFrame(columns=['open', 'high', 'low', 'close', 'volume_true', 'volume_false', 'vdiff', 'liq_volume'])
df_agg = pd.DataFrame(columns=["symbol", "price", "quantity", "is_buyer_market_maker"])
df_kline = pd.DataFrame(columns=["symbol", "open", "high", "low", "close", "volume", 'is_closed'])
df_liquid = pd.DataFrame(columns=['symbol','original_quantity', 'price'])
#df_liquidation = pd.DataFrame(columns=['symbol','original_quantity', 'price', 'original_volume'])

asset = 'btcusdt'
interval = '1'

# WebSocket URLs
agg_socket_url = f"wss://fstream.binance.com/ws/{asset}@aggTrade"
kline_socket_url = f"wss://fstream.binance.com/ws/{asset}@kline_{interval}m"
liq_socket_url = "wss://fstream.binance.com/ws/!forceOrder@arr"

# Save interval in seconds
save_interval = 1

# def calculate_bollinger_bands(data, columns, window=20, num_std=2):
#     for column in columns:
#         rolling_mean = data[column].rolling(window=window).mean()
#         rolling_std = data[column].rolling(window=window).std()
#         data['Bollinger_Band_Upper_' + column] = rolling_mean + (rolling_std * num_std)
#         data['Bollinger_Band_Lower_' + column] = rolling_mean - (rolling_std * num_std)
#     return data

def save_and_clear_df():
    global merged_df_all, merged_df, df_agg, df_kline, df_liquid
    while True:
        time.sleep(save_interval)
        current_time = pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")

        completed_klines = df_kline[df_kline['is_closed']]

        if not completed_klines.empty:
            if df_liquid.empty:
                df_liquidation = pd.DataFrame(columns=['symbol', 'original_quantity', 'price', 'original_volume'],
                                              index=pd.DatetimeIndex([pd.Timestamp.now() - pd.Timedelta(seconds=10)]))
                df_liquidation['price'] = 0
                df_liquidation['original_quantity'] = 0
                df_liquidation['original_volume'] = 0
            else:
                df_liquidation = df_liquid.copy(deep=True)
                df_liquidation['original_volume'] = df_liquidation['price'] * df_liquidation['original_quantity']
                df_liquidation = df_liquidation.loc[df_liquid['symbol'] == 'BTCUSDT']

            df_liquid_resampled = df_liquidation['original_volume'].resample('1T').sum()

            df_aggregation = df_agg.copy(deep=True)
            df_aggregation['Volume'] = df_aggregation['price'] * df_aggregation['quantity']
            volume_true = df_aggregation.loc[df_aggregation['is_buyer_market_maker'] == True]['Volume'].sum()
            volume_false = df_aggregation.loc[df_aggregation['is_buyer_market_maker'] == False]['Volume'].sum()
            vdiff = volume_false - volume_true

            merged_df = completed_klines[['open', 'high', 'low', 'close']].copy()
            merged_df['volume_true'] = volume_true
            merged_df['volume_false'] = volume_false
            merged_df['vdiff'] = vdiff

            if not df_liquid_resampled.empty:
                merged_df['liq_volume'] = df_liquid_resampled
            else:
                merged_df['liq_volume'] = 0

            merged_df['liq_volume'].fillna(0, inplace=True)
            merged_df_all = pd.concat([merged_df_all, merged_df])
            merged_df_all = merged_df_all.tail(1000)

            columns_to_calculate = ['volume_true', 'volume_false', 'liq_volume']
            #merged_df_all = calculate_bollinger_bands(merged_df_all, columns_to_calculate)

            # for column in columns_to_calculate:
            #     merged_df_all[f'signal_bollinger_{column}'] = 0
            #     merged_df_all.loc[merged_df_all[column] > merged_df_all['Bollinger_Band_Upper_' + column], f'signal_bollinger_{column}'] = 1
            #     merged_df_all.loc[merged_df_all[column] < merged_df_all['Bollinger_Band_Lower_' + column], f'signal_bollinger_{column}'] = -1

            merged_df_all['long_position'] = ((merged_df_all['volume_true'] >= 20000000) & (merged_df_all['volume_false'] <= merged_df_all['volume_true']))
            merged_df_all['short_position'] = ((merged_df_all['volume_false'] >= 20000000) & (merged_df_all['volume_false'] >= merged_df_all['volume_true']))

            merged_df_all['long_position_1'] = ((merged_df_all['vdiff'] <=-20000000))
            merged_df_all['short_position_1'] = ((merged_df_all['vdiff'] >=20000000))
            
            merged_df_all['liqudation_long'] = (merged_df_all['liq_volume'] >= 150000) & (merged_df_all['vdiff'] <= 0)
            merged_df_all['liqudation_short'] = (merged_df_all['liq_volume'] >= 150000) & (merged_df_all['vdiff'] >= 0)
            
            # merged_df_all['long_position'] = (merged_df_all['signal_bollinger_volume_false'] == 1) & (merged_df_all['vdiff'] <= 0)
            # merged_df_all['short_position'] = (merged_df_all['signal_bollinger_volume_true'] == 1) & (merged_df_all['vdiff'] >= 0)
            # merged_df_all['liqudation_long'] = (merged_df_all['signal_bollinger_liq_volume'] == 1) & (merged_df_all['vdiff'] <= 0)
            # merged_df_all['liqudation_short'] = (merged_df_all['signal_bollinger_liq_volume'] == 1) & (merged_df_all['vdiff'] >= 0)
            
            #### TELE #####
            latest_row = merged_df_all.iloc[-1]
            if latest_row['long_position']:
                # Send Telegram message for long position
                telegram_msg(f"Time: {latest_row.name}\n Close: {latest_row['close']}\n Volume True: {latest_row['volume_true']}\n Volume False: {latest_row['volume_false']}\n VDiff: {latest_row['vdiff']}\n Liq Volume: {latest_row['liq_volume']}\n - Aggregation Long Position")
            if latest_row['short_position']:
                # Send Telegram message for short position
                telegram_msg(f"Time: {latest_row.name}\n Close: {latest_row['close']}\n Volume True: {latest_row['volume_true']}\n Volume False: {latest_row['volume_false']}\n VDiff: {latest_row['vdiff']}\n Liq Volume: {latest_row['liq_volume']}\n - Aggregation Short Position")
            if latest_row['long_position_1']:
                # Send Telegram message for long position
                telegram_msg(f"Time: {latest_row.name}\n Close: {latest_row['close']}\n Volume True: {latest_row['volume_true']}\n Volume False: {latest_row['volume_false']}\n VDiff: {latest_row['vdiff']}\n Liq Volume: {latest_row['liq_volume']}\n - Aggregation Long Position_1")
            if latest_row['short_position_1']:
                # Send Telegram message for short position
                telegram_msg(f"Time: {latest_row.name}\n Close: {latest_row['close']}\n Volume True: {latest_row['volume_true']}\n Volume False: {latest_row['volume_false']}\n VDiff: {latest_row['vdiff']}\n Liq Volume: {latest_row['liq_volume']}\n - Aggregation Short Position_1")
            if latest_row['liqudation_long']:
                # Send Telegram message for liquidation long
                telegram_msg(f"Time: {latest_row.name}\n Close: {latest_row['close']}\n Volume True: {latest_row['volume_true']}\n Volume False: {latest_row['volume_false']}\n VDiff: {latest_row['vdiff']}\n Liq Volume: {latest_row['liq_volume']}\n - Liquidation Long Position")
            if latest_row['liqudation_short']:
                # Send Telegram message for liquidation short
                telegram_msg(f"Time: {latest_row.name}\n Close: {latest_row['close']}\n Volume True: {latest_row['volume_true']}\n Volume False: {latest_row['volume_false']}\n VDiff: {latest_row['vdiff']}\n Liq Volume: {latest_row['liq_volume']}\n - Liquidation Short Position")

            ###### PLOT THE DATAFRAME ######
            plot_dataframe(merged_df_all, plot_height=1000, plot_width=2400)

            df_agg.drop(df_agg.index, inplace=True)
            df_kline.drop(completed_klines.index, inplace=True)
            df_liquid.drop(df_liquid.index, inplace=True)


def create_retry_session(retries, backoff_factor, status_forcelist):
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('https://', adapter)
    return session

def telegram_msg(message):
    TOKEN = '7407830307:AAGytcPx85HtpNCMNJYA_lIC4EcFKp6DHEg'
    chat_id = '-1002160371072'
    urltelegram = f"https://api.telegram.org/bot{TOKEN}/sendMessage?chat_id={chat_id}&text={message}"
    
    # Create a retry session
    session = create_retry_session(retries=5, backoff_factor=0.3, status_forcelist=[500, 502, 504])
    
    try:
        response = session.get(urltelegram)
        response.raise_for_status()
        print(response.json())
    except requests.exceptions.RequestException as e:
        print(f"Error sending message to Telegram: {e}")

def telegram_send_image(image_path):
    TOKEN = '7407830307:AAGytcPx85HtpNCMNJYA_lIC4EcFKp6DHEg'
    chat_id = '-1002160371072'
    urltelegram = f"https://api.telegram.org/bot{TOKEN}/sendPhoto"
    
    # Create a retry session
    session = create_retry_session(retries=5, backoff_factor=0.3, status_forcelist=[500, 502, 504])
    
    try:
        with open(image_path, 'rb') as img:
            response = session.post(urltelegram, data={'chat_id': chat_id}, files={'photo': img})
            response.raise_for_status()
            print(response.json())
    except requests.exceptions.RequestException as e:
        print(f"Error sending image to Telegram: {e}")

########### PLOT ###################
def plot_dataframe(df, plot_height=600, plot_width=1000):
    try:
        # Create subplots
        fig = make_subplots(rows=1, cols=1, shared_xaxes=True)
        
        # Add trace for candlestick chart
        fig.add_trace(go.Candlestick(x=df.index,
                                    open=df['open'], high=df['high'],
                                    low=df['low'], close=df['close']))

        # Additional hovering for the close price
        fig.add_trace(go.Scatter(x=df.index, 
                                y=df['close'], 
                                mode='lines', 
                                name='Close Price',
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>",
                                line=dict(width=0.5),  
                                customdata=df[['vdiff', 'volume_true', 'volume_false', 'liq_volume']]))

        ################## Markers 1 #####################
        fig.add_trace(go.Scatter(x=df.index[df['long_position']], y=df['close'][df['long_position']],
                                mode='markers',
                                marker=dict(color='green', symbol='triangle-up', size=19),
                                name='Long Position',
                                customdata=df[df['long_position']][['vdiff', 'volume_true', 'volume_false', 'liq_volume']],
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>"))
        
        fig.add_trace(go.Scatter(x=df.index[df['short_position']], y=df['close'][df['short_position']],
                                mode='markers',
                                marker=dict(color='red', symbol='triangle-down', size=19),
                                name='Short Position',
                                customdata=df[df['short_position']][['vdiff', 'volume_true', 'volume_false', 'liq_volume']],
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>"))
        ################## Markers 1 #####################
        fig.add_trace(go.Scatter(x=df.index[df['long_position_1']], y=df['close'][df['long_position_1']],
                                mode='markers',
                                marker=dict(color='green', symbol='triangle-up', size=19),
                                name='Long Position',
                                customdata=df[df['long_position_1']][['vdiff', 'volume_true', 'volume_false', 'liq_volume']],
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>"))
        
        fig.add_trace(go.Scatter(x=df.index[df['short_position_1']], y=df['close'][df['short_position_1']],
                                mode='markers',
                                marker=dict(color='red', symbol='triangle-down', size=19),
                                name='Short Position',
                                customdata=df[df['short_position_1']][['vdiff', 'volume_true', 'volume_false', 'liq_volume']],
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>"))
        ################## Markers 2 #####################
        fig.add_trace(go.Scatter(x=df.index[df['liqudation_long']], y=df['close'][df['liqudation_long']],
                                mode='markers',
                                marker=dict(color='yellow', symbol='triangle-up', size=12),
                                name='Long Position',
                                customdata=df[df['liqudation_long']][['vdiff', 'volume_true', 'volume_false', 'liq_volume']],
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>"))
        
        fig.add_trace(go.Scatter(x=df.index[df['liqudation_short']], y=df['close'][df['liqudation_short']],
                                mode='markers',
                                marker=dict(color='orange', symbol='triangle-down', size=12),
                                name='Short Position',
                                customdata=df[df['liqudation_short']][['vdiff', 'volume_true', 'volume_false', 'liq_volume']],
                                hovertemplate="<b>Date</b>: %{x}<br><b>Close Price</b>: %{y}<br>"
                                            "<b>vdiff</b>: %{customdata[0]}<br>"
                                            "<b>Volume True</b>: %{customdata[1]}<br>"
                                            "<b>Volume False</b>: %{customdata[2]}<br>"
                                            "<b>Liquidation Volume</b>: %{customdata[3]}<extra></extra>"))

        # Update layout with adjustable plot size
        clear_output(wait=True)
        fig.update_layout(xaxis_title='Date', yaxis_title='Price', height=plot_height, width=plot_width, xaxis_rangeslider_visible=False)
        # Display plot
        fig.show()
        
        image_path = r"C:\Users\Mcc\Desktop\Telegram Bot\image.jpg"
        pio.write_image(fig, image_path, format='jpg')
        print(f"Image saved at {image_path}")
        # Send image to Telegram
        latest_row = merged_df_all.iloc[-1]
        if latest_row['long_position'] or latest_row['short_position'] or latest_row['long_position_1'] or latest_row['short_position_1'] or latest_row['liqudation_short'] or latest_row['liqudation_long']:
            telegram_send_image(image_path)
    except Exception as e:
        print(f"Error plotting and saving image: {e}")

# Create a thread to save and clear DataFrames
save_thread = threading.Thread(target=save_and_clear_df)
save_thread.daemon = True
save_thread.start()

def on_message(ws, message):
    msg = json.loads(message)
    if msg["e"] == "aggTrade":
        stream_agg_trade(msg)
    elif msg["e"] == "kline" and msg["k"]["i"] == "1m":
        stream_kline(msg)
    elif msg["e"] == "forceOrder":
        stream_liquidation(msg)

def on_error(ws, error):
    current_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S")
    print(error)
    print(f'Time of Error: {current_time}')

def on_close(ws, close_status_code, close_msg):
    current_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S")
    print(f"### Closed at {current_time} with code: {close_status_code}, message: {close_msg} ###")
    reconnect(ws.url)  # Attempt to reconnect

def on_open(ws):
    current_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S")
    print(f"### Opened at {current_time} ###")

def stream_agg_trade(msg):
    event_time = pd.to_datetime(msg["E"], unit='ms')
    symbol = msg["s"]
    price = float(msg["p"])
    quantity = float(msg["q"])
    is_buyer_market_maker = msg["m"]
    df_agg.loc[event_time] = [symbol, price, quantity, is_buyer_market_maker]
    #print(f"AGGREGATION: Event Time: {event_time} | Symbol: {symbol} | Price: {price} | Quantity: {quantity} | Is Buyer Market Maker: {is_buyer_market_maker}", end="\r")

def stream_kline(msg):
    event_time = pd.to_datetime(msg["k"]["t"], unit='ms')
    symbol = msg["s"]
    kline_data = msg["k"]
    open_price = float(kline_data["o"])
    high_price = float(kline_data["h"])
    low_price = float(kline_data["l"])
    close_price = float(kline_data["c"])
    volume = float(kline_data["v"])
    is_closed = kline_data["x"]  # Check if the Kline is closed
    df_kline.loc[event_time] = [symbol, open_price, high_price, low_price, close_price, volume, is_closed]
    #print(f"KLINE: Event Time: {event_time} | Symbol: {symbol} | Open: {open_price} | High: {high_price} | Low: {low_price} | Close: {close_price} | Volume: {volume} | Closed: {is_closed}", end="\r")

def stream_liquidation(msg):
    # Extract required items from the message
    event_time = pd.to_datetime(msg["E"], unit='ms')  # Convert timestamp to datetime
    symbol = msg["o"]["s"]
    original_quantity = float(msg["o"]["q"])
    price = float(msg["o"]["p"])
    #side = msg["o"]["S"]
    #order_type = msg["o"]["o"]
    #time_in_force = msg["o"]["f"]
    #average_price = msg["o"]["ap"]
    #order_status = msg["o"]["X"]
    #last_filled_quantity = msg["o"]["l"]
    #filled_accumulated_quantity = msg["o"]["z"]
    #order_trade_time = msg["o"]["T"]

    #print(f"LIQUIDATION: Event Time: {event_time} | Symbol: {symbol} | Side: {side} | Order Type: {order_type} | Price: {price} | Original  Quantity: {original_quantity} | Last Filled  Quantity: {last_filled_quantity} | Order Status: {order_status}", end="\r")
    #print(f"LIQUIDATION: Event Time: {event_time} | Symbol: {symbol} | Original Quantity: {original_quantity} | Price: {price}", end="\r")
    df_liquid.loc[event_time] = [symbol, original_quantity, price]


def reconnect(url):
    time.sleep(1)  # Delay before reconnecting
    run_websocket(url)

def run_websocket(url):
    while True:
        websocket.enableTrace(False)
        ws = websocket.WebSocketApp(url,
                                    on_message=on_message,
                                    on_error=on_error,
                                    on_close=on_close)
        ws.on_open = on_open
        ws.run_forever()
        print(f"Attempting to reconnect to {url}...")
        time.sleep(1)  # Wait before trying to reconnect

# Start WebSocket connections
agg_thread = threading.Thread(target=run_websocket, args=(agg_socket_url,))
agg_thread.start()

kline_thread = threading.Thread(target=run_websocket, args=(kline_socket_url,))
kline_thread.start()

force_thread = threading.Thread(target=run_websocket, args=(liq_socket_url,))
force_thread.start()

