In [1]:
# Display the value of every top-level expression in a cell, not only the last
# one. Later cells rely on this to show several results (e.g. shape and head).
from IPython.core.interactiveshell import InteractiveShell 
InteractiveShell.ast_node_interactivity = "all"


import pandas as pd
from datetime import datetime
# to apply aggregation functions on spark df
import pyspark.sql.functions as F

In [2]:
# Install the `ais` helper package directly from the private GitLab repo.
import getpass
import os
import subprocess
import sys

# Read-only credentials are taken from the environment so that the token is
# never stored in the notebook. If GITLAB_TOKEN is not set, the user is
# prompted for it (the input is not echoed).
GITLAB_USER = os.environ.get("GITLAB_USER", "read_aistt")  # read only access
GITLAB_TOKEN = os.environ.get("GITLAB_TOKEN") or getpass.getpass("GitLab read token: ")

# pip clones the repo and installs the ais package from it
git_package = f"git+https://{GITLAB_USER}:{GITLAB_TOKEN}@code.officialstatistics.org/trade-task-team-phase-1/ais.git"

install = subprocess.run(
    [sys.executable, "-m", "pip", "install", git_package],
    capture_output=True,
    text=True,
)
std_out = install.stdout
print(std_out)

# stderr and the exit code used to be discarded, so a failed install looked
# like an empty success. Surface the error and stop here instead of failing
# later at `from ais import ...`.
if install.returncode != 0:
    print(install.stderr, file=sys.stderr)
    raise RuntimeError(
        f"pip install of the ais package failed (exit code {install.returncode})"
    )

Collecting git+https://read_aistt:****@code.officialstatistics.org/trade-task-team-phase-1/ais.git
  Cloning https://read_aistt:****@code.officialstatistics.org/trade-task-team-phase-1/ais.git to /tmp/pip-req-build-bwrch6bf
Building wheels for collected packages: ais
  Building wheel for ais (setup.py): started
  Building wheel for ais (setup.py): finished with status 'done'
  Created wheel for ais: filename=ais-2.7.6-py3-none-any.whl size=9267 sha256=afa84e9b418dc8cf7b062c227eda0b5b6a0e379a1bbe5afa0547f33a43f82718
  Stored in directory: /tmp/pip-ephem-wheel-cache-yxoddbwg/wheels/49/e0/a2/25d96a62cf626776ab2fd57fcbd822c2b8118049a84b16953d
Successfully built ais
Installing collected packages: ais
Successfully installed ais-2.7.6



# Chapter I: Access AIS

The `ais` package provides a helper function called `get_ais`. We will use this function to extract fragments of the AIS data.

In [3]:
# import get_ais() from ais package
from ais import functions as af

In [4]:
# Show the signature and docstring of get_ais (IPython `?` help), e.g.
#   - input parameters
#   - output: spark df
#   - usage of this function is shown in the examples below
af.get_ais?

