In [1]:
import asyncio
import websockets
import json
import pandas as pd
from datetime import datetime, timezone

apiKeys = pd.read_csv("/Users/maier-borst.h/Documents/apiKeys.csv", sep=";")
apiKeyAIS = apiKeys[apiKeys["Service"] == "AISStream"]["key"].iloc[0]


In [6]:
async def connect_ais_stream():
    async with websockets.connect("wss://stream.aisstream.io/v0/stream") as websocket:
        subscribe_message = {
            "APIKey": apiKeyAIS,  # Required !
            "BoundingBoxes": [
          [
            [
              30.40921772695401,
                31.844734608073395
            ],
            [
              11.603304207355961,
                              44.75088774620332
            ]
          ]
        ],  # Required!
            "FilterMessageTypes": ["PositionReport"]  # Optional!
        }

        subscribe_message_json = json.dumps(subscribe_message)
        await websocket.send(subscribe_message_json)

        try:
            await asyncio.wait_for(process_messages(websocket), timeout=120)
        except asyncio.TimeoutError:
            print("Timeout occurred. Stopping execution.")

async def process_messages(websocket):
    
    async for message_json in websocket:
        try:
            await asyncio.wait_for(process_messages(websocket), timeout=120)
            message = json.loads(message_json)
            message_type = message["MessageType"]
            if message_type == "PositionReport":
                ais_message = message['Message']['PositionReport']
                print(f"[{datetime.now(timezone.utc)}] ShipId: {ais_message['UserID']} Latitude: {ais_message['Latitude']} Longitude: {ais_message['Longitude']}")

        except asyncio.TimeoutError:
            print("Timeout occurred. Stopping execution.")
            websocket.close()

        
# Call the function
#asyncio.run(connect_ais_stream())

In [7]:
await connect_ais_stream()

CancelledError: 

In [23]:
import asyncio
import websockets
import json
import pandas as pd
from datetime import datetime, timezone
import os
apiKeys = pd.read_csv("/Users/maier-borst.h/Documents/apiKeys.csv", sep=";")
apiKeyAIS = apiKeys[apiKeys["Service"] == "AISStream"]["key"].iloc[0]
countervar=0

output_file = os.environ.get('AIS_OUTPUT_FILE', './output.json')

def serialize_datetime(obj): 
    if isinstance(obj, datetime): 
        return obj.isoformat() 
    raise TypeError("Type not serializable") 

def write_ship_geosjon(ships_data_array):
  features = []
  for ship_id, ship_properties in ships_data_array.items():
    ship_data = ship_properties["data"]
    if len(ship_data) > 1:
      geom_type = "LineString"
      coords = list(map(lambda x: [x["lon"], x["lat"]], ship_data))
    else:
      geom_type = "Point"
      coords = list(map(lambda x: [x["lon"], x["lat"]], ship_data))[0]
    features.append({
      "type": "Feature",
      "geometry": {
        "type": geom_type,
        "coordinates": coords
      },
      "properties": {
        "ship_id": f"{ship_properties['ship_id']}",
        "timestamps": list(map(lambda x: x["dateTime"], ship_data))
      }
    })
  gj = {
    "type": "FeatureCollection",
    "features": features
  }
  with open(output_file, 'w') as f:
    f.write(json.dumps(gj, default=serialize_datetime))


async def connect_ais_stream(countervar):
    ship_data = {}
    if "AIS_BBOX" in os.environ:
      bbox = json.loads(os.environ.get("AIS_BBOX"))
      print(f"bbox loaded: {bbox}")
    else:
      bbox = [[11.603304207355961, # min lat
              30.40921772695401 # min lon
              ],
              [31.844734608073395, # max lat
              44.75088774620332 # max long
            ]]

    with open(f"{output_file}_bbox.json", 'w') as f:
      # geojson is [lon, lat]
      bottom_left = [bbox[0][1], bbox[0][0]]
      top_right = [bbox[1][1], bbox[1][0]]
      print(bottom_left)
      print(top_right)
      bbox_gj = {
        "type": "Feature",
        "properties": {},
        "geometry": {
          "type": "Polygon",
          "coordinates": [[
            bottom_left,
            [bottom_left[0], top_right[1]],
            top_right,
            [top_right[0], bottom_left[1]],
            bottom_left
          ]]
        }
      }
      f.write(json.dumps(bbox_gj))

    async with websockets.connect("wss://stream.aisstream.io/v0/stream") as websocket:
        subscribe_message = {
          "APIKey": apiKeyAIS,
          "BoundingBoxes": [bbox],
          # "FiltersShipMMSI": ["368207620", "367719770", "211476060"],
          "FilterMessageTypes": ["PositionReport"]
        }

        subscribe_message_json = json.dumps(subscribe_message)
        await websocket.send(subscribe_message_json)
       # try:
        #    await asyncio.wait_for(process_messages(websocket), timeout=120)
        #except asyncio.TimeoutError:
         #   print("Timeout occurred. Stopping execution.")
            
        async for message_json in websocket:
            message = json.loads(message_json)
            countervar=countervar+1
            print(countervar)
            if(countervar>5):
                break
            message_type = message["MessageType"]

            if message_type == "PositionReport":
                # the message parameter contains a key of the message type which contains the message itself
                ais_message = message['Message']['PositionReport']
                
                # store data in memory
                if ais_message['UserID'] not in ship_data:
                  ship_data[ais_message['UserID']] = {
                    "ship_id": ais_message['UserID'],
                    "data": []
                  }
                
                ship_data[ais_message['UserID']]["data"].append({
                  "lat": ais_message['Latitude'],
                  "lon": ais_message['Longitude'],
                  "dateTime": datetime.now()
                })

                # dump current in memory data to output file
                write_ship_geosjon(ship_data)

In [24]:
await connect_ais_stream(countervar)

[30.40921772695401, 11.603304207355961]
[44.75088774620332, 31.844734608073395]
1
2
3
4
5
6


In [None]:
if __name__ == "__main__":
    asyncio.run(asyncio.run(connect_ais_stream()))
