In [1]:
import findspark
findspark.init()

import requests as r
import json
import unidecode

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import col, lit, when, count
from pyspark import SparkContext

spark = SparkSession.builder \
      .master("spark://spark-master:7077") \
      .config("spark.executor.memory", "512m") \
      .appName("Previsão do tempo") \
      .getOrCreate()

sc = SparkContext.getOrCreate(spark)

22/10/21 11:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Buscar cidades do Vale do Paraíba
# TODO

cities_url = "https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios"

cities_req = r.get(cities_url)
cities_json = cities_req.text

cities_temp = spark.read.json(sc.parallelize([cities_json]))
cities_temp = cities_temp.withColumnRenamed("nome", "Cidade")

# Extraindo latitude e longitude para consulta de api
# utilizando dataset disponível em
# https://github.com/kelvins/Municipios-Brasileiros/tree/main/csv
# Coordenadas verificadas co google maps

lat_lon_temp = spark.read.csv("./municipios.csv", header=True)


                                                                                

In [3]:
# Criar data frame com as cidades
# TODO

cities_join = cities_temp.join(lat_lon_temp,cities_temp["id"] == lat_lon_temp["codigo_ibge"],"inner")


# Criar view com as cidades
# TODO

cities = cities_join.select(col("id").alias("CodigoDaCidade"), "Cidade", col("microrregiao.nome").alias("Regiao"), col("latitude").alias("Latitude"), col("longitude").alias("Longitude"))
cities.show()

[Stage 4:>                                                          (0 + 1) / 1]

+--------------+------------------+--------------------+--------+---------+
|CodigoDaCidade|            Cidade|              Regiao|Latitude|Longitude|
+--------------+------------------+--------------------+--------+---------+
|       3502507|         Aparecida|       Guaratinguetá|-22.8495| -45.2325|
|       3503158|            Arapeí|             Bananal|-22.6717| -44.4441|
|       3503505|            Areias|             Bananal|-22.5786| -44.6992|
|       3504909|           Bananal|             Bananal|-22.6819| -44.3281|
|       3508504|          Caçapava| São José dos Campos|-23.0992| -45.7076|
|       3508603|Cachoeira Paulista|       Guaratinguetá|-22.6665| -45.0154|
|       3509700|  Campos do Jordão|    Campos do Jordão|-22.7296| -45.5833|
|       3509957|             Canas|       Guaratinguetá|-22.7003| -45.0521|
|       3510500|     Caraguatatuba|       Caraguatatuba|-23.6125| -45.4125|
|       3513405|          Cruzeiro|       Guaratinguetá|-22.5728|  -44.969|
|       3513

                                                                                

In [4]:
# Buscar previsão do tempo para as cidades
# TODO

# Criando funções para chamar api para cada cidade
# e para limpar as strings


def clean_json(json: str):

    json = json.replace("\n", "")
    json = json.replace("\t", "")


    if json[14:24] == 'civillight':
        substring = json[0:66]
    else:
        substring = json[0:61]

    json = json.replace(substring, "")
    json = json[0:-2] +','

    return  json


# Forecast uct: u, ct >> umidade(chance de chuva), condição do tempo


def search_forecast_uct(lat: str, lon: str):

    forecast_url = f"http://www.7timer.info/bin/api.pl?lon={lon}&lat={lat}&product=civil&output=json"

    forecast_req = r.get(forecast_url)
    return clean_json(forecast_req.text)


# Forecast tcv: t, c, v>> temperatura, clima, vento


def search_forecast_tcv(lat: str, lon: str):

    forecast_url = f"http://www.7timer.info/bin/api.pl?lon={lon}&lat={lat}&product=civillight&output=json"

    forecast_req = r.get(forecast_url)
    return clean_json(forecast_req.text)


def mult_uct_json(lat: list, lon: list):

    temp_json = ""

    for i, (lat_val, lon_val) in enumerate(zip(lat, lon)):

        json_string = search_forecast_uct(lat_val, lon_val)
        
        temp_json = temp_json + json_string

    temp_json = temp_json[0:-1]

    definitive_json = "[" + temp_json + "]"
    return definitive_json


def mult_tcv_json(lat: list, lon: list):

    temp_json = ""

    for i, (lat_val, lon_val) in enumerate(zip(lat, lon)):

        json_string = search_forecast_tcv(lat_val, lon_val)

        temp_json = temp_json + json_string
    
    temp_json = temp_json[0:-1]

    definitive_json = "[" + temp_json + "]"
    return definitive_json



