In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Dropout
import seaborn as sns

# Print NumPy floats in fixed-point notation instead of scientific notation.
# This only affects how arrays are displayed; the stored values are unchanged.
np.set_printoptions(suppress=True)


In [None]:
# ---- Notebook parameters -------------------------------------------------

# Data
file_name = 'datasets/apple'   # CSV path without the '.csv' extension
test_size = 0.1                # fraction of rows (taken from the end) held out for testing

# Windowing
seri = 40                      # number of consecutive past values in each model input window

# Network
units = 64                     # units in every LSTM layer
dropout = 0.1                  # rate of the Dropout layer placed after each LSTM layer

# Training
epochs = 40                    # passes over the training windows
batch_size = 32                # samples per gradient update
validation_split = 0.1         # share of the training windows Keras holds out for validation


In [None]:
# Load the raw table from '<file_name>.csv'
df = pd.read_csv(f'{file_name}.csv')

# Independent copy that holds only the 'open' column
df_open = df.loc[:, ['open']].copy()

# Report how many values are missing in each column
print(df_open.isnull().sum())

# Add the previous row's open value, then the percentage change relative to it:
#   rate = (current - previous) / previous * 100
df_open = df_open.assign(prev_open=df_open['open'].shift(1))
df_open = df_open.assign(
    rate=lambda frame: ((frame['open'] - frame['prev_open']) / frame['prev_open']) * 100
)

# Keep 'open' and 'rate' only and skip the first row, which has no previous
# value and therefore no rate.
df_result = df_open[['open', 'rate']].iloc[1:]

print(df_result)

# Row holding the largest rate (shown together with its open value)
max_rate_row = df_result.loc[df_result['rate'].idxmax()]

print("Maximum rate:")
print(max_rate_row)


In [None]:
# Work on a copy so the frame produced by the previous cell stays untouched
df = df_result.copy()

# Chronological split: the last `test_size` fraction of the rows is the test set
n = int(len(df) * test_size)
if n == 0:
    # df.iloc[:-0] is empty and df.iloc[-0:] is the whole frame, so a zero-sized
    # test set would silently turn every row into test data.
    raise ValueError(
        f'test_size={test_size} leaves no test rows for {len(df)} samples'
    )

# Both frames are independent objects (not views of df), so adding columns
# below cannot raise SettingWithCopyWarning or write through to df.
train_df = df.iloc[:-n].reset_index(drop=True)
test_df = df.iloc[-n:].copy()

print(f'Train set: {len(train_df)}')
print(f'Test set: {len(test_df)}')

# The scaler is fitted on the training rates plus two sentinel rates so that
# the fitted range is at least [-rate_limit, rate_limit]. As long as no
# training rate lies outside that range, -25 maps to -1, 25 maps to 1 and a
# rate of 0 maps to 0. The sentinels are only shown to the scaler; they are
# never added to train_df.
rate_limit = 25
rate_bounds = pd.DataFrame({'rate': [-rate_limit, rate_limit]})

scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(pd.concat([rate_bounds, train_df[['rate']]], ignore_index=True))

# The test set is transformed with the statistics learned from training data only
train_df['scaled_rate'] = scaler.transform(train_df[['rate']]).ravel()
test_df['scaled_rate'] = scaler.transform(test_df[['rate']]).ravel()

# Result
print(train_df[['open', 'rate', 'scaled_rate']])

# Training row with the largest scaled rate
max_rate_row = train_df.loc[train_df['scaled_rate'].idxmax()]

print("Maximum rate:")
print(max_rate_row)


In [None]:
# Scaled rates of each split as column vectors (one row per sample, one column)
train_data = train_df['scaled_rate'].to_numpy().reshape(-1, 1)
test_data = test_df['scaled_rate'].to_numpy().reshape(-1, 1)
print(train_data)



In [None]:
# Sliding windows over the training series: every sample consists of `seri`
# consecutive scaled rates (positions pos-seri .. pos-1) and its target is the
# value that immediately follows them (position pos).
target_positions = range(seri, len(train_data))
X_train = np.array([train_data[pos - seri:pos, 0] for pos in target_positions])
y_train = np.array([train_data[pos, 0] for pos in target_positions])

# Add a trailing axis of length 1: (samples, seri) -> (samples, seri, 1),
# matching the input_shape=(seri, 1) the first LSTM layer is built with.
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
print(X_train, y_train)


In [None]:

# Three stacked LSTM layers, each followed by dropout, ending in a single-unit
# Dense layer that outputs one value per input window. The first two LSTM
# layers return the full sequence so the next LSTM layer receives one vector
# per time step; the last one returns only its final output.
model = Sequential([
    LSTM(units=units, return_sequences=True, input_shape=(seri, 1)),
    Dropout(dropout),
    LSTM(units=units, return_sequences=True),
    Dropout(dropout),
    LSTM(units=units, return_sequences=False),
    Dropout(dropout),
    Dense(units=1),
])

model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(
    X_train,
    y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=validation_split,
)


In [None]:
# One window of `seri` consecutive scaled test rates for every position that
# has a full history available (positions seri .. len(test_data) - 1).
X_test = np.array([test_data[end - seri:end, 0] for end in range(seri, len(test_data))])

# (samples, seri) -> (samples, seri, 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
model.summary()



In [None]:
# Model output for every test window (one scaled rate per window)
predicted_stock_price = model.predict(X_test)

