# [Kannada MNIST](https://www.kaggle.com/c/Kannada-MNIST/data)

In [None]:
import os
import numpy as np
import tensorflow as tf

In [None]:
# Width and height (in pixels) that every flattened CSV row is reshaped to.
IW = IH = 28
# Number of target classes; also the width of the one-hot label vectors.
TP = 10

# Seed NumPy's global RNG so the mini-batch shuffling is repeatable.
seed = 0
np.random.seed(seed)

class MNISTLoader():
    """Load the Kannada-MNIST CSV files into NumPy arrays.

    On construction the train, dev (``Dig-MNIST.csv``) and test CSV files are
    read from ``../../data/Kannada-MNIST``.

    Attributes set by ``__init__``:
        X_train, X_dev, X_test: image arrays of shape ``(N, IH, IW, 1)`` with
            pixel values divided by 255.
        Y_train, Y_dev: one-hot label arrays of shape ``(N, TP)``.
        num_train_data, num_dev_data, num_test_data: number of rows per split.
    """

    def __init__(self):
        """Check that all dataset files exist, then load the three splits.

        Raises:
            FileNotFoundError: if any of the expected CSV files is missing.
        """
        root = '../../data/Kannada-MNIST'
        sample_file = os.path.join(root, 'sample_submission.csv')
        dev_file = os.path.join(root, 'Dig-MNIST.csv')
        train_file = os.path.join(root, 'train.csv')
        test_file = os.path.join(root, 'test.csv')

        # Explicit raise instead of `assert`: asserts are stripped under `-O`.
        for file in [dev_file, sample_file, train_file, test_file]:
            if not os.path.exists(file):
                raise FileNotFoundError(
                    'Please download dataset and save to "data/Kannada-MNIST/" before boot'
                )

        self.X_train, self.Y_train = self.read_csv(train_file, type='train')
        self.X_dev, self.Y_dev = self.read_csv(dev_file, type='dev')
        self.X_test, _ = self.read_csv(test_file, type='test')
        self.num_train_data = self.X_train.shape[0]
        self.num_dev_data = self.X_dev.shape[0]
        self.num_test_data = self.X_test.shape[0]

    @staticmethod
    def read_csv(file_path, type='train'):
        """Read one dataset CSV file.

        The first line is treated as a header and skipped, and blank lines are
        ignored. The first column of every row is the label (it is not used
        when ``type == 'test'``); the remaining columns are the pixels.

        Args:
            file_path: path of the CSV file to read.
            type: split name; any value other than ``'test'`` means the file
                carries labels in its first column.

        Returns:
            ``(X, Y)`` where ``X`` is a float array of shape
            ``(N, IH, IW, 1)`` scaled by 1/255 and ``Y`` is the one-hot label
            array of shape ``(N, TP)``, or ``None`` for the test split.
        """
        has_labels = type != 'test'
        X = []
        Y = []
        with open(file_path, 'r') as f:
            for i, line in enumerate(f):
                # Throttled progress output: one print per row is very slow.
                if i % 1000 == 0:
                    print(f'get data {i}', end='\r')
                if i == 0:
                    continue  # header row
                line = line.rstrip()
                if not line:
                    continue  # tolerate blank / trailing empty lines
                items = line.split(',')
                if has_labels:
                    Y.append(int(items[0]))
                pixels = np.array(items[1:], dtype=np.float32)
                X.append(pixels.reshape((IH, IW, 1)) / 255.0)

        X = np.array(X)
        # Index the identity matrix with the labels to build one-hot rows.
        Y = np.eye(TP)[np.asarray(Y, dtype=np.intp)] if has_labels else None
        return X, Y

    def random_mini_batches(self, batch_size = 64):
        """Yield the training set as shuffled mini-batches.

        A fresh random permutation of the training indices is drawn on each
        call; the last batch may be smaller than ``batch_size``.

        Args:
            batch_size: maximum number of samples per batch.

        Yields:
            ``(X_batch, Y_batch)`` tuples taken from ``X_train`` / ``Y_train``.
        """
        data_size = len(self.X_train)
        permutation = np.random.permutation(data_size)
        for start in range(0, data_size, batch_size):
            batch_indices = permutation[start: start + batch_size]
            yield self.X_train[batch_indices], self.Y_train[batch_indices]

