In [None]:
# Importa bibliotecas necessárias para requisições HTTP, manipulação de datas e dados, e operações com Spark
import requests  # Para realizar requisições à API dos pluviômetros
from datetime import datetime, timedelta  # Para manipular datas e horas
import pandas as pd  # Para manipulação de dados em DataFrames (utilizado em etapas específicas)
from delta.tables import DeltaTable  # Para operações com tabelas Delta no Lakehouse
from pyspark.sql.functions import col, max, min, current_timestamp  # Funções do PySpark para manipulação de colunas e timestamps
from pyspark.sql import functions as F  # Alias para funções do PySpark
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType  # Tipos de dados para definição de schemas

# Dicionário de configuração contendo informações sobre as tabelas que serão utilizadas no Lakehouse
configTables = {
    "sensordata": {
        # Condição de junção para operações de merge entre dados existentes e novos
        "queryMerge": "t.sensorId = s.sensorId AND t.readingDate = s.readingDate",
        # Esquema (schema) da tabela 'sensordata', definindo os nomes e tipos de cada coluna
        "schema": StructType([
            StructField("sensorId", LongType(), True),  # ID único do sensor
            StructField("softSensorId", LongType(), True),  # ID do sensor virtual (soft sensor)
            StructField("readingDate", TimestampType(), True),  # Data e hora da leitura
            StructField("sensorValue", DoubleType(), True),  # Valor medido pelo sensor (precipitação em mm)
            StructField("deviceName", StringType(), True),  # Nome do dispositivo
            StructField("deviceId", LongType(), True),  # ID único do dispositivo
            StructField("sensorType", StringType(), True)  # Tipo do sensor
        ])
    },
    "softsensordata": {
        # Condição de junção para operações de merge entre dados existentes e novos
        "queryMerge": "t.softsensorId = s.softsensorId AND t.readingDate = s.readingDate",
        # Esquema (schema) da tabela 'softsensordata', com campos adicionais específicos para sensores virtuais
        "schema": StructType([
            StructField("sensorId", LongType(), True),  # ID único do sensor
            StructField("softSensorId", LongType(), True),  # ID do sensor virtual (soft sensor)
            StructField("readingDate", TimestampType(), True),  # Data e hora da leitura
            StructField("sensorValue", DoubleType(), True),  # Valor medido pelo sensor
            StructField("deviceName", StringType(), True),  # Nome do dispositivo
            StructField("deviceId", LongType(), True),  # ID único do dispositivo
            StructField("sensorType", StringType(), True),  # Tipo do sensor
            StructField("customName", StringType(), True)  # Nome personalizado atribuído ao sensor
        ])
    }
}


StatementMeta(, 36b874ce-9ff6-42d4-85e6-c8f9362a2c31, 21, Finished, Available, Finished)

In [None]:
# Classe responsável por se comunicar com a API fornecida pela empresa terceirizada dos pluviômetros.
# Essa classe encapsula os métodos de autenticação, obtenção de dispositivos e coleta de dados dos sensores.

class OrAPI:
    def __init__(self):
        # Chave de API usada para autenticar a requisição (substituída por 'X' por motivos de segurança)
        self.apiKey = "xXXXXXXXXXXXXXXXXXXXXXXXXXx" 
        # URL base da API fornecida pela empresa contratada
        self.url = "https://xxxxxxxxxxxxxxxxxxxx/api" 
        # Header com o token de autenticação (será obtido automaticamente no construtor)
        self.headers = self.getToken()

    # Função auxiliar para formatar datas no padrão esperado pela API
    def formatDate(self, date):
        return date.strftime("%Y-%m-%d %H:%M:%S")

    # Método que faz a requisição inicial para obter um token de acesso (Bearer Token)
    def getToken(self):
        params = {"apiKey" : self.apiKey}
        response = requests.get(self.url + '/token', params=params)

        # Se a requisição for bem-sucedida (status 200), retorna o header com token
        if(response.status_code == 200):
            token = response.json()['token']
            return {
                'Authorization': f'Bearer {token}'
            }

    # Método para buscar a lista de dispositivos (pluviômetros) cadastrados para o usuário
    def getUserDevices(self):
        if(self.headers):
            response = requests.get(self.url + "/userdevices", headers=self.headers)
            if(response.status_code == 200):
                return response.json()

    # Método que coleta os dados de sensores entre duas datas específicas
    # Permite filtrar por sensorId ou softSensorId
    def getSensorData(self, startDate, endDate, sensorIds = 0, softSensorIds = 0, numAttempts = 3):
        data = []

        if(self.headers):
            params = {
                "startDate": self.formatDate(startDate),  # Data inicial formatada
                "endDate": self.formatDate(endDate),      # Data final formatada
                "offset": 0                               # Parâmetro de paginação (caso necessário)
            }

            # Adiciona filtros opcionais para os sensores (físicos ou virtuais)
            if(sensorIds):
                params["sensorIds"] = sensorIds
            if(softSensorIds):
                params["softSensorIds"] = softSensorIds

            # Requisição à API para obter os dados do sensor
            response = requests.get(self.url + "/sensordata", params=params, headers=self.headers)

            if(response.status_code == 200):
                data = response.json()
            else:
                # Caso a requisição falhe, imprime código de erro e IDs dos sensores
                print("****Request Code: ", response.status_code, "SensorId:", sensorIds, "softSensorIds", softSensorIds)

            # Caso a resposta venha vazia, tenta novamente até esgotar o número de tentativas
            if(len(data) == 0 and numAttempts > 0):
                attempt = numAttempts - 1
                data = self.getSensorData(startDate, endDate, sensorIds, softSensorIds, numAttempts = attempt)

        return data


