In [24]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql.window import Window

import pandas as pd
from datetime import datetime

from pyspark.sql.types import IntegerType,DateType
from pyspark.sql.functions import udf,col

In [25]:
# Spark session configuration.
appName = "AIS Data Cleaning"
# "local" would run Spark with a single worker thread; "local[*]" uses one
# thread per available core, which matters for the multi-hour jobs below.
master = "local[*]"

# Create (or reuse) the Spark session.
# NOTE: if a SparkContext is already running in this kernel, it keeps the master
# it was started with — restart the kernel for a new `master` to take effect.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# Load every daily 2020 AIS log (path is relative to the notebook's working directory).
# NOTE(review): no schema is supplied, so Spark makes an extra pass over all input
# files to infer one; passing an explicit schema would avoid that pass.
df = spark.read.json('2020 data/ais-processed-log-2020-*.json')  # all 2020 data

In [26]:
# df.count()
# df.printSchema()

**Dynamic Info**

In [27]:
# Position reports (AIS message types 1, 2, 3) for 2020.
# Keep rows with a positive speed, a 9-digit MMSI and a position inside the
# selected lon/lat bounding box.
# NOTE(review): AIS conventionally encodes "speed not available" as 102.3; if this
# processed log keeps that convention, such rows still pass `speed > 0` — confirm.
dynamic_info = df.filter(col('type').isin([1, 2, 3])) \
        .select("mmsi", "speed", "time", "lon", "lat") \
        .filter(df.speed.isNotNull()).filter(df.speed > 0) \
        .filter((df.mmsi >= (10**8)) & (df.mmsi < (10**9))) \
        .filter((df.lon >= 103.4232) & (df.lon <= 104.1297) & (df.lat >= 0.9854) & (df.lat <= 1.4942))

# Drop MMSIs with fewer than 5 position reports (keep n >= 5).
# No sort here: row order does not affect this filter or the hourly aggregation
# below, and a global sort of the raw data is very expensive.
w = Window.partitionBy('mmsi')
dynamic_info = dynamic_info.select('mmsi', 'speed', 'time', count('mmsi').over(w).alias('n'))
dynamic_info1 = dynamic_info.filter(dynamic_info.n > 4).drop('n')

# Calendar date and hour of each record: characters 1-10 of `time` (yyyy-MM-dd)
# and characters 12-13 (HH), exactly the slices the former Python UDFs used.
# Built-in column functions avoid shipping every row to a Python worker and no
# longer shadow pyspark.sql.functions.hour. With default (non-ANSI) settings a
# malformed `time` now yields null instead of failing the job.
dynamic_info1 = dynamic_info1 \
        .withColumn('date', to_date(substring(col('time'), 1, 10), 'yyyy-MM-dd')) \
        .withColumn('hour', substring(col('time'), 12, 2).cast('int'))

# Average speed per MMSI per clock hour (standardises the time interval to 1 hour).
# Cached because both Method 1 and Method 2 start from this frame; without the
# cache each of them re-reads and re-aggregates the raw JSON.
dynamic_info2 = dynamic_info1.drop('time').groupBy('mmsi', 'date', 'hour') \
        .agg(round(mean("speed"), 1).alias('speed')) \
        .cache()

# Sort only when inspecting, e.g.:
# dynamic_info2.orderBy('mmsi', 'date', 'hour').show()

**Prepare data for Method 1 | estimated running time: 3 hrs**

In [29]:
# Method 1 (2020): fraction of maximum fuel use, summed per vessel per day.
#
# Per MMSI, approximate the vessel's maximum speed as
#   max_speed = mean(hourly speed) + 3 * stddev(hourly speed)
# and discard hours faster than that as outliers.
# NOTE(review): stddev() is the sample standard deviation and is null for an MMSI
# with a single hourly row; max_speed is then null, `speed <= max_speed` is null,
# and that MMSI is silently dropped here (Method 2 still keeps it) — confirm intended.
per_vessel = Window.partitionBy('mmsi')
dynamic_info3 = dynamic_info2.select(
    'mmsi', 'date', 'hour', 'speed',
    mean('speed').over(per_vessel).alias('mean'),
    stddev('speed').over(per_vessel).alias('sd'),
)
estimated_max = col('mean') + 3 * col('sd')
dynamic_info4 = dynamic_info3 \
    .withColumn('max_speed', estimated_max) \
    .drop('mean', 'sd') \
    .where(col('speed') <= col('max_speed'))

