<a href="https://colab.research.google.com/github/gustavoborgesguimaraes/python-projects/blob/main/desafio_autoglass_nivelamento.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Autor: Gustavo Borges Guimarães

# Desafio: Consumo de Dados para Previsão do Tempo dos Municípios do ES.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo dos municípios do ES. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/estados/32/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada município. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do ES, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do ES. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Vila Velha. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e zipar juntamente com os CSVs das duas tabelas e postar no grupo do Teams.



PREPA-----------------------------------------------
### Caso queira utilizar o colab do google para executar o script Spark descomente as linhas abaixo

In [None]:
#PREPARAÇÃO DO AMBIENTE.

#Instalando o JDK.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

#Baixando o Apache Hadoop.
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

#Descompactando o Apache Hadoop.
!tar xf spark-3.3.1-bin-hadoop3.tgz

#Definindo as variáveis de ambiente do Java e do Spark.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

#Baixando o ngrok.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip

#Descompactando o ngrok.
!unzip ngrok-stable-linux-amd64.zip

#Definindo a URL local para a consumo da API.
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

#Instalando o FindSpark.
!pip install -q findspark

#Instalando o UniDecode.
!pip install -q unidecode

#Importando os módulos/libs que serão usados.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkContext
import requests
import json
import unidecode
import pandas as pd

#Buildando o Spark Session
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("SparkbyExamples.com") \
        .config('spark.ui.port', '4050') \
        .getOrCreate()

In [2]:
#Constante da Unidade Federativa do ES
#Para mais, consulte em "servicodados.ibge.gov.br"
CodUF = '32'

#Endereço da API do IBGE
UrlDaApi = f"https://servicodados.ibge.gov.br/api/v1/localidades/estados/{CodUF}/municipios"

#Efetuando a requisição na API do IBGE
RequestDaApi = requests.get(UrlDaApi)

In [None]:
#Convertento o retorno da API em JSON
JsonIBGE = RequestDaApi.json()

#OPCIONAL: Criando um arquivo JSON local.
with open('JsonFileResponse.json', 'w') as json_file:
  json.dump(JsonIBGE, json_file, indent=4)

#OPCIONAL: Criando um arquivo Parquet local.
  #Código...

#Criando o DataFrame GERAL da API do IBGE.
dfApiIbge = spark.createDataFrame(JsonIBGE)

#Criando um DataFrame de cidades com colunas específicas.
df_cidades = dfApiIbge.withColumn("NomeCidade", F.col("nome"))\
                      .drop("id", "microrregiao", "nome", "regiao-imediata")
                      
#Visualizando todos os registros do Data frame Cidades
df_cidades.show()

In [167]:
#Criando a TempView "cities"
df_cidades.createOrReplaceTempView("cities")

#Testando a consulta SparkSQL na TempView
VarCity = spark.sql('select * from cities')

#Criando a coleção de cidades para usar na API de previsões
CityCollect = VarCity.collect()

In [22]:
#Criando o Data Frame das previsões do tempo.
dfPrevTempo = pd.DataFrame(columns= ['Cidade', 'CodigoDaCidade', 'Pais', 'Latitude',
                                     'Longitude', 'NascerDoSol', 'PorDoSol', 'Populacao',
                                     'Data', 'TemperaturaMaxima', 'TemperaturaMinima',
                                     'SensacaoTermica', 'CondicaoDoTempo', 'VelocidadeDoVento'])

#Constantes p/uso na API
MyAPIKey = "d15318e0f35125eb2040ecd74952410a"
StateCod = "ES"
CountryCod = "BR"

for cid in CityCollect:
  Link = f"https://api.openweathermap.org/data/2.5/forecast?q={cid['NomeCidade']},{StateCod},{CountryCod}&appid={MyAPIKey}&lang=pt_br"
  Requisicao = requests.get(Link)
  ForecastsJson = Requisicao.json()
  if ForecastsJson['cod'] == '200':
    Cidade = (ForecastsJson['city']['name'])
    CodigoDaCidade = (ForecastsJson['city']['id'])
    Pais = (ForecastsJson['city']['country'])
    Latitude = (ForecastsJson['city']['coord']['lat'])
    Longitude = (ForecastsJson['city']['coord']['lon'])
    NascerDoSol = (ForecastsJson['city']['sunrise'])
    PorDoSol = (ForecastsJson['city']['sunset'])
    Populacao = (ForecastsJson['city']['population'])
    for CidDesc in ForecastsJson['list']:
      Data = (CidDesc['dt_txt']) #Data
      TemperaturaMaxima = (CidDesc['main']['temp_max'] -273.15)
      TemperaturaMinima = (CidDesc['main']['temp_min'] -273.15)
      SensacaoTermica = (CidDesc['main']['feels_like'] -273.15)
      CondicaoDoTempo = (CidDesc['weather'][0]['description'])
      VelocidadeDoVento = (CidDesc['wind']['speed'])
      #Inserindo os dados no DataFrame de Previsões do tempo
      dfPrevTempo = dfPrevTempo.append({'Cidade' : Cidade, 'CodigoDaCidade' : CodigoDaCidade, 'Pais' : Pais, 'Latitude' : Latitude,
                                        'Longitude' : Longitude, 'NascerDoSol' : NascerDoSol, 'PorDoSol' : PorDoSol, 'Populacao' : Populacao,
                                        'Data' : Data, 'TemperaturaMaxima' : TemperaturaMaxima, 'TemperaturaMinima' : TemperaturaMinima,
                                        'SensacaoTermica' : SensacaoTermica, 'CondicaoDoTempo' : CondicaoDoTempo, 'VelocidadeDoVento' : VelocidadeDoVento},
                                        ignore_index = True)


