In [1]:
from import_libraries.project_functions import *

warnings.filterwarnings("ignore")

# data from yfinance

In [2]:
def get_historical_data(timeframe=mt5.TIMEFRAME_D1, symbol="GER30", count = 30_000):
    """
    Получает исторические данные из MetaTrader 5.

    Аргументы:
    timeframe (int): Временной интервал для исторических данных.
    symbol (str): Символ, для которого необходимо получить исторические данные.
    start (str): Начальная дата для получения исторических данных.

    Возвращает:
    pd.DataFrame: DataFrame, содержащий полученные исторические данные.
    """
    if not connect_to_mt5():
        return None

    
    rates = mt5.copy_rates_from_pos(symbol, timeframe, 0, count)

    # Проверяем успешность получения данных
    if rates is not None:
        # Конвертируем данные в pandas DataFrame
        rates_frame = pd.DataFrame(rates)
        rates_frame['time'] = pd.to_datetime(rates_frame['time'], unit='s')
        rates_frame = rates_frame.rename(columns=lambda x: x.capitalize())
        rates_frame = rates_frame.rename(columns={"Time": "Date"})
        
        rates_frame['Volume'] = rates_frame['High'] - rates_frame['Low']
        rates_frame['MaxPositivePriceChange'] = rates_frame['High'] - rates_frame['Open'] 
        rates_frame['MaxNegativePriceChange'] = rates_frame['Open'] - rates_frame['Low']
        
        rates_frame['PriceChange'] = abs(rates_frame['Close'] - rates_frame['Close'].shift(1))
        
        # Закрываем соединение с MetaTrader 5
        mt5.shutdown()
        return rates_frame
    else:
        print("Ошибка при получении данных.")
        mt5.shutdown()
        return None



In [3]:
data = get_historical_data(count = 70_00) # D1
# data_m5 = get_historical_data(timeframe=mt5.TIMEFRAME_M5, symbol="GER30", start='2022-01-01')
data.shape

Połączono z kontem


(3656, 12)

In [33]:
data

Unnamed: 0,Date,Open,High,Low,Close,Tick_volume,Spread,Real_volume,Volume,MaxPositivePriceChange,MaxNegativePriceChange,PriceChange
0,2023-11-03,15276.9,15336.9,15217.9,15265.9,22804,13,0,119.0,60.0,59.0,


In [5]:
# data.hist(figsize = (10,10));

# data preprocessing

In [5]:
# preprocessor = Preprocessing_stock_data(data)
# all_indicator = preprocessor.all_methods()

# data = preprocessor.support_resistance_line()
# data

In [6]:
# all_indicator.sample(3)

# experiments 



Etap I. Określanie kierunku.
1. Wyznaczam cenę max i min dla różnych przedziałów od 1 do 60 dni wstecz.
2. Określam czy cena w bieżącym dniu przekroczyła wyznaczony poziom min lub max.
3. Określam czy cena w bieżącym dniu powróciła do określonego przedziału między min a max.
4. Wyznaczam bieżący kieru5.ek:
- po przekroczeniu i powrocie do poziomu min ("wsparcie") kierun  to6.BUY;
- po przekroczeniu i powrocie do poziomu max ("opór") kieru ek to7SELL;
5. Po zakończeniu sesji powtarzam punk
8. Sprawdzam w którym kierunku następnego dnia był większy ruch. W górę to cena max - otwarcie. W dół to cena otwarcie  - min.ty 1-4 d

In [7]:
def breakdown_analysis(data):

    data['Buy'] = ( # Сопротивление линия с верху 
        (data["RollingMax"] < data["High"])  & # maslo masliane
        (data["RollingMax"] < data["Close"])
    ).map({True: 1, False: 0})
    

    data['SELL'] = (
        (data["RollingMin"] > data["Low"])  & # maslo masliane
        (data["RollingMin"] > data["Close"])
    ).map({True: 2, False: 0})
    

    data["Signal"] = data['Buy'] + data['SELL']
    data.loc[data["Signal"] == 1, "Signal"] = "buy"
    data.loc[data["Signal"] == 2, "Signal"] = "sell"

    data["Signal"] = data["Signal"].replace(0, np.nan).ffill()
    data = data.drop(["SELL","Buy"], axis=1)

    data.dropna(axis=0, inplace=True)
    
    data.reset_index(drop=True, inplace=True)

    return data

In [8]:
rebound_best_parameters = optimize_parameters(data, rebound_analysis, 
                                              windows_size= [i for i in range(1, 365, 1)],)

print(f"Najlepsze opcje: {rebound_best_parameters}")

  0%|          | 0/364 [00:00<?, ?it/s]

