In [13]:
import pygame
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import cv2
import numpy as np
import json
import random
import gym
import minerl
import pickle

# --------- HYPERPARAMETERS ---------
# Output sizes of the two prediction heads.
keys_classes = 22      # one output per keyboard/button action
camera_classes = 2     # two camera components

# Optimisation and data-loading settings.
learning_rate = 1e-4
batch_size = 1
num_epochs = 1000
num_workers = 4

# Video dimensions of one input clip.
channels = 1
temporal_depth = 7
height = 90
width = 160

# Number of residual blocks in each of the four ResNet3D stages.
resnet_layers = [2] * 4

In [14]:
# Recorded gameplay clip that the models will be run on.
video_path = '/mnt/d/py/vpt/data/labeller-training/video/mc-0.mp4'

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

def read_video_to_memory(video_path):
    """Decode an entire video file into one in-memory tensor of grayscale frames.

    Every decoded frame is converted from BGR to grayscale and resized with
    ``cv2.resize(frame, (height, width))`` using the notebook-level ``height``
    and ``width`` constants.

    NOTE(review): ``cv2.resize`` interprets its size argument as
    ``(width, height)``, so passing ``(height, width)`` yields arrays with
    ``width`` rows and ``height`` columns (the recorded output of this cell is
    ``[112, 160, 90]``). ``get_video`` allocates its batches in that same
    layout, and any saved weights were presumably trained on it, so the
    argument order is intentionally left as-is.

    Args:
        video_path: Path of the video file to decode.

    Returns:
        A tensor built from the stacked frames, one frame per index along
        dim 0. If no frame could be read (e.g. the file is missing or
        unreadable) the result is an empty tensor of shape ``(0,)``.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # End of stream, or the capture could not be opened/read.
                break
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            frames.append(cv2.resize(gray_frame, (height, width)))
    finally:
        # Release the decoder even if a frame conversion raises part-way through.
        cap.release()

    return torch.from_numpy(np.array(frames))

# Load the whole clip into memory once; later cells slice temporal windows out of video_tensor.
# NOTE (original author): video_tensor came back empty ([]) at one point because a game
# instance had been loaded and reset the video content. TODO confirm whether this still applies.
video_tensor = read_video_to_memory(video_path)
print(video_tensor.shape)

cuda
torch.Size([112, 160, 90])


In [15]:
class BasicBlock3D(nn.Module):
    """Residual block made of two 3x3x3 convolutions with batch normalisation.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the input to build the
            shortcut branch; when ``None`` the input itself is the shortcut.
    """

    # Channel multiplier of the block output relative to ``out_channels``.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # Both convolutions share kernel size, padding and the absence of a bias.
        shared = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv3d(in_channels, out_channels, stride=stride, **shared)
        self.bn1 = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(out_channels, out_channels, stride=1, **shared)
        self.bn2 = nn.BatchNorm3d(out_channels)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``."""
        # Shortcut branch: the raw input unless a downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        # Main branch: conv -> bn -> relu -> conv -> bn.
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        main += shortcut
        return self.relu(main)
    
    # --------- RESNET3D DEFINITION ---------

