In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import *
# import polars as pl
import pandas as pd
from shapely import Polygon, Point

ModuleNotFoundError: No module named 'py4j'

In [2]:
# Local Spark session. The google.cloud.auth.* entries point at a service-account
# key file; presumably they are consumed by the GCS connector for the gs:// reads
# in this notebook (confirm against the connector version in use).
conf = SparkConf().setAll([
  ('spark.ui.port', '4045'),
  ("google.cloud.auth.service.account.enable", "true"),
  ("google.cloud.auth.service.account.json.keyfile", "/opt/spark/credentials/google-credential.json"),
])
spark = (
  SparkSession.builder
  .appName("test")
  .config(conf=conf)
  .master("local[*]")
  .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/18 00:06:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def get_spark_schema():
  """Return the explicit StructType used to read the raw AIS parquet files.

  Columns declared with nullable=False: MMSI, BaseDateTime, LAT, LON, SOG, COG
  and TransceiverClass. All other columns are declared nullable.
  """
  # Import every type referenced below. ShortType was previously missing here and
  # only resolved through the notebook-level `from pyspark.sql.types import *`;
  # IntegerType was imported but never used.
  from pyspark.sql.types import (
    StructType, StructField, TimestampType, FloatType, DoubleType, StringType, ShortType,
  )
  return StructType([
    StructField("MMSI", StringType(), False),
    StructField("BaseDateTime", TimestampType(), False),
    StructField("LAT", DoubleType(), False),
    StructField("LON", DoubleType(), False),
    StructField("SOG", FloatType(), False),
    StructField("COG", FloatType(), False),
    StructField("Heading", FloatType(), True),
    StructField("VesselName", StringType(), True),
    StructField("IMO", StringType(), True),
    StructField("CallSign", StringType(), True),
    StructField("VesselType", ShortType(), True),
    StructField("Status", ShortType(), True),
    StructField("Length", FloatType(), True),
    StructField("Width", FloatType(), True),
    StructField("Draft", FloatType(), True),
    StructField("Cargo", StringType(), True),
    StructField("TransceiverClass", StringType(), False)
  ])
  
def get_port_schema():
  """Return the StructType for the ports CSV: UNLOCODE, NAME, STATE, LAT, LON (all non-nullable)."""
  from pyspark.sql.types import StructType, StructField, DoubleType, StringType
  columns = (
    ("UNLOCODE", StringType()),
    ("NAME", StringType()),
    ("STATE", StringType()),
    ("LAT", DoubleType()),
    ("LON", DoubleType()),
  )
  return StructType([StructField(name, dtype, False) for name, dtype in columns])
  
# Closed outline polygon given as (lon, lat) vertices. The vertices appear to
# trace the US Pacific, Gulf and Atlantic coasts (confirm intended coverage).
# It is built once here rather than on every UDF invocation; the UDF below
# references it.
_COASTLINE_POLYGON = Polygon([
  (-125.1, 48.8),
  (-124.7, 41.9),
  (-124.5, 40.3),
  (-120.7, 34.4),
  (-117.3, 32.4),
  (-97, 25.7),
  (-97, 27.6),
  (-93, 29.5),
  (-89, 28.7),
  (-87.3, 30.1),
  (-85, 29.4),
  (-83, 27.8),
  (-81.2, 25),
  (-80, 25),
  (-80, 27),
  (-80.5, 28.6),
  (-75.3, 35.2),
  (-75.7, 36.9),
  (-73.8, 40.3),
  (-69.7, 41.1),
  (-70, 43),
  (-125.1, 48.8),
])

@f.udf(returnType=BooleanType())
def near_coastline_polygon(lat, lon):
  """Return whether the point (lon, lat) lies inside the outline polygon.

  Uses shapely's `contains`, so points exactly on the boundary yield False.
  Returns None (SQL null) when either coordinate is None, instead of letting
  `Point(None, None)` raise inside the executor.
  """
  if lat is None or lon is None:
    return None
  # bool() guarantees a plain Python bool for the BooleanType return.
  return bool(_COASTLINE_POLYGON.contains(Point(lon, lat)))

In [None]:
# Raw AIS pings for 2024, read with the explicit schema. Rows whose MMSI is not
# 9 characters long, or whose |LAT| > 90 or |LON| > 180, are dropped up front.
bucket_path = "gs://vessel-traffic-parquet-data/"
# Single-day alternative: "raw_day/year=2024/month=1/AIS_2024_01_01.parquet"
file_path = "raw_day/year=2024/"
spark_df = (
  spark.read
  .schema(get_spark_schema())
  .format("parquet")
  .load(bucket_path + file_path)
  .where(
    (f.length("MMSI") == 9)
    & (f.abs("LAT") <= 90)
    & (f.abs("LON") <= 180)
  )
)
port_df = spark.read.schema(get_port_schema()).option("header", True).csv("ports.csv")
# Per-vessel static attributes (deduplicated) and per-ping dynamic attributes.
vessel_profile_df = spark_df.select(
  "MMSI", "VesselName", "IMO", "CallSign", "VesselType", "Length", "Width"
).distinct()
ais_df = spark_df.select(
  "MMSI", "BaseDateTime", "LAT", "LON", "SOG", "COG", "Heading",
  "Status", "Draft", "Cargo", "TransceiverClass",
)

In [None]:
# Display the schema spark_df was read with (the explicit get_spark_schema() schema).
spark_df.schema

In [None]:
# Rows with a missing MMSI.
spark_df.where(f.col("MMSI").isNull()).show()

In [None]:
# Sentinel values for "invalid / not accessible / default" are documented at:
# https://www.navcen.uscg.gov/ais-class-a-reports

# Non-categorical fields: replace the sentinel value with a real null.
vessel_profile_df = vessel_profile_df.replace("IMO0000000", None, "IMO")
vessel_profile_df = vessel_profile_df.replace(0, None, ["Length", "Width"])
ais_df = ais_df.replace(511.0, None, "Heading")
# SOG is FloatType. DataFrame.replace(102.3, ...) compares the column against the
# *double* literal 102.3, which no float32 value equals, so that call never
# replaced anything. Compare against the literal cast to float instead.
ais_df = ais_df.withColumn(
  "SOG",
  f.when(f.col("SOG") == f.lit(102.3).cast("float"), f.lit(None)).otherwise(f.col("SOG")),
)
ais_df = ais_df.replace(360, None, "COG")
ais_df = ais_df.replace(0, None, "Draft")

# Categorical fields: replace nulls with the encoded "not available" value.
vessel_profile_df = vessel_profile_df.fillna(0, "VesselType")
ais_df = ais_df.fillna(15, "Status")
# Cargo is declared StringType, and fillna() with a numeric value ignores
# non-numeric columns, so fillna(0, "Cargo") was a no-op. Fill with "0".
ais_df = ais_df.fillna("0", "Cargo")

In [None]:
# Preview the first rows of ais_df.
ais_df.show()

In [None]:
# Preview the first rows of vessel_profile_df.
vessel_profile_df.show()

In [None]:
# Pings with out-of-range coordinates. spark_df was already restricted to
# |LAT| <= 90 and |LON| <= 180 when loaded, so this should show no rows.
ais_df.where((f.abs("LAT") > 90) | (f.abs("LON") > 180)).show()

In [None]:
# Vessel profiles whose name contains "@" anywhere (presumably the intent of the
# original check). The former `VesselName like '@'` had no wildcards, so it only
# matched names that are exactly "@".
vessel_profile_df.filter(f.col("VesselName").contains("@")).show()

In [None]:
# If some MMSI appears in more than one profile row, list those MMSIs,
# the ones with the most profile rows first.
if vessel_profile_df.select("MMSI").distinct().count() != vessel_profile_df.count():
  (vessel_profile_df
    .groupBy("MMSI")
    .count()
    .where(f.col("count") > 1)
    .orderBy(f.col("count").desc())
    .show())

In [None]:
# Distinct MMSI string lengths present in the vessel profiles.
vessel_profile_df.select(f.length("MMSI").alias("MMSI-Length")).dropDuplicates().show()

In [None]:
# Number of vessel-profile rows per MMSI string length.
vessel_profile_df.groupBy(f.length("MMSI").alias("MMSI-Length")).count().show()

In [None]:
# Profiles whose MMSI is not 9 characters long. spark_df was filtered to
# 9-character MMSIs when loaded, so this is expected to be empty.
vessel_profile_df.where(f.length("MMSI") != 9).show()

In [None]:
# Pings for one 8-character MMSI. Such MMSIs are removed by the length filter
# applied when spark_df was loaded, so this is expected to be empty.
ais_df.where(f.col("MMSI") == "36968098").show()

In [None]:
# Pings for one 8-character MMSI. Such MMSIs are removed by the length filter
# applied when spark_df was loaded, so this is expected to be empty.
ais_df.where(f.col("MMSI") == "99043470").show()

In [None]:
# Pings for one 8-character MMSI. Such MMSIs are removed by the length filter
# applied when spark_df was loaded, so this is expected to be empty.
ais_df.where(f.col("MMSI") == "91481544").show()

In [None]:
# Number of pings whose MMSI is not 9 characters (expected 0 after the load filter).
ais_df.where(f.length("MMSI") != 9).count()

In [None]:
# Cache the profile table; it is queried repeatedly in the following cells.
vessel_profile_df.cache()
# MMSI is declared StringType (see get_spark_schema), so match a string literal,
# as the ais_df lookups do, instead of an integer that forces Spark to cast the column.
vessel_profile_df.filter(f.col("MMSI") == "338478187").show()

In [None]:
# MMSIs associated with more than one distinct CallSign.
(vessel_profile_df
  .groupBy("MMSI")
  .agg(f.count_distinct(f.col("CallSign")).alias("c"))
  .where(f.col("c") > 1)
  .show())

In [None]:
# Profile rows for one vessel. MMSI is a StringType column, so match a string literal.
vessel_profile_df.filter(f.col("MMSI") == "369914086").show()

In [None]:
# Treat the literal string "NULL" in CallSign/IMO as missing, de-duplicate, and
# inspect one vessel. MMSI is a StringType column, so match a string literal.
(vessel_profile_df
  .replace("NULL", None, ["CallSign", "IMO"])
  .distinct()
  .filter(f.col("MMSI") == "338442596")
  .show())

In [None]:
# Distinct profile rows of one vessel whose CallSign is the literal string "NULL".
# MMSI is a StringType column, so it is matched against a string literal.
(vessel_profile_df
  .distinct()
  .filter(f.col("MMSI") == "338354216")
  .filter(f.col("CallSign") == "NULL")
  .show())


In [None]:
# How many profiles carry the literal string "NULL" as CallSign.
vessel_profile_df.where(f.col("CallSign") == "NULL").count()

In [None]:
# How many profiles have a real (SQL) null CallSign.
vessel_profile_df.where(f.col("CallSign").isNull()).count()

In [None]:
# Haversine great-circle distance (km) between a ping (LAT, LON) and the next
# ping of the same vessel (lead_LAT, lead_LON, produced by the lead() window in
# the ping_df cell). The expression only resolves on a DataFrame with those columns.
delta_phi = f.radians(f.expr("lead_LAT - LAT"))
delta_lambda = f.radians(f.expr("lead_LON - LON"))
# a = sin²(Δφ/2) + cos φ1 · cos φ2 · sin²(Δλ/2)
# (the longitude term previously squared sin(Δλ)/2 instead of sin(Δλ/2)).
a = (
  f.pow(f.sin(delta_phi / 2), 2)
  + f.cos(f.radians(f.col("LAT"))) * f.cos(f.radians(f.col("lead_LAT"))) * f.pow(f.sin(delta_lambda / 2), 2)
)
c = 2 * f.atan2(f.sqrt(a), f.sqrt(1 - a))
# 6371 is the Earth radius in km, so d is in km.
d = 6371 * c

In [None]:
# Haversine great-circle distance (km) between a ping (ais.LAT, ais.LON) and a
# port (port.LAT, port.LON); evaluated on the ais x port cross join.
delta_lat_port = f.radians(f.expr("ais.LAT - port.LAT"))
delta_lon_port = f.radians(f.expr("ais.LON - port.LON"))
# a = sin²(Δlat/2) + cos(lat1) · cos(lat2) · sin²(Δlon/2)
# (the longitude term previously squared sin(Δlon)/2 instead of sin(Δlon/2)).
a_port = (
  f.pow(f.sin(delta_lat_port / 2), 2)
  + f.cos(f.radians(f.col("ais.LAT"))) * f.cos(f.radians(f.col("port.LAT"))) * f.pow(f.sin(delta_lon_port / 2), 2)
)
c_port = 2 * f.atan2(f.sqrt(a_port), f.sqrt(1 - a_port))
# 6371 is the Earth radius in km, matching the "km_to_port" alias.
d_port = (6371 * c_port).alias("km_to_port")

In [None]:
def col_select(col: str):
  """Return `f.col(col)`; the SOG column is renamed to "SOG'" (trailing apostrophe)."""
  column = f.col(col)
  return column.alias("SOG'") if col == "SOG" else column

In [None]:
# Each vessel's pings ordered by time, so lead() yields that vessel's next ping.
windowSpec = Window.partitionBy(f.col("MMSI")).orderBy(f.asc(f.col("BaseDateTime")))
# Materialize the lead_* columns first, then compute `d` (written in terms of
# lead_LAT / lead_LON) in a second projection. Referencing an alias defined in
# the same select() relies on lateral column-alias resolution (Spark >= 3.4).
ping_df = (
  ais_df
  .select(
    "MMSI", "BaseDateTime", "LAT", "LON",
    f.lead(f.col("LAT")).over(windowSpec).alias("lead_LAT"),
    f.lead(f.col("LON")).over(windowSpec).alias("lead_LON"),
    f.lead(f.col("BaseDateTime")).over(windowSpec).alias("lead_time"),
  )
  .select("*", d.alias("d"))
  .alias("ais")
)

In [None]:
# Rank candidate ports for each individual ping (vessel + timestamp), nearest first.
windowSpec_port = (
  Window
  .partitionBy("ais.MMSI", "ais.BaseDateTime")
  .orderBy(f.col("km_to_port").asc())
)

In [None]:
# Pair every ping with every (broadcast) port, keep ports within 35 km, then
# retain only the nearest port per (MMSI, BaseDateTime).
cross_df = (
  ping_df
  .crossJoin(f.broadcast(port_df.alias("port")))
  .select("ais.MMSI", "ais.BaseDateTime", d_port, "port.UNLOCODE")
  .where(f.col("km_to_port") <= 35)
  .select("*", f.row_number().over(windowSpec_port).alias("order"))
  .filter(f.col("order") == 1)
  .select("ais.MMSI", "ais.BaseDateTime", "km_to_port", "port.UNLOCODE")
)

In [None]:
# Number of pings matched to a port within 35 km (one row per MMSI/BaseDateTime).
cross_df.count()

In [None]:
# Attach the nearest-port match (if any) to every ping; pings without a match
# keep null values in the cross.* columns.
final_df = ping_df.join(
  cross_df.alias("cross"),
  (f.col("ais.MMSI") == f.col("cross.MMSI")) & (f.col("ais.BaseDateTime") == f.col("cross.BaseDateTime")),
  "left_outer",
)

In [None]:
# Total ping count, for comparison with cross_df.count().
ais_df.count()

In [None]:
# Keep the per-ping columns plus the matched port code; pings without a matched
# port get the placeholder string "None" instead of null.
final_df = (
  final_df
  .select("ais.MMSI", "ais.BaseDateTime", "ais.LAT", "ais.LON", "d", "UNLOCODE")
  .fillna("None", subset="UNLOCODE")
)

In [None]:
# Ping count per matched port code ("None" = no port matched).
final_df.groupBy("UNLOCODE").agg(f.count("*")).show()

In [4]:
# Load one day's parquet file (schema taken from the file itself) for ad-hoc checks.
test_df = spark.read.parquet("gs://vessel-traffic-parquet-data/raw_day/year=2024/month=11/AIS_2024_11_08.parquet")

                                                                                

In [None]:
# Register test_df as the temp view "temp" and show the row(s) with the largest
# sec_since_prev_ping. NOTE(review): that column is not part of get_spark_schema();
# this assumes the file read into test_df contains it — confirm.
test_df.createOrReplaceTempView("temp")
spark.sql("SELECT * FROM temp WHERE sec_since_prev_ping = (SELECT MAX(sec_since_prev_ping) FROM temp)").show()

In [None]:
# Row count of the single-day test file.
test_df.count()

In [None]:
# Number of pings inside the coastline polygon (rows where the UDF yields null are excluded).
test_df.where(near_coastline_polygon(f.col("LAT"), f.col("LON"))).count()

[Stage 2:>                                                          (0 + 6) / 6]