In [6]:
# importing basic libraries
from __future__ import annotations
import xgboost as xgb
import pandas as pd
import numpy as np
import asyncio
import json
from memphis import Memphis, MemphisError, MemphisConnectError


In [7]:
import os
from dotenv import load_dotenv
load_dotenv()

from dataclasses import dataclass

@dataclass
class Config:
    memphis_host: str = os.environ.get("HOST")
    memphis_user: str = os.environ.get("USER")
    memphis_pwd: str = os.environ.get("PWD")
    memphis_id: str = os.environ.get("ID")
    
config = Config()

In [4]:
def update_neighbors(df_aux):
    
    df_filled = df_aux.copy()
   
    # Definindo os índices das vizinhanças que queremos preencher
    neighborhood = [(i, j) for i in range(-1, 2) for j in range(-1, 2) if (i, j) != (0, 0)]
    fields = ['temp_day-1_0_0', 'temp_day-1_0_1', 'temp_day-1_0_2', 'temp_day-1_1_0', 'temp_day-1_1_2', 'temp_day-1_2_0', 'temp_day-1_2_1', 'temp_day-1_2_2']

    # Iterando sobre os registros do DataFrame original
    for index, row in df_filled.iterrows():
        x = index[0]
        y = index[1]
        default = 90 # value of no fire
        # print(x, y, default)
        for i in range(len(neighborhood)):
            new_x, new_y = neighborhood[i][0] + x, neighborhood[i][1] + y
            # print(new_x, new_y)

            # Verificando se os índices vizinhos estão dentro dos limites do DataFrame
            field = fields[i]
            if 0 <= new_x < 30 and 0 <= new_y < 30:
                value = df_filled.loc[(new_x, new_y), 'temperature']
                df_filled.loc[(x, y), field] = value
            else:
                df_filled.loc[(x, y), field] = default
   
    return df_filled

In [5]:
# Column list
cols = ['geospatial_x', 'geospatial_y', 'temperature', 
        'temp_day-1_0_0', 'temp_day-1_0_1', 'temp_day-1_0_2',
        'temp_day-1_1_0', 'temp_day-1_1_2',
        'temp_day-1_2_0', 'temp_day-1_2_1', 'temp_day-1_2_2',
        'alarm']
df_base = pd.DataFrame(columns=cols)

# Create matrix  x and y        
x = np.arange(0, 30)
y = np.arange(0, 30)
coords = pd.MultiIndex.from_product([x, y])

# Create dataframe and fill with NaN
# Extrair níveis do MultiIndex em colunas separadas
df_base['geospatial_x'] = coords.get_level_values(0)
df_base['geospatial_y'] = coords.get_level_values(1)

# Definir MultiIndex como índice do DataFrame
df_base.set_index(['geospatial_x', 'geospatial_y'], inplace=True)

df = df_base.copy()

In [8]:
# get new records of sensors
df_sensors = pd.DataFrame(columns=['day', 'geospatial_x', 'geospatial_y', 'temperature'])

async def main():
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                print("message: ", msg.get_data())
                await msg.ack()
                if error:
                    print(error)
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return
        
    try:
        memphis = Memphis()
        await memphis.connect(host=config.memphis_host, username=config.memphis_user, password=config.memphis_pwd, account_id=config.memphis_id)
        consumer = await memphis.consumer(station_name="zakar-fire-alerts", consumer_name="prediction")
        
        while True:
            batch = await consumer.fetch(batch_size=900)
            if batch is not None:
                for msg in batch:
                    serialized_record = msg.get_data()
                    record = json.loads(serialized_record)
                    df_sensors.append(record['day'], record['geospatial_x'], record['geospatial_y'], record['temperature'])
                    # Acknowledge the message to the MEMPHIS queue
                    await msg.ack()
            else:
                break 
        
    except (MemphisError, MemphisConnectError) as e:
        print(e)
        
       
if __name__ == "__main__":
    asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
# update df_base with the new sensors of the date -1 (actual)
for one_sensor in df_sensors:
    day = one_sensor[0]
    x = one_sensor[1]
    y = one_sensor[2]
    temperature = one_sensor[3]
    # print(x, y, temperature)
    idx = (x, y)
    df.loc[idx, 'temperature'] = temperature

In [None]:
# get the average temperature 
average = df["temperature"].mean()
print(average)

# update df_sensors with the neighbors
df = update_neighbors(df)

In [None]:
features = ['geospatial_x', 'geospatial_y', 'temperature', 
        'temp_day-1_0_0', 'temp_day-1_0_1', 'temp_day-1_0_2',
        'temp_day-1_1_0', 'temp_day-1_1_2',
        'temp_day-1_2_0', 'temp_day-1_2_1', 'temp_day-1_2_2']


In [None]:

df = df.reset_index()
df = df[features]
# convert fields to int
df[features] = df[features].astype(int)

In [None]:
# loading the modelo
loaded_model = xgb.XGBClassifier()
loaded_model.load_model('xgb_model.bin')

In [None]:
# Obter as probabilidades das previsões para a classe positiva (1)
y_prob = loaded_model.predict_proba(df)[:, 1]

In [None]:
# Definir o valor de corte personalizado para a classificação
valor_corte = 0.45 # Escolha um valor de corte adequado

In [None]:
# Transformar as probabilidades em rótulos discretos com base no valor de corte
y_pred = (y_prob >= valor_corte).astype(int)

In [None]:
matrix = y_pred.reshape(30, 30)

In [None]:
# Encontrar índices onde matriz == 1
list_predict = []
idx = np.where(matrix == 1)

print('ALERTS TO BE SENT')
# Imprimir coordenadas 
for i, j in zip(idx[0], idx[1]):
    print(f'Coordinate: ({i}, {j})')
    list_predict.append((i,j))

In [None]:
async def main():
    try:
        
        producer = await memphis.producer(station_name="zakar-fire-alerts", producer_name="prediction") # you can send the message parameter as dict as well
        headers = Headers()
        headers.add("key", "value") 
        for alarm in list_predict:
            await producer.produce(bytearray({"event_day": alarm['day'], "notification_day": alarm['day'], "geospatial_x": alarm['geospatial_x'], "geospatial_y": alarm['geospatial_x']}), headers=headers)
        
    except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
        print(e)
        
    finally:
        await memphis.close()
        
if __name__ == "__main__":
    asyncio.run(main())

In [None]:
# Criar conjuntos a partir das listas
set_predict = set(list_predict)
set_official = set(list_official)

# Elementos iguais
elements_in_both_lists = set_predict.intersection(set_official)
print("Both lists:")
for element in elements_in_both_lists:
    print(element)