# Pre-Elaborazione dei Dati (Dataset di riferimento da Luglio 2024 a Giugno 2025)

In [8]:
import pandas as pd
import glob
import os
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import shutil
import re 
import gc
import math

### Delineamo l'ambiente di lavoro

In questa sezione vengono definite le directory di lavoro e tutti quei parametri per cui andiamo a filtrare i nostri dati.

SOG_MIN --> Impostiamo il parametro a 2.0, questo ci serve per poi andare a scartare tutte le navi ferme.

TIME_GAP --> Questa √® una soglia di tempo massima arbitraria permessa all'interno di una singola traiettoria. Se tra due messaggi consecutivi della stessa nave passano pi√π di 60 minuti, assumiamo che la rotta sia stata interrotta.

In [None]:
INPUT_DIR = '../../../Dataset'
SCRIPT_DIR = os.getcwd()                                # Restituisce la directory di lavoro corrente

OUTPUT_DIR_NAME = 'Dataset_Pre-Cleaned_AIS' 
OUTPUT_DIR = os.path.join(SCRIPT_DIR, OUTPUT_DIR_NAME)

SOG_MIN_THRESHOLD = 2.0
TIME_GAP_THRESHOLD = pd.Timedelta(hours=1)

os.makedirs(OUTPUT_DIR, exist_ok=True)

all_files = glob.glob(os.path.join(INPUT_DIR, '*.parquet'))

all_clean_data = []

print(f"Trovati {len(all_files)} file Parquet da processare.")

Trovati 365 file Parquet da processare.


#### Test

- Proviamo a verificare la lettura di un file parquet e della corretta formattazione dei dati.  
- Oltre a questo andiamo ad estrarre il numero di colonne per verificare se sono state selezionate le colonne corrette.  
- Viene aggiunto anche un controllo sulle righe per vedere dopo la pulizia la percentuale di pulizia per ogni file.



In [None]:
BASE_PATH = '../../../Dataset/'

FILE_PATH_TEST = os.path.join(BASE_PATH, 'ais-2025-01-01.parquet')
FILE_PATH_TEST2 = os.path.join(BASE_PATH, 'AIS_2024_12_31.parquet')

COLUMN_MAPPING2025 = {
    'mmsi': 'MMSI', 
    'latitude': 'Latitude', 
    'longitude': 'Longitude', 
    'sog': 'SOG', 
    'cog': 'COG', 
    'base_date_time': 'Timestamp' 
}
COLUMNS_TO_READ_2025 = list(COLUMN_MAPPING2025.keys())

try:
    df = pd.read_parquet(
        FILE_PATH_TEST, 
        columns=COLUMNS_TO_READ_2025,
        engine='pyarrow' 
    )

    df = df.rename(columns=COLUMN_MAPPING2025)

    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    
    
    print(f"--- üîç DEBUG: Dati iniziali dal file {os.path.basename(FILE_PATH_TEST)} ---")
    print("\nHead del DataFrame:")
    print(df.head())
    print("\nTipi di Dati (Dtypes) dopo la conversione Timestamp:")
    print(df.dtypes)
    print("----------------------------------------------------------------\n")
    rows,columns = df.shape
    print(f"Numero di righe: {rows}, Numero di colonne: {columns}\n")
        
except Exception as e:
    print(f"Errore nel processare il file {FILE_PATH_TEST}: {e}")


COLUMN_MAPPING2024 = {
    'MMSI': 'MMSI',
    'LAT': 'Latitude',
    'LON': 'Longitude',
    'SOG': 'SOG',
    'COG': 'COG',
    'BaseDateTime': 'Timestamp' 
}
COLUMNS_TO_READ_2024 = list(COLUMN_MAPPING2024.keys())

try:
    df = pd.read_parquet(
        FILE_PATH_TEST2, 
        columns=COLUMNS_TO_READ_2024,
        engine='pyarrow' 
    )

    df = df.rename(columns=COLUMN_MAPPING2024)

    df['Timestamp'] = pd.to_datetime(df['Timestamp'])


    print(f" DEBUG: Dati iniziali dal file {os.path.basename(FILE_PATH_TEST2)}")
    print("\nHead del DataFrame:")
    print(df.head())
    print("\nTipi di Dati (Dtypes) dopo la conversione Timestamp:")
    print(df.dtypes)
    print("----------------------------------------------------------------\n")
    rows,columns = df.shape
    print(f"Numero di righe: {rows}, Numero di colonne: {columns}\n")
        
except Exception as e:
    print(f"Errore nel processare il file {FILE_PATH_TEST2}: {e}")

--- üîç DEBUG: Dati iniziali dal file ais-2025-01-01.parquet ---

Head del DataFrame:
        MMSI  Latitude  Longitude  SOG    COG           Timestamp
0  671087100  18.46281  -66.10297  0.0  176.7 2025-01-01 00:00:00
1  367733950  48.48503 -122.60927  0.0  215.5 2025-01-01 00:00:00
2  368138010  40.47715  -73.84652  5.5  286.9 2025-01-01 00:00:02
3  367637210  29.12033  -90.21215  0.0  227.6 2025-01-01 00:00:03
4  368050000  41.27196  -72.46934  0.0  107.1 2025-01-01 00:00:03

Tipi di Dati (Dtypes) dopo la conversione Timestamp:
MMSI                  int64
Latitude            float64
Longitude           float64
SOG                 float64
COG                 float64
Timestamp    datetime64[ns]
dtype: object
----------------------------------------------------------------

Numero di righe: 7337208, Numero di colonne: 6

--- üîç DEBUG: Dati iniziali dal file AIS_2024_12_31.parquet ---

Head del DataFrame:
        MMSI  Latitude  Longitude   SOG    COG           Timestamp
0  367776660 

### Pulizia dei dati
 
In questa sezione, iteriamo su ogni file del nostro dataset ed eseguiamo la pulizia vera e propria, applicando dei filtri. Il primo filtro filtro applicato √® sulla lettura delle colonne `COLUMNS_TO_READ` prima di caricare i dati. √à il modo pi√π efficiente per scartare le colonne inutili e riduce drasticamente l'utilizzo della RAM velocizzando l'intero processo.

##### Filtri Navigazione Attiva e di Validit√†
  
Vengono applicati una serie di filtri per lasciare all'interno del dataset solo valori validi e di navigazione attiva:
1. Applichiamo il filtro `df = df[df['SOG'] > SOG_MIN_THRESHOLD`, eliminando i dati statici come deciso sopra.
2. Applichiamo il filtro `df[df['COG'] != 511]`,rimuovendo i record dove il COG (Course Over Ground) √® $511$. Questo √® un codice standard AIS che significa "Dato Non Disponibile". Senza una rotta (COG), l'informazione cinematica √® incompleta e inutile per il modello.
3. Applichiamo il filtro `Filtro Lat/Lon (>= -90, <= 90, etc.)`, eliminiamo i record con coordinate geografiche errate (fuori dal globo). Questi sono errori di trasmissione o del sensore che inquinerebbero il dataset.
4. Utilizziamo il metodo `df.dopna(...)` per rimuovere qualsiasi riga che abbia valori mancanti. Questo perch√® i modelli LSTM/LNN richiedono input completi per funzionare correttamente.
5. Infine l'ultimo filtro √® `df['MMSI'].str.len()==9` per rimuovere i record con l'identificativo della nave non corretto. Questo perch√® l'MMSI deve essere di 9 cifre e questo ci garantisce che ogni traiettoria sia attribuita ad una nave valida.



In [5]:
MAPPING_2025 = {
    'mmsi': 'MMSI', 
    'latitude': 'Latitude', 
    'longitude': 'Longitude', 
    'sog': 'SOG', 
    'cog': 'COG', 
    'base_date_time': 'Timestamp' 
}

COLUMNS_2025 = list(MAPPING_2025.keys())

MAPPING_2024 = {
    'MMSI': 'MMSI',
    'LAT': 'Latitude',
    'LON': 'Longitude',
    'SOG': 'SOG',
    'COG': 'COG',
    'BaseDateTime': 'Timestamp'
}
COLUMNS_2024 = list(MAPPING_2024.keys())

