In [2]:
# Full reset of the camera
!echo 'dlinano' | sudo -S systemctl restart nvargus-daemon && printf '\n'
# Check device number
!ls -ltrh /dev/video*

# CSI Camera (Raspberry Pi Camera Module V2)
from jetcam.csi_camera import CSICamera
camera = CSICamera(width=224, height=224)

camera.running = True
print("camera created")


crw-rw----+ 1 root video 81, 0 12월  2 18:58 /dev/video0
camera created


In [3]:
import torchvision.transforms as transforms
from dataset import ImageClassificationDataset

TASK = 'mask'
CATEGORIES = ['mask_on', 'mask_off']
DATASETS = ['A', 'B']

TRANSFORMS = transforms.Compose([
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.2),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

datasets = {}
for name in DATASETS:
    datasets[name] = ImageClassificationDataset(TASK + '_' + name, CATEGORIES, TRANSFORMS)
    
print("{} task with {} categories defined".format(TASK, CATEGORIES))

mask task with ['mask_on', 'mask_off'] categories defined


In [5]:
import ipywidgets
import traitlets
from IPython.display import display
from jetcam.utils import bgr8_to_jpeg

# initialize active dataset
dataset = datasets[DATASETS[0]]

# unobserve all callbacks from camera in case we are running this cell for second time
camera.unobserve_all()

# create image preview
camera_widget = ipywidgets.Image()
traitlets.dlink((camera, 'value'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

data_collection_widget = ipywidgets.HBox([camera_widget])

print("data_collection_widget created")

data_collection_widget created


In [6]:
import torch
import torchvision


device = torch.device('cuda')

# RESNET 18
model = torchvision.models.resnet18(pretrained=True)
model.fc = torch.nn.Linear(512, len(dataset.categories))
    
model = model.to(device)
model.load_state_dict(torch.load('10model.pth'))

print("model loaded")

<All keys matched successfully>

In [7]:
import threading
import time
from utils import preprocess
import torch.nn.functional as F

state_widget = ipywidgets.ToggleButtons(options=['stop', 'live'], description='state', value='stop')
prediction_widget = ipywidgets.Text(description='prediction')
score_widgets = []
for category in dataset.categories:
    score_widget = ipywidgets.FloatSlider(min=0.0, max=1.0, description=category, orientation='vertical')
    score_widgets.append(score_widget)

def live(state_widget, model, camera, prediction_widget, score_widget):
    global dataset
    while state_widget.value == 'live':
        image = camera.value
        preprocessed = preprocess(image)
        output = model(preprocessed)
        output = F.softmax(output, dim=1).detach().cpu().numpy().flatten()
        category_index = output.argmax()
        prediction_widget.value = dataset.categories[category_index]
        for i, score in enumerate(list(output)):
            score_widgets[i].value = score
            
def start_live(change):
    if change['new'] == 'live':
        execute_thread = threading.Thread(target=live, args=(state_widget, model, camera, prediction_widget, score_widget))
        execute_thread.start()

state_widget.observe(start_live, names='value')

live_execution_widget = ipywidgets.VBox([
    ipywidgets.HBox(score_widgets),
    prediction_widget,
    state_widget
])

print("live_execution_widget created")

live_execution_widget created


In [8]:
BATCH_SIZE = 8

optimizer = torch.optim.Adam(model.parameters())

epochs_widget = ipywidgets.IntText(description='epochs', value=1)
loss_widget = ipywidgets.FloatText(description='loss')
accuracy_widget = ipywidgets.FloatText(description='accuracy')
progress_widget = ipywidgets.FloatProgress(min=0.0, max=1.0, description='progress')

def train_eval(is_training):
    global BATCH_SIZE, LEARNING_RATE, MOMENTUM, model, dataset, optimizer, eval_button, train_button, accuracy_widget, loss_widget, progress_widget, state_widget
    
    try:
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=BATCH_SIZE,
            shuffle=True
        )

        if is_training:
            model = model.train()
        else:
            model = model.eval()
        while epochs_widget.value > 0:
            i = 0
            sum_loss = 0.0
            error_count = 0.0
            for images, labels in iter(train_loader):
                # send data to device
                images = images.to(device)
                labels = labels.to(device)

                if is_training:
                    # zero gradients of parameters
                    optimizer.zero_grad()

                # execute model to get outputs
                outputs = model(images)

                # compute loss
                loss = F.cross_entropy(outputs, labels)

                if is_training:
                    # run backpropogation to accumulate gradients
                    loss.backward()

                    # step optimizer to adjust parameters
                    optimizer.step()

                # increment progress
                error_count += len(torch.nonzero(outputs.argmax(1) - labels).flatten())
                count = len(labels.flatten())
                i += count
                sum_loss += float(loss)
                progress_widget.value = i / len(dataset)
                loss_widget.value = sum_loss / i
                accuracy_widget.value = 1.0 - error_count / i
                
            if is_training:
                epochs_widget.value = epochs_widget.value - 1
            else:
                break
    except e:
        pass
    model = model.eval()
    state_widget.value = 'live'

train_eval(is_training=False)
                     
train_eval_widget = ipywidgets.VBox([
    epochs_widget,
    progress_widget,
    loss_widget,
    accuracy_widget,
])

print("trainer configured and train_eval_widget created")

trainer configured and train_eval_widget created


In [9]:
# Combine all the widgets into one display
all_widget = ipywidgets.VBox([
    ipywidgets.HBox([live_execution_widget]), 
    train_eval_widget,
    #model_widget
])

display(all_widget)

VBox(children=(HBox(children=(VBox(children=(HBox(children=(FloatSlider(value=0.0, description='mask_on', max=…

In [None]:
### Bell Control & Mask Detection & Server Communication function
import RPi.GPIO as GPIO
import time
import requests
import json

# HTTP Request
def httpRequest(method, url, is_urlencoded=True):
    if method == 'GET':
        response = requests.get(url=url)
    elif method == 'POST':
        if is_urlencoded is True:
            response = requests.post(url=url, headers={'Content-Type': 'application/x-www-form-urlencoded'})
        else:
            response = requests.post(url=url, headers={'Content-Type': 'application/json'})
    
    dict_meta = {'status_code':response.status_code, 'ok':response.ok, 'encoding':response.encoding, 'Content-Type': response.headers['Content-Type']}
    
    return {**dict_meta, **response.json()}

# Bell state up&download
def getBell():
    url  = 'https://asia-northeast2-transportation-helper.cloudfunctions.net/api/stop?bus=937'
    response = httpRequest(method='GET', url=url)

    print(response)


def updateBell():
    bell_state = '1';
    url  = 'https://asia-northeast2-transportation-helper.cloudfunctions.net/api/stop?bus=937&action=' + bell_state
    response = httpRequest(method='POST', url=url)

    print(response)

# Pin Definitions
green_pin = 18  # BOARD pin 12, BCM pin 18 IN1 - Green LED
blue_pin = 23  # BOARD pin 16, BCM pin 23 IN1 - Blue LED
button_pin = 25 # BOARD pin 22, BCM pin 25 IN

GPIO.setmode(GPIO.BCM)

GPIO.setup(green_pin, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(button_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(blue_pin, GPIO.OUT, initial=GPIO.HIGH)

BUTTON = GPIO.input(button_pin)

print("Starting demo now! Press CTRL+C to exit")
try:
    while True:
        if(prediction_widget.value == 'mask_on'):
            # Pass Mode 
            GPIO.setup(green_pin, GPIO.OUT, initial=GPIO.LOW)
            print('Entry Accepted')
            time.sleep(5)
            BUTTON = GPIO.input(button_pin)
            
        if(prediction_widget.value == 'mask_off'):
            # Alert
            GPIO.setup(green_pin, GPIO.OUT, initial=GPIO.HIGH)
            print('Entry Rejected')
            time.sleep(5)
            BUTTON = GPIO.input(button_pin)
        
        if(BUTTON == 1):
            BUTTON = 0
            GPIO.setup(blue_pin, GPIO.OUT, initial=GPIO.LOW)
            print('Open door')
            getBell()
            time.sleep(5)
            BUTTON = GPIO.input(button_pin)
        
        #if(response == 0):
        #    getBell()
        #else:
        #    GPIO.setup(blue_pin, GPIO.OUT, initial=GPIO.HIGH)
        #    response = 0
            
except KeyboardInterrupt:
    GPIO.setup(blue_pin, GPIO.OUT, initial=GPIO.LOW)
    GPIO.setup(green_pin, GPIO.OUT, initial=GPIO.LOW)
    GPIO.setup(button_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    
finally:
    GPIO.cleanup()

Starting demo now! Press CTRL+C to exit
Entry Rejected
Open door
{'status_code': 404, 'ok': False, 'encoding': 'utf-8', 'Content-Type': 'application/json; charset=utf-8'}
Entry Accepted
Open door
{'status_code': 404, 'ok': False, 'encoding': 'utf-8', 'Content-Type': 'application/json; charset=utf-8'}
Entry Accepted
Open door
{'status_code': 404, 'ok': False, 'encoding': 'utf-8', 'Content-Type': 'application/json; charset=utf-8'}
Entry Rejected
