# Decode AIS raw data and savev to HDF5

In [1]:
from pyais.stream import FileReaderStream
from pathlib import Path
import os

import h5py
import vaex

import numpy as np

import yaml

import pooch

In [2]:
vaex.multithreading.thread_count_default = 4

## Define constants

In [3]:
# Convert to meters from radians
EARTH_RADIUS_IN_M: int = 6371000

In [4]:
MAX_NUMBER_ROWS = 10000000

## Define paths for input and output files

In [5]:
data_folder = "./data"
input_folder = os.path.join(data_folder, "input")
output_folder = os.path.join(data_folder, "decoded")

### Create folders

In [6]:
for folder in [data_folder, input_folder, output_folder]:
    if not os.path.isdir(folder):
        os.mkdir(folder)

## Get input data

In [7]:
input_url = "https://raw.githubusercontent.com/aduvenhage/ais-decoder/master/data/nmea-sample.txt"

In [8]:
pooch.retrieve(
    url=input_url,
    known_hash=None,
    path=input_folder,
    fname=input_url.split("/")[-1]
)

Downloading data from 'https://raw.githubusercontent.com/aduvenhage/ais-decoder/master/data/nmea-sample.txt' to file '/Users/annef/Documents/T-SAR/nmea2hdf5/data/input/nmea-sample.txt'.
SHA256 hash of downloaded file: dca4f63fb42042929225c6f2bac20e31f37e943cf4d33055a3442947ce6ac6e1
Use this value as the 'known_hash' argument of 'pooch.retrieve' to ensure that the file hasn't changed if it is downloaded again in the future.


'/Users/annef/Documents/T-SAR/nmea2hdf5/data/input/nmea-sample.txt'

## Define column names, types and attributes

- Read from XML template file containing information about data group, variables and their associated attributes such as types, missing values, bounds, etc.

In [9]:
# Load parameters
h5template_yaml: Path = "./ais_metadata.yaml"

## Analyse, decode and save to HDF5

In [10]:
class H5AIS:
    def __init__(self, ofilename, h5template=None, MAX_NUMBER_ROWS = 2000000):
        self.output_filename = ofilename
        self.columns = []
        self.coltypes = []
        self.colmissings = []
        self.colsizes = []
        self.colattrs = []
        self.buffers = []
        self.h5file = h5py.File(ofilename, "w")
        if h5template is None:
            self.h5group = "data"
        else:
            self.get_from_template(h5template)
        self.MAX_NUMBER_ROWS = MAX_NUMBER_ROWS
        self.h5columns = self.h5file.create_group(self.h5group) # vaex reads all datasets in the columns group
        
    
    def get_from_template(self, h5template):
        yaml_file = open(f"{h5template}")
        data_info = yaml.load(yaml_file, Loader=yaml.FullLoader)["group"]
        self.h5group = data_info["group_name"]
        self.h5template = data_info
        i = 0
        for var in self.h5template["variables"].keys():
            v = self.h5template["variables"][var]
            self.columns.append(var)
            attrs = { }      
            for vk in v.keys():
                if "type" == vk:
                    self.coltypes.append(v[vk])
                elif "missing_value" == vk:
                    self.colmissings.append(v[vk])
                else:
                    attrs[vk] = v[vk]
            self.colattrs.append(attrs)
            self.buffers.append([])
            i+=1
        
    def _key2value(self, msg, key):
        idx = self.columns.index(key)
        attrs = self.colattrs[idx]
        if key == "status" and "status" in msg.asdict():
            colval = msg.asdict()[key].value
        elif key in msg.asdict():
            if "flag_meanings" in attrs:
                if type(msg.asdict()[key]) == bool:
                    colval = int(msg.asdict()[key])
                else:
                    idx_flag = attrs["flag_meanings"].index(msg.asdict()[key])
                    colval = attrs["flag_values"][idx_flag]
            elif self.coltypes[idx] == "str":
                colval = str(msg.asdict()[key])
            else:
                colval = msg.asdict()[key]
        else:
            colval = self.colmissings[idx]
        
        if "scaling_factor" in self.colattrs[idx]:
            scaling = attrs["scaling_factor"]
            colval = scaling * colval
        return colval
    
    def create_column(self, colname, coltype, colmissing, colsize, colattr):
        idx = len(self.columns)
        self.columns.append(colname)
        self.coltypes[colname] = coltype
        self.colmissings[colname] = colmissing
        self.colsizes.append(colsize)
        self.colattrs.append(colattr)
        self.buffers.append([])
            
    def create_columns(self, colnames, coltypes, colmissings, colsizes, colattrs):
        print(coltypes)
        for cname, ctype, cmiss, csize, cattr in zip(colnames, list(coltypes.values()), list(colmissings.values()), colsizes, colattrs):
            self.create_column(cname, ctype, cmiss, csize, cattr)
        
    def decode_message(self, msg):
        try:
            decoded_message = msg.decode()
            if msg.tag_block is not None:
                msg.tag_block.init()
                tags = msg.tag_block
        except:
            print(" Message ", msg, " ignored")
        else:
            for cname in self.columns:
                idx = self.columns.index(cname)
                if "tag_block" in self.colattrs[idx]:
                    if msg.tag_block is not None:
                        val = self._key2value(tags, cname)
                    else:
                        val = self.colmissings[idx]
                else:
                    val = self._key2value(decoded_message, cname)
                self.buffers[idx].append(val) 
            
              
    def write(self, colsize=None):
        for idx, colname in zip(range(len(self.columns)), self.columns):
            if colsize is None:
                colsize = self.MAX_NUMBER_ROWS
            coltype = self.coltypes[idx]
            print("Writing Column ", colname, coltype, self.colmissings[idx])
            if coltype == "str": 
                coltype = h5py.string_dtype(encoding='utf-8')
                ds = self.h5columns.create_dataset(colname, data=np.array(self.buffers[idx], dtype='S'))
            else:
                ds = self.h5columns.create_dataset(colname, data=np.array(self.buffers[idx], dtype=coltype), dtype=coltype)
            attrs = self.colattrs[idx]
            for attr in self.colattrs[idx].keys():
                ds.attrs[attr] = attrs[attr]

    def close(self):
        self.h5file.close()

