# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [None]:
# Dependências do SPARK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Importações 
import os
import findspark
import requests
import json
from pyspark.sql import SparkSession

In [None]:
# Configuração das variáveis de ambiente

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
findspark.init('spark-2.4.4-bin-hadoop2.7')

# Inicia sessão spark
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

Extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```
Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

In [None]:
def get_cities(url):
    """
        Recebe como parâmetro a URL da API IBGE
        Retorna uma lista de cidades e algumas informações sobre elas, como: 
        - Código da cidade usada na API IBGE
        - Nome da cidade
        - Nome do estado 
        - Sigla do estado
        - Nome da região 
        - Sigla da Região
    """
    response_city = requests.get(url)
    response_city = response_city.json()
    
    cities = list()

    for i in response_city: 
        cities.append(
            dict(
                cidade_id = i.get('id')
                , cidade_nome = i.get('nome') 
                , estado_nome = i.get('microrregiao').get('mesorregiao').get('UF').get('nome')
                , estado_sigla = i.get('microrregiao').get('mesorregiao').get('UF').get('sigla')
                , regiao_nome = i.get('microrregiao').get('mesorregiao').get('UF').get('regiao').get('nome')
                , regiao_sigla = i.get('microrregiao').get('mesorregiao').get('UF').get('regiao').get('sigla')
            )
        )

    return cities

In [None]:
# Buscar cidades do Vale do Paraíba
resp_cities = get_cities('https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios')

# Criar data frame com as cidades
df_cidades = spark.createDataFrame(resp_cities)

# Criar view com as cidades
df_cidades.createOrReplaceTempView("cities")

In [None]:
#API_KEY_ACCS
op1 = 'sDDNoolJDvW2ftkQQ67WhtvwlWVHJaph'
op2 = 'tWjNMi1PjXvh4A5ZLy8kSYEoSBNtvmWg'

In [None]:
#Configuração parâmetros API Wather
api_wather_settings = {
    'url_base': 'http://dataservice.accuweather.com/'
    , 'api_key_value': op1
}

In [None]:
def get_location_info(city):
    """
        Recebe como parâmetro o nome da cidade
        Retorna informações sobre a localidade, como: 
        - location_key (chave usada na busca da API)
        - País
        - Latitude
        - Longitude
    """
    url_loc_key = api_wather_settings.get('url_base','') + 'locations/v1/cities/search?apikey='\
            + api_wather_settings.get('api_key_value','') + '&q=' + city + '&language=pt-br'

    response_loc_key = requests.get(url_loc_key)
    response_loc_key = response_loc_key.json()
    response_loc_key = response_loc_key[0]

    location_info = dict(
        key = response_loc_key.get('Key')
        , country = response_loc_key.get('Country').get('LocalizedName')
        , lat = response_loc_key.get('GeoPosition').get('Latitude')
        , lon = response_loc_key.get('GeoPosition').get('Longitude')
    )

    return location_info

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

In [None]:
def get_forecasts(column, table, limit=''):
    """
        Recebe como parâmetro 
            - Coluna a ser consultada*
            - Tabela a ser consultada*
            - Limite de cidades a serem consultadas (Opcional) 

        Retorna previsão do tempo de 5 dias para cada cidade consultada
    """
    cities = spark.sql(f'select {column} from {table} limit {limit}').rdd.flatMap(lambda x: x).collect()

    forecasts = list()

    for city in cities:
        """
            Como a API escolhida tem limite de requisição diária
            Aproveitamos a requisição para obter location_key para obter dados geográficos
        """
        # Busca informações geográfica da cidade
        location_info = get_location_info(city)
        
        # Monta url para buscar forecast
        location_key = location_info.get('key','') 
        url_forecast = api_wather_settings.get('url_base','') + 'forecasts/v1/daily/5day/'+ location_key\
            + '?apikey=' + api_wather_settings.get('api_key_value','') + '&language=pt-br&details=true'

        response_forecast = requests.get(url_forecast)
        response_forecast = response_forecast.json()
        response_forecast = response_forecast['DailyForecasts']

        for day in response_forecast: 
            # Monta lista de forecasts para cada cidade
            forecasts.append(
                dict(
                     cidade_nome = city
                     , pais = location_info.get('country')
                     , latitude = location_info.get('lat')
                     , longitude = location_info.get('lon')
                     , data = day.get('Date')
                     , temperatura_minima = day.get('Temperature').get('Minimum').get('Value') 
                     , temperatura_maxima = day.get('Temperature').get('Maximum').get('Value') 
                     , temperatura_media = (day.get('Temperature').get('Maximum').get('Value')\
                                            + day.get('Temperature').get('Minimum').get('Value')) / 2
                     , vai_chover = day.get('Day').get('HasPrecipitation')
                     , chance_de_chuva = day.get('Day').get('RainProbability')
                     , condicao_do_tempo = day.get('Day').get('ShortPhrase')
                     , nascer_do_sol = day.get('Sun').get('Rise')
                     , por_do_sol = day.get('Sun').get('Set')
                     , velocidade_max_do_vento = day.get('Day').get('Wind').get('Speed').get('Value')
                )
            )

    return forecasts

In [None]:
# Buscar previsão do tempo para as cidades
resp_forecasts = get_forecasts(column = 'cidade_nome', table = 'cities')

# Criar data frame com as previsões
df_forecasts = spark.createDataFrame(resp_forecasts)

# Criar view com as previsões
df_forecasts.createOrReplaceTempView("forecasts")

In [None]:
# Criar DF da Tabela 1
tab1_sql = """
    select 
        c.cidade_nome as Cidade
        , c.cidade_id as CodigoDaCidade
        , substr(f.data, 0, 10) as Data
        , c.regiao_nome as Regiao
        , f.pais as Pais 
        , f.latitude as Latitude
        , f.longitude as Longitude
        , round((f.temperatura_maxima -32.0) / 1.8, 2) || ' Cº' as TemperaturaMaxima
        , round((f.temperatura_minima -32.0) / 1.8, 2) || ' Cº' as TemperaturaMinima
        , round((f.temperatura_media -32.0) / 1.8, 2) || ' Cº' as TemperaturaMedia
        , case when f.vai_chover then 'Sim' else 'Não' end as VaiChover
        , f.chance_de_chuva as ChanceDeChuva
        , f.condicao_do_tempo as CondicaoDoTempo
        , substr(f.nascer_do_sol, 12, 8) as NascerDoSol
        , substr(f.por_do_sol, 12, 8) as PorDoSol
        , round(f.velocidade_max_do_vento * 1.61, 2) || ' km/h' as VelocidadeMaximaDoVento 

    from forecasts f
        inner join 
    cities c
        on 
    f.cidade_nome = c.cidade_nome
"""

df_1 = spark.sql(tab1_sql)

In [None]:
# Criar DF da Tabela 2
tab2_sql = """
    select 
        vai_chover.cidade_nome as Cidade
        , vai_chover.QtdDiasVaiChover
        , n_vai_chover.QtdDiasNaoVaiChover
        , vai_chover.QtdDiasVaiChover + n_vai_chover.QtdDiasNaoVaiChover as TotalDiasMapeados
    from 
    (
        select 
            cidade_nome
            , count(vai_chover) as QtdDiasVaiChover
        from forecasts 
        where vai_chover = true
        group by cidade_nome
    ) vai_chover
    
    inner join 
    
    (
        select 
            cidade_nome
            , count(vai_chover) as QtdDiasNaoVaiChover
        from forecasts 
        where vai_chover = false
        group by cidade_nome
    ) n_vai_chover

    on vai_chover.cidade_nome = n_vai_chover.cidade_nome

"""
df_2 = spark.sql(tab2_sql)

In [None]:
# Exportar CSVs
df_1.repartition(1).write.option("header",True).\
    mode("overwrite").csv("tabela1.csv")

df_2.repartition(1).write.option("header",True).\
    mode("overwrite").csv("tabela2.csv")