In [None]:
# Mount Google Drive inside the Colab VM so the project files are reachable.
from google.colab import drive
drive.mount('/content/gdrive')
# Switch the working directory to the project folder; the relative paths used
# by later cells (CSV files, "models/", "test_data_slices") resolve from here.
# NOTE(review): the relative `gdrive/...` target assumes the kernel starts in /content — confirm.
%cd gdrive/MyDrive/EE698V/

In [None]:
# Number of saved checkpoints in the ensemble: the prediction cell loads
# models/best_slices_1.h5 ... models/best_slices_<NUM>.h5 and lets each one vote.
NUM = 10
row_len = 513 # Number of rows: 1 + n_fft/2 (height of every slice and of the model input)
frames = 50 # Number of columns per slice (width of the model input); original note: frames*(min_duration*1/0.01) = columns
outname = "Task2.csv" # Output CSV path; written without a header row, columns are "File" and "Labels"

In [None]:
import numpy as np
import pandas as pd

import numpy as np
import pandas as pd

from tensorflow import config, distribute
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, BatchNormalization
from keras.callbacks import LearningRateScheduler, ModelCheckpoint, EarlyStopping

In [None]:
# List the GPUs TensorFlow can see and echo them into the cell output.
gpus = config.list_physical_devices('GPU')
print(gpus)

# Exactly one GPU: pin everything to it. Any other count: use MirroredStrategy.
strategy = (
    distribute.OneDeviceStrategy(device="/gpu:0")
    if len(gpus) == 1
    else distribute.MirroredStrategy()
)

# Turn on the grappler auto-mixed-precision rewrite for the whole session.
config.optimizer.set_experimental_options({"auto_mixed_precision": True})

In [None]:
def Slices(npfile):
    """Load a 2-D array from ``npfile`` and cut it into fixed-size slices.

    The row count is first forced to the module-level ``row_len``: an array
    with more rows is cropped to a randomly placed window of ``row_len`` rows,
    and an array with fewer rows is zero-padded with the data placed at a
    random vertical offset. The columns are then split into consecutive,
    non-overlapping chunks of ``frames`` columns; leftover columns at the end
    are dropped.

    Args:
        npfile: Path (or file object) accepted by ``np.load``.

    Returns:
        A list of ``(row_len, frames)`` arrays, one per full chunk. When the
        loaded array has fewer than ``frames`` columns, a list holding a
        single all-zero slice is returned instead.
    """
    X = np.load(npfile)

    # Not enough columns for even one slice: stand in a blank slice.
    if X.shape[1] < frames:
        return [np.zeros((row_len, frames))]

    n_rows = X.shape[0]
    if n_rows > row_len:
        # randint's upper bound is exclusive, so "+ 1" keeps the last valid
        # crop window reachable.
        offset = np.random.randint(n_rows - row_len + 1)
        X = X[offset : offset + row_len, :]
    elif n_rows < row_len:
        slack = row_len - n_rows
        # Same exclusive-bound reasoning: allow all padding to go on top.
        offset = np.random.randint(slack + 1)
        X = np.pad(X, ((offset, slack - offset), (0, 0)), "constant")
    # n_rows == row_len needs no adjustment.

    n_slices = X.shape[1] // frames
    return [X[:, k * frames : (k + 1) * frames] for k in range(n_slices)]

def prepare_data(df, data_dir):
    """Turn every file named in ``df["File"]`` into a stack of model inputs.

    For each entry the array at ``<data_dir>/<name>.npy`` is cut up by
    ``Slices``; every resulting slice gets a trailing singleton axis. A
    progress counter is printed every 200 files.

    Args:
        df: DataFrame with a "File" column of file names (without extension).
        data_dir: Directory that contains the ``.npy`` files.

    Returns:
        A tuple ``(X, nums)``: ``X`` stacks all slices of all files along a
        new first axis, and ``nums`` lists how many slices each file
        contributed, in row order.
    """
    print("Number of test samples processed: ")
    collected = []
    nums = []
    for i, fname in enumerate(df["File"]):
        pieces = Slices(data_dir + "/" + fname + ".npy")
        nums.append(len(pieces))
        collected.extend(np.expand_dims(piece, axis = -1) for piece in pieces)

        if i % 200 == 0 and i != 0:
            print(i, end = ".. ")
    print(df.shape[0], end = ".. ")
    print("Done!")

    return np.stack(collected), nums

def build_model():
    """Assemble and compile the slice classifier.

    The network takes inputs of shape ``(row_len, frames, 1)`` and consists of
    three Conv2D + BatchNormalization stages (32, 64 and 128 filters) with
    max-pooling after the first two, followed by Flatten, Dropout(0.4),
    Dense(32, relu), Dropout(0.4) and a softmax layer with ``n_classes``
    outputs. ``row_len``, ``frames`` and ``n_classes`` are read from module
    scope when the function is called.

    Returns:
        The Sequential model, compiled with the "adam" optimizer,
        categorical cross-entropy loss and an accuracy metric.
    """
    feature_layers = [
        Conv2D(32, kernel_size = (7, 3), strides = 2, activation = 'relu', input_shape = (row_len, frames, 1)),
        BatchNormalization(),
        MaxPool2D(pool_size = (4, 2)),
        Conv2D(64, kernel_size = (7, 3), strides = 2, activation = 'relu'),
        BatchNormalization(),
        MaxPool2D(pool_size = (4, 1)),
        Conv2D(128, kernel_size = 3, strides = 2, activation = 'relu'),
        BatchNormalization(),
    ]
    classifier_layers = [
        Flatten(),
        Dropout(0.4),
        Dense(32, activation = 'relu'),
        Dropout(0.4),
        Dense(n_classes, activation = 'softmax'),
    ]

    model = Sequential(feature_layers + classifier_layers)
    model.compile(optimizer = "adam", loss = "categorical_crossentropy", metrics = ["accuracy"])
    return model

