# Import libraries

In [0]:
from pyais import decode, IterMessages
import math
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import col, input_file_name, when 
from pyspark.sql.window import Window

In [0]:
# https://github.com/M0r13n/pyais
# pyais is the library used to decode raw AIS messages; uncomment the next line to install it
# %pip install pyais
# Spark context handle (not referenced in the cells shown here)
sc = spark.sparkContext

# Functions

In [0]:
# decoding raw messages
def get_decoded(row):
    """Decode one raw AIS sentence into a plain dictionary.

    Args:
        row: ``(raw_message, filename)`` pair; ``raw_message`` is the
            comma-joined sentence handed to ``pyais.decode``.

    Returns:
        The decoded fields as a dict with an extra ``'filename'`` key, or an
        empty dict when the sentence cannot be decoded (best effort: bad
        sentences are dropped rather than failing the Spark job).
    """
    message = {}
    try:
        message = decode(row[0]).asdict()
        message['filename'] = row[1]
    except Exception:
        # undecodable / malformed input: keep whatever was produced so far
        pass

    # The schema declares 'status' as FloatType, so the pyais status value is
    # converted to a float; a missing key or a None status is left untouched.
    if 'status' in message:
        try:
            message['status'] = float(message['status'])
        except TypeError:
            pass
    return message

In [0]:
# select desired features
def get_features(row, features=None):
    """Project a decoded AIS message onto a fixed, ordered set of fields.

    Args:
        row: dict produced by ``get_decoded`` (empty when decoding failed).
            It is not modified.
        features: ordered field names to keep; defaults to the module-level
            ``feature_list``.

    Returns:
        A ``pyspark.sql.Row`` with exactly the fields in ``features``, in that
        order. Fields absent from ``row`` are None; any other keys are dropped.
    """
    if features is None:
        features = feature_list
    # Build a fresh mapping instead of popping from / adding to the caller's
    # dict; dict insertion order gives the Row its field order
    # (PySpark >= 3.0 keeps keyword order for Row(**kwargs)).
    return Row(**{name: row.get(name) for name in features})

In [0]:
# encode eta (estimated time of arrival)
def get_eta(row):
    """Format the ETA of one AIS record as an ``'MM-DD hh:mm'`` string.

    Args:
        row: ``(index, record)`` pair where ``record`` exposes ``month``,
            ``day``, ``hour`` and ``minute`` (nulls are filled with 0 by the
            pipeline before this is called).

    Returns:
        ``(index, eta)`` where ``eta`` is ``'MM-DD hh:mm'``, or ``'nan'`` when
        any component is missing or outside its valid range.
    """
    index, record = row[0], row[1]
    month, day, hour, minute = record.month, record.day, record.hour, record.minute
    if None in (month, day, hour, minute):
        return (index, 'nan')
    # AIS encodes "not available" as month 0, day 0, hour 24 and minute 60, so
    # hour 0 / minute 0 are valid times (midnight, on the hour) and must be kept.
    if 1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23 and 0 <= minute <= 59:
        eta = '{m}-{d} {hour}:{minute}'.format(
            m=str(int(month)).zfill(2),
            d=str(int(day)).zfill(2),
            hour=str(int(hour)).zfill(2),
            minute=str(int(minute)).zfill(2),
        )
    else:
        eta = 'nan'
    return (index, eta)

In [0]:
# parse the filename and generate timestamp 
def get_timestamp(row):
    """Build a ``'YYYY-MM-DD hh:mm'`` timestamp from a record's source filename.

    The components are cut from fixed character offsets of ``row.filename``,
    which presumably looks like ``dbfs:/mnt/lsde/datasets/ais2/YYYY/MM/DD/hh?mm...``
    (offsets are hard-coded; verify if the input layout changes).

    Returns:
        ``(row, timestamp)`` with ``row`` passed through unchanged.
    """
    path = row.filename
    year = path[29:33]
    # month, day, hour and minute each occupy two characters at these offsets
    month, day, hour, minute = (path[start:start + 2].zfill(2) for start in (34, 37, 40, 43))
    timestamp = '{}-{}-{} {}:{}'.format(year, month, day, hour, minute)
    return (row, timestamp)

