In [3]:
import pandas as pd
import numpy as np
import cv2

from sklearn.feature_extraction.text import TfidfVectorizer
from ast import literal_eval

import torch.cuda
import torch.nn as nn

from sklearn.model_selection import train_test_split

In [4]:
def LoadTrainTestData(pathToDataCsv, testSetSize=0.1, shuffle=False, randomState=1):
    """Read the tile CSV and return one train/test partition of it.

    The 'affordances' and 'tiles' columns hold Python literals stored as text;
    every cell is parsed with ``literal_eval`` and parsed tiles are additionally
    wrapped in a NumPy array.

    Args:
        pathToDataCsv: Passed straight to ``pandas.read_csv``.
        testSetSize: Forwarded to ``train_test_split`` as ``test_size``.
        shuffle: Forwarded to ``train_test_split`` as ``shuffle``.
        randomState: Forwarded to ``train_test_split`` as ``random_state``.

    Returns:
        The result of ``train_test_split`` on the parsed frame (train part
        first, test part second).
    """
    def parseCell(rawCell):
        # Cells are literal text in the CSV; str() guards against non-string cells
        return literal_eval(str(rawCell))

    tileFrame = pd.read_csv(pathToDataCsv)
    tileFrame['affordances'] = tileFrame['affordances'].apply(parseCell)
    tileFrame['tiles'] = tileFrame['tiles'].apply(lambda rawCell: np.array(parseCell(rawCell)))

    splitOptions = dict(test_size=testSetSize, random_state=randomState, shuffle=shuffle)
    return train_test_split(tileFrame, **splitOptions)

def LoadCrossValTrainTestData(pathToDataCsv, shuffle=False, randomState=1):
    """Build leave-one-game-out train/test splits from the tile CSV.

    The 'affordances' and 'tiles' columns hold Python literals stored as text;
    every cell is parsed with ``literal_eval`` and parsed tiles are additionally
    wrapped in a NumPy array.

    Args:
        pathToDataCsv: Passed straight to ``pandas.read_csv``.
        shuffle: When true, rows are shuffled once before the folds are built.
        randomState: Seed used for that shuffle.

    Returns:
        dict mapping each distinct 'gamename' value to
        ``{"trainData": <rows of all other games>, "testData": <rows of that game>}``.
    """
    dataFrame = pd.read_csv(pathToDataCsv)

    dataFrame['affordances'] = dataFrame['affordances'].apply(lambda x: literal_eval(str(x)))
    dataFrame['tiles'] = dataFrame['tiles'].apply(lambda x: np.array(literal_eval(str(x))))

    if shuffle:
        # drop=True: a plain reset_index() would keep the pre-shuffle index as an
        # extra 'index' column, so shuffled folds would differ in schema from
        # unshuffled ones.
        dataFrame = dataFrame.sample(frac=1, random_state=randomState).reset_index(drop=True)

    crossValDataDict = {}

    for gameName in dataFrame['gamename'].unique():
        # One boolean mask yields both sides of the fold
        isHeldOutGame = dataFrame['gamename'] == gameName
        crossValDataDict[gameName] = {
            "trainData": dataFrame[~isHeldOutGame],
            "testData": dataFrame[isHeldOutGame],
        }

    return crossValDataDict

In [None]:
# Single hold-out split (as opposed to the per-game cross-validation loader);
# rows are shuffled, test fraction and seed are the loader's defaults.
tileCsvPath = "../data/tomData/unshuffled3x3tiles.csv"
trainData, testData = LoadTrainTestData(pathToDataCsv=tileCsvPath, shuffle=True)

