### 1) Kernel

Pengolahan ini menggunakan kernel ais-tt atau saat ini: pyspark3.3 ais2.8 untuk melakukan pengolahan data. Kernel ini dilengkapi dengan konfigurasi spark tambahan dan kredensial untuk Amazon Web Services (AWS).

### 2) Koneksi AIS Package dari AIS Task Team

In [1]:
import sys
import subprocess

GITLAB_USER = "read_aistt"  #For use of members of AIS Task Team, read only access
GITLAB_TOKEN = "J1Kk8tArfyXB6dZvFcWW"
ais_package = f"git+https://{GITLAB_USER}:{GITLAB_TOKEN}@code.officialstatistics.org/trade-task-team-phase-1/ais.git"

std_out = subprocess.run([sys.executable, "-m", "pip", "install",ais_package], capture_output=True, text=True).stdout

print(std_out) 

## Import modul
from ais import functions as af

Collecting git+https://read_aistt:****@code.officialstatistics.org/trade-task-team-phase-1/ais.git
  Cloning https://read_aistt:****@code.officialstatistics.org/trade-task-team-phase-1/ais.git to /tmp/pip-req-build-e2nf86ns



### 3) Import beberapa package yang akan digunakan

In [2]:
import geopandas as gpd # membuat geodataframe
import pandas as pd # membuat dataframe pandas
import h3 # membuat dan membantu visualisasi index h3

import matplotlib # plotting untuk visualisasi data
import matplotlib.pyplot as plt # modul dalam matplotlib untuk membuat plot dan grafik
from shapely.geometry import Polygon # kelas Shapely untuk membuat dan memanipulasi poligon
from datetime import datetime # modul untuk manipulasi tanggal dan waktut Polygon # kelas Shapely untuk membuat dan memanipulasi poligon
from datetime import datetime # modul untuk manipulasi tanggal dan waktu

# SEDONA
import sedona.sql # modul untuk menjalankan query SQL pada data spasial
from sedona.register import SedonaRegistrator # alat untuk mendaftarkan Sedona ke Spark
from sedona.utils import SedonaKryoRegistrator, KryoSerializer 
# registrator untuk serialisasi objek spasial dengan Kryo
# serializer untuk meningkatkan kinerja serialisasi

# PYSPARK
import pyspark.sql.functions as F # modul untuk fungsi SQL pada DataFrame
import pyspark.sql.types as T # modul untuk tipe data SQL pada DataFrame
from pyspark.sql import SparkSession  # kelas untuk membuat dan mengelola sesi Spark

generated new fontManager


### 4) Mengaktifkan Sesi Spark

