### Set up the Spark session

In [1]:
import geomesa_pyspark

# Build a SparkConf that ships the GeoMesa HBase Spark runtime jar and the Python
# packages listed below, then name the application.
# This runs before the pyspark import below; keep that order, since configure()
# is given spark_home and may be what makes pyspark importable.
conf = geomesa_pyspark.configure(
    jars=['/usr/lib/spark/jars/geomesa-hbase-spark-runtime_2.11-2.1.0-m.2.jar'],
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='/usr/lib/spark/',
)
conf = conf.setAppName('MyTestApp')

# Sanity check of the configured master; in the recorded run this returned u'yarn'.
conf.get('spark.master')

from pyspark.sql import SparkSession

# NOTE(review): the cell output shows a Livy/sparkmagic session already exposing
# `spark`; getOrCreate() then returns that existing session, and conf values that
# must be set at launch (e.g. jars) may not take effect here — confirm.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
50,application_1599131715985_0051,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


### Create the GeoMesa datastore connection

In [2]:
# Connection parameters for the GeoMesa HBase datastore.
params = {
    "hbase.zookeepers": "hbase.optix-ons-local:2181",
    "hbase.catalog": "ons-historical",
}

# "ee" is the exactEarth feature type. Other datasets we have include orbcomm and adsbx.
feature = "ee"

ee = (
    spark.read.format("geomesa")
    .options(**params)
    .option("geomesa.feature", feature)
    .load()
)

# Register the feature as a temporary view so the SQL cells below can query it as `ee`.
ee.createOrReplaceTempView("ee")

## Running the query

### Lautoka

In [7]:
# Filter on the `position` column, which is the geometry that st_contains tests
# against the bounding box; the query will not work with the latitude/longitude columns.
# `position` is a geometry, not plain text, and pyspark breaks down when exporting
# it to CSV directly. st_asText(position) converts it to Well-Known Text (WKT) so
# the data can be exported to CSV.
# st_makeBBOX(minX, minY, maxX, maxY) builds the bounding box. Remember: 'X' is
# LONGITUDE and 'Y' is LATITUDE when working with geographic data.
# Time window: dtg strictly after date_add(current_timestamp(), -365) and strictly
# before date_add(current_timestamp(), -1), i.e. roughly the past year excluding
# the most recent day. The window is relative to the run date, so re-running on a
# different day returns different rows.
# In the author's run this took ~10 seconds, so either the bounding box or the
# time window could be made larger.
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(177.2597,-17.7725,177.637,-17.4265), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [None]:
# Export the Lautoka results. coalesce(1) saves the output as a single CSV file;
# otherwise the data is exported as multiple CSVs.
# Each location needs its own output folder: the Suva cell writes to .../Suva, and
# with the default save mode a write fails if the target path already exists.
# NOTE: this writes whatever `df` currently holds, so run the Lautoka query cell first.
df.coalesce(1).write.option("header", "true").csv("s3://optix.ons.jupyter/jupyter/kdhingra/lautoka-fiji")

### Suva

In [None]:
# Suva: same columns and time window as the query above; only the bounding box differs.
# st_makeBBOX(min lon, min lat, max lon, max lat) = (178.0403, -18.4516, 178.8282, -17.8369)
suva_sql = """
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(178.0403,-18.4516,178.8282,-17.8369), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
"""
df = spark.sql(suva_sql)

#### Extracting the Suva data

In [8]:
# Export the Suva results as a single CSV file with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Suva query cell first.
(
    df.coalesce(1)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/Suva")
)

An error was encountered:
Invalid status code '400' from http://ip-10-250-1-28.ec2.internal:8998/sessions/33/statements/8 with error payload: "requirement failed: Session isn't active."


### Cook Islands

In [3]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(-159.796901,-21.21187,-159.77495,-21.190184), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [4]:
# Export the Cook Islands results as 4 CSV part files with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Cook Islands query cell first.
(
    df.coalesce(4)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/cook-islands")
)

### Solomon Islands

Honiara

In [3]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(159.813175,-9.499527,160.137272,-9.237509), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [4]:
# Export the Honiara results as 4 CSV part files with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Honiara query cell first.
(
    df.coalesce(4)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/honiara-solomon-islands")
)

Noro

In [5]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(157.131933,-8.302952,157.260289,-8.180972), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [6]:
# Export the Noro results as 4 CSV part files with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Noro query cell first.
(
    df.coalesce(4)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/noro-solomon-islands")
)

### Vanuatu

Port Vila

In [5]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(168.141321,-17.824701,168.39538,-17.648119), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [6]:
# Export the Port Vila results as 10 CSV part files with a header row.
# More output files let the write proceed in parallel; in the author's runs fewer
# files caused a significant lag when saving.
# NOTE: this writes whatever `df` currently holds, so run the Port Vila query cell first.
(
    df.coalesce(10)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/port-vila-vanuatu")
)

Luganville

In [7]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(167.143315,-15.544086,167.244407,-15.498857), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [8]:
# Export the Luganville results as 15 CSV part files with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Luganville query cell first.
(
    df.coalesce(15)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/luganville-vanuatu")
)

### Kiribati

Port of Betio

In [9]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(172.899429,1.330346,172.965518,1.404222), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [10]:
# Export the Betio results as 5 CSV part files with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Betio query cell first.
(
    df.coalesce(5)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/betio-kiribati")
)

### Palau

Port of Malakal

In [4]:
df = spark.sql("""
SELECT
dtg, mmsi, vessel_name, callsign, vessel_type_code, vessel_class, length, width, flag_country, flag_code, longitude, latitude, st_asText(position), vessel_type, vessel_type_cargo, vessel_type_main, vessel_type_sub,
destination, eta, draught, sog, cog, rot, heading, nav_status, nav_status_code, source, message_type
FROM
ee
WHERE st_contains(st_makeBBOX(134.44567,7.321713,134.467089,7.340101), position)
AND dtg < date_add(current_timestamp(), -1)
AND dtg > date_add(current_timestamp(), -365)
""")

In [5]:
# Export the Malakal results as 15 CSV part files with a header row.
# NOTE: this writes whatever `df` currently holds, so run the Malakal query cell first.
(
    df.coalesce(15)
    .write
    .option("header", "true")
    .csv("s3://optix.ons.jupyter/jupyter/kdhingra/malakal-palau")
)