In [2]:
import numpy as np
from scipy.signal import argrelextrema
import ccxt
import pandas as pd
import numba as nb
from sklearn.preprocessing import StandardScaler, RobustScaler
# Notebook-wide pandas configuration: send DataFrame/Series ``.plot()`` calls
# to plotly and raise the display limits so wide/long frames are not truncated.
pd.options.plotting.backend = "plotly"
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [3]:
def create_target(df, long, method='polyfit', polyfit_var='close', pct=0.1):
    """Label every full rolling window of ``df[polyfit_var]`` with its linear trend.

    A first-degree polynomial is fitted to each window of ``long`` consecutive
    rows; the slope (rounded to 8 decimals) and its sign are recorded together
    with the index labels of the first and last row of the window.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; must contain the column named by ``polyfit_var``.
    long : int
        Window length in rows. Only complete windows are labelled, so the
        result has ``max(len(df) - long + 1, 0)`` rows.
    method : str
        Labelling method. Only ``'polyfit'`` is implemented.
    polyfit_var : str
        Column the line is fitted to.
    pct : float
        Currently unused; kept so existing callers keep working.

    Returns
    -------
    pandas.DataFrame
        Columns ``trend`` (1, 0 or -1), ``slope``, ``start_windows`` and
        ``end_windows``.

    Raises
    ------
    ValueError
        If ``method`` is not ``'polyfit'``.
    """
    # Fail loudly instead of hitting an UnboundLocalError further down.
    if method != 'polyfit':
        raise ValueError(f"Unsupported method {method!r}; only 'polyfit' is implemented.")

    series = df[polyfit_var]
    index_l = np.arange(long)

    trend_list = []
    slope_list = []
    start_list = []
    end_list = []

    # Slice complete windows directly; no partial windows are ever produced.
    for stop in range(long, len(series) + 1):
        roll = series.iloc[stop - long:stop]
        # For deg=1 polyfit returns [slope, intercept].
        slope = np.round(np.polyfit(index_l, roll.values, deg=1)[0], decimals=8)
        slope_list.append(slope)
        # Anything that is neither > 0 nor == 0 (including NaN) is labelled -1.
        trend_list.append(1 if slope > 0 else (0 if slope == 0 else -1))
        start_list.append(roll.index[0])
        end_list.append(roll.index[-1])

    return pd.DataFrame({
        'trend': trend_list,
        'slope': slope_list,
        'start_windows': start_list,
        'end_windows': end_list,
    })

In [4]:
def adjusted_sigmoid(x, k=0.5, x0=0):
    """
    Adjusted sigmoid function to map values to the range [-1, 1].

    Computes ``2 / (1 + exp(-k * (x - x0))) - 1`` through the algebraically
    identical ``tanh(k * (x - x0) / 2)``. The tanh form saturates at -1 / 1
    for large arguments instead of overflowing inside ``exp``.

    x is a scalar or array-like of values to squash.
    k controls the steepness of the curve.
    x0 is the midpoint of the sigmoid (the input that maps to 0).
    """
    return np.tanh(0.5 * k * (x - x0))

In [5]:
def download_1000_candles(market='ADA/USDT', tf='1h'):
    """Download recent OHLCV candles from Binance (via ccxt) as a DataFrame.

    Parameters
    ----------
    market : str
        Trading pair symbol passed to ``fetch_ohlcv``.
    tf : str
        Candle timeframe passed to ``fetch_ohlcv``.

    Returns
    -------
    pandas.DataFrame
        Columns ``timestamp`` (UTC datetime), ``open``, ``high``, ``low``,
        ``close`` and ``volume`` on a default integer index, with the newest
        fetched row removed.
    """
    columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']

    # Request one row more than the 1000 wanted, because the newest row is
    # dropped below.
    # NOTE(review): the exchange may cap `limit` server-side, in which case
    # fewer than 1000 rows are returned - confirm against the API limits.
    exchange = ccxt.binance()
    raw_rows = exchange.fetch_ohlcv(market, tf, limit=1001)

    frame = pd.DataFrame(raw_rows, columns=columns)
    # Timestamps arrive as epoch milliseconds.
    frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='ms', utc=True)

    # The most recent candle is presumably still forming, so leave it out.
    return frame.iloc[:-1].copy()

