In [None]:
import sys
import subprocess

import os
from getpass import getpass

# Read-only GitLab credentials for members of the AIS Task Team.
# The token is deliberately NOT stored in the notebook: it is taken from the
# GITLAB_TOKEN environment variable, or asked for interactively when unset.
GITLAB_USER = os.environ.get("GITLAB_USER", "read_aistt")
GITLAB_TOKEN = os.environ.get("GITLAB_TOKEN") or getpass("GitLab read-only token for the ais package: ")
if not GITLAB_TOKEN:
    raise ValueError("No GitLab token supplied; set GITLAB_TOKEN or enter it at the prompt")
git_package = f"git+https://{GITLAB_USER}:{GITLAB_TOKEN}@code.officialstatistics.org/trade-task-team-phase-1/ais.git"

# Install with the kernel's own interpreter so the package lands in the
# environment this notebook actually runs in.
pip_result = subprocess.run([sys.executable, "-m", "pip", "install", git_package], capture_output=True, text=True)
std_out = pip_result.stdout
print(std_out)

# Surface failures instead of silently carrying on; the token is masked so it
# cannot leak into the saved notebook output.
if pip_result.returncode != 0:
    print(pip_result.stderr.replace(GITLAB_TOKEN, "***"), file=sys.stderr)
    raise RuntimeError(f"pip install of the ais package failed with exit code {pip_result.returncode}")

In [None]:
from ais import functions as af

# Sedona still has to be registered explicitly here, even with the template
# configuration. TODO: find out why the template does not cover it.
# NOTE(review): `spark` is not defined anywhere in this notebook; presumably the
# kernel provides the active Spark session under that name — confirm.
from sedona.register import SedonaRegistrator
SedonaRegistrator.registerAll(spark)

import pandas as pd
from datetime import datetime
#import json
from IPython.display import display

import h3
import h3.api.numpy_int as h3int
import pyspark.sql.functions as F

In [None]:
%%time
# Load the AIS data for June 2022: the requested range runs from 2022-06-01
# to 2022-07-01 and is handed to the project helper af.get_ais.
start_date, end_date = datetime(2022, 6, 1), datetime(2022, 7, 1)
df = af.get_ais(spark, start_date, end_date=end_date)

# Quick look at the first rows
df.show()

# Timing notes from earlier runs:
#   16.1s for 1 week
#   ~4 mins

In [None]:
# Polygon around the Vancouver port area, as a dict with "type" and
# "coordinates" keys. The single ring is closed: its last point repeats the first.
# NOTE(review): pairs look like [longitude, latitude] — confirm against the
# geometry convention expected by the ais helpers.
vanPort = dict(
    type="Polygon",
    coordinates=[
        [
            [-123.212426525887096, 49.274760228402968],
            [-122.985548446858502, 49.251700411088592],
            [-122.988952264515504, 49.355219640768908],
            [-123.214802962515606, 49.353944465477369],
            [-123.212426525887096, 49.274760228402968],
        ]
    ],
)

In [None]:
%%time
# Filter by geography: hand the AIS dataframe and the vanPort polygon to the
# project helper af.apply_geo_filter.
# NOTE(review): presumably this keeps only the positions that fall inside the
# polygon — confirm against the ais package.
df_van = af.apply_geo_filter(spark, df, vanPort)

# Count the rows of the filtered dataframe (shown as the cell output)
df_van.count()

# Timing note from an earlier run: takes 5 mins 30s

In [None]:
# Convert the filtered dataframe to pandas so the rest of the notebook can use
# the pandas API on df_van.
# The isinstance guard makes the cell safe to re-run: once df_van is already a
# pandas dataframe there is nothing left to convert.
if not isinstance(df_van, pd.DataFrame):
    try:
        df_van = df_van.toPandas()
    except Exception as exc:
        # Still best-effort, but report what actually went wrong: a size
        # problem is only one possible cause. Catching Exception (not a bare
        # except) lets KeyboardInterrupt stop a long-running conversion.
        print("The dataframe was too large to convert to Pandas")
        print(f"Underlying error: {type(exc).__name__}: {exc}")

# Show the frame only when the conversion succeeded
if isinstance(df_van, pd.DataFrame):
    display(df_van)

In [None]:
# Let pandas render up to 50 columns before it starts eliding them
pd.options.display.max_columns = 50

# First rows of the port subset
df_van.head()

In [None]:
# Earliest and latest position timestamp in the port subset, one per line
print(df_van['dt_pos_utc'].min(), df_van['dt_pos_utc'].max(), sep="\n")

In [None]:
# Keep only positions timestamped from 2022-06-01 00:00:01 through
# 2022-06-30 23:59:59 (both bounds inclusive); anything outside that window,
# e.g. rows stamped on 2022-07-01, is dropped.
# reset_index returns a new frame, so its result has to be assigned back for
# the index to really be renumbered 0..n-1.
df_van = df_van[
    (df_van['dt_pos_utc'] >= "2022-06-01 00:00:01")
    & (df_van['dt_pos_utc'] <= "2022-06-30 23:59:59")
].reset_index(drop=True)

# Sanity check: earliest remaining timestamp and number of remaining rows
print(df_van['dt_pos_utc'].min())
print(len(df_van.index))

In [None]:
from IPython.display import HTML
import base64  

import json
import pandas as pd
from io import StringIO
from urllib import parse

import requests

