# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [15]:
import findspark
findspark.init()

import requests
import json
import unidecode
from pyspark.sql import SparkSession

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

RuntimeError: Java gateway process exited before sending its port number

# Buscar cidades do Vale do Paraíba

In [214]:
import requests
import json
# está comentado e em outro bloco para não esquecer de descomentar kkk deu o limite de requisições
# entao ficou dando erro, ai como ja tinha puxado ontem de madru, ficou aquela.
    # url = "https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios"
    # r = requests.get(url)

import csv
import datetime

    # linhas_df = []
    # for i in r.json():
    #     nome_cidade = i['nome'] # NOME CIDADE
    #     id_cidade = i['id'] # CÓDIGO CIDADE
    #     data = datetime.datetime.now() # DATA
    #     regiao_cidade = i['microrregiao']['nome'] # região
    #     pais_cidade = "BR" # pais
    #     linhas_df.append([nome_cidade,id_cidade,data,regiao_cidade,pais_cidade])

    # with open("cidades.csv","w",newline="") as csvfile:
    #     writer = csv.writer(csvfile,delimiter=",")
    #     writer.writerow(["Cidade","CodigoDaCidade","Data","Regiao","Pais"])
    #     for linha in linhas_df:
    #         writer.writerow(linha)
        
# Criar data frame com as cidades
"""
Bem, acabei tendo um probleminha com o pyspark, tbm não vou mentir, nunca cheguei a usar ele 
somente uma estudada sobre spark na faculdade. Não consegui resolver o problema do pyspark,
como esse projeto fala sobre a estrutura como data frame, pensei "pq não no pandas então né",
focando em solucionar pelo menos o problema conseguindo responder as perguntas do projeto.
Bem, isso pode dar um prejudicada, né? Mas tentando focar na solução rapida.
"""
import pandas as pd

lista_para_dataframe = []
for i in open("cidades.csv"):
    i = i.replace("\n","").split(",")
    lista_para_dataframe.append(i)

colunas = [_ for _ in lista_para_dataframe[1:]]
col_header = lista_para_dataframe[0]

df_pandas = pd.DataFrame(colunas,columns=col_header)

# problema futuro que deu, por isso essa linha aqui
df_pandas['Cidade'] = df_pandas['Cidade'].replace("São José do Barreiro","Sao Jose do Barreiro")

# Passaria o df_pandas para o objeto spark chamando o createdataframe, porém 
# tive problemas para utilizar o pyspark.

# Criar view com as cidades
df_pandas.head()

Unnamed: 0,Cidade,CodigoDaCidade,Data,Regiao,Pais
0,Aparecida,3502507,2022-05-04 01:31:27.344911,Guaratinguetá,BR
1,Arapeí,3503158,2022-05-04 01:31:27.344911,Bananal,BR
2,Areias,3503505,2022-05-04 01:31:27.344911,Bananal,BR
3,Bananal,3504909,2022-05-04 01:31:27.344911,Bananal,BR
4,Caçapava,3508504,2022-05-04 01:31:27.344911,São José dos Campos,BR


# Buscar previsão do tempo para as cidades

In [20]:
# Vamos consumir a API do OpenWeather, no qual criei uma instancia lá com uma KEY
# para utilizr somente nesse projeto, assim fica temporaria, não tem um "risco"
# de ser utilizada depois.
api_key = open("key.txt",'r').read()

# vou criar essa lista pq tem um limite de requisições, então vou salvar as que fizer
# numa lista, qq coisa vou ter elas salvas para testalas em outra celula do notebook
r_cidades = []
for city in df_pandas['Cidade']:
    url = f"http://api.openweathermap.org/geo/1.0/direct?q={city},SP,BR&limit={5}&appid={api_key}"
    r = requests.get(url)
    r_cidades.append(r)

In [50]:
# agora crio um dicionario para passar os dados de latitude e longitude.
# são os parametros que vamos usar para puxar o clima :D é verao é verao ta quente ta quente  ~~mentira ta chovendo
dic_info_cidades ={}
c = 0
for req in r_cidades:
    for unit_req in req.json():
        if unit_req['state'] == "São Paulo":
            dic_info_cidades[unit_req['name']] = {"lat":unit_req['lat'],
                                                  "lon":unit_req['lon']}
            break
dic_info_cidades