[0;31mSignature:[0m
[0maf[0m[0;34m.[0m[0mget_ais[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mspark[0m[0;34m:[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0msession[0m[0;34m.[0m[0mSparkSession[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mstart_date[0m[0;34m:[0m [0mdatetime[0m[0;34m.[0m[0mdatetime[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mend_date[0m[0;34m:[0m [0mdatetime[0m[0;34m.[0m[0mdatetime[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mh3_list[0m[0;34m:[0m [0mUnion[0m[0;34m[[0m[0mList[0m[0;34m[[0m[0mint[0m[0;34m][0m[0;34m,[0m [0mNoneType[0m[0;34m][0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpolygon_hex_df[0m[0;34m:[0m [0mUnion[0m[0;34m[[0m[0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mdataframe[0m[0;34m.[0m[0mDataFrame[0m[0;34m,[0m [0mNoneType[0m[0;34m][0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmmsi_li

In [5]:
# Example 1: fetch a single day of AIS data with get_ais()

# Dates must be passed as datetime objects; dt_insert_utc is the basis for the
# parquet partitions.
start_date = datetime.fromisoformat("2022-01-01")

# `spark` is the current Spark session; it is created automatically during
# kernel init.
df = af.get_ais(spark, start_date=start_date)

In [6]:
# get_ais returns a Spark DataFrame (not a pandas DataFrame)
type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
# columns in df
# AIS fields: https://en.wikipedia.org/wiki/Automatic_identification_system#Broadcast_information
# H3 grid: https://h3geo.org/ for the H3_int cols (might be discussed in an upcoming workshop)


df.columns

['message_type',
 'mmsi',
 'imo',
 'vessel_name',
 'callsign',
 'vessel_type',
 'vessel_type_code',
 'vessel_type_cargo',
 'vessel_class',
 'length',
 'width',
 'flag_country',
 'flag_code',
 'destination',
 'eta',
 'draught',
 'longitude',
 'latitude',
 'sog',
 'cog',
 'rot',
 'heading',
 'nav_status',
 'nav_status_code',
 'source',
 'dt_pos_utc',
 'dt_static_utc',
 'dt_insert_utc',
 'vessel_type_main',
 'vessel_type_sub',
 'eeid',
 'source_filename',
 'H3index_0',
 'H3_int_index_0',
 'H3_int_index_1',
 'H3_int_index_2',
 'H3_int_index_3',
 'H3_int_index_4',
 'H3_int_index_5',
 'H3_int_index_6',
 'H3_int_index_7',
 'H3_int_index_8',
 'H3_int_index_9',
 'H3_int_index_10',
 'H3_int_index_11',
 'H3_int_index_12',
 'H3_int_index_13',
 'H3_int_index_14',
 'H3_int_index_15']

In [8]:
# print the schema: name, type and nullability of each column in df
df.printSchema()

root
 |-- message_type: integer (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- imo: integer (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_type_code: integer (nullable = true)
 |-- vessel_type_cargo: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- flag_country: string (nullable = true)
 |-- flag_code: integer (nullable = true)
 |-- destination: string (nullable = true)
 |-- eta: integer (nullable = true)
 |-- draught: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- sog: double (nullable = true)
 |-- cog: double (nullable = true)
 |-- rot: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- nav_status: string (nullable = true)
 |-- nav_status_code: integer (nullable = true)
 |-- source: st

In [9]:
# Display the first row of df, one field per line and without truncating values.
# Each row represents a single AIS message transmitted by a single ship.
df.show(n=1, vertical=True, truncate=False)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------
 message_type      | 1                                                                                                              
 mmsi              | 205654000                                                                                                      
 imo               | 9691279                                                                                                        
 vessel_name       | DN97                                                                                                           
 callsign          | ORRK                                                                                                           
 vessel_type       | Port Tender                                                                                                    
 vessel_type_code  | 53                                              

In [10]:
# Number of rows in df.
# We only retrieved data for a single day and it is still ~22 million rows,
# so always apply a filter when retrieving data.
df.count()

22136014

In [11]:
# Example 2: one day of data, restricted to specific columns

start_date = datetime.fromisoformat("2022-01-01")
columns = [
    "mmsi",
    "latitude",
    "longitude",
    "vessel_type",
    "dt_insert_utc",
    "eeid",
]

df = af.get_ais(spark, start_date, columns=columns)


In [12]:
# same number of rows as in Example 1 ...
df.count()
# ... but only the requested columns are present
df.columns

22136014

['mmsi', 'latitude', 'longitude', 'vessel_type', 'dt_insert_utc', 'eeid']

In [13]:
# Example 3: a range of dates, restricted to specific columns
# (about 160 million rows are retrieved)
start_date = datetime.fromisoformat("2022-01-01")
end_date = datetime.fromisoformat("2022-01-07")
columns = [
    "mmsi",
    "latitude",
    "longitude",
    "vessel_type",
    "dt_insert_utc",
    "eeid",
]

df = af.get_ais(spark, start_date, end_date=end_date, columns=columns)
df.count()

160650992

In [14]:
# Count AIS messages per insertion date:
#   - dt_insert_utc, cast to a date, is the grouping key
#   - count("eeid") counts the rows whose eeid is not null
count_ais_by_date = (
    df.withColumn("date", F.col("dt_insert_utc").cast("date"))
    .groupby("date")
    .agg(F.count("eeid").alias("count"))
    .orderBy("date")
)

count_ais_by_date.show()

+----------+--------+
|      date|   count|
+----------+--------+
|2022-01-01|22136014|
|2022-01-02|22813495|
|2022-01-03|22998973|
|2022-01-04|23132660|
|2022-01-05|23113870|
|2022-01-06|23319382|
|2022-01-07|23136598|
+----------+--------+



In [15]:
# Same count as above, but grouped by both vessel (mmsi) and insertion date
count_ais_per_mmsi_per_date = (
    df.withColumn("date", F.col("dt_insert_utc").cast("date"))
    .groupby("mmsi", "date")
    .agg(F.count("eeid").alias("count"))
    .orderBy("mmsi", "date")
)

count_ais_per_mmsi_per_date.show()

+---------+----------+-----+
|     mmsi|      date|count|
+---------+----------+-----+
|201000000|2022-01-01|   43|
|201000000|2022-01-02|   79|
|201000000|2022-01-03|   68|
|201000000|2022-01-04|   89|
|201000000|2022-01-05|   70|
|201000000|2022-01-06|   59|
|201000000|2022-01-07|   72|
|201000058|2022-01-02|   84|
|201000058|2022-01-03|  103|
|201000058|2022-01-04|   86|
|201000058|2022-01-05|   37|
|201000115|2022-01-03|   89|
|201000115|2022-01-04|  153|
|201000115|2022-01-05|  125|
|201000115|2022-01-07|    1|
|201000128|2022-01-07|    2|
|201000131|2022-01-02|   39|
|201000131|2022-01-03|   96|
|201000131|2022-01-04|  118|
|201000131|2022-01-07|   82|
+---------+----------+-----+
only showing top 20 rows



In [16]:
# Spatial filtering takes two steps: first call polygon_to_hex_df(), then pass
# its output on to get_ais(). The IPython `?` help below shows its signature.
af.polygon_to_hex_df?

[0;31mSignature:[0m
[0maf[0m[0;34m.[0m[0mpolygon_to_hex_df[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mpolygons[0m[0;34m:[0m [0mList[0m[0;34m[[0m[0mTuple[0m[0;34m[[0m[0mstr[0m[0;34m,[0m [0mDict[0m[0;34m][0m[0;34m][0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mhex_resolution[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;36m8[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0moverfill[0m[0;34m=[0m[0;32mFalse[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m [0;34m->[0m [0mpandas[0m[0;34m.[0m[0mcore[0m[0;34m.[0m[0mframe[0m[0;34m.[0m[0mDataFrame[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
A wrapper for h3.polyfill that returns integer hex ids for multiple polygons.

Parameters
----------
polygons: list of tuples
    the first element in this tuple is expected to be a (name) string
    identifier for the polygon and the second element is the polygon itself (see example above)
    
hex_resolution: int, default 8
    the resolution of the hex

In [17]:
# The polygon itself: second element of each (name, polygon) tuple passed to
# polygon_to_hex_df().
# Bounding-box tool (presumably used to obtain these coordinates):
#   https://boundingbox.klokantech.com/
# polygon coordinates in geojson format
colombo_polygon = {
        "type": "Polygon",
        "coordinates": [
            [
                [79.8133756779, 6.9156963109],    # longitude, latitude
                [79.8659040593, 6.9156963109],
                [79.8659040593, 6.9719290586],
                [79.8133756779, 6.9719290586],
                [79.8133756779, 6.9156963109]
            ]
        ]
    }

In [18]:
# polygon_to_hex_df() takes a list of (name, polygon) tuples: the first element
# of each tuple is the name/label for the polygon, the second is the polygon.
polygon_hex_df_colombo = af.polygon_to_hex_df([("Colombo_Port_Polygon", colombo_polygon)])

In [19]:
# Date range and columns for the polygon-filtered query
columns = ["mmsi", "latitude", "longitude", "eeid", "dt_insert_utc"]
start_date = datetime.fromisoformat("2022-01-01")
end_date = datetime.fromisoformat("2022-01-07")

# pass the output of polygon_to_hex_df() to get_ais() as polygon_hex_df
df = af.get_ais(
    spark,
    start_date,
    end_date=end_date,
    columns=columns,
    polygon_hex_df=polygon_hex_df_colombo,
)

df.count()

47568

In [20]:
# first 5 AIS messages captured in the Colombo port region
df.show(n=5)

+--------------+-----------+---------+-------------------+------------------+----------+-------------------+--------------------+
|hex_resolution|  longitude|     mmsi|               eeid|    H3_int_index_8|  latitude|      dt_insert_utc|        polygon_name|
+--------------+-----------+---------+-------------------+------------------+----------+-------------------+--------------------+
|             8|79.83612333|215181000|4961193053983565512|614197965673725951|   6.94602|2022-01-01 00:33:57|Colombo_Port_Polygon|
|             8|79.83608833|215181000|4961193053983565512|614197965673725951|6.94600833|2022-01-01 02:33:54|Colombo_Port_Polygon|
|             8|   79.83611|215181000|4961193053983565512|614197965673725951|6.94600667|2022-01-01 04:12:56|Colombo_Port_Polygon|
|             8|  79.836115|215181000|4961193053983565512|614197965673725951|6.94600667|2022-01-01 05:24:55|Colombo_Port_Polygon|
|             8|79.83613333|215181000|4961193053983565512|614197965673725951|6.94601167|20

# Chapter II: Accessing IHS Data 
- ship registry data in s3
- includes details about ship on a very granular level

In [21]:
# s3 location of the ship registry files
basepath = "s3a://ungp-ais-data-historical-backup/register/"

# first file: read ShipData.CSV as a spark df (header row, inferred types)
df_ship_data = spark.read.load(
    f"{basepath}ShipData.CSV",
    format="csv",
    sep=",",
    inferSchema="true",
    header="true",
)

df_ship_data.printSchema()

root
 |-- LRIMOShipNo: integer (nullable = true)
 |-- StatCode5: string (nullable = true)
 |-- AlterationsDescriptiveNarrative: string (nullable = true)
 |-- PropulsionTypeCode: string (nullable = true)
 |-- ShipName: string (nullable = true)
 |-- ExName: string (nullable = true)
 |-- MaritimeMobileServiceIdentityMMSINumber: integer (nullable = true)
 |-- RegisteredOwnerCode: integer (nullable = true)
 |-- RegisteredOwnerCountryOfRegistration: string (nullable = true)
 |-- RegisteredOwnerCountryofDomicile: string (nullable = true)
 |-- ShipManagerCompanyCode: integer (nullable = true)
 |-- ShipManagerCountryOfRegistration: string (nullable = true)
 |-- ShipManagerCountryofDomicileName: string (nullable = true)
 |-- GroupBeneficialOwnerCompanyCode: integer (nullable = true)
 |-- GroupBeneficialOwnerCountryOfRegistration: string (nullable = true)
 |-- GroupBeneficialOwnerCountryofDomicile: string (nullable = true)
 |-- OperatorCompanyCode: integer (nullable = true)
 |-- OperatorCountryOf

In [22]:
# Keep only the relevant columns of the spark df and convert them to pandas
print('Loading ShipData.CSV (few cols) .....')
ship_data = df_ship_data.select(
    [
        "StatCode5",
        "MaritimeMobileServiceIdentityMMSINumber",
        "ShipStatusEffectiveDate",
        "ShiptypeLevel5",
        "LRIMOShipNo",
    ]
).toPandas()

ship_data.shape
ship_data.head()

Loading ShipData.CSV (few cols) .....


(246724, 5)

Unnamed: 0,StatCode5,MaritimeMobileServiceIdentityMMSINumber,ShipStatusEffectiveDate,ShiptypeLevel5,LRIMOShipNo
0,X11A2YP,,19610000,Yacht,1000019
1,X11A2YP,,19951000,Yacht,1000021
2,X11A2YP,234028000.0,19950512,Yacht,1000033
3,X11A2YP,239488000.0,19950429,Yacht,1000045
4,X11A2YP,,20220601,Yacht,1000057


In [23]:
# second file: read the ship type codes as a spark df (header row, inferred types)
df_ship_code = spark.read.load(basepath + "tblShipTypeCodes.CSV", 
                     format="csv", sep=",", inferSchema="true", header="true")

df_ship_code.printSchema()

# Announce the load BEFORE it runs: toPandas() is the step that takes time, and
# the message used to appear only after it had finished. This now matches the
# ShipData.CSV cell, and the order of the visible output is unchanged.
print('Loading tblShipTypeCodes.csv (few cols) ....')

# select only relevant cols from spark df and convert them to pandas
ship_code = df_ship_code.select("StatCode5", "ShipTypeLevel1", "ShipTypeLevel2", "ShipTypeLevel3", "ShipTypeLevel4", "ShipTypeLevel5", 
                                "SubGroup", "SubType").toPandas()

ship_code.shape
ship_code.head()

root
 |-- StatCode5: string (nullable = true)
 |-- ShiptypeLevel5: string (nullable = true)
 |-- Level4Code: string (nullable = true)
 |-- ShipTypeLevel4: string (nullable = true)
 |-- Level3Code: string (nullable = true)
 |-- ShipTypeLevel3: string (nullable = true)
 |-- Level2Code: string (nullable = true)
 |-- ShipTypeLevel2: string (nullable = true)
 |-- ShipTypeLevel1Code: string (nullable = true)
 |-- ShiptypeLevel1: string (nullable = true)
 |-- HullType: string (nullable = true)
 |-- SubGroup: string (nullable = true)
 |-- SubType: string (nullable = true)

Loading tblShipTypeCodes.csv (few cols) ....


(295, 8)

Unnamed: 0,StatCode5,ShipTypeLevel1,ShipTypeLevel2,ShipTypeLevel3,ShipTypeLevel4,ShipTypeLevel5,SubGroup,SubType
0,A11A2TN,Cargo Carrying,Tankers,Liquefied Gas,LNG Tanker,LNG Tanker,Petroleum Products,LNG Tanker
1,A11A2TQ,Cargo Carrying,Tankers,Liquefied Gas,LNG Tanker,CNG Tanker,Petroleum Products,CNG Tanker
2,A11A2TZ,Cargo Carrying,Tankers,Liquefied Gas,LNG Tanker,Combination Gas Tanker (LNG/LPG),Petroleum Products,Combination Gas Tanker
3,A11B2TG,Cargo Carrying,Tankers,Liquefied Gas,LPG Tanker,LPG Tanker,Petroleum Products,LPG Tanker
4,A11B2TH,Cargo Carrying,Tankers,Liquefied Gas,LPG Tanker,LPG/Chemical Tanker,Petroleum Products,LPG/Chemical Tanker


In [24]:
# Left join: keep every row of ship_data and attach the matching ship_code
# columns via the shared StatCode5 key.
merged_ihs_df = pd.merge(ship_data, ship_code, on="StatCode5", how="left")
merged_ihs_df.head()

Unnamed: 0,StatCode5,MaritimeMobileServiceIdentityMMSINumber,ShipStatusEffectiveDate,ShiptypeLevel5,LRIMOShipNo,ShipTypeLevel1,ShipTypeLevel2,ShipTypeLevel3,ShipTypeLevel4,ShipTypeLevel5,SubGroup,SubType
0,X11A2YP,,19610000,Yacht,1000019,Non Merchant,Non-Merchant Ships,Yacht,Yacht,Yacht,Yacht,"Yacht, Private"
1,X11A2YP,,19951000,Yacht,1000021,Non Merchant,Non-Merchant Ships,Yacht,Yacht,Yacht,Yacht,"Yacht, Private"
2,X11A2YP,234028000.0,19950512,Yacht,1000033,Non Merchant,Non-Merchant Ships,Yacht,Yacht,Yacht,Yacht,"Yacht, Private"
3,X11A2YP,239488000.0,19950429,Yacht,1000045,Non Merchant,Non-Merchant Ships,Yacht,Yacht,Yacht,Yacht,"Yacht, Private"
4,X11A2YP,,20220601,Yacht,1000057,Non Merchant,Non-Merchant Ships,Yacht,Yacht,Yacht,Yacht,"Yacht, Private"


# Chapter III: How to Save processed data to s3 for your next notebook session?
- Store your data in your designated s3 directories only
    - save in parquet format
        - faster query performance
        - smaller data size
    - save in pickle format
    - csv not recommended

In [25]:
!pip install s3fs
import s3fs 

# create a handle for s3fs
fs = s3fs.S3FileSystem(anon=False) 

Collecting s3fs
  Downloading s3fs-2022.10.0-py3-none-any.whl (27 kB)
Collecting fsspec==2022.10.0
  Downloading fsspec-2022.10.0-py3-none-any.whl (138 kB)
[K     |████████████████████████████████| 138 kB 41.2 MB/s eta 0:00:01
[?25hCollecting aiobotocore~=2.4.0
  Downloading aiobotocore-2.4.0-py3-none-any.whl (65 kB)
[K     |████████████████████████████████| 65 kB 80.1 MB/s eta 0:00:01
[?25hCollecting aiohttp!=4.0.0a0,!=4.0.0a1
  Downloading aiohttp-3.8.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 65.1 MB/s eta 0:00:01
[?25hCollecting botocore<1.27.60,>=1.27.59
  Downloading botocore-1.27.59-py3-none-any.whl (9.1 MB)
[K     |████████████████████████████████| 9.1 MB 44.8 MB/s eta 0:00:01
[?25hCollecting aioitertools>=0.5.1
  Downloading aioitertools-0.11.0-py3-none-any.whl (23 kB)
Collecting wrapt>=1.10.10
  Downloading wrapt-1.14.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manyl

In [27]:
# root s3 path under which your own data is stored
s3_root_path = "s3a://team-datadive-582958291898-mvvpu"

In [28]:
# list the files located under a path on s3 (like the `ls` command)
fs.ls(s3_root_path)

PermissionError: Access Denied

In [None]:
# Destination of the parquet file inside the designated s3 directory
tmp_out_path = s3_root_path + "/pengfei/output/ihs_data.parquet"

# write the pandas df to s3 as parquet ...
merged_ihs_df.to_parquet(tmp_out_path)

# ... and read it back from s3 into a new pandas df
new_df_parquet = pd.read_parquet(tmp_out_path)
new_df_parquet.shape

In [None]:
# clean up: remove the object stored at tmp_out_path from s3
fs.rm(tmp_out_path)

# Download results to local machine
- only allowed for small datasets
- save in csv format

In [29]:
# Note: af (the functions module of the ais package) is provided by the hackathon.
# Only the first 20 rows are offered for download, as a CSV file.
af.create_download_link(merged_ihs_df.head(20), title = "Download CSV file", filename = "myresults.csv")

# Terminate processes
- to save cost and resources

In [30]:
# stop the spark session (saves cost and resources)
spark.stop()

# next, manually: shut down the kernel

# finally: log out

# Best practices
- Save your notebooks at regular intervals
- Save your processed data in s3 for later use (instead of re-running the same notebook)
- Download only the results to your local machine
- Shut down spark, kernel and NB when not in use