for file_path in all_files:
    df = None
    mapping_usato = None

    try:
        
        df = pd.read_parquet(
            file_path, 
            columns=COLUMNS_2025,
            engine='pyarrow' 
        )
        df = df.rename(columns=MAPPING_2025)
        mapping_usato = "2025"
    
    except Exception as e1:
        try:
            df = pd.read_parquet(
                file_path, 
                columns=COLUMNS_2024,
                engine='pyarrow' 
            )
            df = df.rename(columns=MAPPING_2024)
            mapping_usato = "2024"
        
        except Exception as e2:
            print(f"Errore IRRISOLVIBILE nel caricare {file_path}: Schema non riconosciuto.")
            continue

    if df is not None:
        try:
            
            df['Timestamp'] = pd.to_datetime(df['Timestamp'])
            
            # Filtri cinematici e geografici
            df = df[df['SOG'] > SOG_MIN_THRESHOLD]
            df = df[df['COG'] != 511]
            df = df[(df['Latitude'] >= -90) & (df['Latitude'] <= 90)]
            df = df[(df['Longitude'] >= -180) & (df['Longitude'] <= 180)]
            
            # Filtri di integrit√†
            df = df.dropna(subset=['MMSI', 'Latitude', 'Longitude', 'SOG', 'COG'])
            df['MMSI'] = df['MMSI'].astype(str).str.replace(r'\D', '', regex=True)
            df = df[df['MMSI'].str.len() == 9]

            if not df.empty:

                output_filename = os.path.basename(file_path).lower()
                output_file = os.path.join(OUTPUT_DIR, output_filename)
                
                df.to_parquet(output_file, index=False)
                
                print(f"File {os.path.basename(file_path)} pre-pulito")
            
        except Exception as e:
            print(f"Errore nella FASE DI PULIZIA per il file {file_path}: {e}")

print("\n--- FASE 1 (Pre-Pulizia) completata. ---")

File ais-2025-02-15.parquet pre-pulito
File ais-2025-06-22.parquet pre-pulito
File ais-2025-04-26.parquet pre-pulito
File AIS_2024_08_21.parquet pre-pulito
File ais-2025-01-22.parquet pre-pulito
File ais-2025-02-27.parquet pre-pulito
File ais-2025-06-18.parquet pre-pulito
File ais-2025-06-14.parquet pre-pulito
File AIS_2024_08_11.parquet pre-pulito
File AIS_2024_08_06.parquet pre-pulito
File AIS_2024_08_22.parquet pre-pulito
File ais-2025-06-21.parquet pre-pulito
File AIS_2024_11_05.parquet pre-pulito
File AIS_2024_10_30.parquet pre-pulito
File AIS_2024_10_16.parquet pre-pulito
File ais-2025-01-23.parquet pre-pulito
File ais-2025-03-26.parquet pre-pulito
File AIS_2024_09_24.parquet pre-pulito
File ais-2025-03-25.parquet pre-pulito
File AIS_2024_07_01.parquet pre-pulito
File AIS_2024_09_11.parquet pre-pulito
File ais-2025-03-08.parquet pre-pulito
File AIS_2024_11_21.parquet pre-pulito
File ais-2025-04-25.parquet pre-pulito
File AIS_2024_08_18.parquet pre-pulito
File AIS_2024_12_19.parqu

#### Test file pre-pulizia

In [None]:
PRE_CLEANED_FILE_PATH_TEST = 'Dataset_Pre-Cleaned_AIS/ais-2025-01-01.parquet'
COLUMNS_TO_READ_2025 = ['MMSI', 'Latitude', 'Longitude','SOG', 'COG', 'Timestamp']

df = pd.read_parquet(
        PRE_CLEANED_FILE_PATH_TEST, 
        columns=COLUMNS_TO_READ_2025,
        engine='pyarrow' 
    )

df.head()

Unnamed: 0,MMSI,Latitude,Longitude,SOG,COG,Timestamp
0,368138010,40.47715,-73.84652,5.5,286.9,2025-01-01 00:00:02
1,367188610,27.93936,-82.45703,2.2,147.6,2025-01-01 00:00:04
2,366938780,46.04232,-83.93567,11.8,126.0,2025-01-01 00:00:00
3,316028554,49.28782,-123.10689,7.8,215.6,2025-01-01 00:00:06
4,338122081,37.78262,-122.38452,3.7,196.6,2025-01-01 00:00:12


#### Unificazione in blocchi di file da 15 giorni

Questo raggruppamento serve per andare a diminuire quelli che sono i punti del problema di "mezzanotte". Questo problema chiamato cos√¨ da noi per indicare la situazione in cui ci siano traiettorie continue a cavallo di due differenti file. Con un'unica grande unificazione non ci sarebbe stato tale problema ma a causa di limiti Hardware non √® stato possibile consolidare tutto in un unico file. Si √® scelto quindi di procedere con un unificazione parziale del dataset totale dove ogni file racchiude 15 giorni.

##### Segmentazione e Creazione delle Traiettorie

Questa √® la fase finale prima del salvataggio dei nuovi blocchi, dove trasformiamo i dati puliti in sequenze coerenti (TrajectoryID).  
Quello che andiamo a fare √® raggruppare i nostri dati prima per l'MMSI e poi per il TimeStamp. In questo modo abbiamo i dati ordinati ed  √® possibile delineare quelle che sono le traiettorie diverse per ogni nave. Viene aggiunta una nuova colonna al dataset che √® `TrajectoryID` che ha il compito di raggruppare tutti i dati di ogni singola nave che fanno riferimento ad un intero spostamento.  
Gli spostamenti sono stati delineati assumendo che spostamenti diversi vengono caratterizzati da uno stato di navigazione non attiva di almeno 1 ora.  
Questa fase √® essenziale perch√® i modelli che andremo ad addestrare, impareranno non dai singoli punti ma dalle intere sequenze.

```
df = df.sort_values(by=['MMSI', 'Timestamp']).reset_index(drop=True)
df_blocco['TimeDiff'] = df_blocco.groupby('MMSI')['Timestamp'].diff()     
df_blocco['IsNewTraj'] = (df_blocco['MMSI'] != df_blocco['MMSI'].shift(1)) | (df_blocco['TimeDiff'] > TIME_GAP_THRESHOLD)
df_blocco['IsNewTraj_int'] = df_blocco['IsNewTraj'].astype(int)
df_blocco['TrajectoryID'] = df_blocco['IsNewTraj_int'].cumsum() + max_trajectory_id_globale
```

In [5]:

INPUT_DIR = 'Dataset_Pre-Cleaned_AIS' 
SCRIPT_DIR = os.getcwd()

OUTPUT_DIR_NAME = 'Dataset_Segmentato_15Giorni' 
OUTPUT_DIR = os.path.join(SCRIPT_DIR, OUTPUT_DIR_NAME)

if os.path.exists(OUTPUT_DIR):
    shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR, exist_ok=True)

TIME_GAP_THRESHOLD = pd.Timedelta(hours=1)

GIORNI_PER_BLOCCO = 15

all_files = glob.glob(os.path.join(INPUT_DIR, '*.parquet'))
all_files.sort() # Fondamentale per ordinare i giorni!

num_blocchi = math.ceil(len(all_files) / GIORNI_PER_BLOCCO)
print(f"Trovati {len(all_files)} file, raggruppati in {num_blocchi} blocchi da 15 giorni.")

max_trajectory_id_globale = 0 

for i in range(num_blocchi):
    start_index = i * GIORNI_PER_BLOCCO
    end_index = (i + 1) * GIORNI_PER_BLOCCO
    
    file_list_blocco = all_files[start_index:end_index]
    
    print(f"\n--- Inizio elaborazione Blocco {i+1}/{num_blocchi} ---")
    
    try:
        print(f"Caricamento di {len(file_list_blocco)} file...")
        
        df_list = [pd.read_parquet(f) for f in file_list_blocco]
        df_blocco = pd.concat(df_list, ignore_index=True)
        

        df_blocco = df_blocco.sort_values(by=['MMSI', 'Timestamp']).reset_index(drop=True)
        
        print("Inizio calcolo TrajectoryID...")
        df_blocco['TimeDiff'] = df_blocco.groupby('MMSI')['Timestamp'].diff()
        df_blocco['IsNewTraj'] = (df_blocco['MMSI'] != df_blocco['MMSI'].shift(1)) | (df_blocco['TimeDiff'] > TIME_GAP_THRESHOLD)
        df_blocco['IsNewTraj_int'] = df_blocco['IsNewTraj'].astype(int)
        df_blocco['TrajectoryID'] = df_blocco['IsNewTraj_int'].cumsum() + max_trajectory_id_globale
        
        df_blocco = df_blocco.drop(columns=['TimeDiff', 'IsNewTraj', 'IsNewTraj_int'])
        
        max_trajectory_id_globale = df_blocco['TrajectoryID'].max()
        
        output_file = os.path.join(OUTPUT_DIR, f"blocco_{i:03d}-segmentato.parquet")
        df_blocco.to_parquet(output_file, index=False, engine='pyarrow', compression='snappy')
        
    except MemoryError:
        print(f"--- ‚ùå ERRORE DI MEMORIA: Blocco {i+1} (15 giorni) √® ancora troppo grande! ---")
        break 
    except Exception as e:
        print(f"--- ‚ùå ERRORE SCONOSCIUTO nel blocco {i+1}: {e} ---")


