In [1]:
# Install a pip package in the current Jupyter kernel
import sys
!{sys.executable} -m pip install libais
!{sys.executable} -m pip install bokeh




In [1]:
import re
import pandas as pd
import numpy as np
import time

import ais

from dask.distributed import Client
from dask import dataframe as ddf
from dask.multiprocessing import get
from multiprocessing import cpu_count

# Wall-clock start of the run; the last cell turns it into a duration.
start = time.time()

# Logical CPU count of this machine; used to size the Dask partitioning.
nCores = cpu_count()

start

1585291188.5031936

In [2]:
def meta_parse(row):
    '''
    Parse the header ("meta") string of one log row into its parts.

    The meta string is either a single-sentence header or one line of a
    multi-sentence group, e.g.::

        s:66,c:1555624754*3E
        g:1-2-0300,s:66,c:1555624791*46
        g:2-2-0300*5E

    Sets these keys on ``row`` and returns it:

    * ``meta_source``   -- the text before the last '*' (the whole string
      when there is no '*');
    * ``meta_checksum`` -- the text after the last '*', or None when there
      is no '*';
    * ``event_time``    -- the first run of 10 digits in ``meta_source`` as
      a string (the ``c:`` value in the examples), or None if there is none.

    A non-string meta value (e.g. NaN for a missing field) sets all three
    keys to None instead of raising.

    NOTE(review): open question carried over -- how are s:xx fields laid out
    on multiline messages?
    '''
    raw = row['meta']

    # Missing values arrive from read_csv as NaN (a float), which has no
    # string methods; record "nothing parsed" rather than crashing the apply.
    if not isinstance(raw, str):
        row['meta_checksum'] = None
        row['meta_source'] = None
        row['event_time'] = None
        return row

    # The checksum follows the final '*'. rpartition tolerates a missing or
    # repeated '*', where unpacking split("*") raised ValueError.
    head, star, tail = raw.rpartition('*')
    if star:
        meta, meta_checksum = head, tail
    else:
        meta, meta_checksum = raw, None

    time_found = re.search(r'[0-9]{10}', meta)
    event_time = time_found.group(0) if time_found else None

    row['meta_checksum'] = meta_checksum
    row['meta_source'] = meta
    row['event_time'] = event_time
    return row


In [3]:
def ais_decode_to_dict(row, drop_message_type=(), multi=False, fallback_padding=12):
    '''
    Decode the payload of one split AIS sentence into a dict with libais.

    Parameters
    ----------
    row : pandas Series or mapping
        Must provide 'payload' and 'padding'. 'frag_count' is also read
        when ``multi`` is False.
    drop_message_type : collection of int, optional
        Values of the decoded dict's 'id' key to discard (returned as {}).
        None is treated as "drop nothing".
    multi : bool, optional
        False (default): only single-fragment sentences (frag_count == '1')
        are decoded; any other row returns {}. True: decode regardless of
        frag_count, for payloads already stitched together from fragments.
    fallback_padding : int, optional
        Padding value retried when decoding with the row's own 'padding'
        raises.

    Returns
    -------
    dict
        The libais result, or {} when the row is skipped, cannot be decoded,
        or its 'id' is in ``drop_message_type``.

    Raises
    ------
    KeyError
        If ``row`` lacks 'frag_count' while ``multi`` is False.
    '''
    # Parameter order follows the existing call sites,
    # ais_decode_to_dict(x, drop_message_type). The previous signature put a
    # non-default parameter after a default one and could not be compiled.
    drop_message_type = drop_message_type or ()

    # Multi-fragment sentences are only decodable after their payloads have
    # been concatenated; callers signal that case with multi=True.
    if not multi and row['frag_count'] != '1':
        return {}

    try:
        decode_dict = ais.decode(row['payload'], int(row['padding']))
    except Exception:
        # Empirical retry with a fixed padding value (the earlier note read
        # "try padding to reach 70").
        # NOTE(review): confirm why 12 is the right fallback.
        try:
            decode_dict = ais.decode(row['payload'], fallback_padding)
        except Exception:
            # Best effort: undecodable rows become empty dicts.
            return {}

    if decode_dict.get('id') in drop_message_type:
        return {}
    return decode_dict

In [25]:
# Input file -- keep exactly one `infile` assignment uncommented.
# 170 lines of AIS in the expected format:
# infile = 'full_test.log'

# 10% of the daily file, around 70 MB / 800K messages:
# infile = 'decimate_test.log'

# Full day:
infile = 'TNPA_AIS.log.2019-04-19'

# Field names for an !AIVDM sentence split on ',' and '*'.
ais_sentence = [
    'packet', 'frag_count', 'frag_num', 'seq_id',
    'radio_chan', 'payload', 'padding', 'checksum',
]