In [None]:
#Criando a TempView "Forecasts"
dfPrevTempo = spark.createDataFrame(dfPrevTempo)
dfPrevTempo.createOrReplaceTempView("forecasts")


In [117]:
#Visualizando os dados da TemView "Forecasts"
spark.sql('select * from forecasts').show()

+--------------+--------------+----+--------+---------+-----------+----------+---------+-------------------+------------------+------------------+------------------+---------------+-----------------+
|        Cidade|CodigoDaCidade|Pais|Latitude|Longitude|NascerDoSol|  PorDoSol|Populacao|               Data| TemperaturaMaxima| TemperaturaMinima|   SensacaoTermica|CondicaoDoTempo|VelocidadeDoVento|
+--------------+--------------+----+--------+---------+-----------+----------+---------+-------------------+------------------+------------------+------------------+---------------+-----------------+
|Afonso Cláudio|       3473129|  BR|-20.0742| -41.1239| 1674807912|1674854936|    13543|2023-01-27 18:00:00|25.610000000000014|25.260000000000048| 26.52000000000004| chuva moderada|             2.24|
|Afonso Cláudio|       3473129|  BR|-20.0742| -41.1239| 1674807912|1674854936|    13543|2023-01-27 21:00:00|24.170000000000016|23.370000000000005| 25.04000000000002| chuva moderada|             1.13|


In [173]:
#Criando a tabela 1 com SparkSQL
Query1 = """
  select
    f.Cidade as Cidade,
    f.CodigoDaCidade as CodigoDaCidade,
    f.Data as Data,
    'Sudeste' as Regiao,
    f.Pais as Pais,
    f.Latitude as Latitude,
    f.Longitude as Longigute,
    f.TemperaturaMaxima as TemperaturaMaxima,
    f.TemperaturaMinima as TemperaturaMinima,
    (f.TemperaturaMaxima + f.TemperaturaMinima)/2 as TemperaturaMedia,
    f.SensacaoTermica as SensacaoTermica,
    (case f.CondicaoDoTempo
          when 'chuva leve' then 'Sim'
          when 'chuva moderada' then 'Sim'
          when 'chuva forte' then 'Sim'
          else 'Não'
    end) as VaiChover,
    f.CondicaoDoTempo as CondicaoDoTempo,
    f.NascerDoSol as NascerDoSol,
    f.PorDoSol as PorDoSol,
    f.VelocidadeDoVento as VelocidadeMaximaDoVento,
    f.Populacao as Populacao
  from forecasts f
"""

DataFrame1 = spark.sql(Query1)
DataFrame1.createOrReplaceTempView("tabela1")

#Efetuando a consulta da "Tabela 1"
spark.sql('select * from tabela1').show()

+--------------+--------------+-------------------+-------+----+--------+---------+------------------+------------------+------------------+------------------+---------+---------------+-----------+----------+-----------------------+---------+
|        Cidade|CodigoDaCidade|               Data| Regiao|Pais|Latitude|Longigute| TemperaturaMaxima| TemperaturaMinima|  TemperaturaMedia|   SensacaoTermica|VaiChover|CondicaoDoTempo|NascerDoSol|  PorDoSol|VelocidadeMaximaDoVento|Populacao|
+--------------+--------------+-------------------+-------+----+--------+---------+------------------+------------------+------------------+------------------+---------+---------------+-----------+----------+-----------------------+---------+
|Afonso Cláudio|       3473129|2023-01-27 18:00:00|Sudeste|  BR|-20.0742| -41.1239|25.610000000000014|25.260000000000048| 25.43500000000003| 26.52000000000004|      Sim| chuva moderada| 1674807912|1674854936|                   2.24|    13543|
|Afonso Cláudio|       34731

In [225]:
#Criando a tabela 2 com SparkSQL
Query2 = """
  select
    tab1.Cidade,
    count(case when tab1.VaiChover = 'Sim' then 1 end) as PeriodosDoDiaVaiChover,
    count(case when tab1.VaiChover = 'Não' then 1 end) as PeriodosDoDiaNaoVaiChover
  from tabela1 tab1
  group by 1
"""
DataFrame2 = spark.sql(Query2)
DataFrame2.createOrReplaceTempView("tabela2")

#Efetuando a consulta da "Tabela 2"
spark.sql('select * from tabela2').show()

+--------------------+----------------------+-------------------------+
|              Cidade|PeriodosDoDiaVaiChover|PeriodosDoDiaNaoVaiChover|
+--------------------+----------------------+-------------------------+
|           Ibitirama|                    12|                       28|
|              Guaçuí|                    12|                       28|
|            Colatina|                    15|                       25|
|Cachoeiro de Itap...|                    12|                       28|
|          Ecoporanga|                    11|                       29|
|        Mantenópolis|                    10|                       30|
|               Piúma|                    12|                       28|
|            Mucurici|                    10|                       30|
|              Apiacá|                    15|                       25|
|Venda Nova do Imi...|                    12|                       28|
|             Ibatiba|                    17|                   

In [56]:
#Exportando as tabelas para CSV
DataFrame1.write.option('header', True).csv("/content/csv-tab-1")
DataFrame1.write.option('header', True).csv("/content/csv-tab-2")

-------------------------------------------------