# Данные

In [None]:
from google.colab import drive

# Mount Google Drive so that files under /content/drive/MyDrive can be read by later cells.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import h5py
import pandas as pd

# Pull both datasets fully into memory while the HDF5 file is open.
with h5py.File('/content/drive/MyDrive/Методы компрессии/common_fields_images.h5') as h5_file:
    images = h5_file['images'][:]
    additional_bits = h5_file['additional_bit'][:]

# One label per line, cp1251-encoded; surrounding whitespace is stripped from each line.
with open('/content/drive/MyDrive/Методы компрессии/common_fields_labels.txt', encoding='cp1251') as labels_file:
    markup = [line.strip() for line in labels_file]

In [None]:
import numpy as np

def encode_texts(texts):
    """
    Encode strings as a right-padded integer matrix over their joint alphabet.

    The alphabet is the sorted set of all characters occurring in ``texts``;
    each character is replaced by its index in that alphabet. Rows shorter
    than the longest text are padded on the right with ``len(alphabet)``,
    i.e. the first index not used by any character.

    Args:
        texts: sequence of strings; may be empty.

    Returns:
        (nums, alphabet): ``nums`` is an int64 array of shape
        ``(len(texts), length of the longest text)`` and ``alphabet`` is the
        string of distinct characters in sorted order. An empty ``texts``
        yields a ``(0, 0)`` array and an empty alphabet.
    """
    alphabet = ''.join(sorted(set(''.join(texts))))
    # Dict lookup is O(1) per character, whereas str.find rescans the alphabet each time.
    char_to_index = {ch: idx for idx, ch in enumerate(alphabet)}

    # default=0 keeps max() from raising ValueError when `texts` is empty.
    max_len = max((len(text) for text in texts), default=0)
    nums = np.full((len(texts), max_len), len(alphabet), dtype='int64')
    for i, text in enumerate(texts):
        if text:
            nums[i, :len(text)] = [char_to_index[ch] for ch in text]

    return nums, alphabet

In [None]:
# Integer-encode the label strings; positions past the end of a label hold len(alphabet).
labels_encoded, alphabet = encode_texts(markup)

print(alphabet)

# Rescale pixel values by 1/255.
# NOTE(review): this rebinds `images`, so re-running the cell divides by 255 a second time.
images = images.astype('float64') / 255

# Build a (num_images, 50, 2) array from the 'additional_bit' dataset.
# NOTE(review): assuming `additional_bits` is an integer array, it indexes the LAST axis for
# all samples at once, so every channel index that occurs anywhere in it is set to 1 for
# every sample and position -- this is not a per-sample one-hot encoding. Presumably the
# saved checkpoint was trained with the same encoding; confirm before changing it.
additional_bits_expanded = np.zeros((len(images), 50, 2))
additional_bits_expanded[:, :, additional_bits] = 1

 -.0123456789АБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ


In [None]:
# Reproducible 80/20 train/validation split over sample indices.
# A fixed seed makes the metrics reported below comparable between notebook runs.
# NOTE(review): the checkpoint loaded later may have been trained on a different split, in
# which case these validation samples could overlap its training data -- confirm.
n_samples = images.shape[0]
rng = np.random.default_rng(42)
train_indices = rng.choice(n_samples, size=int(n_samples * 0.8), replace=False)
# setdiff1d returns the sorted indices that were not drawn for training; it replaces a
# per-element `not in ndarray` scan, which was quadratic in the number of samples.
val_indices = np.setdiff1d(np.arange(n_samples), train_indices)

# The two index sets must be disjoint and together cover every sample.
assert len(set(train_indices) & set(val_indices)) == 0
assert len(set(train_indices) | set(val_indices)) == images.shape[0]

train_imgs = images[train_indices]
val_imgs = images[val_indices]

train_abits = additional_bits_expanded[train_indices]
val_abits = additional_bits_expanded[val_indices]

train_labels = labels_encoded[train_indices]
val_labels = labels_encoded[val_indices]

# Функции

In [None]:
import tensorflow as tf
from keras import backend as K


class CER(tf.keras.metrics.Metric):
    """
    A custom Keras metric to compute the Character Error Rate.

    For every batch the predictions are CTC-decoded and compared with the labels
    via normalized edit distance; the per-sample distances are summed into an
    accumulator and ``result()`` returns their mean over all samples seen since
    the last reset.

    Args:
        name: metric name reported by Keras.
        decode_greedy: if True (default) use greedy CTC decoding; otherwise
            ``K.ctc_decode`` runs its beam search with its default settings.
        **kwargs: forwarded to ``tf.keras.metrics.Metric``.
    """
    def __init__(self, name='CER', decode_greedy=True, **kwargs):
        super().__init__(name=name, **kwargs)
        self.decode_greedy = decode_greedy
        # Running sum of per-sample normalized edit distances.
        self.cer_accumulator = self.add_weight(name="total_cer", initializer="zeros")
        # Number of samples that contributed to the sum.
        self.counter = self.add_weight(name="cer_count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        Accumulate the edit distances of one batch.

        Args:
            y_true: dense label tensor, one row per sample.
            y_pred: model output; axis 0 is the batch and axis 1 is used as the
                sequence length of every sample.
            sample_weight: accepted for API compatibility and ignored.
        """
        input_shape = K.shape(y_pred)
        # Every sample is decoded over the full axis-1 length of y_pred.
        input_length = tf.ones(shape=input_shape[0]) * K.cast(input_shape[1], 'float32')

        # Honour the constructor flag (it used to be stored but never consulted).
        decode, _ = K.ctc_decode(y_pred, input_length, greedy=self.decode_greedy)

        decode = K.ctc_label_dense_to_sparse(decode[0], K.cast(input_length, 'int32'))
        y_true_sparse = K.ctc_label_dense_to_sparse(y_true, K.cast(input_length, 'int32'))
        # Drop every label entry equal to the largest value in the batch.
        # NOTE(review): this presumably strips the padding token; a batch without any padded
        # position would lose its highest real class instead -- confirm labels are always padded.
        y_true_sparse = tf.sparse.retain(
            y_true_sparse,
            tf.not_equal(y_true_sparse.values, tf.math.reduce_max(y_true_sparse.values)))

        # -1 entries in the decoded output are filler, not characters.
        decode = tf.sparse.retain(decode, tf.not_equal(decode.values, -1))
        distance = tf.edit_distance(decode, y_true_sparse, normalize=True)

        self.cer_accumulator.assign_add(tf.reduce_sum(distance))
        # tf.shape gives the dynamic batch size without relying on len() of a tensor.
        self.counter.assign_add(K.cast(tf.shape(y_true)[0], 'float32'))

    def result(self):
        """Return the mean distance per sample, or 0 when no sample has been seen."""
        return tf.math.divide_no_nan(self.cer_accumulator, self.counter)

    def reset_state(self):
        """Zero both accumulators."""
        self.cer_accumulator.assign(0.0)
        self.counter.assign(0.0)


def CTCLoss(y_true, y_pred):
    """
    CTC loss for a batch, usable as a Keras loss function.

    Every sample is given the same input length (axis 1 of ``y_pred``) and the
    same label length (axis 1 of ``y_true``); both are passed to
    ``K.ctc_batch_cost`` as int64 column vectors with one row per sample.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_width = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Broadcast the scalar lengths to shape (batch, 1).
    input_length = time_steps * tf.ones(shape=(n_samples, 1), dtype="int64")
    label_length = label_width * tf.ones(shape=(n_samples, 1), dtype="int64")

    return K.ctc_batch_cost(y_true, y_pred, input_length, label_length)

# Модель

In [None]:
import time

# Restore the saved model; the custom loss and metric are registered by name for loading.
model = tf.keras.models.load_model(
    '/content/drive/MyDrive/Методы компрессии/crnn_common_fields.h5',
    custom_objects={'CTCLoss': CTCLoss, 'CER': CER},
)

# Warm-up pass, kept outside the timed section.
_ = model.predict([val_abits, val_imgs], verbose=0)

# Time a single evaluation over the validation split.
start = time.time()
loss, cer = model.evaluate([val_abits, val_imgs], val_labels)
elapsed = time.time() - start
print(f'Time spent: {round(elapsed, 6)}, loss: {round(loss, 6)}, CER: {round(cer, 6)}')

Time spent: 6.911379, loss: 0.148245, CER: 0.002677


# Оптимизация

In [None]:
# Install the package imported below as `tfmot`.
# NOTE(review): the version is not pinned, so a later re-run may pick up a different release.
!pip install -q tensorflow_model_optimization

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/241.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/241.2 kB[0m [31m986.6 kB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m235.5/241.2 kB[0m [31m3.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m241.2/241.2 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import tensorflow_model_optimization as tfmot

epochs = 10
batch_size = 256

num_images = train_imgs.shape[0]
# Optimizer steps per epoch = ceil(num_images / batch_size). Derive it from `batch_size`
# rather than a literal 256, and do not add an extra step when the sample count is an
# exact multiple of the batch size -- otherwise the schedule would end after training does.
steps_per_epoch = (num_images + batch_size - 1) // batch_size
end_step = steps_per_epoch * epochs

# Define model for pruning: sparsity is ramped from 50% to 80% between step 0 and end_step.
pruning_params = {
      'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50,
                                                               final_sparsity=0.80,
                                                               begin_step=0,
                                                               end_step=end_step)
}


# Wrap the loaded model for magnitude pruning and recompile it with a small
# learning rate for fine-tuning.
model_for_pruning = tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)
model_for_pruning.compile(optimizer=tf.keras.optimizers.Nadam(learning_rate=1e-4),
              loss=CTCLoss,
              metrics=[CER()])

Первые неудачные эксперименты показали, что дообучать модель нужно достаточно долго (не слишком мало эпох) и что, возможно, не стоит брать слишком большой learning rate — мы ведь здесь занимаемся fine-tuning'ом.

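Вообще, было бы неплохо автоматизировать выбор числа эпох каким-нибудь callback'ом (например, `tf.keras.callbacks.EarlyStopping` с параметром `min_delta`): учиться не больше условных 20 эпох и только до тех пор, пока значение функции потерь заметно меняется от эпохи к эпохе.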

In [None]:
# Fine-tune the pruning-wrapped model; UpdatePruningStep is passed so the pruning
# schedule is stepped during training.
model_for_pruning.fit(
    x=[train_abits, train_imgs],
    y=train_labels,
    validation_data=([val_abits, val_imgs], val_labels),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[tfmot.sparsity.keras.UpdatePruningStep()],
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7f82d5f5a9b0>

In [None]:
# Remove the pruning wrappers and recompile so the model can be evaluated.
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)
model_for_export.compile(optimizer=tf.keras.optimizers.Nadam(learning_rate=1e-4),
              loss=CTCLoss,
              metrics=[CER()])

# Warm-up pass, kept outside the timed section, mirroring the baseline measurement.
_ = model_for_export.predict([val_abits, val_imgs], verbose=0)

# Start the clock only now: stripping, compiling and warm-up must not count towards the
# evaluation time, otherwise the figure is not comparable with the baseline.
start = time.time()
loss, cer = model_for_export.evaluate([val_abits, val_imgs], val_labels)
print(f'Time spent: {round(time.time()-start, 6)}, loss: {round(loss, 6)}, CER: {round(cer, 6)}')

Time spent: 8.591957, loss: 0.772015, CER: 0.013807


Ускорения нет(

In [None]:
from google.colab import runtime

# Ask Colab to unassign (release) the current runtime now that the notebook is done.
# NOTE(review): in-memory state is presumably lost after this call -- run it last.
runtime.unassign()