# Setup

In [None]:
%conda install pytorch torchvision torchaudio cudatoolkit = 11.3 - c pytorch


In [None]:
# Remaining dependencies: the scientific stack comes from conda, everything
# else (screen capture, keyboard/mouse control, experiment tracking) from pip.
%conda install numpy pandas matplotlib
%pip install opencv-python keyboard Pillow pyautogui wandb

In [2]:
import os, time, shutil, copy, uuid
import numpy as np
import cv2
import keyboard as kb
from PIL import ImageGrab, ImageFilter
import pyautogui
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt
import wandb

import torch
import torchvision
import torch.nn as nn
from torchvision.io import read_image
from torch.utils.data import Dataset
from torchvision import datasets, transforms, utils, models
import torch.optim as optim
from torch.optim import lr_scheduler


(1055, 10)


In [None]:
# Helper for picking the BBOX values: blocks until Enter is pressed, then
# prints the mouse cursor's current screen position.
kb.wait('enter')

cursor = pyautogui.position()
print(f'({cursor.x}, {cursor.y})')


In [None]:
# Screen region handed to ImageGrab.grab(bbox=...) by shoot_screen().
BBOX = (2, 113, 1275, 1027)
# Root folder of the collected screenshots; read by datasets.ImageFolder.
DATA_DIR = 'data'
# Default number of epochs for train_model().
EPOCHS = 25
# SGD optimizer settings.
LEARNING_RATE = 0.001
MOMENTUM = 0.9
# StepLR scheduler settings (step_size and gamma).
STEP_SIZE = 7
GAMMA = 0.1

# Data Collection

In [9]:
# Resets the data folder: removes any previously collected screenshots and
# recreates one empty sub-folder per label (the four movement keys + 'none').
# WARNING: everything under DATA_DIR is permanently deleted.
data_root = Path(DATA_DIR)  # same root the training cell passes to ImageFolder
if data_root.exists():
    shutil.rmtree(data_root)

for label in ('w', 'a', 's', 'd', 'none'):
    (data_root / label).mkdir(parents=True)

In [None]:
def shoot_screen():
    """Grab the BBOX region of the screen and return the captured image."""
    return ImageGrab.grab(bbox=BBOX)

In [10]:
# Data collection: once Enter is pressed, repeatedly screenshot the game and
# file each image under the folder named after the key currently held down.
# Press Esc to stop.
kb.wait('enter')
print('Starting data collection...')

while True:
    screenshot = shoot_screen()
    filename = f'{uuid.uuid4()}.jpg'

    # First pressed key in w/a/s/d order wins; nothing pressed -> 'none'.
    # (Named `label` rather than `dir` to avoid shadowing the built-in.)
    label = next(
        (key for key in ('w', 'a', 's', 'd') if kb.is_pressed(key)), 'none')

    # JPEG cannot store an alpha channel, so force RGB before saving.
    # Save under DATA_DIR so collection and training share the same root.
    screenshot.convert('RGB').save(os.path.join(DATA_DIR, label, filename))

    if kb.is_pressed('esc'):
        print('Exiting data collection!')
        break

Starting data collection...
Exiting data collection!


# Training

## Load Data

In [None]:
# Log in to Weights & Biases and start a run that records the training
# hyperparameters defined in the configuration cell.
wandb.login()

run_config = dict(
    epochs=EPOCHS,
    learning_rate=LEARNING_RATE,
    momentum=MOMENTUM,
    step_size=STEP_SIZE,
    gamma=GAMMA,
)
wandb.init(project="paper-io-ai", config=run_config)

In [None]:
# Preprocessing applied to every image: resize to 224x224, convert to a
# tensor, then normalize.
# NOTE(review): a single mean/std value is given for all channels, whereas
# pretrained ResNet weights are normally paired with per-channel statistics.
# Left unchanged because imshow() and previously trained models use the same
# values -- confirm before changing.
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485], [0.229])
])

# One class per sub-folder of DATA_DIR.
data = datasets.ImageFolder(DATA_DIR, data_transform)

# 80% of the images for training, the rest for validation.
train_size = int(0.8 * len(data))
val_size = len(data) - train_size

