# 🎯 **Objetivo do Notebook**

Este notebook tem como objetivo demonstrar a construção de um **Stream Pipeline** utilizando o **Apache Spark Streaming** no ambiente do **Databricks**. O pipeline será desenvolvido com foco em:

1. **Ingestão de dados em tempo real**: Utilizaremos a **OpenWeatherMap API** como fonte de dados, capturando dados meteorológicos em tempo real.
2. **Filtragem dos dados**: Aplicaremos um filtro para selecionar apenas os dados relevantes, como temperaturas acima de um determinado valor.
3. **Agregação dos dados**: Utilizaremos uma função de janela (*window*) para agregar os dados em intervalos de tempo definidos.
4. **Output dos dados**: Os dados processados serão salvos em formato **Parquet**, um formato colunar otimizado para big data.
5. **Deploy do pipeline**: Ao final, discutiremos como fazer o deploy desse pipeline em um ambiente de produção.

# 🛠️ **Configuração do Ambiente**

Vamos configurar o ambiente para utilizar o **Apache Spark**.

In [0]:
pip install pyspark

Python interpreter will be restarted.
Python interpreter will be restarted.


Imports

In [0]:
import requests
import time
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, avg, countDistinct, when, max, last, row_number
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
from pyspark.sql.window import Window

# 🛠️ **Configuração da OpenWeatherMap API**

Neste passo, vamos configurar a **OpenWeatherMap API** para capturar dados meteorológicos em tempo real.

In [0]:
spark = SparkSession.builder \
    .appName("SparkStreamingExample") \
    .getOrCreate()

In [0]:
# 🔑 Chave da API OpenWeatherMap
API_KEY = "585b0ba17238ebcef6c63abdabbaf94f"

# 📍 Lista de cidades do Brasil
cidades = [
    "Belem", "Belo Horizonte", "Brasilia", "Curitiba", "Fortaleza",
    "Manaus", "Porto Alegre", "Recife", "Rio de Janeiro", "Salvador", "Sao Paulo"
]

# 🌍 País
pais = "BR"

# 🔄 Fazer requisições para cada cidade
for cidade in cidades:
    url = f"http://api.openweathermap.org/data/2.5/weather?q={cidade},{pais}&appid={API_KEY}&units=metric"

    # 🚀 Fazer a requisição
    response = requests.get(url)
    dados = response.json()

    # 📊 Exibir os dados
    if dados["cod"] == 200:
        print(f"🌍 Clima em {cidade}:")
        print(f"🌡️ Temperatura: {dados['main']['temp']}°C")
        print(f"🌤️ Condição: {dados['weather'][0]['description']}")
    else:
        print(f"❌ Erro ao buscar dados para: {cidade}")

    # ⏳ Aguardar 1 segundo para evitar sobrecarga na API
    time.sleep(1)


🌍 Clima em Belem:
🌡️ Temperatura: 29.02°C
🌤️ Condição: scattered clouds
🌍 Clima em Belo Horizonte:
🌡️ Temperatura: 30.44°C
🌤️ Condição: clear sky
🌍 Clima em Brasilia:
🌡️ Temperatura: 30.51°C
🌤️ Condição: clear sky
🌍 Clima em Curitiba:
🌡️ Temperatura: 24.5°C
🌤️ Condição: scattered clouds
🌍 Clima em Fortaleza:
🌡️ Temperatura: 28.07°C
🌤️ Condição: few clouds
🌍 Clima em Manaus:
🌡️ Temperatura: 26.27°C
🌤️ Condição: heavy intensity rain
🌍 Clima em Porto Alegre:
🌡️ Temperatura: 30.53°C
🌤️ Condição: few clouds
🌍 Clima em Recife:
🌡️ Temperatura: 29.02°C
🌤️ Condição: scattered clouds
🌍 Clima em Rio de Janeiro:
🌡️ Temperatura: 29.08°C
🌤️ Condição: few clouds
🌍 Clima em Salvador:
🌡️ Temperatura: 28.98°C
🌤️ Condição: clear sky
🌍 Clima em Sao Paulo:
🌡️ Temperatura: 25.72°C
🌤️ Condição: scattered clouds



# 📌 Criar um Diretório para Armazenar os Arquivos JSON

Neste passo, criamos um diretório (`/FileStore/weather_data`) onde vamos armazenar os arquivos JSON que simulam um fluxo contínuo de dados meteorológicos.  
Este diretório será utilizado como fonte de dados para o Spark Streaming.