In [None]:
def TextTileToImage(tileArray, tileSize, spritePath, savePath=None):
    """Render a 2-D grid of tile characters as a single RGB image.

    Each character is looked up as ``{spritePath}/{char}.png`` and pasted into
    its ``tileSize`` x ``tileSize`` cell of the output. The character '.' is
    loaded from '@.png' instead (presumably because '..png' is an awkward file
    name — confirm against the sprite folders).

    Args:
        tileArray: 2-D array of single tile characters (rows x columns).
        tileSize: Edge length in pixels of one sprite; sprites must be exactly
            this size for the paste to succeed.
        spritePath: Directory containing the sprite PNGs.
        savePath: Optional directory; when truthy the rendered image is also
            written there, named after the concatenated tile characters.

    Returns:
        uint8 array of shape (rows*tileSize, cols*tileSize, 3) in RGB order.

    Raises:
        FileNotFoundError: If a sprite image cannot be read.
    """
    outputImage = np.empty((tileSize*tileArray.shape[0], tileSize*tileArray.shape[1], 3), dtype=np.uint8)

    fileName = ""
    # Grids often repeat the same tile; read and convert each sprite only once
    spriteCache = {}

    for i, row in enumerate(tileArray):
        for j, tile in enumerate(row):

            tile = '@' if tile == '.' else tile
            fileName += tile

            if tile not in spriteCache:
                spriteFile = f"{spritePath}/{tile}.png"
                spriteBgr = cv2.imread(spriteFile)
                # cv2.imread signals failure by returning None rather than raising
                if spriteBgr is None:
                    raise FileNotFoundError(f"Could not read sprite image: {spriteFile}")
                spriteCache[tile] = cv2.cvtColor(spriteBgr, cv2.COLOR_BGR2RGB)

            outputImage[i*tileSize:(i+1)*tileSize, j*tileSize:(j+1)*tileSize] = spriteCache[tile]

    if savePath:
        # cv2.imwrite expects BGR channel order; writing the RGB array directly
        # would store the image with red and blue swapped.
        cv2.imwrite(f"{savePath}/{fileName}.png", cv2.cvtColor(outputImage, cv2.COLOR_RGB2BGR))

    return outputImage

In [None]:
# Sprite folder for each game, keyed by the value found in the 'gamename' column
spritePaths = dict(
    kidicarus="../data/tomData/sprites/kidicarus",
    loderunner="../data/tomData/sprites/loderunner",
    megaman="../data/tomData/sprites/megaman",
    supermariobros="../data/tomData/sprites/supermariobros",
    thelegendofzelda="../data/tomData/sprites/thelegendofzelda",
)

# Render every training row's tile grid into an RGB image built from 16px sprites
trainData["image"] = [
    TextTileToImage(tileGrid, 16, spritePaths[gameName])
    for tileGrid, gameName in zip(trainData['tiles'], trainData['gamename'])
]

In [None]:
# TF-IDF vectorizer from sklearn; only its per-term IDF values are used below,
# as per-class weights for the affordance loss.
vectorizer = TfidfVectorizer(stop_words=None)

# Every affordance label occurring in the training set (np.unique returns them sorted)
flattendedAffordanceList = [
    affordance
    for affordanceList in trainData['affordances']
    for affordance in affordanceList
]
affordanceClasses = np.unique(np.array(flattendedAffordanceList))

# Each row's affordance list is fed in as its string representation. The
# transformed matrix is never used, so fitting alone is sufficient.
vectorizer.fit(trainData["affordances"].apply(str))

# IDF weight per affordance class, in affordanceClasses order.
# NOTE(review): vocabulary_ keys come from the vectorizer's default
# tokenisation/lowercasing, so this lookup assumes every label is a single
# lowercase token — confirm against the data.
newDict = {affordanceClass: vectorizer.idf_[vectorizer.vocabulary_[affordanceClass]] for affordanceClass in affordanceClasses}

# Each weight as a fraction of the total (total hoisted out of the comprehension).
# weightFreq is not used in this cell.
idfTotal = sum(newDict.values())
weightFreq = {k: v / idfTotal for k, v in newDict.items()}

# Raw IDF values are used as the weight vector. The code this was forked from
# multiplied them by 1000 for an undocumented reason; that scaling is left out.
weightVector = list(newDict.values())


tensorFromList = torch.tensor(weightVector, dtype=torch.float32)
print(tensorFromList)

