In [1]:
from src.get_from_mongo import get_data
from sklearn.model_selection import train_test_split
from pymongo import MongoClient
import numpy as np
import tensorflow as tf
import pickle

In [2]:
# Character name -> integer id. Ids are assigned by position in the tuple
# below, starting at 1, so the ORDER of the names is significant.
id_from_char = {
    name: char_id
    for char_id, name in enumerate(
        (
            'CAPTAIN_FALCON',
            'DONKEY_KONG',
            'FOX',
            'GAME_AND_WATCH',
            'KIRBY',
            'BOWSER',
            'LINK',
            'LUIGI',
            'MARIO',
            'MARTH',
            'MEWTWO',
            'NESS',
            'PEACH',
            'PIKACHU',
            'ICE_CLIMBERS',
            'JIGGLYPUFF',
            'SAMUS',
            'YOSHI',
            'ZELDA',
            'SHEIK',
            'FALCO',
            'YOUNG_LINK',
            'DR_MARIO',
            'ROY',
            'PICHU',
            'GANONDORF',
        ),
        start=1,
    )
}

# Reverse lookup: integer id -> character name.
char_from_id = dict(zip(id_from_char.values(), id_from_char.keys()))

In [3]:
# Names of the MongoDB database and the collection that holds the clips.
database_name = 'slippi'
collection_name = 'melee_clips_30s'

# Create a client for the MongoDB instance on localhost (port 27017) and
# grab handles to the database and clip collection used by the rest of
# the notebook.
client = MongoClient('localhost', 27017)
db = client[database_name]
collection = db[collection_name]

In [4]:
def data_generator(clip_collection=collection, # collection containing clips
                   batch_size = 100,
                   skip=None,
                   step=1,
                   repeat=False,
                   repeat_offset=1):
    """Yield ``(Xi, Yi)`` batches built from documents in a clip collection.

    Documents are read in cursor order. For every sample, ``step``
    documents are consumed from the cursor and the last one is kept, so
    the generator effectively strides through the collection.

    Args:
        clip_collection: Collection object whose ``find()`` returns a cursor
            supporting ``skip()``, ``alive`` and ``next()``. Each document
            must have an ``'istream'`` field (pickled object exposing
            ``.toarray()``) and a ``'character'`` field (key of
            ``id_from_char``).
        batch_size: Number of samples per yielded batch.
        skip: Number of leading documents to skip; ``None`` or ``0`` means
            start at the beginning.
        step: Number of documents consumed per kept sample.
        repeat: If truthy, restart from the beginning of the collection
            (plus the current skip offset) when the cursor is exhausted.
            If falsy, the generator ends; a partially filled batch is
            discarded.
        repeat_offset: Amount added to the skip offset on every restart, so
            that successive passes do not sample the same documents.

    Yields:
        Tuple ``(Xi, Yi)`` where ``Xi`` is ``np.stack`` of the decoded
        arrays along a new leading axis and ``Yi`` is
        ``tf.one_hot(ids, 26)``.
    """
    # Treat "no skip" as 0 so the restart arithmetic below never sees None.
    skip = skip or 0

    def open_cursor(offset):
        # Fresh cursor over the whole collection, positioned after `offset`.
        cursor = clip_collection.find()
        if offset:
            cursor.skip(offset)
        return cursor

    cur = open_cursor(skip)

    while cur.alive:

        xi = []
        yi = []

        for _ in range(batch_size):
            for _ in range(step):
                try:
                    clip = next(cur)

                except StopIteration:
                    if not repeat:
                        # End the generator cleanly. Re-raising
                        # StopIteration inside a generator would surface
                        # as RuntimeError (PEP 479).
                        return
                    skip += repeat_offset
                    cur = open_cursor(skip)
                    try:
                        clip = next(cur)
                    except StopIteration:
                        # Nothing left even after restarting (offset is
                        # past the end of the collection).
                        return

            # NOTE(review): pickle.loads executes arbitrary code if the
            # stored bytes are untrusted; only use with a trusted database.
            xi.append(pickle.loads(clip['istream']).toarray())
            yi.append(id_from_char[clip['character']])

        Xi = np.stack(xi, axis=0)
        # NOTE(review): ids in id_from_char run 1..26 while one_hot depth is
        # 26 (valid indices 0..25), so id 26 encodes as an all-zero row and
        # index 0 is never used. Left unchanged because the analysis cells
        # further down index results by these same ids.
        Yi = tf.one_hot(yi, 26)

        yield Xi, Yi

In [5]:
'''
    TODO: finish this
'''

# def class_weights (clip_collection=collection, # collection containing clips
#                    skip=None,
#                    step=11,
#                    repeat=0,
#                    repeat_offset=1):

