In [49]:
import os
import cv2
import json
import numpy as np
from IPython.display import Image
import PIL
import pandas as pd
from tqdm import tqdm
from random import randint, shuffle
import random
import sys
import keras

In [53]:
def gatherData(dataPathList, framesViewed):
    """Collect frame paths and per-move labels from recorded sessions.

    Every directory directly under each entry of ``dataPathList`` is treated
    as one recording that contains a ``moves.csv`` file. Each CSV row becomes
    one sample.

    Args:
        dataPathList: Iterable of dataset root directories. A trailing path
            separator is optional.
        framesViewed: Number of consecutive frames per sample, starting at
            the row's ``startImage`` value.

    Returns:
        A tuple ``(images, labels)``:
          * ``images`` is a list with one entry per CSV row; each entry is a
            list of ``framesViewed`` paths ``<recording>/<n>.jpg``.
          * ``labels`` maps each label column name to a list of that
            column's values, aligned index-for-index with ``images``.

    Raises:
        FileNotFoundError: If a recording directory has no ``moves.csv``
            (propagated from ``pandas.read_csv``).
        KeyError: If ``moves.csv`` lacks ``startImage`` or a label column.
    """
    labelNames = ("attack", "forward", "back", "left", "right", "jump",
                  "sneak", "sprint", "use", "drop", "inventory", "hotbar",
                  "camera1", "camera2")
    images = []
    labels = {name: [] for name in labelNames}

    for dataPath in dataPathList:
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order identical from run to run.
        for folder in sorted(os.listdir(dataPath)):
            # os.path.join tolerates roots with or without a trailing slash.
            folderPath = os.path.join(dataPath, folder)
            # Skip macOS metadata and any other stray non-directory entry;
            # only directories can hold a recording.
            if ".DS_Store" in folder or not os.path.isdir(folderPath):
                continue

            newMoves = pd.read_csv(os.path.join(folderPath, "moves.csv"))
            for _, move in newMoves.iterrows():
                # iterrows may upcast the row to float, hence the int() cast
                # before the frame number is turned into a file name.
                images.append([
                    os.path.join(folderPath,
                                 str(int(move["startImage"] + i)) + ".jpg")
                    for i in range(framesViewed)
                ])
                for name in labelNames:
                    labels[name].append(move[name])

    return images, labels

In [54]:
class Generator(keras.utils.Sequence):
    """Keras ``Sequence`` that yields ``((start_frames, end_frames), labels)``.

    Each sample is a sequence of image paths plus one label. Only the first
    two paths of a sample are read: index 0 is the "start" frame and index 1
    is the "end" frame. Pixel values are scaled to ``[0, 1]`` as float32.
    """

    def __init__(self, images, labels,
                 batch_size,
                 input_size=(640, 360, 3),
                 shuffle=True):
        """Store the dataset and batching options.

        Args:
            images: List of samples; each sample is a sequence with at least
                two image file paths.
            labels: List of labels aligned with ``images``; each must be
                convertible with ``int()``.
            batch_size: Number of samples per batch.
            input_size: Stored on the instance; this class does not use it
                to resize or validate frames.
            shuffle: When true, ``on_epoch_end`` reshuffles the samples.

        Raises:
            ValueError: If ``images`` and ``labels`` differ in length.
        """
        # Newer Keras releases expect Sequence subclasses to run the base
        # initialiser; on older releases this is a harmless no-op.
        super().__init__()

        if len(images) != len(labels):
            raise ValueError(
                "images and labels must have the same length, got %d and %d"
                % (len(images), len(labels)))

        self.images = images
        self.labels = labels
        self.batch_size = batch_size
        self.input_size = input_size
        self.shuffle = shuffle

        self.n = len(self.labels)

    def __get_input(self, path):
        """Load one image file as a float32 array scaled to ``[0, 1]``."""
        # The context manager closes the file handle as soon as the pixel
        # data has been copied into the array.
        with PIL.Image.open(path) as frame:
            image = np.array(frame).astype('float32')

        return image / 255.

    def on_epoch_end(self):
        """Reshuffle images and labels together (when shuffling is enabled)."""
        if self.shuffle and self.n > 0:
            # Shuffle one index permutation and apply it to both lists so the
            # image/label pairing is preserved. An empty dataset is skipped.
            order = list(range(self.n))
            random.shuffle(order)
            self.images = [self.images[i] for i in order]
            self.labels = [self.labels[i] for i in order]

    def __getitem__(self, index):
        """Return batch number ``index`` as ``((X1, X2), Y)``."""
        start = index * self.batch_size
        stop = start + self.batch_size
        return self.__get_data(self.images[start:stop], self.labels[start:stop])

    def test_getitem(self, index):
        """Convenience wrapper around ``__getitem__`` for manual inspection."""
        return self.__getitem__(index)

    def __len__(self):
        """Number of full batches per epoch; a trailing partial batch is dropped."""
        return self.n // self.batch_size

    def __get_output(self, startImages, endImages, labelBatch):
        """Stack the loaded frames and labels into NumPy arrays.

        Returns:
            ``(X1, X2, Y)`` where ``X1``/``X2`` are float32 arrays of the
            start/end frames and ``Y`` is an int64 array of labels.
        """
        X1 = np.array(startImages, np.float32)
        X2 = np.array(endImages, np.float32)

        # Labels may arrive as floats (e.g. 1.0); int() normalises them first.
        Y = np.array([int(label) for label in labelBatch], np.int64)

        return X1, X2, Y

    def __get_data(self, imageBatch, labelBatch):
        """Load every frame pair of a batch and assemble the model inputs."""
        startImages = []
        endImages = []
        for imagePaths in imageBatch:
            startImages.append(self.__get_input(imagePaths[0]))
            endImages.append(self.__get_input(imagePaths[1]))

        X1, X2, Y = self.__get_output(startImages, endImages, labelBatch)

        return (X1, X2), Y

