Imports e funzione per prendere i dati degli stock

In [None]:
# Import delle librerie necessarie
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Lista dei ticker che vogliamo scaricare
tickers = ["AAPL", "MSFT", "GOOGL", "TSLA", "AMZN"]

# Funzione per scaricare i dati di ciascun ticker e verificare la data di inizio
def get_stock_data(ticker, start_date='2010-01-01', end_date=None):
    if end_date is None:
        end_date = pd.to_datetime("today").strftime('%Y-%m-%d')

    print(f"\nScaricando dati per {ticker} dal {start_date} al {end_date}...")
    data = yf.download(ticker, start=start_date, end=end_date)

    if data.empty:
        print(f"⚠️ Nessun dato disponibile per {ticker} nel periodo richiesto.")
    else:
        first_date = data.index.min().strftime('%Y-%m-%d')
        print(f"📈 Dati disponibili dal {first_date}. Numero di record: {len(data)}")

    # Aggiungiamo il ticker come colonna
    data['Ticker'] = ticker

    data.columns = ['Open', 'High', 'Low', 'Close', 'Volume', 'Ticker']

    return data

Prendiamo i dati per Google nel 2015 e analizziamoli 

In [None]:
# Trova il primo mese disponibile
first_month = ('2015')

data = get_stock_data('GOOGL')

# Filtra il dataset per il primo mese
data_first_month = data.loc[first_month]
data_first_month.describe()

Plottiamo il volume

In [None]:
data_first_month['Volume'].plot()

Applichiamo la rolling window a tutti i campi e analizziamo le caratteristiche

In [None]:
# Riduzione del rumore tramite media mobile
for col in ["Open", "High", "Low", "Close", "Volume"]:
        data_first_month[col] = data_first_month[col].rolling(window=10, min_periods=1).mean()

data_first_month.describe()

Plottiamo il nuovo volume

In [None]:
data_first_month['Volume'].plot()

Prendiamo i dati per tutti gli stock e regoliamo il volume

In [None]:

dataset_list = []
for ticker in tickers:
    data = get_stock_data(ticker)
    data['Volume'] = data['Volume'].rolling(window=10, min_periods=1).mean()
    dataset_list.append(data)

for i in range(len(dataset_list)):
    dataset_list[i] = dataset_list[i].reset_index()  # Resetta l’indice
    dataset_list[i]['Date'] = pd.to_datetime(dataset_list[i]['Date'])  # Assicura che Date sia un datetime
    dataset_list[i] = dataset_list[i].set_index('Date')  # Imposta Date come indice

all_data = pd.concat(dataset_list, axis=0, join='outer')  # Combina tutti i dataset
all_data = all_data.sort_index()  # Ordina per data

all_data.dropna(inplace=True)

all_data

Controlliamo che non ci siano valori nulli

In [None]:
# Verifica se ci sono NaN nell'intero dataset
has_nan = all_data.isna().any().any()

if has_nan:
    print("Ci sono valori NaN nel dataset.")
else:
    print("Non ci sono valori NaN nel dataset.")


Normalizziamo

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Seleziona solo le colonne numeriche da normalizzare
columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']

# Applica la normalizzazione separatamente per ogni ticker
normalized_data_list = []
for ticker in all_data['Ticker'].unique():
    subset = all_data[all_data['Ticker'] == ticker].copy()
    scaler = MinMaxScaler()
    subset[columns_to_normalize] = scaler.fit_transform(subset[columns_to_normalize])
    normalized_data_list.append(subset)

# Unisci di nuovo i dati normalizzati
all_data_normalized = pd.concat(normalized_data_list)

# Mostra i primi valori normalizzati
all_data_normalized


Divisione in finestre

In [None]:
import numpy as np

# Parametri
window_size = 60  # Dimensione della finestra temporale
forecast_horizon = 7  # Giorni futuri da prevedere
feature_columns = ['Open', 'High', 'Low', 'Close', 'Volume']  # Colonne di interesse

# Funzione per creare finestre temporali con multi-step forecasting
def create_time_windows_per_ticker(data, window_size, forecast_horizon):
    X, y = [], []

    tickers = data['Ticker'].unique()  # Trova i ticker unici
    for ticker in tickers:
        ticker_data = data[data['Ticker'] == ticker].reset_index(drop=True)

        for i in range(window_size, len(ticker_data) - forecast_horizon):
            # Finestra temporale di input
            X.append(ticker_data[feature_columns].iloc[i-window_size:i].values)
            # Prevedi i prossimi 'forecast_horizon' giorni di prezzo di chiusura
            future_returns = ticker_data['Close'].iloc[i:i+forecast_horizon].values
            y.append(future_returns)

    X = np.array(X)
    y = np.array(y)
    return X, y

# Creazione delle finestre temporali
X, y = create_time_windows_per_ticker(all_data_normalized, window_size, forecast_horizon)

# Verifica delle dimensioni degli array risultanti
print(f"Dimensione di X: {X.shape}")  # (n_finestre, 60, n_features)
print(f"Dimensione di y: {y.shape}")  # (n_finestre, 30)