Trovati 365 file, raggruppati in 25 blocchi da 15 giorni.

--- Inizio elaborazione Blocco 1/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 2/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 3/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 4/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 5/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 6/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 7/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 8/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 9/25 ---
Caricamento di 15 file...
Inizio calcolo TrajectoryID...

--- Inizio elaborazione Blocco 10/25 ---
Caricamento di 15 

#### Individuazione e Applicazione dell 'algoritmo di cucitura

Per ottenere un'effettiva coerenza dei **TrajectorID** dobbiamo andare ad individuare la presenza di incoerenza nei relativi punti di mezzanotte. Ci√≤ sta a significare che nonostante siano stati diminuiti questi punti cruciali non √® possibile lasciare che traiettorie continue (nuovo spostamento in meno di 1 ora) venga considerato come una nuova traiettoria solo perch√® vi √® un cambio di file .parquet.

Per gestire tale situazione √® stato implementato un algoritmo :

- Viene letto il primo file insieme al secondo e viene analizzato ogni spostamento durante la "Mezzanotte"
- Se ci sono spostamenti che non superano l'ora, viene modificato il **TrajectorID** per mantenere coerenza tra i due file
- Successivamente viene tolto dalla memoria il primo file e si analizza interamente il secondo per propagare i cambiamenti
- In conlcusione viene preso il terzo file e si ripete tutta la procedura

Questi passaggi vengono attuati a tutti i blocchi in ordine in modo da poter propagare la correzione all'intero dataset.

In [None]:
INPUT_DIR = 'Dataset_Segmentato_15Giorni' 
OUTPUT_DIR_NAME = 'Dataset_Stitched_Finale' 
OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR_NAME)

if os.path.exists(OUTPUT_DIR):
    shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR, exist_ok=True)

TIME_GAP_THRESHOLD = pd.Timedelta(hours=1)

print(f"--- FASE B: Stitching Sequenziale avviato ---")
print(f"Lettura blocchi da: {INPUT_DIR}")
print(f"Salvataggio finale in: {OUTPUT_DIR}\n")

all_blocks = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

if not all_blocks:
    print(f"ERRORE: Nessun file blocco trovato in {INPUT_DIR}.")
else:
        
    path_A_corretto = all_blocks[0]
    output_path_A = os.path.join(OUTPUT_DIR, os.path.basename(path_A_corretto))
    shutil.copy(path_A_corretto, output_path_A)
    print(f"Blocco 0 ({os.path.basename(path_A_corretto)}) copiato, nessuna correzione necessaria.")

    # Si passa al controllo sui due blocchi consecutivi    
    for i in range(len(all_blocks) - 1):
                
        path_A_corretto = os.path.join(OUTPUT_DIR, os.path.basename(all_blocks[i]))
        path_B_grezzo = all_blocks[i+1]
        
        print(f"\n--- Inizio cucitura: {os.path.basename(path_A_corretto)} -> {os.path.basename(path_B_grezzo)} ---")
        
        try:
            
            df_A = pd.read_parquet(path_A_corretto)
            df_B = pd.read_parquet(path_B_grezzo)
            
            
            print("  Trovati confini, calcolo mappa...")
            last_records_A = df_A.loc[df_A.groupby('MMSI')['Timestamp'].idxmax()]
            last_records_A = last_records_A[['MMSI', 'Timestamp', 'TrajectoryID']].rename(
                columns={'Timestamp': 'Last_Timestamp', 'TrajectoryID': 'Correct_ID'}
            )   # Prende l'ultimo record di ogni MMSI in A

            first_records_B = df_B.loc[df_B.groupby('MMSI')['Timestamp'].idxmin()]
            first_records_B = first_records_B[['MMSI', 'Timestamp', 'TrajectoryID']].rename(
                columns={'Timestamp': 'First_Timestamp', 'TrajectoryID': 'Old_ID'}
            )   # Prende il primo record di ogni MMSI in B

            
            boundary_check = pd.merge(last_records_A, first_records_B, on='MMSI')
            boundary_check['TimeDiff'] = boundary_check['First_Timestamp'] - boundary_check['Last_Timestamp']
            stitch_candidates = boundary_check[boundary_check['TimeDiff'] <= TIME_GAP_THRESHOLD]
        
            # Creazione della mappa di correzione
            local_fix_map = stitch_candidates.set_index('Old_ID')['Correct_ID'].to_dict()
            print(f"  -> Trovate {len(local_fix_map)} cuciture da applicare.")

            print("  Rilascio memoria Blocco A...")
            del df_A, last_records_A, first_records_B, boundary_check, stitch_candidates
            gc.collect()

            print("  Applicazione correzioni a Blocco B...")
            df_B['TrajectoryID'] = df_B['TrajectoryID'].map(local_fix_map).fillna(df_B['TrajectoryID']).astype(int)

            output_path_B = os.path.join(OUTPUT_DIR, os.path.basename(path_B_grezzo))
            df_B.to_parquet(output_path_B, index=False, engine='pyarrow', compression='snappy')
            print(f"  -> Blocco {os.path.basename(output_path_B)} corretto e salvato.")

        except Exception as e:
            print(f"  -> ‚ùå ERRORE durante la cucitura: {e}")
            break 
        finally:
            if 'df_A' in locals(): del df_A
            if 'df_B' in locals(): del df_B
            gc.collect()

    print(f"Dataset perfetto salvato in: {OUTPUT_DIR}")

--- FASE B: Stitching Sequenziale avviato ---
Lettura blocchi da: Dataset_Segmentato_15Giorni
Salvataggio finale in: /home/al3th3ia/Scrivania/Cybersecurity/Detecting-Trajectory-Spoofing-Attacks-on-AIS/Progetto/Pre-Elaborazione Dati/Dataset_Stitched_Finale

Blocco 0 (blocco_000-segmentato.parquet) copiato, nessuna correzione necessaria.

--- Inizio cucitura: blocco_000-segmentato.parquet -> blocco_001-segmentato.parquet ---
  Trovati confini, calcolo mappa...
  -> Trovate 3127 cuciture da applicare.
  Rilascio memoria Blocco A...
  Applicazione correzioni a Blocco B...
  -> Blocco blocco_001-segmentato.parquet corretto e salvato.

--- Inizio cucitura: blocco_001-segmentato.parquet -> blocco_002-segmentato.parquet ---
  Trovati confini, calcolo mappa...
  -> Trovate 2913 cuciture da applicare.
  Rilascio memoria Blocco A...
  Applicazione correzioni a Blocco B...
  -> Blocco blocco_002-segmentato.parquet corretto e salvato.

--- Inizio cucitura: blocco_002-segmentato.parquet -> blocco_00

#### Verifica dell'algoritmo di cucitura

In questa sezione viene integrata una verifica delle cuciture appena svolte. Questo viene fatto analizzando tutti i gap temporali inferiori ad 1 ora e successivamente si vede se tra questi c'√® differenza di **TrajectorID**.

In [2]:
INPUT_DIR = 'Dataset_Stitched_Finale' 
TIME_GAP_THRESHOLD = pd.Timedelta(hours=1)

all_blocks = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

if not all_blocks:
    print(f"ERRORE: Nessun file blocco trovato in {INPUT_DIR}.")
