# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.




In [1]:

# pyspark
!pip install pyspark

# findspark
!pip install findspark

# jupyterlab
!pip install jupyterlab

!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 50 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 51.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=395ed1a54afac0bff43e0970cbe7917b82ceea8cb4b6ed351a888e0167750364
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting unidecode
  Downloading Unidecode-1.3.4-py3-none-any.whl (235 kB)
[K     |████████████████████████████████| 235 kB 7.4 MB/s 
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.4


In [1]:
import findspark
findspark.init()
import requests, os, shutil
import json
import unidecode
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Criação dos caminhos que serão utilizados

In [4]:
path_arquivos = '/content/drive/MyDrive/arquivos/'
path_previsao = '/content/drive/MyDrive/arquivos/previsao/'


# Buscando Cidades do vale do Paraíba

In [5]:
#acessando a API e transformando a response json em uma string para criar arquivo no drive
municipios = requests.get('https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios')
municipios = municipios.json()
str_municipios = json.dumps(municipios)

# Criando arquivo contendo as informações da API de municipios.

In [6]:
#abrindo o arquivo json
file = open(path_arquivos+'arq.json', "w")
 
#Escreve no arquivo
file.write(str_municipios)
 
#Fecha o arquivo com os dados da api salvos
file.close()


# Criação do DF cities

In [7]:

df_cidades = spark.read.json(path_arquivos+'arq.json')

#Ajuste do dataframe
cities = df_cidades.select(
      col("id"),
      col("nome"),
      col("microrregiao.id").alias("id_microrregiao"),
      col("microrregiao.nome").alias("nome_microrregiao"),
      col("microrregiao.mesorregiao.UF.id").alias("id_uf_microrregiao"),
      col("microrregiao.mesorregiao.UF.nome").alias("nome_uf_microrregiao"),
      col("microrregiao.mesorregiao.UF.sigla").alias("sigla_uf_micorregiao"),
      col("microrregiao.mesorregiao.UF.regiao.id").alias("id_regiao_microrregiao"),
      col("microrregiao.mesorregiao.UF.regiao.nome").alias("nome_regiao_microrregiao"),
      col("microrregiao.mesorregiao.UF.regiao.sigla").alias("sigla_regiao_microrregiao"),
      col("regiao-imediata.id").alias("id_regiao_imediata"),
      col("regiao-imediata.nome").alias("nome_regiao_imediata"),
      col("regiao-imediata.regiao-intermediaria.id").alias("id_regiao_intermediaria"),
      col("regiao-imediata.regiao-intermediaria.nome").alias("nome_regiao_intermediaria"),
      col("regiao-imediata.regiao-intermediaria.UF.id").alias("id_uf_regiao_imediata"),
      col("regiao-imediata.regiao-intermediaria.UF.nome").alias("nome_uf_regiao_imediata"),
      col("regiao-imediata.regiao-intermediaria.UF.sigla").alias("sigla_uf_regiao_imediata"),
      col("regiao-imediata.regiao-intermediaria.UF.regiao.id").alias("id_regiao_regiao_imediata"),
      col("regiao-imediata.regiao-intermediaria.UF.regiao.nome").alias("nome_regiao_regiao_imediata"),
      col("regiao-imediata.regiao-intermediaria.UF.regiao.sigla").alias("sigla_regiao_regiao_imediata"),
)

# CRIAÇÃO DA VIEW CIDADES

In [8]:
cities.createOrReplaceTempView("VW_CIDADES")


#  Buscar previsão do tempo para as cidades

### Lista criada para coletar os nomes das cidades e usar na consulta da API (Consulta o primeiro DF)

In [9]:
lista_cidades = cities.select('nome').rdd.flatMap(lambda x: x).collect()

### Abaixo nós temos a Key da Segunda API de previsão do tempo, ela é limitada por consulta, então na variavel, vai ser a key usada, caso ela expire( Questão na qual poderemos acompanhar em células abaixo, substituir por outra key encontrada no comentario.

In [10]:
key = 'ded6401a'
#  '82f74bad', 'ccadb0c5', '7a06c130'

# Consulta na API com as previsões
#### Dentro do loop também esta criando arquivos contendo as informações da API de cada municipio.

In [11]:
for i in lista_cidades:
  previsao  =requests.get('https://api.hgbrasil.com/weather?key='+key+'&city_name=' + i +',SP')
  previsao = previsao.json()
  str_previsao = json.dumps(previsao)
  #criando o arquivo json
  file = open(path_previsao+ i +'.json', "w")
 
  #Escreve no arquivo
  file.write(str_previsao)
 
  #Fecha o arquivo com os dados da api salvos
  file.close()

# É criado um dataframe com a estrutura dos arquivos json
### Como há arrays  no json, é necessario realizar um explode, em seguida do recolhimento das colunas do array, a coluna array é deletada

In [12]:

df_previsao_cidades = spark.read.json(path_previsao+'*.json')


prev = df_previsao_cidades.select(
    col("results.cid"), 
    col("results.city"), 
    col("results.city_name"), 
    col("results.condition_code"),
    col("results.condition_slug"),
    col("results.currently"),
    col("results.date"),
    col("results.description"),
    explode(col("results.forecast")).alias("element"),
    col("element.condition"), 
    col("element.date").alias('date_forecast'), 
    col("element.description").alias("description_detail"), 
    col("element.max"),
    col("element.min"), 
    col("element.weekday"),
    col("results.humidity"),
    col("results.img_id"),
    col("results.sunrise"),
    col("results.sunset"),
    col("results.temp"),
    col("results.time"),
    col("results.wind_speedy")
)
prev = prev.drop("element")

# Criação da VIEW FORECAST(previsões)

In [13]:
prev.createOrReplaceTempView("VW_FORECAST")

##### Se aparecer alguma tela de erro, é porque a key expirou, caso seja necessário, é possível trocar na variável algumas celulas acima.

In [14]:
df_previsao_cidades.show(40)

+---------+--------------+----------+--------------------+---------+
|       by|execution_time|from_cache|             results|valid_key|
+---------+--------------+----------+--------------------+---------+
|     null|          0.27|     false|{, São José do Ba...|     true|
|city_name|          1.57|     false|{, Tremembé, SP, ...|     true|
|city_name|           0.5|     false|{, Santo Antônio ...|     true|
|    woeid|          0.29|     false|{, Campos do Jord...|     true|
|city_name|          0.16|     false|{, Silveiras, SP,...|     true|
|city_name|          0.58|     false|{, São Bento do S...|     true|
|city_name|          0.55|     false|{, Lagoinha, SP, ...|     true|
|city_name|          0.81|     false|{, Lavrinhas, SP,...|     true|
|city_name|          0.52|     false|{, Paraibuna, SP,...|     true|
|city_name|           0.0|      true|{, São José dos C...|     true|
|city_name|          1.02|     false|{, Cachoeira Paul...|     true|
|city_name|          0.33|     fal

# VIEW CIDADES

In [15]:
spark.sql("SELECT * FROM VW_CIDADES").show(5)

+-------+---------+---------------+-------------------+------------------+--------------------+--------------------+----------------------+------------------------+-------------------------+------------------+--------------------+-----------------------+-------------------------+---------------------+-----------------------+------------------------+-------------------------+---------------------------+----------------------------+
|     id|     nome|id_microrregiao|  nome_microrregiao|id_uf_microrregiao|nome_uf_microrregiao|sigla_uf_micorregiao|id_regiao_microrregiao|nome_regiao_microrregiao|sigla_regiao_microrregiao|id_regiao_imediata|nome_regiao_imediata|id_regiao_intermediaria|nome_regiao_intermediaria|id_uf_regiao_imediata|nome_uf_regiao_imediata|sigla_uf_regiao_imediata|id_regiao_regiao_imediata|nome_regiao_regiao_imediata|sigla_regiao_regiao_imediata|
+-------+---------+---------------+-------------------+------------------+--------------------+--------------------+--------------

# VIEW PREVISÃO

In [16]:
spark.sql("SELECT * FROM VW_FORECAST").show(5)

+---+--------------------+--------------------+--------------+--------------+---------+----------+--------------------+-----------+-------------+------------------+---+---+-------+--------+------+--------+--------+----+-----+-----------+
|cid|                city|           city_name|condition_code|condition_slug|currently|      date|         description|  condition|date_forecast|description_detail|max|min|weekday|humidity|img_id| sunrise|  sunset|temp| time|wind_speedy|
+---+--------------------+--------------------+--------------+--------------+---------+----------+--------------------+-----------+-------------+------------------+---+---+-------+--------+------+--------+--------+----+-----+-----------+
|   |São José do Barre...|São José do Barreiro|            29|         cloud|    noite|09/09/2022|Parcialmente nublado|  clear_day|        09/09|       Tempo limpo| 35| 18|    Sex|      34|   29n|06:00 am|05:50 pm|  27|17:54|  0.92 km/h|
|   |São José do Barre...|São José do Barreiro| 

# DF Tabela 1

In [17]:

#Considerei Tempo nublado e chuvas esparsas como dia de chuva
tabela1 = spark.sql("""SELECT VW_CIDADES.id, VW_CIDADES.nome as cidade,
    CONCAT(VW_FORECAST.date_forecast,'/2022') as data, 
    VW_CIDADES.nome_regiao_microrregiao as regiao,
    VW_FORECAST.max as temp_max,
    VW_FORECAST.min as temp_min,
    (VW_FORECAST.max + VW_FORECAST.min)/2 as temp_media, 
    case when (VW_FORECAST.description_detail in ('Chuvas esparsas','Tempo nublado'))
        then 'Sim' else 'Nao'end as vai_vhover,
    VW_FORECAST.description as condicao_do_tempo,
    VW_FORECAST.description_detail as chance_de_chuva,
    VW_FORECAST.sunrise as nascer_do_sol, 
    VW_FORECAST.sunset as por_do_sol, 
    VW_FORECAST.wind_speedy as velocidade_maxima_do_vento 
    FROM VW_CIDADES 
      LEFT JOIN VW_FORECAST 
        ON VW_CIDADES.nome = VW_FORECAST.city_name 
    WHERE VW_FORECAST.date_forecast in ('09/09', '10/09', '11/09', '12/09','13/09') """)

# DF Tabela 2

In [18]:
tabela2 = spark.sql("""SELECT VW_FORECAST.city_name as cidade,
    count(case when (VW_FORECAST.description_detail in ('Chuvas esparsas','Tempo nublado')) 
      then 1 
    end) qtd_dias_vai_chover, 
    count(case when (VW_FORECAST.description_detail not in ('Chuvas esparsas','Tempo nublado')) 
      then 1 
    end) qtd_dias_sem_chover, 
    count(VW_FORECAST.date_forecast) as total_dias_mapeados 
    FROM VW_FORECAST 
    WHERE VW_FORECAST.date_forecast in ('09/09', '10/09', '11/09', '12/09','13/09')
      GROUP BY VW_FORECAST.city_name""")

# Exportar CSVs

In [19]:
tabela1.repartition(1).write.mode("overwrite").option("header", "true").option("delimiter", ",").option("encoding", "ISO-8859-1").csv(path_arquivos+"tabela1.csv")
tabela2.repartition(1).write.mode("overwrite").option("header", "true").option("delimiter", ",").option("encoding", "ISO-8859-1").csv(path_arquivos+"tabela2.csv")