# Column names for the raw log. Only the first three come from the file;
# the rest start out empty.
first_cols = [
    'rx_time', 'meta', 'ais',
    'meta_checksum', 'meta_source', 'event_time',
    'decoded', 'msg_num',
]

# Columns that meta_parse fills in.
meta_cols = ['meta_checksum', 'meta_source', 'event_time']

# Decoded AIS message ids to discard: 7, 8, 10-17 and 20-27.
drop_message_type = [7, 8, *range(10, 18), *range(20, 28)]


# Prepare Data

## Using Pandas

In [26]:
%%time

df = pd.read_csv(infile,
                 sep=r": \\|\\",
                 names=first_cols, 
                 header=None,
                 skiprows=1, 
                 skip_blank_lines=True,
                 engine="python")
 
 

CPU times: user 43.9 s, sys: 4.56 s, total: 48.5 s
Wall time: 45.9 s


In [27]:
%%time

df[ais_sentence] = df['ais'].str.split(r',|\*', expand=True) 
df

CPU times: user 32.8 s, sys: 1.64 s, total: 34.5 s
Wall time: 34 s


Unnamed: 0,rx_time,meta,ais,meta_checksum,meta_source,event_time,decoded,msg_num,packet,frag_count,frag_num,seq_id,radio_chan,payload,padding,checksum
0,"2019-04-19 00:00:00,774","s:66,c:1555624754*3E","!AIVDM,1,1,,,19NS36@02EMm;F5smJcsf9LF0@7u,0*3B",,,,,,!AIVDM,1,1,,,19NS36@02EMm;F5smJcsf9LF0@7u,0,3B
1,"2019-04-19 00:00:00,775","s:66,c:1555624754*3E","!AIVDM,1,1,,,13csJ70vBOw2:u9s@l`23A`H087w,0*6F",,,,,,!AIVDM,1,1,,,13csJ70vBOw2:u9s@l`23A`H087w,0,6F
2,"2019-04-19 00:00:00,777","s:66,c:1555624756*3C","!AIVDM,1,1,,,1815CD@01dNCOGee6Qdbt8pF0@8>,0*20",,,,,,!AIVDM,1,1,,,1815CD@01dNCOGee6Qdbt8pF0@8>,0,20
3,"2019-04-19 00:00:00,778","s:66,c:1555624756*3C","!AIVDM,1,1,,,13UMff0vAoNj`EicjOASc2tJ0@8B,0*4C",,,,,,!AIVDM,1,1,,,13UMff0vAoNj`EicjOASc2tJ0@8B,0,4C
4,"2019-04-19 00:00:00,779","s:66,c:1555624756*3C","!AIVDM,1,1,,,177=a8001RMpV93hSCO:vHlT0@8E,0*36",,,,,,!AIVDM,1,1,,,177=a8001RMpV93hSCO:vHlT0@8E,0,36
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7992781,"2019-04-19 23:59:59,226","s:66,c:1555711162*3A","!AIVDM,1,1,,,1815DP@01oMpM7kgioMs18ln0D00,0*5C",,,,,,!AIVDM,1,1,,,1815DP@01oMpM7kgioMs18ln0D00,0,5C
7992782,"2019-04-19 23:59:59,226","s:66,c:1555711163*3B","!AIVDM,1,1,,,177;A;0wR8O1bV9lG=HlkCnT0H<F,0*6B",,,,,,!AIVDM,1,1,,,177;A;0wR8O1bV9lG=HlkCnT0H<F,0,6B
7992783,"2019-04-19 23:59:59,227","s:66,c:1555711163*3B","!AIVDM,1,1,,,15CN<T001l0EWM?b@`A3MBtb00S=,0*4C",,,,,,!AIVDM,1,1,,,15CN<T001l0EWM?b@`A3MBtb00S=,0,4C
7992784,"2019-04-19 23:59:59,228","s:66,c:1555711163*3B","!AIVDM,1,1,,,1;23g`000;Ki8<kQciidr83420S;,0*5C",,,,,,!AIVDM,1,1,,,1;23g`000;Ki8<kQciidr83420S;,0,5C


In [28]:
# The raw meta string plus the three columns meta_parse fills in.
df_meta = df[['meta'] + meta_cols]

In [29]:
# Only the fields ais_decode_to_dict reads.
df_to_decode = df.loc[:, ['frag_count', 'payload', 'padding']]
df_to_decode

Unnamed: 0,frag_count,payload,padding
0,1,19NS36@02EMm;F5smJcsf9LF0@7u,0
1,1,13csJ70vBOw2:u9s@l`23A`H087w,0
2,1,1815CD@01dNCOGee6Qdbt8pF0@8>,0
3,1,13UMff0vAoNj`EicjOASc2tJ0@8B,0
4,1,177=a8001RMpV93hSCO:vHlT0@8E,0
...,...,...,...
7992781,1,1815DP@01oMpM7kgioMs18ln0D00,0
7992782,1,177;A;0wR8O1bV9lG=HlkCnT0H<F,0
7992783,1,15CN<T001l0EWM?b@`A3MBtb00S=,0
7992784,1,1;23g`000;Ki8<kQciidr83420S;,0


