In [19]:
import pandas as pd

# Specifica il percorso del file
file_path = 'bitcoin_data_sorted.csv'

# Carica il dataset in un DataFrame di pandas
df = pd.read_csv(file_path)

# 1. Visualizza le prime 5 righe del dataset per avere una prima impressione
print("--- Prime 5 righe del dataset ---")
print(df.head())

# 2. Ottieni informazioni generali: nomi delle colonne, numero di righe non nulle e tipo di dati
print("\n--- Informazioni sul dataset ---")
df.info()

# 3. Controlla se ci sono valori mancanti (NaN) in ogni colonna
print("\n--- Conteggio dei valori mancanti per colonna ---")
print(df.isnull().sum())

--- Prime 5 righe del dataset ---
      Timestamp    Open    High     Low   Close    Volume
0  1.483229e+09  963.16  963.16  963.16  963.16  0.000000
1  1.483229e+09  962.85  962.85  962.85  962.85  0.028129
2  1.483229e+09  962.85  962.85  962.85  962.85  0.000000
3  1.483229e+09  962.85  962.85  962.85  962.85  0.000000
4  1.483229e+09  963.17  963.17  963.17  963.17  0.008000

--- Informazioni sul dataset ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4432647 entries, 0 to 4432646
Data columns (total 6 columns):
 #   Column     Dtype  
---  ------     -----  
 0   Timestamp  float64
 1   Open       float64
 2   High       float64
 3   Low        float64
 4   Close      float64
 5   Volume     float64
dtypes: float64(6)
memory usage: 202.9 MB

--- Conteggio dei valori mancanti per colonna ---
Timestamp    0
Open         0
High         0
Low          0
Close        0
Volume       0
dtype: int64


In [24]:
import pandas as pd

# Per leggere i dati principali
try:
    df_results = pd.read_parquet('results_data/')
    print("--- Dati dei Risultati Aggregati ---")
    print(df_results.tail())
except Exception as e:
    print(f"Nessun dato ancora in results_data o errore: {e}")

# Per leggere gli alert
try:
    df_alerts = pd.read_parquet('results_alerts/')
    print("\n--- Alert Rilevati ---")
    print(df_alerts)
except Exception as e:
    print(f"Nessun alert ancora in results_alerts o errore: {e}")

--- Dati dei Risultati Aggregati ---
                                                 window  avg_price  min_price  \