{'Aparecida': {'lat': -22.851552, 'lon': -45.2340924},
 'Arapeí': {'lat': -22.6738889, 'lon': -44.4477778},
 'Areias': {'lat': -22.5800929, 'lon': -44.6975127},
 'Bananal': {'lat': -22.6828195, 'lon': -44.3221095},
 'Caçapava': {'lat': -23.099204, 'lon': -45.707645},
 'Cachoeira Paulista': {'lat': -22.666498, 'lon': -45.015384},
 'Campos do Jordão': {'lat': -22.7395263, 'lon': -45.5912829},
 'Canas': {'lat': -22.7029347, 'lon': -45.0526508},
 'Caraguatatuba': {'lat': -23.62028, 'lon': -45.41306},
 'Cruzeiro': {'lat': -22.5783685, 'lon': -44.9642044},
 'Cunha': {'lat': -23.07753, 'lon': -44.9567436},
 'Guaratinguetá': {'lat': -22.8057839, 'lon': -45.1908926},
 'Igaratá': {'lat': -23.2063475, 'lon': -46.156934},
 'Ilhabela': {'lat': -23.816628, 'lon': -45.368685},
 'Jacareí': {'lat': -23.30528, 'lon': -45.96583},
 'Jambeiro': {'lat': -23.2556416, 'lon': -45.6919926},
 'Lagoinha': {'lat': -23.0898337, 'lon': -45.1903825},
 'Lavrinhas': {'lat': -22.570047, 'lon': -44.902359},
 'Lorena': {'

In [204]:
# Uma gambiarra para armazenar os dados temporariamente

class Cidades:
    lista_cidades = []
    def __init__(self,nome_cidade):
        self.cidade = cidade
        self.dias = []
    
    def adicionar_dia(self,dia):
        self.dias.append(dia)
        
    def get_dados(self):
        return self.dias
    def get_nome(self):
        return self.cidade

In [205]:
# agora vamos de clima
# criar um dicio para o resultado do clima com o mesmo motivo de cima, já salvar as requisições
# qualquer coisa já temos elas todas para testar (bem podia puxar só uma para testar né kk)
lista_resultado = []

for cidade in dic_info_cidades:
    cada = dic_info_cidades[cidade]
    city_atual = Cidades(cidade)

    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={cada['lat']}&lon={cada['lon']}&units=metric&exclude=minutely,hourly&appid={api_key}"
    r = requests.get(url)
    lista_resultado.append(r)
    for i in r.json()['daily']:
        dia = datetime.datetime.fromtimestamp(i['dt']).date()
        nascer_sol = datetime.datetime.fromtimestamp(i['sunrise']).time()
        por_do_sol = datetime.datetime.fromtimestamp(i['sunset']).time()

        # temperaturas
        temp = i['temp']
        temp_minima = temp['min']
        temp_maxima = temp['max']
        temp_media = temp['day']

        # infos chuva
        clima = i['weather'] # aqui checo se vai chover
        condicao_tempo = [_['main'] for _ in clima][0]
        if condicao_tempo == "Rain":
            vai_chover = "Sim"
        else:
            vai_chover = "Não"
        possibilidade_chuva = i['pop']
        velocidade_vento = i['wind_speed']
        city_atual.adicionar_dia([dia,r.json()['lat'],r.json()['lon'],
                                  temp_maxima,temp_minima,temp_media,
                                  vai_chover,condicao_tempo,nascer_sol,
                                  por_do_sol,velocidade_vento,possibilidade_chuva])
    
    Cidades.lista_cidades.append(city_atual)

# Criar data frame com as previsões

In [210]:
# criar um dicionario para deixar os dados gerais da cidade
dic_cidade = {}

for i in range(len(df_pandas)):
    linha = df_pandas.iloc[i]
    
    dic_cidade[linha.Cidade] = {"codigo":linha.CodigoDaCidade,
                                "regiao":linha.Regiao,
                                "Pais":linha.Pais}

In [224]:
lista_data_frame_clima = [['Cidade','CodigoDaCidade','Data','Regiao','Pais','Latitude',
 'Longigute','TemperaturaMaxima (ºC)','TemperaturaMinima (ºC)','TemperaturaMedia (ºC)',
 'VaiChover','ChanceDeChuva','CondicaoDoTempo','NascerDoSol',
 'PorDoSol','VelocidadeMaximaDoVento (Metros/Segundos)']]

for i in Cidades.lista_cidades:
    cidade_atual = i.get_nome()
    look_city = dic_cidade[cidade_atual]
    dados = i.get_dados()
    for linha in dados:
        row_linha_dados = [cidade_atual,look_city['codigo'],linha[0],look_city['regiao'],
                           look_city['Pais'],linha[1],linha[2],linha[3],linha[4],linha[5],
                           linha[6],linha[11],linha[7],linha[8],linha[9],linha[10]]
        
        lista_data_frame_clima.append(row_linha_dados)
    
colunas_clima = [_ for _ in lista_data_frame_clima[1:]]
col_header_clima = lista_data_frame_clima[0]

df_previsoes = pd.DataFrame(colunas_clima,columns=col_header_clima)
df_previsoes.head(2)

Unnamed: 0,Cidade,CodigoDaCidade,Data,Regiao,Pais,Latitude,Longigute,TemperaturaMaxima (ºC),TemperaturaMinima (ºC),TemperaturaMedia (ºC),VaiChover,ChanceDeChuva,CondicaoDoTempo,NascerDoSol,PorDoSol,VelocidadeMaximaDoVento (Metros/Segundos)
0,Aparecida,3502507,2022-05-05,Guaratinguetá,BR,-22.8516,-45.2341,23.22,12.24,21.58,Não,0.22,Clear,06:22:05,17:33:06,2.16
1,Aparecida,3502507,2022-05-06,Guaratinguetá,BR,-22.8516,-45.2341,24.64,13.02,22.69,Não,0.12,Clear,06:22:32,17:32:31,1.04


# Criar view com as previsões

In [221]:
import pandasql as ps

# bem já que era para trabalhar com o spark sql, fazer a query da view pro df no pandas tbm
ps.sqldf("select * from df_previsoes")

Unnamed: 0,Cidade,CodigoDaCidade,Data,Regiao,Pais,Latitude,Longigute,TemperaturaMaxima (ºC),TemperaturaMinima (ºC),TemperaturaMedia (ºC),VaiChover,ChanceDeChuva,CondicaoDoTempo,NascerDoSol,PorDoSol,VelocidadeMaximaDoVento (Metros/Segundos)
0,Aparecida,3502507,2022-05-05,Guaratinguetá,BR,-22.8516,-45.2341,23.22,12.24,21.58,Não,0.22,Clear,06:22:05.000000,17:33:06.000000,2.16
1,Aparecida,3502507,2022-05-06,Guaratinguetá,BR,-22.8516,-45.2341,24.64,13.02,22.69,Não,0.12,Clear,06:22:32.000000,17:32:31.000000,1.04
2,Aparecida,3502507,2022-05-07,Guaratinguetá,BR,-22.8516,-45.2341,24.11,14.48,21.63,Não,0.09,Clouds,06:22:59.000000,17:31:58.000000,1.06
3,Aparecida,3502507,2022-05-08,Guaratinguetá,BR,-22.8516,-45.2341,22.93,16.49,22.93,Não,0.17,Clouds,06:23:27.000000,17:31:25.000000,1.61
4,Aparecida,3502507,2022-05-09,Guaratinguetá,BR,-22.8516,-45.2341,24.68,14.72,24.03,Não,0.04,Clear,06:23:54.000000,17:30:53.000000,1.41
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
307,Ubatuba,3555406,2022-05-08,Caraguatatuba,BR,-23.4332,-45.0834,23.82,19.52,23.82,Sim,0.32,Rain,06:23:41.000000,17:29:59.000000,2.42
308,Ubatuba,3555406,2022-05-09,Caraguatatuba,BR,-23.4332,-45.0834,24.32,19.78,24.32,Não,0.00,Clear,06:24:09.000000,17:29:26.000000,1.83
309,Ubatuba,3555406,2022-05-10,Caraguatatuba,BR,-23.4332,-45.0834,24.68,18.64,24.68,Sim,0.38,Rain,06:24:37.000000,17:28:54.000000,1.87
310,Ubatuba,3555406,2022-05-11,Caraguatatuba,BR,-23.4332,-45.0834,22.30,19.58,22.30,Sim,0.96,Rain,06:25:06.000000,17:28:24.000000,2.77


# Criar DF da Tabela 1

In [256]:
"""
se fosse utilizar o pyspark seria tipo ??
spark.CreateDataFrame(df_dataframe) -> da para utilizar o df do pandas né?
ou
spark.CreateDataFrame(Row(coluna1=conteudo_tupla1,coluna2,conteudo_tupla2),
                      Row(coluna1=conteudo_tupla1,coluna2,conteudo_tupla2))
acabei conhecendo esses métodos.
"""
# Como já estavamos trabalhando com o df pelo pandas, vou so chamar ele
df_previsoes

Unnamed: 0,Cidade,CodigoDaCidade,Data,Regiao,Pais,Latitude,Longigute,TemperaturaMaxima (ºC),TemperaturaMinima (ºC),TemperaturaMedia (ºC),VaiChover,ChanceDeChuva,CondicaoDoTempo,NascerDoSol,PorDoSol,VelocidadeMaximaDoVento (Metros/Segundos)
0,Aparecida,3502507,2022-05-05,Guaratinguetá,BR,-22.8516,-45.2341,23.22,12.24,21.58,Não,0.22,Clear,06:22:05,17:33:06,2.16
1,Aparecida,3502507,2022-05-06,Guaratinguetá,BR,-22.8516,-45.2341,24.64,13.02,22.69,Não,0.12,Clear,06:22:32,17:32:31,1.04
2,Aparecida,3502507,2022-05-07,Guaratinguetá,BR,-22.8516,-45.2341,24.11,14.48,21.63,Não,0.09,Clouds,06:22:59,17:31:58,1.06
3,Aparecida,3502507,2022-05-08,Guaratinguetá,BR,-22.8516,-45.2341,22.93,16.49,22.93,Não,0.17,Clouds,06:23:27,17:31:25,1.61
4,Aparecida,3502507,2022-05-09,Guaratinguetá,BR,-22.8516,-45.2341,24.68,14.72,24.03,Não,0.04,Clear,06:23:54,17:30:53,1.41
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
307,Ubatuba,3555406,2022-05-08,Caraguatatuba,BR,-23.4332,-45.0834,23.82,19.52,23.82,Sim,0.32,Rain,06:23:41,17:29:59,2.42
308,Ubatuba,3555406,2022-05-09,Caraguatatuba,BR,-23.4332,-45.0834,24.32,19.78,24.32,Não,0.00,Clear,06:24:09,17:29:26,1.83
309,Ubatuba,3555406,2022-05-10,Caraguatatuba,BR,-23.4332,-45.0834,24.68,18.64,24.68,Sim,0.38,Rain,06:24:37,17:28:54,1.87
310,Ubatuba,3555406,2022-05-11,Caraguatatuba,BR,-23.4332,-45.0834,22.30,19.58,22.30,Sim,0.96,Rain,06:25:06,17:28:24,2.77


# Criar DF da Tabela 2

In [241]:
# criar um novo df, e passar uma coluna numerica para calcular os dias que podem chover ou n

import numpy as np
df_table2 = df_previsoes.copy()
colunas_df_table2 = [_ for _ in df_table2.columns if _ not in ["Cidade","vai_chover","nao_vai_chover"]]

df_table2['vai_chover'] = np.where(df_table2['VaiChover']=='Sim',1,0)
df_table2['nao_vai_chover'] = np.where(df_table2['VaiChover']=='Não',1,0)


In [253]:
df_table2.drop(columns=colunas_df_table2,inplace=True)
df_table2_agrupada = df_table2.groupby(by="Cidade").sum()

df_table2_agrupada['total_dias'] = df_table2_agrupada['vai_chover'] + df_table2_agrupada['nao_vai_chover']

mudar_nome_coluna = {k:v for (k,v) in zip(df_table2_agrupada.columns,["QtdDiasVaiChover","QtdDiasNaoVaiChover","TotalDiasMapeados"])}
df_table2_agrupada.rename(columns=mudar_nome_coluna, inplace=True)
df_table2_agrupada

Unnamed: 0_level_0,QtdDiasVaiChover,QtdDiasNaoVaiChover,TotalDiasMapeados
Cidade,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Aparecida,2,6,8
Arapeí,5,3,8
Areias,4,4,8
Bananal,3,5,8
Cachoeira Paulista,2,6,8
Campos do Jordão,5,3,8
Canas,2,6,8
Caraguatatuba,6,2,8
Caçapava,2,6,8
Cruzeiro,3,5,8


# Exportar CSVs

In [257]:
# tabela 1
df_previsoes.to_csv("exported/df_1_previsoes.csv")

# tabela 2
df_table2_agrupada.to_csv("exported/df_2_agrupados.csv")
