In [10]:
from datetime import datetime, date
import pandas as pd
import plotly.express as px

from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler

### Modelos utilizados ###

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder.getOrCreate()

df = spark.read.option("delimiter", ";").option("header", True).csv('Data/inmet_filtered_A401_H_2000-05-12_2023-05-16.csv')

dfTransformado = df.withColumn('PRECIPITACAO TOTAL, HORARIO(mm)', regexp_replace('PRECIPITACAO TOTAL, HORARIO(mm)', ',', '.').cast(DoubleType()))

dfTransformado = dfTransformado.drop(
           'PRESSAO ATMOSFERICA REDUZIDA NIVEL DO MAR, AUT(mB)',
           'PRESSAO ATMOSFERICA REDUZIDA NIVEL DO MAR, AUT(mB)',
           'PRESSAO ATMOSFERICA MAX.NA HORA ANT. (AUT)(mB)',
           'PRESSAO ATMOSFERICA MIN. NA HORA ANT. (AUT)(mB)',
           'PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA(mB)',
           'TEMPERATURA DA CPU DA ESTACAO(°C)',
           'TEMPERATURA DO AR - BULBO SECO, HORARIA(°C)',
           'TEMPERATURA DO PONTO DE ORVALHO(°C)',
           'TEMPERATURA MAXIMA NA HORA ANT. (AUT)(°C)',
           'TEMPERATURA MINIMA NA HORA ANT. (AUT)(°C)',
           'TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT)(°C)',
           'TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT)(°C)',
           'TENSAO DA BATERIA DA ESTACAO(V)',
           'UMIDADE REL. MAX. NA HORA ANT. (AUT)(%)',
           'UMIDADE REL. MIN. NA HORA ANT. (AUT)(%)',
           'UMIDADE RELATIVA DO AR, HORARIA(%)',
           'VENTO, DIRECAO HORARIA (gr)(° (gr))',
           'VENTO, RAJADA MAXIMA(m/s)',
           'VENTO, VELOCIDADE HORARIA(m/s)',
           'Unnamed: 22',
           '_c22'
)

df = dfTransformado.withColumn("Inicio_Semana",date_sub(next_day(col("Data Medicao"),"sunday"),7))\
                    .groupBy("Inicio_Semana").agg\
                        (sum("PRECIPITACAO TOTAL, HORARIO(mm)").cast("float").alias("Total de Chuvas(mm)"),\
                         sum("PRECIPITACAO TOTAL, HORARIO(mm)").cast("float").alias("Média diária de chuvas(mm)"))\
                    .orderBy("Inicio_Semana")

df = df.select('Inicio_Semana','Total de Chuvas(mm)',col('Média diária de chuvas(mm)')/ 7 )

df = df.withColumnRenamed("(Média diária de chuvas(mm) / 7)","Média diária de chuvas(mm)")

dfCompleto = df.withColumn('Semana_Ano',weekofyear(df.Inicio_Semana))
dfCompleto.toPandas()

Unnamed: 0,Inicio_Semana,Total de Chuvas(mm),Média diária de chuvas(mm),Semana_Ano
0,2000-05-07,,,18
1,2000-05-14,,,19
2,2000-05-21,,,20
3,2000-05-28,,,21
4,2000-06-04,,,22
...,...,...,...,...
1197,2023-04-16,6.600000,0.942857,15
1198,2023-04-23,94.800003,13.542858,16
1199,2023-04-30,49.400002,7.057143,17
1200,2023-05-07,117.800003,16.828572,18


In [11]:
dfCompleto = dfCompleto.withColumn("Ano", substring(dfCompleto.Inicio_Semana, 1,4))

dfCompleto.toPandas()

Unnamed: 0,Inicio_Semana,Total de Chuvas(mm),Média diária de chuvas(mm),Semana_Ano,Ano
0,2000-05-07,,,18,2000
1,2000-05-14,,,19,2000
2,2000-05-21,,,20,2000
3,2000-05-28,,,21,2000
4,2000-06-04,,,22,2000
...,...,...,...,...,...
1197,2023-04-16,6.600000,0.942857,15,2023
1198,2023-04-23,94.800003,13.542858,16,2023
1199,2023-04-30,49.400002,7.057143,17,2023
1200,2023-05-07,117.800003,16.828572,18,2023


In [12]:
inema = spark.read.option("header",True).csv("Data/inema_filtered_balneabilidade_farol_barra.csv")

inema = inema.withColumnRenamed("01/2007","numero_boletim").withColumnRenamed("Farol da Barra - SSA FB 100","ponto_codigo").withColumnRenamed("Indisponível","categoria")