# Fixed seed so re-running this cell yields the same split; otherwise images
# would move between the train and validation sets on every run.
train_data, val_data = torch.utils.data.random_split(
    data, [train_size, val_size],
    generator=torch.Generator().manual_seed(42))
image_datasets = {
    'train': train_data,
    'val': val_data
}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                              shuffle=True, num_workers=4) for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = data.classes

# Use the first GPU when one is available and fall back to the CPU instead of
# failing on machines without CUDA.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Cell output: name of the device that will be used.
torch.cuda.get_device_name(device) if device.type == "cuda" else "cpu"


## Visualize Images

In [None]:
def imshow(inp, title=None):
    """Display an image tensor with matplotlib.

    The tensor's first axis is moved last (assumes a channels-first layout),
    the normalization is undone using mean 0.485 and std 0.229, and the
    result is clipped to [0, 1] before plotting. An optional title is drawn.
    """
    image = inp.numpy().transpose((1, 2, 0))
    image = np.clip(np.array([0.229]) * image + np.array([0.485]), 0, 1)
    plt.imshow(image)
    if title is not None:
        plt.title(title)
    # Short pause so the plot has time to refresh.
    plt.pause(0.001)


# Pull a single batch from the training loader.
train_batches = iter(dataloaders['train'])
inputs, classes = next(train_batches)

# Tile the batch into one image and label it with each sample's class name.
out = torchvision.utils.make_grid(inputs)
batch_titles = [class_names[idx] for idx in classes]

imshow(out, title=batch_titles)


## Train the Model

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=EPOCHS):
    """Train ``model`` and return it loaded with its best validation weights.

    Every epoch runs a training pass followed by a validation pass over the
    notebook-level ``dataloaders``. Per-phase loss and accuracy are printed
    and logged to Weights & Biases once per epoch, and the weights with the
    highest validation accuracy are restored before returning.

    Args:
        model: Network to train; its inputs are moved to ``device``.
        criterion: Loss, called as ``criterion(outputs, labels)``.
        optimizer: Optimizer, stepped once per training batch.
        scheduler: LR scheduler, stepped once per epoch after the train phase.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object with the best validation weights loaded.
    """
    wandb.watch(model, log_freq=100)

    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print('-' * 10)

        # Collected across both phases so each epoch is a single wandb step.
        epoch_metrics = {"epoch": epoch + 1}

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train mode for 'train', eval mode for 'val'

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()  # clear gradients from the previous step

                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Weight the batch loss by the batch size so the epoch
                # average below is per-sample.
                running_loss += loss.item() * inputs.size(0)
                # Accumulate as a plain int so the metrics are Python numbers.
                running_corrects += torch.sum(preds == labels).item()

            if is_train:
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            # Phase-prefixed keys keep the train and val curves separate
            # instead of interleaving both under the same "loss"/"acc" keys.
            epoch_metrics[f"{phase}_loss"] = epoch_loss
            epoch_metrics[f"{phase}_acc"] = epoch_acc

            # Remember the weights of the best validation epoch so far.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        wandb.log(epoch_metrics)
        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')
    wandb.log({"best_acc": best_acc})

    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Start from a pretrained ResNet-18 and replace its final fully connected
# layer so it produces one output per class folder.
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features

model_ft.fc = nn.Linear(num_ftrs, len(class_names))

# Move the model to the training device. (The result used to be assigned to
# a misspelled, never-used `model_df`.)
model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# All parameters are optimized, i.e. the whole network is fine-tuned.
optimizer_ft = optim.SGD(model_ft.parameters(),
                         lr=LEARNING_RATE, momentum=MOMENTUM)

# Multiply the learning rate by GAMMA every STEP_SIZE epochs.
exp_lr_scheduler = lr_scheduler.StepLR(
    optimizer_ft, step_size=STEP_SIZE, gamma=GAMMA)


In [None]:
# Run the training loop; train_model returns the model with its best
# validation weights loaded.
model_ft = train_model(
    model_ft,
    criterion,
    optimizer_ft,
    exp_lr_scheduler,
    num_epochs=EPOCHS,
)