StatementMeta(, 36b874ce-9ff6-42d4-85e6-c8f9362a2c31, 22, Finished, Available, Finished)

In [None]:
class Repository:
    def __init__(self):
        # Inicializa o repositório com as configurações definidas
        # em configTables, que inclui os schemas e condições de merge para as tabelas.
        self.configTables = configTables

    def overwriteData(self, df, tableName):
        """
        Sobrescreve completamente os dados existentes na tabela Delta com o DataFrame fornecido.
        
        Essa função é útil em cargas iniciais ou quando uma reconstrução completa da tabela é necessária.
        
        Parâmetros:
        - df: DataFrame do PySpark contendo os dados que serão salvos.
        - tableName: Nome da tabela Delta no Lakehouse.
        """
        df.write.format("delta")\
            .mode("overwrite")\
            .option("overwriteSchema", "true")\
            .saveAsTable(tableName)

    def mergeData(self, df, tableName):
        """
        Realiza a atualização incremental (merge) dos dados na tabela Delta.
        
        Essa função verifica, utilizando a condição definida em configTables (queryMerge),
        se os registros já existem na tabela. Se existirem, os registros são atualizados;
        caso contrário, novos registros são inseridos.
        
        Isso possibilita armazenar os dados de forma incremental, evitando a necessidade
        de sobrescrever toda a tabela diariamente, o que melhora a performance das consultas.
        """
        config = self.configTables[tableName]  # Recupera a configuração específica da tabela

        # Carrega a tabela Delta existente no ambiente Spark.
        deltaTable = DeltaTable.forName(spark, tableName)

        # Realiza a operação de merge:
        # - 't' representa os dados existentes na tabela.
        # - 's' representa os novos dados (DataFrame).
        # A condição de junção é definida pela query em config["queryMerge"].
        # Atualiza todos os registros correspondentes ou insere os que não existem.
        deltaTable.alias("t").merge(
            df.alias("s"),
            config["queryMerge"]
        ).whenMatchedUpdateAll()\
         .whenNotMatchedInsertAll()\
         .execute()


StatementMeta(, 36b874ce-9ff6-42d4-85e6-c8f9362a2c31, 23, Finished, Available, Finished)

