In [1]:
# ------------------------------------------------------------
# 📌 STEP 0: SETUP AND CLEANUP: Remove Old Database if Exists
# ------------------------------------------------------------

In [2]:
import os

db_path = "ais_data.sqlite"

# Guarantee a fresh run: drop any database file left over from an earlier
# execution so the ingestion below starts from an empty store.
if not os.path.exists(db_path):
    print("‚úÖ No existing database found. You're good to go!")
else:
    os.remove(db_path)
    print(f"üßπ Deleted old database: {db_path}")

üßπ Deleted old database: ais_data.sqlite


In [3]:
# -------------------------------------
# 📌 STEP 1: IMPORT REQUIRED LIBRARIES
# -------------------------------------

In [4]:
import asyncio
import json
import sqlite3
import pandas as pd
from pyais import decode
from geopy.distance import geodesic
from collections import defaultdict
import unittest

In [5]:
# -------------------------
# 📌 STEP 2: LOAD AIS DATA
# -------------------------

In [6]:
# JSON text is UTF-8 by specification; pin the encoding so the load does not
# depend on the platform's default text encoding.
# NOTE(review): the ingestion loop indexes each entry with "payload", "mmsi"
# and "timestamp", so this is presumably a list of dicts — confirm against the file.
with open("ais_data.json", "r", encoding="utf-8") as f:
    ais_data = json.load(f)

# SQLite database file that the database helper functions connect to.
DB_NAME = "ais_data.sqlite"

In [7]:
# ------------------------------------------------
# 📌 STEP 3: DATABASE INITIALIZATION & VALIDATION
# ------------------------------------------------