# Map the speed / max_speed ratio to a fraction of maximum fuel use:
#   ratio <= 0.2 -> 0.03,  0.2 < ratio <= 0.8 -> 0.48,  ratio > 0.8 -> 1.0
# NOTE(review): the origin of these bands and values is not recorded — add a source.
speed_ratio = col('speed') / col('max_speed')
fuel_fraction = when(speed_ratio <= 0.2, 0.03) \
    .when(speed_ratio > 0.8, 1.0) \
    .otherwise(0.48)
dynamic_info5 = dynamic_info4.withColumn('fraction_of_max_fuel', fuel_fraction)

# One row per vessel per day (keeps the stored output small).
# `round` and `sum` are the pyspark.sql.functions versions brought in by the
# star import, not the Python builtins.
dynamic_info6 = dynamic_info5.groupBy('mmsi', 'date').agg(
    round(sum('fraction_of_max_fuel'), 2).alias('sum_fraction_of_max_fuel'),
    round(avg('speed'), 1).alias('avg_speed'),
)

# dynamic_info6.show()  # slow on the full year; uncomment to inspect

In [30]:
# Write Method 1 (2020) results as CSV with a header row.
# coalesce(1) makes the final write a single task, producing one part file.
# mode('overwrite') lets the notebook be re-run; the default save mode raises
# when 'clean_data_method1_2020' already exists.
dynamic_info6.coalesce(1).write \
        .mode('overwrite') \
        .csv('clean_data_method1_2020', header=True)

**Prepare data for Method 2 | estimated running time: 2 hrs**

In [32]:
# Method 2 (2020): distance travelled per vessel per day.
# Each row of dynamic_info2 is one hour at an average speed in knots, so the sum
# of a day's hourly speeds is the distance in nautical miles; multiplying by
# 1.852 (1 knot = 1.852 km/h) converts it to km. Hours with no position report
# contribute no distance.
# The aggregate is named explicitly rather than relying on Spark's
# auto-generated 'sum(speed)' column name.
dynamic_info7 = dynamic_info2.groupBy('mmsi', 'date') \
    .agg(round(1.852 * sum('speed'), 2).alias('distance_daily(km)'))

# dynamic_info7.show()  # slow on the full year; uncomment to inspect

In [33]:
# Write Method 2 (2020) results as CSV with a header row.
# coalesce(1) makes the final write a single task, producing one part file.
# mode('overwrite') lets the notebook be re-run; the default save mode raises
# when 'clean_data_method2_2020' already exists.
dynamic_info7.coalesce(1).write \
    .mode('overwrite') \
    .csv('clean_data_method2_2020', header=True)

**Static Info (for mapping mmsi to imo) | estimated running time: 10 mins**

In [35]:
# Reference table mapping MMSI -> IMO, built from AIS type 5 (static) messages.
static_info = df.filter(df.type == 5) \
        .select("mmsi", "imo", "time")

# Keep exactly one row per MMSI: its most recent type-5 report.
# row_number() rather than dense_rank(): dense_rank gives rank 1 to every report
# sharing the latest timestamp (e.g. the same message received twice), which
# produced duplicate MMSI rows and would fan out any later join on mmsi.
# Ties on time are broken by the larger IMO so the choice is deterministic.
w1 = Window.partitionBy('mmsi').orderBy(desc('time'), desc('imo'))
static_info1 = static_info.withColumn('Rank', row_number().over(w1))
static_info2 = static_info1.filter(static_info1.Rank == 1) \
        .select("mmsi", "imo")