# download_1000_candles()

In [6]:
# Download hourly ADA/USDT candles and display the resulting frame.
candles = download_1000_candles(market='ADA/USDT', tf='1h')
candles

Unnamed: 0,timestamp,open,high,low,close,volume
0,2023-12-01 00:00:00+00:00,0.3760,0.3761,0.3736,0.3746,1715322.8
1,2023-12-01 01:00:00+00:00,0.3745,0.3795,0.3739,0.3790,4433896.7
2,2023-12-01 02:00:00+00:00,0.3790,0.3813,0.3782,0.3801,2721474.9
3,2023-12-01 03:00:00+00:00,0.3801,0.3805,0.3788,0.3791,1524704.5
4,2023-12-01 04:00:00+00:00,0.3792,0.3809,0.3788,0.3801,1432860.0
...,...,...,...,...,...,...
994,2024-01-11 10:00:00+00:00,0.5807,0.5949,0.5807,0.5947,12119916.1
995,2024-01-11 11:00:00+00:00,0.5947,0.6000,0.5901,0.5994,11200159.5
996,2024-01-11 12:00:00+00:00,0.5994,0.6174,0.5990,0.6145,21825917.4
997,2024-01-11 13:00:00+00:00,0.6145,0.6145,0.5917,0.5996,27891214.7


In [7]:
# Number of candles on each side a point must beat to count as a local extremum.
kernel = 6

# Positions of local minima of the lows and local maxima of the highs.
min_peaks = argrelextrema(candles["low"].values, np.less, order=kernel)
max_peaks = argrelextrema(candles["high"].values, np.greater, order=kernel)

# Raw marker: -1 at local lows, +1 at local highs, 0 elsewhere. It is built as
# a plain array because argrelextrema returns *positions*, which only coincide
# with index labels when the frame has a default RangeIndex.
extrema_raw = np.zeros(len(candles))
extrema_raw[min_peaks[0]] = -1
extrema_raw[max_peaks[0]] = 1  # written last: a candle that is both ends up as 1

# Spread each marker over its neighbours with a centred 3-candle Gaussian
# window. The edge rows have no full window and come back NaN, so they are set
# to 0. Assigning the result avoids the chained `fillna(inplace=True)`, which
# is deprecated and has no effect under pandas Copy-on-Write.
candles["extrema"] = (
    pd.Series(extrema_raw, index=candles.index)
    .rolling(window=3, win_type='gaussian', center=True)
    .mean(std=0.5)
    .fillna(0)
)

In [8]:
# How often each distinct value of the smoothed extrema signal occurs.
candles['extrema'].value_counts()

extrema
 0.000000    745
 0.106507     91
-0.106507     75
 0.786986     46
-0.786986     38
-0.680479      2
 0.680479      2
Name: count, dtype: int64

In [9]:
# Plot the smoothed extrema signal over the candle index.
candles['extrema'].plot()

In [10]:
import numpy as np
import pandas as pd
from scipy.signal import argrelextrema
import scipy.stats as stats