In [None]:
class CNN(tf.keras.Model):
    """Convolutional classifier: two conv/max-pool stages and three dense layers.

    The network ends in a 10-unit dense layer followed by a softmax, so
    ``call`` returns one probability per class.

    Args:
        dropout_rate: drop probability used by both dropout layers.
    """
    def __init__(self, dropout_rate=0.1):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,
            kernel_size=[3, 3],
            strides=1,
            padding='same',
            activation=tf.nn.relu
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[3, 3],
            strides=1,
            padding='same',
            activation=tf.nn.relu
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        # Flatten instead of a hard-coded Reshape((7 * 7 * 64,)): same result
        # for 28x28 inputs, but not tied to one input resolution.
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dropout1 = tf.keras.layers.Dropout(rate=dropout_rate)
        self.dense2 = tf.keras.layers.Dense(units=128, activation=tf.nn.relu)
        self.dropout2 = tf.keras.layers.Dropout(rate=dropout_rate)
        self.dense3 = tf.keras.layers.Dense(units=10)

    def call(self, inputs, training=None):
        """Run the forward pass.

        Args:
            inputs: batch of images with a trailing channel axis.
            training: forwarded to the dropout layers so they only drop units
                in training mode; ``None`` leaves the decision to Keras.

        Returns:
            Softmax class probabilities, one row per input sample.
        """
        x = self.conv1(inputs)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dropout1(x, training=training)
        x = self.dense2(x)
        x = self.dropout2(x, training=training)
        x = self.dense3(x)
        output = tf.nn.softmax(x)
        return output

In [None]:
class Xception(tf.keras.Model):
    """Small Xception-style classifier built from separable convolutions.

    ``call`` applies: conv -> separable conv -> max-pool -> separable conv ->
    7x7 depthwise conv -> flatten -> 10-unit dense -> softmax.
    """
    def __init__(self):
        super().__init__()
        layers = tf.keras.layers

        def separable(filters):
            # 3x3 'same'-padded separable convolution without activation.
            return layers.SeparableConv2D(
                filters=filters,
                kernel_size=[3, 3],
                padding='same'
            )

        def halving_pool():
            # 2x2 max-pool with stride 2.
            return layers.MaxPool2D(pool_size=[2, 2], strides=2)

        self.conv1 = layers.Conv2D(
            filters=16,
            kernel_size=[3, 3],
            padding='same',
            activation=tf.nn.relu
        )
        self.separable_conv2 = separable(32)
        self.pool2 = halving_pool()
        # NOTE: separable_conv3 / pool3 are created but not applied in call().
        self.separable_conv3 = separable(64)
        self.pool3 = halving_pool()
        self.separable_conv4 = separable(128)
        self.dw_conv = layers.DepthwiseConv2D(
            kernel_size=(7, 7),
            strides=(1, 1),
            padding='valid',
        )
        self.flatten = layers.Flatten()
        self.dense = layers.Dense(units=10)

    def call(self, inputs):
        """Return softmax class probabilities for a batch of images."""
        pipeline = (
            self.conv1,
            self.separable_conv2,
            self.pool2,
            self.separable_conv4,
            self.dw_conv,
            self.flatten,
            self.dense,
        )
        x = inputs
        for layer in pipeline:
            x = layer(x)
        return tf.nn.softmax(x)

In [None]:
# Training hyperparameters.
num_epochs = 100          # passes over the training set
batch_size = 50           # samples per gradient step
print_step = 100          # log the loss when samples seen is a multiple of this
dev_step = 1_000          # evaluate train/dev accuracy when samples seen is a multiple of this
learning_rate = 1e-3      # step size handed to the Adam optimizer

