# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [1]:
# pyspark
pip install pyspark

# findspark
pip install findspark

# jupyterlab
pip install jupyterlab

SyntaxError: invalid syntax (304148470.py, line 2)

In [40]:
import findspark
findspark.init()

import requests
import json
import unidecode
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, current_date

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [4]:
# Buscar cidades do Vale do Paraíba
request = requests.get("https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios")
cities_info_dict = request.json()

# Criar data frame com as cidades
row_list = []
for city in cities_info_dict:
    row = {'ID_CITY': city['id'], 'CITY_NAME': city['nome'], 'CITY_REGION': city['microrregiao']['mesorregiao']['UF']['regiao']['nome']}
    row_list.append(row)
    
df_cities = spark.createDataFrame(row_list, schema='ID_CITY int, CITY_NAME string, CITY_REGION string')

# Criar view com as cidades
df_cities.createOrReplaceTempView('CITIES')


In [74]:
# Seleciona a data incial (dia atual) e data final (5 dias após o dia atual)
initial_date = datetime.today().date()
end_date  = initial_date + timedelta(days=6)
days = abs(initial_date - end_date).days


In [75]:
# Buscar previsão do tempo para as cidades
translate_map = str.maketrans({'é': 'e', 'í': 'i', 'á': 'a', 'ç': 'c', 'ã': 'a', 'ô': 'o'})
row_list = []
for city in df_cities.collect():
    # API para buscar as coordenadas geográficas (latitude e longitude)
    geographic_coordinates_API = f"https://nominatim.openstreetmap.org/search?city={city['CITY_NAME']}&state=SP&country=BR&format=json"

    # Solicita os dados geográficos
    request = requests.get(geographic_coordinates_API)
    request_dict = request.json()

    latitude = request_dict[0]['lat']
    longitude = request_dict[0]['lon']

    # API para previsão do tempo
    forecast_weather_API = f"http://api.weatherapi.com/v1/forecast.json?key=c499600de8e14cb6804160909230701&q={latitude},{longitude}&days={days}&aqi=no&alerts=no&lang=pt"

    # Solicita dados de previsão do tempo da cidade
    request = requests.get(forecast_weather_API)
    forecast_request_dict = request.json()
    
    for d in range(0, days):
        forecast_day_dict = forecast_request_dict['forecast']['forecastday'][d]
        location_dict = forecast_request_dict['location']
        
        # Formatação de datas e horários
        date = datetime.fromisoformat(forecast_day_dict['date']).date()
        sunrise_time = datetime.strptime((forecast_day_dict['date'] + forecast_day_dict['astro']['sunrise']), '%Y-%m-%d%I:%M %p')
        sunset_time = datetime.strptime((forecast_day_dict['date'] + forecast_day_dict['astro']['sunset']), '%Y-%m-%d%I:%M %p')

        # Linha do registro de previsão do tempo
        row = {'Cidade': city['CITY_NAME'].translate(translate_map),
               'CodigoDaCidade': city['ID_CITY'],
               'Regiao': city['CITY_REGION'],
               'Data': date,
               'Pais': location_dict['country'],
               'Latitude': float(latitude),
               'Longitude': float(longitude),
               'TemperaturaMaxima': round(forecast_day_dict['day']['maxtemp_c']),
               'TemperaturaMinima': round(forecast_day_dict['day']['mintemp_c']),
               'TemperaturaMedia': round(forecast_day_dict['day']['avgtemp_c']),
               'VaiChover': forecast_day_dict['day']['daily_will_it_rain'],
               'ChanceDeChuva': forecast_day_dict['day']['daily_chance_of_rain'],
               'CondicaoDoTempo': forecast_day_dict['day']['condition']['text'],
               'NascerDoSol': sunrise_time,
               'PorDoSol': sunset_time,
               'VelocidadeMaximaDoVento': forecast_day_dict['day']['maxwind_mph']}

        row_list.append(row)
    
# Criar data frame com as previsões
df_forecast = spark.createDataFrame(row_list, schema='Cidade string, CodigoDaCidade int, Regiao string,\
                                                      Data date, Pais string, Latitude float, Longitude float,\
                                                      TemperaturaMaxima int, TemperaturaMinima int, TemperaturaMedia int,\
                                                      VaiChover int, ChanceDeChuva int, CondicaoDoTempo string,\
                                                      NascerDoSol timestamp, PorDoSol timestamp, VelocidadeMaximaDoVento float')

# Criar view com as previsões
df_forecast.createOrReplaceTempView('FORECASTS')



In [76]:
# Criar DF da Tabela 1
df_table1 = spark.sql(f'SELECT Cidade, CodigoDaCidade, Regiao, Data,\
                       Pais, Latitude, Longitude, TemperaturaMaxima,\
                       TemperaturaMinima, TemperaturaMedia,\
                       case VaiChover WHEN 1 THEN "Sim" WHEN 0 THEN "Não" END VaiChover,\
                       ChanceDeChuva,\
                       CondicaoDoTempo,\
                       date_format(NascerDoSol, "HH:mm:ss") NascerDoSol,\
                       date_format(PorDoSol, "HH:mm:ss") PorDoSol,\
                       VelocidadeMaximaDoVento\
                       FROM FORECASTS\
                       WHERE Data BETWEEN "{initial_date}" AND "{end_date}"')


In [77]:
# Criar DF da Tabela 2
df_table2 = spark.sql(f'SELECT Cidade, \
                       COUNT(CASE VaiChover WHEN 1 THEN 1 END) AS QtdDiasVaiChover, \
                       COUNT(CASE VaiChover WHEN 0 THEN 1 END) AS QtdDiasNaoVaiChover, \
                       COUNT(Data) AS TotalDiasMapeados \
                       FROM FORECASTS \
                       WHERE Data BETWEEN "{initial_date}" AND "{end_date}" \
                       GROUP BY Cidade')


In [78]:
# Exportar CSVs
df_table1.write.option('header', True).csv(r"C:\Tabela1", mode='overwrite')
df_table2.write.option('header', True).csv(r"C:\Tabela2", mode='overwrite')