In [0]:
def _attach_eta_and_timestamp(record):
    """Return ``record`` as a new Row with derived ``eta`` and ``timestamp`` fields appended."""
    eta = get_eta((None, record))[1]
    timestamp = get_timestamp(record)[1]
    # The field names must match schema2 exactly: createDataFrame looks up
    # keyword-built Rows by field name, so a misspelt name fails the job.
    return Row(**record.asDict(), eta=eta, timestamp=timestamp)


def read_1hour_pipeline(files, year, month, d):
    """Decode, filter and enrich one hour of raw AIS CSV files, appending them as parquet.

    Args:
        files: list of CSV paths for one hour (as returned by get_filepaths_1hour).
        year, month, d: date used to name the output directory
            ``/mnt/lsde/group12/input/YYYY-MM-DD/``.
    """
    write_out_path = '/mnt/lsde/group12/input/{yy}-{mm}-{dd}/'.format(yy = year, mm = str(month).zfill(2), dd = str(d).zfill(2))

    df = spark.read.format("csv").option("header", "false").option("delimiter", ",").option("inferschema","false").load(files)
    df = df.withColumn('filename', input_file_name())
    # replace null strings with "" so the columns can be joined back together
    df_no_null = df.na.fill("")
    # rebuild each raw sentence from its first 7 comma-split columns, rdd -> (raw_message, filename)
    stream = df_no_null.rdd.map(lambda x: (",".join(x[:7]), x.filename))
    # decoded messages with filename attached, rdd -> (dict)
    df_decoded = stream.map(get_decoded)
    # keep only the desired features, rdd -> (Row) -> DataFrame
    df_good_features = df_decoded.map(get_features).toDF(schema)
    # keep only the desired message types
    df_good_types = df_good_features.filter(col('msg_type').isin(type_list))
    # keep the desired sea area (north atlantic)...
    in_area = (col('lat') >= -0.936) & (col('lat') <= 68.6387) & (col('lon') >= -98.0539) & (col('lon') <= 12.0059)
    # ...plus messages that carry no position at all (static/voyage reports such
    # as type 5, the only kept type with ETA fields); null comparisons would
    # otherwise drop them and the ETA columns could never be populated.
    no_position = col('lat').isNull() & col('lon').isNull()
    df_good_area = df_good_types.filter(in_area | no_position)
    # replace null month/day/hour/minute with 0 before building the ETA string
    df_pruned = df_good_area.na.fill(0, eta_list)
    # derive eta and timestamp per row in a single map (no zipWithIndex + join shuffle)
    with_timestamp = df_pruned.rdd.map(_attach_eta_and_timestamp).toDF(schema2)
    # drop columns that are no longer needed
    final_df = with_timestamp.drop('month', 'day', 'hour', 'minute', 'filename')
    # append mode creates the directory on first write, so no existence check is needed
    final_df.write.mode('append').format("parquet").save(write_out_path)

In [0]:
def get_filepaths_1hour(year, month, day, hour):
    """List the raw AIS files for one hour of one day.

    Returns:
        Paths with their first 5 characters (the ``dbfs:`` scheme) stripped,
        keeping entries whose path has the zero-padded ``day`` at offset 37 and
        ``hour`` at offset 40. Returns ``[]`` if the day directory cannot be
        listed (e.g. it does not exist).
    """
    try:
        day_str = str(day).zfill(2)
        listing = dbutils.fs.ls('/mnt/lsde/datasets/ais2/{y}/{m}/{d}/'.format(
            y = str(year).zfill(2), m = str(month).zfill(2), d = day_str))
    except Exception:
        return []
    hour_str = str(hour).zfill(2)
    return [entry.path[5:] for entry in listing
            if entry.path[37:39] == day_str and entry.path[40:42] == hour_str]