In [0]:
# 📂 Caminho fixo para salvar os arquivos JSON no FileStore
input_dir = "/FileStore/weather_data"

# 🛠 Criar o diretório se não existir
dbutils.fs.mkdirs(input_dir)

print(f"📁 Diretório criado: {input_dir}")

def salvar_dados_json(cidade):
    # 📡 URL da API para a cidade atual
    url = f"http://api.openweathermap.org/data/2.5/weather?q={cidade},BR&appid={API_KEY}&units=metric"
    
    # 🌍 Consulta a API OpenWeatherMap
    response = requests.get(url)
    dados = response.json()

    if dados["cod"] == 200:
        # 📄 Define o nome do arquivo com a cidade + timestamp
        file_path = f"{input_dir}/weather_{cidade.replace(' ', '_')}_{int(time.time())}.json"

        # 💾 Salva os dados no formato JSON usando dbutils.fs.put()
        dbutils.fs.put(file_path, json.dumps(dados), overwrite=False)

        print(f"✅ Dados salvos: {file_path}")
    else:
        print(f"❌ Cidade não encontrada: {cidade}")

# 🔄 Gerar arquivos JSON para todas as cidades
for cidade in cidades:
    salvar_dados_json(cidade)
    time.sleep(1)  # ⏳ Pequeno intervalo para evitar sobrecarga na API

# 🔍 Verificar se os arquivos foram salvos
print("📂 Arquivos JSON na pasta:")
dbutils.fs.ls(input_dir)


📁 Diretório criado: /FileStore/weather_data
Wrote 577 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Belem_1740171583.json
Wrote 571 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Belo_Horizonte_1740171584.json
Wrote 571 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Brasilia_1740171585.json
Wrote 574 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Curitiba_1740171586.json
Wrote 569 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Fortaleza_1740171587.json
Wrote 596 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Manaus_1740171588.json
Wrote 575 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Porto_Alegre_1740171589.json
Wrote 572 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Recife_1740171590.json
Wrote 573 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Rio_de_Janeiro_1740171592.json
Wrote 581 bytes.
✅ Dados salvos: /FileStore/weather_data/weather_Salvador_1740171593.json
Wrote 580 bytes.
✅ Dados salvos: /FileStore/we


# 📌 Ler os Arquivos JSON com Spark Streaming

Agora que temos um fluxo contínuo de arquivos JSON sendo gerado no diretório `/FileStore/weather_data`, vamos configurar o **Spark Structured Streaming** para:

- Monitorar esse diretório em tempo real.
- Ler os arquivos JSON assim que forem criados.
- Converter os dados para um **DataFrame de Streaming**.

Isso nos permitirá processar os dados dinamicamente à medida que novos arquivos forem adicionados. 🚀


In [0]:
# 📌 Definir o esquema corrigido do JSON
schema = StructType([
    StructField("name", StringType(), True),  # Nome da cidade
    StructField("main", StructType([
        StructField("temp", DoubleType(), True)  # Temperatura em Celsius
    ]), True),
    StructField("weather", ArrayType(StructType([  # Ajustado para ser uma lista de Structs
        StructField("description", StringType(), True)
    ])), True)
])

# 📂 Caminho do diretório dos arquivos JSON
input_dir = "/FileStore/weather_data"

# 📝 Ler os arquivos JSON como DataFrame
df = spark.read.schema(schema).json(input_dir)

# 📌 Extrair corretamente a descrição do clima (primeiro item do array)
df = df.withColumn("weather_description", expr("weather[0].description")).drop("weather")

# 📊 Exibir os dados processados
print("📊 Dados processados:")
df.show(truncate=False)


📊 Dados processados:
+--------------+-------+--------------------+
|name          |main   |weather_description |
+--------------+-------+--------------------+
|Manaus        |{26.27}|heavy intensity rain|
|Manaus        |{26.27}|heavy intensity rain|
|Salvador      |{28.98}|clear sky           |
|Salvador      |{28.98}|clear sky           |
|São Paulo     |{25.89}|scattered clouds    |
|São Paulo     |{25.72}|scattered clouds    |
|Belém         |{29.02}|scattered clouds    |
|Belém         |{29.02}|scattered clouds    |
|Curitiba      |{25.03}|scattered clouds    |
|Porto Alegre  |{30.93}|few clouds          |
|Porto Alegre  |{30.53}|few clouds          |
|Curitiba      |{24.5} |scattered clouds    |
|Rio de Janeiro|{29.08}|few clouds          |
|Rio de Janeiro|{29.08}|few clouds          |
|Recife        |{29.02}|scattered clouds    |
|Recife        |{29.02}|scattered clouds    |
|Belo Horizonte|{30.44}|clear sky           |
|Belo Horizonte|{30.44}|clear sky           |
|Brasília    


