# Thumbs up/down

## Camera

In [43]:
import os
from io import BytesIO

import cv2
import IPython.display
import numpy as np
import PIL.Image

# To reduce output size while working with vscode
%config InlineBackend.figure_format = 'png'

In [44]:
def camera_init(index=0, width=640, height=480):
    """Open a video capture device and request a frame size.

    Args:
        index: Device index passed to ``cv2.VideoCapture`` (default 0).
        width: Requested frame width in pixels (default 640).
        height: Requested frame height in pixels (default 480).

    Returns:
        The ``cv2.VideoCapture`` object. The caller is responsible for
        releasing it.
    """
    camera = cv2.VideoCapture(index)
    # Named property ids instead of the bare numbers 3 and 4.
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    return camera

def camera_clenup(camera):
    """Release the capture device held by *camera*.

    Args:
        camera: Object exposing ``release()``, such as the capture returned
            by ``camera_init``.
    """
    # NOTE: the function name is a misspelling of "cleanup"; it is kept
    # because other code in this file calls it under this name.
    camera.release()

def camera_capture(camera=None):
    """Grab one frame and return it in RGB channel order.

    Args:
        camera: An already opened capture object. When None, a temporary
            capture is opened with ``camera_init`` for this single frame and
            released again before returning.

    Returns:
        The frame converted from BGR to RGB, or None if the read failed.
    """
    owns_camera = camera is None
    source = camera_init() if owns_camera else camera
    try:
        ret, image = source.read()
    finally:
        # Release the temporary capture on every path, including a failed
        # read or an exception, so the device is never left open.
        if owns_camera:
            camera_clenup(source)
    if not ret:
        return None
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

def camera_stream_widget(camera_widget):
    """Stream camera frames into *camera_widget* until ``running`` is false.

    Each captured frame is JPEG-encoded and assigned to
    ``camera_widget.value``. The loop polls the module-level ``running``
    flag, so setting it to False from elsewhere stops the stream.

    Args:
        camera_widget: Object whose ``value`` attribute receives the encoded
            frame bytes.
    """
    from time import sleep

    camera = camera_init()
    try:
        while running:
            frame = camera_capture(camera)
            if frame is None:
                # A failed read yields None; skip it instead of crashing in
                # PIL, and pause briefly so a dead camera does not spin.
                sleep(0.01)
                continue
            stream = BytesIO()
            PIL.Image.fromarray(np.uint8(frame)).save(stream, format="jpeg")
            camera_widget.value = stream.getvalue()
    except KeyboardInterrupt:
        pass
    finally:
        # Always release the device, even when an unexpected error escapes.
        camera_clenup(camera)
        print("Stream Stopped")

## Model

In [61]:
import torch
import torchvision

# Folders with the captured training frames, one per class.
TRAIN_THUMBS_UP, TRAIN_THUMBS_DOWN = map(os.path.normpath, ("train/up/", "train/down/"))

# Set by model_create(); None until the network has been built.
model = None

# Run on the first CUDA GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

def model_create(b=None):
    """Build the classifier and store it in the module-level ``model``.

    Loads torchvision's ResNet-18 with pretrained weights, replaces its final
    fully connected layer with a single-output linear layer, moves the network
    to ``device`` and puts it in evaluation mode.

    Args:
        b: Unused. Presumably lets the function be used as a widget click
            callback -- confirm against callers.
    """
    global model

    model = torchvision.models.resnet18(pretrained=True)
    # Single output for the binary up/down decision; take the input width
    # from the existing head rather than hard-coding it.
    model.fc = torch.nn.Linear(model.fc.in_features, 1)
    model = model.to(device)
    # Evaluation mode keeps BatchNorm from updating its running statistics
    # on every predicted frame. Training code must call model.train() first.
    model.eval()

def model_predict(b=None, frame=None):
    """Return the model's sigmoid output for a single frame.

    Args:
        b: Unused.
        frame: Image array accepted by ``PIL.Image.fromarray``, or None.

    Returns:
        0.5 when *frame* is None; otherwise the sigmoid of the model's single
        output as a Python float.
    """
    if frame is None:
        return 0.5
    # NOTE(review): the frame is only converted with to_tensor -- it is not
    # resized or mean/std normalized here. Confirm this matches whatever
    # preprocessing is used for training.
    tensor = torchvision.transforms.functional.to_tensor(PIL.Image.fromarray(frame))
    batch = tensor.unsqueeze(0).to(device)  # add the batch dimension
    # Inference only: skip autograd bookkeeping for every streamed frame.
    with torch.no_grad():
        logits = model(batch)
    return torch.sigmoid(logits).flatten()[0].item()

def model_retrain(b=None):
    """Placeholder for retraining ``model``; currently has no effect.

    Args:
        b: Unused.
    """
    # TODO: retraining is not implemented yet; the body only declares the
    # global it is expected to update.
    global model


# Build the network when this cell runs so that `model` is set for model_predict().
model_create()

## Live execution

In [62]:
import threading
from ipywidgets.widgets import Button, Label, FloatSlider, FloatProgress, FloatText, Image
from ipywidgets.widgets import Layout, HBox, VBox
from IPython.display import display
from time import sleep