In [None]:
def _encodeAffordances(affordances, affordanceLabels):
    """Count how often each known label occurs in one row's affordance list.

    Args:
        affordances: Iterable of affordance labels for a single row.
        affordanceLabels: 1-D array of the known labels, fixing the output order.

    Returns:
        Integer array with one entry per known label. Labels not present in
        ``affordanceLabels`` contribute nothing, and an empty ``affordances``
        yields an all-zero vector of the same length (summing an empty array
        would collapse to a scalar instead).
    """
    encoded = np.zeros(len(affordanceLabels), dtype=int)
    for affordance in affordances:
        encoded += (affordanceLabels == affordance)
    return encoded


# Label order follows newDict's keys; the array is built once, not once per affordance
_affordanceLabels = np.array(list(newDict.keys()))

trainData["encodedAffordances"] = [_encodeAffordances(affordances, _affordanceLabels) for affordances in trainData['affordances']]
testData["encodedAffordances"] = [_encodeAffordances(affordances, _affordanceLabels) for affordances in testData['affordances']]

In [None]:
class WeightedBCE(nn.Module):
    """Binary cross-entropy with a per-class weight applied to every element.

    For each sample the element-wise BCE is multiplied by the class weights,
    summed over dim 1, and divided by ``numClasses``; the result is then
    averaged over the batch.

    Args:
        weightedArray: Per-class weights, broadcast against the element-wise
            BCE. It is used as given, so it must already be on the same device
            as the predictions.
        debug: When true, print the shape of the weighted loss on every call.
        numClasses: Divisor for the per-sample sum. Defaults to 13, the value
            that was previously hard-coded.
    """

    def __init__(self, weightedArray, debug=False, numClasses=13):
        super().__init__()

        self.debug = debug

        # Attribute name (spelling included) is kept so existing accesses keep working
        self.weighedArray = weightedArray
        self.numClasses = numClasses

    def forward(self, yPred, yTrue):
        """Return the scalar weighted loss for probabilities yPred against targets yTrue."""

        # reduction="none" keeps one loss value per element so weights can be applied
        bce_array = nn.functional.binary_cross_entropy(yPred, yTrue, reduction="none")
        weighted_array = torch.mul(bce_array, self.weighedArray)

        if self.debug:
            print(weighted_array.shape)

        bce_sum = torch.sum(weighted_array, dim=1)
        loss = torch.div(bce_sum, float(self.numClasses))

        return torch.mean(loss)