# Visualizzazione di un esempio di finestra temporale
print("Esempio di X:", X[0])
print("Esempio di y:", y[0])



Divisione in training e test set

In [None]:
from sklearn.model_selection import train_test_split

# Suddividere i dati in training e test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)


Istanziamo il modello

In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from keras.optimizers import Adam
# Crea il modello LSTM
model = Sequential()
model.add(LSTM(units=512, return_sequences=False, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))  # Dropout per evitare overfitting
model.add(Dense(forecast_horizon))  # Layer finale per la previsione del valore 'Close'

# Compilare il modello
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')

# Riassunto del modello
model.summary()


Addestramento del modello

In [None]:
# Allenare il modello
history = model.fit(X_train, y_train, epochs=20, batch_size=32, validation_data=(X_test, y_test))


Prevision

In [None]:
# Fare previsioni sui dati di test
predictions = model.predict(X_test)

# Visualizzare i risultati
plt.figure(figsize=(10,6))
plt.plot(y_test[3700], label='True Values')
plt.plot(predictions[3700], label='Predicted Values')
plt.legend()
plt.show()

print(f"Shape of y_test: {y_test.shape}")
print(f"Shape of predictions: {predictions.shape}")


Average comparison on whole dataset

In [None]:
# Calcola la media lungo tutte le finestre temporali
y_test_mean = np.mean(y_test, axis=0)
predictions_mean = np.mean(predictions, axis=0)

plt.figure(figsize=(12, 6))
plt.plot(y_test_mean, label='True Values (Mean)', color='blue')
plt.plot(predictions_mean, label='Predicted Values (Mean)', color='red', linestyle='dashed')
plt.xlabel('Days in the Future')
plt.ylabel('Close Price')
plt.title('Average Prediction vs True Values')
plt.legend()
plt.grid(True)
plt.show()

Efficiency analysis

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Calcola MSE, MAE e R²
mse = mean_squared_error(y_test.flatten(), predictions.flatten())
mae = mean_absolute_error(y_test.flatten(), predictions.flatten())
r2 = r2_score(y_test.flatten(), predictions.flatten())

# Mostra i risultati
print("=== Valori di Controllo Qualità ===")
print(f"Mean Squared Error (MSE): {mse:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")
print(f"R² Score: {r2:.4f}")

# Visualizzazione delle differenze tra i valori reali e le previsioni
plt.figure(figsize=(12, 6))
plt.plot(y_test.flatten(), label="True Values", alpha=0.6)
plt.plot(predictions.flatten(), label="Predictions", alpha=0.6)
plt.xlabel('Samples')
plt.ylabel('Close Price')
plt.title('True Values vs Predictions')
plt.legend()
plt.grid(True)
plt.show()


Function to use the model to predict a stock

In [None]:
def predictaa(name, a):
    newdata = get_stock_data(name)
    newdata['Volume'] = newdata['Volume'].rolling(window=10, min_periods=1).mean()

    columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']

    scaler = MinMaxScaler()
    newdata[columns_to_normalize] = scaler.fit_transform(newdata[columns_to_normalize])

    X, y = create_time_windows_per_ticker(newdata, window_size, 7)
    predictions = model.predict(X)

    mse = mean_squared_error(y.flatten(), predictions.flatten())
    mae = mean_absolute_error(y.flatten(), predictions.flatten())
    r2 = r2_score(y.flatten(), predictions.flatten())

    # Mostra i risultati
    print("=== Valori di Controllo Qualità ===")
    print(f"Mean Squared Error (MSE): {mse:.4f}")
    print(f"Mean Absolute Error (MAE): {mae:.4f}")
    print(f"R² Score: {r2:.4f}")

    # Calcoliamo l'errore percentuale
    percentage_change = np.abs((y.flatten() - predictions.flatten()) / y.flatten()) * 100

    # Visualizziamo qualche statistica utile
    mean_percentage_change = np.mean(percentage_change)
    max_percentage_change = np.max(percentage_change)
    min_percentage_change = np.min(percentage_change)

    print(f"Mean Percentage Change: {mean_percentage_change:.2f}%")
    print(f"Max Percentage Change: {max_percentage_change:.2f}%")
    print(f"Min Percentage Change: {min_percentage_change:.2f}%")


    plt.figure(figsize=(12, 6))
    plt.plot(y[a], label='Valori reali')
    plt.plot(predictions[a], label='Predizioni')
    plt.legend()
    plt.title('Confronto tra valori reali e predizioni per il nuovo stock')
    plt.show()

    plt.figure(figsize=(10, 6))
    plt.plot(percentage_change, label='Percentage Change (%)', color='orange')
    plt.axhline(y=mean_percentage_change, color='red', linestyle='--', label='Mean % Change')
    plt.xlabel('Prediction Index')
    plt.ylabel('Percentage Change (%)')
    plt.legend()
    plt.title('Percentage Change between True and Predicted Values')
    plt.show()



Netflix Prediction

In [None]:
predictaa('NFLX', 150)