In [25]:
# Training
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np
import os
import glob
import random
from keras.models import load_model
from pynput.keyboard import Key, Controller, Listener
import cv2



# 특정 키들
valid_keys = ['w', 'a', 's', 'd']
special_labels = ['no_keys_pressed']

# 데이터셋 클래스 정의
class KeypressDataset(Dataset):
    def __init__(self, base_folders, transform=None, w_ratio=0.3, nokey_ratio = 0.5):
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.speeds = []

        for base_folder in base_folders:
            # 속도 값을 저장할 경로
            speed_file_path = os.path.join(base_folder, 'text', 'speed.txt')

            # 속도 값 읽기
            with open(speed_file_path, 'r') as f:
                speed_values = [float(line.strip()) for line in f]

            # 유효한 폴더 이름만 필터링
            label_folders = [f for f in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, f)) and self.is_valid_label(f)]

            # 각 폴더 내의 모든 이미지 경로를 수집
            all_image_paths = []
            for label_folder in label_folders:
                folder_path = os.path.join(base_folder, label_folder)
                image_paths = glob.glob(os.path.join(folder_path, '*.png'))
                for img_path in image_paths:
                    all_image_paths.append((img_path, label_folder))

            # 파일 이름을 기준으로 정렬
            all_image_paths.sort(key=lambda x: int(os.path.basename(x[0]).split('.')[0]))

            # 정렬된 이미지 경로와 라벨, 속도 값을 각각 저장
            for i, (img_path, label_folder) in enumerate(all_image_paths):
                label_array = self.label_to_array(label_folder)
                # 'w' 데이터의 비율을 조정
                if label_array == [1, 0, 0, 0]:
                    if random.random() > w_ratio:
                        continue
                # if label_array == [1, 1, 0, 0]:
                #     if random.random() > w_ratio:
                #         continue
                # if label_array == [1, 0, 0, 1]:
                #     if random.random() > w_ratio:
                #         continue
                if label_array == [0, 0, 0, 0]:
                    if random.random() > nokey_ratio:
                        continue
                self.image_paths.append(img_path)
                self.labels.append(label_array)
                self.speeds.append(speed_values[i])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        speed = self.speeds[idx]

        return image, torch.tensor(label, dtype=torch.float), torch.tensor(speed, dtype=torch.float).unsqueeze(0)

    def is_valid_label(self, label):
        if label in special_labels:
            return True
        for char in label.split('_'):
            if char not in valid_keys:
                return False
        return True

    def label_to_array(self, label):
        if label == 'no_keys_pressed':
            return [0, 0, 0, 0]
        array = [0, 0, 0, 0]
        for char in label.split('_'):
            if char in valid_keys:
                index = valid_keys.index(char)
                array[index] = 1
        return array

# 데이터 변환 정의
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 조정
    transforms.ToTensor(),  # 텐서로 변환
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 정규화
])

# CNN 모델 정의 (속도 값 추가)
class ModifiedCNNModelWithSpeed(nn.Module):
    def __init__(self):
        super(ModifiedCNNModelWithSpeed, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        self.fc2 = nn.Linear(512 + 1, 4)  # 속도 값을 추가한 노드 수 (512 + 1)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.sigmoid = nn.Sigmoid()  # 시그모이드 활성화 함수

    def forward(self, x, speed):
        x = self.pool(self.relu(self.bn1(self.conv1(x))))
        x = self.pool(self.relu(self.bn2(self.conv2(x))))
        x = self.pool(self.relu(self.bn3(self.conv3(x))))
        x = self.pool(self.relu(self.bn4(self.conv4(x))))
        x = x.view(-1, 256 * 8 * 8)
        x = self.relu(self.fc1(x))

        # 속도 값을 추가하여 입력으로 사용
        x = torch.cat((x, speed), dim=1)
        x = self.dropout(x)
        x = self.sigmoid(self.fc2(x))  # 시그모이드 활성화
        return x

# 학습 루프 정의
num_epochs = 100

def train_model(base_folders):
    # 데이터셋 및 데이터로더 생성
    dataset = KeypressDataset(base_folders=base_folders, transform=transform, w_ratio=0.5, nokey_ratio = 0.5)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=0)  # num_workers=2로 설정

    # 모델, 손실 함수, 최적화 알고리즘 설정
    model = ModifiedCNNModelWithSpeed()
    criterion = nn.BCELoss()  # 이진 크로스 엔트로피 손실 함수
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, labels, speeds in dataloader:
            # 모델 예측 초기화
            optimizer.zero_grad()

            # 순전파 + 역전파 + 최적화
            outputs = model(images, speeds)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # 손실 누적
            running_loss += loss.item()

        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(dataloader)}')

    print('Finished Training')

    # 모델 저장
    torch.save(model.state_dict(), 'modified_cnn_model_with_speed.pth')
    print('Model saved to modified_cnn_model_with_speed.pth')

