<a href="https://colab.research.google.com/github/Leokik/Thesis_/blob/main/Portfolio_Optimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import datetime
import pandas as pd
import yfinance as yf
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from keras.callbacks import EarlyStopping
import random

In [7]:
tipo_di_ritorno = 1
period_T = [30]
period_P = [20]
gamma_list = [0.1, 0.5, 1, 1.5, 2, 2.5, 3]

In [8]:
# Modello di ottimizzazione dei pesi, quindi sono pesi ottimi che ho considerando il primo periodo di Amp_Previsione
def optimize_weights(n_titoli, gamma, meanLogRet, Sigma):  #qui prendo la media non una amtrice di rendimenti , ma un vettore di medie
    def objective_function(w):
        R = np.dot(meanLogRet,w)

        V = np.sqrt(np.dot(w.T, np.dot(Sigma, w)))

        return -1 * (R / V)

    def check_sum_to_one(w):
        return np.sum(w) - 1

    constraints = ({'type': 'eq', 'fun': check_sum_to_one})
    initial_weights = np.ones(n_titoli) / n_titoli
    bounds = [(0, 1) for _ in range(n_titoli)]

    optimal_weights = minimize(objective_function, initial_weights, method='SLSQP', bounds=bounds, constraints=constraints)
    return optimal_weights.x


In [9]:
#Queste mi servono per lo SR

# Calcolo del rendimento atteso
def calculate_expected_return(P, mu, x):
    return 1 / P * (np.sum(np.dot(mu, x))) #mu è una matrice di rendimaneti quindi la media la fa direttamente qui dnetro altrimenti mu sarebbe stato un vettore, non avrei messo /P

# Calcolo della deviazione standard
def calculate_standard_deviation(P, mu, x, Ex_ret):
    return np.sqrt((1 / (P - 1)) * np.sum((np.dot(mu, x) - Ex_ret) ** 2))

In [10]:
#Creo un modello di ottimizzazione che ha come f.o. la nn
# Modello di ottimizzazione dei pesi, con NN
def opt_weights_nn(n_titoli,X):

    def neural_network(x):

        mean_NN = X.mean()
        mean_NN = np.array(mean_NN)
        conc_NN = np.concatenate([mean_NN, x])
        #conc_NN = x #se uso solo x

        input = conc_NN
        input_norm = scaler.transform(input.reshape(1, -1))

        # Pesi e bias del primo livello
        W1, b1 = model.layers[1].get_weights()
        # Pesi e bias del secondo livello
        W2, b2 = model.layers[2].get_weights()

        # Calcola l'output del primo livello senza applicare la funzione di attivazione
        output_layer1 = tf.matmul(input_norm, W1) + b1
        # Calcola l'output del secondo livello

        # Calcola l'output del secondo livello
        output_layer2 = tf.matmul(tf.tanh(output_layer1), W2) + b2

        # Scala inversamente l'output
        SR =  (scaler_SR.inverse_transform(output_layer2.numpy()))
        return  SR * (-1)


    def check_sum_to_one(x):
        return np.sum(x) - 1

    constraints = ({'type': 'eq', 'fun': check_sum_to_one})
    initial_weights = np.ones(n_titoli) / n_titoli
    bounds = [(0, 1) for _ in range(n_titoli)]

    optimal_weights = minimize(neural_network, initial_weights, method='SLSQP', bounds=bounds, constraints=constraints)
    return optimal_weights


In [11]:
def generate_random_end_date():
    start_date = datetime.datetime(2015, 1, 1)
    end_date = datetime.datetime(2023, 1, 1)
    random_days = random.randint(0, (end_date - start_date).days)
    random_end_date = start_date + datetime.timedelta(days=random_days)
    return random_end_date


In [None]:
results_df = pd.DataFrame(columns=['T', 'P', 'gamma', 'end_date', 'Best_clo', 'Best_clo_val', 'best_eff', 'best_eff_val'])

# Definizione della lista di simboli dei ticker
tickers = ['BA', 'GOOGL', 'TSLA', 'PFE', 'AAPL', 'AMZN', 'MSFT', 'META', 'NVDA', 'V']

# Creazione di un dizionario per memorizzare i dati scaricati
stock_data = {}

# Download dei dati per ciascun ticker e memorizzazione nel dizionario
for ticker in tickers:
    stock_data[ticker] = yf.download(ticker, end=datetime.datetime(2023, 11, 1))

# Creazione di un DataFrame con i dati dei titoli
stocks = pd.concat([stock_data[ticker]['Close'] for ticker in tickers], axis=1)