#         cur = clip_collection.find()
        
#         if skip:
#             cur.skip(skip)
            
#         for 
            
#         while cur.alive:
            
#             class_labels = []
                
#             for _ in range(step):
#                 try:
#                     clip = next(cur)
                    
#                 except StopIteration:
#                     if repeat is not None:
#                         skip += repeat_offset
#                         cur = clip_collection.find()
#                         cur.skip(skip)
#                         clip = next(cur)
#                     else:
#                         raise
                            
#                 xi.append(pickle.loads(clip['istream']).toarray())
#                 yi.append(id_from_char[clip['character']])

#             Xi = np.stack(xi, axis=0)
#             Yi = tf.one_hot(yi, 26)

#             yield Xi, Yi

'\n    TODO: finish this\n'

In [6]:
from tensorflow import keras

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization
from tensorflow.keras.layers import Conv1D, MaxPooling1D, AveragePooling1D, SpatialDropout1D
from tensorflow.keras.activations import swish

import tensorflow_addons as tfa

# Metric: fraction of samples whose true class is among the 8 highest-scoring
# predicted classes. Reported under the name 'top 8 accuracy'.
top_8_accuracy = keras.metrics.TopKCategoricalAccuracy(k=8, name='top 8 accuracy')

# Loss: sigmoid focal cross-entropy from tensorflow_addons, with its default
# arguments.
focal_loss = tfa.losses.SigmoidFocalCrossEntropy()

In [7]:
# 1D-convolutional classifier over the clip input stream. The "sees ..."
# notes are the original author's estimates of each stage's temporal
# receptive field; they are not verified here.
model = Sequential([
    # Stage 1 (author's note: sees .5s).
    # 150 features extracted from the istream; each filter spans 15 frames.
    Conv1D(150, 15, activation=swish),
    SpatialDropout1D(.5),
    MaxPooling1D(pool_size=2),

    # Stage 2 (author's note: sees 1s).
    Conv1D(100, 15, activation=swish),
    SpatialDropout1D(.5),
    MaxPooling1D(pool_size=2),

    # Stage 3 (author's note: sees 2s).
    Conv1D(100, 15, activation=swish),
    SpatialDropout1D(.5),
    MaxPooling1D(pool_size=3),

    # Stage 4 (author's note: sees 6s).
    Conv1D(80, 15, activation=swish),
    SpatialDropout1D(.5),

    # Collapse the time axis (author's note: sees whole 30s).
    # NOTE(review): the original comment said "avg pool", but the layer
    # used is MaxPooling1D - confirm which was intended.
    BatchNormalization(),
    MaxPooling1D(pool_size=15*5),
    Flatten(),

    # Dense head, each layer followed by 50% dropout.
    Dense(80, activation=swish),
    Dropout(.5),
    Dense(80, activation=swish),
    Dropout(.5),
    Dense(40, activation=swish),
    Dropout(.5),

    # Output layer: 26 softmax scores.
    Dense(26, activation='softmax'),
])

# Focal loss with the Nadam optimizer; report plain accuracy and top-8 accuracy.
model.compile(
    loss=focal_loss,
    optimizer='nadam',
    metrics=['accuracy', top_8_accuracy],
)

# Training

In [8]:
# Training batch generator: 50 samples per batch, starting after the first
# 100003 documents, consuming 199 documents per kept sample, and restarting
# when the cursor is exhausted.
data = data_generator(batch_size=50, skip=100003, step=199, repeat=True) # keep first 100000 clips as test data

In [None]:
# Train on the training generator. No validation_data is passed, so only
# training metrics are reported while fitting.
model.fit(data, epochs=5, steps_per_epoch=500, verbose=1)

# Evaluate on held-out clips rather than on the training generator `data`.
# The training generator skips the leading clips of the collection, which are
# reserved as test data; a fresh generator with the default skip reads from
# that region. 50 steps * 50 samples * step 5 = 12,500 documents consumed,
# well inside the reserved prefix.
held_out_data = data_generator(batch_size=50, step=5)
score = model.evaluate(held_out_data, steps=50, verbose=0)

# `score` is ordered as [loss, accuracy, top 8 accuracy], following the
# loss and metrics given to model.compile.
print('\nTest score:', round(score[0], 3))
print(f'Test accuracy: {round(score[1]*100)}%')
print(f'Test test top 8 accuracy: {round(score[2]*100)}%')  # this is the one we care about

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5

# Testing