else:
    total_missed_stitches = 0

    print("Inizio scansione...")

    for i in range(len(all_blocks) - 1):
        path_A = all_blocks[i]
        path_B = all_blocks[i+1]
        
        try:
            df_A = pd.read_parquet(path_A)
            df_B = pd.read_parquet(path_B)

            
            last_records_A = df_A.loc[df_A.groupby('MMSI')['Timestamp'].idxmax()]
            last_records_A = last_records_A[['MMSI', 'Timestamp', 'TrajectoryID']].rename(
                columns={'Timestamp': 'Last_Timestamp', 'TrajectoryID': 'ID_A'}
            )

            
            first_records_B = df_B.loc[df_B.groupby('MMSI')['Timestamp'].idxmin()]
            first_records_B = first_records_B[['MMSI', 'Timestamp', 'TrajectoryID']].rename(
                columns={'Timestamp': 'First_Timestamp', 'TrajectoryID': 'ID_B'}
            )

            
            boundary_check = pd.merge(last_records_A, first_records_B, on='MMSI')

            
            boundary_check['TimeDiff'] = boundary_check['First_Timestamp'] - boundary_check['Last_Timestamp']

            
            stitchable_gaps = boundary_check[boundary_check['TimeDiff'] <= TIME_GAP_THRESHOLD]
            
            if not stitchable_gaps.empty:
                
                missed_stitches = stitchable_gaps[stitchable_gaps['ID_A'] != stitchable_gaps['ID_B']]
                
                local_missed_count = len(missed_stitches)
                total_missed_stitches += local_missed_count
                
                print(f"Confine {i+1}: Trovati {len(stitchable_gaps)} gap (<= 1h). Di questi, {local_missed_count} cuciture mancate.")
            else:
                print(f"Confine {i+1}: Nessun gap (<= 1h) trovato.")

        except Exception as e:
            print(f"  -> ERRORE durante il controllo del confine {i+1}: {e}")
            
        finally:
            del df_A, df_B, last_records_A, first_records_B, boundary_check, stitchable_gaps
            if 'missed_stitches' in locals(): del missed_stitches
            gc.collect()

    print(f"RISULTATO FINALE: Trovate {total_missed_stitches} cuciture mancate.")

Inizio scansione...
Confine 1: Trovati 3127 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 2: Trovati 2913 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 3: Trovati 3666 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 4: Trovati 3450 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 5: Trovati 3353 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 6: Trovati 2838 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 7: Trovati 3095 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 8: Trovati 2501 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 9: Trovati 2540 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 10: Trovati 2657 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 11: Trovati 2115 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 12: Trovati 2019 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 13: Trovati 2353 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 14: Trovati 1890 gap (<= 1h). Di questi, 0 cuciture mancate.
Confine 15: Trovati 215

#### Verifica presenza dei duplicati

In [3]:
INPUT_DIR = 'Dataset_Stitched_Finale' 

all_blocks = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

if not all_blocks:
    print(f"ERRORE: Nessun file blocco trovato in {INPUT_DIR}.")
else:
    total_duplicates_found = 0
    print("Inizio scansione")

 
    for block_path in all_blocks:
        try:
            print(f"Controllo: {os.path.basename(block_path)}...")
    
            df_block = pd.read_parquet(block_path)
            
            local_duplicates = df_block.duplicated().sum()   
            
            if local_duplicates > 0:
                print(f"Trovate {local_duplicates} righe duplicate.")
                total_duplicates_found += local_duplicates
            
        except Exception as e:
            print(f"ERRORE durante il controllo del blocco: {e}")
        
        finally:
            if 'df_block' in locals():
                del df_block
            gc.collect()

    print(f"Sono state trovate {total_duplicates_found:,} righe duplicate in totale.")

Inizio scansione
Controllo: blocco_000-segmentato.parquet...
Trovate 7527 righe duplicate.
Controllo: blocco_001-segmentato.parquet...
Trovate 1245 righe duplicate.
Controllo: blocco_002-segmentato.parquet...
Trovate 1329 righe duplicate.
Controllo: blocco_003-segmentato.parquet...
Trovate 1064 righe duplicate.
Controllo: blocco_004-segmentato.parquet...
Trovate 2699 righe duplicate.
Controllo: blocco_005-segmentato.parquet...
Trovate 8737 righe duplicate.
Controllo: blocco_006-segmentato.parquet...
Trovate 5302 righe duplicate.
Controllo: blocco_007-segmentato.parquet...
Trovate 6240 righe duplicate.
Controllo: blocco_008-segmentato.parquet...
Trovate 5287 righe duplicate.
Controllo: blocco_009-segmentato.parquet...
Trovate 6058 righe duplicate.
Controllo: blocco_010-segmentato.parquet...
Trovate 6137 righe duplicate.
Controllo: blocco_011-segmentato.parquet...
Trovate 4072 righe duplicate.
Controllo: blocco_012-segmentato.parquet...
Trovate 4855 righe duplicate.
Controllo: blocco_013

#### Pulizia dei duplicati

In [None]:
INPUT_DIR = 'Dataset_Stitched_Finale' 
OUTPUT_DIR_NAME = 'Dataset' 
OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR_NAME)
NOME_COLONNA_MMSI = "MMSI" 

if os.path.exists(OUTPUT_DIR):
    shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"Lettura blocchi da: {INPUT_DIR}")
print(f"Salvataggio in: {OUTPUT_DIR}\n")

all_blocks = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

if not all_blocks:
    print(f"ERRORE: Nessun file blocco trovato in {INPUT_DIR}.")
else:
    total_duplicates_removed = 0
    total_mmsi_removed = 0  
    
    print("Inizio scansione e pulizia (MMSI Specifici + Duplicati)...")
    print("Regole MMSI: Rimozione di zeri, '123456789' e cifre ripetute.")

    for block_path in all_blocks:
        block_name = os.path.basename(block_path)
        df_block = None 
        
        try:
            df_block = pd.read_parquet(block_path)
            rows_before_total = len(df_block)
            
            if rows_before_total == 0:
                print(f"Blocco {block_name}: Vuoto. Saltato.")
                continue
                
            if NOME_COLONNA_MMSI not in df_block.columns:
                print(f"ERRORE: Colonna '{NOME_COLONNA_MMSI}' non trovata in {block_name}. File saltato.")
                continue
            
            df_block['MMSI_str'] = df_block[NOME_COLONNA_MMSI].astype(str).str.strip()

            filtro_lunghezza = df_block['MMSI_str'].str.len() == 9
            filtro_numerico = df_block['MMSI_str'].str.isdigit()
            filtro_non_zeri = df_block['MMSI_str'] != '000000000'
            filtro_non_seq = df_block['MMSI_str'] != '123456789'
            filtro_non_ripetuti = df_block['MMSI_str'].apply(lambda x: len(set(x)) > 1)

            df_block = df_block[
                filtro_lunghezza & 
                filtro_numerico & 
                filtro_non_zeri & 
                filtro_non_seq & 
                filtro_non_ripetuti
            ]
            
            df_block.drop(columns=['MMSI_str'], inplace=True)
            
            rows_after_mmsi = len(df_block)
            local_mmsi_removed = rows_before_total - rows_after_mmsi
            total_mmsi_removed += local_mmsi_removed
            

            rows_before_duplicates = rows_after_mmsi
            df_block.drop_duplicates(inplace=True)
            
            rows_after_duplicates = len(df_block)
            local_duplicates_removed = rows_before_duplicates - rows_after_duplicates
            total_duplicates_removed += local_duplicates_removed

            print(f"Blocco {block_name}: Rimosse {local_mmsi_removed} righe (MMSI) | Rimosse {local_duplicates_removed} righe (Duplicati)")

            output_file = os.path.join(OUTPUT_DIR, block_name)
            df_block.to_parquet(output_file, index=False, engine='pyarrow', compression='snappy')
            
        except MemoryError:
            print(f"ERRORE DI MEMORIA: Il blocco {block_name} √® troppo grande per essere processato.")
            break 
        except Exception as e:
            print(f"ERRORE durante la pulizia del blocco {block_name}: {e}")

        finally:
            if df_block is not None:
                del df_block
            gc.collect() 

    print("\n--- PULIZIA COMPLETATA ---")
    print(f"Totale righe rimosse per MMSI non valido: {total_mmsi_removed}")
    print(f"Totale righe rimosse per duplicati: {total_duplicates_removed}")
    print(f"Il dataset finale √® in: {OUTPUT_DIR}")

Lettura blocchi da: Dataset_Stitched_Finale
Salvataggio in: /home/al3th3ia/Scrivania/Cybersecurity/Detecting-Trajectory-Spoofing-Attacks-on-AIS/Progetto/Pre-Elaborazione Dati/Dataset