# 📌 Aplicar Filtros e Agregações
Agora que os dados estão sendo processados, vamos:
- **Filtrar apenas temperaturas acima de 25°C**.
- **Calcular a média de temperatura das últimas medições**.
- **Salvar o resultado final no formato Parquet**.

Essas transformações permitirão analisar melhor os dados capturados. 🚀


In [0]:
# 🔹 **Aplicar um filtro**: Somente cidades com temperatura acima de 25°C
df_filtrado = df.filter(col("main.temp") > 25)

# 📊 Mostrar os dados após o filtro
print("📊 Dados após filtro (temperatura > 25°C):")
df_filtrado.show(truncate=False)

# 🔹 **Aplicar uma agregação**: Calcular a média da temperatura por cidade
df_aggregado = df_filtrado.groupBy("name").agg(avg(col("main.temp")).alias("avg_temp"))

# 📊 Mostrar os dados agregados
print("📊 Temperatura média por cidade:")
df_aggregado.show(truncate=False)


📊 Dados após filtro (temperatura > 25°C):
+--------------+-------+--------------------+
|name          |main   |weather_description |
+--------------+-------+--------------------+
|Manaus        |{26.27}|heavy intensity rain|
|Manaus        |{26.27}|heavy intensity rain|
|Salvador      |{28.98}|clear sky           |
|Salvador      |{28.98}|clear sky           |
|São Paulo     |{25.89}|scattered clouds    |
|São Paulo     |{25.72}|scattered clouds    |
|Belém         |{29.02}|scattered clouds    |
|Belém         |{29.02}|scattered clouds    |
|Curitiba      |{25.03}|scattered clouds    |
|Porto Alegre  |{30.93}|few clouds          |
|Porto Alegre  |{30.53}|few clouds          |
|Rio de Janeiro|{29.08}|few clouds          |
|Rio de Janeiro|{29.08}|few clouds          |
|Recife        |{29.02}|scattered clouds    |
|Recife        |{29.02}|scattered clouds    |
|Belo Horizonte|{30.44}|clear sky           |
|Belo Horizonte|{30.44}|clear sky           |
|Brasília      |{30.51}|clear sky     


# 📌 Classificação da Temperatura
Agora, vamos **criar uma nova coluna** chamada **`temperature_category`**, que classifica as temperaturas em categorias com base na seguinte lógica:

| Temperatura (`temp`) | Categoria              |
|----------------------|------------------------|
| Menos de 15°C       | 🧊 **Frio**            |
| Entre 15°C e 25°C   | 🌤️ **Agradável**      |
| Acima de 25°C       | 🔥 **Quente**          |

Isso ajudará a entender melhor a distribuição das temperaturas e facilitará futuras análises.


In [0]:
# 🔹 **Criar uma nova coluna 'temperature_category'**
df_categorizado = df_filtrado.withColumn(
    "temperature_category",
    when(col("main.temp") < 15, "Frio")
    .when((col("main.temp") >= 15) & (col("main.temp") <= 25), "Agradável")
    .otherwise("Quente")
)

# 📊 Exibir os dados com a nova categoria de temperatura
print("📊 Dados categorizados:")
df_categorizado.show(truncate=False)


📊 Dados categorizados:
+--------------+-------+--------------------+--------------------+
|name          |main   |weather_description |temperature_category|
+--------------+-------+--------------------+--------------------+
|Manaus        |{26.27}|heavy intensity rain|Quente              |
|Manaus        |{26.27}|heavy intensity rain|Quente              |
|Salvador      |{28.98}|clear sky           |Quente              |
|Salvador      |{28.98}|clear sky           |Quente              |
|São Paulo     |{25.89}|scattered clouds    |Quente              |
|São Paulo     |{25.72}|scattered clouds    |Quente              |
|Belém         |{29.02}|scattered clouds    |Quente              |
|Belém         |{29.02}|scattered clouds    |Quente              |
|Curitiba      |{25.03}|scattered clouds    |Quente              |
|Porto Alegre  |{30.93}|few clouds          |Quente              |
|Porto Alegre  |{30.53}|few clouds          |Quente              |
|Rio de Janeiro|{29.08}|few clouds     