In [5]:
# Criar data frame com as previsões
# TODO

latitudes = cities.select("Latitude").rdd.flatMap(lambda x: x).collect()
longitudes = cities.select("Longitude").rdd.flatMap(lambda x: x).collect()

tcv_json = mult_tcv_json(latitudes, longitudes)
tcv_forecast = spark.read.json(sc.parallelize([tcv_json]))

uct_json = mult_uct_json(latitudes, longitudes)
uct_forecast = spark.read.json(sc.parallelize([uct_json]))


                                                                                

In [6]:
# O serviço da api que disponibilizará os dados uct
# entrega 8 dias(contando com o atual) de previsões
# enquanto os dados tcv tem apenas 7 dias

# Cada dia dos dados uct é dividido em 8 partes(timepoints)
# Criando função de timepoints para separar
# 7 dias contando com o atual do dataframe uct

def create_timepoints(time: int):
    
    # Parâmetro time representa a hora do dia em que será
    # disṕnibilizada a previsão
    
    values = []
    
    timepoint = time

    # Loop pega a hora inicial e pula de 24 em 24
    # para extrair os timepoints que representam o mesmo horário
    # em cada dia seguinte

    for i in range(7):
        values.append(timepoint)
        
        timepoint += 24
    
    return values



In [7]:
# Selecionando as previsões de 9 horas da manhã
# apenas para parametro
# de acordo com os timepoints, 00h(meia-noite) tem valor 24

morning_timepoints = create_timepoints(9)

uct_forecast = uct_forecast.select("*").where(uct_forecast.timepoint.isin(morning_timepoints))

In [8]:
# Criar view com as previsões
# TODO

tcv_forecast.show()

uct_forecast.show()

+--------+--------+---------+-----------+
|    date|  temp2m|  weather|wind10m_max|
+--------+--------+---------+-----------+
|20221021|[27, 17]|lightrain|          3|
|20221022|[26, 17]|  ishower|          2|
|20221023|[26, 16]|     rain|          2|
|20221024|[24, 16]|     rain|          2|
|20221025|[26, 16]|     rain|          2|
|20221026|[29, 15]|    clear|          2|
|20221027|[33, 15]|    clear|          2|
|20221021|[22, 15]|lightrain|          2|
|20221022|[23, 14]|  ishower|          2|
|20221023|[20, 12]|     rain|          2|
|20221024|[18, 13]|     rain|          2|
|20221025|[18, 12]|     rain|          2|
|20221026|[25, 11]|  pcloudy|          2|
|20221027|[27, 13]|    clear|          2|
|20221021|[27, 17]|lightrain|          3|
|20221022|[28, 17]|     rain|          2|
|20221023|[27, 16]|     rain|          3|
|20221024|[27, 16]|     rain|          2|
|20221025|[27, 16]|     rain|          3|
|20221026|[30, 15]|    clear|          3|
+--------+--------+---------+-----

In [9]:
# Extraindo colunas do dataframe cities

codigos = cities.select("CodigoDaCidade").rdd.flatMap(lambda x: x).collect()

cidades = cities.select("Cidade").rdd.flatMap(lambda x: x).collect()

regioes = cities.select("Regiao").rdd.flatMap(lambda x: x).collect()


                                                                                

In [10]:
# Criando função para replicar cada dado 7 vezes
# já que para cada registro no dataframe cities
# há 7 nos dataframes uct/tcv


def mult_cities_values(values: list):
    
    mult_values = []
    
    for item in values:

        for i in range(7):
            mult_values.append(item)
    
    return mult_values

In [11]:
# Replicando valores do dataframe cities

codigos_mult = mult_cities_values(codigos)

cidades_mult = mult_cities_values(cidades)

regioes_mult = mult_cities_values(regioes)

latitudes_mult = mult_cities_values(latitudes)

longitudes_mult = mult_cities_values(longitudes)


In [12]:
# Extraindo colunas do dataframe tcv

data = tcv_forecast.select("date").rdd.flatMap(lambda x: x).collect()

temps_max = tcv_forecast.select("temp2m.max").rdd.flatMap(lambda x: x).collect()
temps_min = tcv_forecast.select("temp2m.min").rdd.flatMap(lambda x: x).collect()

veloc_max_vento = tcv_forecast.select("wind10m_max").rdd.flatMap(lambda x: x).collect()

In [13]:
# Extraindo colunas do dataframe uct

umidade_mult = uct_forecast.select("rh2m").rdd.flatMap(lambda x: x).collect()

