In [3]:
import pandas as pd
import funzioni as fx
from multiprocessing import Pool, cpu_count, Manager, Value
import numpy as np
import os

def process_ticker(nome_simbolo, features_prezzo, features_da_scalare_singolarmente, features_meno_piu, features_candele, features_no_scala, elenco_targets, n_timesteps, giorni_previsione, bilanciamento):
    try:
        ticker = pd.read_parquet(f"tickers/{nome_simbolo}.parquet")
        _, X, Y, _ = fx.to_XY(ticker, features_prezzo, features_da_scalare_singolarmente, features_meno_piu, features_candele, features_no_scala, elenco_targets, n_timesteps, giorni_previsione, bilanciamento)
        return nome_simbolo, X, Y
    except Exception as e:
        return nome_simbolo, str(e)

def salva_progressi(X, Y, perc_dati, set_file_x_y, tot):
    print("Salvataggio su file")
    np.save(f'{perc_dati}/X{set_file_x_y}', X)
    np.save(f'{perc_dati}/Y{set_file_x_y}', Y)
    with open(f"{perc_dati}/indice.txt", 'w') as f:
        f.write(str(tot))

def callback_result(result, X, Y, perc_dati, set_file_x_y, totale_processati, n_simboli_addestramento):
    nome_simbolo, Xt, Yt = result
    print(f"{totale_processati.value}/{n_simboli_addestramento}) Completato ticker {nome_simbolo}")
    if Xt is not None and Yt is not None:
        X.extend(Xt)
        Y.extend(Yt)
        with totale_processati.get_lock(): 
            totale_processati.value += 1
            if (totale_processati.value % 1000 == 0):
                salva_progressi(X, Y, perc_dati, set_file_x_y, totale_processati.value)

def prepara_dati(lista_ticker, n_simboli_addestramento, perc_dati, set_file_x_y, bilanciamento=0):
    try:
        with open(f"{perc_dati}/indice.txt", 'r') as f:
            inizio = int(f.read().strip()) 
    except FileNotFoundError:
        inizio = 0  
    print(f'inizio={inizio}, n_simboli_addestramento={n_simboli_addestramento}')

    manager = Manager()
    X = manager.list()
    Y = manager.list()
    totale_processati = Value('i', inizio)  

    # Controlla se i dati sono già stati salvati e li carica
    if os.path.exists(f"{perc_dati}/X{set_file_x_y}.npy") and os.path.exists(f"{perc_dati}/Y{set_file_x_y}.npy"):
        print("Caricamento X e Y")
        X.extend(np.load(f'{perc_dati}/X{set_file_x_y}.npy'))
        Y.extend(np.load(f'{perc_dati}/Y{set_file_x_y}.npy'))
        
    if inizio < n_simboli_addestramento:
        with Pool(cpu_count()) as p:
            for i in range(inizio, n_simboli_addestramento):
                nome_simbolo = lista_ticker.iloc[i]["Ticker"]
                task = (nome_simbolo, fx.features_prezzo, fx.features_da_scalare_singolarmente, fx.features_meno_piu, fx.features_candele, fx.features_no_scala, fx.elenco_targets, fx.n_timesteps, fx.giorni_previsione, bilanciamento)
                p.apply_async(process_ticker, args=task, callback=lambda result: callback_result(result, X, Y, perc_dati, set_file_x_y, totale_processati, n_simboli_addestramento))

            p.close()
            p.join()
    
    if ((n_simboli_addestramento - totale_processati.value) < 1000):
        with open(f"{perc_dati}/indice.txt", 'w') as f:
            f.write(str(n_simboli_addestramento))

    return list(X), list(Y)

lista_ticker = pd.read_parquet("lista_ticker.parquet")
lista_ticker.sample(frac=1)
tot_ticker = 1000
bilanciamento = 0.25
X, Y = prepara_dati(lista_ticker, tot_ticker, 'dati', f'_{tot_ticker}_{bilanciamento}', bilanciamento=bilanciamento)
print(X.shape)
print(Y.shape)

inizio=1000, n_simboli_addestramento=1000


Caricamento X e Y


KeyboardInterrupt: 