In [2]:
import cv2
import datetime
import os
import serial
import shutil
import time
import torch

import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch.nn.functional as F
import seaborn as sb

from collections import OrderedDict
# from labjack import ljm
from PIL import Image
from torch import nn
from torch import optim
from torchvision import datasets, transforms, models
from typing import Tuple

from skimage.metrics import structural_similarity as compare_ssim

In [3]:
def take_picture(cam_ID: int, width: int, height: int, name_ID: int, crop: bool = False,
                 out_dir: str = '/home/michael/Cax_Box_Pics/temp',
                 warmup_frames: int = 20):
    """Capture a single frame from a camera and save it as a PNG.

    Args:
        cam_ID: Device index passed to ``cv2.VideoCapture``.
        width: Requested capture width in pixels.
        height: Requested capture height in pixels.
        name_ID: Number appended to the output file name.
        crop: When True, save a cropped copy (bottom 100 rows, left 150 and
            right 75 columns removed) as ``litterbox_cropped_<name_ID>.png``;
            otherwise save the whole frame as ``litterbox_<name_ID>.png``.
        out_dir: Directory the PNG is written to.
        warmup_frames: Number of frames read and discarded before the frame
            that is kept (presumably to let the camera settle - TODO confirm).

    Returns:
        The full, uncropped frame from ``cap.read()``, or ``None`` when the
        camera could not be opened.

    Raises:
        RuntimeError: If the camera opened but did not deliver a frame.
    """
    cap = cv2.VideoCapture(cam_ID)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        if not cap.isOpened():
            return None

        # Throw away the first frames; only the one read afterwards is kept.
        for _ in range(warmup_frames):
            cap.read()
        ret, frame = cap.read()
        if not ret or frame is None:
            raise RuntimeError(
                "Camera {} did not return a frame.".format(cam_ID))

        if crop:
            file_name = "litterbox_cropped_" + str(name_ID) + ".png"
            to_save = frame[0:-100, 150:-75]
        else:
            file_name = "litterbox_" + str(name_ID) + ".png"
            to_save = frame
        cv2.imwrite(os.path.join(out_dir, file_name), to_save)
    finally:
        # Always free the device, even when reading or writing failed,
        # so the next call can open the camera again.
        cap.release()
    cv2.destroyAllWindows()
    return frame

# def toggle_fan(handle, fan_state: int) -> None:
#     ljm.eWriteName(handle, "FIO3", fan_state)

def toggle_fan(fan_state: int, port: str = '/dev/ttyACM0', baudrate: int = 9600) -> None:
    """Send a fan on/off command over a serial port.

    Args:
        fan_state: 1 sends ``fan_on``, 0 sends ``fan_off``. Any other value
            opens and closes the port without sending anything.
        port: Serial device to open.
        baudrate: Baud rate used for the connection.
    """
    commands = {0: b'fan_off', 1: b'fan_on'}
    command = commands.get(fan_state)
    # The context manager closes the port even if the write raises.
    with serial.Serial(port, baudrate) as connection:
        if command is not None:
            # Each command is terminated by a newline.
            connection.write(command + b'\n')

In [4]:
def load_checkpoint(filepath: str):
    """Reload a saved .pth model.

    The checkpoint is read onto the CPU and must contain the keys ``arch``,
    ``class_to_idx`` and ``model_state_dict``. Only the ``vgg16`` architecture
    is supported: a VGG16 is created, its parameters are frozen, its
    classifier is replaced by a 25088 -> 5000 -> 102 head ending in
    ``LogSoftmax``, and the stored state dict is loaded into it.

    Args:
        filepath: Path to the checkpoint file.

    Returns:
        The reconstructed model with ``class_to_idx`` attached as an attribute.
        The train/eval mode is not changed here.

    Raises:
        ValueError: If ``checkpoint['arch']`` is not ``'vgg16'``.
    """
    checkpoint = torch.load(filepath, map_location='cpu')

    arch = checkpoint['arch']
    if arch != 'vgg16':
        # Fail with a clear error instead of continuing with no model defined.
        raise ValueError("Architecture not recognized: {!r}".format(arch))

    model = models.vgg16(pretrained=True)
    # Freeze the parameters of the freshly created network; the classifier
    # assigned below is created afterwards and is therefore not frozen here.
    for param in model.parameters():
        param.requires_grad = False

    model.class_to_idx = checkpoint['class_to_idx']
    model.classifier = nn.Sequential(
        OrderedDict([('fc1', nn.Linear(25088, 5000)),
                     ('relu', nn.ReLU()),
                     ('drop', nn.Dropout(p=0.5)),
                     ('fc2', nn.Linear(5000, 102)),
                     ('output', nn.LogSoftmax(dim=1))]))
    model.load_state_dict(checkpoint['model_state_dict'])
    return model


def process_image(image_path: str) -> np.ndarray:
    ''' Image preprocessor for inference.

        Opens the image at ``image_path``, shrinks it with ``thumbnail`` so
        the shorter side is bounded by 256, takes a centered 224x224 crop,
        scales pixel values to [0, 1], normalizes each channel with the
        mean/std constants below and moves the channel axis to the front.

        Args:
            image_path: Path of the image file to load.

        Returns:
            A Numpy array with the color channel as the first dimension.
    '''
    with Image.open(image_path) as opened:
        # The normalization below needs exactly three channels, so anything
        # that is not already RGB (RGBA, grayscale, palette) is converted.
        if opened.mode == "RGB":
            pil_image = opened
        else:
            pil_image = opened.convert("RGB")

        # Resize: bound the shorter side by 256, leave the longer side free.
        if pil_image.size[0] > pil_image.size[1]:
            pil_image.thumbnail((5000, 256))
        else:
            pil_image.thumbnail((256, 5000))

        # Center crop. PIL's origin is the top-left corner, so the box is
        # (left, upper, right, lower).
        left = (pil_image.width - 224) / 2
        upper = (pil_image.height - 224) / 2
        pil_image = pil_image.crop((left, upper, left + 224, upper + 224))

        # Materialize the pixels before the file is closed.
        np_image = np.array(pil_image) / 255

    # Normalize each channel.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    np_image = (np_image - mean) / std

    # PyTorch expects the color channel first, but it is the third dimension
    # in the PIL image and Numpy array; keep the order of the other two.
    return np_image.transpose((2, 0, 1))