In [70]:
# Dataset roots, one per environment (judging by the directory names). The
# trailing "/" is kept because gatherData may build paths by concatenation.
DATAPATHS = ["../assets/datasets/Move Classifier Data/MineRLBasaltFindCave-v0/",
             "../assets/datasets/Move Classifier Data/MineRLBasaltBuildVillageHouse-v0/",
             "../assets/datasets/Move Classifier Data/MineRLBasaltCreateVillageAnimalPen-v0/",
             "../assets/datasets/Move Classifier Data/MineRLBasaltMakeWaterfall-v0/"]

FRAMES_PER_SAMPLE = 2    # Generator reads exactly frames [0] and [1] of a sample
TARGET_LABEL = "attack"  # the single label column used for this experiment
TRAIN_FRACTION = 0.8
BATCH_SIZE = 16
SPLIT_SEED = 42          # fixes the train/validation split across runs

images, all_labels = gatherData(DATAPATHS, FRAMES_PER_SAMPLE)

# Shuffle samples and labels together. A private, seeded RNG makes the split
# reproducible without touching the global `random` state used elsewhere.
samples = list(zip(images, all_labels[TARGET_LABEL]))
random.Random(SPLIT_SEED).shuffle(samples)
images = [paths for paths, _ in samples]
labels = [label for _, label in samples]

# One split index for both lists keeps inputs and labels aligned.
split_index = int(len(images) * TRAIN_FRACTION)
X_train, Y_train = images[:split_index], labels[:split_index]
X_val, Y_val = images[split_index:], labels[split_index:]

generator = Generator(X_train, Y_train, batch_size=BATCH_SIZE)
val_generator = Generator(X_val, Y_val, batch_size=BATCH_SIZE)

In [71]:
# Pull batch index 1 through the standard indexing protocol (calls __getitem__).
X, Y = generator[1]

In [72]:
# X is the (start frames, end frames) pair returned by Generator, so this prints 2.
print(len(X))

2


In [73]:
# Number of samples in the start-frame array; should equal the batch size (16).
print(len(X[0]))

16


In [74]:
# One label per sample, so this should also equal the batch size (16).
print(len(Y))

16


In [91]:
# For each of the two input arrays: container type, array type, shape, and
# the type of a single pixel value.
for frames in (X[0], X[1]):
    print(type(X), type(frames), frames.shape, type(frames[0][0][0][0]))

<class 'tuple'> <class 'numpy.ndarray'> (16, 360, 640, 3) <class 'numpy.float32'>
<class 'tuple'> <class 'numpy.ndarray'> (16, 360, 640, 3) <class 'numpy.float32'>


In [86]:
# Labels are built as a NumPy int64 array by Generator; show container type,
# shape, and the type of one element.
print(type(Y), Y.shape, type(Y[0]))

<class 'numpy.ndarray'> (16,) <class 'numpy.int64'>