def extrema_analyze_candles(candles, kernel=6):
    """Print statistics about upward swings from each local low to the next local high.

    Local lows are located on the ``low`` column and local highs on the
    ``high`` column with ``argrelextrema``. Each low is paired with the first
    high that comes strictly after it, and the move between the two is
    measured on ``close`` prices as a percentage of the close at the low.

    Parameters
    ----------
    candles : pandas.DataFrame
        Must contain ``low``, ``high`` and ``close`` columns. Rows are
        addressed by position, so any index is accepted.
    kernel : int
        ``order`` passed to ``argrelextrema``: how many candles on each side
        a point must beat to count as an extremum.

    Returns
    -------
    None
        Results are printed: peak counts, ``describe()`` of the percentage
        moves and of the swing lengths in candles, and their Pearson
        correlation when at least two swings exist.
    """
    # Find peaks (arrays of row positions in ascending order)
    min_peaks = argrelextrema(candles["low"].values, np.less, order=kernel)[0]
    max_peaks = argrelextrema(candles["high"].values, np.greater, order=kernel)[0]

    # Count peaks
    num_low_peaks = len(min_peaks)
    num_high_peaks = len(max_peaks)
    total_peaks = num_low_peaks + num_high_peaks

    # Work on a plain array: peak positions are not index labels.
    close = candles["close"].to_numpy()

    # For every low, find the slot of the first high strictly after it; lows
    # with no later high are dropped.
    next_high_slot = np.searchsorted(max_peaks, min_peaks, side="right")
    has_next_high = next_high_slot < len(max_peaks)
    low_pos = min_peaks[has_next_high]
    high_pos = max_peaks[next_high_slot[has_next_high]]

    distances = ((close[high_pos] - close[low_pos]) / close[low_pos] * 100).tolist()
    # Always >= 1 because the paired high lies strictly after the low.
    candles_between_peaks = (high_pos - low_pos).tolist()

    # Describe distances and candles between peaks (explicit dtype keeps the
    # summary numeric even when no swing was found).
    distances_description = pd.Series(distances, dtype=float).describe()
    candles_between_peaks_description = pd.Series(candles_between_peaks, dtype=float).describe()

    print(f"Low peaks: {num_low_peaks}")
    print(f"High peaks: {num_high_peaks}")
    print(f"Total peaks: {total_peaks}")
    print(f"Distances (%): {distances_description}")
    print(f"Candles between peaks: {candles_between_peaks_description}")

    # Perform correlation analysis (needs at least two swings)
    if len(distances) > 1:
        correlation, p_value = stats.pearsonr(candles_between_peaks, distances)

        print(f"Correlation between number of candles and return price: {correlation}")
        print(f"P-value of the correlation: {p_value}")
    else:
        print("Not enough data points for correlation analysis.")