In [3]:
spark = SparkSession. \
    builder. \
    appName('Emissions_Indonesia'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages'). \
    config("spark.sql.parquet.enableVectorizedReader", "false").\
    getOrCreate()

SedonaRegistrator.registerAll(spark)

True

### 5) Read Data AIS di AWS S3 Bucket

In [4]:
save_path = "s3a://ungp-ais-data-historical-backup/user_temp/"
save_path_unique = save_path + "212112081/"

In [5]:
#read saved parquet
data_21 = spark.read.parquet(save_path_unique + "ais-data-indonesia-2021_expanded.parquet", header=True)

# Preprocessing Data

### a) Menghapus Record Duplikat
### b) Mencocokkan record AIS dan IHS

In [6]:
# Fungsi untuk melakukan pencocokan nama kapal AIS dan IHS
import re
import math
from collections import Counter

## Fungsi untuk mendapatkan nilai Cosine
def get_cosine(vec1, vec2):
    intersection = set(vec1.keys()) & set(vec2.keys())
    numerator = sum([vec1[x] * vec2[x] for x in intersection])

    sum1 = sum([vec1[x]**2 for x in vec1.keys()])
    sum2 = sum([vec2[x]**2 for x in vec2.keys()])
    denominator = math.sqrt(sum1) * math.sqrt(sum2)

    if not denominator:
        return 0.0
    else:
        return float(numerator) / denominator

## Fungsi untuk mengubah text menjadi vektor sebelum menghitung nilai cosine
def text_to_vector(text):
    word = re.compile(r'\w+')
    words = word.findall(text)
    return Counter(words)

## Fungsi untuk melakukan perbandingan dua nama kapal dengan cosine similarity
def compare_vessel_name(name_1, name_2):
    vector1 = text_to_vector(name_1)
    vector2 = text_to_vector(name_2)

    cosine_result = get_cosine(vector1, vector2)
    return cosine_result

# Mengubah fungsi menjadi fungsi udf agar dapat dimanfaatkan dalam dataset pyspark
compare = F.udf(lambda x,y:compare_vessel_name(x,y),T.DoubleType()) 

In [7]:
# Menghapus record duplikat
ais_data_21 = data_21.distinct()

# Ekstraksi data IHS
specs = spark.read.load("s3a://ungp-ais-data-historical-backup/register/ShipData.CSV",format="csv",sep=",",inferSchema="true",header="true")
specs = specs.withColumnRenamed("MaritimeMobileServiceIdentityMMSINumber","mmsi_ihs")\
                .withColumnRenamed("LRIMOShipNo","imo_ihs")\
                .withColumnRenamed("Draught","SummerDraught")

# Penggabungan data AIS dan IHS
## Record AIS yang Cocok Berdasaekan IMO
imo_match_21 = ais_data_21\
                    .join(specs, (ais_data_21.imo == specs.imo_ihs),how="inner")\
                    .withColumn("matchBy", F.lit("imo"))

## Record AIS yang Tidak Cocok Berdasarkan IMO
ais_ihs_left_21 = ais_data_21.join(specs, (ais_data_21.imo == specs.imo_ihs),how="left_anti")

## Record AIS yang Tidak Cocok Berdasarkan IMO dan cocok berdasarkan MMSI
mmsi_match_21 = ais_ihs_left_21.join(specs, (ais_ihs_left_21.mmsi == specs.mmsi_ihs),how="inner")

## Record AIS yang Tidak Cocok Berdasarkan IMO dan cocok berdasarkan MMSI dan Nama kapal
vessel_name_check_21 = mmsi_match_21.withColumn("similarity", compare(F.col("vessel_name"), F.col("ShipName")))
vessel_name_match_21 = vessel_name_check_21.filter(F.col("similarity")>=0.50)\
                                        .withColumn("imo", F.col("imo_ihs"))\
                                        .withColumn("matchBy", F.lit("mmsi"))

## Penggabungan Record AIS yang cocok Berdasarkan IMO dan (MMSI dan Nama Kapal)
match_record_21 = imo_match_21.union(vessel_name_match_21.drop(F.col("similarity")))

### Tabel Ringkasan Hasil Preprocessing

##### Jumlah record per tahap preprocessing

In [8]:
table_filter_21 = spark.createDataFrame([
    {"Keterangan": "Record AIS Indonesia Tahun 2022", "Jumlah record": data_21.count()},
    {"Keterangan": "Penghapusan duplikat", "Jumlah record": ais_data_21.count()},
    {"Keterangan": "Pencocokan dengan database IHS", "Jumlah record": match_record_21.count()}
])
table_filter_21.show()

+-------------+--------------------+
|Jumlah record|          Keterangan|
+-------------+--------------------+
|    292000087|Record AIS Indone...|
|    291814967|Penghapusan duplikat|
|    252776570|Pencocokan dengan...|
+-------------+--------------------+



##### Jumlah record berdasarkan jenis kecocokan dengan data IHS

In [9]:
table_match_21 = spark.createDataFrame([
    {"Jumlah record": imo_match_21.count(), "Keterangan": "Cocok dengan IMO"},
    {"Jumlah record": mmsi_match_21.count(), "Keterangan": "Cocok dengan MMSI"},
    {"Jumlah record": vessel_name_match_21.count(), "Keterangan": "Cocok dengan Nama Kapal"}
])
table_match_21.show()

+-------------+--------------------+
|Jumlah record|          Keterangan|
+-------------+--------------------+
|    239837830|    Cocok dengan IMO|
|     16617507|   Cocok dengan MMSI|
|     12938740|Cocok dengan Nama...|
+-------------+--------------------+



##### Jumlah kapal berdasarkan kecocokan dengan data IHS

In [10]:
table_vessel_21 = spark.createDataFrame([
    {"Jumlah record": imo_match_21.dropDuplicates(["imo"]).count(), "Keterangan": "Cocok dengan IMO"},
    {"Jumlah record": vessel_name_match_21.dropDuplicates(["imo"]).count(), "Keterangan": "Cocok dengan MMSI dan Nama Kapal"}
])
table_vessel_21.show()

+-------------+--------------------+
|Jumlah record|          Keterangan|
+-------------+--------------------+
|        25456|    Cocok dengan IMO|
|         4057|Cocok dengan MMSI...|
+-------------+--------------------+



### c) Menghitung durasi antar pesan AIS

In [11]:
# Fungsi untuk menghitung durasi
def count_freq(dfspark):
    df = (
      dfspark
      .selectExpr(
        "*"
      )
      .withColumn("previous_freq",F.expr(f"LAG(dt_pos_utc) OVER (PARTITION BY imo ORDER BY dt_pos_utc ASC) as previous_freq"))
      .withColumn("previous_h3",F.expr(f"LAG(H3_int_index_7) OVER (PARTITION BY imo ORDER BY dt_pos_utc ASC) as previous_h3"))
      .withColumn("freq", F.expr(f"(unix_timestamp(dt_pos_utc)-unix_timestamp(previous_freq))/3600 as freq"))
      .selectExpr(
        "*"
      )

    )
    
    return df

In [15]:
df_21 = count_freq(match_record_21)
df_21.printSchema()

root
 |-- vessel_type: string (nullable = true)
 |-- H3_int_index_5: long (nullable = true)
 |-- message_type: integer (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- dt_insert_utc: timestamp (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- imo: integer (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- vessel_type_code: integer (nullable = true)
 |-- vessel_type_cargo: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- flag_country: string (nullable = true)
 |-- flag_code: integer (nullable = true)
 |-- destination: string (nullable = true)
 |-- eta: integer (nullable = true)
 |-- draught: double (nullable = true)
 |-- sog: double (nullable = true)
 |-- cog: double (nullable = true)
 |-- rot: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- nav_stat

In [16]:
df_21 = df_21.drop("callsign")

In [13]:
df_21.count()

252776570

## SAVING FILE

In [17]:
#save as parquet
df_21.write.option("header",True).mode("overwrite").parquet(save_path_unique + "ais-ihs-indonesia-2021.parquet")

### Stop Sesi Spark

In [18]:
spark.stop()