# 📌 Contagem de Cidades por Categoria de Temperatura
Agora que categorizamos as temperaturas, vamos **contar quantas cidades pertencem a cada categoria** (`Frio`, `Agradável`, `Quente`).

Isso nos permite entender **como a temperatura está distribuída entre as cidades** e identificar tendências climáticas.


In [0]:
# 🔹 **Contar cidades distintas em cada categoria de temperatura**
df_contagem = df_categorizado.groupBy("temperature_category").agg(countDistinct("name").alias("total_cities"))

# 📊 Exibir os resultados ordenados por categoria
print("📊 Quantidade de cidades distintas por categoria de temperatura:")
df_contagem.show(truncate=False)


📊 Quantidade de cidades distintas por categoria de temperatura:
+--------------------+------------+
|temperature_category|total_cities|
+--------------------+------------+
|Quente              |11          |
+--------------------+------------+




# 📌 Ranking das Cidades Mais Quentes
Agora, vamos criar um **ranking das cidades com as temperaturas mais altas**, ordenando os dados da **maior para a menor temperatura**.

Isso nos ajudará a identificar **quais cidades estão enfrentando as temperaturas mais elevadas** no momento da análise.


In [0]:
# 🔹 **Selecionar a maior temperatura registrada para cada cidade**
df_ranking = df_categorizado.groupBy("name").agg(
    max("main.temp").alias("max_temp")  # Pegando a maior temperatura registrada por cidade
)

# 🔹 **Criar a posição no ranking**
window_spec = Window.orderBy(col("max_temp").desc())
df_ranking = df_ranking.withColumn("ranking", row_number().over(window_spec))

# 📊 Exibir o Top 5 das cidades mais quentes
print("📊 Top 5 Cidades Mais Quentes:")
df_ranking.select("ranking", "name", "max_temp").show(5, truncate=False)


📊 Top 5 Cidades Mais Quentes:
+-------+--------------+--------+
|ranking|name          |max_temp|
+-------+--------------+--------+
|1      |Porto Alegre  |30.93   |
|2      |Brasília      |30.51   |
|3      |Belo Horizonte|30.44   |
|4      |Rio de Janeiro|29.08   |
|5      |Fortaleza     |29.07   |
+-------+--------------+--------+
only showing top 5 rows




# 📌 Cálculo da Média Móvel da Temperatura
Agora, vamos calcular uma **média móvel da temperatura** para cada cidade, considerando os últimos **3 registros**.

Isso ajudará a suavizar variações e entender **a tendência de temperatura ao longo do tempo**.


In [0]:
# 🔹 Criar um ID incremental para simular timestamps diferentes
df_temporal = df_categorizado.withColumn("timestamp", expr("current_timestamp() + interval 10 seconds * (monotonically_increasing_id() % 5)"))

# 🔹 Criar uma janela de tempo para calcular a média móvel considerando os últimos 3 registros
window_spec = Window.partitionBy("name").orderBy(col("timestamp")).rowsBetween(-2, 0)

# 🔹 Calcular a média móvel da temperatura
df_media_movel = df_temporal.withColumn("avg_temp_3_records", avg("main.temp").over(window_spec))

# 🔹 Selecionar apenas a última média móvel calculada para cada cidade
df_final = df_media_movel.groupBy("name").agg(last("avg_temp_3_records").alias("avg_temp_3_records"))

# 📊 Exibir o resultado final (um único registro por cidade)
print("📊 Média Móvel Final por Cidade:")
df_final.show(truncate=False)


📊 Média Móvel Final por Cidade:
+--------------+------------------+
|name          |avg_temp_3_records|
+--------------+------------------+
|Belo Horizonte|30.44             |
|Belém         |29.02             |
|Brasília      |30.51             |
|Curitiba      |25.03             |
|Fortaleza     |28.57             |
|Manaus        |26.27             |
|Porto Alegre  |30.73             |
|Recife        |29.02             |
|Rio de Janeiro|29.08             |
|Salvador      |28.98             |
|São Paulo     |25.805            |
+--------------+------------------+




