In [None]:
#!/usr/bin/env python3
"""
A Python script that:
  - Queries Spire Maritime's GraphQL API using the query defined in GRAPHQL_QUERY.
  - Paginates results (fetches up to 1000 per page) using internal pagination variables.
  - Runs continuously at a configurable interval (default: 60 seconds) to update AIS data.
  - Upserts new data into a CSV. Depending on configuration, it either:
       * Replaces the old AIS record for a vessel (one row per vessel), or
       * Saves historical messages by appending a new row if any field has changed.
  - Verbose printing lets you follow the ingestion steps.

IMPORTANT:
  1. Replace the 'your-token' placeholder in BEARER_TOKEN with your actual Bearer token from Spire.
  2. Update the GRAPHQL_QUERY variable below if you wish to change filtering;
     all search/filter options live in that query.
  3. The script stores data in a file named 'vessels_data.csv' by default.
"""

import io
import os
import time
import traceback

import pandas as pd
import requests
from IPython.display import display  # Optional, for Jupyter

# -----------------------------------------------------------------------------
# 1. USER CONFIGURATION
# -----------------------------------------------------------------------------

SPIRE_GRAPHQL_ENDPOINT = "https://api.spire.com/graphql"
# Spire API Bearer token. Taken from the SPIRE_TOKEN environment variable when
# it is set, so the secret does not have to be stored in this file; otherwise
# the placeholder below is used and must be replaced.
BEARER_TOKEN = os.environ.get("SPIRE_TOKEN", "your-token")  # <-- REPLACE THIS
# Output CSV; created on the first run and rewritten whenever new nodes arrive.
CSV_FILENAME = "vessels_data.csv"

# Run continuously? If False, the script runs once.
RUN_CONTINUOUS = True
# Seconds to sleep after each run before starting the next one (default 60 s).
RUN_INTERVAL = 60

# Save historical messages? If True, a new row is appended for a vessel only
# when its data changed since its last stored row.
# If False, the latest data for a vessel replaces its previous row.
SAVE_HISTORY = False

# -----------------------------------------------------------------------------
# 2. THE GRAPHQL QUERY
#
#    NOTE: Update this query if you want to change your search/filtering.
#    For example, edit the areaOfInterest polygon or the lastPositionUpdate
#    startTime, or re-enable the commented-out shipType filter.
#    The pagination variables ($first and $after) are still supplied automatically.
# -----------------------------------------------------------------------------

# GraphQL document posted on every request by fetch_vessel_data().
# - $first / $after are the pagination variables. fetch_vessel_data() fills them
#   in, so keep them in both the query signature and the vessels(...) arguments.
# - Lines starting with '#' inside this string are GraphQL comments (e.g. the
#   disabled shipType filter), not Python comments.
# - areaOfInterest presumably limits results to vessels inside the GeoJSON
#   polygon, and lastPositionUpdate.startTime presumably limits them to vessels
#   whose last position update is at/after that instant. Confirm against the
#   Spire Maritime GraphQL docs.
# - NOTE(review): startTime is a fixed literal, so a continuously running job
#   uses the same lower bound on every cycle; update it if the window should move.
# - pageInfo (endCursor / hasNextPage) drives pagination in fetch_vessel_data().
#   totalCount is requested but not read anywhere in this script.
# - Every field under `nodes` becomes a dotted CSV column via pd.json_normalize
#   in upsert_into_csv() (e.g. staticData.mmsi, lastPositionUpdate.speed).
GRAPHQL_QUERY = """
query GetVessels(
  $first: Int,
  $after: String
) {
  vessels(
    #shipType: [LNG_CARRIER]  # <-- Change this value or modify to use imo instead.
      areaOfInterest: { polygon: { type: "Polygon" coordinates: [ [ [-125.61293378057832,37.85152006848887],[-121.55390635761478,31.37524717787872],[-115.60295937310033,32.77427969380787],[-120.72834145802892,38.473576456397296],[-125.61293378057832,37.85152006848887] ] ] } }
      lastPositionUpdate:{
  startTime:"2025-06-27T17:10:00.00Z"
}
    first: $first,
    after: $after
  ) {
    pageInfo {
      endCursor
      hasNextPage
    }
    totalCount{
      relation
      value
    }
    nodes {
      staticData {
        name
        imo
        mmsi
        aisClass
        callsign
        flag
        shipType
        updateTimestamp
        dimensions {
          length
          width
        }
        validated {
          name
          imo
          callsign
          shipType
          dimensions {
            length
            width
          }
        }
      }
      lastPositionUpdate {
        accuracy
        collectionType
        course
        heading
        latitude
        longitude
        maneuver
        navigationalStatus
        rot
        speed
        timestamp
        updateTimestamp
      }
    }
  }
}
"""