static_info2.show()

+---------+-------+
|     mmsi|    imo|
+---------+-------+
|219863000|9778832|
|220206000|9260433|
|228343900|9299642|
|240836000|9412098|
|241649000|9819844|
|243733592|9514286|
|244457000|9367073|
|249290000|9421831|
|249444280|9621596|
|249558000|9728928|
|256300000|9436355|
|256463000|9418822|
|311000318|9733117|
|351754000|9636462|
|352507000|9534640|
|352690000|9301055|
|353293960|9430612|
|354625000|9404651|
|354733000|9181819|
|355297000|9073701|
+---------+-------+
only showing top 20 rows



In [36]:
# Write the 2020 MMSI -> IMO table as CSV with a header row.
# coalesce(1) makes the final write a single task, producing one part file.
# mode('overwrite') lets the notebook be re-run; the default save mode raises
# when 'static_info_2020' already exists.
static_info2.coalesce(1).write \
    .mode('overwrite') \
    .csv('static_info_2020', header=True)

**Repeat for 2019 data | estimated running time: 7 hrs**

In [37]:
# Release anything cached while processing 2020 before starting on 2019
# (a no-op if nothing is cached).
spark.catalog.clearCache()

df = spark.read.json('2019 data/ais-processed-log-2019-*.json')  # all 2019 data

# Position reports (AIS message types 1, 2, 3) for 2019.
# Keep rows with a positive speed, a 9-digit MMSI and a position inside the
# selected lon/lat bounding box.
# NOTE(review): AIS conventionally encodes "speed not available" as 102.3; if this
# processed log keeps that convention, such rows still pass `speed > 0` — confirm.
dynamic_info = df.filter(col('type').isin([1, 2, 3])) \
        .select("mmsi", "speed", "time", "lon", "lat") \
        .filter(df.speed.isNotNull()).filter(df.speed > 0) \
        .filter((df.mmsi >= (10**8)) & (df.mmsi < (10**9))) \
        .filter((df.lon >= 103.4232) & (df.lon <= 104.1297) & (df.lat >= 0.9854) & (df.lat <= 1.4942))

# Drop MMSIs with fewer than 5 position reports (keep n >= 5).
# No sort here: row order does not affect this filter or the hourly aggregation
# below, and a global sort of the raw data is very expensive.
w = Window.partitionBy('mmsi')
dynamic_info = dynamic_info.select('mmsi', 'speed', 'time', count('mmsi').over(w).alias('n'))
dynamic_info1 = dynamic_info.filter(dynamic_info.n > 4).drop('n')

# Calendar date and hour of each record: characters 1-10 of `time` (yyyy-MM-dd)
# and characters 12-13 (HH), exactly the slices the former Python UDFs used.
# Built-in column functions avoid shipping every row to a Python worker and no
# longer shadow pyspark.sql.functions.hour. With default (non-ANSI) settings a
# malformed `time` now yields null instead of failing the job.
dynamic_info1 = dynamic_info1 \
        .withColumn('date', to_date(substring(col('time'), 1, 10), 'yyyy-MM-dd')) \
        .withColumn('hour', substring(col('time'), 12, 2).cast('int'))

# Average speed per MMSI per clock hour (standardises the time interval to 1 hour).
# Cached because both Method 1 and Method 2 start from this frame; without the
# cache each of them re-reads and re-aggregates the raw JSON.
dynamic_info2 = dynamic_info1.drop('time').groupBy('mmsi', 'date', 'hour') \
        .agg(round(mean("speed"), 1).alias('speed')) \
        .cache()

In [38]:
# Method 1 (2019): fraction of maximum fuel use, summed per vessel per day.
#
# Per MMSI, approximate the vessel's maximum speed as
#   max_speed = mean(hourly speed) + 3 * stddev(hourly speed)
# and discard hours faster than that as outliers.
# NOTE(review): stddev() is the sample standard deviation and is null for an MMSI
# with a single hourly row; max_speed is then null, `speed <= max_speed` is null,
# and that MMSI is silently dropped here (Method 2 still keeps it) — confirm intended.
w = Window.partitionBy('mmsi')
dynamic_info3 = dynamic_info2.select('mmsi', 'date', 'hour', 'speed',
                                     mean('speed').over(w).alias('mean'),
                                     stddev('speed').over(w).alias('sd'))
