# Datasets

### Configuring the server

In [1]:
import geomesa_pyspark
from pyspark.sql import SparkSession

conf = geomesa_pyspark.configure(
    jars=['/usr/lib/spark/jars/geomesa-hbase-spark-runtime_2.11-2.1.0-m.2.jar'],
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='/usr/lib/spark/')
# Optionally chain .setAppName('MyTestApp') onto configure(...)
conf.get('spark.master')  # u'yarn'

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [2]:
params = {
    "hbase.zookeepers": "hbase.optix-ons-local:2181",
    "hbase.catalog": "ons-historical",
}
feature = "ee"
ee = (
    spark.read
    .format("geomesa")
    .options(**params)
    .option("geomesa.feature", feature)
    .load()
)
# Register the DataFrame as a temporary view so it can be queried with spark.sql
ee.createOrReplaceTempView("ee")

### Incident
To find out where an incident was communicated, we selected the records with message_type equal to 12, 13, or 14, since these types specifically indicate some kind of problem on the vessel. Because the database holds a large amount of data and the query processing time makes it impractical to validate all of it, we restricted the time period (the query below filters on dtg from 2020-07-01 onward). However, records with message_type 12, 13, or 14 are hard to find, and we were also unable to narrow the query to a shorter time window or a specific region.

In [3]:
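# Group by the calendar day that is selected, not by the raw dtg timestamp,
# so that each row counts the incident records for one day, position, and type.
spark.sql("""
    SELECT message_type, latitude, longitude,
           CAST(dtg AS DATE) AS day, COUNT(*) AS n_records
    FROM ee
    WHERE dtg >= CAST('2020-07-01' AS DATE)
      AND message_type IN (12, 13, 14)
    GROUP BY CAST(dtg AS DATE), latitude, longitude, message_type
    ORDER BY day
""")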
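
To make the filter-and-count step concrete, here is a minimal plain-Python sketch (no Spark required) of the same logic applied to a few made-up records. The sample rows and the helper name `count_incidents` are illustrative assumptions, not data from the `ee` table; the start date mirrors the query, and grouping by calendar day is an assumption about the intent.

```python
from collections import Counter
from datetime import datetime

# Hypothetical sample rows: (message_type, latitude, longitude, dtg).
records = [
    (12, -22.9, -43.1, datetime(2020, 7, 1, 8, 15)),
    (12, -22.9, -43.1, datetime(2020, 7, 1, 19, 40)),
    (14, -23.5, -44.0, datetime(2020, 7, 2, 3, 5)),
    (1, -22.9, -43.1, datetime(2020, 7, 1, 9, 0)),       # not an incident type
    (13, -22.9, -43.1, datetime(2020, 6, 30, 23, 59)),   # before the start date
]

INCIDENT_TYPES = {12, 13, 14}
START = datetime(2020, 7, 1)


def count_incidents(rows):
    """Count incident records per (day, latitude, longitude, message_type)."""
    counts = Counter()
    for message_type, lat, lon, dtg in rows:
        if message_type in INCIDENT_TYPES and dtg >= START:
            counts[(dtg.date(), lat, lon, message_type)] += 1
    return counts


incident_counts = count_incidents(records)
for key in sorted(incident_counts):
    print(key, incident_counts[key])
```

The two type-12 records from the same day and position collapse into a single row with a count of 2, which is the shape of result the SQL query returns.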

### Average speed
To find the average speed in a given region, we sum the speeds (the sog column) of all vessel records whose latitude lies between -25 and -20 and divide by the number of records at each longitude,
which gives the average speed per longitude.

In [None]:
# The bounding box is given as lower-left (-46, -25) and upper-right (15, -20).
spark.sql("""
    SELECT longitude, SUM(sog) / COUNT(*) AS avg_sog
    FROM ee
    WHERE st_intersects(position, st_makeBox2d(st_point(-46, -25), st_point(15, -20)))
      AND dtg >= CAST('2020-09-01' AS DATE)
    GROUP BY longitude
    ORDER BY longitude
""")
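
The sum-divided-by-count calculation can be sketched in plain Python on a handful of made-up records. The sample values and the helper name `average_speed_by_longitude` are assumptions for illustration only; the box limits are taken from the query.

```python
from collections import defaultdict

# Hypothetical sample rows: (latitude, longitude, sog).
records = [
    (-21.0, -40.0, 10.0),
    (-24.5, -40.0, 14.0),
    (-22.0, -39.0, 8.0),
    (-10.0, -40.0, 30.0),  # outside the latitude band, ignored
]

LAT_MIN, LAT_MAX = -25.0, -20.0
LON_MIN, LON_MAX = -46.0, 15.0


def average_speed_by_longitude(rows):
    """Return {longitude: mean sog} for rows inside the bounding box."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for lat, lon, sog in rows:
        if LAT_MIN <= lat <= LAT_MAX and LON_MIN <= lon <= LON_MAX:
            totals[lon] += sog
            counts[lon] += 1
    # Sum of speeds divided by number of records, per longitude.
    return {lon: totals[lon] / counts[lon] for lon in sorted(totals)}


averages = average_speed_by_longitude(records)
print(averages)
```

With real data, longitude is a continuous value, so grouping on it directly yields one group per distinct coordinate; rounding the longitude into bins before grouping may give more useful averages.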

### Shadow area
A shadow area is a place where communication does not work well, so the position record cannot be sent at the time it is taken.
We therefore analyzed the uploaded files in which the time difference between the file's dtg and ts_pos_utc is greater than 10 minutes.
This query shows which regions are more likely to be shadow areas, based on the number of records that failed to be sent on time.

In [None]:
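# Compare full timestamps (in seconds) rather than minute-of-hour values,
# which wrap around at each hour. Assumes dtg and ts_pos_utc are timestamp columns.
spark.sql("""
    SELECT latitude, longitude, COUNT(*) AS delayed_records
    FROM ee
    WHERE st_intersects(position, st_makeBox2d(st_point(-46, -20), st_point(15, 25)))
      AND dtg >= CAST('2020-09-01' AS DATE)
      AND unix_timestamp(dtg) - unix_timestamp(ts_pos_utc) > 600
    GROUP BY latitude, longitude
    ORDER BY longitude
""")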
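
The 10-minute threshold is a comparison between two timestamps. The plain-Python sketch below, using made-up timestamps and hypothetical helper names, shows the delay computed from the full timestamps. Note that subtracting only the minute-of-hour values would miss any delay that crosses an hour boundary.

```python
from datetime import datetime


def delay_minutes(dtg, ts_pos_utc):
    """Minutes elapsed between the position fix (ts_pos_utc) and dtg."""
    return (dtg - ts_pos_utc).total_seconds() / 60.0


def is_delayed(dtg, ts_pos_utc, threshold_minutes=10):
    """True when the record arrived more than threshold_minutes after the fix."""
    return delay_minutes(dtg, ts_pos_utc) > threshold_minutes


# Hypothetical timestamps: position taken at 10:55, record dated 11:08.
ts_pos_utc = datetime(2020, 9, 1, 10, 55)
dtg = datetime(2020, 9, 1, 11, 8)

print(delay_minutes(dtg, ts_pos_utc))   # 13.0
print(dtg.minute - ts_pos_utc.minute)   # -47 (minute-of-hour difference)
print(is_delayed(dtg, ts_pos_utc))      # True
```

The record is 13 minutes late, yet the minute-of-hour difference is -47, so a filter on that difference alone would not flag it.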

In [None]:
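# Save the results to a CSV file.
# df1 is not defined in the cells above; it is assumed to hold the DataFrame
# returned by one of the spark.sql(...) queries.
# coalesce(1) collapses the result into one partition so that a single part file is written.
df1.coalesce(1).write.option("header", "true").csv("s3://optix.ons.jupyter/jupyter/dwater/folder/dados")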