In [None]:
class TileEmbeddingVAE(nn.Module):
    """Autoencoder that fuses a tile image with its affordance vector.

    An image branch and a text branch are encoded separately, concatenated and
    projected to a 256-dimensional embedding; two decoders then reconstruct a
    3-channel 16x16 image and a 13-dimensional affordance vector from it.
    Despite the class name, no sampling step or KL term appears in this class.

    The embedding layer expects 4112 input features: 16 from the text branch
    plus 4096 (16 channels x 16 x 16) from the image branch, which is what a
    48x48 input produces after the stride-3 first convolution.

    Args:
        debug: When true, print the encoder output shapes on every ``encode``.
    """

    @staticmethod
    def _normalisedTanh(layer, channels):
        """Return [layer, BatchNorm2d(channels), Tanh()] for unpacking into a Sequential."""
        return [layer, nn.BatchNorm2d(channels), nn.Tanh()]

    def __init__(self, debug=False):
        super().__init__()

        self.debug = debug

        # Sub-modules are created in the same order and at the same Sequential
        # positions as before, so seeded initialisation and state_dict keys match.
        self.imageEncoder = nn.Sequential(
            *self._normalisedTanh(nn.Conv2d(3, 32, kernel_size=3, stride=3), 32),
            *self._normalisedTanh(nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1), 32),
            *self._normalisedTanh(nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1), 16),
            nn.Flatten(),
        )

        self.textEncoder = nn.Sequential(
            nn.Linear(13, 32),
            nn.Tanh(),
            nn.Linear(32, 16),
            nn.Tanh(),
        )

        # 4112 = 4096 flattened image features + 16 text features
        self.embeddingLayer = nn.Linear(4112, 256)

        self.imageDecoder = nn.Sequential(
            nn.Linear(256, 4096),
            nn.Unflatten(1, (16, 16, 16)),
            *self._normalisedTanh(nn.ConvTranspose2d(16, 32, kernel_size=3, stride=1, padding=1), 32),
            *self._normalisedTanh(nn.ConvTranspose2d(32, 32, kernel_size=3, stride=1, padding=1), 32),
            # Final layer has no activation: the image output is unbounded
            nn.ConvTranspose2d(32, 3, kernel_size=3, stride=1, padding=1),
        )

        self.textDecoder = nn.Sequential(
            nn.Linear(256, 16),
            nn.Tanh(),
            nn.Linear(16, 32),
            nn.Tanh(),
            nn.Linear(32, 13),
            # Sigmoid keeps the text output in [0, 1]
            nn.Sigmoid(),
        )

    def encode(self, xImages, xText):
        """Encode both inputs and project their concatenation to the embedding."""
        imageFeatures = self.imageEncoder(xImages)
        textFeatures = self.textEncoder(xText)

        if self.debug:
            print("EncodedImage shape: ", imageFeatures.shape)
            print("encodedText shape: ", textFeatures.shape)

        # Image features first, then text features, joined along dim 1
        fusedFeatures = torch.cat((imageFeatures, textFeatures), 1)
        return self.embeddingLayer(fusedFeatures)

    def decode(self, embedding):
        """Return (decoded image, decoded affordance vector) for an embedding."""
        return self.imageDecoder(embedding), self.textDecoder(embedding)

    def forward(self, xImages, xText):
        """Encode the inputs, then decode; returns (yPredImage, yPredText)."""
        yPredImage, yPredText = self.decode(self.encode(xImages, xText))
        return yPredImage, yPredText

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# The model is created here but not moved to `device` in this cell
model = TileEmbeddingVAE()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

# Image reconstruction uses MSE; affordance reconstruction uses the weighted BCE,
# whose weight tensor is moved to the chosen device up front
imageCritierion = nn.MSELoss()
affordanceWeights = tensorFromList.to(device)
textCritierion = WeightedBCE(affordanceWeights)

In [None]:
batchSize = 32
epochs = 1

# Relative contribution of the two reconstruction losses; the weights sum to 1
imageLossWeight = 0.8
textLossWeight = 1.0 - imageLossWeight

model.to(device)
model.train()

for i in range(epochs):

    losses = []

    # Batches are consecutive row slices; the final batch may be smaller
    for j in range(0, trainData.shape[0], batchSize):

        batchRows = trainData.iloc[j:j+batchSize]

        # Rendered images are channels-last: (batch, height, width, 3).
        # The target is the centre 16x16 tile of each image.
        xImages = np.array(batchRows["image"].tolist())
        yImages = xImages[:, 16:32, 16:32, :]

        # Conv layers expect channels-first. permute moves the channel axis;
        # reshape would only reinterpret the flat buffer and scramble pixels
        # across channels and positions.
        xImageBatch = torch.tensor(xImages, dtype=torch.float32).permute(0, 3, 1, 2)
        xImageBatch = xImageBatch.to(device)

        yImageBatch = torch.tensor(yImages, dtype=torch.float32).permute(0, 3, 1, 2)
        yImageBatch = yImageBatch.to(device)

        # Stack into one ndarray first: building a tensor from a list of arrays is slow
        xTextbatch = torch.tensor(np.array(batchRows["encodedAffordances"].tolist()), dtype=torch.float32).to(device)

        yPredImages, yPredTexts = model(xImageBatch, xTextbatch)

        imageLoss = imageCritierion(yPredImages, yImageBatch)
        # The affordance branch is trained to reproduce its own input
        textLoss = textCritierion(yPredTexts, xTextbatch)

        loss = imageLossWeight * imageLoss + textLossWeight * textLoss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())

    # With no training rows there are no batches; report NaN instead of dividing by zero
    epochLoss = sum(losses) / len(losses) if losses else float("nan")
    print(f"Epoch {i}: loss {epochLoss}")