In [1]:
from market_imbalance import MarketImbalance
from find_imbalances import sort_imbalances_by_timestamp
from plot_market_candles_with_plotly import plot_market_candles_with_plotly
from market_statistics import conversion_factors, calculate_statistics, plot_fill_time_histogram
from convert_ts_to_datetime import format_elapsed_time

def find_imbalances_after_fall(historical_candles):
    imbalances_after_fall = []
    for index in range(len(historical_candles)):
        if index == 0 or index == len(historical_candles) - 1:
            continue
        prev_candle = historical_candles[index - 1]
        current_candle = historical_candles[index]
        next_candle = historical_candles[index + 1]
        if prev_candle.low_price > next_candle.high_price:
            delta_to_be_filled_in = prev_candle.low_price - next_candle.high_price
            
            imbalance = MarketImbalance(
                imbalance_type="imbalance_after_fall",
                timestamp=current_candle.timestamp,
                open_price=next_candle.high_price,
                close_price=prev_candle.low_price,
                delta_to_be_filled_in=delta_to_be_filled_in,
                is_full_filled=False,
                was_fullfilled_at=None,
                time_to_be_fullfilled=None,
                is_partially_filled=False,
                remaining_delta_open_price=None,
                remaining_delta_to_be_filled_in=None,
                candles_of_identification=(prev_candle, current_candle, next_candle),
                candles_of_fullfilling=None,
                candles_of_partfilling=None
            )
            imbalances_after_fall.append(imbalance)
    return imbalances_after_fall

def find_imbalances_after_rise(historical_candles):
    imbalances_after_rise = []
    for index in range(len(historical_candles)):
        if index == 0 or index == len(historical_candles) - 1:
            continue
        prev_candle = historical_candles[index - 1]
        current_candle = historical_candles[index]
        next_candle = historical_candles[index + 1]
        if prev_candle.high_price < next_candle.low_price:
            delta_to_be_filled_in = next_candle.low_price - prev_candle.high_price
            
            imbalance = MarketImbalance(
                imbalance_type="imbalance_after_rise",
                timestamp=current_candle.timestamp,
                open_price=next_candle.low_price,
                close_price=prev_candle.high_price,
                delta_to_be_filled_in=delta_to_be_filled_in,
                is_full_filled=False,
                was_fullfilled_at=None,
                time_to_be_fullfilled=None,
                is_partially_filled=False,
                remaining_delta_open_price=None,
                remaining_delta_to_be_filled_in=None,
                candles_of_identification=(prev_candle, current_candle, next_candle),
                candles_of_fullfilling=None,
                candles_of_partfilling=None
            )
            imbalances_after_rise.append(imbalance)
    return imbalances_after_rise

def check_if_imbalance_filled(imbalances, candles):
    for imbalance in imbalances:
        if imbalance.is_full_filled:
            continue  # Passer les imbalances déjà comblés

        # Ne vérifier que les bougies avec un timestamp >= à celui de l'imbalance
        relevant_candles = [candle for candle in candles if candle.timestamp > imbalance.timestamp]

        for candle in relevant_candles:
            if imbalance.imbalance_type == "imbalance_after_fall":
                # Vérifier si la bougie comble l'imbalance après une chute
                if candle.high_price >= imbalance.close_price:
                    imbalance.is_full_filled = True
                    imbalance.was_fullfilled_at = candle.timestamp
                    imbalance.candles_of_fullfilling = candle

                    # Calculer le temps pour combler l'imbalance
                    time_to_fullfill = candle.timestamp - imbalance.timestamp
                    imbalance.time_to_be_fullfilled = time_to_fullfill  # en millisecondes
                    break  # Sortir de la boucle une fois comblé

            elif imbalance.imbalance_type == "imbalance_after_rise":
                # Vérifier si la bougie comble l'imbalance après une hausse
                if candle.low_price <= imbalance.close_price:
                    imbalance.is_full_filled = True
                    imbalance.was_fullfilled_at = candle.timestamp
                    imbalance.candles_of_fullfilling = candle

                    # Calculer le temps pour combler l'imbalance
                    time_to_fullfill = candle.timestamp - imbalance.timestamp
                    imbalance.time_to_be_fullfilled = time_to_fullfill  # en millisecondes
                    break  # Sortir de la boucle une fois comblé

def get_unfilled_imbalances(imbalances):
    unfilled_imbalances = [imbalance for imbalance in imbalances if not imbalance.is_full_filled]
    return unfilled_imbalances

def get_fullfilled_imbalances(imbalances):
    fullfilled_imbalances = [imbalance for imbalance in imbalances if imbalance.is_full_filled]
    return fullfilled_imbalances

def calculate_fulfillment_percentage(total_imbalances, fullfilled_imbalances):
    if total_imbalances == 0:
        raise ValueError("Le nombre total d'imbalances ne peut pas être zéro.")

    # Calcule le pourcentage d'imbalances comblés
    percentage = (len(fullfilled_imbalances) / len(total_imbalances)) * 100
    
    return round(percentage, 2)  # Arrondir à deux décimales


In [2]:
# !pip install python-binance
import os
from interact_with_binance import fetch_ohlcv, fetch_ohlcv_as_df
from market_candles import MarketCandle
from binance.client import Client
from convert_ts_to_datetime import convert_ts_to_datetime


api_key = os.environ.get('BINANCE_API_KEY')
api_secret = os.environ.get('BINANCE_API_SECRET')
client = Client(api_key, api_secret)

# Récupère les données de marché en *temps différé* sur Binance
interval_value = Client.KLINE_INTERVAL_1MINUTE
from_date = "27 avril 2024"

