# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [68]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [69]:
pip install findspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [70]:
!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [71]:
# Importanto bibliotecas e iniciando findspark
import findspark
findspark.init()

import requests
import json
import unidecode
from pyspark.sql import SparkSession, SQLContext, Row, Window
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as f
from datetime import timedelta, datetime, date
import unicodedata
import re

# Definindo parâmetros com o SparkConf 
# SparkConf seta um nome para a aplicação e a URL para o meu cluster distribuído 
spark_conf = SparkConf() \
      .setAppName("dataside") \
      .setMaster("local[*]")

sc = SparkContext.getOrCreate();
sqlContext = SQLContext(sc)

spark = SparkSession.builder \
      .master("local[*]") \
      .appName("dataside") \
      .getOrCreate()



In [72]:
# Buscando cidades do Vale do Paraíba usando a API do IBGE indicada

# 1. Realizando a requsição HTTP usando o método get
r = requests.get("https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios") 

vale_paraiba = r.json()

# 1.1 Mostrando os dados em forma não estruturada - JSON
vale_paraiba

[{'id': 3502507,
  'nome': 'Aparecida',
  'microrregiao': {'id': 35051,
   'nome': 'Guaratinguetá',
   'mesorregiao': {'id': 3513,
    'nome': 'Vale do Paraíba Paulista',
    'UF': {'id': 35,
     'sigla': 'SP',
     'nome': 'São Paulo',
     'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}},
  'regiao-imediata': {'id': 350052,
   'nome': 'Guaratinguetá',
   'regiao-intermediaria': {'id': 3511,
    'nome': 'São José dos Campos',
    'UF': {'id': 35,
     'sigla': 'SP',
     'nome': 'São Paulo',
     'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}}},
 {'id': 3503158,
  'nome': 'Arapeí',
  'microrregiao': {'id': 35052,
   'nome': 'Bananal',
   'mesorregiao': {'id': 3513,
    'nome': 'Vale do Paraíba Paulista',
    'UF': {'id': 35,
     'sigla': 'SP',
     'nome': 'São Paulo',
     'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}},
  'regiao-imediata': {'id': 350053,
   'nome': 'Cruzeiro',
   'regiao-intermediaria': {'id': 3511,
    'nome': 'São José dos Campos',
   

In [73]:
# 2. Criar Dataframe com as cidades

dataframe = spark.read.json(sc.parallelize(vale_paraiba))

dataframe.show(truncate=False)

+-------+------------------------------------------------------------------------------------------------------+------------------+---------------------------------------------------------------------------------------------------------------------+
|id     |microrregiao                                                                                          |nome              |regiao-imediata                                                                                                      |
+-------+------------------------------------------------------------------------------------------------------+------------------+---------------------------------------------------------------------------------------------------------------------+
|3502507|{35051, {{35, São Paulo, {3, Sudeste, SE}, SP}, 3513, Vale do Paraíba Paulista}, Guaratinguetá}       |Aparecida         |{350052, Guaratinguetá, {{35, São Paulo, {3, Sudeste, SE}, SP}, 3511, São José dos Campos}}                          |


In [74]:
# 3. Criar view com as cidades

# Função para remover os caracteres especiais
def format_cities(string):
    normalized = unicodedata.normalize('NFD', string)
    return normalized.encode('ascii', 'ignore').decode('utf8').casefold().title()

## 3.1 Criando lista com o nome de todas as cidades (Flatmap coleta todos os valores das partições e coloca em uma só lista)
cities_list = dataframe.select("nome").rdd.flatMap(lambda n: n).collect()

### Lista das cidades formatadas para criação do futuro DF1 - Inserindo no DF do IBGE, possibilitando fazer JOIN com o DF da API
formated_cities = []

### Inserindo cidades formatadas na lista formated_cities
for city in cities_list:
  formated_city = format_cities(city)
  formated_cities.append(formated_city)


In [75]:
# Criando colunas de um novo DF

columns =  ['nome_formatado', 'nome_original']

# Zipando as duas listas num mesmo DF
dataframe_2 = spark.createDataFrame(zip(formated_cities, cities_list), columns)

# Agregando dataframe_2 ao meu dataframe original
final_df_ibge = dataframe.join(dataframe_2, dataframe.nome == dataframe_2.nome_original)

# # Dropando as colunas utilizada para join
final_df_ibge = final_df_ibge.drop(f.col("nome_original"))

final_df_ibge.show(truncate=False)

+-------+------------------------------------------------------------------------------------------------------+------------------+---------------------------------------------------------------------------------------------------------------------+------------------+
|id     |microrregiao                                                                                          |nome              |regiao-imediata                                                                                                      |nome_formatado    |
+-------+------------------------------------------------------------------------------------------------------+------------------+---------------------------------------------------------------------------------------------------------------------+------------------+
|3502507|{35051, {{35, São Paulo, {3, Sudeste, SE}, SP}, 3513, Vale do Paraíba Paulista}, Guaratinguetá}       |Aparecida         |{350052, Guaratinguetá, {{35, São Paulo, {3, Sudeste, SE}, SP}

In [76]:
# Criando a View

final_df_ibge.createOrReplaceTempView('cities')

# all_cities = spark.sql("SELECT * from cities")

cities_names = spark.sql("SELECT nome_formatado FROM cities")
cities_names.show()

+------------------+
|    nome_formatado|
+------------------+
|         Aparecida|
|            Arapei|
|            Areias|
|           Bananal|
|Cachoeira Paulista|
|  Campos Do Jordao|
|             Canas|
|     Caraguatatuba|
|          Cacapava|
|          Cruzeiro|
|             Cunha|
|     Guaratingueta|
|           Igarata|
|          Ilhabela|
|           Jacarei|
|          Jambeiro|
|          Lagoinha|
|         Lavrinhas|
|            Lorena|
|   Monteiro Lobato|
+------------------+
only showing top 20 rows



In [77]:
formated_cities

['Aparecida',
 'Arapei',
 'Areias',
 'Bananal',
 'Cacapava',
 'Cachoeira Paulista',
 'Campos Do Jordao',
 'Canas',
 'Caraguatatuba',
 'Cruzeiro',
 'Cunha',
 'Guaratingueta',
 'Igarata',
 'Ilhabela',
 'Jacarei',
 'Jambeiro',
 'Lagoinha',
 'Lavrinhas',
 'Lorena',
 'Monteiro Lobato',
 'Natividade Da Serra',
 'Paraibuna',
 'Pindamonhangaba',
 'Piquete',
 'Potim',
 'Queluz',
 'Redencao Da Serra',
 'Roseira',
 'Santa Branca',
 'Santo Antonio Do Pinhal',
 'Sao Bento Do Sapucai',
 'Sao Jose Do Barreiro',
 'Sao Jose Dos Campos',
 'Sao Luiz Do Paraitinga',
 'Sao Sebastiao',
 'Silveiras',
 'Taubate',
 'Tremembe',
 'Ubatuba']

In [78]:
# Lista das cidades formatadas para ser repassada para a API do Forecast (com underline e minúsculo)
cities_api = []

for city in formated_cities:
  formated_city_api = city.replace(' ', '_').lower()
  cities_api.append(formated_city_api)

cities_api

['aparecida',
 'arapei',
 'areias',
 'bananal',
 'cacapava',
 'cachoeira_paulista',
 'campos_do_jordao',
 'canas',
 'caraguatatuba',
 'cruzeiro',
 'cunha',
 'guaratingueta',
 'igarata',
 'ilhabela',
 'jacarei',
 'jambeiro',
 'lagoinha',
 'lavrinhas',
 'lorena',
 'monteiro_lobato',
 'natividade_da_serra',
 'paraibuna',
 'pindamonhangaba',
 'piquete',
 'potim',
 'queluz',
 'redencao_da_serra',
 'roseira',
 'santa_branca',
 'santo_antonio_do_pinhal',
 'sao_bento_do_sapucai',
 'sao_jose_do_barreiro',
 'sao_jose_dos_campos',
 'sao_luiz_do_paraitinga',
 'sao_sebastiao',
 'silveiras',
 'taubate',
 'tremembe',
 'ubatuba']

In [79]:
# 4. Buscar previsão do tempo para as cidades (Usando API do WeatherAPI)
### Free Trial da WeatherAPI é a mais completa entre as APIs que tomei como amostra

## 4.1 Coletando o dia atual e os próximos 4 dias
dates = []

for i in range(5):
  chosen_day = datetime.today() + timedelta(i)
  day = chosen_day.strftime("%Y-%m-%d")
  dates.append(day)

dates

['2022-09-04', '2022-09-05', '2022-09-06', '2022-09-07', '2022-09-08']

In [80]:
## 4.2 Criando a lista que adicionará as previsões do tempo das cidades
forecast = []
api_key = "b3ddd1c9d3774e7180d222959220209"

### Fazendo o request de acordo com a documentação da WeatherAPI

for city in cities_api:
  path_api = f"http://api.weatherapi.com/v1/forecast.json?key={api_key}&q={city}&days=5&hour=12&lang=pt"
  request_api = requests.get(path_api)
  request_json = request_api.json()
  forecast.append(request_json)

In [81]:
# 5. Criar data frame com as previsões

df_forecast = spark.read.json(sc.parallelize(forecast))

# Selecionando as colunas que serão utilizadas na view
df_forecast2 = df_forecast.select(f.col('forecast.forecastday.date'), f.col('location.country'), 
                                  f.col('location.lat'), f.col('location.lon'), f.col('forecast.forecastday.day.maxtemp_c'),
                                  f.col('forecast.forecastday.day.mintemp_c'),
                                  f.col('forecast.forecastday.day.avgtemp_c'), f.col('forecast.forecastday.day.condition.text'),
                                  f.col('forecast.forecastday.astro.sunrise'), f.col('forecast.forecastday.astro.sunset'),
                                  f.col('forecast.forecastday.day.maxwind_kph'), f.col('forecast.forecastday.day.daily_will_it_rain'),
                                  f.col('forecast.forecastday.day.daily_chance_of_rain'), f.col('location.name'), f.col('location.region'))

# Criando lista com colunas para o forecast do tipo array para dar o explode
colunas_forecast = []
for i in df_forecast2.dtypes:
  if re.search("array", i[1]):
    colunas_forecast.append(i[0])

# Criando coluna temporária para zipar e dando o explode, utilizando as colunas da lista colunas_forecast
df_forecast3 = df_forecast2.withColumn("tmp", f.explode_outer(f.arrays_zip(*colunas_forecast)))

# Criando lista com todas as colunas, derivadas do forecast2 e com a lista temporária
colunas = []
for i in df_forecast2.dtypes: 
    if re.search("array", i[1]):
      colunas.append('tmp.'+i[0])

# Forecast final
forecast_final = df_forecast3.select(f.col('country'), f.col('lat'), f.col('lon'), f.col('name'), f.col('region'), *colunas)

forecast_final.show()

+-------+------+------+---------+---------+----------+---------+---------+---------+--------------------+--------+--------+-----------+------------------+--------------------+
|country|   lat|   lon|     name|   region|      date|maxtemp_c|mintemp_c|avgtemp_c|                text| sunrise|  sunset|maxwind_kph|daily_will_it_rain|daily_chance_of_rain|
+-------+------+------+---------+---------+----------+---------+---------+---------+--------------------+--------+--------+-----------+------------------+--------------------+
| Brazil|-22.83|-45.23|Aparecida|Sao Paulo|2022-09-04|     15.6|     13.8|     15.0|Possibilidade de ...|06:08 AM|05:52 PM|        6.8|                 1|                  77|
| Brazil|-22.83|-45.23|Aparecida|Sao Paulo|2022-09-05|     26.4|     12.5|     17.4|Possibilidade de ...|06:07 AM|05:52 PM|       10.8|                 1|                  86|
| Brazil|-22.83|-45.23|Aparecida|Sao Paulo|2022-09-06|     29.0|     12.7|     18.3|Possibilidade de ...|06:06 AM|05:52 

In [82]:
# 6. Criar view com as previsões
forecast_final.createOrReplaceTempView("forecast")

In [83]:
# Observando os Schemas para juntar no Pyspark SQL
final_df_ibge.printSchema()
forecast_final.printSchema()

root
 |-- id: long (nullable = true)
 |-- microrregiao: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- mesorregiao: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- nome: string (nullable = true)
 |    |    |    |-- regiao: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- nome: string (nullable = true)
 |    |    |    |    |-- sigla: string (nullable = true)
 |    |    |    |-- sigla: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- regiao-imediata: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- regiao-intermediaria: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long (n

In [84]:
df_tabela1 = spark.sql("""SELECT 
                        CT.nome as Cidade,
                        CT.id as CodigoDaCidade,
                        FC.date as Data,
                        FC.region as Regiao,
                        FC.country as Pais,
                        FC.lat as Latitude,
                        FC.lon as Longigute,
                        FC.maxtemp_c as TemperaturaMaxima,
                        FC.mintemp_c as TemperaturaMinima,
                        FC.avgtemp_c as TemperaturaMedia,
                          CASE WHEN FC.daily_will_it_rain = 0 THEN 'Não' 
                          ELSE 'Sim' 
                        END as VaiChover,
                        FC.daily_chance_of_rain as ChanceDeChuva,
                        FC.text as CondicaoDoTempo,
                        FC.sunrise as NascerDoSol,
                        FC.sunset as PorDoSol,
                        FC.maxwind_kph as VelocidadeMaximaDoVento
                        FROM cities as CT
                        INNER JOIN forecast as FC
                        ON CT.nome_formatado = FC.name
                        WHERE FC.country = 'Brazil'
                        """)

In [85]:
df_tabela1.toPandas()

Unnamed: 0,Cidade,CodigoDaCidade,Data,Regiao,Pais,Latitude,Longigute,TemperaturaMaxima,TemperaturaMinima,TemperaturaMedia,VaiChover,ChanceDeChuva,CondicaoDoTempo,NascerDoSol,PorDoSol,VelocidadeMaximaDoVento
0,Aparecida,3502507,2022-09-04,Sao Paulo,Brazil,-22.83,-45.23,15.6,13.8,15.0,Sim,77,Possibilidade de chuva irregular,06:08 AM,05:52 PM,6.8
1,Aparecida,3502507,2022-09-05,Sao Paulo,Brazil,-22.83,-45.23,26.4,12.5,17.4,Sim,86,Possibilidade de chuva irregular,06:07 AM,05:52 PM,10.8
2,Aparecida,3502507,2022-09-06,Sao Paulo,Brazil,-22.83,-45.23,29.0,12.7,18.3,Sim,88,Possibilidade de chuva irregular,06:06 AM,05:52 PM,14.4
3,Aparecida,3502507,2022-09-07,Sao Paulo,Brazil,-22.83,-45.23,26.1,11.3,17.0,Sim,83,Chuva moderada,06:05 AM,05:53 PM,11.9
4,Aparecida,3502507,2022-09-08,Sao Paulo,Brazil,-22.83,-45.23,34.2,10.2,20.1,Não,0,Sol,06:04 AM,05:53 PM,13.3
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
130,Tremembé,3554805,2022-09-04,Sao Paulo,Brazil,-22.97,-45.55,14.4,12.5,13.6,Sim,89,Possibilidade de chuva irregular,06:10 AM,05:53 PM,10.1
131,Tremembé,3554805,2022-09-05,Sao Paulo,Brazil,-22.97,-45.55,25.8,10.7,16.3,Sim,86,Possibilidade de chuva irregular,06:09 AM,05:53 PM,16.6
132,Tremembé,3554805,2022-09-06,Sao Paulo,Brazil,-22.97,-45.55,28.9,13.1,17.7,Sim,88,Possibilidade de chuva irregular,06:08 AM,05:54 PM,12.6
133,Tremembé,3554805,2022-09-07,Sao Paulo,Brazil,-22.97,-45.55,27.1,11.6,16.5,Sim,86,Possibilidade de chuva irregular,06:07 AM,05:54 PM,17.6


In [86]:
# Criando view da Tabela 1
df_tabela1.createOrReplaceTempView("df1")

In [87]:
# Criando DF da Tabela 2 utilizando Soma e IF
df_tabela2 = spark.sql("""SELECT 
                            Cidade, 
                            SUM(IF(VaiChover='Sim', 1, 0)) as QtdDiasVaiChover, 
                            SUM(IF(VaiChover='Não', 1, 0)) as QtdDiasNaoVaiChover,
                            COUNT(Cidade) as TotalDiasMapeados
                          FROM df1 
                          GROUP BY Cidade""")

df_tabela2.show()

+-------------------+----------------+-------------------+-----------------+
|             Cidade|QtdDiasVaiChover|QtdDiasNaoVaiChover|TotalDiasMapeados|
+-------------------+----------------+-------------------+-----------------+
|          Paraibuna|               3|                  2|                5|
|          Aparecida|               4|                  1|                5|
|            Igaratá|               3|                  2|                5|
|             Arapeí|               2|                  3|                5|
|       Santa Branca|               3|                  2|                5|
|    Pindamonhangaba|               4|                  1|                5|
|          Lavrinhas|               0|                  5|                5|
|   Campos do Jordão|               5|                  0|                5|
|           Ilhabela|               5|                  0|                5|
|São José dos Campos|               3|                  2|                5|

In [88]:
# Exportando DFs para CSVs
df_tabela1.write.csv(
    path='/content/dataside_df1',
    mode='overwrite',
    header=True
)

df_tabela2.write.csv(
    path='/content/dataside_df2',
    mode='overwrite',
    header=True
)