In [None]:
class DataCollectionService:
    def __init__(self):
        # Inicializa as configurações e instâncias necessárias para o processo de coleta
        self.configTables = configTables  # Configurações de schema e regras de merge definidas anteriormente
        self.OrAPI = OrAPI()  # Instância da classe OrAPI para realizar requisições à API dos pluviômetros
        self.repository = Repository()  # Instância da classe Repository para manipular os dados no Lakehouse
        
        # Inicializa variáveis para armazenar os dados coletados
        self.userDevices = None  # Lista de dispositivos (pluviômetros) obtidos a partir da API
        self.sensorDataList = []  # Lista para armazenar dados dos sensores físicos
        self.softSensorDataList = []  # Lista para armazenar dados dos sensores virtuais (soft sensors)
        self.processCollect = []  # Lista para registrar o progresso e detalhes da coleta de dados
        
        # DataFrames que serão criados a partir dos dados coletados
        self.dfSensorData = None  
        self.dfSoftSensorData = None
        self.dfUserDevices = None
        self.dfProcessCollect = None
        
        # Define o período para a coleta de dados
        self.days_to_subtract = 1  # Número de dias a subtrair para definir o início do período (ex: coleta dos dados do dia anterior)
        self.startDate = datetime.now() - timedelta(days=self.days_to_subtract)
        self.endDateFinal = datetime.now()  # Data final (atual) para o período de coleta

    def collectAndUpdateUserDevice(self):
        """
        Coleta a lista de dispositivos (pluviômetros) da API e atualiza a tabela 'devices' no Lakehouse.
        
        Para cada dispositivo:
        - Converte as strings de datas ('lastUpload' e 'dateInstalled') para objetos datetime.
        - Filtra os sensores, ignorando os do tipo 'Unallocated'.
        - Registra o número de sensores físicos e virtuais ativos.
        Por fim, converte os dados para um DataFrame do PySpark e sobrescreve a tabela 'devices'.
        """
        # Obtém os dispositivos a partir da API
        self.userDevices = self.OrAPI.getUserDevices()
        for item in self.userDevices:
            # Converte o campo 'lastUpload' para datetime (formato com fração de segundos)
            item['lastUpload'] = datetime.strptime(item['lastUpload'], '%Y-%m-%dT%H:%M:%S.%f')
            # Converte o campo 'dateInstalled' para datetime (formato sem fração de segundos)
            item['dateInstalled'] = datetime.strptime(item['dateInstalled'], '%Y-%m-%dT%H:%M:%S')
            
            # Filtra os sensores, considerando apenas aqueles que não devem ser ignorados (não são 'Unallocated')
            sensorActive = [s for s in item['sensors'] if self.sensorIngoreRequest(s)]
            softSensorsActive = [s for s in item['softSensors'] if self.sensorIngoreRequest(s)]
            
            # Armazena o número de sensores ativos em campos específicos
            item['numSensor'] = len(sensorActive)
            item['numSoftSensor'] = len(softSensorsActive)
        
        # Converte a lista de dispositivos (obtidos da API) de um DataFrame Pandas para um DataFrame PySpark
        self.dfUserDevices = spark.createDataFrame(self.userDevices)
        
        # Sobrescreve a tabela 'devices' no Lakehouse com o novo DataFrame
        self.repository.overwriteData(self.dfUserDevices, "devices")

    def sensorIngoreRequest(self, sensor):
        """
        Determina se um sensor deve ser ignorado com base no seu tipo.
        
        Parâmetro:
        - sensor: Dicionário contendo os dados de um sensor.
        
        Retorna:
        - True se o sensor NÃO for do tipo 'Unallocated'; caso contrário, False.
        """
        return sensor['sensorType'] != 'Unallocated'

    def getSensorData(self, startDate, endDate, userDevice, sensorNameList, sensorId):
        """
        Coleta os dados dos sensores para um dispositivo específico em um intervalo de tempo.
        
        Parâmetros:
        - startDate: Data inicial do período de coleta.
        - endDate: Data final do período de coleta.
        - userDevice: Dicionário com informações do dispositivo (inclui nome, ID e listas de sensores).
        - sensorNameList: Nome da lista a ser acessada no dispositivo ('sensors' ou 'softSensors').
        - sensorId: Chave que identifica o ID do sensor na lista (pode ser 'sensorId' ou 'softSensorId').
        
        Para cada sensor que não deve ser ignorado:
        - Requisita os dados dos sensores para o período definido.
        - Adiciona informações do dispositivo (nome e ID) e tipo do sensor aos registros retornados.
        - Para sensores virtuais (softSensors), adiciona o campo 'customName'.
        
        Imprime a quantidade de itens coletados e retorna a lista agregada de dados.
        """
        deviceName = userDevice['deviceName']
        deviceId = userDevice['deviceId']
        sensorDataList = []  # Lista para armazenar os dados coletados para esse dispositivo
        
        # Itera sobre cada sensor (ou soft sensor) do dispositivo
        for sensor in userDevice[sensorNameList]:
            # Apenas processa sensores que não são do tipo 'Unallocated'
            if self.sensorIngoreRequest(sensor):
                # Realiza a coleta dos dados conforme o tipo do sensor (físico ou virtual)
                if sensorNameList == 'sensors':
                    data = self.OrAPI.getSensorData(startDate, endDate, sensorIds=sensor[sensorId])
                else:
                    data = self.OrAPI.getSensorData(startDate, endDate, softSensorIds=sensor[sensorId])
                
                # Enriquecer cada registro com dados adicionais do dispositivo e do sensor
                for item in data:
                    item['deviceName'] = deviceName
                    item['deviceId'] = deviceId
                    item['sensorType'] = sensor['sensorType']
                    # Para soft sensors, adiciona o nome personalizado do sensor
                    if sensorNameList == 'softSensors':
                        item['customName'] = sensor['customName']
                
                # Acrescenta os dados coletados à lista geral para este dispositivo
                sensorDataList.extend(data)
        
        # Exibe no console o número de itens coletados para facilitar o monitoramento do processo
        print("Num Item: {} - {} items ".format(sensorNameList, len(sensorDataList)))
        return sensorDataList

    def parseDfSpark(self, data, tableName):
        """
        Processa a lista de dados coletados, realizando conversões necessárias, 
        removendo duplicatas e convertendo para um DataFrame do PySpark com o schema correto.
        
        Parâmetros:
        - data: Lista de dicionários com os dados dos sensores coletados.
        - tableName: Nome da tabela para a qual os dados serão convertidos ('sensordata' ou 'softsensordata').
        
        Retorna:
        - DataFrame do PySpark sem duplicatas.
        """
        config = self.configTables[tableName]  # Recupera o schema para a tabela especificada
        
        # Converte os campos de cada item para os tipos corretos
        for item in data:
            # Converte 'readingDate' (string) para objeto datetime
            item['readingDate'] = datetime.strptime(item['readingDate'], '%Y-%m-%dT%H:%M:%S')
            # Garante que 'sensorValue' seja tratado como float
            item['sensorValue'] = item['sensorValue'] * 1.0
        
        # Cria o DataFrame do PySpark utilizando o schema definido
        df = spark.createDataFrame(data, schema=config["schema"])
        # Remove duplicatas para garantir a consistência dos dados
        df_no_duplicates = df.dropDuplicates()
        
        return df_no_duplicates

    def registerProcessCollect(self, current_date, endDate, count, userDevice, userDevices, requestSensors):
        """
        Registra o progresso do processo de coleta de dados para cada dispositivo.
        
        Calcula e imprime a porcentagem de dispositivos processados e armazena as informações de progresso,
        como a data, o nome do dispositivo e a quantidade de sensores físicos e virtuais.
        
        Parâmetros:
        - current_date: Data de início do período atual de coleta.
        - endDate: Data final do período atual de coleta.
        - count: Contador que representa a ordem de processamento do dispositivo.
        - userDevice: Dicionário com as informações do dispositivo sendo processado.
        - userDevices: Lista com todos os dispositivos.
        - requestSensors: Lista com as configurações das requisições (para sensores e softSensors).
        """
        # Calcula a porcentagem de dispositivos processados
        percent = int(count * 100 / len(userDevices))
        print("\- {}% Collect Sensor Device: {} | Num Sensor: {}  | Num SoftSensor: {} ".format(
            percent, userDevice['deviceName'], len(userDevice['sensors']), len(userDevice['softSensors'])
        ))
        # Registra os dados do processo de coleta para esse dispositivo
        process = {
            "current_date": current_date,
            "endDate": endDate,
            "percent": percent,
            "deviceName": userDevice['deviceName'],
            "numSensor": len(userDevice['sensors']),
            "numSoftSensor": len(userDevice['softSensors']),
        }
        self.processCollect.append(process)

    def collectAndUpdateSensorData(self):
        """
        Função principal que gerencia a coleta e atualização dos dados dos sensores.
        
        Para cada dia no intervalo definido (entre startDate e endDateFinal):
        - Define o período diário (current_date até endDate).
        - Para cada dispositivo, coleta os dados dos sensores físicos e virtuais usando 'getSensorData'.
        - Registra o progresso da coleta por meio de 'registerProcessCollect'.
        - Incrementa o contador e o período para o próximo dia.
        
        Após a coleta:
        - Converte as listas de dados (sensorDataList e softSensorDataList) em DataFrames do PySpark com
          os schemas apropriados usando 'parseDfSpark'.
        - Realiza operações de merge incremental nos dados, atualizando as tabelas 'sensordata' e 'softsensordata'
          no Lakehouse por meio do método 'mergeData' da classe Repository.
        """
        # Inicia a coleta a partir do início do dia da data inicial (startDate)
        current_date = self.startDate.replace(hour=0, minute=0, second=0, microsecond=0)

        # Caso os dispositivos ainda não tenham sido coletados, obtém-os da API
        if self.userDevices is None:
            self.userDevices = self.OrAPI.getUserDevices()

        # Reinicia as listas de dados e de progresso
        self.sensorDataList = []
        self.softSensorDataList = []
        self.processCollect = []
        
        # Loop principal: coleta dados diariamente enquanto current_date for menor que endDateFinal
        while current_date < self.endDateFinal:
            endDate = current_date + timedelta(days=1)  # Define o fim do período de 1 dia
            count = 1  # Inicializa contador para dispositivos processados no dia
            
            # Exibe informações de progresso, incluindo número do dia e período de coleta
            print(
                (current_date - self.startDate.replace(hour=0, minute=0, second=0, microsecond=0)).days,
                "/", (self.endDateFinal - self.startDate).days,
                " | Init:", current_date,
                "Final:", endDate,
                " Period:", endDate - current_date
            )
            
            # Para cada dispositivo na lista de userDevices, coleta os dados dos sensores
            for userDevice in self.userDevices:
                # Caso queira filtrar dispositivos específicos, pode descomentar e utilizar a lista:
                # deviceNames = ['EWS6989 - Abrigo km 347','EWS6288 - Abrigo km 587']
                # if(userDevice['deviceName'] in deviceNames):
                
                # Define as configurações para coleta: uma para sensores físicos e outra para softSensors
                requestSensors = [
                    {"list": "sensors", "colId": "sensorId"},
                    {"list": "softSensors", "colId": "softSensorId"}
                ]
                for r in requestSensors:
                    # Coleta os dados para o período atual e para o tipo de sensor especificado
                    dataList = self.getSensorData(current_date, endDate, userDevice, r['list'], r['colId'])
                    # Acumula os dados na lista correspondente (físicos ou virtuais)
                    if r['list'] == "sensors":
                        self.sensorDataList.extend(dataList)
                    else:
                        self.softSensorDataList.extend(dataList)

                # Registra o progresso da coleta para o dispositivo atual
                self.registerProcessCollect(current_date, endDate, count, userDevice, self.userDevices, requestSensors)
                count += 1  # Incrementa o contador para o próximo dispositivo
            
            # Avança para o próximo dia
            current_date += timedelta(days=1)

        # Após a coleta, converte as listas de dados em DataFrames com o esquema definido
        self.dfSensorData = self.parseDfSpark(self.sensorDataList, "sensordata")
        self.dfSoftSensorData = self.parseDfSpark(self.softSensorDataList, "softsensordata")
        
        # Realiza a operação de merge incremental, atualizando as tabelas Delta no Lakehouse
        self.repository.mergeData(self.dfSensorData, "sensordata")
        self.repository.mergeData(self.dfSoftSensorData, "softsensordata")