In [None]:
# The training labels are read only to recover the list of class names.
train = pd.read_csv("labels_train.csv")

# The test listing has no header row; name its two columns explicitly.
test = pd.read_csv("predict_test_slices.csv", header = None)
test.columns = ["File", "Labels"]
sub = test  # same object as `test`; it is filled in and written out later

# Class names in order of first appearance in the training CSV.
LABELS = list(train['class'].unique())
label_idx = dict(zip(LABELS, range(len(LABELS))))
n_classes = len(LABELS)

# `labels` receives prepare_data's second return value: the number of slices
# produced for each test file.
X_test, labels = prepare_data(test, "test_data_slices")

# Apply the stored normalisation: entry 0 is used as the mean, entry 1 as the std.
normalize = np.load("models/normalize_slices.npy")
MEAN, STD = normalize[0], normalize[1]
X_test = (X_test - MEAN)/STD

In [None]:
# Vote matrix: result[i, c] counts how many ensemble members chose class c
# for slice i.
result = np.zeros((X_test.shape[0], n_classes))

with strategy.scope():
    model = build_model()

slice_rows = np.arange(result.shape[0])
for num in range(NUM):
    # Checkpoint files are numbered starting at 1.
    model.load_weights("models/best_slices_%d.h5"%(num + 1))
    predictions = model.predict(X_test, batch_size = 32, verbose = 0)

    # Compute this model's winning class once for all slices, then add one
    # vote per slice in a single indexed update. Every (row, class) pair is
    # unique because each row appears exactly once, so `+=` counts correctly.
    votes = predictions.argmax(axis = 1)
    result[slice_rows, votes] += 1

In [None]:
# Indices of slices on which the ensemble did not agree strongly enough:
# no single class collected more than `threshhold` votes.
threshhold = 5
false_labels = [
    i
    for i in range(result.shape[0])
    if not (result[i] > threshhold).any()
]

In [None]:
# Replace each slice's vote row by the name of its most-voted class.
# dtype=object keeps the entries as ordinary Python strings: with NumPy's
# default fixed-width unicode dtype, assigning a longer string to an element
# (as the "None" prefixing in the next cell does) is silently truncated to the
# width of the longest class name, which corrupts the label recovered later.
result = np.array(LABELS, dtype = object)[np.argmax(result, axis = 1)]

In [None]:
# Tag every slice listed in `false_labels` by prefixing its predicted label with "None".
for idx in false_labels:
    result[idx] = "None" + result[idx]

In [None]:
# Collapse the per-slice predictions of each file into one "-"-joined label
# sequence. `labels` holds the number of slices per file, so `idx` walks
# through `result` one file at a time.
idx = 0
answers = []
for num in labels:
    confident = []  # confident labels, skipping repeats of the latest one
    curr = ""       # most recent confident label
    poss = ""       # last low-confidence ("None"-prefixed) label in this file
    for pred in result[idx : idx + num]:
        if pred[:4] == "None":
            poss = pred
        elif pred != curr:
            confident.append(pred)
            curr = pred
    idx += num

    # Without any confident slice, fall back to the last low-confidence label
    # with its "None" prefix stripped.
    answers.append("-".join(confident) if confident else poss[4:])

# Write the column in a single assignment. Row-wise chained indexing
# (`sub.iloc[i]['Labels'] = ...`) may update a temporary copy and leave `sub`
# unchanged, and never takes effect under pandas copy-on-write.
sub["Labels"] = answers

sub.to_csv(outname, header = False, index = False)
print(sub)

In [None]:
# !pip install python-Levenshtein > rubish.txt
# from Levenshtein import distance
# import string

# def editDistance(gt, est):
#     gttokens = gt.split('-')
#     esttokens = est.split('-')
#     tokenset = list(set(gttokens+esttokens))
#     token_char = {}
#     for i in range(len(tokenset)):
#         token_char[tokenset[i]] = string.ascii_uppercase[i]

#     gtstr = [token_char[t] for t in gttokens]
#     gtstr = ''.join(gtstr)
#     eststr = [token_char[t] for t in esttokens]
#     eststr = ''.join(eststr)
    
#     editdist = distance(gtstr, eststr)
#     score = 1 - editdist/len(gtstr)
#     return editdist, score

# true = pd.read_csv("task2_truelabels.csv", header = None)

# score = 0.0
# for i in range(sub.shape[0]):
#     _, ss = editDistance(true[1].tolist()[i], sub['Labels'].tolist()[i])
#     print()
#     score += ss
# print(score)