# Assegnazione dei nomi alle colonne
stocks.columns = tickers

# Rimozione delle righe contenenti valori mancanti
stocks_main = stocks.dropna()

for T in period_T:
    for P in period_P:
        for gamma in gamma_list:
            i = 0

            while i < 15:
                i = i + 1
                print(i)

                end_date = generate_random_end_date()
                # Taglio del DataFrame per includere solo valori antecedenti a end_date
                stocks = stocks_main.loc[stocks_main.index < end_date]

                # Calcolo dei rendimenti e dei log-rendimenti
                tipo_di_ritorno = 1  # Modifica a seconda del tuo requisito
                returns = stocks / stocks.shift(tipo_di_ritorno)
                logreturns = np.log(returns).dropna()

                n_titoli = logreturns.shape[1]

                X_test = logreturns.iloc[-(T + P):, :]
                X_test_P = X_test.iloc[-P:, :]  # Per test

                # inizializzo gli insiemi
                X = []
                SR = []

                for t in range(len(logreturns) - (T + P)):

                    logreturns_i = logreturns.iloc[t: T + P + t, :]  # creo finestre che si spostano di 1, ma che hanno la stessa ampiezza di T + P

                    # costruisco X_
                    rendimenti_T_per_X = logreturns_i.iloc[:T, :]

                    rendimenti_P_per_x_otp = logreturns_i.iloc[-P:, :]

                    # cose di P
                    mean_i = rendimenti_P_per_x_otp.mean()
                    Sigma_i = rendimenti_P_per_x_otp.cov()
                    x_i_P = optimize_weights(n_titoli, gamma, mean_i, Sigma_i)

                    # Cose di T
                    mean_T_i = rendimenti_T_per_X.mean()
                    Sigma_T_i = rendimenti_T_per_X.cov()
                    x_i_T = optimize_weights(n_titoli, gamma, mean_i, Sigma_i)
                    mean_T_i = np.array(mean_T_i)

                    conc_i = np.concatenate([mean_T_i, x_i_P])  # alleno con media dei rendimenti su T e x_otp su P

                    X.append(conc_i)

                    # costruisco l'insieme target SR
                    mu_i = rendimenti_P_per_x_otp
                    Exr_i = calculate_expected_return(P, mu_i, x_i_P)
                    Exd_i = calculate_standard_deviation(P, mu_i, x_i_P, Exr_i)

                    SR_i = Exr_i / Exd_i
                    SR.append(SR_i)

                SR = np.array(SR)
                X = np.array(X)

                # Creare un array di indici
                indici = np.arange(len(X))

                # Mescolare gli indici
                np.random.shuffle(indici)

                # Applicare la permutazione agli array
                X_mescolato = X[indici]
                SR_mescolato = SR[indici]

                # Normalizzo input ed output
                scaler = StandardScaler()
                X_normalized = scaler.fit_transform(X_mescolato)

                scaler_SR = StandardScaler()
                SR_normalized = scaler_SR.fit_transform(SR_mescolato.reshape(-1, 1)).flatten()

                # Divisione del dataset
                X_train, X_val, SR_train, SR_val = train_test_split(X_normalized, SR_normalized, test_size=0.2, random_state=42)

                # Creazione del modello NN
                model = tf.keras.Sequential([
                    tf.keras.layers.Flatten(input_shape=(X_train.shape[1],)),
                    tf.keras.layers.Dense(200, activation='tanh'),
                    tf.keras.layers.Dense(1)
                ])
                # Imposta il callback EarlyStopping
                early_stopping_callback = EarlyStopping(monitor='val_loss', patience=200, restore_best_weights=True)

                # Compilazione e allenamento del modello con EarlyStopping
                model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_squared_error'])
                history = model.fit(X_train, SR_train, epochs=2000, batch_size=10, validation_data=(X_val, SR_val), callbacks=[early_stopping_callback], verbose=0)

                # Valore si SR che vorrei in base al mio problema di ott [es. preferisco avere rendimenti più bassi, ma che siano meno variabili]
                mean_true = X_test_P.mean()
                Sigma_true = X_test_P.cov()

                x_opt_test = optimize_weights(n_titoli, gamma, mean_true, Sigma_true)

                Exr_test = calculate_expected_return(P, X_test_P, x_opt_test)
                Exd_test = calculate_standard_deviation(P, X_test_P, x_opt_test, Exr_test)
                SR_true_target = Exr_test / Exd_test

                # Se avessi usato il metodo classico che SR avrei avuto?
                X = X_test.iloc[:T, :]
                mean = X.mean()
                Sigma = X.cov()
                x = optimize_weights(n_titoli, gamma, mean, Sigma)  # calcolo le x sulla base dello storico che ho

                Exr_clas = calculate_expected_return(P, X_test_P, x)
                Exd_clas = calculate_standard_deviation(P, X_test_P, x, Exr_test)
                SR_classic = Exr_clas / Exd_clas

                # Approccio Neural
                x_NN = opt_weights_nn(n_titoli, X).x

                Exr_NN = calculate_expected_return(P, X_test_P, x_NN)
                Exd_NN = calculate_standard_deviation(P, X_test_P, x_NN, Exr_NN)
                SR_NN = Exr_NN / Exd_NN

                # Graficare la frontiera
                mean_true = X_test_P.mean()
                Sigma_true = X_test_P.cov()

                def minimizeMyVolatility(w):
                    w = np.array(w)
                    V = np.sqrt(np.dot(w.T, np.dot(Sigma_true, w)))
                    return -1 * (Exr_NN / V)

                def checkSumToOne(w):
                    return np.sum(w) - 1

                def iniz_w0(n_titoli):
                    w0 = []
                    for i in range(n_titoli):
                        w0.append(1 / n_titoli)
                    return w0

                def bounds(n_titoli):
                    bounds = []
                    for i in range(n_titoli):
                        bounds.append((0, 1))
                    return bounds

                constraints = {'type': 'eq', 'fun': checkSumToOne}

                opt = minimize(minimizeMyVolatility, iniz_w0(n_titoli), method='SLSQP', bounds=bounds(n_titoli),
                               constraints=constraints)
                x_otp_NN = opt.x
                V_otp_NN = np.sqrt(np.dot(x_otp_NN.T, np.dot(Sigma_true, x_otp_NN)))

                # Graficare la frontiera
                mean_true = X_test_P.mean()
                Sigma_true = X_test_P.cov()

                def minimizeMyVolatility(w):
                    w = np.array(w)
                    V = np.sqrt(np.dot(w.T, np.dot(Sigma_true, w)))
                    return -1 * (Exr_clas / V)

                def checkSumToOne(w):
                    return np.sum(w) - 1

                def iniz_w0(n_titoli):
                    w0 = []
                    for i in range(n_titoli):
                        w0.append(1 / n_titoli)
                    return w0

                def bounds(n_titoli):
                    bounds = []
                    for i in range(n_titoli):
                        bounds.append((0, 1))
                    return bounds

                constraints = {'type': 'eq', 'fun': checkSumToOne}

                opt = minimize(minimizeMyVolatility, iniz_w0(n_titoli), method='SLSQP', bounds=bounds(n_titoli),
                               constraints=constraints)
                x_otp_classic = opt.x
                Vol_otp_classic = np.sqrt(np.dot(x_otp_classic.T, np.dot(Sigma_true, x_otp_classic)))

                # Coordinate dei punti
                red_coords = np.array([Exd_test, Exr_test])
                violet_coords = np.array([Exd_clas, Exr_clas])
                black_coords = np.array([Exd_NN, Exr_NN])
                green_coords = np.array([V_otp_NN, Exr_NN])
                pink_coords = np.array([Vol_otp_classic, Exr_clas])

                # Calcola le distanze
                distanza_red_black = np.linalg.norm(red_coords - black_coords)
                distanza_red_violet = np.linalg.norm(red_coords - violet_coords)
                distanza_black_green = np.linalg.norm(black_coords - green_coords)
                distanza_pink_violet = np.linalg.norm(pink_coords - violet_coords)

                # Più vicino al target
                if distanza_red_black <= distanza_red_violet:
                    Best_clo = 'Neural'
                    Best_clo_vall = distanza_red_black

                else:
                    Best_clo = 'Classic'
                    Best_clo_vall = distanza_red_violet

                if distanza_black_green <= distanza_pink_violet:
                    best_eff = 'Neural'
                    best_eff_vall = distanza_black_green

                else:
                    best_eff = 'Classic'
                    best_eff_vall = distanza_pink_violet

                result_row = pd.DataFrame({'T': [T], 'P': [P], 'gamma': [gamma], 'end_date': [end_date],
                                           'Best_clo': [Best_clo], 'Best_clo_val': [Best_clo_vall],
                                           'best_eff': [best_eff], 'best_eff_val': [best_eff_vall]})
                results_df = pd.concat([results_df, result_row], ignore_index=True)
                print(results_df)


In [None]:
results_df