# 1. Ingestione e Preparazione dei Dati

**Obiettivo:** Caricare il dataset grezzo dei giocatori NBA, applicare le funzioni di pulizia e prepararlo per le analisi successive. 

**Fasi:**
1.  **Setup dell'Ambiente:** Configurazione di PySpark e importazione delle librerie.
2.  **Caricamento Dati:** Download automatico del dataset da Kaggle (se non presente) e caricamento in un DataFrame Spark.
3.  **Pulizia e Standardizzazione:** Applicazione di funzioni per standardizzare i nomi delle colonne, correggere i tipi di dato e gestire i valori mancanti.
4.  **Salvataggio:** Salvataggio del DataFrame pulito in formato Parquet per efficienza e interoperabilità.

In [None]:
import os
import sys

# Aggiunge la root del progetto al sys.path per permettere l'importazione dei moduli custom
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

# Importazione delle utility e delle funzioni di elaborazione
from src.utils.helpers import get_spark_session, save_dataframe
from src.data_ingestion.download_data import download_nba_dataset
from src.data_processing.cleaning import standardize_column_names, correct_data_types, handle_missing_values
from src.config.spark_config import SPARK_CONFIG

# Inizializzazione della sessione Spark
spark = get_spark_session(
    app_name="NBA_Data_Ingestion_Preparation",
    driver_memory=SPARK_CONFIG["driver_memory"]
)

### Fase 1: Caricamento Dati con Download Automatico

In [None]:
# Definizione dei percorsi per i dati grezzi
raw_data_dir = "../data/raw"
csv_file_name = "Player Totals.csv"
csv_file_path = os.path.join(raw_data_dir, csv_file_name)

# Verifica se il dataset esiste, altrimenti lo scarica da Kaggle
if not os.path.exists(csv_file_path):
    print(f"Dataset non trovato. Avvio del download in '{raw_data_dir}'...")
    download_nba_dataset(raw_data_dir)
else:
    print(f"Dataset già presente in '{csv_file_path}'. Salto il download.")

# Caricamento del file CSV in un DataFrame Spark
raw_df = spark.read.csv(csv_file_path, header=True, inferSchema=False)

# Visualizzazione delle informazioni preliminari del DataFrame
print(f"\nNumero di righe iniziali: {raw_df.count()}")
print("Schema iniziale del DataFrame:")
raw_df.printSchema()
raw_df.show(5, truncate=False)

### Fase 2: Pulizia e Preparazione dei Dati

In [None]:
# Applicazione sequenziale delle funzioni di pulizia
df_std_names = standardize_column_names(raw_df)
df_typed = correct_data_types(df_std_names)
df_cleaned = handle_missing_values(
    df_typed, 
    min_games_threshold=SPARK_CONFIG["min_games_threshold"]
)

# Conteggio delle righe dopo il processo di pulizia e filtraggio
print(f"Numero di righe dopo la pulizia: {df_cleaned.count()}")

# Visualizzazione di un campione di dati puliti per verifica
df_cleaned.select("player", "season", "pts", "mp", "g").show(10)

### Fase 3: Salvataggio dei Dati Processati

In [None]:
# Definizione del percorso di output per il file Parquet
output_path = "../data/processed/players_cleaned.parquet"

# Salvataggio del DataFrame pulito usando la funzione helper
# Il formato Parquet è scelto per le sue performance e l'ottimizzazione con Spark
save_dataframe(df_cleaned, output_path)

print(f"DataFrame pulito salvato con successo in '{output_path}'.")

In [None]:
# Termina la sessione Spark per liberare le risorse
spark.stop()