Inizio scansione e pulizia (MMSI Specifici + Duplicati)...
Regole MMSI: Rimozione di zeri, '123456789' e cifre ripetute.
Blocco blocco_000-segmentato.parquet: Rimosse 1915 righe (MMSI) | Rimosse 7526 righe (Duplicati)
Blocco blocco_001-segmentato.parquet: Rimosse 1014 righe (MMSI) | Rimosse 1245 righe (Duplicati)
Blocco blocco_002-segmentato.parquet: Rimosse 1555 righe (MMSI) | Rimosse 1329 righe (Duplicati)
Blocco blocco_003-segmentato.parquet: Rimosse 1234 righe (MMSI) | Rimosse 1064 righe (Duplicati)
Blocco blocco_004-segmentato.parquet: Rimosse 996 righe (MMSI) | Rimosse 2699 righe (Duplicati)
Blocco blocco_005-segmentato.parquet: Rimosse 1144 righe (MMSI) | Rimosse 8737 righe (Duplicati)
Blocco blocco_006-segmentato.parquet: Rimosse 272 righe (MMSI) | Rimosse 5302 righe (Duplicati)
Blocco blocco_007-

#### Conteggio delle traiettorie uniche con fine valutativo

In [8]:
INPUT_DIR = 'Dataset' 

global_unique_ids = set()  # Utilizziamo il set perch√® memorizza solo valori unici

try:
    all_blocks = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))
    
    if not all_blocks:
        print(f"ERRORE: Nessun file blocco trovato in {INPUT_DIR}.")
    else:
        
        for block_path in all_blocks:
            print(f"Processando {os.path.basename(block_path)}...")
            
            df = pd.read_parquet(block_path, columns=['TrajectoryID'])
            
            local_uniques = set(df['TrajectoryID'].unique())
            
            global_unique_ids.update(local_uniques)
            
            del df, local_uniques
            gc.collect()

        numero_totale_traiettorie = len(global_unique_ids)
        
        print(f"Il tuo dataset finale contiene {numero_totale_traiettorie:,} traiettorie uniche totali.")

except Exception as e:
    print(f"ERRORE")
    print(e)

Processando blocco_000-segmentato.parquet...
Processando blocco_001-segmentato.parquet...
Processando blocco_002-segmentato.parquet...
Processando blocco_003-segmentato.parquet...
Processando blocco_004-segmentato.parquet...
Processando blocco_005-segmentato.parquet...
Processando blocco_006-segmentato.parquet...
Processando blocco_007-segmentato.parquet...
Processando blocco_008-segmentato.parquet...
Processando blocco_009-segmentato.parquet...
Processando blocco_010-segmentato.parquet...
Processando blocco_011-segmentato.parquet...
Processando blocco_012-segmentato.parquet...
Processando blocco_013-segmentato.parquet...
Processando blocco_014-segmentato.parquet...
Processando blocco_015-segmentato.parquet...
Processando blocco_016-segmentato.parquet...
Processando blocco_017-segmentato.parquet...
Processando blocco_018-segmentato.parquet...
Processando blocco_019-segmentato.parquet...
Processando blocco_020-segmentato.parquet...
Processando blocco_021-segmentato.parquet...
Processand

#### Controllo finale

In [4]:
FILE_PATH_TEST = 'Dataset/blocco_000-segmentato.parquet'
COLUMNS_TO_READ = ['MMSI', 'Latitude', 'Longitude','SOG', 'COG', 'Timestamp','TrajectoryID']

df = pd.read_parquet(
        FILE_PATH_TEST, 
        columns=COLUMNS_TO_READ,
        engine='pyarrow' 
    )

df.head(50)



Unnamed: 0,MMSI,Latitude,Longitude,SOG,COG,Timestamp,TrajectoryID
0,100011758,43.92981,-124.62096,5.6,180.3,2024-07-09 15:37:13,1
1,100011758,43.93671,-124.61779,4.7,21.5,2024-07-09 20:39:09,2
2,100011758,43.93372,-124.61674,3.0,176.4,2024-07-09 20:56:43,2
3,100011758,43.93272,-124.61994,3.0,246.4,2024-07-09 20:58:02,2
4,100011758,43.95354,-124.6234,6.3,10.7,2024-07-09 23:49:54,3
5,100011758,43.95859,-124.62075,6.4,20.6,2024-07-09 23:53:08,3
6,100011758,43.96356,-124.61822,6.3,20.1,2024-07-09 23:56:14,3
7,100011758,43.96884,-124.61549,6.7,20.4,2024-07-09 23:59:25,3
8,100011758,43.97417,-124.61268,6.8,20.8,2024-07-10 00:02:39,3
9,100011758,43.97894,-124.60952,6.3,25.4,2024-07-10 00:05:46,3


In [5]:
rows,columns = df.shape
print(f"Numero di righe: {rows}, Numero di colonne: {columns}\n")

Numero di righe: 39959464, Numero di colonne: 7



##### Diagnosi del dataset per identificare gap all'interno delle traiettorie > 10 min

In [1]:
import polars as pl
import glob
import os
import gc

# --- CONFIGURAZIONE ---
INPUT_DIR = 'Dataset'  # La cartella finale del tuo notebook
GAP_LIMIT_SECONDS = 10 * 60  # 10 minuti
MIN_LENGTH = 30  # Filtro rete neurale

print(f"üîç AVVIO DIAGNOSI DATASET: {INPUT_DIR}")
print(f"   > Regola Gap Max: {GAP_LIMIT_SECONDS/60} min")
print(f"   > Regola Lunghezza Min: {MIN_LENGTH} righe")
print("-" * 60)

all_files = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

# Contatori Globali
total_trajectories = 0
valid_trajectories = 0
dropped_gap_10m = 0
dropped_too_short = 0

# Dizionario per tracciare ID che attraversano i file (per statistiche corrette)
# Mappa: ID -> {max_gap_found: bool, row_count: int}
global_stats = {}

for i, file_path in enumerate(all_files):
    try:
        # Leggiamo solo le colonne necessarie per la diagnosi
        df = pl.read_parquet(file_path).select(['TrajectoryID', 'Timestamp'])
        
        # Casting e Ordinamento
        df = df.with_columns(pl.col("Timestamp").cast(pl.Datetime))
        df = df.sort(["TrajectoryID", "Timestamp"])
        
        # Calcolo Gap Temporali (Delta tra righe consecutive dello stesso ID)
        df = df.with_columns([
            pl.col("Timestamp").diff().dt.total_seconds().over("TrajectoryID").alias("delta_sec")
        ])
        
        # Aggregazione Statistica per ID in questo file
        stats = df.group_by("TrajectoryID").agg([
            pl.col("delta_sec").max().alias("max_gap"),
            pl.len().alias("count")
        ])
        
        # Aggiornamento Statistiche Globali
        # Nota: Poich√© un ID pu√≤ essere spalmato su pi√π file, dobbiamo accumulare
        for row in stats.iter_rows(named=True):
            tid = row['TrajectoryID']
            gap = row['max_gap'] if row['max_gap'] is not None else 0
            cnt = row['count']
            
            if tid not in global_stats:
                global_stats[tid] = {'max_gap': gap, 'total_rows': cnt}
            else:
                # Aggiorniamo il gap massimo visto finora e sommiamo le righe
                global_stats[tid]['max_gap'] = max(global_stats[tid]['max_gap'], gap)
                global_stats[tid]['total_rows'] += cnt
        
        print(f"[{i+1}/{len(all_files)}] Analizzato {os.path.basename(file_path)}", end='\r')
        
        del df, stats
        gc.collect()

    except Exception as e:
        print(f"\n‚ùå Errore su {file_path}: {e}")

print(f"\n{'='*60}")
print("üìä RISULTATI ANALISI GLOBALE")

# Elaborazione finale dai dati accumulati
for tid, data in global_stats.items():
    total_trajectories += 1
    
    # Verifica Regole
    is_gap_ok = data['max_gap'] <= GAP_LIMIT_SECONDS
    is_len_ok = data['total_rows'] >= MIN_LENGTH
    
    if not is_gap_ok:
        dropped_gap_10m += 1
    elif not is_len_ok: # Se il gap √® ok, controlliamo la lunghezza
        dropped_too_short += 1
    else:
        valid_trajectories += 1

print(f"Totale Traiettorie Uniche: {total_trajectories:,}")
print("-" * 30)
print(f"‚ùå Scartate per Gap > 10min:   {dropped_gap_10m:,} ({(dropped_gap_10m/total_trajectories)*100:.1f}%)")
print(f"‚ùå Scartate per Righe < 30:    {dropped_too_short:,} ({(dropped_too_short/total_trajectories)*100:.1f}%)")
print("-" * 30)
print(f"‚úÖ TRAIETTORIE VALIDE (Target): {valid_trajectories:,} ({(valid_trajectories/total_trajectories)*100:.1f}%)")
print("=" * 60)