StatementMeta(, 36b874ce-9ff6-42d4-85e6-c8f9362a2c31, 29, Finished, Available, Finished)

In [None]:
# Cria uma instância da classe DataCollectionService.
# Essa instância gerencia todo o processo de coleta e atualização dos dados dos
# pluviômetros: desde a obtenção das informações da API, passando pela transformação dos dados,
# até o armazenamento incremental no Lakehouse via operações de merge.
dataCollectionService = DataCollectionService()

StatementMeta(, 36b874ce-9ff6-42d4-85e6-c8f9362a2c31, 30, Finished, Available, Finished)

In [None]:
# Inicia o processo de coleta e atualização dos dados dos sensores dos pluviômetros.
# Este método coordena as seguintes etapas:
# 1. Coleta dos dados dos dispositivos para o período definido.
# 2. Requisição dos dados dos sensores (físicos e virtuais) para cada dispositivo.
# 3. Registro do progresso da coleta para monitoramento.
# 4. Conversão das listas coletadas em DataFrames do PySpark com o schema definido.
# 5. Atualização incremental das tabelas 'sensordata' e 'softsensordata' no Lakehouse,
#    utilizando operações de merge para inserir novos registros e atualizar os existentes.
dataCollectionService.collectAndUpdateSensorData()

StatementMeta(, 36b874ce-9ff6-42d4-85e6-c8f9362a2c31, 31, Finished, Available, Finished)

