In [1]:
# NOTE: the original import order is kept on purpose — the star imports below
# can shadow each other (and the stdlib modules), so reordering could change
# which definition wins.
from helpers.helpers import *
import os
import datetime
import json
from unidecode import unidecode
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import split

In [2]:
# ---------------------------------------------------------------
# Environment variables
# ---------------------------------------------------------------
# Each value is None when the corresponding variable is not set.
url_ibge = os.getenv("IBGE-URL")                # IBGE endpoint used to list the cities
key_api = os.getenv("API-KEY")                  # key passed to the weather API helper
url_weather_api = os.getenv("VISUAL-CROSSING")  # base URL passed to the weather API helper
# ---------------------------------------------------------------

In [3]:
# Start Spark with a local master; the application name and the UI port
# (4050) are set explicitly on the builder.
spark_builder = SparkSession.builder.master("local")
spark_builder = spark_builder.appName("projeto-ibge-climatempo")
spark_builder = spark_builder.config("spark.ui.port", "4050")
spark = spark_builder.getOrCreate()

# Handle to the SparkContext backing the session (used below to build RDDs).
sc = spark.sparkContext


22/09/04 18:36:08 WARN Utils: Your hostname, gsantos-Lenovo-ideapad-330-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.0.78 instead (on interface wlp2s0)
22/09/04 18:36:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/04 18:36:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Buscar cidades do Vale do Paraíba

In [4]:

# Request the city list from the IBGE endpoint and decode the JSON payload.
# NOTE(review): http_requests() is an opaque helper (presumably from
# helpers.helpers); no HTTP status check is visible here.
response = http_requests().get(url_ibge)
dados_ibge_json = response.json()

# Build the cities DataFrame.
# spark.read.json expects an RDD of JSON *strings*. Handing it raw Python
# dicts makes PySpark fall back to str(dict), i.e. a Python repr, which is
# not valid JSON (None/True/False, quoting). Serialize every record first.
# Assumes the payload is a list of records — TODO confirm against the API.
jsonRDD = sc.parallelize([json.dumps(registro) for registro in dados_ibge_json])
df_ibge = spark.read.json(jsonRDD)

# Register the cities as a temp view so they can be queried with Spark SQL.
df_ibge.createOrReplaceTempView('cities')

                                                                                

In [5]:
# Preview the first five rows of the cities view (id, name, microrregiao).
(
    spark
    .sql("SELECT id,nome,microrregiao FROM cities ")
    .show(5)
)