üîç AVVIO DIAGNOSI DATASET: Dataset
   > Regola Gap Max: 10.0 min
   > Regola Lunghezza Min: 30 righe
------------------------------------------------------------
[25/25] Analizzato blocco_024-segmentato.parquet
üìä RISULTATI ANALISI GLOBALE
Totale Traiettorie Uniche: 6,743,290
------------------------------
‚ùå Scartate per Gap > 10min:   3,634,344 (53.9%)
‚ùå Scartate per Righe < 30:    2,124,181 (31.5%)
------------------------------
‚úÖ TRAIETTORIE VALIDE (Target): 984,765 (14.6%)


##### Algoritmo di pulizia salti temporali e interpolazione per l'intero dataset

In [10]:
import polars as pl
import glob
import os
import shutil
import gc
import numpy as np
from datetime import timedelta

# --- CONFIGURAZIONE ---
INPUT_DIR = 'Dataset'
OUTPUT_DIR = 'Dataset_Ready_For_AI_FINAL'

# Parametri Blindati
GAP_LIMIT_MINUTES = 10      # Se buco > 10 min sui dati grezzi, scarta ID
MIN_LENGTH_ROWS = 30        # Minima lunghezza traiettoria finale
BATCH_SIZE = 250            # Aumentato leggermente per velocit√†, ma SAFE per RAM
MAX_DURATION_DAYS = 20      # Safety Valve per date corrotte

if os.path.exists(OUTPUT_DIR):
    shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR, exist_ok=True)

COLS = ['MMSI', 'Latitude', 'Longitude', 'SOG', 'COG', 'Timestamp', 'TrajectoryID']
all_files = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

print(f"üè≠ AVVIO V11: PRODUZIONE FINALE (FULL DATASET)")
print(f"   > Input: {len(all_files)} file")
print(f"   > Output: {OUTPUT_DIR}")
print(f"   > Batch Size: {BATCH_SIZE} (Safe Mode)")
print("-" * 60)

previous_tail_df = None
total_rows_final = 0

for i, file_path in enumerate(all_files):
    out_name = os.path.basename(file_path)
    
    # Feedback visivo semplice
    print(f"[{i+1}/{len(all_files)}] Elaborazione {out_name}...", end='\r')
    
    try:
        # 1. CARICAMENTO
        df = pl.read_parquet(file_path).select(COLS)
        df = df.with_columns(pl.col("Timestamp").cast(pl.Datetime("us")))
        
        # 2. STITCHING (Buffer)
        if previous_tail_df is not None:
            previous_tail_df = previous_tail_df.select(COLS)
            df = pl.concat([previous_tail_df, df], how="vertical")
        
        df = df.sort(["TrajectoryID", "Timestamp"])
        current_tail = df.group_by("TrajectoryID", maintain_order=True).last()

        # 3. KILLER FILTER (Pre-Kill Gaps > 10m)
        df = df.with_columns([
            pl.col("Timestamp").diff().dt.total_seconds().over("TrajectoryID").fill_null(0).alias("delta_sec")
        ])
        
        bad_gap_ids = df.filter(pl.col("delta_sec") > (GAP_LIMIT_MINUTES * 60))["TrajectoryID"].unique().to_list()
        
        if len(bad_gap_ids) > 0:
            df = df.filter(~pl.col("TrajectoryID").is_in(bad_gap_ids))
            
        df = df.drop("delta_sec")

        if df.height == 0:
            previous_tail_df = None
            continue

        # 4. PREPARAZIONE VETTORIALE (Snap-to-Grid)
        df = df.with_columns([
            pl.col("Timestamp").dt.round("1m").alias("GridTime"),
            (pl.col("COG") * np.pi / 180).sin().alias("cog_sin"),
            (pl.col("COG") * np.pi / 180).cos().alias("cog_cos")
        ])

        # 5. CONSOLIDAMENTO
        data_consolidated = (
            df.group_by(["TrajectoryID", "GridTime"])
            .agg([
                pl.col("Latitude").mean(),
                pl.col("Longitude").mean(),
                pl.col("SOG").mean(),
                pl.col("cog_sin").mean(),
                pl.col("cog_cos").mean(),
                pl.col("MMSI").first()
            ])
            .sort(["TrajectoryID", "GridTime"])
        )
        
        # Free RAM
        del df
        gc.collect()

        # 6. MICRO-BATCHING
        unique_ids = data_consolidated["TrajectoryID"].unique(maintain_order=True).to_list()
        processed_chunks = []
        
        # Loop batch
        for k_idx, k in enumerate(range(0, len(unique_ids), BATCH_SIZE)):
            batch_ids = unique_ids[k : k + BATCH_SIZE]
            batch_data = data_consolidated.filter(pl.col("TrajectoryID").is_in(batch_ids))
            
            # Calcolo Range
            ranges = (
                batch_data.group_by("TrajectoryID")
                .agg([
                    pl.col("GridTime").min().alias("start"),
                    pl.col("GridTime").max().alias("end")
                ])
                .with_columns([
                    (pl.col("end") - pl.col("start")).dt.total_days().alias("duration_days")
                ])
            )
            
            valid_ranges = ranges.filter(pl.col("duration_days") <= MAX_DURATION_DAYS)
            
            if valid_ranges.height > 0:
                # Creazione Scheletro (Fix V10 applicato)
                skeleton = (
                    valid_ranges.select([
                        "TrajectoryID",
                        pl.datetime_ranges(
                            pl.col("start"), 
                            pl.col("end"), 
                            interval="1m", 
                            closed="both"
                        ).alias("GridTime")
                    ])
                    .explode("GridTime")
                )
                
                joined = skeleton.join(batch_data, on=["TrajectoryID", "GridTime"], how="left")
                
                # Interpolazione Coerente
                final_batch = joined.with_columns([
                    pl.col("Latitude").interpolate().over("TrajectoryID"),
                    pl.col("Longitude").interpolate().over("TrajectoryID"),
                    pl.col("SOG").interpolate().clip(0, 200).over("TrajectoryID"),
                    pl.col("cog_sin").interpolate().over("TrajectoryID"),
                    pl.col("cog_cos").interpolate().over("TrajectoryID"),
                    pl.col("MMSI").forward_fill().backward_fill().over("TrajectoryID")
                ])
                
                # Ricostruzione COG
                final_batch = final_batch.with_columns([
                    (np.arctan2(pl.col("cog_sin"), pl.col("cog_cos")) * 180 / np.pi).alias("COG_new")
                ]).with_columns([
                    ((pl.col("COG_new") + 360) % 360).alias("COG")
                ]).rename({"GridTime": "Timestamp"})
                
                # Filtro Lunghezza
                id_counts = final_batch.group_by("TrajectoryID").len()
                valid_len_list = id_counts.filter(pl.col("len") >= MIN_LENGTH_ROWS)["TrajectoryID"].to_list()
                final_batch = final_batch.filter(pl.col("TrajectoryID").is_in(valid_len_list))
                
                # Deduplica Buffer e Selezione Colonne
                final_batch = final_batch.select(COLS).unique(subset=['TrajectoryID', 'Timestamp'], keep='last', maintain_order=True)
                
                processed_chunks.append(final_batch)
            
            # Pulizia RAM aggressiva intra-batch
            del batch_data, ranges, valid_ranges
            if 'skeleton' in locals(): del skeleton
            if 'joined' in locals(): del joined
            if 'final_batch' in locals(): del final_batch
            
            # Feedback ogni 20 batch per non intasare il log
            if k_idx % 20 == 0:
                print(f"[{i+1}/{len(all_files)}] Batch {k_idx} processato...", end='\r')

        # Salvataggio
        if processed_chunks:
            final_df = pl.concat(processed_chunks).sort(["TrajectoryID", "Timestamp"])
            out_path = os.path.join(OUTPUT_DIR, out_name)
            final_df.write_parquet(out_path, compression='zstd')
            
            total_rows_final += final_df.height
            del final_df
        
        previous_tail_df = current_tail
        del data_consolidated, processed_chunks, unique_ids
        gc.collect()
        
        # Conferma completamento file
        print(f"[{i+1}/{len(all_files)}] {out_name}: COMPLETATO.            ")

    except Exception as e:
        print(f"\n‚ö†Ô∏è ERRORE {file_path}: {e}")
        previous_tail_df = None

