# AIS Data Ingestion 

Vessel location data is ingested from the Automatic Identification System (AIS) data available from the federal [Marine Cadastre website](https://hub.marinecadastre.gov/pages/vesseltraffic), and is processed in the following steps:
- read data from the csv urls corresponding to each calendar day
- drop unnecessary columns
- filter to only include cargo and tanker vessels (vessel type codes 70–89)
- cast datatypes appropriately
- save daily file to parquet

Notes:
- failed downloads are noted in print outputs
- rows that would cause parsing errors (e.g., if there is an extra comma on one row) are skipped and are thus missing from the saved outputs. A warning is printed in the cell output.

Descriptions of each column of the raw data are available at the [AIS Data Dictionary](https://coast.noaa.gov/data/marinecadastre/ais/data-dictionary.pdf).

In [5]:
#preliminaries
import pandas as pd
import polars as pl
from datetime import datetime
import os

#enable string cache for polars categoricals
#(vessel_name is cast to pl.Categorical for every daily file further down)
pl.enable_string_cache()

In [6]:
#set variables
 
#start and end dates of the ingestion window (format yyyy_mm_dd)
#both strings are parsed with '%Y_%m_%d' when the series of days is built
start_date = '2018_01_01'
end_date = '2024_06_30'

#vessel types - includes cargo and tanker types
#integer codes 70 through 89 (the upper bound 90 is exclusive)
cargo_types = pl.arange(70,90,eager=True)

In [7]:
#build the series of calendar days spanning start_date .. end_date;
#both bounds are parsed from their yyyy_mm_dd string form
days = pl.date_range(
    start=datetime.strptime(start_date, '%Y_%m_%d'),
    end=datetime.strptime(end_date, '%Y_%m_%d'),
    eager=True,
)

#define processing function
def process(day, replace=False):
    """Download, clean and save one day of AIS data.

    The zipped CSV for `day` is read from the NOAA AIS data handler URL into
    pandas, converted to polars, filtered to the vessel types in
    `cargo_types`, reduced to the columns of interest, renamed, cleaned,
    deduplicated and written to `../data/ais_clean/ais_<day>.parquet`.

    If anything fails after the download, the raw pandas frame is written to
    `../data/ais_processing_errors/ais_<day>.csv` instead. All failures are
    reported through print output; nothing is raised to the caller.

    Args:
        day: date-like object exposing `.year` and `.strftime`.
        replace: accepted for interface compatibility; not used by this
            function (skipping of already-processed days is decided by the
            caller).

    Returns:
        None.
    """
    #get year
    year = day.year
    #convert day to string
    day = day.strftime('%Y_%m_%d')

    #load from url to pandas df
    #NOTE: `except Exception` (not a bare except) so that Ctrl-C /
    #kernel interrupts still stop a long-running ingestion loop
    try:
        day_raw = pd.read_csv(
            f'https://coast.noaa.gov/htdata/CMSP/AISDataHandler/{year}/AIS_{day}.zip',
            low_memory=False,
            #print warning and skip row when parsing error is encountered
            on_bad_lines='warn'
        )
    except Exception as err:
        print(f'Error importing {day} to dataframe - Url may be invalid or download may have failed.')
        print(f'  Cause: {err!r}')
        return
    print(f'Download complete for {day}.')

    try:
        #convert to polars
        day_df = pl.DataFrame(day_raw,infer_schema_length=0)
        #process data
        day_df = (
            day_df
            #keep only cargo vessels
            .filter(pl.col('VesselType').is_in(cargo_types))
            #keep cols of interest
            .select('MMSI', 'BaseDateTime','LAT', 'LON', 'SOG', 'COG',
                    'Heading', 'Status', 'VesselName', 'VesselType', 'IMO',
                    'Length', 'Width', 'Draft','Cargo')
            #give pythonic names
            .rename({
                'MMSI':'mmsi',
                'BaseDateTime':'time',
                'LAT':'lat',
                'LON':'lon',
                'SOG':'speed',
                'COG':'course',
                'Heading':'heading',
                'Status':'status',
                'VesselName':'vessel_name',
                'VesselType':'vessel_type',
                'IMO':'imo',
                'Length':'length',
                'Width':'width',
                'Draft':'draft',
                'Cargo':'cargo'
            })
            #clean cols
            .with_columns(
                #strip IMO prefix and cast to int
                imo = pl.col('imo').str.strip_prefix('IMO').cast(pl.Int64),
                #clean course and heading: 360.0 / 511.0 are replaced by null
                course = pl.col('course').replace(360.0,None),
                heading = pl.col('heading').replace(511.0,None)
            )
            #cast
            .cast({
                'time':pl.Datetime,
                'vessel_name':pl.Categorical
            })
            #deduplicate
            .unique()
        )
        #write processed file to parquet
        day_df.write_parquet(f'../data/ais_clean/ais_{day}.parquet')
        print(f'{day} successfully processed and saved to parquet.')
    except Exception as err:
        #processing failed: keep the raw download so the day is not lost
        try:
            #write raw file to csv
            day_raw.to_csv(f'../data/ais_processing_errors/ais_{day}.csv')
            print(f'WARNING: Error in processing {day} data. Raw file saved to CSV instead.')
        except Exception:
            print(f'WARNING: Error in processing {day} data. FILE LOST.')
        print(f'  Cause: {err!r}')
    
#define main function
def ais_ingest(days=days, replace=False):
    """Run `process` for every day in `days`.

    Args:
        days: iterable of date-like objects exposing `.strftime`; defaults to
            the module-level `days` series as it exists when this function
            is defined.
        replace: when False (default), a day is skipped if
            `../data/ais_clean/ais_<yyyy_mm_dd>.parquet` already exists;
            when True, every day is processed regardless.

    Returns:
        None.
    """
    for day in days:
        if not replace:
            #format the date outside the f-string: reusing the same quote
            #character inside an f-string is a SyntaxError before Python 3.12
            day_str = day.strftime('%Y_%m_%d')
            if os.path.exists(f'../data/ais_clean/ais_{day_str}.parquet'):
                continue
        process(day)

In [8]:
#run main function with its defaults: every day in `days` is processed,
#except days whose parquet file already exists in ../data/ais_clean
ais_ingest()

Download complete for 2024_06_04.
2024_06_04 successfully processed and saved to parquet.
Download complete for 2024_06_05.
2024_06_05 successfully processed and saved to parquet.
Download complete for 2024_06_06.
2024_06_06 successfully processed and saved to parquet.
Download complete for 2024_06_07.
2024_06_07 successfully processed and saved to parquet.
Download complete for 2024_06_08.
2024_06_08 successfully processed and saved to parquet.
Download complete for 2024_06_09.
2024_06_09 successfully processed and saved to parquet.
Download complete for 2024_06_10.
2024_06_10 successfully processed and saved to parquet.
Download complete for 2024_06_11.
2024_06_11 successfully processed and saved to parquet.
Download complete for 2024_06_12.
2024_06_12 successfully processed and saved to parquet.
Download complete for 2024_06_13.
2024_06_13 successfully processed and saved to parquet.
Download complete for 2024_06_14.
2024_06_14 successfully processed and saved to parquet.
Download c