def _count_frames(directory):
    """Return the number of entries in *directory*, or 0 if it does not exist."""
    try:
        return len(os.listdir(directory))
    except FileNotFoundError:
        return 0

def _save_frame(directory, button, label):
    """Save the frame currently held by ``camera_widget`` as a PNG in *directory*.

    Args:
        directory: Target dataset folder; created when missing.
        button: Button whose description is updated with the new frame count.
        label: Class name inserted into the button description.
    """
    encoded = camera_widget.value
    if not encoded:
        # No frame has been streamed yet, so there is nothing to save.
        return
    os.makedirs(directory, exist_ok=True)
    count = _count_frames(directory)
    # Start at the current count but skip names that are already taken, so
    # earlier frames are not overwritten after files have been deleted.
    index = count
    path = os.path.join(directory, f"{index}.png")
    while os.path.exists(path):
        index += 1
        path = os.path.join(directory, f"{index}.png")
    # The widget value holds encoded image bytes in whatever format the
    # stream used; decode and re-encode so the content matches ".png".
    PIL.Image.open(BytesIO(encoded)).save(path, format="png")
    button.description = f"Add frame to thumbs {label} train dataset ({count + 1})"

def saved_thumbs_up():
    """Return how many frames are stored for the thumbs up class."""
    return _count_frames("train/up")

def saved_thumbs_down():
    """Return how many frames are stored for the thumbs down class."""
    return _count_frames("train/down")

def save_thumbs_up(b):
    """Click handler: add the current frame to the thumbs up dataset.

    Args:
        b: The clicked button; its description is updated with the new count.
    """
    _save_frame("train/up", b, "up")

def save_thumbs_down(b):
    """Click handler: add the current frame to the thumbs down dataset.

    Args:
        b: The clicked button; its description is updated with the new count.
    """
    _save_frame("train/down", b, "down")

# Loop flag polled by the streaming code; set it to False to stop the stream.
running = True

# Shows the latest camera frame. The stream assigns JPEG-encoded bytes to
# `value`, so declare the matching format.
camera_widget = Image(width=600, format="jpeg")

# --- 1. Dataset collection buttons ---
b_thumbs_up = Button(
    description=f"Add frame to thumbs up train dataset ({saved_thumbs_up()})",
    icon="fa-thumbs-up",
    button_style="success",
    layout=Layout(width="300px"))
b_thumbs_up.on_click(save_thumbs_up)
b_thumbs_down = Button(
    description=f"Add frame to thumbs down train dataset ({saved_thumbs_down()})",
    icon="fa-thumbs-down",
    button_style="danger",
    layout=Layout(width="300px"))
b_thumbs_down.on_click(save_thumbs_down)

# --- 2. Training controls and read-only status fields ---
b_retrain = Button(
    description="Retrain the model with the newest dataset",
    icon="fa-refresh",
    layout=Layout(width="300px"))
# Hook the button up to the retrain entry point; without a handler a click
# on it does nothing at all.
b_retrain.on_click(model_retrain)
train_progress = FloatProgress(value=100, description="Progress:")
loss = FloatText(value=0, description="Loss:", disabled=True)
accuracy = FloatText(value=0, description="Accuracy:", disabled=True)

# --- 3. Live prediction read-outs (updated by the streaming loop) ---
slider_down = FloatSlider(value=50, description="Down:", disabled=True)
slider_up = FloatSlider(value=50, description="Up:", disabled=True)

all_widgets = VBox((
    camera_widget,
    # The buttons below add frames to the *train* dataset, so label the
    # section accordingly.
    Label(value="1. Collecting training data:"),
    b_thumbs_up,
    b_thumbs_down,
    Label(value="2. Training the model:"),
    b_retrain,
    train_progress,
    loss,
    accuracy,
    Label(value="3. Making predictions:"),
    slider_down,
    slider_up,
))

def live_execution():
    """Show the UI and stream frames with live predictions.

    Displays ``all_widgets``, then loops while the module-level ``running``
    flag is true: captures a frame, shows it in ``camera_widget`` as JPEG,
    runs ``model_predict`` on it and updates the two prediction sliders.
    The camera is released when the loop ends for any reason.
    """
    IPython.display.display(all_widgets)

    camera = camera_init()
    try:
        while running:
            frame = camera_capture(camera)
            if frame is None:
                # A failed read yields None; skip it instead of crashing in
                # PIL, and pause briefly so a dead camera does not spin.
                sleep(0.01)
                continue
            stream = BytesIO()
            PIL.Image.fromarray(np.uint8(frame)).save(stream, format="jpeg")
            camera_widget.value = stream.getvalue()
            prediction = model_predict(frame=frame)
            slider_down.value = prediction * 100
            slider_up.value = 100 - prediction * 100
    except KeyboardInterrupt:
        pass
    finally:
        # Always release the device, even when an unexpected error escapes.
        camera_clenup(camera)
        print("Stream Stopped")

# Run the capture/prediction loop in a separate thread so this cell returns
# while the loop keeps running.
thread = threading.Thread(target=live_execution)
thread.start()

VBox(children=(Image(value=b'', width='600'), Label(value='1. Collecting testing data:'), Button(button_style=…

In [63]:
# Clear the flag polled by the streaming loop so that it exits.
running = False