## Using Dask

In [21]:
%%time
# Start a local Dask distributed cluster with default settings
# (dask.distributed registers a newly created Client as the default
# scheduler for later .compute() calls).
client = Client()
# Show the client's repr: scheduler address, dashboard link, workers.
client

CPU times: user 263 ms, sys: 158 ms, total: 420 ms
Wall time: 855 ms


0,1
Client  Scheduler: tcp://127.0.0.1:44699  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 5  Cores: 10  Memory: 23.11 GB


In [30]:
%%time 
 
dask_meta   = ddf.from_pandas(df_meta, npartitions=2000*nCores)
dask_decode = ddf.from_pandas(df_to_decode, npartitions=2000*nCores)

CPU times: user 6.77 s, sys: 1.54 s, total: 8.31 s
Wall time: 7.09 s


In [None]:
%%time
dask_meta = dask_meta.apply(lambda x : meta_parse(x), axis=1, meta = dask_meta)
dask_meta.compute()


In [None]:
# Display the Dask frame's repr. Dask collections are lazy, so this shows
# structure (columns, dtypes, partitions) rather than computed rows.
dask_meta

In [11]:
%%time
single_dataframe = dask_dataframe[dask_dataframe.frag_count == '1']

dask_decode = dask_decode.map_partitions(lambda df : df.apply(lambda x : ais_decode_to_dict(x, multi = False, drop_message_type),axis=1)).compute()
single_dataframe.compute()

KeyboardInterrupt: 

In [12]:
%%time
multi_ddf = dask_dataframe[dask_dataframe.frag_count == '2']
multi_df = multi_ddf.compute()

Exception ignored in: <function Future.__del__ at 0x7f83feaea8c0>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 377, in __del__
    self.release()
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 355, in release
    self.client.loop.add_callback(self.client._dec_ref, tokey(self.key))
  File "/opt/conda/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 177, in add_callback
    call_soon(self._run_callback, functools.partial(callback, *args, **kwargs))
  File "/opt/conda/lib/python3.7/asyncio/base_events.py", line 738, in call_soon_threadsafe
    self._write_to_self()
  File "/opt/conda/lib/python3.7/asyncio/selector_events.py", line 137, in _write_to_self
    csock.send(b'\0')
KeyboardInterrupt: 
Exception ignored in: <function Future.__del__ at 0x7f83feaea8c0>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 377, in __del

KeyboardInterrupt: 

# Separate single- and multi-line messages

In [13]:
%%time

multi_df['prev_payload'] = multi_df['payload'].shift(1)
multi_df['prev_event_time'] = multi_df['event_time'].shift(1)
multi_df['payload'] = multi_df['prev_payload'] + multi_df['payload']

multi_df =  multi_df[multi_df['prev_event_time'].notnull() & multi_df['meta_source'].str.contains('g:2')]


NameError: name 'multi_df' is not defined

In [14]:
%%time

# pass DF to row 
yy = multi_df.apply(lambda x : ais_decode_to_dict(x, drop_message_type), axis=1)
yy_df = pd.DataFrame(yy.values.tolist())

NameError: name 'multi_df' is not defined

In [None]:
%%time

xx = dask_dataframe.map_partitions(lambda df : df.apply(lambda x : ais_decode_to_dict(x,drop_message_type),axis=1)).compute()
# client.shutdown()

In [None]:
# client.shutdown()

In [None]:
%%time
new_df = pd.DataFrame(xx.values.tolist())
new_df.columns

In [None]:
# new_df.to_csv('decoded_single.csv')
# yy_df.to_csv('decoded_multi.csv')
# multi_df.to_csv('parsed_multi.csv')

In [None]:
# Total wall-clock time since `start`, in minutes rounded to 2 places.
end = time.time()
elapsed_minutes = round((end - start) / 60, 2)
print("Duration to process: {0} mins".format(elapsed_minutes))

## To Do
- DB structure: position vs. voyage data?
- Combine single- and multi-fragment results?
- DB gets larger when…? (question left incomplete — TODO: clarify)


# Use Cases (from experience)

- History of vessel X?
- What vessels are in this area?
- get point matches (sar-to-ais and radar-to-ais)
- Transhipments (ship-to-ship or ship-to-bilge)
- Get names/imo/callsigns from AIS pos report
- heatmap agg

# Research use cases
- Various distributions (speed, navigation status, course) for vessel X over the last 7 days (TimescaleDB continuous aggregates)
- Heatmap aggregation of points
- Event-based history
- API interface (although that's a separate thing)
    