0 / 1  | Init: 2025-04-10 00:00:00 Final: 2025-04-11 00:00:00  Period: 1 day, 0:00:00
Num Item: sensors - 155 items 
Num Item: softSensors - 195 items 
\- 3% Collect Sensor Device: EWS6989 - Abrigo km 035 | Num Sensor:7  | Num SoftSensor:3 
Num Item: sensors - 155 items 
Num Item: softSensors - 195 items 
\- 7% Collect Sensor Device: EWS6985 - Abrigo km 409 | Num Sensor:7  | Num SoftSensor:3 
Num Item: sensors - 154 items 
Num Item: softSensors - 195 items 
\- 10% Collect Sensor Device: EWS6984  - Abrigo km 807 | Num Sensor:7  | Num SoftSensor:3 
Num Item: sensors - 155 items 
Num Item: softSensors - 292 items 
\- 14% Collect Sensor Device: EWS6998 - Abrigo km 70 (Ramal) | Num Sensor:7  | Num SoftSensor:5 
Num Item: sensors - 155 items 
Num Item: softSensors - 195 items 
\- 17% Collect Sensor Device: EWS6977 - Abrigo km 193 | Num Sensor:7  | Num SoftSensor:3 
Num Item: sensors - 154 items 
Num Item: softSensors - 292 items 
\- 21% Collect Sensor Device: EWS7002 - Abrigo km 11 (Ramal) |

# Teste e BackUp Códigos

In [None]:
dfSensordata = spark.read.table("sensordata")


StatementMeta(, de42de6d-14f0-4a9d-9f2e-3caf7c005ad4, 48, Finished, Available, Finished)

In [None]:
display(dfSensordata.groupBy('deviceName','sensorId').agg({"readingDate": "min"}))

StatementMeta(, de42de6d-14f0-4a9d-9f2e-3caf7c005ad4, 49, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0df10e36-00a7-44e4-8b4c-93c942a20512)

In [None]:
dfSoftSensordata = spark.read.table("softsensordata")

StatementMeta(, de42de6d-14f0-4a9d-9f2e-3caf7c005ad4, 59, Finished, Available, Finished)

In [None]:
dfSoftSensorDataGroupDayWithoutData = dfSoftSensordata.groupBy('deviceName','customName').agg({"readingDate": "min", "readingDate": "max"})

StatementMeta(, de42de6d-14f0-4a9d-9f2e-3caf7c005ad4, 84, Finished, Available, Finished)

In [None]:
#dfSoftSensorDataGroupDayWithoutData["LastDataInDay"] = datetime.now() - dfSoftSensorDataGroupDayWithoutData['max(readingDate)']

from pyspark.sql.window import Window

windowSpec = Window.partitionBy('customName')


dfSoftSensorDataGroupDayWithoutData = dfSoftSensorDataGroupDayWithoutData.withColumn("LastDataInDay", current_timestamp() - col('max(readingDate)'))


StatementMeta(, de42de6d-14f0-4a9d-9f2e-3caf7c005ad4, 86, Finished, Available, Finished)

In [None]:
display(dfSoftSensorDataGroupDayWithoutData)

StatementMeta(, de42de6d-14f0-4a9d-9f2e-3caf7c005ad4, 88, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e4e9623b-afa7-44f6-88da-2d1a095d38f1)

In [6]:
def sensorIngoreRequest(sensor):
    return sensor['sensorType'] != 'Unallocated'

sensorDataList = []
softSensorDataList = []    
current_date = startDate.replace(hour=0, minute=0, second=0, microsecond=0)
while current_date < endDate:
    end = current_date + timedelta(days=1)
    count = 1
    print(startDate - current_date,"/",startDate.day - endDate.day," | Init:" , current_date, "Final:",end ," Period:",end - current_date)
    for userDevice in userDevices:
        deviceName = userDevice['deviceName']
        deviceId   = userDevice['deviceId']
        percent = int(count*100/len(userDevices))
        print("\- {}% Collect Sensor Device: {} | Num Sensor:{}  | Num SoftSensor:{} ".format(percent,deviceName, len(userDevice['sensors']), len(userDevice['softSensors']) ) )
        for sensor in userDevice['sensors']:
            if(sensorIngoreRequest(sensor)):
                data = OrAPI.getSensorData(current_date,end,sensorIds = sensor['sensorId'])
                for item in data:
                    item['deviceName'] = deviceName
                    item['deviceId'] = deviceId
                    item['sensorType'] = sensor['sensorType']
                sensorDataList.extend(data)

        for softSensor in userDevice['softSensors']:
            if(sensorIngoreRequest(sensor)):
                data = OrAPI.getSensorData(current_date,end,softSensorIds = softSensor['softSensorId'])
                for item in data:
                    item['deviceName'] = deviceName
                    item['deviceId'] = deviceId
                    item['sensorType'] = softSensor['sensorType']
                    item['customName'] =softSensor['customName']
                softSensorDataList.extend(data)
        count += 1
    current_date += timedelta(days=1)

StatementMeta(, d3cbe9f8-95e4-4e08-8507-87a8ec067e4b, 8, Finished, Available, Finished)

