In [None]:
import pandas as pd
from keras.models import Sequential # type: ignore
from keras.layers import LSTM, Dense, Dropout # type: ignore
from keras.callbacks import EarlyStopping # type: ignore
from keras.losses import Huber # type: ignore
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Step 1: Load the dataset
# Read the CSV (the path is relative to the current working directory) into
# the DataFrame `data`, which the following cells inspect and then feed into
# the windowing / training pipeline.
data = pd.read_csv('crypto_yearly_data.csv')

In [None]:
# Show the column labels of the loaded frame.
data.columns

In [None]:
# Preview the first rows of the loaded frame.
data.head()

In [None]:
# Summary statistics for the columns of the loaded frame.
data.describe()

In [None]:
# Print dtypes, non-null counts and memory usage of the frame.
data.info()

In [None]:
# Count missing values per column.
data.isna().sum()

In [None]:
# Discard the 'year' column when the file provides one; otherwise keep the
# frame exactly as it is. The remaining column labels are then displayed.
data = data.drop(columns=['year']) if 'year' in data.columns else data
data.columns

In [None]:
# Data Preparation
def prepare_data(data, window_size=100):
    """
    Build sliding-window samples of the scaled 'close' series for an LSTM.

    The 'close' column is min-max scaled to the range [0, 1]. Every run of
    `window_size` consecutive scaled values becomes one feature row, and the
    scaled value immediately following that run becomes its label.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing a 'close' column. As a side effect, a
        'close_scaled' column is added to (or overwritten in) this frame.
    window_size : int, default 100
        Number of consecutive observations in each feature window.

    Returns
    -------
    features : numpy.ndarray
        Array of shape (len(data) - window_size, window_size).
    labels : numpy.ndarray
        Array of shape (len(data) - window_size,).
    scaler : MinMaxScaler
        The fitted scaler, so callers can map values back to the original
        scale with `inverse_transform`.

    Raises
    ------
    KeyError
        If `data` has no 'close' column.
    ValueError
        If `window_size` is less than 1, or if `data` has too few rows to
        form at least one (window, label) pair. Both checks run before the
        frame is modified.
    """
    close = data[['close']]  # fails fast with KeyError when the column is absent

    if window_size < 1:
        raise ValueError(
            f"window_size must be a positive integer, got {window_size}."
        )

    n_samples = len(close) - window_size
    if n_samples < 1:
        raise ValueError(
            f"Need more than window_size={window_size} rows to build at least "
            f"one sample, but data has only {len(close)} row(s)."
        )

    # NOTE(review): the scaler is fitted on every row, while train_lstm_model
    # later holds out the tail of the windows as a test set. The min/max used
    # for scaling therefore include the held-out period.
    scaler = MinMaxScaler(feature_range=(0, 1))
    data['close_scaled'] = scaler.fit_transform(close)

    # Pull the scaled series out once instead of on every loop iteration.
    scaled = data['close_scaled'].to_numpy()

    # Row i of `window_index` holds the positions i, i+1, ..., i+window_size-1,
    # so indexing with it yields all windows in a single operation.
    window_index = np.arange(n_samples)[:, None] + np.arange(window_size)[None, :]
    features = scaled[window_index]

    # The label of window i is the value right after it, at i + window_size.
    # Copy so the returned array does not alias the DataFrame's column.
    labels = scaled[window_size:].copy()

    return features, labels, scaler

