In [None]:
import torch
import numpy as np
import cv2
import os
import torch.nn as nn
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm, trange
import torch.nn.functional as F
import logging

torch.manual_seed(69)

# Logger

In [None]:
logging.basicConfig(
    filename="app.log",         # File to log to
    level=logging.INFO,         # Logging level
    format="%(asctime)s - %(levelname)s - %(message)s",  # Log message format
    datefmt="%Y-%m-%d %H:%M:%S" # Date and time format
)

# Downscaler

In [None]:

# Downscaling all images in the dataset. 

def downscale_images(input_root, output_root, downscale_factor):
    for root, dirs, files in os.walk(input_root):
        # Create corresponding directory in the output structure
        relative_path = os.path.relpath(root, input_root)
        output_dir = os.path.join(output_root, relative_path)
        os.makedirs(output_dir, exist_ok=True)
        
        for file in tqdm(files):
            if file.lower().endswith(('png', 'jpg', 'jpeg', 'bmp', 'tiff', 'gif')):
                input_path = os.path.join(root, file)
                output_path = os.path.join(output_dir, file)
                try:
                    # Read and downscale the image
                    image = cv2.imread(input_path)
                    if image is not None:
                        downsampled_image = cv2.resize(
                            image,
                            None,
                            fx=1/downscale_factor,
                            fy=1/downscale_factor,
                            interpolation=cv2.INTER_AREA
                        )
                        # Save the downscaled image
                        cv2.imwrite(output_path, downsampled_image)
                        #print(f"Processed: {input_path} -> {output_path}")
                    else:
                        #print(f"Failed to read: {input_path}")
                        pass
                except Exception as e:
                    print(f"Error processing {input_path}: {e}")
            else:
                #print(f"Skipped non-image file: {file}")
                pass

if __name__ == "__main__":
    input_root = "ProperDataset"  # Input dataset root directory
    output_root = "training_images"  # Output dataset root directory
    downscale_factor = 10  # Adjust this factor as needed
    downscale_images(input_root, output_root, downscale_factor)


# Dataset Maker

In [None]:
import splitfolders
splitfolders.ratio("training_images", output="Dataset",
    seed=1337, ratio=(.8, .1, .1), group_prefix=None, move=False)

In [None]:
# Paths to dataset
data_dir = "Dataset"
train_dir = os.path.join(data_dir, "train")
val_dir = os.path.join(data_dir, "val")
test_dir = os.path.join(data_dir, "test") 

# PASTE YOUR TRANSFORMATIONS BELOW
transform = {
    "train": transforms.Compose([
        transforms.Resize((64,64)), 
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor() 
    ]),
    "val": transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor()
    ]),
    "test": transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor()
    ])
}

# Load datasets
dataset_dict = {
    "train": datasets.ImageFolder(train_dir, transform=transform["train"]),
    "val": datasets.ImageFolder(val_dir, transform=transform["val"]),
    "test": datasets.ImageFolder(test_dir, transform=transform["test"]) 
}

# Create DataLoaders
dataloaders = {
    "train": DataLoader(dataset_dict["train"], batch_size=32, shuffle=True, num_workers=4),
    "val": DataLoader(dataset_dict["val"], batch_size=32, shuffle=False, num_workers=4),
    "test": DataLoader(dataset_dict["test"], batch_size=32, shuffle=False, num_workers=4)  
}


# The Model

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
fast = FastCNN(5)
model = get_model(5)
model = model.to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training Loop

In [None]:
# Use it to load the trained weights or train it yourself in the below cell
# model.load_state_dict( torch.load("model_weights.pth"))


In [114]:
import random

g = "v4.3"

num_epochs = 20

noop_class_index = dataset_dict["train"].class_to_idx["noop"]  # Index of the "noop" class

# Get indices for all "noop" samples
noop_indices = [i for i, (_, label) in enumerate(dataset_dict["train"].samples) if label == noop_class_index]

def get_random_noop_subset(base_dataset, noop_indices, num_samples=1000):
    # Randomly sample 1000 indices from the "noop" indices
    selected_noop_indices = random.sample(noop_indices, num_samples)

    # Combine the noop subset with the rest of the dataset
    non_noop_indices = [i for i in range(len(base_dataset)) if i not in noop_indices]
    final_indices = non_noop_indices + selected_noop_indices

    # Return a Subset dataset
    return Subset(base_dataset, final_indices)