In [8]:
def initialize_db():
    """Create the ``ais_messages`` table and its indexes if they are missing.

    Connects to the SQLite database named by the module-level ``DB_NAME``,
    creates the table (at most one row per ``(mmsi, timestamp)`` pair, enforced
    by a UNIQUE constraint) plus indexes on ``timestamp`` and ``mmsi``, and
    commits. Every statement uses ``IF NOT EXISTS``, so repeated calls are safe.

    Raises:
        sqlite3.Error: if the database cannot be opened or a statement fails.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS ais_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                mmsi INTEGER,
                timestamp TEXT,
                lat REAL,
                lon REAL,
                speed REAL,
                heading INTEGER,
                course REAL,
                raw_payload TEXT,
                UNIQUE(mmsi, timestamp)
            )
        ''')
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON ais_messages (timestamp)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_mmsi ON ais_messages (mmsi)")
        conn.commit()
    finally:
        # Release the database handle even when one of the statements fails.
        conn.close()

def save_to_db(parsed, mmsi, timestamp, raw_payload):
    """Validate one decoded AIS message and insert it into ``ais_messages``.

    Args:
        parsed: Decoded message exposing ``lat``, ``lon``, ``speed``,
            ``heading`` and ``course`` attributes.
        mmsi: Vessel identifier stored in the ``mmsi`` column.
        timestamp: Timestamp string. Trailing ``Z`` characters are stripped for
            validation only; the string is stored exactly as received.
        raw_payload: Raw payload text, stored verbatim.

    Returns:
        ``True`` when a new row was inserted, ``"duplicate"`` when a row with
        the same ``(mmsi, timestamp)`` already exists, and ``False`` when
        validation fails or the database write raises.
    """
    # Validate coordinates and timestamp
    if not (-90 <= parsed.lat <= 90):
        return False
    if not (-180 <= parsed.lon <= 180):
        return False
    timestamp_without_Z = timestamp.rstrip('Z')
    if pd.isna(pd.to_datetime(timestamp_without_Z, errors='coerce', utc=True)):
        return False

    try:
        conn = sqlite3.connect(DB_NAME)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO ais_messages
                (mmsi, timestamp, lat, lon, speed, heading, course, raw_payload)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                mmsi, timestamp, parsed.lat, parsed.lon,
                parsed.speed, parsed.heading, parsed.course, raw_payload
            ))
            conn.commit()
            # INSERT OR IGNORE does not raise on a UNIQUE conflict; it skips the
            # row silently, so the affected-row count is the only duplicate signal.
            inserted = cursor.rowcount > 0
        finally:
            conn.close()
    except sqlite3.IntegrityError:
        return "duplicate"
    except Exception as e:
        print(f"‚ö†Ô∏è Error saving to DB: {e}")
        return False

    return True if inserted else "duplicate"

In [9]:
# -------------------------------------
# 📌 STEP 4: STREAM AND STORE FUNCTION
# -------------------------------------

In [10]:
async def stream_and_store_combined():
    """Decode every entry of ``ais_data``, store it, and print a run summary.

    Initialises the database, then for each message decodes ``msg["payload"]``
    and hands the result to ``save_to_db``. A message is counted as saved when
    ``save_to_db`` returns ``True``, as a duplicate when it returns
    ``"duplicate"``, and as invalid otherwise (including when decoding or
    saving raises). A progress line is printed every 100 messages, followed by
    the totals and the first five stored rows.

    The coroutine contains no ``await`` points, so it runs to completion
    without yielding to the event loop.
    """
    initialize_db()
    count = 0
    invalid_count = 0
    duplicate_count = 0
    total = len(ais_data)

    for i, msg in enumerate(ais_data):
        try:
            decoded = decode(msg["payload"])
            result = save_to_db(decoded, msg["mmsi"], msg["timestamp"], msg["payload"])
        except Exception as e:
            invalid_count += 1
            # The failure may itself be a missing "timestamp" key, so look it up
            # defensively instead of indexing and crashing inside the handler.
            when = msg.get("timestamp", "<unknown>") if isinstance(msg, dict) else "<unknown>"
            print(f"‚ö†Ô∏è Malformed at {when}: {e}")
        else:
            if result is True:
                count += 1
            elif result == "duplicate":
                duplicate_count += 1
            else:
                invalid_count += 1

        # Report progress for every 100th message, malformed or not.
        if (i + 1) % 100 == 0:
            print(f"‚úÖ Processed {i+1}/{total} messages...")

    print(f"\nüéâ Done! {count} saved, {duplicate_count} duplicates, {invalid_count} invalid.")
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute("SELECT * FROM ais_messages LIMIT 5").fetchall()
        print("\nüìä First 5 rows from DB:")
        for row in rows:
            print(row)
    finally:
        conn.close()

# Run the async function.
# NOTE(review): a bare top-level `await` is not valid in a plain Python script;
# this relies on the notebook kernel (IPython autoawait) to drive the
# coroutine — confirm before moving this code into a module.
await stream_and_store_combined()

‚úÖ Processed 100/11487 messages...
‚úÖ Processed 200/11487 messages...
‚úÖ Processed 300/11487 messages...
‚úÖ Processed 400/11487 messages...
‚úÖ Processed 500/11487 messages...
‚úÖ Processed 600/11487 messages...
‚úÖ Processed 700/11487 messages...
‚úÖ Processed 800/11487 messages...
‚úÖ Processed 900/11487 messages...
‚úÖ Processed 1000/11487 messages...
‚úÖ Processed 1100/11487 messages...
‚úÖ Processed 1200/11487 messages...
‚úÖ Processed 1300/11487 messages...
‚úÖ Processed 1400/11487 messages...
‚úÖ Processed 1500/11487 messages...
‚úÖ Processed 1600/11487 messages...
‚úÖ Processed 1700/11487 messages...
‚úÖ Processed 1800/11487 messages...
‚úÖ Processed 1900/11487 messages...
‚úÖ Processed 2000/11487 messages...
‚úÖ Processed 2100/11487 messages...
‚úÖ Processed 2200/11487 messages...
‚úÖ Processed 2300/11487 messages...
‚úÖ Processed 2400/11487 messages...
‚úÖ Processed 2500/11487 messages...
‚úÖ Processed 2600/11487 messages...
‚úÖ Processed 2700/11487 messages...
‚úÖ Proces

In [11]:
# -----------------------------------------------
# 📌 STEP 5: DATABASE INSPECTION & SAMPLE OUTPUT
# -----------------------------------------------

In [12]:
# Summarise the populated database: tables, row and vessel counts, time range,
# a coordinate sanity check, and the first five rows of every vessel.
conn = sqlite3.connect(DB_NAME)
try:
    tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()
    print(f"\nüìã Tables: {tables}")

    total_rows = conn.execute("SELECT COUNT(*) FROM ais_messages").fetchone()[0]
    print(f"üì¶ Total AIS Messages: {total_rows}")

    distinct_mmsi = conn.execute("SELECT COUNT(DISTINCT mmsi) FROM ais_messages").fetchone()[0]
    print(f"üõ•Ô∏è Distinct Vessels: {distinct_mmsi}")

    # timestamp is a TEXT column, so MIN/MAX compare the strings lexicographically.
    time_range = conn.execute("SELECT MIN(timestamp), MAX(timestamp) FROM ais_messages").fetchone()
    print(f"‚è∞ Time Range: {time_range[0]} ‚Üí {time_range[1]}")

    invalid_coords = conn.execute("""
        SELECT COUNT(*) FROM ais_messages
        WHERE lat NOT BETWEEN -90 AND 90
           OR lon NOT BETWEEN -180 AND 180
    """).fetchone()[0]
    print("‚úÖ Coordinate validation passed." if invalid_coords == 0 else f"‚ö†Ô∏è Invalid coordinates: {invalid_coords}")

    mmsi_list = [row[0] for row in conn.execute("SELECT DISTINCT mmsi FROM ais_messages").fetchall()]
    print("\nüìã Sample 5 rows for each MMSI:")
    for mmsi in mmsi_list:
        print(f"\nüö¢ MMSI {mmsi}:")
        # Bind the MMSI as a parameter instead of formatting it into the SQL text.
        sample = pd.read_sql_query(
            """
            SELECT * FROM ais_messages
            WHERE mmsi = ?
            ORDER BY timestamp
            LIMIT 5
            """,
            conn,
            params=(mmsi,),
        )
        print(sample)
finally:
    # Close the connection even if one of the queries above fails.
    conn.close()


üìã Tables: [('ais_messages',), ('sqlite_sequence',)]
üì¶ Total AIS Messages: 11487
üõ•Ô∏è Distinct Vessels: 3
‚è∞ Time Range: 2025-04-30T09:38:40.869824+00:00Z ‚Üí 2025-05-18T11:38:41.054395+00:00Z
‚úÖ Coordinate validation passed.

üìã Sample 5 rows for each MMSI:

üö¢ MMSI 123456789:
   id       mmsi                          timestamp        lat         lon  \
0   1  123456789  2025-04-30T09:38:40.869824+00:00Z  30.732393  121.827393   
1   2  123456789  2025-04-30T09:43:40.869824+00:00Z  30.735913  121.856710   
2   3  123456789  2025-04-30T09:48:40.869824+00:00Z  30.739435  121.886028   
3   4  123456789  2025-04-30T09:53:40.869824+00:00Z  30.742955  121.915345   
4   5  123456789  2025-04-30T09:58:40.869824+00:00Z  30.746477  121.944663   

   speed  heading  course                                      raw_payload  
0   18.0       90    90.0  !AIVDO,1,1,,A,11mg=5OP2l8ecW`AUM33Q2l1P000,0*69  
1   18.0       90    90.0  !AIVDO,1,1,,A,11mg=5OP2l8el=DAUUC3Q2l1P000,0*40  
2   18

In [13]:
# -------------------------------
# 📌 STEP 6: UNIT TESTING FOR DB
# -------------------------------

In [14]:
class TestAISDatabase(unittest.TestCase):
    def setUp(self):
        self.conn = sqlite3.connect(DB_NAME)

    def tearDown(self):
        self.conn.close()

    def test_data_exists(self):
        count = self.conn.execute("SELECT COUNT(*) FROM ais_messages").fetchone()[0]
        self.assertGreater(count, 0, "Database should have data.")

    def test_lat_lon_valid(self):
        df = pd.read_sql_query("SELECT lat, lon FROM ais_messages LIMIT 100", self.conn)
        self.assertTrue(df['lat'].between(-90, 90).all(), "Latitudes must be valid")
        self.assertTrue(df['lon'].between(-180, 180).all(), "Longitudes must be valid")

# Run the tests from inside the notebook: argv=[''] stops unittest from parsing
# the kernel's own command-line arguments, and exit=False suppresses the
# sys.exit() call that unittest.main() otherwise makes when the run finishes.
unittest.main(argv=[''], exit=False)

..
----------------------------------------------------------------------
Ran 2 tests in 0.030s

OK


<unittest.main.TestProgram at 0x1cf6cbb17f0>

In [15]:
# -----------------------------------------------------
# 📌 STEP 7: ANALYTICS: DISTANCE AND SPEED CALCULATION
# -----------------------------------------------------

In [16]:
# Display label (a route description) for each vessel, keyed by MMSI.
# The summary loop iterates over this mapping to decide which tracks to report.
vessels_info = {
    123456789: "Shanghai ‚Üí Los Angeles",
    987654321: "Singapore ‚Üí Sydney",
    192837465: "New York ‚Üí Rotterdam"
}

def get_full_track(mmsi, start_time=None, end_time=None):
    """Load the stored track of one vessel, ordered by timestamp.

    Args:
        mmsi: Vessel identifier to select.
        start_time: Optional inclusive lower bound on ``timestamp``.
        end_time: Optional inclusive upper bound on ``timestamp``.
            Either bound may be given on its own. Bounds are compared as text
            because ``timestamp`` is a TEXT column, so they should use the same
            string format as the stored values.

    Returns:
        A DataFrame with the columns ``timestamp``, ``lat``, ``lon``,
        ``speed``, ``heading`` and ``course``; empty when nothing matches.
    """
    query = """
    SELECT timestamp, lat, lon, speed, heading, course
    FROM ais_messages
    WHERE mmsi = ?
    """
    # Values are bound as parameters rather than formatted into the SQL text.
    params = [mmsi]
    if start_time:
        query += " AND timestamp >= ?"
        params.append(start_time)
    if end_time:
        query += " AND timestamp <= ?"
        params.append(end_time)
    query += " ORDER BY timestamp"

    conn = sqlite3.connect(DB_NAME)
    try:
        return pd.read_sql_query(query, conn, params=params)
    finally:
        # Close the connection even when the query fails.
        conn.close()

def calculate_distance_and_speed(df):
    """Compute the travelled distance and the average speed of one track.

    Args:
        df: DataFrame with ``timestamp``, ``lat`` and ``lon`` columns, with rows
            already in travel order. Timestamps may be strings (any ``Z``
            characters are removed before parsing) or datetimes. The frame is
            not modified.

    Returns:
        ``(total_distance_km, avg_speed)``: the summed geodesic distance between
        consecutive rows in kilometres, and that distance divided by the hours
        elapsed between the first and last row (0 when no time elapsed).
        ``(None, None)`` when fewer than two rows have a parseable timestamp.
    """
    if df.empty or len(df) < 2:
        return None, None

    stamps = df["timestamp"]
    # Only string timestamps carry the "Z" suffix; an already-parsed column has
    # no .str accessor and is passed to to_datetime unchanged.
    if not pd.api.types.is_datetime64_any_dtype(stamps):
        stamps = stamps.str.replace("Z", "", regex=False)

    # Work on a private copy so the caller's frame keeps its original columns
    # and dtypes and the function can be called on it again.
    track = df[["timestamp", "lat", "lon"]].copy()
    track["timestamp"] = pd.to_datetime(stamps, utc=True, errors="coerce")
    track = track.dropna(subset=["timestamp"])
    if len(track) < 2:
        return None, None

    # Pair each position with its successor and sum the leg lengths.
    points = list(zip(track["lat"], track["lon"]))
    total_distance_km = sum(
        geodesic(previous, current).km
        for previous, current in zip(points, points[1:])
    )
    elapsed = track["timestamp"].iloc[-1] - track["timestamp"].iloc[0]
    time_diff_hours = elapsed.total_seconds() / 3600
    avg_speed = total_distance_km / time_diff_hours if time_diff_hours > 0 else 0

    return total_distance_km, avg_speed

# Report total distance and average speed for every vessel in vessels_info.
print("\nüìã Vessel Distance and Speed Summary:")
for mmsi, vessel_name in vessels_info.items():
    df_track = get_full_track(mmsi)
    distance, avg_speed = calculate_distance_and_speed(df_track)

    # A None distance means the track was too short to measure.
    if distance is None:
        print(f"\n‚ö†Ô∏è {vessel_name} (MMSI {mmsi}): Not enough data.")
        continue

    print(f"\nüö¢ {vessel_name} (MMSI {mmsi}):")
    print(f"üìè Distance: {distance:.2f} km")
    print(f"üöÄ Avg Speed: {avg_speed:.2f} km/h")


üìã Vessel Distance and Speed Summary:

üö¢ Shanghai ‚Üí Los Angeles (MMSI 123456789):
üìè Distance: 10700.66 km
üöÄ Avg Speed: 33.61 km/h

üö¢ Singapore ‚Üí Sydney (MMSI 987654321):
üìè Distance: 17842.91 km
üöÄ Avg Speed: 41.11 km/h

üö¢ New York ‚Üí Rotterdam (MMSI 192837465):
üìè Distance: 6176.98 km
üöÄ Avg Speed: 30.18 km/h
