###### imports

In [24]:
import os
import pandas as pd
import numpy as np

from pathlib import Path

import scipy

from sklearn.model_selection import train_test_split

import tensorflow as tf
from keras.callbacks import EarlyStopping
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Masking, Conv1D, Flatten, MaxPooling1D
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import regularizers

import synthetic_data

## Check Path

In [25]:
DATA_PATH_RISING_WEDGE = Path("data/patterns/rising_wedge")
DATA_PATH_RISING_WEDGE.exists()

data = {filepath.stem: pd.read_csv(filepath) for filepath in DATA_PATH_RISING_WEDGE.glob("*.csv")}

## Create Tensor and Label lists from real data

In [26]:
X_real = []
y_real = []

for key, df in data.items():
    X_real.append(df[['Open', 'High', 'Low', 'Close']].values)
    y_real.append(df.loc[0, ['Start Date', 'End Date', 'Pattern']].values)

## Create Synthetic Data with pattern

In [27]:
amount = int(len(X_real) * 0.40)
amount_25 = int(len(y_real) * 0.25)

X_synthetic, y_synthetic = synthetic_data.gen_x_y(l=amount, pattern="rising_wedge", noise=True, general=False)

## Take Real data with no pattern

In [30]:
DATA_PATH_DOWNTREND = Path("data/patterns/downtrend")
DATA_PATH_DOWNTREND.exists()

downtrend_data = {filepath.stem: pd.read_csv(filepath) for filepath in DATA_PATH_DOWNTREND.glob("*.csv")}

X_no_pattern = []
y_no_pattern = []

for key, df in downtrend_data.items():
    X_no_pattern.append(df[['Open', 'High', 'Low', 'Close']].values)
    y_no_pattern.append(df.loc[0, ['Start Date', 'End Date', 'Pattern']].values)

X_no_pattern = X_no_pattern[:amount_25]
y_no_pattern = y_no_pattern[:amount_25]

In [31]:
total = find_max_length(X_real, X_synthetic, X_no_pattern)
total

NameError: name 'find_max_length' is not defined

## Join lists

In [32]:
X_all = X_real + X_synthetic + X_no_pattern
y_all = y_real + y_synthetic + y_no_pattern

## Padding

In [33]:
def find_longest_array(array):
    largest = 0
    for a in array:
        if len(a) > largest:
            largest = len(a)
    return largest

test = find_longest_array(X_all)

In [34]:
X_pad = pad_sequences(X_all, dtype='float32', padding='post', value=-100)

In [36]:
type(X_pad)

numpy.ndarray

### Test padding

In [None]:
X_pad.shape

## Train Test Split

In [46]:
y_all = np.array(y_all)

X_train, X_test, y_train, y_test = train_test_split(X_pad, y_all, test_size=0.30)

display(X_train.shape)
display(X_test.shape)
display(y_train.shape)
display(y_test.shape)

(797, 502, 4)

(342, 502, 4)

(797, 3)

(342, 3)

## Modelling

In [56]:
input_shape = X_train.shape[1:]

def initialize_model_CNN():
    model = Sequential()

    model.add(Masking(mask_value=-1, input_shape=input_shape))
    model.add(Conv1D(32, activation='relu', kernel_size=3, kernel_regularizer=regularizers.L1L2(l1=1e-3, l2=1e-3))),
    model.add(MaxPooling1D(pool_size=2))
    model.add(Conv1D(32, activation='relu', kernel_size=3))
    
    model.add(Flatten())
    model.add(Dense(units=200, activation='relu'))
    model.add(Dense(units=100, activation='relu'))
    model.add(Dense(units=16, activation='relu'))
    model.add(Dense(units=16, activation='relu'))
    model.add(Dense(units=3, activation='linear'))

    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    
    return model

In [63]:
model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 masking_5 (Masking)         (None, 502, 4)            0         
                                                                 
 conv1d_10 (Conv1D)          (None, 500, 32)           416       
                                                                 
 max_pooling1d_5 (MaxPoolin  (None, 250, 32)           0         
 g1D)                                                            
                                                                 
 conv1d_11 (Conv1D)          (None, 248, 32)           3104      
                                                                 
 flatten_5 (Flatten)         (None, 7936)              0         
                                                                 
 dense_15 (Dense)            (None, 200)               1587400   
                                                      

In [57]:
display(type(X_train[0]))
display(type(y_train[0]))

tensorflow.python.framework.ops.EagerTensor

tensorflow.python.framework.ops.EagerTensor

In [58]:
y_train.dtype

tf.int16

In [59]:
X_train = tf.convert_to_tensor(X_train, np.float32)
y_train = tf.convert_to_tensor(y_train, np.int16)

display(type(X_train))
display(type(y_train))

tensorflow.python.framework.ops.EagerTensor

tensorflow.python.framework.ops.EagerTensor

In [60]:
es = EarlyStopping(patience = 5, restore_best_weights=True)

model = initialize_model_CNN()

model.fit(
    X_train,
    y_train,
    validation_split = 0.2,
    shuffle = True,
    batch_size=32,
    epochs = 50,
    callbacks = [es],
    verbose = 1
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50


<keras.src.callbacks.History at 0x7f3a384a67a0>

In [62]:
X_test = tf.convert_to_tensor(X_test, np.float32)
y_test = tf.convert_to_tensor(y_test, np.int16)
res = model.evaluate(X_test, y_test)

