In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, BatchNormalization, ReLU, Add, Dense, GlobalAveragePooling1D, Input
from tensorflow.keras.models import Model
import yfinance as yf

def fetch_data_from_yfinance(ticker, period):
    data = yf.download(ticker, period=period, interval='1d')
    if data.empty:
        raise ValueError("Nie udało się pobrać danych z yfinance.")
    data = data[['Close']].rename(columns={'Close': 'price'})
    return data

ticker = "BTC-USD"
period = "5y"
data = fetch_data_from_yfinance(ticker, period)

data['price'].fillna(method='ffill', inplace=True)

prices = data['price']
scaler = MinMaxScaler(feature_range=(0, 1))
prices_scaled = scaler.fit_transform(prices.values.reshape(-1, 1))

def build_dataset(dataset, time_step):
    X, y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        y.append(dataset[i + time_step, 0])
    return np.array(X).reshape(-1, time_step, 1), np.array(y)

time_step = 90
train_size = int(len(prices_scaled) * 0.7)
train_data, test_data = prices_scaled[:train_size], prices_scaled[train_size:]
X_train, y_train = build_dataset(train_data, time_step)
X_test, y_test = build_dataset(test_data, time_step)

def resnet_block(input_layer, filters, kernel_size=3, stride=1):
    x = Conv1D(filters, kernel_size, strides=stride, padding="same")(input_layer)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = Conv1D(filters, kernel_size, strides=1, padding="same")(x)
    x = BatchNormalization()(x)
    shortcut = Conv1D(filters, kernel_size=1, strides=stride, padding="same")(input_layer)
    x = Add()([x, shortcut])
    x = ReLU()(x)
    return x

def build_resnet_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Conv1D(64, kernel_size=3, padding="same", activation="relu")(inputs)
    x = BatchNormalization()(x)
    for _ in range(3):  # 3 bloki ResNet
        x = resnet_block(x, filters=64)
    x = GlobalAveragePooling1D()(x)
    outputs = Dense(1, activation="linear")(x)
    model = Model(inputs, outputs)
    return model

input_shape = (time_step, 1)
model = build_resnet_model(input_shape)
model.compile(optimizer='adam', loss='mean_squared_error')

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

start_time = time.time()
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=64,
    verbose=1,
    callbacks=[early_stopping]
)
training_time = time.time() - start_time

plt.figure(figsize=(10, 5))
plt.plot(history.history['loss'], label='Training Loss', color='blue')
plt.plot(history.history['val_loss'], label='Validation Loss', color='orange')
plt.title('Loss Function')
plt.xlabel('Epoch')
plt.ylabel('Mean Squared Error Loss')
plt.legend()
plt.grid()
plt.show()

start_time = time.time()
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
inference_time = time.time() - start_time
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)


print('\n Metryki czasowe:')
print(f'Czas treningu: {training_time:.2f} sekund')
print(f'Czas inferencji: {inference_time:.2f} sekund')


plt.figure(figsize=(12, 6))
plt.plot(prices.index, prices.values, label='Actual Data', color='blue')
plt.plot(prices.index[-len(test_predict):], test_predict.flatten(), label='Test Predictions', color='orange', linestyle='dashed')
plt.title('Historical Data and Test Predictions')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid()
plt.show()

plt.figure(figsize=(12, 6))
plt.plot(prices.index[:len(train_predict)], train_predict, label='Train Predictions', color='green', linestyle='dashed')
plt.plot(prices.index[-len(test_predict):], test_predict, label='Test Predictions', color='orange', linestyle='dashed')
plt.plot(prices.index, prices.values, label='Historical Data', color='blue')
plt.title('Model Performance')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid()
plt.show()

y_train_actual = scaler.inverse_transform(y_train.reshape(-1, 1))
y_test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))

mae_train = mean_absolute_error(y_train_actual, train_predict)
mae_test = mean_absolute_error(y_test_actual, test_predict)

mse_train = mean_squared_error(y_train_actual, train_predict)
mse_test = mean_squared_error(y_test_actual, test_predict)

rmse_train = np.sqrt(mse_train)
rmse_test = np.sqrt(mse_test)

r2_train = r2_score(y_train_actual, train_predict)
r2_test = r2_score(y_test_actual, test_predict)

print("Metryki skuteczności modelu CNN-ResNet:")
print(f"MAE (Train): {mae_train:.4f}, MAE (Test): {mae_test:.4f}")
print(f"MSE (Train): {mse_train:.4f}, MSE (Test): {mse_test:.4f}")
print(f"RMSE (Train): {rmse_train:.4f}, RMSE (Test): {rmse_test:.4f}")
print(f"R² (Train): {r2_train:.4f}, R² (Test): {r2_test:.4f}")

num_prediction_days = 7
last_sequence = X_test[-1]
future_predictions = []

for _ in range(num_prediction_days):
    last_sequence_new = last_sequence.reshape((1, time_step, 1))
    next_prediction = model.predict(last_sequence_new, verbose=0)
    future_predictions.append(next_prediction[0, 0])
    last_sequence = np.roll(last_sequence, -1)
    last_sequence[-1] = next_prediction

future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))
future_dates = pd.date_range(start=prices.index[-1] + pd.Timedelta(days=1), periods=num_prediction_days, freq='D')

plt.figure(figsize=(12, 6))
plt.plot(prices.index, prices.values, label='Actual Data', color='blue')
plt.axvline(x=prices.index[-1], color='black', linestyle='--', label='Prediction Start')
plt.plot(future_dates, future_predictions.flatten(), label='Future Predictions', color='red', linestyle='dotted')
plt.title('Historical Data and Future Predictions (CNN-ResNet)')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid()
plt.show()