In [None]:
def get_conf_matrix(labels_as_id, predictions_as_id, num_classes=27):
    """Build a confusion matrix of counts from paired label/prediction ids.

    Args:
        labels_as_id: Iterable of integer true-class ids.
        predictions_as_id: Iterable of integer predicted-class ids, paired
            element-wise with ``labels_as_id`` (extra elements of the
            longer iterable are ignored).
        num_classes: Side length of the square matrix. Defaults to 27,
            i.e. one row/column per id 1..26 plus index 0.

    Returns:
        ``(num_classes, num_classes)`` float array where entry
        ``[real, pred]`` is the number of samples with true id ``real``
        that were predicted as ``pred``.

    Raises:
        IndexError: If any id is outside ``[-num_classes, num_classes)``.
    """
    conf_matrix = np.zeros((num_classes, num_classes))
    for i_real, i_pred in zip(labels_as_id, predictions_as_id):
        conf_matrix[i_real, i_pred] += 1
    return conf_matrix

In [None]:
# Pull one batch of 5000 samples from the start of the collection (default
# skip), consuming 5 documents per kept sample, to use as the test set.
X_test, Y_test = next(data_generator(batch_size=5000, step = 5))

In [None]:
# Predicted class index per test sample: position of the highest model output.
pred = np.argmax(model.predict(X_test), axis = 1)
# True class index per test sample: position of the largest entry in each
# label row.
# NOTE(review): if Y_test comes from tf.one_hot(ids, 26) with ids 1..26, id 26
# would be an all-zero row and argmax returns 0 for it - confirm.
y_test = np.argmax(Y_test, axis = 1)

In [None]:
# Loss and metric values of the model on the in-memory test batch.
score = model.evaluate(X_test, Y_test, verbose=0)

In [None]:
# `score` is ordered as [loss, accuracy, top 8 accuracy], following the loss
# and metrics given to model.compile.
print('\nTest score:', round(score[0], 3))
print(f'Test accuracy: {round(score[1]*100)}%')
print(f'Test test top 8 categorical accuracy: {round(score[2]*100)}%') 

In [None]:
# Per-character accumulators keyed by character id 1..26, all starting at 0.
recalls = dict.fromkeys(range(1, 27), 0)
precisions = dict.fromkeys(range(1, 27), 0)

# Confusion matrix indexed as [true id, predicted id].
conf_matrix = get_conf_matrix(y_test, pred)

# Cursor for the per-character cell below; it is incremented before use.
char_id = 0

In [None]:
# Recall per character: of all test clips whose true label is this character,
# the fraction predicted correctly. Loops over every character id so the
# `recalls` table is fully populated in a single run, instead of relying on
# this cell being re-executed once per character.
for char_id in sorted(char_from_id):
    row = conf_matrix[char_id, :]  # predictions for clips truly labelled char_id
    sorted_row_indices = np.argsort(row)[::-1]  # most frequent prediction first
    correct = conf_matrix[char_id, char_id]
    total = np.sum(row)
    recall = correct/total if total else 0  # avoid 0/0 when the class is absent
    recalls[char_id] = recall
    print(f'{char_from_id[char_id]}')
    print(f'Recall: {round(100*recall, 1)}%\n ------------------')
    for i in sorted_row_indices:
        # Index 0 has no entry in char_from_id, so skip it.
        if i > 0:
            print(f'{char_from_id[i]} : {row[i]}')

In [None]:
# List characters from highest to lowest recall (ties keep id order).
recalls_descending = sorted(recalls.items(), key=lambda item: -item[1])
for i, acc in recalls_descending:
    print(f'{char_from_id[i]}:\t{round(100*acc, 1)}%')

In [None]:
# Reset the character cursor before the precision cell, which increments it
# before use.
char_id = 0

In [None]:
# Precision per character: of all test clips predicted as this character, the
# fraction whose true label matches. Loops over every character id so the
# `precisions` table is fully populated in a single run, instead of relying
# on this cell being re-executed once per character.
for char_id in sorted(char_from_id):
    col = conf_matrix[:, char_id]  # true labels of clips predicted as char_id
    sorted_col_indices = np.argsort(col)[::-1]  # most frequent true label first
    correct = conf_matrix[char_id, char_id]
    total = np.sum(col)
    precision = correct/total if total else 0  # avoid 0/0 when never predicted
    precisions[char_id] = precision
    print(f'{char_from_id[char_id]}')
    print(f'Precision: {round(100*precision, 1)}%\n ------------------')
    for i in sorted_col_indices:
        # Index 0 has no entry in char_from_id, so skip it.
        if i > 0:
            print(f'{char_from_id[i]} : {col[i]}')

In [None]:
# List characters from highest to lowest precision (ties keep id order).
precisions_descending = sorted(precisions.items(), key=lambda item: -item[1])
for i, acc in precisions_descending:
    print(f'{char_from_id[i]}:\t{round(100*acc, 1)}%')

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()