for epoch in trange(11, num_epochs+1):

    # Training phase
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    current_train_dataset = get_random_noop_subset(dataset_dict["train"], noop_indices)

    # Create a DataLoader
    train_loader = DataLoader(current_train_dataset, batch_size=32, shuffle=True, num_workers=4)


    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        correct += torch.sum(preds == labels.data)
        total += labels.size(0)

    train_loss = running_loss / len(dataloaders["train"].dataset)
    train_acc = correct.double() / total
    logging.info(f"{epoch} : Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")

    if epoch % 5 == 0:
        # Save model weights
        model.eval()  # Set model to evaluation mode
        save_path = f"Trained_Weights/{g}_{epoch}th.pth"
        torch.save(model.state_dict(), save_path)

        # Initialize validation metrics
        running_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():  # Disable gradient calculations for validation
            for inputs, labels in dataloaders["val"]:
                # Move inputs and labels to the appropriate device
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                # Update loss and accuracy
                running_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)  # Get predicted class indices
                correct += torch.sum(preds == labels)
                total += labels.size(0)

        # Calculate validation loss and accuracy
        val_loss = running_loss / len(dataloaders["val"].dataset)
        val_acc = correct.double() / total

        # Log validation metrics
        logging.info(f"Epoch {epoch}: Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")


100%|██████████| 10/10 [05:38<00:00, 33.82s/it]


# Tester

In [None]:
with torch.no_grad():
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in dataloaders["test"]:
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        correct += torch.sum(preds == labels.data)
        total += labels.size(0)

test_loss = running_loss / len(dataloaders["test"].dataset)
test_acc = correct.double() / total
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Player

In [116]:
import cv2
import mss
import numpy as np
import torch
from torchvision import transforms
from pynput.keyboard import Controller, Key
from PIL import Image
import time
# Initialize keyboard controller
keyboard = Controller()

# PASTE YOUR TRANSFORMATIONS HERE
transform = transforms.Compose([
    transforms.Resize((64, 64)), 
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor() 
])

# Monitor region for screen capture
monitor = {
    "left": 350,  # x-coordinate for the top-left corner
    "top": 180,   # y-coordinate for the top-left corner
    "width": 740, # width of the capture region
    "height": 690 # height of the capture region
}
acc=set()
# Key mapping for predictions
key_map = {
    "left": Key.left,
    "right": Key.right,
    "up": Key.up,
    "down": Key.down
}

# Function to simulate key presses
def press_key(action):
    if action in key_map:
        keyboard.press(key_map[action])
        keyboard.release(key_map[action])

# Set model to evaluation mode
model.eval()

# Capture and process the screen
with mss.mss() as sct:
    i = 0
    while True:
        # Capture screen region
        screenshot = sct.grab(monitor)
        i+=1
        screenshot_np = np.array(screenshot)

        # Resize and convert the image
        downscale_factor = 2
        resized_image = cv2.resize(
            screenshot_np,
            None,
            fx=1 / downscale_factor,
            fy=1 / downscale_factor,
            interpolation=cv2.INTER_AREA
        )
        rgb_image = cv2.cvtColor(resized_image, cv2.COLOR_BGRA2RGB)

        # Preprocess image
        img_tensor = transform(Image.fromarray(rgb_image)).unsqueeze(0) # Add batch dimension

        # Make predictions
        with torch.no_grad():
            outputs = model(img_tensor)
            _, predicted = torch.max(outputs, 1)
            prediction_idx = predicted.item()

        # Map prediction index to action
        class_names = dataset_dict['train'].classes
        
        action = class_names[prediction_idx]

        # Simulate key press if necessary
        acc.add(action)
        if action != "noop":
            acc.add(action)
            press_key(action)
            time.sleep(0.6) # manual bottleneck to prevent rapid movements

KeyboardInterrupt: 

## Weight Viewer

In [None]:
for k, v in model.named_parameters():
    print(k, v)

In [None]:
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Total parameters: {total_params}")
print(f"Trainable parameters: {trainable_params}")