<a href="https://colab.research.google.com/github/LealDias/AulasGit/blob/main/C%C3%B3pia_de_spark_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [12]:
%pip install pyspark

# findspark
%pip install findspark

# jupyterlab
#pip install jupyterlab

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [13]:
import findspark
findspark.init()

import requests
import json
#import unidecode
from pyspark.sql import SparkSession, types as T
from pyspark.sql.functions import col, substring

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [14]:
# Buscar cidades do Vale do Paraíba
# TODO
vale_pb = "https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios"

response = requests.get(vale_pb) # Instanciando a função GET da biblioteca requests

print(response) # Verificar conexão com API

dados_json = json.loads(response.text)

# Criar data frame com as cidades
# TODO

itens = []
lista = []

for i in dados_json:
  itens.append(i['nome'])
  itens.append(i['id'])
  
  lista.append(itens)

  itens = []

colunas_cidade = ['nome', 'id']

cidade_df = spark.createDataFrame(data=lista, schema=colunas_cidade)

cidade_df.show()

# Criar view com as cidades
# TODO

<Response [200]>
+------------------+-------+
|              nome|     id|
+------------------+-------+
|         Aparecida|3502507|
|            Arapeí|3503158|
|            Areias|3503505|
|           Bananal|3504909|
|          Caçapava|3508504|
|Cachoeira Paulista|3508603|
|  Campos do Jordão|3509700|
|             Canas|3509957|
|     Caraguatatuba|3510500|
|          Cruzeiro|3513405|
|             Cunha|3513603|
|     Guaratinguetá|3518404|
|           Igaratá|3520202|
|          Ilhabela|3520400|
|           Jacareí|3524402|
|          Jambeiro|3524907|
|          Lagoinha|3526308|
|         Lavrinhas|3526605|
|            Lorena|3527207|
|   Monteiro Lobato|3531704|
+------------------+-------+
only showing top 20 rows



In [15]:
# Buscar previsão do tempo para as cidades
# TODO

# Setando as variáveis
API_KEY = "07538430feb338991b11f72aa9ff47e2"
idioma = "pt_br"

# Listas para a Base de Dados 2
dias_mapeados = 5 # De acordo com a documentação da API
lista_temp_2 = []
lista_geral = []

data_flag = ""
data_atual = ""
cont_sim = 0
cont = 0

# Listas para a Base de Dados 1
lista_mestre = []
lista_temp = []


# Variáveis
cidade = ""
cod_cidade = ""
data = ""
REGIAO = "Sudeste" # -> Constante da Região Sudeste
pais = ""
lat = ""
log = ""
temp_max = ""
temp_min = ""
temp_med = ""
vai_chover = ""
chance_chuva = ""
cond_tempo = ""
nascer_sol = ""
por_sol = ""
vel_max_vento = ""

# Loop para execução das colsutas de API's

for item in lista:

  # Data Frame 2

  if data_atual != "":

    lista_temp_2.append(cidade_name) # Coluna Cidade
    lista_temp_2.remove(cidade_name)
    lista_temp_2.append(int(cont_sim)) # Coluna DiasVaiChover
    lista_temp_2.append(int(dias_mapeados - cont_sim)) # Coluna DiasNaoVaiChover
    lista_temp_2.append(int(dias_mapeados)) # Coluna DiasMapeados

    lista_geral.append(lista_temp_2)
    lista_temp_2 = []
    cont_sim = 0
    cont = 0

  #--------------------------------------------------------------------------------------------------------------------#

  cidade_name = item[0]

  link = f"https://api.openweathermap.org/data/2.5/forecast?q={cidade_name}&appid={API_KEY}&units=metric&lang={idioma}"

  requisicao = requests.get(link)

  req_dic = requisicao.json()

  lista_temp_2.append(cidade_name) # Parte da contrução do DF 2


  for i in req_dic['list']:

    #data_anterior = data_atual
    lista_temp = []

    cidade = cidade_name
    cod_cidade = item[1]
    data = i['dt_txt']
    regiao = REGIAO
    pais = req_dic['city']['country']
    lat = req_dic['city']['coord']['lat']
    log = req_dic['city']['coord']['lon']
    temp_max = float(i['main']['temp_max'])
    temp_min = float(i['main']['temp_min'])
    temp_med = float((temp_max + temp_min) / 2)
    vai_chover = "Sim" if i['weather'][0]['main'] == 'Rain' else "Não"
    chance_chuva = float(i['pop'])
    cond_tempo = i['weather'][0]['description']
    nascer_sol = req_dic['city']['sunrise']
    por_sol = req_dic['city']['sunset']
    vel_max_vento = float(i['wind']['gust'])

    lista_temp.append(cidade)
    lista_temp.append(cod_cidade)
    lista_temp.append(data)
    lista_temp.append(regiao)
    lista_temp.append(pais)
    lista_temp.append(lat)
    lista_temp.append(log)
    lista_temp.append(temp_max)
    lista_temp.append(temp_min)
    lista_temp.append(temp_med)
    lista_temp.append(vai_chover)
    lista_temp.append(chance_chuva)
    lista_temp.append(cond_tempo)
    lista_temp.append(nascer_sol)
    lista_temp.append(por_sol)
    lista_temp.append(vel_max_vento)

    lista_mestre.append(lista_temp)


    #data_flag = data[0:10]
    data_atual = data[0:10]


    if (data_atual == data_flag) or (data_flag == ""):
      data_flag = data_atual
      if vai_chover == "Sim":
        cont = 1
    else:
      cont_sim = cont_sim + cont
      cont = 0
      data_flag = data_atual

      if vai_chover == 'Sim':
        cont = 1