In [None]:
def train_lstm_model(features, labels, epochs=20, batch_size=32,
                     train_fraction=0.8, validation_split=0.1):
    """
    Define, train, and return an LSTM model.

    The samples are split in order: the first `train_fraction` of them is
    used for fitting and the remainder is returned untouched as the test
    set. The network is LSTM(128) -> Dropout(0.2) -> LSTM(64) ->
    Dropout(0.2) -> Dense(50, relu) -> Dense(1), compiled with the Adam
    optimizer and Huber loss (delta=1.0). Training stops early when
    `val_loss` has not improved for 5 epochs, restoring the best weights.

    Parameters
    ----------
    features : array-like
        Samples of shape (n_samples, window_size); an array already shaped
        (n_samples, window_size, 1) is accepted as well.
    labels : array-like
        One target per sample, same length as `features`.
    epochs : int, default 20
        Maximum number of training epochs.
    batch_size : int, default 32
        Mini-batch size passed to `model.fit`.
    train_fraction : float, default 0.8
        Share of the samples (taken from the start) used for training.
        Must lie strictly between 0 and 1.
    validation_split : float, default 0.1
        Passed to `model.fit` as the share of the training portion held
        out for validation.

    Returns
    -------
    model
        The fitted Keras model.
    X_test : numpy.ndarray
        Held-out samples, shape (n_test, window_size, 1).
    y_test : numpy.ndarray
        Held-out labels, shape (n_test,).

    Raises
    ------
    ValueError
        If `features` has fewer than 2 dimensions, if `features` and
        `labels` differ in length, if `train_fraction` is outside (0, 1),
        or if the split would leave the training or the test set empty.
    """
    features = np.asarray(features)
    labels = np.asarray(labels)

    # Validate before building the network so bad input fails fast and with
    # a message that names the actual problem.
    if features.ndim < 2:
        raise ValueError(
            "features must have shape (n_samples, window_size); "
            f"got an array with shape {features.shape}."
        )
    if len(features) != len(labels):
        raise ValueError(
            f"features and labels must have the same length, got "
            f"{len(features)} and {len(labels)}."
        )
    if not 0 < train_fraction < 1:
        raise ValueError(
            f"train_fraction must be strictly between 0 and 1, got {train_fraction}."
        )

    train_size = int(len(features) * train_fraction)
    if train_size == 0 or train_size == len(features):
        raise ValueError(
            f"Cannot split {len(features)} sample(s) with "
            f"train_fraction={train_fraction}: the training or test set "
            "would be empty."
        )

    # LSTM layers expect (samples, timesteps, channels); add the channel axis.
    features = features.reshape((features.shape[0], features.shape[1], 1))
    X_train, X_test = features[:train_size], features[train_size:]
    y_train, y_test = labels[:train_size], labels[train_size:]

    # Define LSTM model
    model = Sequential([
        LSTM(128, return_sequences=True, input_shape=(X_train.shape[1], 1)),
        Dropout(0.2),
        LSTM(64, return_sequences=False),
        Dropout(0.2),
        Dense(50, activation='relu'),
        Dense(1)
    ])

    # Use Huber Loss
    huber_loss = Huber(delta=1.0)
    model.compile(optimizer='adam', loss=huber_loss)

    # Early stopping to avoid overfitting
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    print("Training the LSTM model...")
    model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
              validation_split=validation_split, verbose=1, callbacks=[early_stopping])

    return model, X_test, y_test


In [None]:
# Model Evaluation and Visualization
def evaluate_and_visualize(model, X_test, y_test, scaler):
    """
    Generate predictions, evaluate metrics, and visualize results.

    Predictions from `model.predict(X_test)` and the true labels are both
    mapped back through `scaler.inverse_transform` before MAE, MSE, RMSE
    and R² are computed, so the metrics are expressed in the scaler's
    original units. The metrics are printed, the true and predicted series
    are plotted together, and the metrics are returned.

    Parameters
    ----------
    model
        Object with a `predict` method returning one value per test sample.
    X_test : array-like
        Test samples handed to `model.predict`.
    y_test : array-like
        True (scaled) labels, one per test sample.
    scaler
        Fitted scaler exposing `inverse_transform`.

    Returns
    -------
    dict
        Keys 'mae', 'mse', 'rmse' and 'r2' mapping to the computed metrics.

    Raises
    ------
    ValueError
        If the test set is empty or `X_test` and `y_test` differ in length.
    """
    y_test = np.asarray(y_test)

    # Fail with a clear message instead of an opaque error from the model or
    # the metric functions.
    if len(X_test) == 0:
        raise ValueError("X_test is empty; there is nothing to evaluate.")
    if len(X_test) != len(y_test):
        raise ValueError(
            f"X_test and y_test must have the same length, got "
            f"{len(X_test)} and {len(y_test)}."
        )

    predictions = model.predict(X_test)
    predictions = scaler.inverse_transform(predictions)
    # inverse_transform expects a 2-D column, hence the reshape.
    y_test = scaler.inverse_transform(y_test.reshape(-1, 1))

    mse = mean_squared_error(y_test, predictions)
    mae = mean_absolute_error(y_test, predictions)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, predictions)

    print(f"Mean Absolute Error (MAE): {mae}")
    print(f"Mean Squared Error (MSE): {mse}")
    print(f"Root Mean Squared Error (RMSE): {rmse}")
    print(f"R² Score: {r2}")

    # Visualization
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(y_test, label='True Values')
    ax.plot(predictions, label='Predictions')
    ax.set_xlabel("Test sample index")
    ax.set_ylabel("Price (original scale)")
    ax.set_title("True vs Predicted Prices")
    ax.legend()
    plt.show()

    # Return the metrics so callers can log or compare them programmatically.
    return {"mae": mae, "mse": mse, "rmse": rmse, "r2": r2}


In [None]:
# Save the Model
def save_model(model, file_path="crypto_model.keras"):
    """
    Write `model` to `file_path` via its own `save` method, then print a
    one-line confirmation naming the destination.
    """
    model.save(file_path)
    message = f"Model saved to {file_path}."
    print(message)

In [None]:
# Run the pipeline end to end, bailing out early when there is nothing to
# train on.
if data.empty:
    print("Data not found or empty. Exiting...")
else:
    # Window the series; `scaler` is kept to map predictions back later.
    features, labels, scaler = prepare_data(data, window_size=100)

    # Fit the network and receive the held-out split alongside it.
    model, X_test, y_test = train_lstm_model(
        features,
        labels,
        epochs=20,
        batch_size=32,
    )

    # Persist the trained model before evaluating it.
    save_model(model, file_path="crypto_model.keras")

    # Report metrics and plot predictions against the true values.
    evaluate_and_visualize(model, X_test, y_test, scaler)