4614  {'start': 1485298200000000000, 'end': 14852988...    909.110     908.61   
4615  {'start': 1485300000000000000, 'end': 14853006...    907.521     907.46   
4616  {'start': 1485301800000000000, 'end': 14853024...    908.014     907.48   
4617  {'start': 1485303000000000000, 'end': 14853036...    907.580     907.01   
4618  {'start': 1485299400000000000, 'end': 14853000...    907.854     906.93   

      max_price  volatility  total_volume    alert  
4614     909.47    0.306050     46.625195  Nominal  
4615     907.92    0.140748      5.127181  Nominal  
4616     909.07    0.689560     33.147293  Nominal  
4617     907.94    0.384708      5.737847  Nominal  
4618     908.12    0.330831     29.972430  Nominal  

--- Alert Rilevati ---
                                              window  avg_price  min_price  \
0  {'start': 1483638600000000000, 'end': 14836392...    

--- 

<br>

## Visualizzazione degli Alert Generati:

_nota:_ la pipeline è stata lasciata in esecuzione per 60 minuti, ingerendo i dati filtrati dal 2017 (seguedo la logica di `prepare_data.py`)

In [23]:
def load_and_format_results(path):
    """
    Carica i risultati da un percorso Parquet, formatta la finestra temporale
    e restituisce un DataFrame di pandas pronto per la visualizzazione.
    """
    try:
        df = pd.read_parquet(path)
        if df.empty:
            return pd.DataFrame() # Restituisce un DF vuoto se non ci sono dati
        
        # Formatta la colonna 'window' in modo leggibile
        # Estrae le date di inizio e fine e le formatta
        df['start_time'] = pd.to_datetime(df['window'].apply(lambda x: x['start']))
        df['end_time'] = pd.to_datetime(df['window'].apply(lambda x: x['end']))
        
        # Riorganizza le colonne per una migliore leggibilità
        cols_to_show = ['start_time', 'end_time', 'avg_price', 'min_price', 'max_price', 'volatility', 'total_volume', 'alert']
        return df[cols_to_show]
        
    except Exception as e:
        # Gestisce il caso in cui la cartella non esista o sia vuota
        # print(f"Nessun dato trovato in '{path}' o errore: {e}")
        return pd.DataFrame()

In [21]:
# Carica e visualizza i dati aggregati principali
results_df = load_and_format_results('results_data/')

if not results_df.empty:
    print("--- Ultimi Risultati Aggregati dalla Pipeline ---")
    # Usiamo .tail() per vedere i risultati più recenti
    display(results_df.tail(10))
else:
    print("Nessun dato aggregato ancora salvato. Assicurati che la pipeline Spark sia in esecuzione.")

--- Ultimi Risultati Aggregati dalla Pipeline ---


Unnamed: 0,start_time,end_time,avg_price,min_price,max_price,volatility,total_volume,alert
4439,2017-01-23 21:10:00,2017-01-23 21:20:00,917.035,915.29,919.45,1.623017,145.443354,Nominal
4440,2017-01-23 20:50:00,2017-01-23 21:00:00,921.009,920.37,921.38,0.268554,5.795177,Nominal
4441,2017-01-23 20:10:00,2017-01-23 20:20:00,926.963,926.55,927.27,0.258588,52.003943,Nominal
4442,2017-01-23 21:30:00,2017-01-23 21:40:00,917.77,917.01,918.49,0.420079,10.489082,Nominal
4443,2017-01-23 20:40:00,2017-01-23 20:50:00,919.591,918.08,920.58,0.823265,29.768098,Nominal
4444,2017-01-24 22:50:00,2017-01-24 23:00:00,909.11,908.61,909.47,0.30605,46.625195,Nominal
4445,2017-01-24 23:20:00,2017-01-24 23:30:00,907.521,907.46,907.92,0.140748,5.127181,Nominal
4446,2017-01-24 23:50:00,2017-01-25 00:00:00,908.014,907.48,909.07,0.68956,33.147293,Nominal
4447,2017-01-25 00:10:00,2017-01-25 00:20:00,907.58,907.01,907.94,0.384708,5.737847,Nominal
4448,2017-01-24 23:10:00,2017-01-24 23:20:00,907.854,906.93,908.12,0.330831,29.97243,Nominal


In [26]:
from colorama import Fore, Style
# Carica e visualizza solo gli alert
alerts_df = load_and_format_results('results_alerts/')

print(f"--- Alert di Sistema Rilevati ---")
if not alerts_df.empty:
    print(f"Trovati {len(alerts_df)} alert.")
    # Stampa ogni alert con un formato personalizzato e colorato
    for index, row in alerts_df.iterrows():
        print(
            f"{Fore.GREEN}ALERT: {row['alert']}{Style.RESET_ALL} | "
            f"Periodo: {row['start_time'].strftime('%Y-%m-%d %H:%M')} -> {row['end_time'].strftime('%H:%M')}|"
            f"Prezzo Max: {row['max_price']:.2f} vs Media: {row['avg_price']:.2f}"
        )
else:
    print("Nessun alert rilevato finora.")

--- Alert di Sistema Rilevati ---
Trovati 12 alert.
ALERT: Spike di Prezzo > 2% | Periodo: 2017-03-11 02:40 -> 02:50 | Prezzo Max: 1138.22 vs Media: 1105.53
ALERT: Spike di Prezzo > 2% | Periodo: 2017-03-07 16:30 -> 16:40 | Prezzo Max: 1243.99 vs Media: 1216.75
ALERT: Spike di Prezzo > 2% | Periodo: 2017-01-05 17:50 -> 18:00 | Prezzo Max: 1025.00 vs Media: 994.55
ALERT: Spike di Prezzo > 2% | Periodo: 2017-01-05 18:40 -> 18:50 | Prezzo Max: 979.00 vs Media: 951.47
ALERT: Spike di Prezzo > 2% | Periodo: 2017-01-05 18:10 -> 18:20 | Prezzo Max: 955.23 vs Media: 915.03
ALERT: Spike di Prezzo > 2% | Periodo: 2017-01-05 18:20 -> 18:30 | Prezzo Max: 958.69 vs Media: 928.65
ALERT: Spike di Prezzo > 2% | Periodo: 2017-02-08 13:00 -> 13:10 | Prezzo Max: 1059.00 vs Media: 1036.18
ALERT: Spike di Prezzo > 2% | Periodo: 2017-04-13 10:00 -> 10:10 | Prezzo Max: 1209.41 vs Media: 1183.66
ALERT: Spike di Prezzo > 2% | Periodo: 2017-03-11 02:00 -> 02:10 | Prezzo Max: 1282.03 vs Media: 1111.84
ALERT: Spi