+-------+---------+--------------------+
|     id|     nome|        microrregiao|
+-------+---------+--------------------+
|3502507|Aparecida|{35051, {{35, São...|
|3503158|   Arapeí|{35052, {{35, São...|
|3503505|   Areias|{35052, {{35, São...|
|3504909|  Bananal|{35052, {{35, São...|
|3508504| Caçapava|{35050, {{35, São...|
+-------+---------+--------------------+
only showing top 5 rows



# Buscar previsão do tempo para as cidades

In [6]:

# Collect the "nome" column of the IBGE frame into a plain Python list.
list_cities = [linha["nome"] for linha in df_ibge.select("nome").collect()]

# Forecast window: from today up to five days ahead.
data_inicio = datetime.date.today()
data_final = data_inicio + datetime.timedelta(days=5)

# Query the weather API for every city over that window.
# NOTE(review): requests_weather_api is an opaque helper (presumably from
# helpers.helpers); its return shape is not visible here.
dados_weather_api = requests_weather_api(
    list_cities,
    key_api,
    url_weather_api,
    metodo="/timeline/",
    periodo=f"{str(data_inicio)}/{str(data_final)}",
)

# Reshape the API response into the structure used to build the DataFrame.
dados_df_weather = dados_of_df_weather(dados_weather_api)

# Forecast DataFrame.
df_weather = spark.createDataFrame(dados_df_weather)



In [7]:
# Print the raw forecast frame (show()'s defaults spelled out: 20 rows, truncated cells).
df_weather.show(n=20, truncate=True)

+--------------------+--------+---------+--------------------+----------+--------------------+----------+----------+----------+--------+--------+----+-------+-------+------------+
|              Cidade|Latitude|Longitude|          conditions|  datetime|              normal|precipprob|preciptype|severerisk| sunrise|  sunset|temp|tempmax|tempmin|windspeedmax|
+--------------------+--------+---------+--------------------+----------+--------------------+----------+----------+----------+--------+--------+----+-------+-------+------------+
|Aparecida, SP, Br...|-22.8494| -45.2318|      Rain, Overcast|2022-09-04|{tempmax -> [10.7...|     100.0|    [rain]|      10.0|06:08:24|17:51:51|14.5|   17.8|   12.9|        14.4|
|Aparecida, SP, Br...|-22.8494| -45.2318|    Partially cloudy|2022-09-05|{tempmax -> [11.0...|      23.8|    [rain]|      10.0|06:07:26|17:52:09|16.3|   24.9|   11.2|        12.6|
|Aparecida, SP, Br...|-22.8494| -45.2318|    Partially cloudy|2022-09-06|{tempmax -> [12.3...|      

In [8]:
# Drop the columns that will not be used downstream.
df_weather_2 = df_weather.drop('normal', 'severerisk')

In [9]:
# API field name -> report column name.
renomear_colunas = {
    'conditions': 'CondicaoDoTempo',
    'datetime': 'Data',
    'precipprob': 'ChanceDeChuva',
    'preciptype': 'VaiChover',
    'sunrise': 'NascerDoSol',
    'sunset': 'PorDoSol',
    'temp': 'TemperaturaMedia',
    'tempmax': 'TemperaturaMaxima',
    'tempmin': 'TemperaturaMinima',
    'windspeedmax': 'VelocidadeMaximaDoVento',
}

# Apply the renames one at a time, in the order listed above.
df_weather_renamed = df_weather_2
for nome_api, nome_final in renomear_colunas.items():
    df_weather_renamed = df_weather_renamed.withColumnRenamed(nome_api, nome_final)

In [11]:
# Cast the VaiChover column to string so it can be compared with text values.
df_weather_alter_type = df_weather_renamed.withColumn(
    "VaiChover",
    F.col("VaiChover").cast("string"),
)

In [12]:
# Turn VaiChover into a 'Sim' / 'Não' flag.
# The column holds the string form of the precipitation-type array (e.g.
# '[rain]') or NULL. The previous when-chain only handled exactly '[rain]'
# and NULL, so any other value (presumably things like '[rain, snow]') became
# NULL and was counted in neither bucket downstream.
# Now: 'Sim' when the value contains a standalone 'rain' entry, 'Não' for
# everything else, including NULL (a NULL condition falls through to otherwise).
# The word boundaries keep the match to the literal token 'rain'.
df_weather_rain = df_weather_alter_type.withColumn(
    'VaiChover',
    F.when(F.col('VaiChover').rlike(r'\brain\b'), F.lit('Sim'))
     .otherwise(F.lit('Não')),
)
                                    

In [26]:
# Derive the city, region and country columns from the composite 'Cidade'
# value (e.g. 'Aparecida, SP, Brasil').
# The value is tokenized once, on a comma with optional surrounding
# whitespace, and the same tokens feed all three columns. Previously the city
# was split on ',' while region/country were split on ', ', so the three
# columns could disagree on inputs with irregular spacing.
# A missing part yields NULL (getItem past the end of the array).
partes_endereco = F.split(df_weather_rain['Cidade'], r'\s*,\s*')

df_weather_region = (
    df_weather_rain
    .withColumn('Cidade2', partes_endereco.getItem(0))
    .withColumn('Regiao', partes_endereco.getItem(1))
    .withColumn('Pais', partes_endereco.getItem(2))
)



In [35]:
# Remove the composite 'Cidade' column from the frame.
df_weather_region = df_weather_region.drop("Cidade")

In [36]:
# Print the frame one record per block, without truncating cell values
# (n=20 is show()'s default row count, spelled out).
df_weather_region.show(n=20, vertical=True, truncate=False)

-RECORD 0-----------------------------------------
 Latitude                | -22.8494               
 Longitude               | -45.2318               
 CondicaoDoTempo         | Rain, Overcast         
 Data                    | 2022-09-04             
 ChanceDeChuva           | 100.0                  
 VaiChover               | Sim                    
 NascerDoSol             | 06:08:24               
 PorDoSol                | 17:51:51               
 TemperaturaMedia        | 14.5                   
 TemperaturaMaxima       | 17.8                   
 TemperaturaMinima       | 12.9                   
 VelocidadeMaximaDoVento | 14.4                   
 Cidade2                 | Aparecida              
 Regiao                  | SP                     
 Pais                    | Brasil                 
-RECORD 1-----------------------------------------
 Latitude                | -22.8494               
 Longitude               | -45.2318               
 CondicaoDoTempo         | Part

In [37]:
# Register the forecasts as a temp view so they can be queried with Spark SQL.
df_weather_region.createOrReplaceTempView("forecasts")

In [81]:
# Build the Tabela1 DataFrame: one row per forecast joined to its city,
# exposing the city id as CodigoDaCidade and ordered by forecast date ascending.
# The INNER JOIN matches forecasts.Cidade2 against cities.nome, so forecasts
# whose city name has no exact match in the cities view are left out.
# NOTE(review): this relies on the city name in the forecasts matching the
# cities view's `nome` exactly (accents included) — confirm no city is lost.
Tabela1 = spark.sql("SELECT f.Cidade2 as Cidade,c.id as CodigoDaCidade,f.Data,f.Regiao,f.Pais,f.Latitude,f.Longitude,\
             f.TemperaturaMaxima,f.TemperaturaMinima,f.TemperaturaMedia,f.VaiChover,f.ChanceDeChuva,\
             f.CondicaoDoTempo,f.NascerDoSol,f.PorDoSol,f.VelocidadeMaximaDoVento\
             FROM  forecasts f INNER JOIN cities c ON f.Cidade2 == c.nome ORDER BY f.Data ASC")

In [97]:
# Build the Tabela2 DataFrame: per city, the number of forecast days flagged
# 'Sim', the number flagged 'Não', and the total number of days (COUNT of Data).
# The flag column is referenced with its exact name (VaiChover); the previous
# spelling `Vaichover` only resolved because Spark SQL is case-insensitive by
# default and would fail with spark.sql.caseSensitive enabled.
# Rows whose flag is neither 'Sim' nor 'Não' count only toward the total.
Tabela2 = spark.sql("SELECT f.Cidade2 as Cidade,SUM(if(f.VaiChover = 'Sim',1,0)) as QtdDiasVaiChover,\
                SUM(if(f.VaiChover = 'Não',1,0)) as QtdDiasNaoVaiChover,\
                COUNT(f.Data) as TotalDiasMapeados FROM forecasts f GROUP BY f.Cidade2")

In [110]:
# Export both tables as CSV with a header row, replacing any previous output.
# NOTE(review): Spark's writer normally produces a directory of part files at
# each path rather than a single .csv file — confirm that is what consumers expect.
for tabela, destino in (
    (Tabela1, "Datalake/Tabela1.csv"),
    (Tabela2, "Datalake/Tabela2.csv"),
):
    escritor = tabela.write.format("csv")
    escritor = escritor.mode("overwrite")
    escritor = escritor.option('header', 'true')
    escritor.save(destino)