historical_candles = fetch_ohlcv(client, symbol = "ATOMUSDT", interval=interval_value, from_date=from_date) #"1 Jan, 2015"
historical_candles_df = fetch_ohlcv_as_df(client, symbol = "ATOMUSDT", interval=interval_value, from_date=from_date) #"1 Jan, 2015"

print(len(historical_candles), "bougies récupérées depuis le", from_date)



2620 bougies récupérées depuis le 27 avril 2024


In [3]:
# Récupère les imbalances à partir des infos de marché
imbalances_after_fall = find_imbalances_after_fall(historical_candles)
print("nombre d'imbalances après une baisse :", len(imbalances_after_fall))

imbalances_after_rise = find_imbalances_after_rise(historical_candles)
print("nombre d'imbalances après une hausse :", len(imbalances_after_rise))

# réuni les imbalances après une baisse et ceux après une hausse
imbalances = imbalances_after_fall + imbalances_after_rise
print("nombre d'imbalances cumulés :", len(imbalances))

# Met à jour les attributs is_full_filled, was_fullfilled_at, time_to_be_fullfilled et conserve les attributs de la bougie de comblage
check_if_imbalance_filled(imbalances, historical_candles)


# Identifie uniquement les imbalances déjà comblés
fullfilled_imbalances_after_fall = get_fullfilled_imbalances(imbalances_after_fall)
print("nombre d'imbalances comblés après une baisse :", len(fullfilled_imbalances_after_fall))

fullfilled_imbalances_after_rise = get_fullfilled_imbalances(imbalances_after_rise)
print("nombre d'imbalances comblés après une hausse :", len(fullfilled_imbalances_after_rise))

# réuni les imbalances comblés après une baisse et ceux après une hausse
fullfilled_imbalances = fullfilled_imbalances_after_fall + fullfilled_imbalances_after_rise
print("nombre d'imbalances comblés cumulés :", len(fullfilled_imbalances))

fullfillment_percentage = calculate_fulfillment_percentage(imbalances, fullfilled_imbalances)
print(f"le pourcentage de comblage est de : {fullfillment_percentage}%")


# Identifie uniquement les imbalances non comblés
unfilled_imbalances_after_fall = get_unfilled_imbalances(imbalances_after_fall)
print("nombre d'imbalances non comblés après une baisse :", len(unfilled_imbalances_after_fall))

unfilled_imbalances_after_rise = get_unfilled_imbalances(imbalances_after_rise)
print("nombre d'imbalances non comblés après une hausse :", len(unfilled_imbalances_after_rise))

# réuni les imbalances comblés après une baisse et ceux après une hausse
unfilled_imbalances = unfilled_imbalances_after_fall + unfilled_imbalances_after_rise
print("nombre d'imbalances non comblés cumulés :", len(unfilled_imbalances))



nombre d'imbalances après une baisse : 494
nombre d'imbalances après une hausse : 485
nombre d'imbalances cumulés : 979
nombre d'imbalances comblés après une baisse : 473
nombre d'imbalances comblés après une hausse : 466
nombre d'imbalances comblés cumulés : 939
le pourcentage de comblage est de : 95.91%
nombre d'imbalances non comblés après une baisse : 21
nombre d'imbalances non comblés après une hausse : 19
nombre d'imbalances non comblés cumulés : 40


In [4]:
import numpy as np

# Extraire les temps de comblement en millisecondes
times_to_fill = [imb.time_to_be_fullfilled for imb in fullfilled_imbalances]

# Calcule les statistiques
imbalances_statistics, imbalances_statistics_in_french = calculate_statistics(times_to_fill)
# print(imbalances_statistics)
# print(imbalances_statistics_in_french)


In [5]:
# plot_fill_time_histogram(num_bins = 10, median_time = imbalances_statistics['percentiles']['50th'], times_to_fill = times_to_fill)


In [None]:
# plot_market_candles_with_plotly(historical_candles_df)

In [None]:

for imb in unfilled_imbalances_after_rise[-10:] : 
    print('fullfilled =',imb.is_full_filled, ',imbalance that started on', convert_ts_to_datetime(imb.timestamp), 'at an open price =', imb.open_price, 'at on close_price =', imb.close_price, 'with a delta =', round(imb.delta_to_be_filled_in, 2))

In [None]:
# Affiche les imbalances non comblés sur le graphique
unfilled_imbalances = []
unfilled_imbalances.extend(unfilled_imbalances_after_fall)
unfilled_imbalances.extend(unfilled_imbalances_after_rise)

sorted_imbalances = sort_imbalances_by_timestamp(unfilled_imbalances)
sorted_imbalances[-10:]

for imb in sorted_imbalances[-10:]:
    print(imb.timestamp, imb.open_price, imb.close_price, round(imb.delta_to_be_filled_in, 2))



In [None]:
# Percentage of gap filled after fall
percentage_filled_after_fall = (len(fullfilled_imbalances_after_fall) / len(find_imbalances_after_fall)) * 100
print(f'% of filled gap after fall = {percentage_filled_after_fall} with {len(fullfilled_imbalances_after_fall)} fullfilled imbalances on {len(imbalances_after_fall)} in total')

# Percentage of gap filled after rise
percentage_filled_after_rise = (len(fullfilled_imbalances_after_rise) / len(find_imbalances_after_rise)) * 100
print(f'% of filled gap after rise = {percentage_filled_after_rise} with {len(fullfilled_imbalances_after_rise)} fullfilled imbalances on {len(imbalances_after_rise)} in total')

# Percentage of gap filled in both directions
total_imbalances = (len(fullfilled_imbalances_after_fall) + len(fullfilled_imbalances_after_rise))
percentage_filled = (len(fullfilled_imbalances) / len(total_imbalances)) * 100
print(f'% of filled gap = {percentage_filled} with {len(total_imbalances)} fullfilled imbalances on {len(fullfilled_imbalances)} in total')
