Importamos las librerías necesarias

In [None]:
import thingspeak, time, random
import cv2, json, re, ast

import numpy as np
import paho.mqtt.client as mqtt
import matplotlib.pyplot as plt

from PIL import Image as PIL_Image
from datetime import datetime
from time import time, sleep
from PIL import Image
from pynq.overlays.base import BaseOverlay
from pynq_peripherals import ArduinoSEEEDGroveAdapter

Definimos la corresponcencia hardware-software

In [None]:
base = BaseOverlay("base.bit")

adapter = ArduinoSEEEDGroveAdapter(base.ARDUINO,
                                  D2='grove_led_stick',
                                  D4='grove_buzzer',
                                  D6='grove_usranger')

led_stick = adapter.D2
buzzer = adapter.D4
usranger = adapter.D6

Estas funciones reproducen un sonido al iniciar y finalizar el proceso de detección de anomalías en el dispositivo PYNQ

In [None]:
def inicio_programa():
    """
    Toca la melodia de inicio de programa
    """
    buzzer.play_note("b", 100*2)
    sleep(0.025)
    buzzer.play_note("a", 100*2)
    sleep(0.025)
    buzzer.play_note("g", 200*2)
    sleep(0.3)
    buzzer.play_note("b", 100*2)
    sleep(0.025)
    buzzer.play_note("g", 150*2)

In [None]:
def fin_programa():
    """
    Toca la melodia de fin del programa
    """
    buzzer.play_note("f", 100*2)
    sleep(0.025)
    buzzer.play_note("g", 100*2)
    sleep(0.025)
    buzzer.play_note("a", 250*2)
    sleep(0.025)
    buzzer.play_note("d", 100*2)
    sleep(0.025)
    buzzer.play_note("d", 150*2)
    sleep(0.2)
    buzzer.play_note("e", 100*2)
    sleep(0.025)
    buzzer.play_note("f", 100*2)
    sleep(0.025)
    buzzer.play_note("g", 250*2)
    sleep(0.025)
    buzzer.play_note("c", 100*2)
    sleep(0.025)
    buzzer.play_note("c", 150*2)

La función `webcam` toma una foto y la guarda según el directorio deseado añadiendole también un timestamp para permitir la trazabilidad de la foto por último redimensiona la imagen ya que `Anomalib 0.3.7` requiere que la imagen tenga dimensiones de misma anchura y altura.

In [None]:
def webcam():
    timestamp = re.sub(r'[- :]', '_', str(datetime.now())[:19])
    orig_img_path = f'./data/webcam_photos/new_photos/webcam_{timestamp}.jpg'
    !fswebcam  --no-banner --save {orig_img_path} -d /dev/video0 2> /dev/null
    img = np.array(Image.open(orig_img_path)) # (288, 352, 3)
    # img = Image.open(orig_img_path) # (288, 352, 3)
    img_new = cv2.resize(img, (288,288))
    return img_new, timestamp

Con esta función guardamos la foto en función de si el modelo la ha considerado o no una anomalía

In [None]:
def save_photo(anomaly, photo, timestamp):
    if anomaly:
        cv2.imwrite(f'./data/webcam_photos/new_abnormal/webcam_{timestamp}.png', photo)
    else:
        cv2.imwrite(f'./data/webcam_photos/new_normal/webcam_{timestamp}.png', photo)

A continuación definimos como serán las alertas dependiendo de si la imagen es anómala o no.

In [None]:
def leds_alert(anomaly):
    if not anomaly:
        for i in range(10):
            led_stick.set_pixel(i, 0x00FF00)
        led_stick.show()
    else:
        for i in range(10):
            led_stick.set_pixel(i, 0xFF0000)
        led_stick.show()
    return None

def buzz_alert(anomaly):
    if not anomaly:
        buzzer.play_tone(1000, 100)
        buzzer.play_tone(2150, 100)
    else:
        buzzer.play_tone(100, 400)
    return None

def get_alerts(prediction,sound,light):
    if light:
        leds_alert(prediction)
    if sound:
        buzz_alert(prediction)
    sleep(1)
    led_stick.clear()
    return None

A continuación definimos las funciones necesarias en las Comunicaciones MQTT.

In [None]:
def pub_photo(photo, c, timestamp, host, port):
    client = mqtt.Client()
    client.connect(host, port, keepalive=180)
    client.publish("photo",json.dumps({c:photo.tolist(),'timestamp':timestamp}))
    return None

def sub_results(host, port):
    def on_connect(client, userdata, flags, rc):
        #print('Connected to RESULTS '+str(rc))
        client.subscribe('results')
        
    def on_message(client, userdata, msg):
        data = msg.payload.decode("utf8")
        with open('./data/pred_label.txt','w') as f:
            f.write(data)
        #print('RESULTS written')
        client.disconnect()
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(host, port, keepalive=180)
    client.loop_forever()
    return None

Para la monitorización de los resultados a tiempo real utilizamos la plataforma `thinspeak`, para la cual es necesarios disponer de un código de canal y una contraseña.

In [None]:
channel_id = 1978844
write_key = "64FJFZJ35WMXK34T"
channel = thingspeak.Channel(id=channel_id,api_key=write_key)

A continuación se encuentra todo el proceso correspondiente al dispositivo PYNQ, descrito en la sección 5.1 del TFG.

In [None]:
def anomalias_industriales(host, port):
    inicio_programa()
    print("Starting Program")
    init_distance = int(usranger.get_distance())
    while base.buttons.read() == 0:
        while int(usranger.get_distance()) == init_distance:
            sleep(1)
            if base.buttons.read() != 0:
                break
        if base.buttons.read() != 0:
            break 
        start = time()
        print("Object detected")
        photo, timestamp = webcam()
        print('photo taken')
        for i in range(3):
            pub_photo(photo[:,:,i],i,timestamp, host, port) 
        print('submitting photo')
        sub_results(host, port)
        f = open("./data/pred_label.txt", "r")
        anomaly = int(f.read())
        print("Anomaly: ", bool(anomaly), 'Timestamp: ', timestamp)
        get_alerts(anomaly, base.switches[0].read(), base.switches[1].read())
        save_photo(anomaly, photo, timestamp)
        totaltime = round(time()-start, 4)
        channel.update({"field1":anomaly,"field2":totaltime})
        print('TIME TAKEN: ', totaltime)
        sleep(1)
    return 

In [None]:
host = "192.168.137.70"
port = 1883

anomalias_industriales(host, port)