0 / 16  | Init: 2025-03-27 00:00:00 Final: 2025-03-28 00:00:00  Period: 1 day, 0:00:00
\- 3% Collect Sensor Device: EWS6989 - Abrigo km 035 | Num Sensor:7  | Num SoftSensor:3 
\- 7% Collect Sensor Device: EWS6985 - Abrigo km 409 | Num Sensor:7  | Num SoftSensor:3 
\- 10% Collect Sensor Device: EWS6984  - Abrigo km 807 | Num Sensor:7  | Num SoftSensor:3 
\- 14% Collect Sensor Device: EWS6998 - Abrigo km 70 (Ramal) | Num Sensor:7  | Num SoftSensor:5 
\- 17% Collect Sensor Device: EWS6977 - Abrigo km 193 | Num Sensor:7  | Num SoftSensor:3 
\- 21% Collect Sensor Device: EWS7002 - Abrigo km 11 (Ramal) | Num Sensor:7  | Num SoftSensor:5 
\- 25% Collect Sensor Device: EWS6737  - Abrigo km 629 | Num Sensor:7  | Num SoftSensor:3 
\- 28% Collect Sensor Device: EWS6966 - Abrigo km 244 | Num Sensor:7  | Num SoftSensor:3 
\- 32% Collect Sensor Device: EWS6973 - Abrigo km 51 (Ramal) | Num Sensor:7  | Num SoftSensor:3 
\- 35% Collect Sensor Device: EWS6949  - Abrigo km 371 | Num Sensor:13  | Num Soft

In [10]:
dfSensorData = pd.DataFrame(sensorDataList)
dfSensorData.fillna(0, inplace=True)
dfSensorData['readingDate'] = pd.to_datetime(dfSensorData['readingDate'])
dfSensorDataSpark = spark.createDataFrame(dfSensorData)
        # Salvar o DataFrame do SoftSensor em tabela formato Delta
dfSensorDataSpark.write.format("delta").mode("overwrite").saveAsTable('sensordata')

StatementMeta(, 67d92f8d-5234-4660-a801-bbc382d240ee, 12, Finished, Available, Finished)

In [11]:
dfSoftSensorData = pd.DataFrame(softSensorDataList)
dfSoftSensorData.fillna(0, inplace=True)
dfSoftSensorData['readingDate'] = pd.to_datetime(dfSoftSensorData['readingDate'])
dfSoftSensorData = spark.createDataFrame(dfSoftSensorData)
        # Salvar o DataFrame do SoftSensor em tabela formato Delta
dfSoftSensorData.write.format("delta").mode("overwrite").saveAsTable('softsensordata')

StatementMeta(, 67d92f8d-5234-4660-a801-bbc382d240ee, 13, Finished, Available, Finished)

In [None]:
#dfSensorDataSpark = dfSensorDataSpark.orderBy('readingDate')
dffilter =  dfSensorDataSpark.where((dfSensorDataSpark['sensorType'] == 'Air Temperature') & (dfSensorDataSpark['sensorValue'] > 0)  & (dfSensorDataSpark['sensorId'] == 87689) )
dffilter.orderBy('readingDate')
dffilter.select(max(col("readingDate"))).show()

StatementMeta(, 11899cbe-f972-45ed-ab1d-edaec7670138, 44, Finished, Available, Finished)

+-------------------+
|   max(readingDate)|
+-------------------+
|2025-04-09 22:00:00|
+-------------------+



In [None]:
dfSensorDataSpark.where((dfSensorDataSpark['deviceId'] == 40635)).groupBy(['sensorType','sensorId']).agg(max('readingDate')).show()

StatementMeta(, 11899cbe-f972-45ed-ab1d-edaec7670138, 51, Finished, Available, Finished)

+------------------+--------+-------------------+
|        sensorType|sensorId|   max(readingDate)|
+------------------+--------+-------------------+
|Battery (External)|   82094|2025-03-30 03:00:00|
|          Rainfall|   82095|2025-03-27 00:45:00|
|   Air Temperature|   86631|2025-03-30 03:00:00|
|           Battery|   82096|2025-04-08 03:15:00|
+------------------+--------+-------------------+



In [None]:
count= 0
count =+ 1
count

StatementMeta(, dd446ec3-963e-4957-910c-a45bdd5fcd36, 8, Finished, Available, Finished)

1

In [None]:
repository.mergeSensorData(sensorDataList,"sensordata")
repository.mergeSensorData(softSensorDataList,"softsensordata")

StatementMeta(, 11899cbe-f972-45ed-ab1d-edaec7670138, 45, Finished, Available, Finished)

AnalysisException: [DELTA_MISSING_DELTA_TABLE] `sensordata` is not a Delta table.

In [None]:
df = pd.DataFrame(sensorDataList)
df['readingDate'] = pd.to_datetime(df['readingDate'])
df['readingDate'].max()

StatementMeta(, 656558c0-1531-4d96-a818-f7d1644bf3da, 32, Finished, Available, Finished)

Timestamp('2025-04-08 00:00:00')

In [None]:
dfSensorData = pd.DataFrame(sensorDataList)
dfSensorData.fillna(0, inplace=True)
dfSensorDataSpark = spark.createDataFrame(dfSensorData)

dfSoftSensorData = pd.DataFrame(softSensorDataList)
dfSoftSensorData.fillna(0, inplace=True)
dfSoftSensorDataSpark = spark.createDataFrame(dfSoftSensorData)

In [None]:
userDevices

StatementMeta(, 656558c0-1531-4d96-a818-f7d1644bf3da, 25, Finished, Available, Finished)