def create_download_link(df, title = "Download CSV file", filename = "data.csv"):
    """
    Build an in-notebook download link for a small pandas dataframe.

    The dataframe is serialised with ``df.to_csv()`` (index included),
    base64-encoded and embedded in the ``href`` of an ``<a>`` tag as a
    ``data:text/csv`` URI. The whole file therefore lives inside the notebook
    output, so this is only suitable for small dataframes.

    Parameters:
    ----------

    df : pandas dataframe

    title : str, text shown for the download link

    filename : str, file name placed in the link's ``download`` attribute

    Returns:
    IPython.display.HTML object wrapping the link

    """
    # Local import: the module name would otherwise clash with nothing at file
    # level, and the file's import cell is left untouched.
    from html import escape

    csv_text = df.to_csv()
    payload = base64.b64encode(csv_text.encode()).decode()

    # Escape the caller-supplied strings so quotes, '&' or '<' in a title or
    # file name cannot break out of the attribute / element they are placed in.
    link_markup = '<a download="{filename}" href="data:text/csv;base64,{payload}" target="_blank">{title}</a>'.format(
        payload=payload,
        title=escape(title),
        filename=escape(filename),
    )

    # The markup is deliberately not printed: it contains the complete
    # base64-encoded file and would flood the cell output.
    return HTML(link_markup)

In [None]:
# Render a download link for the port subset; "test.csv" is the file name
# placed in the link's download attribute.
create_download_link(df_van, filename="test.csv")

In [None]:
import os

In [None]:
# Get number of unique ships per day
# Use DT_POS_UTC column: Date and Time of Last Position AIS Message in UTC [YYYY-MM-DD HH24:MI:SS]
# nunique() counts each mmsi once per daily bin; count() would instead count
# every AIS message, i.e. the same ship many times over.
df_van.groupby(pd.Grouper(key="dt_pos_utc", freq="1D"))['mmsi'].nunique().reset_index()

In [None]:
# Get number of unique ships per hour
# nunique() counts each mmsi once per hourly bin; count() would instead count
# every AIS message, i.e. the same ship many times over.
df_van.groupby(pd.Grouper(key="dt_pos_utc", freq="1H"))['mmsi'].nunique().reset_index()

In [None]:
# Check nav status of ships: list the distinct values present in the
# nav_status column (the next step filters on one of them).
df_van['nav_status'].unique()

In [None]:
# We want ships that are moored.
# .copy() gives an independent frame, so adding columns below neither triggers
# SettingWithCopyWarning nor depends on whether the filter returned a view.
df_van_moored = df_van[df_van['nav_status'] == 'Moored'].copy()

# Split the timestamp into a calendar date and an hour-of-day column
df_van_moored['dt_pos_utc_date'] = df_van_moored['dt_pos_utc'].dt.date
df_van_moored['dt_pos_utc_hour'] = df_van_moored['dt_pos_utc'].dt.hour

# Keep one row per ship per day, and one row per ship per (day, hour).
# The date must be part of the hourly key: de-duplicating on (mmsi, hour) alone
# would count a ship for a given hour on only one day of the month.
df_moored_date = df_van_moored.drop_duplicates(subset=['mmsi', 'dt_pos_utc_date'], ignore_index=True)
df_moored_hour = df_van_moored.drop_duplicates(
    subset=['mmsi', 'dt_pos_utc_date', 'dt_pos_utc_hour'], ignore_index=True
)

# Get number of unique moored ships per day and per (day, hour)
display(df_moored_date.groupby("dt_pos_utc_date")['mmsi'].count().reset_index())
display(df_moored_hour.groupby(["dt_pos_utc_date", "dt_pos_utc_hour"])['mmsi'].count().reset_index())

In [None]:
# Per-ship summary of the port subset, one row per (mmsi, vessel_type) pair:
#   obs_count        - number of observations
#   dt_pos_utc_min   - first observed timestamp
#   dt_pos_utc_max   - last observed timestamp
#   arrival_date     - calendar date of the first observation
#   time_in_port     - last minus first timestamp (also given in seconds)
port_stats = (
    df_van
    .groupby(['mmsi','vessel_type'])
    .agg({'mmsi':'count','dt_pos_utc':['min','max']})
    .reset_index()
    # flatten the two-level column header produced by the dict aggregation
    .set_axis(['mmsi','vessel_type','obs_count','dt_pos_utc_min','dt_pos_utc_max'], axis=1)
    .assign(
        arrival_date=lambda stats: stats['dt_pos_utc_min'].dt.date,
        time_in_port=lambda stats: stats['dt_pos_utc_max'] - stats['dt_pos_utc_min'],
        time_in_port_sec=lambda stats: stats['time_in_port'].dt.total_seconds(),
    )
)

# Display
port_stats

# The length of port_stats is the number of unique ships in the specified area
# (as long as every mmsi carries a single vessel_type).

In [None]:
# Per-ship summary restricted to moored observations, one row per
# (mmsi, vessel_type) pair: observation count plus first and last timestamp.
port_stats2 = df_van_moored.groupby(['mmsi','vessel_type']).agg({'mmsi':'count','dt_pos_utc':['min','max']}).reset_index()
port_stats2.columns = ['mmsi','vessel_type','obs_count','dt_pos_utc_min','dt_pos_utc_max']
port_stats2['arrival_date'] = port_stats2.dt_pos_utc_min.dt.date
# Both ends of the interval must come from port_stats2. Subtracting a column of
# the all-observations table would pair rows by index position across two
# different tables and give meaningless durations.
port_stats2['time_in_port'] = port_stats2.dt_pos_utc_max - port_stats2.dt_pos_utc_min
port_stats2['time_in_port_sec'] = port_stats2['time_in_port'].dt.total_seconds()

# Display
port_stats2

In [None]:
!pip install plotly

In [None]:
# 1. make daily vessel count (date vs vessel count)
# 2. Make histogram (time spent in days, weeks vs freq or percentage) then group by vessel_type
import plotly.express as px