In [3]:
import os
import pandas as pd

# Path where files are stored
data_dir = r"D:\Projects\Hackathon\Agentic Hackathon\data\raw\ais_dummy\noaa_2013"

# List all files
all_files = sorted(os.listdir(data_dir))

# Separate logs and ship files
log_files = [f for f in all_files if 'logs' in f and f.endswith(".parquet")]
ship_files = [f for f in all_files if 'ships' in f and f.endswith(".parquet")]

print(f"Found {len(log_files)} log files and {len(ship_files)} ship files.")

# Function to get schema of each file
def get_schema(file_list, base_path):
    schemas = {}
    for file in file_list:
        filepath = os.path.join(base_path, file)
        try:
            df = pd.read_parquet(filepath, engine='pyarrow')  # load full schema
            schemas[file] = list(df.columns)
            print(f"✅ Loaded schema from: {file}")
        except Exception as e:
            print(f"❌ Failed to read {file}: {e}")
            schemas[file] = []
    return schemas

# Step 1: Check schema consistency
log_schemas = get_schema(log_files, data_dir)
ship_schemas = get_schema(ship_files, data_dir)

# Step 2: Print schema differences if any
print("\nChecking schema consistency for logs:")
log_columns_set = set(tuple(cols) for cols in log_schemas.values())
if len(log_columns_set) == 1:
    print("✅ All log files have the same schema.")
else:
    print("❌ Inconsistent schemas in log files.")
    for fname, cols in log_schemas.items():
        print(f"{fname}: {cols}")

print("\nChecking schema consistency for ships:")
ship_columns_set = set(tuple(cols) for cols in ship_schemas.values())
if len(ship_columns_set) == 1:
    print("✅ All ship files have the same schema.")
else:
    print("❌ Inconsistent schemas in ship files.")
    for fname, cols in ship_schemas.items():
        print(f"{fname}: {cols}")


Found 12 log files and 12 ship files.
✅ Loaded schema from: 2023_NOAA_AIS_logs_01.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_02.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_03.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_04.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_05.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_06.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_07.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_08.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_09.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_10.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_11.parquet
✅ Loaded schema from: 2023_NOAA_AIS_logs_12.parquet
✅ Loaded schema from: 2023_NOAA_AIS_ships_01.parquet
✅ Loaded schema from: 2023_NOAA_AIS_ships_02.parquet
✅ Loaded schema from: 2023_NOAA_AIS_ships_03.parquet
✅ Loaded schema from: 2023_NOAA_AIS_ships_04.parquet
✅ Loaded schema from: 2023_NOAA_AIS_ships_05.parquet
✅ Loaded schema from: 2023_NOAA_AIS_ships_06.parquet
✅ Loaded schema from

In [1]:
import dask.dataframe as dd
import os

input_dir = r"D:\Projects\Hackathon\Agentic Hackathon\data\raw\ais_dummy\noaa_2013"
output_dir = os.path.join(input_dir, "merged_dask")
os.makedirs(output_dir, exist_ok=True)

# Read multiple parquet logs
ddf = dd.read_parquet(
    os.path.join(input_dir, "*logs*.parquet"),
    engine='pyarrow'
)

# Optional: repartition to smaller chunks (~100 MB each)
ddf = ddf.repartition(partition_size="100MB")

# Write to merged output
ddf.to_parquet(output_dir, engine='pyarrow', compression='snappy')

print(f"✅ Dask wrote merged logs to partitioned folder: {output_dir}")


✅ Dask wrote merged logs to partitioned folder: D:\Projects\Hackathon\Agentic Hackathon\data\raw\ais_dummy\noaa_2013\merged_dask


