# **Instala Spark e Pub/Sub**

In [2]:
!pip install google-cloud-pubsub
!pip install google-cloud-storage
!pip install gcsfs




# **Define as Variavies de Ambiente**

In [3]:
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"iot-faker-spark-37582a9f1fac.json"

In [4]:
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, explode, avg, stddev, max, min, date_format, to_date
from pyspark.sql.types import *
import json
from datetime import datetime
from google.cloud import storage

In [5]:
csv_file = f'Data/{datetime.now()}.csv'

# **Realiza a Coleta, Transformação e Analise dos Dados IOT**


In [6]:
# Criar a sessão Spark

spark = SparkSession.builder.appName("IoTDataProcessing").\
config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5")\
.master("local").getOrCreate()


In [7]:
project_id = "" # Seu Projeto
subscription_id = "" # Sua  Inscriçâo 

# Configurar o caminho do bucket
bucket_name = "" # Seu Bucket


# Definir o esquema para 'data'
data_schema = StructType([
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True)
])

# Definir o esquema para 'device_data'
device_schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("date", StringType(), True),
    StructField("status", StringType(), True),
    StructField("data", data_schema, True)
])

# Definir o esquema do JSON principal
main_schema = StructType([
    StructField("batch_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("device_data", ArrayType(device_schema), True)
])


In [8]:

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Envia um arquivo para o bucket GCS especificado."""
    # Cria um cliente do GCS
    storage_client = storage.Client()
    
    # Obtém o bucket
    bucket = storage_client.bucket(bucket_name)
    
    # Cria um blob no bucket
    blob = bucket.blob(destination_blob_name)
    
    # Faz o upload do arquivo para o blob
    blob.upload_from_filename(source_file_name)
    
    print(f"Arquivo {source_file_name} enviado para {destination_blob_name}.")

In [9]:
def analyze_data(df):
    # Calcular estatísticas básicas

    print("Mostrando estatísticas básicas")

    stats = df.groupBy().agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity"),
        stddev("temperature").alias("stddev_temperature"),
        stddev("humidity").alias("stddev_humidity"),
        max("temperature").alias("max_temperature"),
        min("temperature").alias("min_temperature"),
        max("humidity").alias("max_humidity"),
        min("humidity").alias("min_humidity")
    )

    stats.show(truncate=False)

def analyze_time_series(df):
    # Adicionar uma coluna de data
    df_with_date = df.withColumn("date", to_date(col("device_timestamp"), "HH:mm:ss"))

    # Agrupar por data e calcular a média
        
    print("Mostrando Status do dia")

    daily_stats = df_with_date.groupBy("date").agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity")
    )

    daily_stats.show(truncate=False)

def analyze_by_device(df):
    # Calcular a média de temperatura e umidade por dispositivo

    print("Mostrando média de temperatura e umidade por dispositivo")

    device_stats = df.groupBy("device_id").agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity")
    )

    device_stats.show(truncate=False)

# def plot_daily_temperature(df):
#     # Coletar dados como Pandas DataFrame para visualização
#     pandas_df = df.toPandas()

#     # Plotar dados
#     plt.figure(figsize=(12, 6))
#     plt.plot(pandas_df["date"], pandas_df["avg_temperature"], marker='o')
#     plt.title("Daily Average Temperature")
#     plt.xlabel("Date")
#     plt.ylabel("Average Temperature")
#     plt.grid(True)
#     plt.xticks(rotation=45)
#     plt.tight_layout()
#     plt.show()

def analyze_correlation(df):
    # Calcular a correlação entre temperatura e umidade
    correlation = df.stat.corr("temperature", "humidity")
    print(f"Correlation between temperature and humidity: {correlation}")

In [10]:
# Função para transformar a mensagem Pub/Sub em um DataFrame Spark
def pubsub_message_to_dataframe(messages):
    data = [json.loads(message.data.decode("utf-8")) for message in messages]
    return spark.createDataFrame(data, schema=main_schema)


In [11]:
# Lógica de Processamento
def process_iot_data(df):
    # Explodir a coluna 'device_data' para criar uma linha por dispositivo
    df_exploded = df.withColumn("device_data_exploded", explode("device_data"))

    # Selecionar e mapear as colunas
    df_mapped = df_exploded.select(
        col("batch_id"),
        col("timestamp"),
        col("device_data_exploded.device_id"),
        col("device_data_exploded.timestamp").alias("device_timestamp"),
        col("device_data_exploded.date"),
        col("device_data_exploded.status"),
        col("device_data_exploded.data.temperature"),
        col("device_data_exploded.data.humidity")
    )
    
    df_pandas = df_mapped.toPandas()

    csv_file = f'Data/{datetime.now()}.csv'
    df_pandas.to_csv(csv_file, index=False)
    upload_to_gcs("data-lake-iot",csv_file,f"IOT_DATA/{datetime.now()}.csv")
    


    # Exibir o esquema e uma amostra dos dados
    # df_mapped.printSchema()
    # df_mapped.show(truncate=False)

    # Executar análises
    analyze_data(df_mapped)
    analyze_time_series(df_mapped)
    analyze_by_device(df_mapped)
    analyze_correlation(df_mapped)

    # Plotar resultados
    daily_stats_df = analyze_time_series(df_mapped)
    #plot_daily_temperature(daily_stats_df)


In [None]:
# Inicia o consumidor
def start_subscriber():
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message):
        # Converte a mensagem Pub/Sub em um DataFrame Spark e processa
        df = pubsub_message_to_dataframe([message])
        process_iot_data(df)
        message.ack()

    # Configura o streaming listener
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}...")
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()

# Inicia o processo de assinante Pub/Sub
start_subscriber()


Listening for messages on projects/iot-faker-spark/subscriptions/Spark-IOT...
Arquivo Data/2024-10-17 23:22:39.687635.csv enviado para IOT_DATA/2024-10-17 23:22:39.690321.csv.
Mostrando estatísticas básicas
+---------------+------------+------------------+------------------+---------------+---------------+------------+------------+
|avg_temperature|avg_humidity|stddev_temperature|stddev_humidity   |max_temperature|min_temperature|max_humidity|min_humidity|
+---------------+------------+------------------+------------------+---------------+---------------+------------+------------+
|49.35          |53.8        |18.073752413812397|21.350705645925323|78             |24             |80          |18          |
+---------------+------------+------------------+------------------+---------------+---------------+------------+------------+

Mostrando Status do dia
+----------+---------------+------------+
|date      |avg_temperature|avg_humidity|
+----------+---------------+------------+
|1970-0