# -----------------------------------------------------------------------------
# 3. HELPER FUNCTION TO FETCH (PAGINATE) DATA
# -----------------------------------------------------------------------------

def fetch_vessel_data(token: str, verbose: bool = True, page_size: int = 1000):
    """
    Fetch every vessel node matching GRAPHQL_QUERY from Spire's GraphQL API.

    The query is re-posted with cursor-based pagination variables
    (``first`` = page size, ``after`` = previous page's ``endCursor``) until
    the API reports ``hasNextPage = False``.

    Args:
        token: Spire API Bearer token.
        verbose: Print progress for each page.
        page_size: Number of nodes requested per page (the ``$first`` variable).

    Returns:
        list: All ``nodes`` entries from every page, in the order received.

    Raises:
        requests.HTTPError: The endpoint answered with a non-2xx status.
        requests.RequestException: Network failure or the 30 s timeout elapsed.
        RuntimeError: The response contained GraphQL ``errors``, or the server
            reported another page without advancing the cursor.
    """
    # Headers do not change between pages, so build them once.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    all_nodes = []
    variables = {"first": page_size, "after": None}
    page_count = 0

    # A single Session reuses the HTTPS connection across page requests.
    with requests.Session() as session:
        while True:
            page_count += 1
            if verbose:
                print(f"\n[fetch_vessel_data] Requesting page {page_count} ...")
            response = session.post(
                SPIRE_GRAPHQL_ENDPOINT,
                json={"query": GRAPHQL_QUERY, "variables": variables},
                headers=headers,
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
            # GraphQL reports query problems in the body with HTTP 200.
            if "errors" in data:
                raise RuntimeError(f"GraphQL returned errors: {data['errors']}")
            vessels_data = data["data"]["vessels"]
            page_nodes = vessels_data["nodes"]
            all_nodes.extend(page_nodes)
            has_next_page = vessels_data["pageInfo"]["hasNextPage"]
            end_cursor = vessels_data["pageInfo"]["endCursor"]
            if verbose:
                print(f" - Page {page_count} returned {len(page_nodes)} nodes.")
                print(f" - hasNextPage = {has_next_page}, endCursor = {end_cursor}")
            if not has_next_page:
                break
            # Without a fresh cursor the next request would repeat this page
            # forever, so stop loudly instead.
            if not end_cursor or end_cursor == variables["after"]:
                raise RuntimeError(
                    f"Pagination stalled on page {page_count}: hasNextPage is true "
                    f"but endCursor did not advance ({end_cursor!r})."
                )
            variables["after"] = end_cursor

    if verbose:
        print(f"[fetch_vessel_data] Finished. Total nodes fetched: {len(all_nodes)}")
    return all_nodes

# -----------------------------------------------------------------------------
# 4. FUNCTION TO UP-OR-INSERT THE RAW QUERY OUTPUT INTO CSV
#
#    This function:
#      - Uses pd.json_normalize to flatten the JSON data,
#      - Dynamically selects a primary key based on the query:
#            * If 'staticData.mmsi' is present, it is used.
#            * Otherwise, 'staticData.imo' is used.
#      - Depending on SAVE_HISTORY:
#            * If True: new rows are appended only if the AIS data for a vessel has changed.
#            * If False: new data replaces the previous record for a vessel.
#      - Writes the result to CSV.
# -----------------------------------------------------------------------------

def _normalize_like_csv(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* exactly as it would read back from a CSV loaded with ``dtype=str``.

    The existing CSV is loaded with ``dtype=str``: every cell is a string and
    empty cells are NaN. Freshly normalized JSON instead holds native
    ints/floats/bools/None. Round-tripping the new rows through an in-memory
    CSV gives both sides the same textual form. An unchanged value then
    compares equal (``1.5`` -> ``"1.5"``, ``None`` -> NaN).
    """
    buffer = io.StringIO()
    df.to_csv(buffer, index=False)
    buffer.seek(0)
    return pd.read_csv(buffer, dtype=str)


def _read_existing_csv(csv_filename: str, verbose: bool) -> pd.DataFrame:
    """Load *csv_filename* with all-string columns, or return an empty frame if it is absent."""
    if os.path.isfile(csv_filename):
        if verbose:
            print(f"[upsert_into_csv] Reading existing CSV: {csv_filename}")
        return pd.read_csv(csv_filename, dtype=str)
    if verbose:
        print("[upsert_into_csv] No existing CSV found. Creating new DataFrame.")
    return pd.DataFrame()


def _append_changed_rows(df_existing: pd.DataFrame, df_new: pd.DataFrame,
                         primary_key: str, verbose: bool) -> pd.DataFrame:
    """Append each new row whose vessel is unseen or differs from its latest stored row."""
    if df_existing.empty:
        return df_new
    if primary_key not in df_existing.columns:
        # No stored keys to compare against, so every new row is new history.
        return pd.concat([df_existing, df_new], ignore_index=True)

    # Latest stored row per vessel. drop_duplicates(keep="last") keeps whole
    # rows. groupby(...).last() would instead take the last *non-null* value of
    # each column independently, stitching together a row that never existed.
    latest = (
        df_existing.drop_duplicates(subset=primary_key, keep="last")
        .set_index(primary_key)
    )
    compare_cols = [col for col in df_new.columns if col in latest.columns]

    keep = []
    for _, row in df_new.iterrows():
        key_val = row[primary_key]
        if key_val not in latest.index:
            keep.append(True)  # first record for this vessel
            continue
        # Missing cells are NaN on both sides and NaN != NaN, so blank them first.
        previous = latest.loc[key_val, compare_cols].fillna("").to_dict()
        current = row[compare_cols].fillna("").to_dict()
        changed = current != previous
        if not changed and verbose:
            print(f"[upsert_into_csv] No changes for {primary_key}={key_val}")
        keep.append(changed)

    df_append = df_new.loc[keep]
    if df_append.empty:
        return df_existing.copy()
    return pd.concat([df_existing, df_append], ignore_index=True)


def _replace_by_key(df_existing: pd.DataFrame, df_new: pd.DataFrame,
                    primary_key: str) -> pd.DataFrame:
    """Keep exactly one row per vessel: the newest complete row for each key."""
    if df_existing.empty:
        return df_new
    combined = pd.concat([df_existing, df_new], ignore_index=True)
    # Rows with no key at all (e.g. an older CSV keyed on a different column)
    # cannot be matched to a vessel and are dropped.
    combined = combined.dropna(subset=[primary_key])
    # Replace whole rows so a field that is now missing does not keep a stale
    # value from an earlier run.
    return (
        combined.drop_duplicates(subset=primary_key, keep="last")
        .sort_values(primary_key)
        .reset_index(drop=True)
    )


def upsert_into_csv(nodes: list, csv_filename: str, save_history: bool, verbose: bool = True):
    """
    Merge Spire vessel nodes into *csv_filename*, creating the file if needed.

    Nodes are flattened with ``pd.json_normalize``, so nested fields become
    dotted column names such as ``staticData.mmsi``. They are then converted
    to the same all-string representation used when the CSV is read back, so
    values compare and round-trip consistently between runs.

    Primary key: ``staticData.mmsi`` if present, otherwise ``staticData.imo``.

    Modes:
      - ``save_history=False``: one row per vessel. The newest row for a key
        replaces the stored row entirely, and the output is sorted by the key.
      - ``save_history=True``: a row is appended only if the vessel has no
        stored row yet, or if any shared column differs from its most recent
        stored row.

    Args:
        nodes: ``nodes`` entries as returned by ``fetch_vessel_data``.
        csv_filename: Path of the CSV to read and (re)write.
        save_history: Select historical (append) or replacement mode.
        verbose: Print progress messages.

    Returns:
        pd.DataFrame: The merged data that was written to the CSV. If *nodes*
        is empty, nothing is written and the existing CSV (read as strings)
        or an empty frame is returned.

    Raises:
        ValueError: Neither ``staticData.mmsi`` nor ``staticData.imo`` is in the data.
    """
    df_new = pd.json_normalize(nodes)

    if df_new.empty:
        if verbose:
            print("[upsert_into_csv] No new nodes to upsert.")
        # Read as strings, the same way the CSV is loaded for merging.
        return pd.read_csv(csv_filename, dtype=str) if os.path.isfile(csv_filename) else df_new

    if "staticData.mmsi" in df_new.columns:
        primary_key = "staticData.mmsi"
    elif "staticData.imo" in df_new.columns:
        primary_key = "staticData.imo"
    else:
        raise ValueError(
            "Neither 'staticData.mmsi' nor 'staticData.imo' found in query results "
            "for primary key; GRAPHQL_QUERY must request at least one of them."
        )

    if verbose:
        print(f"[upsert_into_csv] Using '{primary_key}' as the primary key for upsert.")

    df_existing = _read_existing_csv(csv_filename, verbose)
    df_new = _normalize_like_csv(df_new)

    # Keys are compared as strings on both sides (missing keys become "nan").
    df_new[primary_key] = df_new[primary_key].astype(str)
    if not df_existing.empty and primary_key in df_existing.columns:
        df_existing[primary_key] = df_existing[primary_key].astype(str)

    if save_history:
        df_merged = _append_changed_rows(df_existing, df_new, primary_key, verbose)
        if verbose:
            print(f"[upsert_into_csv] Historical upsert complete. Total rows: {len(df_merged)}")
    else:
        df_merged = _replace_by_key(df_existing, df_new, primary_key)
        if verbose:
            print(f"[upsert_into_csv] Replacement upsert complete. Total rows: {len(df_merged)}")

    df_merged.to_csv(csv_filename, index=False)
    return df_merged

# -----------------------------------------------------------------------------
# 5. MAIN ENTRY POINT
# -----------------------------------------------------------------------------

def main(verbose: bool = True):
    """
    Run a single ingestion cycle.

    Steps:
      1. Pull every vessel node matching GRAPHQL_QUERY via fetch_vessel_data.
      2. Merge those nodes into CSV_FILENAME, using SAVE_HISTORY to pick the mode.
      3. When verbose, report the final row count and show the first 10 rows.
    """
    if verbose:
        print("=== Starting script ===")

    fetched = fetch_vessel_data(token=BEARER_TOKEN, verbose=verbose)
    merged = upsert_into_csv(
        nodes=fetched,
        csv_filename=CSV_FILENAME,
        save_history=SAVE_HISTORY,
        verbose=verbose,
    )

    # Nothing left to do in quiet mode.
    if not verbose:
        return
    print(f"=== Final dataframe after upsert: {len(merged)} rows ===")
    display(merged.head(10))

In [None]:
# -----------------------------------------------------------------------------
# 6. EXECUTION LOOP
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    if not RUN_CONTINUOUS:
        # Single run: let any error propagate so the failure is visible.
        main(verbose=True)
    else:
        while True:
            try:
                main(verbose=True)
            except Exception:
                # A long-running poller must survive transient failures
                # (timeouts, HTTP 5xx, GraphQL errors) and retry next cycle.
                # KeyboardInterrupt is not an Exception subclass, so Ctrl+C
                # still stops the loop.
                print("[run] Update failed; retrying after the interval:")
                traceback.print_exc()
            print(f"Waiting {RUN_INTERVAL} seconds until next update...\n")
            time.sleep(RUN_INTERVAL)