# Flatten to one value per sample. Window k covers test rows k .. k+seri-1,
# so the value it predicts is test row k+seri.
predicted_values = np.asarray(predicted_stock_price).reshape(-1)
actual_values = test_data[seri:seri + len(predicted_values), 0]

# Direction is taken from the sign of the scaled value.
# NOTE(review): this equals the sign of the real rate only while the scaler
# maps a rate of 0 to a scaled value of 0 - confirm against the scaling cell.
predicted_up = predicted_values > 0
predicted_down = predicted_values < 0
actual_up = actual_values > 0
actual_down = actual_values < 0

true_positive = int(np.sum(predicted_up & actual_up))      # predicted rise, actual rise
true_negative = int(np.sum(predicted_down & actual_down))  # predicted fall, actual fall
false_positive = int(np.sum(predicted_up & actual_down))   # predicted rise, actual fall
false_negative = int(np.sum(predicted_down & actual_up))   # predicted fall, actual rise

# Number of test samples whose actual value is positive
total_positive = int(np.sum(actual_up))

# Samples where the prediction or the actual value is exactly 0 belong to none
# of the four groups and are left out of the accuracy.
total_correct = true_positive + true_negative
total_wrong = false_positive + false_negative
evaluated = total_correct + total_wrong

# Guard against an empty evaluation set instead of raising ZeroDivisionError
accuracy = total_correct / evaluated * 100 if evaluated else float('nan')

print(f'Directional accuracy: {accuracy:.2f}% ({total_correct}/{evaluated})')




In [None]:
# Confusion matrix of the predicted direction.
# Rows are the actual class, columns the predicted class:
#   row 0 = actual negative -> [true negative, false positive]
#   row 1 = actual positive -> [false negative, true positive]
cm = np.array([[true_negative, false_positive],
               [false_negative, true_positive]])

# Row labels name the actual class of the row; calling them "True Negative" /
# "True Positive" would mislabel the false-positive and false-negative cells.
labels = ["Actual Negative", "Actual Positive"]
columns = ["Prediction Negative", "Prediction Positive"]

plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=columns, yticklabels=labels)
plt.title("Confusion Matrix")
plt.ylabel("real")
plt.xlabel("predicted")
plt.tight_layout()
plt.show()

In [None]:
# Simulation: long-only, all-in / all-out trading driven by the sign of the
# predicted rate of change.
initial_money = 1000
money = initial_money
stock_quantity = 0
has_stock = False

open_prices = test_df['open'].values
scaled_rates = test_df['scaled_rate'].values

# Each step targets one test row t. The model sees the `seri` scaled rates
# before it (rows t-seri .. t-1), exactly like the training windows, and
# predicts the rate of row t, i.e. the move from open[t-1] to open[t].
steps = range(seri, len(test_df))

# Predict every step in a single batch instead of one model call per step
if len(steps) > 0:
    windows = np.array([scaled_rates[t - seri:t] for t in steps]).reshape(-1, seri, 1)
    predicted_scaled = model.predict(windows, verbose=0)
    # The model outputs scaled rates; map them back to percent before use
    predicted_rates = scaler.inverse_transform(predicted_scaled.reshape(-1, 1))[:, 0]
else:
    predicted_rates = np.empty(0)

# Buy-and-hold benchmark over the simulated period only: from the first price
# a decision is made at to the last test price.
start_index = min(seri - 1, len(open_prices) - 1)
first_value = open_prices[start_index]
last_value = open_prices[-1]
stock_increase = ((last_value - first_value) / first_value) * 100

actual_prices = []
predicted_prices = []
time = []

for step, (t, predicted_rate) in enumerate(zip(steps, predicted_rates)):
    current_price = open_prices[t - 1]  # latest price known when deciding
    actual_price = open_prices[t]       # price the prediction is about

    # 'rate' is expressed in percent, hence the division by 100
    predicted_price = current_price * (1 + predicted_rate / 100)

    actual_prices.append(actual_price)
    predicted_prices.append(predicted_price)
    time.append(t)

    # Buy-sell simulation: trades execute at the price known at decision time,
    # so a correct prediction actually captures the predicted move.
    if predicted_rate > 0 and not has_stock:
        stock_quantity = money / current_price
        money = 0
        has_stock = True
    elif predicted_rate < 0 and has_stock:
        money = stock_quantity * current_price
        stock_quantity = 0
        has_stock = False

    print(step)
    print("Available money:", money)
    print("Stock quantity:", stock_quantity)
    print("Stock price:", current_price)
    print("Value:", stock_quantity * current_price)
    print("******************\n")

# Sell the last remaining stock at the final test price
if has_stock:
    final_price = open_prices[-1]
    money = stock_quantity * final_price

# Results
model_increase = ((money - initial_money) / initial_money) * 100

print(f"\nInitial: {initial_money:.2f}")
print(f"Final amount: {money:.2f}")
print(f"Profit: {money - initial_money:.2f}")
print(f"Stock increase: {stock_increase:.2f}%")
print(f"Model gain: {model_increase:.2f}%")

# Plot (scatter only)
plt.figure(figsize=(12, 6))
plt.scatter(time, actual_prices, label="Actual Price", color='blue', s=10)
plt.scatter(time, predicted_prices, label="Predicted Price", color='orange', s=10)
plt.title("Actual vs Predicted Price (Scatter Plot)")
plt.xlabel("Time (Day)")
plt.ylabel("Price")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
