In [None]:
#Instalando o JDK.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#Instalando o Spark.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

#Setando as variáveis de ambiente no sistema.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

#Instalando o FindSpark.
!pip install -q findspark

#Instalando o UniDecode.
!pip install -q unidecode

#Importando os objetos que serão usados no projeto.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkContext
import requests
import json
import unidecode
import pandas as pd

#Buildando o SparkSession.
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("SparkbyExamples.com") \
        .config('spark.ui.port', '4050') \
        .getOrCreate()

#Instalando o ngrok.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

#Definindo a URL local para a consumo da API.
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

In [16]:
#Constante da UF do ES.
CodUF = '32'

#Endereço da API do IBGE.
UrlDaApi = f"http://servicodados.ibge.gov.br/api/v1/localidades/estados/{CodUF}/municipios"

#Efetuando a requisição na API do IBGE.
RequestDaApi = requests.get(UrlDaApi)

In [None]:
#Convertento o retorno da API em JSON.
JsonIBGE = RequestDaApi.json()

#Criando o DataFrame GERAL da API de IBGE.
dfGeralApiIbge = spark.createDataFrame(JsonIBGE)

In [18]:
#Criando o DataFrame APENAS das cidades da API do IBGE.
DataFrameCidadesApiIbge = dfGeralApiIbge.withColumn("NomeCidade", F.col("nome"))\
                                        .drop("id", "microrregiao", "nome", "regiao-imediata")

#Criando a TempView "cidades"
DataFrameCidadesApiIbge.createOrReplaceTempView("cidades")

#Criando a variável das cidades p/ a coleção.
VarCidadeIbge = spark.sql('select * from cidades')

#Criando a coleção de cidades para usar na API de Previsões do Tempo.
ColecaoCidadesIbge = VarCidadeIbge.collect()

In [None]:
#Criando os Metadados das Previsões do Tempo.
DataFramePrevTempo = pd.DataFrame(columns= ['Cidade',
                                            'CodigoDaCidade',
                                            'Pais',
                                            'Latitude',
                                            'Longitude',
                                            'NascerDoSol',
                                            'PorDoSol',
                                            'Populacao',
                                            'Data',
                                            'TemperaturaMaxima',
                                            'TemperaturaMinima',
                                            'SensacaoTermica',
                                            'CondicaoDoTempo',
                                            'VelocidadeDoVento'])

#Constantes p/uso na API.
MyAPIKey = "d15318e0f35125eb2040ecd74952410a"
StateCod = "ES"
CountryCod = "BR"

#Iniciando as requisições da API "openweathermap" por cidade.
for cid in ColecaoCidadesIbge:
  Link = f"https://api.openweathermap.org/data/2.5/forecast?q={cid['NomeCidade']},{StateCod},{CountryCod}&appid={MyAPIKey}&lang=pt_br"
  Requisicao = requests.get(Link)
  ForecastsJson = Requisicao.json()
  if ForecastsJson['cod'] == '200':
    Cidade = (ForecastsJson['city']['name'])
    CodigoDaCidade = (ForecastsJson['city']['id'])
    Pais = (ForecastsJson['city']['country'])
    Latitude = (ForecastsJson['city']['coord']['lat'])
    Longitude = (ForecastsJson['city']['coord']['lon'])
    NascerDoSol = (ForecastsJson['city']['sunrise'])
    PorDoSol = (ForecastsJson['city']['sunset'])
    Populacao = (ForecastsJson['city']['population'])
    for CidDesc in ForecastsJson['list']:
      Data = (CidDesc['dt_txt']) #Data
      TemperaturaMaxima = (CidDesc['main']['temp_max'] -273.15)
      TemperaturaMinima = (CidDesc['main']['temp_min'] -273.15)
      SensacaoTermica = (CidDesc['main']['feels_like'] -273.15)
      CondicaoDoTempo = (CidDesc['weather'][0]['description'])
      VelocidadeDoVento = (CidDesc['wind']['speed'])

      #Inserindo os dados da API no DataFrame de Previsões do Tempo.
      DataFramePrevTempo = DataFramePrevTempo.append({'Cidade' : Cidade,
                                                      'CodigoDaCidade' : CodigoDaCidade,
                                                      'Pais' : Pais,
                                                      'Latitude' : Latitude,
                                                      'Longitude' : Longitude,
                                                      'NascerDoSol' : NascerDoSol,
                                                      'PorDoSol' : PorDoSol,
                                                      'Populacao' : Populacao,
                                                      'Data' : Data,
                                                      'TemperaturaMaxima' : TemperaturaMaxima,
                                                      'TemperaturaMinima' : TemperaturaMinima,
                                                      'SensacaoTermica' : SensacaoTermica,
                                                      'CondicaoDoTempo' : CondicaoDoTempo,
                                                      'VelocidadeDoVento' : VelocidadeDoVento},
                                                      ignore_index = True)