condicao_tempo = tcv_forecast.select("weather").rdd.flatMap(lambda x: x).collect()

In [14]:
# 273 loop criar list rows e append a outra list

rows = []

for i, (codigo, cidade, regiao, latitude, longitude,
        dia, temp_max, temp_min, veloc_vento,
        umidade, condicao) in \
        enumerate(zip(codigos_mult, cidades_mult, regioes_mult,
                      latitudes_mult, longitudes_mult, data,
                      temps_max, temps_min, veloc_max_vento, umidade_mult, condicao_tempo)):

    row = []
    
    row.append(codigo)
    row.append(cidade)
    row.append(regiao)
    row.append(latitude)
    row.append(longitude)
    row.append(dia)
    row.append(temp_max)
    row.append(temp_min)
    row.append(veloc_vento)
    row.append(umidade)
    row.append(condicao)

    rows.append(row)


In [15]:
# Criar DF da Tabela 1
# TODO

table1_schema = StructType([       
    StructField('CodigoDaCidade', LongType(), True),
    StructField('Cidade', StringType(), True),
    StructField('Regiao', StringType(), True),
    StructField('Latitude', StringType(), True),
    StructField('Longitude', StringType(), True),
    StructField('Data', LongType(), True),
    StructField('TemperaturaMaxima', LongType(), True),
    StructField('TemperaturaMinima', LongType(), True),
    StructField('VelocidadeMaximaDoVento', LongType(), True),
    StructField('ChanceDeChuva', StringType(), True),
    StructField('CondicaoDoTempo', StringType(), True)
])

table1 = spark.createDataFrame(data=rows, schema=table1_schema)

In [16]:
# Criando coluna Pais e Vai chover

table1 = table1.withColumn("VaiChover", when((table1.CondicaoDoTempo.contains("rain")), lit("Sim")).otherwise(lit("Nao")))

table1 = table1.withColumn("Pais", lit("Brasil"))


In [17]:
# VIEW TABELA 1

table1.sort("Cidade").show()

                                                                                

+--------------+---------+-------------+--------+---------+--------+-----------------+-----------------+-----------------------+-------------+---------------+---------+------+
|CodigoDaCidade|   Cidade|       Regiao|Latitude|Longitude|    Data|TemperaturaMaxima|TemperaturaMinima|VelocidadeMaximaDoVento|ChanceDeChuva|CondicaoDoTempo|VaiChover|  Pais|
+--------------+---------+-------------+--------+---------+--------+-----------------+-----------------+-----------------------+-------------+---------------+---------+------+
|       3502507|Aparecida|Guaratinguetá|-22.8495| -45.2325|20221026|               29|               15|                      2|          90%|          clear|      Nao|Brasil|
|       3502507|Aparecida|Guaratinguetá|-22.8495| -45.2325|20221023|               26|               16|                      2|          93%|           rain|      Sim|Brasil|
|       3502507|Aparecida|Guaratinguetá|-22.8495| -45.2325|20221027|               33|               15|                

In [18]:
# Criar DF da Tabela 2
# TODO


table2 = table1.select("Cidade", "VaiChover").sort("Cidade")

table2 = table2.groupBy("Cidade", "VaiChover").count().sort("Cidade")
table2 = table2.withColumnRenamed("count", "QtdDiasVaiChoverOuNao")

table2 = table2.withColumn("TotalDiasMapeados", lit(7))

table2.show()




+------------------+---------+---------------------+-----------------+
|            Cidade|VaiChover|QtdDiasVaiChoverOuNao|TotalDiasMapeados|
+------------------+---------+---------------------+-----------------+
|         Aparecida|      Nao|                    3|                7|
|         Aparecida|      Sim|                    4|                7|
|            Arapeí|      Sim|                    4|                7|
|            Arapeí|      Nao|                    3|                7|
|            Areias|      Sim|                    5|                7|
|            Areias|      Nao|                    2|                7|
|           Bananal|      Sim|                    4|                7|
|           Bananal|      Nao|                    3|                7|
|Cachoeira Paulista|      Sim|                    5|                7|
|Cachoeira Paulista|      Nao|                    2|                7|
|  Campos do Jordão|      Sim|                    5|                7|
|  Cam

                                                                                

In [20]:
# Exportar CSVs
# TODO

table1.repartition(1).write.option("header",True) \
 .csv("./tabela_1.csv")

table2.repartition(1).write.option("header",True) \
 .csv("./tabela_2.csv")


                                                                                