In [6]:
# this cell contains the code to access GitLab repo
# need it to install ais package from GitLab repo
import sys
import subprocess

GITLAB_USER = "read_aistt"  # read only access
GITLAB_TOKEN = "MMQ6ky1rnLsuKxjyZuvB"

# clone the repo and install the ais packag
git_package = f"git+https://{GITLAB_USER}:{GITLAB_TOKEN}@code.officialstatistics.org/trade-task-team-phase-1/ais.git"

std_out = subprocess.run([sys.executable, "-m", "pip", "install", git_package], capture_output=True, text=True).stdout
print(std_out) 

Collecting git+https://read_aistt:****@code.officialstatistics.org/trade-task-team-phase-1/ais.git
  Cloning https://read_aistt:****@code.officialstatistics.org/trade-task-team-phase-1/ais.git to /tmp/pip-req-build-7fy9uwgc



In [7]:
#allow multiple outputs in one jupyter cell
from IPython.core.interactiveshell import InteractiveShell 
InteractiveShell.ast_node_interactivity = "all"


import pandas as pd
from datetime import datetime
# to apply aggregation functions on spark df
import pyspark.sql.functions as F

In [8]:
# import get_ais() from ais package
from ais import functions as af

In [9]:
# Example 1: Retrieve data for a single date using get_ais()
columns = [
    "mmsi", 
    "latitude", 
    "longitude", 
    'vessel_name',
    "vessel_type", 
    'vessel_type_main',
    'vessel_type_sub',
    'vessel_type_code',
    'vessel_type_cargo',
    'vessel_class',
    "dt_insert_utc", 
    "eeid",
    'length','width','flag_country','destination','eta','source',
    "H3_int_index_1",
    "H3_int_index_5"
]

# date inputs should be in date time format. dt_insert_utc is the basis for the parquet partitions
start_date = datetime.fromisoformat("2022-01-01")
end_date = datetime.fromisoformat("2022-06-30")

# spark is the current spark session you are using. Gets automatically created during kernel init. 
df = af.get_ais(
    spark, 
    start_date = start_date,
    end_date = end_date,
    columns = columns
)

In [10]:
sample_mmsi = '311922000'
SAVEPATH = "s3a://team-cedarsdata-582958291898-mvvpu/"
SEED = 2022
h3_id = [581118283558682623, 581109487465660415, 581122681605193727, 581135875744727039, 581267817140060159, 581514107744681983, 58151850579119308, 581760398349303807, 581500913605148671, 581509709698170879, 581742806163259391, 581764796395814911, 581769194442326015, 582081455744614399]

In [11]:
len(h3_id)

14

In [12]:
# columns in df
df.columns

['mmsi',
 'latitude',
 'longitude',
 'vessel_name',
 'vessel_type',
 'vessel_type_main',
 'vessel_type_sub',
 'vessel_type_code',
 'vessel_type_cargo',
 'vessel_class',
 'dt_insert_utc',
 'eeid',
 'length',
 'width',
 'flag_country',
 'destination',
 'eta',
 'source',
 'H3_int_index_1',
 'H3_int_index_5']

In [13]:
# name and type of each column in df
df.printSchema()