#Criando a TempView de Previsões do Tempo.
DataFramePrevTempo.createOrReplaceTempView("previsoes_tempo")

#Efetuando uma consulta na TempView...
spark.sql('select * from previsoes_tempo').show()

In [None]:
#Criando e tratando os dados do DataFrame de Previsões do Tempo.
QueryPrevTempo = """
  select
    p.Cidade as Cidade,
    p.CodigoDaCidade as CodigoDaCidade,
    p.Data as Data,
    'Sudeste' as Regiao,
    p.Pais as Pais,
    p.Latitude as Latitude,
    p.Longitude as Longigute,
    p.TemperaturaMaxima as TemperaturaMaxima,
    p.TemperaturaMinima as TemperaturaMinima,
    (p.TemperaturaMaxima + p.TemperaturaMinima)/2 as TemperaturaMedia,
    p.SensacaoTermica as SensacaoTermica,
    (case when p.CondicaoDoTempo like '%chuva%' then 'Sim' else 'Não' end) as VaiChover,
    p.CondicaoDoTempo as CondicaoDoTempo,
    p.NascerDoSol as NascerDoSol,
    p.PorDoSol as PorDoSol,
    p.VelocidadeDoVento as VelocidadeMaximaDoVento,
    p.Populacao as Populacao
  from previsoes_tempo p
"""
DataFramePrevTempo = spark.sql(QueryPrevTempo)

#Criando a TempView de Previsões do Tempo.
DataFramePrevTempo.createOrReplaceTempView("previsoes_tempo")

#Efetuando uma consulta na TempView...
spark.sql('select * from previsoes_tempo').show()

In [None]:
#Criando o DataFrame do KPI de Previsões do Tempo.
QueryKpiPrevTempo = """
  select
    prev.Cidade,
    count(case when prev.VaiChover = 'Sim' then 1 end) as QtdPeriodosChuvosos,
    count(*) as QtdPeriodosApurados
  from previsoes_tempo prev
  group by prev.Cidade
"""
DataFrameKpiPrevTempo = spark.sql(QueryKpiPrevTempo)

#Criando a TempView do KPI de Previsões do Tempo.
DataFrameKpiPrevTempo.createOrReplaceTempView("kpi_previsoes_tempo")

#Efetuando uma consulta na TempView...
spark.sql('select * from kpi_previsoes_tempo').show()

In [25]:
#Exportando os arquivos (JSON e CSV).

#Criando o arquivo JSON c/ o retorno da API do IBGE.
with open('DadosIbge.json', 'w') as json_file:
  json.dump(JsonIBGE, json_file, indent=4)

#Criando o arquivo CSV das cidades da API do IBGE.
DataFrameCidadesApiIbge.write.option('header', True).csv("/content/CidadesIbge")

#Criando o arquivo CSV das previsões do tempo da API "openweathermap".
DataFramePrevTempo.write.option('header', True).csv("/content/PrevisaoTempo")

#Criando o arquivo CSV dos KPI's das previsçoes do tempo.
DataFrameKpiPrevTempo.write.option('header', True).csv("/content/KpiPrevisaoTempo")