In [0]:
# Processing one full day of data at a time causes a significant processing delay, so this helper is deprecated (kept commented out for reference)
# def get_filepaths_1day(year, month, day):
#     paths_list = dbutils.fs.ls('/mnt/lsde/datasets/ais2/{y}/{m}/{d}/'.format(y = str(year).zfill(2), m = str(month).zfill(2), d = str(day).zfill(2)))
#     existing_paths = []
#     for i in paths_list:
#         if i.path[37:39] == str(day).zfill(2):
#             existing_paths.append(i.path[5:])
#     return existing_paths   

In [0]:
def extract_data(year, month, day_head, day_tail):
    """Run the one-hour pipeline for every hour of days ``[day_head, day_tail)``.

    Hours without input files are skipped. A failure in one hour does not stop
    the batch, but it is printed and recorded instead of being silently ignored.

    Args:
        year, month: month to process.
        day_head: first day, inclusive.
        day_tail: last day, exclusive.

    Returns:
        List of ``(day, hour)`` pairs whose pipeline run raised an exception.
    """
    failed = []
    for d in range(day_head, day_tail):
        for h in range(24):
            files = get_filepaths_1hour(year, month, d, h)
            # missing or empty directory: nothing to read for this hour
            if not files:
                continue
            try:
                read_1hour_pipeline(files, year, month, d)
            except Exception as err:
                # keep processing the remaining hours, but make the failure visible
                print('extract_data: {y}-{m}-{d} hour {h} failed: {e!r}'.format(y = year, m = month, d = d, h = h, e = err))
                failed.append((d, h))
    return failed

# Query

In [0]:
# columns null-filled with 0 before the ETA string is built
eta_list = ['month', 'day', 'hour', 'minute']
# AIS message types kept by the pipeline
type_list = [1, 2, 3, 5, 18, 19, 24, 27]
# decoded fields kept, in output column order (must match `schema` below)
feature_list = ['mmsi', 'msg_type', 'imo', 'callsign', 'status',
                'shipname', 'ship_type', 'lon', 'lat', 'to_bow',
                'to_stern', 'to_port', 'to_starboard', 'heading',
                'course', 'speed', 'month', 'day', 'hour', 'minute',
                'draught', 'destination', 'radio', 'filename']

schema = StructType([StructField('mmsi', LongType(), True), StructField('msg_type', LongType(), True), StructField('imo', LongType(), True),
                    StructField('callsign', StringType(), True), StructField('status', FloatType(), True), StructField('shipname', StringType(), True),
                    StructField('ship_type', LongType(), True), StructField('lon', FloatType(), True), StructField('lat', FloatType(), True),
                    StructField('to_bow', LongType(), True), StructField('to_stern', LongType(), True), StructField('to_port', LongType(), True),
                    StructField('to_starboard', LongType(), True), StructField('heading', LongType(), True), StructField('course', FloatType(), True),
                    StructField('speed', FloatType(), True), StructField('month', LongType(), True), StructField('day', LongType(), True),
                    StructField('hour', LongType(), True), StructField('minute', LongType(), True), StructField('draught', FloatType(), True),
                    StructField('destination', StringType(), True), StructField('radio', LongType(), True), StructField('filename', StringType(), True)])

# get_features builds Rows from feature_list, so the two must stay in sync
if [field.name for field in schema.fields] != feature_list:
    raise ValueError('schema field names must match feature_list')

# schema2 = schema plus the two derived string columns, appended in this order
schema2 = StructType(schema.fields + [StructField('eta', StringType(), True),
                                      StructField('timestamp', StringType(), True)])

# modify the time range for extracting data
year = 2016
month = 7
day_head = 1     # first day, inclusive (calendar days start at 1)
day_tail = 32    # last day, exclusive

extract_data(year, month, day_head, day_tail)