In [11]:
%%time

nb_file = 1

input_filename = os.path.join(input_folder, input_url.split("/")[-1])
output_filename = os.path.join(output_folder, "ais_" + f"{nb_file:02}" + ".h5")

list_filenames = []
if os.path.isfile(output_filename):
    print(input_filename, " has been already decoded and ", output_filename, " have been generated")
else:
    
    print("Start...")
    ofile = H5AIS(output_filename, h5template=h5template_yaml, MAX_NUMBER_ROWS=MAX_NUMBER_ROWS)
    nb_msg = 0
    for msg in FileReaderStream(input_filename):
        ofile.decode_message(msg)
        nb_msg += 1
        if len(ofile.buffers[0]) >= ofile.MAX_NUMBER_ROWS:
            print("Writing buffer to ", output_filename)
            nb_msg = 0
            ofile.write()
            ofile.close()
            list_filenames.append(output_filename)
            nb_file += 1
            output_filename = output_prefix + "/ais_" + f"{nb_file:02}" + ".h5"
            ofile = H5AIS(output_filename, h5template=h5template_yaml, MAX_NUMBER_ROWS=MAX_NUMBER_ROWS)
            
    # Need to write last part
    print("Writing last buffer to ", output_filename)
    ofile.write(nb_msg)   
    ofile.close()

Start...
 Message  AISSentence<!AIVDM,1,1,,A,U31<0OOP000CshrMdl600?wP00SL,0*43>  ignored
 Message  AISSentence<!AIVDM,1,1,,A,a5MuRA0@00IQUuTA<Kgt1wvP00S6,0*65>  ignored
Writing last buffer to  ./data/decoded/ais_01.h5
Writing Column  msg_type int 1
Writing Column  mmsi int64 -1
Writing Column  heading float32 511
Writing Column  status int 15
Writing Column  lon float32 181
Writing Column  lat float32 91
Writing Column  speed int 102.3
Writing Column  course int 360.0
Writing Column  gnss int 1
Writing Column  accuracy int 0
Writing Column  receiver_timestamp float64 61
Writing Column  source_station int -1
Writing Column  destination_station str Unknown
Writing Column  shiptype int 0
CPU times: user 4.61 s, sys: 33.1 ms, total: 4.65 s
Wall time: 4.65 s


## Open decoded AIS data 

In [12]:
dset = vaex.open(os.path.join(output_folder, 'ais_*.h5'))
dset

kwrgs =  {}


#,accuracy,course,destination_station,gnss,heading,lat,lon,mmsi,msg_type,receiver_timestamp,shiptype,source_station,speed,status
0,0,367,b'Unknown',1,511.0,49.47558,0.13138,227006760,1,61.0,0,-1,0,0
1,1,633,b'Unknown',1,511.0,51.23766,4.419442,205448890,1,61.0,0,-1,0,0
2,1,1120,b'Unknown',1,511.0,51.967037,5.320033,786434,1,61.0,0,-1,16,0
3,1,2470,b'Unknown',1,511.0,37.955883,23.603634,249191000,1,61.0,0,-1,0,0
4,1,2379,b'Unknown',1,511.0,54.32111,-130.31624,316013198,1,61.0,0,-1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
82751,1,3600,b'Unknown',1,511.0,47.9117,-122.52799,3669701,4,61.0,0,-1,1023,15
82752,0,0,b'Unknown',1,32.0,49.019833,-123.15085,240722000,3,61.0,0,-1,0,5
82753,0,2491,b'Unknown',1,318.0,51.231956,2.935753,372776000,1,61.0,0,-1,0,0
82754,1,3319,b'Unknown',1,331.0,59.00076,5.799692,258009500,1,61.0,0,-1,243,0