In [11]:
def extrema_analyze_candles2(candles, kernel=6):
    """Analyse upward (low -> next high) and downward (high -> next low) swings.

    Local lows are located on ``low`` and local highs on ``high`` with
    ``argrelextrema``. Each extremum is paired with the first opposite
    extremum strictly after it; the move is measured on ``close`` prices as a
    signed percentage of the close at the starting extremum.

    Parameters
    ----------
    candles : pandas.DataFrame
        Must contain ``low``, ``high`` and ``close`` columns. Rows are
        addressed by position, so any index is accepted.
    kernel : int
        ``order`` passed to ``argrelextrema``.

    Returns
    -------
    dict
        ``{"upward": {...}, "downward": {...}}``. Both entries hold the
        ``describe()`` summaries under ``distances`` and
        ``candles_between_peaks`` plus ``correlation`` / ``p_value``; the
        upward entry also carries the peak counts. ``correlation`` and
        ``p_value`` are ``None`` for a direction with fewer than two swings.
    """
    # Find peaks (arrays of row positions in ascending order)
    min_peaks = argrelextrema(candles["low"].values, np.less, order=kernel)[0]
    max_peaks = argrelextrema(candles["high"].values, np.greater, order=kernel)[0]

    # Count peaks
    num_low_peaks = len(min_peaks)
    num_high_peaks = len(max_peaks)
    total_peaks = num_low_peaks + num_high_peaks

    # Work on a plain array: peak positions are not index labels.
    close = candles["close"].to_numpy()

    def _moves_to_next(starts, targets):
        # Pair each start with the first target strictly after it; return the
        # percentage close-to-close moves and the swing lengths in candles.
        slot = np.searchsorted(targets, starts, side="right")
        paired = slot < len(targets)
        begin = starts[paired]
        end = targets[slot[paired]]
        pct = (close[end] - close[begin]) / close[begin] * 100
        return pct.tolist(), (end - begin).tolist()

    def _correlate(label, n_candles, pct_moves):
        # Pearson correlation of swing length vs. move, or (None, None) when
        # there are fewer than two swings.
        if len(pct_moves) > 1:
            corr, p_val = stats.pearsonr(n_candles, pct_moves)
            print(f"Correlation between number of candles and return price ({label}): {corr}, P-value: {p_val}")
            return corr, p_val
        print(f"Not enough data points for correlation analysis ({label}).")
        return None, None

    distances_upward, candles_between_peaks_upward = _moves_to_next(min_peaks, max_peaks)
    distances_downward, candles_between_peaks_downward = _moves_to_next(max_peaks, min_peaks)

    # Describe upward data (explicit dtype keeps empty summaries numeric)
    distances_upward_description = pd.Series(distances_upward, dtype=float).describe()
    candles_between_peaks_upward_description = pd.Series(candles_between_peaks_upward, dtype=float).describe()

    # Describe downward data
    distances_downward_description = pd.Series(distances_downward, dtype=float).describe()
    candles_between_peaks_downward_description = pd.Series(candles_between_peaks_downward, dtype=float).describe()

    # Print results
    print(f"Low peaks: {num_low_peaks}, High peaks: {num_high_peaks}, Total peaks: {total_peaks}")
    print(f"Upward Distances (%): {distances_upward_description}")
    print(f"Candles Between Peaks Upward: {candles_between_peaks_upward_description}")
    print(f"Downward Distances (%): {distances_downward_description}")
    print(f"Candles Between Peaks Downward: {candles_between_peaks_downward_description}")

    # Each direction is checked on its own, so the results are always bound
    # before the return and the "not enough data" message names the right one.
    correlation_upward, p_value_upward = _correlate(
        "upward", candles_between_peaks_upward, distances_upward)
    correlation_downward, p_value_downward = _correlate(
        "downward", candles_between_peaks_downward, distances_downward)

    # Return the data for further analysis if needed
    return {
        "upward": {
            "num_low_peaks": num_low_peaks,
            "num_high_peaks": num_high_peaks,
            "total_peaks": total_peaks,
            "distances": distances_upward_description,
            "candles_between_peaks": candles_between_peaks_upward_description,
            "correlation": correlation_upward,
            "p_value": p_value_upward
        },
        "downward": {
            "distances": distances_downward_description,
            "candles_between_peaks": candles_between_peaks_downward_description,
            "correlation": correlation_downward,
            "p_value": p_value_downward
        }
    }


In [12]:
def extrema_analyze_candles3(candles, kernel=6):
    """Print merged statistics of swings between alternating local extrema.

    Local lows are located on ``low`` and local highs on ``high`` with
    ``argrelextrema``. Each low is paired with the first high strictly after
    it and each high with the first low strictly after it. Moves are measured
    on ``close`` prices as a percentage of the close at the starting extremum;
    downward moves are multiplied by -100 so that a fall shows up as a
    positive number and both directions can be pooled.

    Parameters
    ----------
    candles : pandas.DataFrame
        Must contain ``low``, ``high`` and ``close`` columns. Rows are
        addressed by position, so any index is accepted.
    kernel : int
        ``order`` passed to ``argrelextrema``.

    Returns
    -------
    None
        The ``describe()`` summaries of the pooled moves and of the pooled
        swing lengths (in candles) are printed.
    """
    # Find peaks (arrays of row positions in ascending order)
    min_peaks = argrelextrema(candles["low"].values, np.less, order=kernel)[0]
    max_peaks = argrelextrema(candles["high"].values, np.greater, order=kernel)[0]

    # Work on a plain array: peak positions are not index labels.
    close = candles["close"].to_numpy()

    def _moves_to_next(starts, targets, scale):
        # Pair each start with the first target strictly after it; return the
        # scaled relative close-to-close moves and the swing lengths.
        slot = np.searchsorted(targets, starts, side="right")
        paired = slot < len(targets)
        begin = starts[paired]
        end = targets[slot[paired]]
        moves = (close[end] - close[begin]) / close[begin] * scale
        return moves.tolist(), (end - begin).tolist()

    # Low -> next high, as a signed percentage.
    distances_upward, candles_between_peaks_upward = _moves_to_next(min_peaks, max_peaks, 100)
    # High -> next low, sign flipped so a drop is reported as a positive size.
    distances_downward, candles_between_peaks_downward = _moves_to_next(max_peaks, min_peaks, -100)

    # Merge and describe the data (explicit dtype keeps empty summaries numeric)
    all_distances = distances_upward + distances_downward
    all_candles_between_peaks = candles_between_peaks_upward + candles_between_peaks_downward
    all_distances_description = pd.Series(all_distances, dtype=float).describe()
    all_candles_between_peaks_description = pd.Series(all_candles_between_peaks, dtype=float).describe()

    # Print merged results
    print(f"Merged Distances (%): {all_distances_description}")
    print(f"Merged Candles Between Peaks: {all_candles_between_peaks_description}")