print("\n" + "="*60)
print(f"üéâ PRODUZIONE COMPLETATA")
print(f"   Output: {OUTPUT_DIR}")
print(f"   Totale Righe Stimate: {total_rows_final:,}")
print("="*60)

üè≠ AVVIO V11: PRODUZIONE FINALE (FULL DATASET)
   > Input: 25 file
   > Output: Dataset_Ready_For_AI_FINAL
   > Batch Size: 250 (Safe Mode)
------------------------------------------------------------
[1/25] blocco_000-segmentato.parquet: COMPLETATO.            
[2/25] blocco_001-segmentato.parquet: COMPLETATO.            
[3/25] blocco_002-segmentato.parquet: COMPLETATO.            
[4/25] blocco_003-segmentato.parquet: COMPLETATO.            
[5/25] blocco_004-segmentato.parquet: COMPLETATO.            
[6/25] blocco_005-segmentato.parquet: COMPLETATO.            
[7/25] blocco_006-segmentato.parquet: COMPLETATO.            
[8/25] blocco_007-segmentato.parquet: COMPLETATO.            
[9/25] blocco_008-segmentato.parquet: COMPLETATO.            
[10/25] blocco_009-segmentato.parquet: COMPLETATO.            
[11/25] blocco_010-segmentato.parquet: COMPLETATO.            
[12/25] blocco_011-segmentato.parquet: COMPLETATO.            
[13/25] blocco_012-segmentato.parquet: COMPLETATO.

##### Controllo integrit√† temporale

In [11]:
import polars as pl
import glob
import os

INPUT_DIR = 'Dataset_Ready_For_AI_FINAL'  # La cartella di test

print("üïµÔ∏è VERIFICA INTEGRIT√Ä TEST 3 FILE")
all_files = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

total_broken = 0

for f in all_files:
    df = pl.read_parquet(f).select(['TrajectoryID', 'Timestamp'])
    
    # Calcolo delta
    df = df.with_columns([
        pl.col("Timestamp").diff().dt.total_seconds().over("TrajectoryID").alias("delta")
    ])
    
    # Cerchiamo errori (delta != 60s e non null)
    # Tolleranza 0.1s
    errors = df.filter(
        (pl.col("delta").is_not_null()) & 
        ((pl.col("delta") - 60).abs() > 0.1)
    )
    
    count = errors.height
    total_broken += count
    
    if count == 0:
        print(f"‚úÖ {os.path.basename(f)}: OK (Tutti delta = 60s)")
    else:
        print(f"‚ùå {os.path.basename(f)}: {count} errori trovati!")
        print(errors.head())

print("-" * 30)
if total_broken == 0:
    print("üéâ SUCCESSO TOTALE: L'algoritmo funziona ed √® sicuro.")
else:
    print("‚ö†Ô∏è Ci sono ancora problemi.")

üïµÔ∏è VERIFICA INTEGRIT√Ä TEST 3 FILE
‚úÖ blocco_000-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_001-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_002-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_003-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_004-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_005-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_006-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_007-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_008-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_009-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_010-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_011-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_012-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_013-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_014-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_015-segmentato.parquet: OK (Tutti delta = 60s)
‚úÖ blocco_016-segmentato.parque

In [13]:
import polars as pl
import glob
import os
import sys

# --- CONFIGURAZIONE ---
INPUT_DIR = 'Dataset_Ready_For_AI_FINAL'  # La cartella da controllare
EXPECTED_DELTA_SEC = 60
TOLERANCE_SEC = 0.1  # Tolleranza minima per i float

all_files = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))

print(f"üïµÔ∏è  AVVIO DIAGNOSI INTEGRIT√Ä TEMPORALE")
print(f"    Target: {INPUT_DIR}")
print("-" * 60)

total_ids_set = set()
bad_ids_set = set()
bad_files_map = {} # Per sapere in quali file sono gli errori

for i, file_path in enumerate(all_files):
    fname = os.path.basename(file_path)
    try:
        # Leggiamo solo ID e Timestamp per velocit√†
        df = pl.read_parquet(file_path).select(['TrajectoryID', 'Timestamp'])
        
        # Ordiniamo per sicurezza
        df = df.sort(['TrajectoryID', 'Timestamp'])

        # Calcolo Delta
        df = df.with_columns([
            pl.col("Timestamp").diff().dt.total_seconds().over("TrajectoryID").alias("delta")
        ])

        # Troviamo ID con delta != 60 (escludendo i null che sono i primi punti di ogni traccia)
        # Un delta di 120s significa che manca 1 riga.
        bad_rows = df.filter(
            (pl.col("delta").is_not_null()) & 
            ((pl.col("delta") - EXPECTED_DELTA_SEC).abs() > TOLERANCE_SEC)
        )
        
        # Estraiamo gli ID unici problematici in questo file
        current_bad_ids = set(bad_rows["TrajectoryID"].unique().to_list())
        current_total_ids = set(df["TrajectoryID"].unique().to_list())
        
        # Aggiorniamo i contatori globali
        bad_ids_set.update(current_bad_ids)
        total_ids_set.update(current_total_ids)
        
        if len(current_bad_ids) > 0:
            bad_files_map[fname] = len(current_bad_ids)
            print(f"[{i+1}/{len(all_files)}] {fname}: ‚ö†Ô∏è TROVATE {len(current_bad_ids)} traiettorie rotte.", end='\r')
        else:
            print(f"[{i+1}/{len(all_files)}] {fname}: ‚úÖ Integrit√† OK.", end='\r')

    except Exception as e:
        print(f"\n‚ùå ERRORE LETTURA {fname}: {e}")

print("\n" + "="*60)
print("üìä REPORT DIAGNOSTICO")
print("="*60)

num_total = len(total_ids_set)
num_bad = len(bad_ids_set)
percent_bad = (num_bad / num_total * 100) if num_total > 0 else 0

print(f"Totale Traiettorie: {num_total:,}")
print(f"Traiettorie Guaste: {num_bad:,}")
print(f"Percentuale Danno:  {percent_bad:.2f}%")
print("-" * 60)

if percent_bad == 0:
    print("üü¢ IL DATASET √à PERFETTO. Nessuna azione necessaria.")
elif percent_bad < 5:
    print("üü° DANNO LIEVE. Consiglio: ELIMINA le traiettorie guaste.")
else:
    print("üî¥ DANNO ESTESO. Consiglio: Bisogna capire perch√© l'interpolazione fallisce cos√¨ tanto.")
print("=" * 60)

üïµÔ∏è  AVVIO DIAGNOSI INTEGRIT√Ä TEMPORALE
    Target: Dataset_Ready_For_AI_FINAL
------------------------------------------------------------
[25/25] blocco_024-segmentato.parquet: ‚úÖ Integrit√† OK.
üìä REPORT DIAGNOSTICO
Totale Traiettorie: 1,232,449
Traiettorie Guaste: 0
Percentuale Danno:  0.00%
------------------------------------------------------------
üü¢ IL DATASET √à PERFETTO. Nessuna azione necessaria.


In [20]:
import polars as pl
import glob
import os
import sys

# --- CONFIGURAZIONE ---
INPUT_DIR = 'Dataset_Ready_For_AI_FINAL'

# Definizione Split
SPLITS = {
    "TRAIN": range(0, 16),
    "VAL":   range(16, 20),
    "TEST":  range(20, 24),
    "EXTRA": range(24, 25) 
}

EXPECTED_DELTA = 60.0
TOLERANCE = 0.001 

print(f"üìä AVVIO AUDIT DEL DATASET (COLUMN NAME FIX)")
print(f"   Target: {INPUT_DIR}")
print("-" * 60)

all_files = sorted(glob.glob(os.path.join(INPUT_DIR, '*.parquet')))
if not all_files:
    print("‚ùå Nessun file trovato.")
    sys.exit()

# --- ACCUMULATORI ---
global_stats = {
    "total_rows": 0,
    "unique_ids": set(),
    "geo_bounds": {"min_lat": 90, "max_lat": -90, "min_lon": 180, "max_lon": -180},
    "sog_stats":  {"min": 1000, "max": -1},
    "time_gaps":  {"ok_60s": 0, "overlap_0s": 0, "errors": 0},
    "traj_lens":  [], 
    "split_counts": {"TRAIN": 0, "VAL": 0, "TEST": 0, "EXTRA": 0}
}

