<a href="https://colab.research.google.com/github/GurgelLucas/weather-cite/blob/main/spark_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [None]:
# instalar o JAVA
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Baixar a versão mais recente do Spark
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
# Deszipar o spark
!tar xf /content/spark-3.1.2-bin-hadoop2.7.tgz
!rm spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
# Declarando as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# Instalar a lib que ajuda e a encontrar o Spark no sistema e importá-lo como uma biblioteca regular
!pip install -q findspark unidecode

[?25l[K     |█▍                              | 10 kB 30.4 MB/s eta 0:00:01[K     |██▉                             | 20 kB 39.5 MB/s eta 0:00:01[K     |████▏                           | 30 kB 40.3 MB/s eta 0:00:01[K     |█████▋                          | 40 kB 12.8 MB/s eta 0:00:01[K     |███████                         | 51 kB 12.8 MB/s eta 0:00:01[K     |████████▍                       | 61 kB 14.6 MB/s eta 0:00:01[K     |█████████▊                      | 71 kB 12.3 MB/s eta 0:00:01[K     |███████████▏                    | 81 kB 13.7 MB/s eta 0:00:01[K     |████████████▌                   | 92 kB 15.2 MB/s eta 0:00:01[K     |██████████████                  | 102 kB 13.7 MB/s eta 0:00:01[K     |███████████████▎                | 112 kB 13.7 MB/s eta 0:00:01[K     |████████████████▊               | 122 kB 13.7 MB/s eta 0:00:01[K     |██████████████████              | 133 kB 13.7 MB/s eta 0:00:01[K     |███████████████████▌            | 143 kB 13.7 MB/s eta 0:

In [None]:
import findspark
findspark.init()

import urllib.parse
import requests
import json
import unidecode
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, explode_outer

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [190]:
# Buscar cidades do Vale do Paraíba
# TODO

req = requests.get("https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios")
data = req.json()

# Criar data frame com as cidades
# TODO

#cities = spark.createDataFrame(data)
cities = spark.read.json(spark.sparkContext.parallelize([data]))
cities.printSchema()



# Criar view com as cidades
# TODO
vw_cities = cities.createOrReplaceTempView("cities")
#spark.sql("SELECT microrregiao FROM cities").show()

root
 |-- id: string (nullable = true)
 |-- microrregiao: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- mesorregiao: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- nome: string (nullable = true)
 |    |    |    |-- regiao: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- nome: string (nullable = true)
 |    |    |    |    |-- sigla: string (nullable = true)
 |    |    |    |-- sigla: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- regiao-imediata: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- regiao-intermediaria: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long 

In [189]:
# Melhoria e automatização de etapas 
# Pegar colunas que são structs 
def get_struct_columns(dataframe):
  columns_struct = []
  for x in cities.dtypes:
    if 'struct' in x[1]:
      columns_struct.append(x[0])
  return columns_struct

In [197]:
for struct in get_struct_columns(cities):
  for i in cities.schema[struct].dataType.names:
    cities = cities.select("*", col(f"{struct}.{i}").alias(f"{struct}_{i}"))
  cities = cities.drop(col(struct))

In [198]:
cities.printSchema()

root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- microrregiao_id: long (nullable = true)
 |-- microrregiao_nome: string (nullable = true)
 |-- regiao-imediata_id: long (nullable = true)
 |-- regiao-imediata_nome: string (nullable = true)
 |-- microrregiao_mesorregiao_id: long (nullable = true)
 |-- microrregiao_mesorregiao_nome: string (nullable = true)
 |-- regiao-imediata_regiao-intermediaria_id: long (nullable = true)
 |-- regiao-imediata_regiao-intermediaria_nome: string (nullable = true)
 |-- microrregiao_mesorregiao_UF_id: long (nullable = true)
 |-- microrregiao_mesorregiao_UF_nome: string (nullable = true)
 |-- microrregiao_mesorregiao_UF_sigla: string (nullable = true)
 |-- regiao-imediata_regiao-intermediaria_UF_id: long (nullable = true)
 |-- regiao-imediata_regiao-intermediaria_UF_nome: string (nullable = true)
 |-- regiao-imediata_regiao-intermediaria_UF_sigla: string (nullable = true)
 |-- microrregiao_mesorregiao_UF_regiao_id: long (nullabl

In [None]:
# Buscar previsão do tempo para as cidades
# TODO

data = []
for city in cities.collect():
  cidade = urllib.parse.quote(city.nome)
  req = requests.get(f"http://api.weatherapi.com/v1/forecast.json?key=49e9a2605cc34c618b172338220405&q={cidade}&days=5&aqi=no&alerts=no")
  res = req.json()
  data.append(res)

# Criar data frame com as previsões
# TODO
weather_forecast = spark.read.json(spark.sparkContext.parallelize(data))
weather_forecast = weather_forecast.filter(weather_forecast.location.region == "Sao Paulo")
#weather_forecast = weather_forecast.select("current.*", "location.*", explode_outer("forecast.forecastday") \
#                                    .alias("forecastday")).drop("current", "forecast", "error", "location") \
#                                   .select("*","forecastday.*").drop("forecastday") \
#                                   .select("*","day.*").drop("day")
weather_forecast.printSchema()

# Criar view com as previsões
# TODO
vw_weather_forecast = weather_forecast.createOrReplaceTempView("weather_forecast")

root
 |-- current: struct (nullable = true)
 |    |-- cloud: long (nullable = true)
 |    |-- condition: struct (nullable = true)
 |    |    |-- code: long (nullable = true)
 |    |    |-- icon: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |-- feelslike_c: double (nullable = true)
 |    |-- feelslike_f: double (nullable = true)
 |    |-- gust_kph: double (nullable = true)
 |    |-- gust_mph: double (nullable = true)
 |    |-- humidity: long (nullable = true)
 |    |-- is_day: long (nullable = true)
 |    |-- last_updated: string (nullable = true)
 |    |-- last_updated_epoch: long (nullable = true)
 |    |-- precip_in: double (nullable = true)
 |    |-- precip_mm: double (nullable = true)
 |    |-- pressure_in: double (nullable = true)
 |    |-- pressure_mb: double (nullable = true)
 |    |-- temp_c: double (nullable = true)
 |    |-- temp_f: double (nullable = true)
 |    |-- uv: double (nullable = true)
 |    |-- vis_km: double (nullable = true)
 |    

In [None]:
weather_forecast.show()

+-----+--------------------+-----------+-----------+--------+--------+--------+------+----------------+------------------+---------+---------+-----------+-----------+------+------+---+------+---------+-----------+--------+--------+--------+-------+------+----------------+---------------+------+------------------+---------+-----------------+--------------------+----------+----------+--------------------+-----------+---------+---------+---------+------------+--------------------+--------------------+--------------------+------------------+------------------+---------+---------+-----------+-----------+---------+---------+--------------+--------------+---+
|cloud|           condition|feelslike_c|feelslike_f|gust_kph|gust_mph|humidity|is_day|    last_updated|last_updated_epoch|precip_in|precip_mm|pressure_in|pressure_mb|temp_c|temp_f| uv|vis_km|vis_miles|wind_degree|wind_dir|wind_kph|wind_mph|country|   lat|       localtime|localtime_epoch|   lon|              name|   region|            tz_

In [None]:
# Criar DF da Tabela 1
# TODO

table01 = spark.sql("\
SELECT \
  c.nome as Cidade, \
  c.id as CodigoDaCidade, \
  wf.date as Data, \
  c.regiao_nome as Regiao, \
  wf.country as Pais, \
  wf.lat as Latitude, \
  wf.lon as Longitude, \
  wf.mintemp_c as TemperaturaMinima, \
  wf.maxtemp_c as TemperaturaMaxima, \
  wf.avgtemp_c as TemperaturaMedia, \
  IF(wf.daily_will_it_rain = 1, 'Sim', 'Não') as VaiChover, \
  wf.daily_chance_of_rain as ChanceDeChuva, \
  wf.astro.sunrise as NascerDoSol, \
  wf.astro.sunset as PorDoSol, \
  wf.maxwind_kph as VelocidadeMaximaDoVento \
FROM \
  cities as c \
LEFT JOIN \
  weather_forecast as wf \
ON \
  c.nome = wf.name"
)

In [None]:
table01.printSchema()

root
 |-- Cidade: string (nullable = true)
 |-- CodigoDaCidade: string (nullable = true)
 |-- Data: string (nullable = true)
 |-- Regiao: string (nullable = true)
 |-- Pais: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- TemperaturaMinima: double (nullable = true)
 |-- TemperaturaMaxima: double (nullable = true)
 |-- TemperaturaMedia: double (nullable = true)
 |-- VaiChover: string (nullable = false)
 |-- ChanceDeChuva: long (nullable = true)
 |-- NascerDoSol: string (nullable = true)
 |-- PorDoSol: string (nullable = true)
 |-- VelocidadeMaximaDoVento: double (nullable = true)



In [None]:
# Explode columns array

# 01 - Indentify columns with array 
columns = [weather_forecast.columns[z] for z in range(1, len(weather_forecast.columns)) if "array" in weather_forecast.dtypes[z][1]]

In [None]:
# Pegar a coluna array e a coluna pai
columns_array = []
for x in weather_forecast.dtypes:
  if "array" in x[1]:
    print(x[1].split(":array"))
    #columns_array.append([x[0]] + [z for i in x[1].split("array") for z in i.split(",") if ":" not in z])
#print(columns_array)

['struct<forecastday', '<struct<astro:struct<moon_illumination:string,moon_phase:string,moonrise:string,moonset:string,sunrise:string,sunset:string>,date:string,date_epoch:bigint,day:struct<avghumidity:double,avgtemp_c:double,avgtemp_f:double,avgvis_km:double,avgvis_miles:double,condition:struct<code:bigint,icon:string,text:string>,daily_chance_of_rain:bigint,daily_chance_of_snow:bigint,daily_will_it_rain:bigint,daily_will_it_snow:bigint,maxtemp_c:double,maxtemp_f:double,maxwind_kph:double,maxwind_mph:double,mintemp_c:double,mintemp_f:double,totalprecip_in:double,totalprecip_mm:double,uv:double>,hour', '<struct<chance_of_rain:bigint,chance_of_snow:bigint,cloud:bigint,condition:struct<code:bigint,icon:string,text:string>,dewpoint_c:double,dewpoint_f:double,feelslike_c:double,feelslike_f:double,gust_kph:double,gust_mph:double,heatindex_c:double,heatindex_f:double,humidity:bigint,is_day:bigint,precip_in:double,precip_mm:double,pressure_in:double,pressure_mb:double,temp_c:double,temp_f:dou

In [None]:
# Renomeando as Colunas e Removendo colunas
#for column in columns:
#  table01 = table01.select("*", explode_outer(column).alias(f"df_{column}")).drop(column)

In [None]:
# Criar DF da Tabela 2
# TODO

table02 = spark.sql("\
SELECT \
  c.nome as Cidade, \
  COUNT(wf.date) as TotalDiasMapeado, \
  SUM(wf.daily_will_it_rain) as QtdDiasVaiChover, \
  COUNT(wf.date) - SUM(wf.daily_will_it_rain) as QtdDiasNaoVaiChover \
FROM \
  cities as c \
INNER JOIN \
  weather_forecast as wf \
ON \
  c.nome = wf.name \
GROUP BY \
  c.nome \
HAVING \
  SUM(wf.daily_will_it_rain) >= 1"
)

In [None]:
# Exportar CSVs
# TODO

table01.repartition(1).write.format("csv") \
       .option("header", "true") \
       .mode("overwrite") \
       .save("tabela_01.csv")
table02.repartition(1).write.format("csv") \
       .option("header", "true") \
       .mode("overwrite") \
       .save("tabela_02.csv")

In [None]:
!zip -r tabela_01.zip tabela_01.csv
!zip -r tabela_02.zip tabela_02.csv

  adding: tabela_01.csv/ (stored 0%)
  adding: tabela_01.csv/_SUCCESS (stored 0%)
  adding: tabela_01.csv/._SUCCESS.crc (stored 0%)
  adding: tabela_01.csv/part-00000-39fddafa-fb61-44ef-bf09-a8167b572baf-c000.csv (deflated 78%)
  adding: tabela_01.csv/.part-00000-39fddafa-fb61-44ef-bf09-a8167b572baf-c000.csv.crc (stored 0%)
  adding: tabela_02.csv/ (stored 0%)
  adding: tabela_02.csv/_SUCCESS (stored 0%)
  adding: tabela_02.csv/part-00000-320f9cbe-4a87-400f-a401-eaf438a85bfc-c000.csv (deflated 39%)
  adding: tabela_02.csv/._SUCCESS.crc (stored 0%)
  adding: tabela_02.csv/.part-00000-320f9cbe-4a87-400f-a401-eaf438a85bfc-c000.csv.crc (stored 0%)