dynamic_info4 = dynamic_info3.withColumn('max_speed', col('mean') + 3 * col('sd')) \
        .drop('mean', 'sd').filter(col('speed') <= col('max_speed'))  # remove speed > max_speed

# Map the speed / max_speed ratio to a fraction of maximum fuel use:
#   ratio <= 0.2 -> 0.03,  0.2 < ratio <= 0.8 -> 0.48,  ratio > 0.8 -> 1.0
# NOTE(review): the origin of these bands and values is not recorded — add a source.
dynamic_info5 = dynamic_info4.withColumn(
    'fraction_of_max_fuel', when((col("speed") / col('max_speed') <= 0.2), 0.03)
    .when((col("speed") / col('max_speed') > 0.8), 1.0)
    .otherwise(0.48))

# One row per vessel per day (keeps the stored output small).
# `round` and `sum` are the pyspark.sql.functions versions brought in by the
# star import, not the Python builtins.
dynamic_info6 = dynamic_info5.groupBy("mmsi", "date") \
    .agg(round(sum('fraction_of_max_fuel'), 2).alias("sum_fraction_of_max_fuel"),
         round(avg('speed'), 1).alias("avg_speed"))

# Write as CSV with a header row. coalesce(1) makes the final write a single task
# (one part file); mode('overwrite') lets the notebook be re-run, since the
# default save mode raises when 'clean_data_method1_2019' already exists.
dynamic_info6.coalesce(1).write \
        .mode('overwrite') \
        .csv('clean_data_method1_2019', header=True)

In [39]:
# Method 2 (2019): distance travelled per vessel per day.
# Each row of dynamic_info2 is one hour at an average speed in knots, so the sum
# of a day's hourly speeds is the distance in nautical miles; multiplying by
# 1.852 (1 knot = 1.852 km/h) converts it to km. Hours with no position report
# contribute no distance.
# The aggregate is named explicitly rather than relying on Spark's
# auto-generated 'sum(speed)' column name.
dynamic_info7 = dynamic_info2.groupBy('mmsi', 'date') \
    .agg(round(1.852 * sum('speed'), 2).alias('distance_daily(km)'))

# Write as CSV with a header row. coalesce(1) makes the final write a single task
# (one part file); mode('overwrite') lets the notebook be re-run, since the
# default save mode raises when 'clean_data_method2_2019' already exists.
dynamic_info7.coalesce(1).write \
    .mode('overwrite') \
    .csv('clean_data_method2_2019', header=True)

In [40]:
# Reference table mapping MMSI -> IMO, built from 2019 AIS type 5 (static) messages.
static_info = df.filter(df.type == 5) \
        .select("mmsi", "imo", "time")

# Keep exactly one row per MMSI: its most recent type-5 report.
# row_number() rather than dense_rank(): dense_rank gives rank 1 to every report
# sharing the latest timestamp (e.g. the same message received twice), which
# produced duplicate MMSI rows and would fan out any later join on mmsi.
# Ties on time are broken by the larger IMO so the choice is deterministic.
w1 = Window.partitionBy('mmsi').orderBy(desc('time'), desc('imo'))
static_info1 = static_info.withColumn('Rank', row_number().over(w1))
static_info2 = static_info1.filter(static_info1.Rank == 1) \
        .select("mmsi", "imo")

# Write as CSV with a header row. coalesce(1) makes the final write a single task
# (one part file); mode('overwrite') lets the notebook be re-run, since the
# default save mode raises when 'static_info_2019' already exists.
static_info2.coalesce(1).write \
    .mode('overwrite') \
    .csv('static_info_2019', header=True)