inema = inema.withColumn("Ano",substring(inema.numero_boletim, 4,7)).withColumn("Semana_Ano", substring(inema.numero_boletim, 1,2))

inema.toPandas()

Unnamed: 0,numero_boletim,categoria,ponto_codigo,Ano,Semana_Ano
0,01/2007,Indisponível,Farol da Barra - SSA FB 200,2007,01
1,01/2007,Indisponível,Porto da Barra - SSA PB 100,2007,01
2,01/2007,Indisponível,Santa Maria - SSA SM 100,2007,01
3,02/2007,Indisponível,Farol da Barra - SSA FB 100,2007,02
4,02/2007,Indisponível,Farol da Barra - SSA FB 200,2007,02
...,...,...,...,...,...
2826,20/2023,Imprópria,Santa Maria - SSA SM 100,2023,20
2827,21/2023,Imprópria,Farol da Barra - SSA FB 100,2023,21
2828,21/2023,Imprópria,Farol da Barra - SSA FB 200,2023,21
2829,21/2023,Própria,Porto da Barra - SSA PB 100,2023,21


In [13]:
dfCompleto = dfCompleto.join(inema,["Ano","Semana_Ano"])

dfCompleto = dfCompleto.drop("numero_boletim","Ano","Semana_Ano")

dfCompleto.toPandas()

Unnamed: 0,Inicio_Semana,Total de Chuvas(mm),Média diária de chuvas(mm),categoria,ponto_codigo
0,2008-01-20,,,Indisponível,Santa Maria - SSA SM 100
1,2008-01-20,,,Indisponível,Porto da Barra - SSA PB 100
2,2008-01-20,,,Indisponível,Farol da Barra - SSA FB 200
3,2008-01-20,,,Indisponível,Farol da Barra - SSA FB 100
4,2008-08-10,17.400000,2.485714,Indisponível,Santa Maria - SSA SM 100
...,...,...,...,...,...
2738,2022-10-02,0.200000,0.028571,Própria,Farol da Barra - SSA FB 100
2739,2022-04-17,274.399994,39.199999,Indisponível,Santa Maria - SSA SM 100
2740,2022-04-17,274.399994,39.199999,Indisponível,Porto da Barra - SSA PB 100
2741,2022-04-17,274.399994,39.199999,Indisponível,Farol da Barra - SSA FB 200


In [14]:
### Contabilizando os Nulls por coluna ###

dfCompleto.select([count(when(isnull(c), c)).alias(c) for c in dfCompleto.columns]).toPandas()

Unnamed: 0,Inicio_Semana,Total de Chuvas(mm),Média diária de chuvas(mm),categoria,ponto_codigo
0,0,112,112,0,0


In [15]:
### Removendo os Nulls ####

dfCompleto = dfCompleto.replace('?', None).dropna(how='any')

In [16]:
### Transformando os valores qualitativos em numéricos => 0 = Própria | 1 = Imprópria | 2 = Indisponível ###

dfCompleto = StringIndexer(
    inputCol='categoria', 
    outputCol='Categoria_Indexada', 
    handleInvalid='keep').fit(dfCompleto).transform(dfCompleto)

In [17]:
Farol200 = dfCompleto.filter(dfCompleto.ponto_codigo == "Farol da Barra - SSA FB 200")

In [19]:
Farol200.show()

+-------------+-------------------+--------------------------+------------+--------------------+------------------+
|Inicio_Semana|Total de Chuvas(mm)|Média diária de chuvas(mm)|   categoria|        ponto_codigo|Categoria_Indexada|
+-------------+-------------------+--------------------------+------------+--------------------+------------------+
|   2008-08-10|               17.4|        2.4857142312186107|Indisponível|Farol da Barra - ...|               2.0|
|   2007-08-05|               22.4|        3.1999999455043246|Indisponível|Farol da Barra - ...|               2.0|
|   2008-05-11|                5.8|        0.8285714558192662|Indisponível|Farol da Barra - ...|               2.0|
|   2007-03-11|               53.0|         7.571428571428571|Indisponível|Farol da Barra - ...|               2.0|
|   2007-07-08|               17.0|        2.4285714285714284|     Própria|Farol da Barra - ...|               0.0|
|   2008-06-01|                0.2|      0.028571428997176036|     Própr

In [33]:
Farol200.toPandas().write.options(header='True', delimiter=',').csv("Data/preprocessado/Farol200")

AttributeError: 'DataFrame' object has no attribute 'write'

In [36]:
from pathlib import Path  

filepath = Path('Data/preprocessado/Farol200/out.csv')  

filepath.parent.mkdir(parents=True, exist_ok=True)  

Farol200.toPandas().to_csv(filepath)  