In [None]:
# Network to train; replace with `Xception()` to try the alternative model.
model = CNN(dropout_rate=0.1)
# Reads the train / dev / test CSV files into memory.
data_loader = MNISTLoader()
# Adam with the step size configured in `learning_rate`.
optimizer = tf.keras.optimizers.Adam(learning_rate)

In [None]:
def one_hots_to_labels(one_hots):
    """Convert one-hot (or probability) rows into a column of class indices.

    Args:
        one_hots: array-like with one row per sample.

    Returns:
        Integer array of shape ``(N, 1)`` holding the argmax of each row; an
        empty ``(0, 1)`` array when there are no rows.
    """
    rows = np.asarray(one_hots)
    if rows.shape[0] == 0:
        return np.empty((0, 1), dtype=np.intp)
    # Flatten each row first so the result matches a per-row np.argmax(row).
    per_row = rows.reshape(rows.shape[0], -1)
    return np.argmax(per_row, axis=1).reshape(-1, 1)

def predict(model, data_loader, batch_size=10000, type='train'):
    """Return the accuracy of `model` on the train or dev split.

    Args:
        model: object whose ``predict`` maps a batch of inputs to per-class scores.
        data_loader: provides ``X_*`` / ``Y_*`` arrays and ``num_*_data`` counts.
        batch_size: number of samples sent to ``model.predict`` at a time.
        type: ``'train'`` selects the training split; any other value selects
            the dev split.

    Returns:
        Fraction of samples whose predicted label equals the true label.
    """
    if type == 'train':
        total = data_loader.num_train_data
        features = data_loader.X_train
        targets = data_loader.Y_train
    else:
        total = data_loader.num_dev_data
        features = data_loader.X_dev
        targets = data_loader.Y_dev

    mistakes = 0
    for start in range(0, total, batch_size):
        print(f'predict {start}', end='\r')
        stop = start + batch_size
        feature_batch = features[start: stop]
        target_batch = targets[start: stop]
        score_batch = model.predict(feature_batch)
        truth = one_hots_to_labels(target_batch)
        guess = one_hots_to_labels(score_batch)
        # A non-zero label difference marks a misclassified sample.
        difference = truth.reshape((truth.shape[0], )) - guess.reshape((guess.shape[0], ))
        mistakes += np.count_nonzero(difference)
    return 1 - mistakes / total

# Dev-split accuracy of the model as it stands at this point in the notebook.
print(predict(model=model, data_loader=data_loader, type='dev'))

In [None]:
# Custom training loop: one Adam step per shuffled mini-batch, with periodic
# loss logging and train/dev accuracy evaluation.
# Loop-invariant, so computed once up front (value kept as originally defined).
num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
for i in range(num_epochs):
    for j, (X, Y) in enumerate(data_loader.random_mini_batches(batch_size)):
        with tf.GradientTape() as tape:
            # training=True puts the model in training mode so that dropout
            # is actually applied during optimisation.
            Y_ = model(X, training=True)
            loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(
                y_true=Y,
                y_pred=Y_
            ))
        # Differentiate only w.r.t. trainable weights; non-trainable variables
        # would yield `None` gradients.
        trainable_variables = model.trainable_variables
        grads = tape.gradient(loss, trainable_variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, trainable_variables))

        # Number of training samples consumed before this batch.
        samples_seen = i * data_loader.num_train_data + j * batch_size
        if samples_seen % print_step == 0:
            print(f"{i} - {j * batch_size: 6}: loss {loss.numpy()}")
        if samples_seen % dev_step == 0:
            train_accuracy = predict(model, data_loader, type='train')
            dev_accuracy = predict(model, data_loader, type='dev')
            print(f'train accuracy: {train_accuracy: .2%} dev accuracy: {dev_accuracy: .2%}')

In [None]:
# Dev-split accuracy after the training cell above has run.
print(predict(model=model, data_loader=data_loader, type='dev'))