# About the data

As a community-oriented company, we value the use of open data in our analysis. For this test, we therefore
want you to download the Open Ocean Data published by the Digital Ocean Institute of Ireland and be creative
in your analysis of it.

In [0]:
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import IntegerType, FloatType, DoubleType, DateType, TimestampType, StringType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import pandas as pd

from datetime import date, datetime, timedelta

from pandas_profiling import ProfileReport

import plotly.express as px

In [0]:
# converts a numeric month (1-12) to its three-letter abbreviation; any other value is returned unchanged
def func(month):
    if month == 1: return 'JAN'
    if month == 2: return 'FEB'
    if month == 3: return 'MAR'
    if month == 4: return 'APR'
    if month == 5: return 'MAY'
    if month == 6: return 'JUN'
    if month == 7: return 'JUL'
    if month == 8: return 'AUG'
    if month == 9: return 'SEP'
    if month == 10: return 'OCT'
    if month == 11: return 'NOV'
    if month == 12: return 'DEC'
    return month

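# converts an ISO-8601 UTC string (e.g. '2021-09-20T22:35:00Z') to a date; nulls pass through
func_time_datetime = udf(lambda x: datetime.strptime(x, '%Y-%m-%dT%H:%M:%SZ') if x is not None else None, DateType())

# convert a string to a timestamp (full ISO-8601 form and date-only form); nulls pass through
func_time_stamp = udf(lambda x: datetime.strptime(x, '%Y-%m-%dT%H:%M:%SZ') if x is not None else None, TimestampType())
func_time_stamp_simples = udf(lambda x: datetime.strptime(x, '%Y-%m-%d') if x is not None else None, TimestampType())

# parses a 'YYYY-MM-DD' string; note that the declared return type is StringType
func_time_string = udf(lambda x: datetime.strptime(x, '%Y-%m-%d') if x is not None else None, StringType())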
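
As a quick sanity check of the `'%Y-%m-%dT%H:%M:%SZ'` format string used by the UDFs above, the plain-Python sketch below parses one timestamp taken from the tide data. `parse_utc` is a hypothetical helper introduced only for illustration, not part of the notebook. Note that the trailing `Z` is matched as a literal character, so the result is a naive `datetime` with no timezone attached.

```python
from datetime import datetime

ISO_UTC = '%Y-%m-%dT%H:%M:%SZ'  # same format string the UDFs use

def parse_utc(value):
    """Hypothetical helper: parse an ISO-8601 UTC string, passing None through."""
    if value is None:
        return None
    return datetime.strptime(value, ISO_UTC)

ts = parse_utc('2021-09-20T22:35:00Z')
print(ts)               # 2021-09-20 22:35:00
print(ts.date())        # 2021-09-20
print(ts.tzinfo)        # None (the 'Z' is only a literal in the format)
print(parse_utc(None))  # None
```

If UDF overhead becomes a concern, Spark's built-in `to_timestamp` function is a possible alternative to a Python UDF for this kind of parsing.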

***

## 1.1) Understanding the data

An analysis of Open Ocean Data from the Digital Ocean Institute of Ireland.

**Time range:** \
The analysis covers a one-year interval, from 21/09/2021 to 21/09/2022.

***

## 1.2) Data collection

### Tides
Tide measurements collected by several buoys in the seas around Ireland.

Time - UTC \
Station ID \
Latitude - degrees_north \
Longitude - degrees_east \
Water Level (LAT) - meters \
Water Level (OD Malin) - meters \
Quality Control Flag

In [0]:
# File location and type
file_location_td = "/FileStore/tables/tide/*.csv"

# The applied options are for CSV files. For other file types, these will be ignored.
df_tides = spark.read.format('csv') \
                .option("inferSchema", 'true') \
                .option("header", 'true') \
                .option("sep", ',') \
                .load(file_location_td)\
                .drop_duplicates()

# remove the non-data row whose values differ from the rest of the dataset (e.g. a units row).
# Row order is not guaranteed after drop_duplicates(), so first() may not return that row;
# instead, keep only rows whose `time` starts like an ISO timestamp.
df_tides = df_tides.filter(col("time").rlike(r"^\d{4}-\d{2}-\d{2}T"))

# convert types
df_tides = df_tides.withColumn("latitude", df_tides["latitude"].cast(FloatType()))\
                   .withColumn("longitude", df_tides["longitude"].cast(FloatType()))\
                   .withColumn("Water_Level_LAT", df_tides["Water_Level_LAT"].cast(FloatType()))\
                   .withColumn("Water_Level_OD_Malin", df_tides["Water_Level_OD_Malin"].cast(FloatType()))

display(df_tides)
df_tides.count()

time,station_id,latitude,longitude,Water_Level_LAT,Water_Level_OD_Malin,QC_Flag
2021-09-20T22:35:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.718,-1.485,1
2021-09-21T20:15:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,3.264,1.061,1
2021-09-22T18:55:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,4.097,1.894,1
2021-09-23T14:40:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,1.097,-1.106,1
2021-09-20T23:50:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.412,-1.791,1
2021-09-21T13:15:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.92,-1.283,1
2021-09-21T18:40:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,4.094,1.891,1
2021-09-21T23:15:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.864,-1.339,1
2021-09-22T00:15:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.52,-1.683,1
2021-09-24T02:00:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.942,-1.261,1
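
One order-independent way to drop a non-data row is to keep only rows whose `time` value looks like a timestamp. The sketch below illustrates the idea without Spark. It assumes the raw CSV carries a units row directly under the header; that row is a hypothetical reconstruction from the field list above, while the two data rows are copied from the output above.

```python
import csv
import io
import re

# Hypothetical raw file: header, an assumed units row, then two rows from the tide data
raw = """time,station_id,latitude,longitude,Water_Level_LAT,Water_Level_OD_Malin,QC_Flag
UTC,,degrees_north,degrees_east,meters,meters,
2021-09-20T22:35:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,0.718,-1.485,1
2021-09-21T20:15:00Z,Aranmore Island - Leabgarrow,54.9905,-8.4955,3.264,1.061,1
"""

ISO_PREFIX = re.compile(r'^\d{4}-\d{2}-\d{2}T')

rows = list(csv.DictReader(io.StringIO(raw)))

# Keep only rows whose `time` starts like an ISO timestamp
data_rows = [r for r in rows if ISO_PREFIX.match(r['time'])]

# Every field is read as a string, so the numeric columns are cast explicitly
NUMERIC = ('latitude', 'longitude', 'Water_Level_LAT', 'Water_Level_OD_Malin')
for r in data_rows:
    for name in NUMERIC:
        r[name] = float(r[name])

print(len(rows), len(data_rows))        # 3 2
print(data_rows[0]['Water_Level_LAT'])  # 0.718
```

A single non-numeric row is enough to make every column read as text, which is why the explicit casts are needed after the filter.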


#### Stations in the dataset

In [0]:
# df_tides.station_id.unique()
display(df_tides.select('station_id', 'latitude', 'longitude').distinct())

station_id,latitude,longitude
Aranmore Island - Leabgarrow,54.9905,-8.4955
Ballyglass Harbour,54.2536,-9.8928
Howth Water Level 1,53.391335,-6.06809
Union Hall Harbor,51.558964,-9.13349
Dingle Harbour,52.13924,-10.27732
Roonagh Pier,53.76235,-9.90442
Dunmore East Harbour,52.14767,-6.99188
Wexford Harbour,52.3385,-6.4589
Castletownbere Port,51.6496,-9.9034
Inishmore,53.1178,-9.6669


***

### Waves
Wave measurements collected by several buoys in the seas around Ireland.

Time - UTC \
Station ID \
Latitude - degrees_north \
Longitude - degrees_east \
Peak Period - s \
Peak Direction - degrees_true \
Upcross Period - s \
Significant Wave Height - cm \
Maximum Wave Height - cm \
Sea Temperature - degree_C \
Current Speed - m/s \
Current Direction - degrees_true

In [0]:
# File location and type
file_location_wv = "/FileStore/tables/wave/*.csv"

# The applied options are for CSV files. For other file types, these will be ignored.
df_waves = spark.read.format('csv') \
                .option("inferSchema", 'true') \
                .option("header", 'true') \
                .option("sep", ',') \
                .load(file_location_wv) \
                .drop_duplicates()\
                .select('time', 'station_id', 'latitude', 'longitude', 'PeakPeriod', 'PeakDirection', 'UpcrossPeriod', 'SignificantWaveHeight', 'Hmax', 
                        'SeaTemperature', 'MeanCurSpeed', 'MeanCurDirTo')

# remove the non-data row whose values differ from the rest of the dataset (e.g. a units row).
# Row order is not guaranteed after drop_duplicates(), so first() may not return that row;
# instead, keep only rows whose `time` starts like an ISO timestamp.
df_waves = df_waves.filter(col("time").rlike(r"^\d{4}-\d{2}-\d{2}T"))

# convert types
df_waves = df_waves.withColumn("latitude", df_waves["latitude"].cast(FloatType()))\
                   .withColumn("longitude", df_waves["longitude"].cast(FloatType()))\
                   .withColumn("PeakPeriod", df_waves["PeakPeriod"].cast(FloatType()))\
                   .withColumn("PeakDirection", df_waves["PeakDirection"].cast(FloatType()))\
                   .withColumn("UpcrossPeriod", df_waves["UpcrossPeriod"].cast(FloatType()))\
                   .withColumn("SignificantWaveHeight", df_waves["SignificantWaveHeight"].cast(FloatType()))\
                   .withColumn("Hmax", df_waves["Hmax"].cast(FloatType()))\
                   .withColumn("SeaTemperature", df_waves["SeaTemperature"].cast(FloatType()))\
                   .withColumn("MeanCurSpeed", df_waves["MeanCurSpeed"].cast(FloatType()))\
                   .withColumn("MeanCurDirTo", df_waves["MeanCurDirTo"].cast(FloatType()))

display(df_waves)
df_waves.count()

time,station_id,latitude,longitude,PeakPeriod,PeakDirection,UpcrossPeriod,SignificantWaveHeight,Hmax,SeaTemperature,MeanCurSpeed,MeanCurDirTo
2021-09-23T07:00:00Z,Bantry Bay,51.647,-9.681,6.25,239.9121,2.89,40.0,59.0,17.01,,
2021-09-23T09:40:00Z,Bantry Bay,51.647,-9.681,,,,,,17.23,,
2021-09-23T12:45:00Z,Bantry Bay,51.647,-9.681,,,,,,17.74,,
2021-09-22T00:25:00Z,Bantry Bay,51.647,-9.681,,,,,,16.84,,
2021-09-22T01:05:00Z,Bantry Bay,51.647,-9.681,,,,,,16.84,,
2021-09-22T06:55:00Z,Bantry Bay,51.647,-9.681,,,,,,16.89,,
2021-09-22T09:05:00Z,Bantry Bay,51.647,-9.681,,,,,,17.01,,
2021-09-22T11:45:00Z,Bantry Bay,51.647,-9.681,,,,,,17.1,,
2021-09-24T12:30:00Z,Bantry Bay,51.647,-9.681,11.76,257.93405,3.24,54.0,72.0,17.74,,
2021-09-24T12:45:00Z,Bantry Bay,51.647,-9.681,,,,,,17.69,,


#### Stations in the dataset

Note: the datasets for the AMETS Berth B Wave Buoy and Westwave Wave Buoy stations were empty.

In [0]:
# df_waves.station_id.unique()
display(df_waves.select('station_id', 'latitude', 'longitude').distinct())

station_id,latitude,longitude
Bantry Bay,51.647,-9.681
SmartBay Wave Buoy,53.228333,-9.262278
AMETS Berth A Wave Buoy,54.2753,-10.29737
Brandon Bay,52.282333,-10.094833
Clew Bay,53.801353,-9.911498
AMETS Berth B Wave Buoy,54.2251,-10.15099


***

## 1.3) Data cleaning

### Tides

In [0]:
df_tides.count()

In [0]:
df_tides.select(*[(
                   F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
                   else F.count(F.when(F.col(c).isNull(), c))
                  ).alias(c)
                  for c, t in df_tides.dtypes if c in df_tides.columns
                 ]).show()

### Waves

In [0]:
df_waves.count()

In [0]:
df_waves.select(*[(
                   F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
                   else F.count(F.when(F.col(c).isNull(), c))
                  ).alias(c)
                  for c, t in df_waves.dtypes if c in df_waves.columns
                 ]).show()
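
To make explicit what the expressions above count, here is a plain-Python sketch over three rows copied from the Bantry Bay wave output (a subset of columns, with empty cells represented as `None`). `is_missing` and `count_missing` are hypothetical helpers introduced for illustration: for each column they count values that are `None` or NaN, mirroring the `isNull`/`isnan` test.

```python
import math

# Three rows from the Bantry Bay output; empty CSV cells become None
rows = [
    {'time': '2021-09-23T07:00:00Z', 'PeakPeriod': 6.25, 'Hmax': 59.0,
     'SeaTemperature': 17.01, 'MeanCurSpeed': None},
    {'time': '2021-09-23T09:40:00Z', 'PeakPeriod': None, 'Hmax': None,
     'SeaTemperature': 17.23, 'MeanCurSpeed': None},
    {'time': '2021-09-23T12:45:00Z', 'PeakPeriod': None, 'Hmax': None,
     'SeaTemperature': 17.74, 'MeanCurSpeed': None},
]

def is_missing(value):
    """True for None and for float NaN; any other value counts as present."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def count_missing(rows):
    """Hypothetical helper: number of missing values per column."""
    columns = rows[0].keys()
    return {c: sum(is_missing(r[c]) for r in rows) for c in columns}

missing = count_missing(rows)
print(missing)
# {'time': 0, 'PeakPeriod': 2, 'Hmax': 2, 'SeaTemperature': 0, 'MeanCurSpeed': 3}
```

In this sample the wave parameters are missing on most rows while the sea temperature is always present, so it can help to compare the per-column counts against the total row count before deciding which columns to keep.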