[{'deviceId': 40645,
  'deviceName': 'EWS6989 - Abrigo km 035',
  'reference': '6989',
  'serialNumber': '301434061805050',
  'dateInstalled': '2024-12-10T00:00:00',
  'status': 'Online',
  'lastUpload': '2025-04-10T12:02:40.93',
  'latitude': -2.903368,
  'longitude': -44.364541,
  'batteryPercentage': 95,
  'sensors': [{'sensorId': 90526,
    'sensorType': 'Unallocated',
    'uom': 'Unallocated',
    'channelNumber': '257',
    'customName': '',
    'scadaTag': ''},
   {'sensorId': 90160,
    'sensorType': 'Air Temperature',
    'uom': 'Degs (C)',
    'channelNumber': '3',
    'customName': '',
    'scadaTag': ''},
   {'sensorId': 82146,
    'sensorType': 'Battery (External)',
    'uom': 'Volts',
    'channelNumber': '2',
    'customName': '',
    'scadaTag': ''},
   {'sensorId': 82147,
    'sensorType': 'Rainfall',
    'uom': 'mm',
    'channelNumber': '1',
    'customName': '',
    'scadaTag': ''},
   {'sensorId': 90528,
    'sensorType': 'Unallocated',
    'uom': 'Unallocated',
  

In [10]:
# Converte o DataFrame Pandas para DataFrame PySpark


dfDevices = pd.DataFrame(userDevices)
dfDevices['lastUpload'] = pd.to_datetime(dfDevices['lastUpload'])
dfDevices['lastUpload'].max()
dfDevices = spark.createDataFrame(userDevices)
dfDevices.write.format("delta").mode("overwrite").saveAsTable("devices")
dfDevices.show()


StatementMeta(, 656558c0-1531-4d96-a818-f7d1644bf3da, 36, Finished, Available, Finished)

+-----------------+-------------------+--------+--------------------+--------------------+---------+----------+---------+--------------------+---------------+--------------------+------+
|batteryPercentage|      dateInstalled|deviceId|          deviceName|          lastUpload| latitude| longitude|reference|             sensors|   serialNumber|         softSensors|status|
+-----------------+-------------------+--------+--------------------+--------------------+---------+----------+---------+--------------------+---------------+--------------------+------+
|               95|2024-12-10T00:00:00|   40645|EWS6989 - Abrigo ...|2025-04-10T12:02:...|-2.903368|-44.364541|     6989|[{uom -> NULL, ch...|301434061805050|[{scadaTag -> NUL...|Online|
|               95|2024-11-12T00:00:00|   40643|EWS6985 - Abrigo ...|2025-04-10T12:02:...|-4.387275| -46.79522|     6985|[{uom -> NULL, ch...|301434060823820|[{scadaTag -> NUL...|Online|
|              105|2024-11-05T00:00:00|   40641|EWS6984  - Abrigo

#### Criar Tabela Devices

In [6]:
# Converte o DataFrame Pandas para DataFrame PySpark
dfDevices = spark.createDataFrame(userDevices)

# Salvar o DataFrame do SoftSensor em tabela formato Delta
dfDevices.write.format("delta").mode("overwrite").saveAsTable("devices")


# from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType

# # Definir o esquema das colunas
# schema = StructType([
#     StructField("sensorId", LongType(), True),
#     StructField("readingDate", TimestampType(), True),
#     StructField("sensorValue", DoubleType(), True),
#     StructField("deviceName", StringType(), True),
#     StructField("deviceId", LongType(), True),
#     StructField("sensorType", StringType(), True)
# ])

# # Criar o DataFrame com o esquema definido
# #dfSensorDataSpark = spark.createDataFrame(dfSensorDataSpark.rdd, schema)
# dfSensorDataSpark = spark.createDataFrame(dfSensorData, schema)
# # Salvar o DataFrame em formato Delta com o esquema definido
# dfSensorDataSpark.write.format("delta").mode("overwrite").saveAsTable("sensordata")

StatementMeta(, 656558c0-1531-4d96-a818-f7d1644bf3da, 27, Finished, Available, Finished)

#### Sensor Data

In [14]:
# Carregar a tabela Delta existente
deltaTable = DeltaTable.forName(spark, "sensordata")

# Definir a condição de junção e as ações de atualização/inserção
deltaTable.alias("t").merge(
    dfSensorDataSpark.alias("s"),
    "t.sensorId = s.sensorId AND t.readingDate = s.readingDate"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

dfSensorDataSpark.show()

StatementMeta(, b64da358-674f-4305-b897-066987a34b81, 16, Finished, Available, Finished)

+--------+------------+-------------------+------------------+--------------------+--------+----------+
|sensorId|softSensorId|        readingDate|       sensorValue|          deviceName|deviceId|sensorType|
+--------+------------+-------------------+------------------+--------------------+--------+----------+
|   75788|           0|2025-04-08T00:00:00|               0.0|EWS6288 - Abrigo ...|   39974|  Rainfall|
|   75788|           0|2025-04-08T00:15:00|               0.0|EWS6288 - Abrigo ...|   39974|  Rainfall|
|   75788|           0|2025-04-08T00:30:00|               0.0|EWS6288 - Abrigo ...|   39974|  Rainfall|
|   75788|           0|2025-04-08T00:45:00|               0.0|EWS6288 - Abrigo ...|   39974|  Rainfall|
|   75788|           0|2025-04-08T01:00:00|               0.0|EWS6288 - Abrigo ...|   39974|  Rainfall|
|   75788|           0|2025-04-08T01:15:00|               0.0|EWS6288 - Abrigo ...|   39974|  Rainfall|
|   75788|           0|2025-04-08T01:30:00|               0.0|EW

#### Soft Sebsor Data

In [15]:
# Carregar a tabela Delta existente
deltaTable = DeltaTable.forName(spark, "softsensordata")

# Definir a condição de junção e as ações de atualização/inserção
deltaTable.alias("t").merge(
    dfSoftSensorDataSpark.alias("s"),
    "t.softsensorId = s.softsensorId AND t.readingDate = s.readingDate"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

dfSoftSensorDataSpark.show()

StatementMeta(, b64da358-674f-4305-b897-066987a34b81, 17, Finished, Available, Finished)

+--------+------------+-------------------+------------------+--------------------+--------+----------+----+--------------------+
|sensorId|softSensorId|        readingDate|       sensorValue|          deviceName|deviceId|sensorType| uom|          customName|
+--------+------------+-------------------+------------------+--------------------+--------+----------+----+--------------------+
|   82072|       10083|2025-04-08T23:59:00|0.6000000089406967|EWS6734  - Abrigo...|   40629|  Rainfall|[mm]|Precipitação (mm/...|
|   82072|       10233|2025-04-08T00:00:00| 44.19999976456165|EWS6734  - Abrigo...|   40629|  Rainfall|[mm]|Precipitação (acu...|
|   82072|       10233|2025-04-08T00:15:00| 38.39999967813492|EWS6734  - Abrigo...|   40629|  Rainfall|[mm]|Precipitação (acu...|
|   82072|       10233|2025-04-08T00:30:00| 38.59999968111515|EWS6734  - Abrigo...|   40629|  Rainfall|[mm]|Precipitação (acu...|
|   82072|       10233|2025-04-08T00:45:00| 38.59999968111515|EWS6734  - Abrigo...|   4062

In [None]:
dfSensorDataSpark = dfSensorDataSpark.orderBy('readingDate')
dfSensorDataSpark.where((dfSensorDataSpark['sensorType'] == 'Unallocated') & (dfSensorDataSpark['sensorId'] == 86793)).show()

StatementMeta(, 9658dc7a-5399-4866-8b1f-b7117dfe2618, 25, Finished, Available, Finished)

+--------+------------+-----------+-----------+----------+--------+----------+
|sensorId|softSensorId|readingDate|sensorValue|deviceName|deviceId|sensorType|
+--------+------------+-----------+-----------+----------+--------+----------+
+--------+------------+-----------+-----------+----------+--------+----------+



In [15]:
sensorDataList[0]['readingDate']

StatementMeta(, d3cbe9f8-95e4-4e08-8507-87a8ec067e4b, 17, Finished, Available, Finished)

datetime.datetime(2025, 3, 27, 0, 0)

In [9]:
display(dfSensorDataSpark)

StatementMeta(, c764ddb8-8fb2-40ec-b02b-278de5696488, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c3c96fd3-0dea-4ca6-a45e-9a3934f6d65a)

In [8]:


# Converte a string de data para objeto datetime

for item in dataCollectionService.sensorDataList:
    #item['readingDate'] = datetime.strptime(item['readingDate'], '%Y-%m-%dT%H:%M:%S')
    item['sensorValue'] = item['sensorValue']*1.0

# Define o esquema com tipos consistentes
schema = StructType([
    StructField("sensorId", LongType(), True),
    StructField("softSensorId", LongType(), True),
    StructField("readingDate", TimestampType(), True),
    StructField("sensorValue", DoubleType(), True),
    StructField("deviceName", StringType(), True),
    StructField("deviceId", LongType(), True),
    StructField("sensorType", StringType(), True)
])

dfSensorDataSpark = spark.createDataFrame(dataCollectionService.sensorDataList,schema)

StatementMeta(, c764ddb8-8fb2-40ec-b02b-278de5696488, 18, Finished, Available, Finished)

In [None]:
display(dfSensorDataSpark.groupBy("sensorId","readingDate").agg({"sensorValue": "count"}))


StatementMeta(, c764ddb8-8fb2-40ec-b02b-278de5696488, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cde551dd-0e93-42a1-8d5e-4bc16657b7b5)

In [16]:
dfSensorDataSpark.write.format("delta").mode("overwrite").saveAsTable("testdevices")

StatementMeta(, d3cbe9f8-95e4-4e08-8507-87a8ec067e4b, 18, Finished, Available, Finished)

In [14]:
softSensorDataList[0]['readingDate']

StatementMeta(, d3cbe9f8-95e4-4e08-8507-87a8ec067e4b, 16, Finished, Available, Finished)

datetime.datetime(2025, 3, 27, 23, 59)

In [12]:


# Converte a string de data para objeto datetime

for item in softSensorDataList:
    item['readingDate'] = datetime.strptime(item['readingDate'], '%Y-%m-%dT%H:%M:%S')
    item['sensorValue'] = item['sensorValue']*1.0

# Define o esquema com tipos consistentes
schema = StructType([
    StructField("sensorId", LongType(), True),
    StructField("softSensorId", LongType(), True),
    StructField("readingDate", TimestampType(), True),
    StructField("sensorValue", DoubleType(), True),
    StructField("deviceName", StringType(), True),
    StructField("deviceId", LongType(), True),
    StructField("sensorType", StringType(), True),
    StructField("customName", StringType(), True)
])

dfSoftSensorDataSpark = spark.createDataFrame(softSensorDataList,schema)

dfSoftSensorDataSpark.write.format("delta").mode("overwrite").saveAsTable("softsensordata")


StatementMeta(, d3cbe9f8-95e4-4e08-8507-87a8ec067e4b, 14, Finished, Available, Finished)

In [13]:

display(dfSoftSensorDataSpark)

StatementMeta(, d3cbe9f8-95e4-4e08-8507-87a8ec067e4b, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 488eca35-5a9b-407d-87f3-fae48a5bc272)