## Visualize Model Predictions

In [None]:
def visualize_model(model, num_images=6):
    """Show the model's predictions for a few validation images.

    Draws up to ``num_images`` images from ``dataloaders['val']`` in a
    two-column grid, each titled with the predicted class name. The model is
    switched to eval mode for inference and its previous train/eval mode is
    restored afterwards, even if plotting raises.

    Args:
        model: Trained network; inputs are moved to ``device`` before the
            forward pass.
        num_images: Maximum number of images to display. Nothing is drawn
            when this is not positive.
    """
    if num_images <= 0:
        return

    was_training = model.training
    model.eval()
    images_so_far = 0
    # Round up so an odd num_images (including 1) still gets enough grid
    # cells; plain `num_images // 2` left the last image without a slot.
    num_rows = (num_images + 1) // 2
    plt.figure()

    try:
        with torch.no_grad():  # no gradients needed for inference
            for inputs, _ in dataloaders['val']:
                inputs = inputs.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)  # index of the highest score

                for j in range(inputs.size(0)):
                    images_so_far += 1

                    ax = plt.subplot(num_rows, 2, images_so_far)
                    ax.axis('off')
                    ax.set_title(f'predicted: {class_names[preds[j]]}')
                    imshow(inputs.cpu().data[j])

                    if images_so_far == num_images:
                        return
    finally:
        # Restore whichever mode the model was in when we were called.
        model.train(mode=was_training)


In [None]:
# Show the trained model's predictions on a few validation images.
visualize_model(model_ft)

## Save Model

In [None]:
# Saves the entire model object (not just its state_dict) as 'model.pt' in the
# current wandb run directory. The "Load the Model" cell relies on this: it
# calls torch.load() and uses the result directly as a model.
# NOTE(review): presumably wandb uploads files placed in run.dir -- confirm.
torch.save(model_ft, os.path.join(wandb.run.dir, 'model.pt'))


In [None]:
# Close the wandb run that was started with wandb.init().
wandb.finish()

# Playing

## Load the Model

In [None]:
# Fetch 'model.pt' from an earlier wandb run and load it for inference.
# Falls back to the CPU when CUDA is not available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_file = wandb.restore(
    'model.pt', run_path="eddiezhuang/paper-io-ai/1o0dyh11")

# NOTE(security): torch.load unpickles the file, which can execute arbitrary
# code -- only load checkpoints from runs you trust.
# map_location lets a model saved on a GPU be loaded onto whichever device is
# available here.
model = torch.load(model_file.name, map_location=device)
model.to(device)
model.eval()


In [None]:
# Sanity check: run one screenshot through the model and print the raw
# per-class outputs.
with torch.no_grad():
    # shoot_screen and data_transform are defined in this notebook; the old
    # `su.` prefix referred to a module that is never imported.
    # Force RGB so the input has the same three channels as the training images.
    screenshot = shoot_screen().convert('RGB')
    # Add the batch dimension the model expects and move to the model's device.
    batch = data_transform(screenshot).unsqueeze(0).to(device)
    output = model(batch)
    print(output)


## Play in Real Time!

In [None]:
# Real-time play: after Enter is pressed, repeatedly screenshot the game,
# let the model pick a class and send the matching key. Press Esc to stop.
kb.wait('enter')
print('Starting AI playing...')

model.eval()
with torch.no_grad():  # inference only
    while True:
        # Same preprocessing as training: RGB image -> data_transform ->
        # batch of one on the model's device.
        screenshot = shoot_screen().convert('RGB')
        batch = data_transform(screenshot).unsqueeze(0).to(device)

        # class_names holds the label folder names ('w', 'a', 's', 'd',
        # 'none'), so the predicted name doubles as the key to press.
        predicted_key = class_names[model(batch).argmax(dim=1).item()]

        # Previously all four keys were sent on every iteration regardless of
        # the screenshot; now only the predicted one is, and 'none' sends nothing.
        if predicted_key != 'none':
            kb.send(predicted_key)

        if kb.is_pressed('esc'):
            print('Exiting AI playing!')
            break