Najlepsze opcje: {'window_size': 4, 'bias': 1}


In [9]:
current_data = define_level(data, 4, 1)
rebound_data = rebound_analysis(current_data)
current_score = calculate_score(rebound_data)
current_score

50.0

In [10]:
breakdown_best_parameters = optimize_parameters(data, breakdown_analysis, 
                                                windows_size= [i for i in range(1, 365, 1)],)

print(f"Najlepsze opcje: {breakdown_best_parameters}")


  0%|          | 0/364 [00:00<?, ?it/s]

Najlepsze opcje: {'window_size': 4, 'bias': 1}


In [11]:
current_data = define_level(data, 4, 5)
rebound_data = breakdown_analysis(current_data)
current_score = calculate_score(rebound_data)
current_score

45.652173913043484

In [12]:
# Tworzenie kopii danych, aby nie modyfikować oryginalnych danych
df = rebound_data.copy()

# Obliczanie Breakout_from_Resistance - oznacza przełamanie od oporu w górę
# Przy użyciu warunków, sprawdzamy, czy RollingMax jest mniejszy od High
# i jednocześnie większy od Close, co wskazuje na przełamanie od oporu w górę.
# Następnie konwertujemy wynik na wartość całkowitą (0 lub 1).
df['Breakout_from_Resistance'] = (
    (df["RollingMax"] < df["High"]) & 
    (df["RollingMax"] > df["Close"])
).astype(int)

# Obliczanie Breakout_from_Support - oznacza przełamanie od wsparcia w górę
# Podobnie jak wyżej, sprawdzamy warunki, aby znaleźć przełamanie od wsparcia w górę.
df['Breakout_from_Support'] = (
    (df["RollingMin"] > df["Low"]) & 
    (df["RollingMin"] < df["Close"])
).astype(int)

# Obliczanie Breakdown_from_Resistance - oznacza przełamanie od oporu w dół
# Tutaj sprawdzamy, czy RollingMax jest mniejszy od High i jednocześnie 
# mniejszy od Close, co wskazuje na przełamanie od oporu w dół.
df['Breakdown_from_Resistance'] = (
    (df["RollingMax"] < df["High"]) & 
    (df["RollingMax"] < df["Close"])
).astype(int)

# Obliczanie Breakdown_from_Support - oznacza przełamanie od wsparcia w dół
# Podobnie jak wyżej, sprawdzamy warunki, aby znaleźć przełamanie od wsparcia w dół.
df['Breakdown_from_Support'] = (
    (df["RollingMin"] > df["Low"]) & 
    (df["RollingMin"] > df["Close"])
).astype(int)

# Wyświetlanie sumy wartości
print("Breakout_from_Resistance-> ", df['Breakout_from_Resistance'].sum())
print("Breakout_from_Support-> ", df['Breakout_from_Support'].sum())
print("Breakdown_from_Resistance-> ", df['Breakdown_from_Resistance'].sum())
print("Breakdown_from_Support-> ", df['Breakdown_from_Support'].sum())

Breakout_from_Resistance->  72
Breakout_from_Support->  56
Breakdown_from_Resistance->  229
Breakdown_from_Support->  134


In [13]:
df[["Breakout_from_Resistance","Breakout_from_Support","Breakdown_from_Resistance","Breakdown_from_Support"]].describe()  

Unnamed: 0,Breakout_from_Resistance,Breakout_from_Support,Breakdown_from_Resistance,Breakdown_from_Support
count,636.0,636.0,636.0,636.0
mean,0.113208,0.08805,0.360063,0.210692
std,0.317096,0.283591,0.480396,0.408121
min,0.0,0.0,0.0,0.0
25%,0.0,0.0,0.0,0.0
50%,0.0,0.0,0.0,0.0
75%,0.0,0.0,1.0,0.0
max,1.0,1.0,1.0,1.0


# AI

In [14]:
def create_dataset(dataset, time_step=25):
    dataX, dataY = [], []
    dataset_without_close = dataset.drop('Close', axis=1)

    for i in range(len(dataset) - time_step - 1):
        if (i + time_step) < len(dataset):
            a = dataset_without_close.iloc[i:(i + time_step), :]
            dataX.append(a.values)
            dataY.append(dataset.iloc[i + time_step]['Close'])  # Добавлено изменение в индексации здесь
    return np.array(dataX), np.array(dataY)

In [15]:
def create_dataset(dataset, time_step):
    dataX, dataY = [], []
    for i in range(len(dataset)-time_step-1):
        a = dataset[i:(i+time_step), 0]
        dataX.append(a)
        dataY.append(dataset[i + time_step, 0])
    return np.array(dataX), np.array(dataY)