class ResNet3D(nn.Module):
    """3D ResNet: a convolutional stem, four residual stages, and a linear head.

    Args:
        block: Residual block class. It is called as
            ``block(in_channels, out_channels, stride, downsample)`` and must
            expose an ``expansion`` class attribute.
        layers: Sequence of four ints, the number of blocks in each stage.
        num_classes: Size of the final linear layer's output.
        input_channels: Number of channels of the input clips. When ``None``
            (the default, matching the original behaviour) the notebook-level
            ``channels`` constant is used.
    """

    def __init__(self, block, layers, num_classes, input_channels=None):
        super(ResNet3D, self).__init__()
        if input_channels is None:
            # Backward-compatible default: read the notebook-level constant.
            input_channels = channels
        # Running channel count; _make_layer updates it after each stage.
        self.in_channels = 64

        # Stem: 7x7x7 convolution that strides only over the last two axes.
        self.conv1 = nn.Conv3d(input_channels, self.in_channels, kernel_size=7, stride=(1, 2, 2), padding=(3, 3, 3), bias=False)
        self.bn1 = nn.BatchNorm3d(self.in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first halves the resolution.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Global average pooling followed by the linear head.
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and, when the shape of its
        input and output differ, a 1x1x1 conv + batch-norm shortcut projection.
        """
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            # The identity shortcut would not match the block output; project it.
            downsample = nn.Sequential(
                nn.Conv3d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(out_channels * block.expansion),
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a 5-D input batch to a ``(batch, num_classes)`` tensor of raw outputs."""
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x
    
    # --------- IDM MODEL DEFINITION ---------

class KeysIDM(nn.Module):
    """Keyboard model: a ResNet3D followed by a sigmoid over its outputs.

    Args:
        keys_classes: Number of outputs of the keyboard ResNet3D.
    """

    def __init__(self, keys_classes):
        super().__init__()
        self.keys_resnet3d = ResNet3D(BasicBlock3D, resnet_layers, keys_classes)
        # NOTE(review): this second backbone is constructed but never called in
        # forward(); it is kept so the module's parameter set stays unchanged.
        self.camera_resnet3d = ResNet3D(BasicBlock3D, resnet_layers, camera_classes)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the sigmoid of the keyboard backbone's output for ``x``."""
        raw_scores = self.keys_resnet3d(x)
        return self.sigmoid(raw_scores)
    
class CameraIDM(nn.Module):
    """Camera model: a single ResNet3D whose raw outputs are returned as-is.

    Args:
        camera_classes: Number of outputs of the camera ResNet3D.
    """

    def __init__(self, camera_classes):
        super().__init__()
        self.camera_resnet3d = ResNet3D(BasicBlock3D, resnet_layers, camera_classes)

    def forward(self, x):
        """Return the camera backbone's output for ``x`` with no activation applied."""
        return self.camera_resnet3d(x)


In [16]:
def get_video(rand_indices):
    """Cut ``batch_size`` windows of ``temporal_depth`` frames out of ``video_tensor``.

    Args:
        rand_indices: Start frame of each window; entry ``k`` fills batch row ``k``.

    Returns:
        Tensor of shape ``(batch_size, 1, temporal_depth, width, height)`` with
        the same dtype as ``video_tensor``.
    """
    clip_shape = (batch_size, 1, temporal_depth, width, height)
    clips = torch.zeros(clip_shape, dtype=video_tensor.dtype)

    for row in range(batch_size):
        first = rand_indices[row]
        last = first + temporal_depth
        # Channel 0 is the only channel of each clip.
        clips[row, 0] = video_tensor[first:last]

    return clips

def get_actions(action_dict):
    """Convert one recorded action dict into a keyboard vector and a camera value.

    Actions missing from ``action_dict`` default to "not pressed" (``0``) and a
    camera value of ``[0.0, 0.0]``.

    The keyboard vector always follows the fixed order of the defaults below,
    so it has exactly 22 entries. Any other key present in ``action_dict``
    (``"ESC"``, ``"noop"``, or anything unexpected in the recording) is
    ignored, so it can no longer lengthen or misalign the vector.

    Args:
        action_dict: Mapping from action name to its recorded value.
            ``None`` is treated as an empty mapping.

    Returns:
        ``(keyboard_actions, camera_actions)``: a list of 22 ints (``1`` where
        the recorded value is truthy, else ``0``) and the recorded ``"camera"``
        value (``[0.0, 0.0]`` when absent).
    """
    # Default (idle) value for every action; insertion order defines the
    # position of each key in the returned keyboard vector.
    default_actions = {
        "attack": 0, "back": 0, "drop": 0, "forward": 0,
        "hotbar.1": 0, "hotbar.2": 0, "hotbar.3": 0, "hotbar.4": 0, "hotbar.5": 0,
        "hotbar.6": 0, "hotbar.7": 0, "hotbar.8": 0, "hotbar.9": 0, "inventory": 0,
        "jump": 0, "left": 0, "right": 0, "pickItem": 0, "sneak": 0, "sprint": 0,
        "swapHands": 0, "use": 0, "camera": [0.0, 0.0]
    }
    # Freeze the canonical key order before merging in the recorded values.
    keyboard_keys = [key for key in default_actions if key != "camera"]

    # Recorded values override the defaults.
    merged = {**default_actions, **(action_dict or {})}

    keyboard_actions = [1 if merged[key] else 0 for key in keyboard_keys]
    camera_actions = merged["camera"]

    return keyboard_actions, camera_actions


# Recorded actions, presumably for the same clip as the video; get_batch() indexes this by frame number.
# The encoding is stated explicitly because open()'s default is locale-dependent, while JSON is UTF-8.
with open('/mnt/d/py/vpt/data/labeller-training/actions/mc-0.json', 'r', encoding='utf-8') as json_file:
    actions_data = json.load(json_file)



def get_batch():
    """Sample a random training batch of clips and their recorded actions.

    ``batch_size`` start frames are drawn uniformly from every position at
    which a full ``temporal_depth``-frame window fits in ``video_tensor``.
    Each clip is labelled with the action recorded at its middle frame
    (``start + temporal_depth // 2``).

    Returns:
        ``(frames, keys, cams)``, all on ``device``:
        the float clips from ``get_video`` scaled by ``1 / 255``, the float32
        keyboard vectors, and the float32 camera values.

    Raises:
        ValueError: If the video has fewer than ``temporal_depth`` frames.
    """
    total_frames = video_tensor.shape[0]
    if total_frames < temporal_depth:
        raise ValueError(
            f"video has {total_frames} frames, need at least {temporal_depth} to cut one window"
        )

    # randint is inclusive on both ends, so the last valid start is allowed.
    last_start = total_frames - temporal_depth
    rand_indices = [random.randint(0, last_start) for _ in range(batch_size)]
    batch_frames = get_video(rand_indices)

    # Label each clip with the action recorded at its middle frame.
    label_offset = temporal_depth // 2
    batch_keys = []
    batch_cams = []
    for start in rand_indices:
        keyboard, camera = get_actions(actions_data[start + label_offset])
        batch_keys.append(keyboard)
        batch_cams.append(camera)

    # Put the labels on the same device as the frames; a hard-coded 'cuda'
    # fails on CPU-only machines and can mismatch the frames' device.
    frames = batch_frames.to(device).float() / 255.0
    keys = torch.tensor(batch_keys, dtype=torch.float32, device=device)
    cams = torch.tensor(batch_cams, dtype=torch.float32, device=device)
    return frames, keys, cams


In [17]:
# Build/load the two inference models and start the Minecraft environment.

# NOTE(review): the keyboard model's weights are never loaded (the load below is
# commented out, and the recorded output of this cell shows a FileNotFoundError
# for that file), so it runs with its random initialisation.
keys_idm_model = KeysIDM(keys_classes=keys_classes).to(device)

# with open('models/resnet-keyspred.pkl', 'rb') as f:
#     keys_idm_model = pickle.load(f)

# SECURITY: pickle.load can execute arbitrary code contained in the file;
# only load model files that come from a trusted source.
# The whole model object is unpickled, so there is no need to construct a
# CameraIDM first only to overwrite it.
with open('models/resnet-camerapred.pkl', 'rb') as f:
    camera_idm_model = pickle.load(f)

keys_idm_model = keys_idm_model.to(device)
camera_idm_model = camera_idm_model.to(device)

# Switch to inference mode AFTER loading: calling eval() before the load would
# only affect the instance that the unpickled model replaces.
keys_idm_model.eval()
camera_idm_model.eval()

# Initialize the Minecraft environment
env = gym.make('MineRLBasaltBuildVillageHouse-v0')
# env = gym.make('MineRLBasaltFindCave-v0')

env.seed(2143)
obs = env.reset()

FileNotFoundError: [Errno 2] No such file or directory: 'models/resnet-keyspred.pkl'

In [None]:
pygame.init()

# Size of the pygame window, in pixels.
RESOLUTION = (640, 360)
SENS = 0.05

screen = pygame.display.set_mode(RESOLUTION)
pygame.display.set_caption('Minecraft')

# Show the cursor, park it in the middle of the window and leave input ungrabbed.
pygame.mouse.set_visible(True)
center_x = screen.get_width() // 2
center_y = screen.get_height() // 2
pygame.mouse.set_pos(center_x, center_y)
pygame.event.set_grab(False)

# Action names in the order they are zipped with the keyboard model's outputs.
keys_actions_arr = [
    "attack",
    "back",
    "drop",
    "forward",
    "hotbar.1",
    "hotbar.2",
    "hotbar.3",
    "hotbar.4",
    "hotbar.5",
    "hotbar.6",
    "hotbar.7",
    "hotbar.8",
    "hotbar.9",
    "inventory",
    "jump",
    "left",
    "right",
    "pickItem",
    "sneak",
    "sprint",
    "swapHands",
    "use",
]

# Most recent action sent to the environment; overwritten on every step.
final_action = dict()

# Replay loop: slide a temporal window over the recorded video, predict the
# action for each window with both models, and apply it in the environment
# while showing the environment's point-of-view frame in the pygame window.
try:
    # Number of complete windows; "+ 1" includes the window that ends on the
    # last frame. The range is empty when the video is shorter than a window.
    num_windows = video_tensor.shape[0] - temporal_depth + 1

    for i in range(num_windows):
        # Build one clip with batch and channel axes added in front, scaled by 1/255.
        frames = video_tensor[i:i + temporal_depth].unsqueeze(0).to(device).float() / 255.0
        frames = frames.unsqueeze(0)

        # Inference only: skip autograd bookkeeping for the forward passes.
        with torch.no_grad():
            keys_out = keys_idm_model(frames)
            camera_out = camera_idm_model(frames)

        # Round each keyboard output to 0 or 1 and pair it with its action name.
        keys_action_predicted = keys_out.squeeze().round().tolist()
        camera_out = camera_out.squeeze().cpu().detach().numpy()

        keys_converted_act = [int(x) for x in keys_action_predicted]
        keys_actions_dict = dict(zip(keys_actions_arr, keys_converted_act))

        # Show the current observation; flip + rot90 reorders the axes of the
        # 'pov' array before it is turned into a pygame surface.
        image = np.array(obs['pov'])
        image = np.flip(image, axis=1)
        image = np.rot90(image)
        image = pygame.surfarray.make_surface(image)
        screen.blit(image, (0, 0))
        pygame.display.update()

        # Get the current state of all keys
        terminate_keys = pygame.key.get_pressed()

        action = {'noop': []}
        action['camera'] = camera_out.tolist()

        # Add the in-game 'ESC' action to the beginning of the action
        final_action = {'ESC': 0, **keys_actions_dict, **action}

        # Apply the action in the environment
        obs, reward, done, _ = env.step(final_action)

        # Check if the 'q' key is pressed to terminate the loop
        if terminate_keys[pygame.K_q]:
            break

        # Handle pygame events to avoid the window becoming unresponsive
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                done = True

        # Stop once the episode has finished or the window was closed, instead
        # of continuing to step a finished environment.
        if done:
            break
except KeyboardInterrupt:
    # Interrupting the kernel is a supported way to stop the replay.
    pass
finally:
    # Cleanup. The environment is deliberately left open for later cells.
    pygame.quit()
        