def predict_image(image_path, speed, model, transform):
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    image = image.unsqueeze(0)  # 배치 차원 추가

    speed = torch.tensor([[speed]], dtype=torch.float)  # 속도 값을 텐서로 변환

    with torch.no_grad():
        output = model(image, speed)
        output = output.squeeze().numpy()

    return output

# 모델 로드 및 추론
def load_model_and_predict(image_path, speed):
    # 모델 인스턴스 생성 및 가중치 로드
    model = ModifiedCNNModelWithSpeed()
    model.load_state_dict(torch.load('modified_cnn_model_with_speed.pth'))
    model.eval()

    # 이미지 예측
    prediction = predict_image(image_path, speed, model, transform)
    print(f'Prediction for {image_path} with speed {speed}: {prediction}')

if __name__ == '__main__':

    # mp.set_start_method('spawn')
    # 현재 디렉토리의 모든 폴더를 base_folders로 설정

    current_directory = os.getcwd()
    base_folders = [os.path.join(current_directory, name) for name in os.listdir(current_directory) if os.path.isdir(os.path.join(current_directory, name))]

    # 학습 모델
    train_model(base_folders)

    # 추론 예시
    # example_image_path = 'path_to_your_image.png'
    # example_speed = 0.7
    # load_model_and_predict(example_image_path, example_speed)


Epoch 1/100, Loss: 5.089885365962982
Epoch 2/100, Loss: 1.4241427159309388
Epoch 3/100, Loss: 0.4524289619922638
Epoch 4/100, Loss: 0.40766318440437316
Epoch 5/100, Loss: 0.40406485795974734
Epoch 6/100, Loss: 0.38881996750831604
Epoch 7/100, Loss: 0.3880790030956268
Epoch 8/100, Loss: 0.3888913929462433
Epoch 9/100, Loss: 0.38452597975730896
Epoch 10/100, Loss: 0.3918797528743744
Epoch 11/100, Loss: 0.3861772406101227
Epoch 12/100, Loss: 0.3800950503349304
Epoch 13/100, Loss: 0.37518787026405337
Epoch 14/100, Loss: 0.3679965341091156
Epoch 15/100, Loss: 0.3692064851522446
Epoch 16/100, Loss: 0.36501635909080504
Epoch 17/100, Loss: 0.35977696418762206
Epoch 18/100, Loss: 0.36474592447280885
Epoch 19/100, Loss: 0.36670536279678345
Epoch 20/100, Loss: 0.3592632019519806
Epoch 21/100, Loss: 0.35158018112182615
Epoch 22/100, Loss: 0.362984277009964
Epoch 23/100, Loss: 0.3590904605388641
Epoch 24/100, Loss: 0.3621082258224487
Epoch 25/100, Loss: 0.34561349630355837
Epoch 26/100, Loss: 0.346

In [26]:
# test
example_image_path = 'screenshots_20240607-024658/a/19.png'
example_speed = 80
load_model_and_predict(example_image_path, example_speed)
example_image_path = 'screenshots_20240607-024658/a_w/33.png'
example_speed = 88
load_model_and_predict(example_image_path, example_speed)
example_image_path = 'screenshots_20240607-024658/d_w/12.png'
example_speed = 94
load_model_and_predict(example_image_path, example_speed)
example_image_path = 'screenshots_20240607-024058/w/15.png'
example_speed = 86
load_model_and_predict(example_image_path, example_speed)
example_image_path = 'screenshots_20240607-024058/s/77.png'
example_speed = 35
load_model_and_predict(example_image_path, example_speed)
example_image_path = 'screenshots_20240607-024822/d/50.png'
example_speed = 83
load_model_and_predict(example_image_path, example_speed)
example_image_path = 'screenshots_20240607-024822/no_keys_pressed/6.png'
example_speed = 47


Prediction for screenshots_20240607-024658/a/19.png with speed 80: [3.8110595e-02 8.3816338e-01 5.7020073e-04 5.0863409e-06]
Prediction for screenshots_20240607-024658/a_w/33.png with speed 88: [8.2777435e-01 9.2151177e-01 8.6279528e-05 4.5925929e-04]
Prediction for screenshots_20240607-024658/d_w/12.png with speed 94: [4.0236190e-01 1.8015543e-03 3.2642987e-04 4.8683730e-01]
Prediction for screenshots_20240607-024058/w/15.png with speed 86: [9.8386776e-01 6.6202033e-06 1.0456417e-10 1.8290447e-02]
Prediction for screenshots_20240607-024058/s/77.png with speed 35: [8.4734135e-13 5.4380320e-07 9.9733633e-01 3.4091322e-06]
Prediction for screenshots_20240607-024822/d/50.png with speed 83: [2.8228235e-01 4.0299376e-04 1.1603635e-05 6.8229479e-01]