ignoring exception in ensure_cleanup_on_exception
Traceback (most recent call last):
  File "d:\Projects\Hackathon\Agentic Hackathon\illegal_fish\Lib\site-packages\dask\dataframe\shuffle.py", line 226, in ensure_cleanup_on_exception
    yield
  File "d:\Projects\Hackathon\Agentic Hackathon\illegal_fish\Lib\site-packages\dask\dataframe\shuffle.py", line 108, in collect
    res = p.get(part)
          ^^^^^^^^^^^
  File "d:\Projects\Hackathon\Agentic Hackathon\illegal_fish\Lib\site-packages\partd\core.py", line 71, in get
    return self.get([keys], **kwargs)[0]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\Projects\Hackathon\Agentic Hackathon\illegal_fish\Lib\site-packages\partd\core.py", line 77, in get
    return self._get(keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\Projects\Hackathon\Agentic Hackathon\illegal_fish\Lib\site-packages\partd\encode.py", line 29, in _get
    return [self.join([self.decode(frame) for frame in framesplit(chunk)])
           ^^^^^^^^^^

In [1]:
import dask.dataframe as dd

# Load from partitioned Parquet directory
df = dd.read_parquet("D:/Projects/Hackathon/Agentic Hackathon/data/raw/ais_dummy/noaa_2013/merged_dask")

# Basic checks
print("Columns:", df.columns)
print("Dask Partition Info:")
print(df.npartitions)

# Sample records
print(df.head())


Columns: Index(['MMSI', 'BaseDateTime', 'LAT', 'LON', 'SOG', 'COG', 'Heading'], dtype='object')
Dask Partition Info:
1923
        MMSI BaseDateTime       LAT        LON   SOG    COG  Heading
0  129129903   2023-01-01  18.81136  -66.46386   6.5  338.8    338.0
1  209920000   2023-01-01  28.86785  -89.39993   6.2   48.3     50.0
2  212259000   2023-01-01  48.42268 -123.37375   0.0  335.9    321.0
3  215567000   2023-01-01  26.25989  -79.05443  14.3  273.9    275.0
4  241454000   2023-01-01  29.10736  -94.41600   7.9    3.6      6.0


In [2]:
import dask.dataframe as dd
import pandas as pd

df = dd.read_parquet("D:/Projects/Hackathon/Agentic Hackathon/data/raw/ais_dummy/noaa_2013/merged_dask")

# Convert BaseDateTime to datetime
df["BaseDateTime"] = dd.to_datetime(df["BaseDateTime"], errors="coerce")

# Drop rows with missing critical values
df = df.dropna(subset=["MMSI", "BaseDateTime", "LAT", "LON", "SOG", "COG", "Heading"])

# Sanity check
print(df.dtypes)
print(df.head())


MMSI                   float64
BaseDateTime    datetime64[ns]
LAT                    float64
LON                    float64
SOG                    float64
COG                    float64
Heading                float64
dtype: object
        MMSI BaseDateTime       LAT        LON   SOG    COG  Heading
0  129129903   2023-01-01  18.81136  -66.46386   6.5  338.8    338.0
1  209920000   2023-01-01  28.86785  -89.39993   6.2   48.3     50.0
2  212259000   2023-01-01  48.42268 -123.37375   0.0  335.9    321.0
3  215567000   2023-01-01  26.25989  -79.05443  14.3  273.9    275.0
4  241454000   2023-01-01  29.10736  -94.41600   7.9    3.6      6.0


In [3]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import joblib


In [4]:
# Change this to your merged parquet folder
data_path = "D:/Projects/Hackathon/Agentic Hackathon/data/raw/ais_dummy/noaa_2013/merged_dask"
ddf = dd.read_parquet(data_path)

print("Columns:", ddf.columns)
print("Dask Partition Info:", ddf.npartitions)
ddf.head()


Columns: Index(['MMSI', 'BaseDateTime', 'LAT', 'LON', 'SOG', 'COG', 'Heading'], dtype='object')
Dask Partition Info: 1923


Unnamed: 0,MMSI,BaseDateTime,LAT,LON,SOG,COG,Heading
0,129129903,2023-01-01,18.81136,-66.46386,6.5,338.8,338.0
1,209920000,2023-01-01,28.86785,-89.39993,6.2,48.3,50.0
2,212259000,2023-01-01,48.42268,-123.37375,0.0,335.9,321.0
3,215567000,2023-01-01,26.25989,-79.05443,14.3,273.9,275.0
4,241454000,2023-01-01,29.10736,-94.416,7.9,3.6,6.0


In [None]:
import os
import numpy as np
import pandas as pd
import dask.dataframe as dd
from geopy.distance import geodesic
import time 
since = time.time()
# ✅ Vectorized safe distance computation using geopy
def process_partition_vectorized(df):
    df = df.sort_values(['MMSI', 'BaseDateTime'])

    # Shifting & Diff
    df['lat_shift'] = df.groupby('MMSI')['LAT'].shift(1)
    df['lon_shift'] = df.groupby('MMSI')['LON'].shift(1)
    df['sog_diff'] = df.groupby('MMSI')['SOG'].diff()
    df['time_diff'] = df.groupby('MMSI')['BaseDateTime'].diff().dt.total_seconds()

    # Vectorized Geodesic Distance
    def geodesic_distance_np(lat1, lon1, lat2, lon2):
        return np.vectorize(
            lambda a, b, c, d: geodesic((a, b), (c, d)).meters
            if not np.any(pd.isnull([a, b, c, d])) else np.nan
        )(lat1, lon1, lat2, lon2)

    df['distance'] = geodesic_distance_np(
        df['LAT'].values, df['LON'].values,
        df['lat_shift'].values, df['lon_shift'].values
    )

    return df.dropna(subset=['distance', 'sog_diff', 'time_diff'])

# 📁 Input & Output Paths
input_path = "D:/Projects/Hackathon/Agentic Hackathon/data/raw/ais_dummy/noaa_2013/merged_dask"
output_path = os.path.join("D:/Projects/Hackathon/Agentic Hackathon/data/raw/ais_dummy/noaa_2013", "processed_partitions")
os.makedirs(output_path, exist_ok=True)

# 📦 Load Dask DataFrame
ddf = dd.read_parquet(input_path)
print("Total Partitions:", ddf.npartitions)

# 🔁 Partition-wise Batch Processing
for i in range(ddf.npartitions):
    print(f"⏳ Processing partition {i+1}/{ddf.npartitions}")
    df = ddf.get_partition(i).compute()
    df = df.dropna(subset=['LAT', 'LON', 'BaseDateTime', 'SOG'])
    df['BaseDateTime'] = pd.to_datetime(df['BaseDateTime'])

    try:
        processed_df = process_partition_vectorized(df)
        processed_df.to_parquet(os.path.join(output_path, f"processed_part_{i}.parquet"))
    except Exception as e:
        print(f"❌ Error in partition {i}: {e}")
        continue
elapsed_time = time.time() - since
print(f"The Training time is {elapsed_time//60:.0f}m {elapsed_time%60:.0f}s")
print("✅ All partitions processed and saved to disk.")


Total Partitions: 1923
⏳ Processing partition 1/1923
⏳ Processing partition 2/1923
⏳ Processing partition 3/1923
⏳ Processing partition 4/1923
⏳ Processing partition 5/1923
⏳ Processing partition 6/1923
⏳ Processing partition 7/1923
⏳ Processing partition 8/1923
⏳ Processing partition 9/1923
⏳ Processing partition 10/1923
⏳ Processing partition 11/1923
⏳ Processing partition 12/1923
⏳ Processing partition 13/1923
⏳ Processing partition 14/1923
⏳ Processing partition 15/1923
⏳ Processing partition 16/1923
⏳ Processing partition 17/1923
⏳ Processing partition 18/1923
⏳ Processing partition 19/1923
⏳ Processing partition 20/1923
⏳ Processing partition 21/1923
⏳ Processing partition 22/1923
⏳ Processing partition 23/1923
⏳ Processing partition 24/1923
⏳ Processing partition 25/1923
⏳ Processing partition 26/1923
⏳ Processing partition 27/1923
⏳ Processing partition 28/1923
⏳ Processing partition 29/1923
⏳ Processing partition 30/1923
⏳ Processing partition 31/1923
⏳ Processing partition 32

In [None]:
# Concatenate all cleaned and enriched partitions
processed_batches = "D:/Projects/Hackathon/Agentic Hackathon/data/raw/ais_dummy/noaa_2013/processed_partitions"
pdf = pd.concat(processed_batches, ignore_index=True)
print("✅ Batch-wise processing complete.")
pdf.head()

In [None]:
features = ['SOG', 'sog_diff', 'time_diff', 'distance']
X = pdf[features]
y = (pdf['SOG'] < 1).astype(int)  # Anomaly label example

In [None]:
# Group by vessel and calculate speed and heading differences
ddf['SOG'] = ddf['SOG'].fillna(0)
ddf['COG'] = ddf['COG'].fillna(0)

ddf['sog_diff'] = ddf.groupby('MMSI')['SOG'].diff().fillna(0)
ddf['cog_diff'] = ddf.groupby('MMSI')['COG'].diff().fillna(0)

# Label suspicious activity (placeholder logic)
ddf['label'] = ((ddf['sog_diff'].abs() > 10) | (ddf['cog_diff'].abs() > 30)).astype(int)

ddf = ddf.persist()
ddf.head()


In [None]:
# Select features
features = ['SOG', 'COG', 'Heading', 'sog_diff', 'cog_diff']
target = 'label'

df = ddf[features + [target]].dropna().compute()
print("Final dataset shape:", df.shape)
df.head()


In [None]:
X = df[features]
y = df[target]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42, stratify=y
)


In [None]:
import time
since = time.time()
clf = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')
clf.fit(X_train, y_train)
elapsed_time = time.time() - since
print(f"The Training time is {elapsed_time//60:.0f}m {elapsed_time%60:.0f}s") 
y_pred = clf.predict(X_test)

In [None]:
print("Classification Report:\n", classification_report(y_test, y_pred))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

ConfusionMatrixDisplay.from_predictions(y_test, y_pred, cmap="Blues")
plt.title("Confusion Matrix")
plt.grid(False)
plt.tight_layout()
plt.show()

In [None]:
model_path = "../model_utils/noaa_2013"
os.makedirs(model_path, exist_ok=True)

joblib.dump(clf, os.path.join(model_path, "rf_model.joblib"))
joblib.dump(scaler, os.path.join(model_path, "scaler.joblib"))

print("✅ Model and scaler saved to:", model_path)