In [8]:
import findspark
findspark.init()

import pandas as pd
import os
import requests
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, lit, array, from_json, explode, get_json_object, repeat, col, expr, row_number
from pyspark.sql.window import Window
from datetime import datetime, timedelta
import time
from itertools import *

In [None]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [None]:
# Buscar cidades do Vale do Paraíba
response = requests.get("https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios")
cities_data = json.loads(response.text)

# Criar data frame com as cidades
cities_df = spark.createDataFrame(cities_data)
cities_df.createOrReplaceTempView("cities")
cities_df = cities_df.where("""nome != 'São Luiz do Paraitinga'""")

In [31]:
# Criar view com as cidades
df_cidade= cities_df.select('nome','id')
df_cidade = df_cidade.withColumn("regiao", F.lit("Vale do Paraiba"))

In [11]:
# Buscar previsão do tempo para as cidades
forecasts = []
chave_aux_cidade = []
chave_aux_forecast = []

for index, city in enumerate(cities_data):
    city_name = city["nome"]
    response = requests.get(f"https://api.hgbrasil.com/weather?array_limit=5&fields=only_results,city_name,forecast,date,max,min,rain_probability,condition,sunrise,sunset,wind_speedy&key=f056459c&city_name={city_name},SP")
    forecast_data = json.loads(response.text)
    chave_forecast = response.json()['forecast']
    chave_cidade = response.json()['city_name']
    chave_aux_forecast = chave_aux_forecast + [chave_forecast]
    chave_aux_cidade = chave_aux_cidade + [chave_cidade]
    forecasts.append(forecast_data)
    chave_aux_cidade.append(chave_cidade)

In [14]:
# Criar data frame com as previsões
df_clima = spark.createDataFrame(forecasts)
sun = df_clima.select('city_name','sunrise', 'sunset')

schema = StructType([
    StructField("nome", StringType())
])

rdd = spark.sparkContext.parallelize([])

cidades = []
df = spark.createDataFrame(rdd, schema)

for index, values in enumerate(chave_aux_forecast):
    cidades = chave_aux_forecast[index]
    nome_cidade = []
    day_one = cidades[0]
    day_two = cidades[1]
    day_three = cidades[2]
    day_four = cidades[3]
    day_five = cidades[4]
    df1_clima = spark.createDataFrame([day_one,day_two,day_three,day_four,day_five])
    df = df1_clima.unionByName(df, allowMissingColumns = True).drop('nome')

In [16]:
# Criar view com as previsões
df1_clima = df.select('date','max','min','rain_probability','wind_speedy','condition')
chave_cidade = df_clima.select('city_name')

In [18]:
w= Window.partitionBy(['date']).orderBy(col('date').desc())
teste = df.withColumn('row',row_number().over(w))