def get_split_name(idx):
    for name, rng in SPLITS.items():
        if idx in rng: return name
    return "EXTRA"

previous_file_last_rows = None
stitching_errors = 0

# --- ELABORAZIONE ---
for i, file_path in enumerate(all_files):
    fname = os.path.basename(file_path)
    split = get_split_name(i)
    
    try:
        # Caricamento
        df = pl.read_parquet(file_path)
        
        # 1. VOLUMETRIA
        rows = df.height
        ids = df["TrajectoryID"].unique().to_list()
        
        global_stats["total_rows"] += rows
        global_stats["unique_ids"].update(ids)
        global_stats["split_counts"][split] += rows
        
        # 2. FISICA
        global_stats["geo_bounds"]["min_lat"] = min(global_stats["geo_bounds"]["min_lat"], df["Latitude"].min())
        global_stats["geo_bounds"]["max_lat"] = max(global_stats["geo_bounds"]["max_lat"], df["Latitude"].max())
        global_stats["geo_bounds"]["min_lon"] = min(global_stats["geo_bounds"]["min_lon"], df["Longitude"].min())
        global_stats["geo_bounds"]["max_lon"] = max(global_stats["geo_bounds"]["max_lon"], df["Longitude"].max())
        
        global_stats["sog_stats"]["min"] = min(global_stats["sog_stats"]["min"], df["SOG"].min())
        global_stats["sog_stats"]["max"] = max(global_stats["sog_stats"]["max"], df["SOG"].max())
        
        # 3. ANALISI TEMPORALE
        df = df.sort(["TrajectoryID", "Timestamp"])
        
        df = df.with_columns([
            pl.col("Timestamp").diff().dt.total_seconds().over("TrajectoryID").fill_null(0).alias("delta")
        ])
        
        n_60 = df.filter((pl.col("delta") - 60).abs() < TOLERANCE).height
        n_0 = df.filter(pl.col("delta").abs() < TOLERANCE).height
        n_errors = df.filter(
            ((pl.col("delta") - 60).abs() >= TOLERANCE) & 
            (pl.col("delta").abs() >= TOLERANCE)
        ).height
        
        global_stats["time_gaps"]["ok_60s"] += n_60
        global_stats["time_gaps"]["overlap_0s"] += n_0 
        global_stats["time_gaps"]["errors"] += n_errors
        
        # 4. LUNGHEZZE
        lengths = df.group_by("TrajectoryID").len()["len"]
        global_stats["traj_lens"].append(lengths)

        # 5. CUCITURE (FIXED - Selezione Esplicita)
        # Selezioniamo SOLO le colonne che ci servono per evitare errori di indice
        current_starts = (
            df.group_by("TrajectoryID", maintain_order=True)
            .first()
            .select(["TrajectoryID", "Timestamp"]) # <--- FIX: Garantisce ordine colonne
        )
        
        if previous_file_last_rows is not None:
            prev_dict = {row[0]: row[1] for row in previous_file_last_rows.iter_rows()}
            
            for row in current_starts.iter_rows():
                # Ora siamo sicuri: 0=ID, 1=Timestamp
                tid = row[0] 
                curr_time = row[1] 
                
                if tid in prev_dict:
                    prev_time = prev_dict[tid]
                    diff = (curr_time - prev_time).total_seconds()
                    
                    if not (abs(diff - 60) < TOLERANCE or abs(diff) < TOLERANCE):
                        stitching_errors += 1

        # Salva coda per prossimo giro (Selezione Esplicita)
        last_rows = (
            df.group_by("TrajectoryID", maintain_order=True)
            .last()
            .select(["TrajectoryID", "Timestamp"]) # <--- FIX
        )
        previous_file_last_rows = last_rows
        
        print(f"[{i+1}/{len(all_files)}] Analizzato {fname} ({split})", end='\r')
        
    except Exception as e:
        print(f"\n‚ùå ERRORE CRITICO su {fname}: {e}")

# --- REPORT ---
print("\n" + "="*60)
print("üìÑ DATASET PASSPORT (REPORT FINALE)")
print("="*60)

all_lens = pl.concat(global_stats["traj_lens"])
len_stats = {
    "min": all_lens.min(),
    "max": all_lens.max(),
    "mean": all_lens.mean(),
    "median": all_lens.median()
}

print(f"\n1. DISTRIBUZIONE DATI (Split)")
print(f"   ‚Ä¢ TRAIN (0-15):  {global_stats['split_counts']['TRAIN']:,} righe")
print(f"   ‚Ä¢ VAL   (16-19): {global_stats['split_counts']['VAL']:,} righe")
print(f"   ‚Ä¢ TEST  (20-23): {global_stats['split_counts']['TEST']:,} righe")
print(f"   ‚Ä¢ EXTRA (24+):   {global_stats['split_counts']['EXTRA']:,} righe")
print(f"   --------------------------------------------------")
print(f"   ‚Ä¢ TOTALE REALE:  {global_stats['total_rows']:,} righe")
print(f"   ‚Ä¢ NAVI UNICHE:   {len(global_stats['unique_ids']):,} (ID univoci globali)")

print(f"\n2. COERENZA TEMPORALE (La Griglia)")
print(f"   ‚Ä¢ Delta 60s (Regolari): {global_stats['time_gaps']['ok_60s']:,}")
print(f"   ‚Ä¢ Delta Anomali (BUCHI): {global_stats['time_gaps']['errors']}  <-- DEVE ESSERE 0")

if global_stats["time_gaps"]["errors"] == 0:
    print("   ‚úÖ VERIFICA TEMPORALE SUPERATA: Nessun buco interno.")
else:
    print("   ‚ùå ATTENZIONE: Trovati buchi temporali interni ai file.")

print(f"\n3. INTEGRIT√Ä CUCITURE (Passaggio tra file)")
print(f"   ‚Ä¢ Errori di continuit√†: {stitching_errors}")
if stitching_errors == 0:
    print("   ‚úÖ CUCITURE PERFETTE.")
else:
    print(f"   ‚ö†Ô∏è NOTA: {stitching_errors} sovrapposizioni o gap ai bordi.")

print(f"\n4. FISICA E DENSIT√Ä")
print(f"   ‚Ä¢ Lat: {global_stats['geo_bounds']['min_lat']:.4f} / {global_stats['geo_bounds']['max_lat']:.4f}")
print(f"   ‚Ä¢ Lon: {global_stats['geo_bounds']['min_lon']:.4f} / {global_stats['geo_bounds']['max_lon']:.4f}")
print(f"   ‚Ä¢ SOG: Min {global_stats['sog_stats']['min']:.2f}, Max {global_stats['sog_stats']['max']:.2f}")
print(f"   ‚Ä¢ Durata Traj: Min {len_stats['min']}m, Max {len_stats['max']}m, Media {len_stats['mean']:.1f}m")

print("="*60)

üìä AVVIO AUDIT DEL DATASET (COLUMN NAME FIX)
   Target: Dataset_Ready_For_AI_FINAL
------------------------------------------------------------
[25/25] Analizzato blocco_024-segmentato.parquet (EXTRA)
üìÑ DATASET PASSPORT (REPORT FINALE)

1. DISTRIBUZIONE DATI (Split)
   ‚Ä¢ TRAIN (0-15):  136,786,489 righe
   ‚Ä¢ VAL   (16-19): 31,860,181 righe
   ‚Ä¢ TEST  (20-23): 42,379,530 righe
   ‚Ä¢ EXTRA (24+):   4,512,098 righe
   --------------------------------------------------
   ‚Ä¢ TOTALE REALE:  215,538,298 righe
   ‚Ä¢ NAVI UNICHE:   1,232,449 (ID univoci globali)

2. COERENZA TEMPORALE (La Griglia)
   ‚Ä¢ Delta 60s (Regolari): 214,299,754
   ‚Ä¢ Delta Anomali (BUCHI): 0  <-- DEVE ESSERE 0
   ‚úÖ VERIFICA TEMPORALE SUPERATA: Nessun buco interno.

3. INTEGRIT√Ä CUCITURE (Passaggio tra file)
   ‚Ä¢ Errori di continuit√†: 0
   ‚úÖ CUCITURE PERFETTE.

4. FISICA E DENSIT√Ä
   ‚Ä¢ Lat: 0.0085 / 80.8467
   ‚Ä¢ Lon: -179.5020 / 158.9307
   ‚Ä¢ SOG: Min 2.10, Max 102.30
   ‚Ä¢ Durata Traj: 