In [34]:
# Download the candles again and display them; this rebinds `candles` to a
# newly built frame.
candles = download_1000_candles(market='ADA/USDT', tf='1h')
candles

Unnamed: 0,timestamp,open,high,low,close,volume
0,2023-12-01 00:00:00+00:00,0.3760,0.3761,0.3736,0.3746,1715322.8
1,2023-12-01 01:00:00+00:00,0.3745,0.3795,0.3739,0.3790,4433896.7
2,2023-12-01 02:00:00+00:00,0.3790,0.3813,0.3782,0.3801,2721474.9
3,2023-12-01 03:00:00+00:00,0.3801,0.3805,0.3788,0.3791,1524704.5
4,2023-12-01 04:00:00+00:00,0.3792,0.3809,0.3788,0.3801,1432860.0
...,...,...,...,...,...,...
994,2024-01-11 10:00:00+00:00,0.5807,0.5949,0.5807,0.5947,12119916.1
995,2024-01-11 11:00:00+00:00,0.5947,0.6000,0.5901,0.5994,11200159.5
996,2024-01-11 12:00:00+00:00,0.5994,0.6174,0.5990,0.6145,21825917.4
997,2024-01-11 13:00:00+00:00,0.6145,0.6145,0.5917,0.5996,27891214.7


In [35]:
# Print the merged swing statistics using the default kernel of 6 candles.
extrema_analyze_candles3(candles=candles)

Merged Distances (%): count    87.000000
mean      4.426537
std       4.437843
min      -2.892263
25%       1.828149
50%       3.358869
75%       6.564998
max      26.214921
dtype: float64
Merged Candles Between Peaks: count    87.000000
mean     12.873563
std       8.406538
min       1.000000
25%       6.500000
50%      11.000000
75%      17.500000
max      36.000000
dtype: float64


In [36]:
# Calculate the ATR indicator for the candles using talib.
import talib

# 100-period ATR divided by the close, i.e. stored as a fraction of the price
# rather than in price units. It is added to `candles` in place as 'atr'.
candles['atr'] = talib.ATR(candles['high'], candles['low'], candles['close'], timeperiod=100)/candles['close']
# Summary statistics of the relative ATR.
candles['atr'].describe()

count    899.000000
mean       0.015890
std        0.002911
min        0.008285
25%        0.014402
50%        0.016093
75%        0.017551
max        0.021478
Name: atr, dtype: float64