In [19]:
teste_ = teste.withColumn(
    "cidade",
    F.when(F.col("row") == "1", F.lit("Aparecida"))
    .when(F.col("row") == "2", F.lit("Arapeí"))
    .when(F.col("row") == "3", F.lit("Areias"))
    .when(F.col("row") == "4", F.lit("Bananal"))
    .when(F.col("row") == "5", F.lit("Caçapava"))
    .when(F.col("row") == "6", F.lit("Cachoeira Paulista"))
    .when(F.col("row") == "7", F.lit("Campos do Jordão"))
    .when(F.col("row") == "8", F.lit("Canas"))
    .when(F.col("row") == "9", F.lit("Caraguatatuba"))
    .when(F.col("row") == "10", F.lit("Cruzeiro"))
    .when(F.col("row") == "11", F.lit("Cunha"))
    .when(F.col("row") == "12", F.lit("Guaratinguetá"))
    .when(F.col("row") == "13", F.lit("Igaratá"))
    .when(F.col("row") == "14", F.lit("Ilhabela"))
    .when(F.col("row") == "15", F.lit("Jacareí"))
    .when(F.col("row") == "16", F.lit("Jambeiro"))
    .when(F.col("row") == "17", F.lit("Lagoinha"))
    .when(F.col("row") == "18", F.lit("Lavrinhas"))
    .when(F.col("row") == "19", F.lit("Lorena"))
    .when(F.col("row") == "20", F.lit("Monteiro Lobato"))
    .when(F.col("row") == "21", F.lit("Natividade da Serra"))
    .when(F.col("row") == "22", F.lit("Paraibuna"))
    .when(F.col("row") == "23", F.lit("Pindamonhangaba"))
    .when(F.col("row") == "24", F.lit("Piquete"))
    .when(F.col("row") == "25", F.lit("Potim"))
    .when(F.col("row") == "26", F.lit("Queluz"))
    .when(F.col("row") == "27", F.lit("Redenção da Serra"))
    .when(F.col("row") == "28", F.lit("Roseira"))
    .when(F.col("row") == "29", F.lit("Santa Branca"))
    .when(F.col("row") == "30", F.lit("Santo Antônio do Pinhal"))
    .when(F.col("row") == "31", F.lit("São Bento do Sapucaí"))
    .when(F.col("row") == "32", F.lit("São José do Barreiro"))
    .when(F.col("row") == "33", F.lit("São José dos Campos"))
    .when(F.col("row") == "34", F.lit("São Sebastião"))
    .when(F.col("row") == "35", F.lit("Silveiras"))
    .when(F.col("row") == "36", F.lit("Taubaté"))
    .when(F.col("row") == "37", F.lit("Tremembé"))
    .when(F.col("row") == "38", F.lit("Ubatuba")),
)


In [20]:
teste_ = teste_.drop('row')

In [21]:
join_condition = [(sun.city_name == teste_.cidade)] 

In [22]:
df_final = teste_.join(sun, join_condition, how='left').drop('city_name').dropna()

In [23]:
join_condition = [(df_cidade.nome == df_final.cidade)] 

In [24]:
df_final_ = df_cidade.join(df_final, join_condition, how='left').drop('nome')

In [25]:
df_final_ = df_final_.withColumn("Pais", F.lit("Brasil"))
df_final_ = df_final_.withColumn("VaiChover", F.when(F.col('condition')=='rain', F.lit('Sim')).otherwise('Nao'))
df_final_ = df_final_.withColumnRenamed("cidade", "Cidade")
df_final_ = df_final_.withColumnRenamed("id", "CodigoDaCidade")
df_final_ = df_final_.withColumnRenamed("date ", "Data")
df_final_ = df_final_.withColumnRenamed("regiao", "Regiao")
df_final_ = df_final_.withColumnRenamed("max", "TemperaturaMaxima")
df_final_ = df_final_.withColumnRenamed("min", "TemperaturaMinima")
df_final_ = df_final_.withColumnRenamed("rain_probability", "ChanceDeChuva")
df_final_ = df_final_.withColumnRenamed("condition", "CondicaoDoTempo")
df_final_ = df_final_.withColumnRenamed("sunrise", "NascerDoSol")
df_final_ = df_final_.withColumnRenamed("sunset", "PorDoSol")
df_final_ = df_final_.withColumnRenamed("wind_speedy", "VelocidadeMaximaDoVento")

In [26]:
# Criar DF da Tabela 1
df_tabela1 = df_final_.select('Cidade','CodigoDaCidade','date','Regiao','Pais','TemperaturaMaxima','TemperaturaMinima','VaiChover','ChanceDeChuva','CondicaoDoTempo','NascerDoSol','PorDoSol','VelocidadeMaximaDoVento')

In [27]:
path = os.getcwd()

In [28]:
df_tabela1 = df_tabela1.toPandas()

In [30]:
df_tabela1.to_csv(f'{path}/Tabela.csv',sep=';',index= False)

In [None]:
# Impedimento para fazer a tabela 2

""" 
O enunciado da tabela 2 não se fez entendível, em uma situação de trabalho no dia a dia eu consultaria o cliente para 
levantar e apurar os requisitos.
"""