# 📌 Diferença entre Temperatura Atual e Média Móvel
Agora, vamos calcular a **diferença entre a temperatura mais recente de cada cidade e a média móvel**.

Isso nos ajudará a entender **se a temperatura está subindo ou caindo** em relação à tendência recente.



In [0]:
# 🔹 Pegar a temperatura mais recente para cada cidade
df_temp_recente = df_temporal.groupBy("name").agg(
    max("main.temp").alias("latest_temp")  # Última temperatura registrada
)

# 🔹 Juntar com a média móvel final
df_com_diferenca = df_temp_recente.join(df_final, "name", "inner") \
    .withColumn("temp_variation", col("latest_temp") - col("avg_temp_3_records"))

# 📊 Exibir os resultados finais
print("📊 Diferença entre Temperatura Atual e Média Móvel:")
df_com_diferenca.select("name", "latest_temp", "avg_temp_3_records", "temp_variation").show(truncate=False)


📊 Diferença entre Temperatura Atual e Média Móvel:
+--------------+-----------+------------------+-------------------+
|name          |latest_temp|avg_temp_3_records|temp_variation     |
+--------------+-----------+------------------+-------------------+
|Belo Horizonte|30.44      |30.44             |0.0                |
|Belém         |29.02      |29.02             |0.0                |
|Brasília      |30.51      |30.51             |0.0                |
|Curitiba      |25.03      |25.03             |0.0                |
|Fortaleza     |29.07      |28.57             |0.5                |
|Manaus        |26.27      |26.27             |0.0                |
|Porto Alegre  |30.93      |30.73             |0.1999999999999993 |
|Recife        |29.02      |29.02             |0.0                |
|Rio de Janeiro|29.08      |29.08             |0.0                |
|Salvador      |28.98      |28.98             |0.0                |
|São Paulo     |25.89      |25.805            |0.0850000000000008


# 📌 Passo Final: Exportação dos Dados Processados
Agora, vamos **salvar os dados finais** para que possam ser usados posteriormente.  
Os arquivos serão exportados nos seguintes formatos:
- 🗂 **Parquet** (`.parquet`) → Compactado e otimizado para Big Data
- 📄 **CSV** (`.csv`) → Fácil de visualizar e compartilhar
- 📜 **JSON** (`.json`) → Bom para integração com APIs

Isso garante que os dados estejam prontos para serem consumidos por outras aplicações e análises futuras.


In [0]:
# 📂 Diretório onde os arquivos serão salvos
output_path_parquet = "dbfs:/FileStore/weather_final.parquet"
output_path_csv = "dbfs:/FileStore/weather_final.csv"
output_path_json = "dbfs:/FileStore/weather_final.json"

# 🗑️ Deletar arquivos antigos antes de salvar os novos
dbutils.fs.rm(output_path_parquet, recurse=True)
dbutils.fs.rm(output_path_csv, recurse=True)
dbutils.fs.rm(output_path_json, recurse=True)

# 💾 Salvar os dados finais em Parquet
df_com_diferenca.write.mode("overwrite").parquet(output_path_parquet)
print(f"✅ Dados salvos em Parquet: {output_path_parquet}")

# 💾 Salvar os dados finais em CSV
df_com_diferenca.write.mode("overwrite").option("header", "true").csv(output_path_csv)
print(f"✅ Dados salvos em CSV: {output_path_csv}")

# 💾 Salvar os dados finais em JSON
df_com_diferenca.write.mode("overwrite").json(output_path_json)
print(f"✅ Dados salvos em JSON: {output_path_json}")


✅ Dados salvos em Parquet: dbfs:/FileStore/weather_final.parquet
✅ Dados salvos em CSV: dbfs:/FileStore/weather_final.csv
✅ Dados salvos em JSON: dbfs:/FileStore/weather_final.json



# 📌 Deploy do Pipeline no Databricks
Agora, vamos configurar a **execução automática** do pipeline, garantindo que os dados sejam processados e salvos sem intervenção manual.

### **📌 Como funciona?**
- Criamos um **Job no Databricks** para rodar automaticamente.
- Configuramos um **agendamento** (a cada 1 hora).
- O Job roda nosso **Notebook de Pipeline**, garantindo que os dados estejam sempre atualizados.

Isso permite **automatizar o processamento**, tornando o pipeline **mais eficiente e escalável**! 🚀