root
 |-- mmsi: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_type_main: string (nullable = true)
 |-- vessel_type_sub: string (nullable = true)
 |-- vessel_type_code: integer (nullable = true)
 |-- vessel_type_cargo: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- dt_insert_utc: timestamp (nullable = true)
 |-- eeid: long (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- flag_country: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- eta: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- H3_int_index_1: long (nullable = true)
 |-- H3_int_index_5: long (nullable = true)



In [14]:
# display data contained in 1st row of df. 
# Each row represents a single AIS message transmitted by a single ship
df.show(n=1, vertical=True, truncate=False)

-RECORD 0--------------------------------
 mmsi              | 205654000           
 latitude          | 51.32248833         
 longitude         | 3.20316             
 vessel_name       | DN97                
 vessel_type       | Port Tender         
 vessel_type_main  | null                
 vessel_type_sub   | null                
 vessel_type_code  | 53                  
 vessel_type_cargo | null                
 vessel_class      | A                   
 dt_insert_utc     | 2022-01-01 21:17:48 
 eeid              | 4897682788452534256 
 length            | 17.0                
 width             | 6.0                 
 flag_country      | Belgium             
 destination       | ZEEBRUGGE           
 eta               | 10081400            
 source            | T-AIS               
 H3_int_index_1    | 581412952674926591  
 H3_int_index_5    | 599423900178186239  
only showing top 1 row



In [16]:
df = df.filter(F.col('H3_int_index_1').isin(h3_id)).persist()
df.show()

+---------+-----------+-----------+-----------+-----------+------------------+---------------+----------------+-----------------+------------+-------------------+-------------------+------+-----+------------+--------------+-------+------+------------------+------------------+
|     mmsi|   latitude|  longitude|vessel_name|vessel_type|  vessel_type_main|vessel_type_sub|vessel_type_code|vessel_type_cargo|vessel_class|      dt_insert_utc|               eeid|length|width|flag_country|   destination|    eta|source|    H3_int_index_1|    H3_int_index_5|
+---------+-----------+-----------+-----------+-----------+------------------+---------------+----------------+-----------------+------------+-------------------+-------------------+------+-----+------------+--------------+-------+------+------------------+------------------+
|209664000|57.82563333|19.23776167|    SOLYMAR|      Cargo|General Cargo Ship|           null|              70|             null|           A|2022-01-01 21:36:10|4613726

In [None]:
df.count()

402547513

In [30]:
df.select(F.countDistinct("mmsi")).show()

+--------------------+
|count(DISTINCT mmsi)|
+--------------------+
|               72891|
+--------------------+



In [17]:
#filter for sample mmsi
# df_sample = df.filter(F.col('mmsi')==sample_mmsi)
df_sample = df.sample(0.05, seed=SEED).persist()
df_sample

DataFrame[mmsi: int, latitude: double, longitude: double, vessel_name: string, vessel_type: string, vessel_type_main: string, vessel_type_sub: string, vessel_type_code: int, vessel_type_cargo: string, vessel_class: string, dt_insert_utc: timestamp, eeid: bigint, length: double, width: double, flag_country: string, destination: string, eta: int, source: string, H3_int_index_1: bigint, H3_int_index_5: bigint]

In [18]:
df_sample.count()

20124657

In [19]:
unique_mmsi = df_sample.dropDuplicates(['mmsi']).select("mmsi").toPandas()
unique_mmsi

Unnamed: 0,mmsi
0,258073000
1,265603060
2,265717490
3,265738930
4,265828420
...,...
65896,538009947
65897,211657130
65898,271010057
65899,211413130


In [44]:
print("coverage mmsi from sample: {:.4}%".format(65901/72891*100))

coverage mmsi from sample: 90.41%


In [45]:
# write as parquet to s3
df_sample.repartition('mmsi').write.mode('overwrite').parquet(SAVEPATH+"AIS_sample_baltic_005_202201_202206a.parquet")

In [46]:
# write as parquet to local
df_sample.repartition('mmsi').write.mode('overwrite').parquet("AIS_sample_baltic_005_202201_202206a.parquet")

In [20]:
# write as parquet to our own s3
df_sample.limit(1000).repartition('mmsi').write.mode('overwrite').parquet("s3://know-ais/data/AIS_sample_baltic_005_202201_202206.parquet")

Py4JJavaError: An error occurred while calling o237.parquet.
: java.nio.file.AccessDeniedException: s3://know-ais/data/AIS_sample_baltic_005_202201_202206.parquet: getFileStatus on s3://know-ais/data/AIS_sample_baltic_005_202201_202206.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ZDE9B17TNB0WHXRF; S3 Extended Request ID: FOh1VHLHKCj0e9hK/6PmIbRXpazemxxphys/Q3MKQBpXobp8VoQ5ld+VTd6uTBqE0U2o6PE+uU8=), S3 Extended Request ID: FOh1VHLHKCj0e9hK/6PmIbRXpazemxxphys/Q3MKQBpXobp8VoQ5ld+VTd6uTBqE0U2o6PE+uU8=:403 Forbidden
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:230)
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2198)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2163)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2102)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1683)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:2984)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:118)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:847)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ZDE9B17TNB0WHXRF; S3 Extended Request ID: FOh1VHLHKCj0e9hK/6PmIbRXpazemxxphys/Q3MKQBpXobp8VoQ5ld+VTd6uTBqE0U2o6PE+uU8=), S3 Extended Request ID: FOh1VHLHKCj0e9hK/6PmIbRXpazemxxphys/Q3MKQBpXobp8VoQ5ld+VTd6uTBqE0U2o6PE+uU8=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1271)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1249)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1246)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2183)
	... 37 more


In [19]:
!ls

AIS_sample_baltic_025_202201_202206.parquet  requirements.txt
launch_ipykernel.py


In [21]:
!pip install -q boto3

You should consider upgrading via the '/opt/conda/bin/python -m pip install --upgrade pip' command.[0m


In [24]:
import boto3
s3 = boto3.resource('s3')
s3.meta.client.upload_file('AIS_sample_baltic_025_202201_202206.parquet', 'knowais-unbdh', 'knowais/AIS_sample_baltic_025_202201_202206.parquet')

IsADirectoryError: [Errno 21] Is a directory: 'AIS_sample_baltic_025_202201_202206.parquet'