def predict(image_path, model, topk: int = 3):
    ''' Predict the class of an image using trained deep learning model.

        The image is preprocessed with ``process_image``, run through
        ``model`` on the CPU, and the model output is exponentiated to get
        probabilities.

        Args:
            image_path: Path of the image to classify.
            model: Network with a ``class_to_idx`` mapping attribute.
            topk: Number of highest-probability classes to return.

        Returns:
            ``(top_prob, top_classes)``: two lists of length ``topk`` holding
            the probabilities and the matching class labels, best first.
    '''
    image = process_image(image_path)

    # CPU float tensor with a batch dimension of size one in front.
    image = torch.from_numpy(image).type(torch.FloatTensor).unsqueeze(0)

    # Run in eval mode so Dropout is disabled and the result is
    # deterministic; skip gradient tracking since this is inference only.
    # The caller's train/eval mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            output = model(image)
    finally:
        model.train(was_training)
    probabilities = torch.exp(output)

    # Probabilities and the indices of those probabilities
    # corresponding to the classes
    top_prob, top_ind = probabilities.topk(topk)

    # Convert the single batch row to plain Python lists; the indices stay
    # integers so they can be used directly as dictionary keys.
    top_prob = top_prob[0].tolist()
    top_ind = top_ind[0].tolist()

    # Invert class_to_idx to get a mapping from index to class label.
    idx_to_class = {value: key for key, value in model.class_to_idx.items()}
    top_classes = [idx_to_class[index] for index in top_ind]

    return top_prob, top_classes

def _check_image(image_path: str, inf_model):
    """Classify one image, print the result and return the best guess.

    Args:
        image_path: Path of the image to classify.
        inf_model: Model handed to ``predict``.

    Returns:
        ``(label, probability)`` of the highest-ranked class.
    """
    # Use the model that was passed in rather than a notebook-level global.
    probs, classes = predict(image_path, inf_model)
    print(classes)
    print(probs)
    print('\n')
    return classes[0], float(probs[0])

In [None]:
%matplotlib inline
path = '/home/michael/Cax_Box_Pics/temp'
config_path = '/home/michael/Cax_Box_Pics'
config_path = os.path.join(config_path , "cax_box_config" + ".conf")
#model = load_checkpoint('May_22_2021_basic_vgg.pth')
model = load_checkpoint('Sep_12_2021_basic_vgg.pth')
# handle = ljm.openS("T7", "USB", "ANY")
toggle_fan(0)

with open(config_path) as f:
    image_counter = [line for line in f]

image_counter = int(image_counter[0])
image_last = take_picture(0, 640, 480, image_counter, crop = True)
gray_last = cv2.cvtColor(image_last, cv2.COLOR_BGR2GRAY)
previous_state = ["0"]
default_wait_time = 30
extended_wait_time = 300
wait_time = default_wait_time

save_counters = {"0": 1, "1": 1, "2": 1}

while True:
    try:
        image = take_picture(0, 640, 480, image_counter, crop = True)
        image_path = os.path.join(path , "litterbox_cropped_"
                                  + str(image_counter) + ".png")
        f = open(config_path, "w")
        f.write(str(image_counter + 1))
        f.close()

        box_state_inf, box_state_prob = _check_image(image_path, model)
        save_counters[box_state_inf] += 1

        if box_state_inf == "0" and save_counters["0"] % 20 == 0:
            shutil.copy(image_path, 'inf//' + str(box_state_inf))
        elif box_state_inf == "1" and save_counters["1"] % 20 == 0:
            shutil.copy(image_path, 'inf//' + str(box_state_inf))
        elif box_state_inf == "2":
            shutil.copy(image_path, 'inf//' + str(box_state_inf))
        elif box_state_inf == "3":
            pass

        gray_current = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        score, diff = compare_ssim(gray_last, gray_current, full=True)

        print("Box State Probabaility: {:.3f} \n".format(box_state_prob))
        print("Structural Diff: {:.2f} \n".format(score))
        print("Prev state:  {}, Current State: {}".format(previous_state[0], box_state_inf))

        if box_state_prob >= 0.90 and score <= 0.90:
            if box_state_inf == "1" and previous_state[0] == "0":
                toggle_fan(1)
                previous_state[0] = "1"
                wait_time = extended_wait_time
            elif box_state_inf == "0" and previous_state[0] == "1":
                previous_state[0] = "0"
                wwait_time = extended_wait_time
            if box_state_inf == "2":
                previous_state[0] = "0"
                wait_time = default_wait_time
        gray_last = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        os.remove(image_path)
    except Exception as e:
        print(e)
    image_counter += 1
    time.sleep(wait_time)
    toggle_fan(0)
    if image_counter % 10 == 0:
        print(datetime.datetime.now())

toggle_fan(0)
! ls /dev | grep ttyACM

In [5]:
# Manual control cell: send the "fan_on" command through toggle_fan.
toggle_fan(1)