# Repetição do código para pegar os dados depois do laço

lista_temp_2.append(cidade_name) # Coluna Cidade
lista_temp_2.remove(cidade_name)
lista_temp_2.append(int(cont_sim)) # Coluna DiasVaiChover
lista_temp_2.append(int(dias_mapeados - cont_sim)) # Coluna DiasNaoVaiChover
lista_temp_2.append(int(dias_mapeados)) # Coluna DiasMapeados

lista_geral.append(lista_temp_2)

# Criar data frame com as previsões
# TODO

colunas_DF_1 = ['Cidade', 'CodigoDaCidade', 'Data', 'Regiao', 'Pais', 'Latitude', 'Longigute', 'TemperaturaMaxima', 'TemperaturaMinima', 'TemperaturaMedia', 'VaiChover', 'ChanceDeChuva', 'CondicaoDoTempo', 'NascerDoSol', 'PorDoSol', 'VelocidadeMaximaDoVento']

previsao_df = spark.createDataFrame(data=lista_mestre, schema=colunas_DF_1)

# Criar view com as previsões
# TODO
previsao_df.show()


+---------+--------------+-------------------+-------+----+--------+---------+-----------------+-----------------+------------------+---------+-------------+----------------+-----------+----------+-----------------------+
|   Cidade|CodigoDaCidade|               Data| Regiao|Pais|Latitude|Longigute|TemperaturaMaxima|TemperaturaMinima|  TemperaturaMedia|VaiChover|ChanceDeChuva| CondicaoDoTempo|NascerDoSol|  PorDoSol|VelocidadeMaximaDoVento|
+---------+--------------+-------------------+-------+----+--------+---------+-----------------+-----------------+------------------+---------+-------------+----------------+-----------+----------+-----------------------+
|Aparecida|       3502507|2022-09-06 18:00:00|Sudeste|  BR|-22.8469| -45.2297|            24.51|            23.02|            23.765|      Não|          0.0|         nublado| 1662455204|1662497536|                   4.24|
|Aparecida|       3502507|2022-09-06 21:00:00|Sudeste|  BR|-22.8469| -45.2297|            19.75|            18.4

In [None]:
# Criar DF da Tabela 2
# TODO

#Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:

#Cidade
#QtdDiasVaiChover
#QtdDiasNaoVaiChover
#TotalDiasMapeados

colunas_DF_2 = ['Cidade', 'QtdDiasVaiChover', 'QtdDiasNaoVaiChover', 'TotalDiasMapeados']

DF_2 = spark.createDataFrame(data=lista_geral, schema=colunas_DF_2)

DF_2.show()

+------------------+----------------+-------------------+-----------------+
|            Cidade|QtdDiasVaiChover|QtdDiasNaoVaiChover|TotalDiasMapeados|
+------------------+----------------+-------------------+-----------------+
|         Aparecida|               2|                  3|                5|
|            Arapeí|               0|                  5|                5|
|            Areias|               0|                  5|                5|
|           Bananal|               0|                  5|                5|
|          Caçapava|               2|                  3|                5|
|Cachoeira Paulista|               1|                  4|                5|
|  Campos do Jordão|               3|                  2|                5|
|             Canas|               5|                  0|                5|
|     Caraguatatuba|               4|                  1|                5|
|          Cruzeiro|               1|                  4|                5|
|           

In [16]:
# Exportar CSVs
# TODO
previsao_df.write.csv("DF_1/df1.csv", header=True)
DF_2.write.csv("DF_2/df2.csv", header=True)