In [37]:
def plot_candles(candles, kernel, atr_mult=2, class_level=0.02):
    """Add forward-looking max/min return labels to ``candles``, summarise and plot them.

    For every row the highest ``high`` and lowest ``low`` of the following
    ``kernel`` rows are expressed relative to the row's ``close``. They are
    then compared with an ATR-based threshold to derive two-valued class
    columns.

    The frame is modified in place; these columns are (over)written:
    ``&-s_max``, ``&-s_min``, ``tresh``, ``minus_tresh``, ``&-s_max_class``
    and ``&-s_min_class``.

    Parameters
    ----------
    candles : pandas.DataFrame
        Must contain ``high``, ``low``, ``close`` and ``atr`` columns.
    kernel : int
        Number of future rows each label looks at.
    atr_mult : float
        Multiple of ``atr`` used as the threshold (default 2).
    class_level : float
        Value written into the class columns when the threshold is exceeded
        (``+class_level`` for the max class, ``-class_level`` for the min
        class, 0 otherwise). Default 0.02.

    Raises
    ------
    KeyError
        If ``candles`` has no ``atr`` column.

    Notes
    -----
    ``.plot().show()`` is called on the returned figure, which the plotly
    plotting backend configured for this notebook provides.
    """
    # Check up front so the frame is not half-modified when 'atr' is missing.
    if 'atr' not in candles.columns:
        raise KeyError("plot_candles needs an 'atr' column; compute it before calling.")

    close = candles["close"]

    # shift(-kernel) followed by rolling(kernel) makes row t aggregate rows
    # t+1 .. t+kernel. The first kernel-1 rows and the last kernel rows have no
    # complete window and are NaN.
    candles['&-s_max'] = candles["high"].shift(-kernel).rolling(kernel).max() / close - 1
    candles['&-s_min'] = candles["low"].shift(-kernel).rolling(kernel).min() / close - 1

    candles['tresh'] = candles['atr'] * atr_mult
    candles['minus_tresh'] = -candles['tresh']
    # Comparisons with NaN are False, so rows without a label fall into class 0.
    candles['&-s_max_class'] = np.where(candles['&-s_max'] > candles['tresh'], class_level, 0)
    candles['&-s_min_class'] = np.where(candles['&-s_min'] < candles['minus_tresh'], -class_level, 0)

    # describe max and min
    print(candles[['&-s_max', '&-s_min']].describe())
    # value counts for min max classes
    print(candles['&-s_max_class'].value_counts())
    print(candles['&-s_min_class'].value_counts())

    candles[['&-s_max_class', '&-s_min_class', '&-s_max', '&-s_min', 'tresh', 'minus_tresh']].plot().show()

    # plot distribution of max and min
    candles[['&-s_max', '&-s_min']].plot(kind='hist', bins=200).show()

In [38]:
# Labels looking 24 candles ahead.
plot_candles(candles, 24)

          &-s_max     &-s_min
count  952.000000  952.000000
mean     0.051448   -0.043403
std      0.057802    0.038727
min      0.000000   -0.200128
25%      0.013606   -0.059395
50%      0.031084   -0.035960
75%      0.066701   -0.013454
max      0.340095    0.000365
&-s_max_class
0.00    579
0.02    420
Name: count, dtype: int64
&-s_min_class
-0.02    522
 0.00    477
Name: count, dtype: int64


In [39]:
# Labels looking 20 candles ahead.
plot_candles(candles, 20)

          &-s_max     &-s_min
count  960.000000  960.000000
mean     0.045968   -0.039535
std      0.051945    0.036242
min      0.000000   -0.190677
25%      0.012134   -0.054319
50%      0.027373   -0.033372
75%      0.059259   -0.011770
max      0.261080    0.000365
&-s_max_class
0.00    615
0.02    384
Name: count, dtype: int64
&-s_min_class
 0.00    510
-0.02    489
Name: count, dtype: int64


In [40]:
# Labels looking 6 candles ahead.
plot_candles(candles, 6)

          &-s_max     &-s_min
count  988.000000  988.000000
mean     0.021908   -0.020661
std      0.023611    0.023522
min     -0.000254   -0.188838
25%      0.006862   -0.028968
50%      0.015303   -0.013909
75%      0.027167   -0.005616
max      0.164066    0.000365
&-s_max_class
0.00    802
0.02    197
Name: count, dtype: int64
&-s_min_class
 0.00    792
-0.02    207
Name: count, dtype: int64