In [None]:
#eval
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import ImageGrab, Image
import numpy as np
from keras.models import load_model
import cv2
from pynput.keyboard import Key, Controller, Listener

# 특정 키들
valid_keys = ['w', 'a', 's', 'd']

# 데이터 변환 정의
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 조정
    transforms.ToTensor(),  # 텐서로 변환
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 정규화
])


# CNN 모델 정의 (속도 값 추가)
class ModifiedCNNModelWithSpeed(nn.Module):
    def __init__(self):
        super(ModifiedCNNModelWithSpeed, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        self.fc2 = nn.Linear(512 + 1, 4)  # 속도 값을 추가한 노드 수 (512 + 1)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.sigmoid = nn.Sigmoid()  # 시그모이드 활성화 함수

    def forward(self, x, speed):
        x = self.pool(self.relu(self.bn1(self.conv1(x))))
        x = self.pool(self.relu(self.bn2(self.conv2(x))))
        x = self.pool(self.relu(self.bn3(self.conv3(x))))
        x = self.pool(self.relu(self.bn4(self.conv4(x))))
        x = x.view(-1, 256 * 8 * 8)
        x = self.relu(self.fc1(x))

        # 속도 값을 추가하여 입력으로 사용
        x = torch.cat((x, speed), dim=1)
        x = self.dropout(x)
        x = self.sigmoid(self.fc2(x))  # 시그모이드 활성화
        return x

# 모델 로드
model = ModifiedCNNModelWithSpeed()
model.load_state_dict(torch.load('modified_cnn_model_with_speed.pth'))
model.eval()

# 속도 예측 모델 로드
speed_model = load_model('speed_rec.h5')

# 키보드 컨트롤러
keyboard = Controller()

# 화면 및 속도 영역 정의
screen_region = (320, 170, 1650, 1080)
speed_region = (145, 905, 202, 940)

# ESC 키 종료 플래그
exit_flag = False

def on_press(key):
    global exit_flag
    if key == Key.esc:
        exit_flag = True
        return False

def get_speed(speed_image):
    speed_image = cv2.cvtColor(speed_image, cv2.COLOR_BGR2GRAY)  # Grayscale로 변환
    speed_image = speed_image.astype('float32') / 255  # 정규화
    speed_image = np.expand_dims(speed_image, axis=0)
    speed_image = np.expand_dims(speed_image, axis=-1)  # 채널 차원 추가
    # speed = speed_model.predict(speed_image)[0][0]
    # return speed
    prediction = speed_model.predict(speed_image)
    predicted_label = np.argmax(prediction, axis=1)[0]

    return predicted_label

with Listener(on_press=on_press) as listener:
    while not exit_flag:
        # 화면 캡처
        screen = ImageGrab.grab(bbox=screen_region)
        screen = np.array(screen)
        screen = cv2.cvtColor(screen, cv2.COLOR_BGR2RGB)
        screen_image = Image.fromarray(screen)
        
        # 속도 캡처
        speed_screen = ImageGrab.grab(bbox=speed_region)
        speed_screen = np.array(speed_screen)
        speed_screen = cv2.cvtColor(speed_screen, cv2.COLOR_BGR2RGB)
        
        # 속도 예측
        speed = get_speed(speed_screen)
        speed_tensor = torch.tensor([[speed]], dtype=torch.float)

        # 이미지 변환
        screen_image = transform(screen_image)
        screen_image = screen_image.unsqueeze(0)  # 배치 차원 추가

        # 모델 예측
        with torch.no_grad():
            output = model(screen_image, speed_tensor)
            output = output.squeeze().numpy()

        # 예측 확률에 따른 키 입력
        pressed_keys = []
        for i, prob in enumerate(output):
            if prob >= 0.5:
                key = valid_keys[i]
                pressed_keys.append(key)
                keyboard.press(key)
            else:
                key = valid_keys[i]
                keyboard.release(key)

        # 콘솔에 속도 값과 입력된 키 표시
        print(f'Speed: {speed}, Keys: {" ".join(pressed_keys)}')
        print(f'[ {output[0]:.4f} ] [ {output[1]:.4f} ] [ {output[2]:.4f} ] [ {output[3]:.4f} ]')

        # ESC 키로 프로그램 종료
        if exit_flag:
            break

    listener.join()
