In [None]:
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_datasets as tfds
import os 
import tensorflow_addons as tfa
from skimage import io,transform
import numpy as np

# Preprocess the raw plate images and save them to disk

In [None]:
# Whiten, resize and crop every raw plate image, then write the result to the
# "Reshape" folder that the next section loads from.
# Assumes every file in PATH is an image with an alpha channel (index 3).
PATH = r'D:\Training_data_sets\Car-plate\Origin'
RESHAPE_PATH = r'D:\Training_data_sets\Car-plate\Reshape'
os.makedirs(RESHAPE_PATH, exist_ok=True)
Folder = os.listdir(PATH)

for file in Folder:

    img = io.imread(os.path.join(PATH, file))

    # Set all three colour channels to 255 wherever the alpha value is above 1.
    alpha_above_one = img[:, :, 3] > 1
    img[alpha_above_one, 0:3] = 255

    # transform.resize returns a float image scaled to [0, 1].
    img = transform.resize(img, (40, 150))

    # Drop the bottom 8 rows and 3 columns on each side -> 32 x 144 pixels.
    output = img[0:-8, 3:-3, :]

    # Convert explicitly to uint8 so the writer does not have to guess how to
    # store a float64 image (lossy-conversion warning or outright failure).
    output = np.clip(np.round(output * 255), 0, 255).astype(np.uint8)

    io.imsave(os.path.join(RESHAPE_PATH, file), output)

# Load the images and tokenize the plate labels

In [None]:
# Load the preprocessed images and build the character-level targets.
# The plate text is taken from each file name (without its extension).
PATH = r'D:\Training_data_sets\Car-plate\Reshape'
Folder = os.listdir(PATH)

Train_input = []
Target = []
for file in Folder:

    img = io.imread(os.path.join(PATH, file))

    # splitext copes with any extension length, unlike slicing off 4 characters.
    label = os.path.splitext(file)[0]
    word = ['BOS'] + list(label) + ['EOS']
    Target.append(word)
    # Keep only the first three channels (drop alpha if present).
    Train_input.append(img[:, :, 0:3])

Train_input = np.asarray(Train_input)

# The Tokenizer lower-cases tokens by default, so 'BOS'/'EOS' are stored as
# 'bos'/'eos'. Ids start at 1; 0 is left free and is used for padding below.
tokenizer = keras.preprocessing.text.Tokenizer(num_words=100)
tokenizer.fit_on_texts(Target)

Train_target = tokenizer.texts_to_sequences(Target)
# NOTE(review): sequences longer than maxlen are truncated by pad_sequences;
# confirm that no plate exceeds 8 characters (10 tokens with BOS/EOS).
Train_target = keras.preprocessing.sequence.pad_sequences(Train_target, maxlen=10, padding='post')
Train_target = np.asarray(Train_target)

In [None]:
# Decode every padded target back to characters.
# Id 0 is the padding value and has no entry in tokenizer.index_word, so it is
# rendered as '<PAD>' instead of raising KeyError.
for seq in Train_target:
    print([tokenizer.index_word.get(int(idx), '<pad>').upper() for idx in seq])

In [None]:
# Largest token id present in the padded targets.
print(np.max(Train_target))

In [None]:
# Display the padded token-id matrix built above.
Train_target

In [None]:
# Sanity check: show one image next to its decoded label.
# Padding id 0 is not in tokenizer.index_word, hence the .get() fallback.
io.imshow(Train_input[15])
print([tokenizer.index_word.get(int(idx), '<pad>').upper() for idx in Train_target[15]])

# Img 2 Patches

In [None]:
class ImgPatches(tf.keras.layers.Layer):
    """Cut an image batch into non-overlapping square patches and embed each one.

    A Conv2D whose kernel size and stride both equal `patch_size` projects every
    patch to `d_model` channels; the spatial grid is then flattened into a
    sequence.

    Args:
        d_model: number of output channels per patch.
        patch_size: side length of the square patches, also used as the stride.
    """
    def __init__(self,d_model,patch_size,):
        super(ImgPatches,self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(d_model, patch_size, strides=patch_size, padding='valid')
        self.patch_size = patch_size
        self.d_model = d_model

    def call(self,x):
        """Return patch embeddings of shape (batch, rows * cols, d_model)."""
        x = self.conv1(x)
        # The grid size is read from the static shape so that the sequence
        # length stays statically known to downstream layers.
        rows, cols = x.shape[1], x.shape[2]
        return tf.reshape(x, (-1, rows * cols, self.d_model))

In [None]:
# Smoke test: embed the whole (0-1 scaled) training set as 16x16 patches.
# `y` is reused as the input of the encoder smoke tests in the following cells.
img = ImgPatches(d_model = 32,patch_size=16)
y = img((Train_input/255).astype('float32'))
y

In [None]:
def point_wise_feed_forward_network(d_model, dff):
    """Build the two-layer position-wise feed-forward network.

    Args:
        d_model: width of the output Dense layer.
        dff: width of the hidden ReLU Dense layer.

    Returns:
        A tf.keras.Sequential of Dense(dff, relu) followed by Dense(d_model).
    """
    hidden = tf.keras.layers.Dense(dff, activation='relu')  # (batch_size, seq_len, dff)
    projection = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([hidden, projection])

# Encoder

In [None]:
class EncoderBlock(tf.keras.layers.Layer):
    """Pre-norm encoder layer: self-attention and feed-forward, each with a residual.

    Args:
        num_heads: number of attention heads.
        d_model: passed as `key_dim` to the attention layer and as the output
            width of the feed-forward network.
        dff: hidden width of the feed-forward network.
        dropout_rate: dropout applied to the attention output.
    """
    def __init__(self,num_heads,d_model,dff,dropout_rate = 0.1):
        super(EncoderBlock,self).__init__()
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)

    def call(self,inp,training = True):
        """Apply the block to `inp` and return a tensor of the same shape."""
        # Attention sub-layer: normalise, self-attend, dropout, add the input back.
        normed = self.norm1(inp)
        attended = self.dropout1(self.mha(normed, normed), training=training)
        x = attended + inp

        # Feed-forward sub-layer with its own residual connection.
        return x + self.ffn(self.norm2(x))

In [None]:
# Smoke test: one EncoderBlock (2 heads, d_model 32, dff 64) on the patch
# embeddings; overwrites `y` with the block output.
E = EncoderBlock(2,32,64)
y = E(y)
y.shape

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Add positional encodings, run a stack of EncoderBlocks, then LayerNorm.

    Note the positional argument order: (num_heads, d_model, dff, num_layers,
    dropout_rate). `call` uses the module-level `positional_encoding` helper,
    which must be defined before this layer is first called.
    """
    def __init__(self,num_heads,d_model,dff,num_layers,dropout_rate = 0.1):
        super(Encoder,self).__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(EncoderBlock(num_heads, d_model, dff, dropout_rate=dropout_rate))
        self.encoder_layers = blocks
        self.num_layers = num_layers
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self,x,training = True):
        """Encode `x`; sequence length and width are read from its static shape."""
        static_shape = x.get_shape()
        x = x + positional_encoding(static_shape[1], static_shape[2])
        for block in self.encoder_layers:
            x = block(x, training=training)
        return self.norm1(x)

In [None]:
# Smoke test: full Encoder (2 heads, d_model 32, dff 64, 2 layers) on `y`.
# NOTE(review): Encoder.call uses positional_encoding, which this notebook only
# defines in a later cell - on a fresh kernel that cell has to be run first.
E = Encoder(2,32,64,2)
y = E(y)
# y

# Decoder

In [None]:
def create_look_forward_mask(size):
    """Return a (size, size) lower-triangular matrix of ones.

    Row i holds ones in columns 0..i and zeros after that.
    """
    all_ones = tf.ones((size, size))
    # num_lower=-1 keeps the whole lower triangle; num_upper=0 drops everything
    # above the diagonal.
    return tf.linalg.band_part(all_ones, -1, 0)

In [None]:
# Mask sized to the padded target length; reused by the smoke tests below.
look_forward_mask = create_look_forward_mask(Train_target.shape[1])
look_forward_mask

In [None]:
class DecoderBlock(tf.keras.layers.Layer):
    """One decoder layer: masked self-attention, encoder-decoder attention and a
    feed-forward network, each followed by dropout, a residual connection and
    layer normalisation.

    Args:
        d_model: model width (head size of both attention layers, output width
            of the feed-forward network).
        num_heads: number of attention heads.
        dff: hidden width of the feed-forward network.
        rate: dropout rate.
    """
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderBlock, self).__init__()

        self.mha1 = tfa.layers.MultiHeadAttention(head_size=d_model, num_heads=num_heads,
                                                  return_attn_coef=True)
        # Keras' MultiHeadAttention takes (num_heads, key_dim); passing
        # (d_model, num_heads) positionally swapped the two. Keywords are used,
        # matching EncoderBlock. Checkpoints saved with the swapped layer will
        # not match these weight shapes.
        self.mha2 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output,look_forward_mask, training):
        """Run the block.

        Args:
            x: decoder input, (batch_size, target_seq_len, d_model).
            enc_output: encoder output attended to by the second attention layer.
            look_forward_mask: mask passed to the self-attention layer.
            training: whether dropout is active.

        Returns:
            (output, self_attention_coefficients).
        """
        # Masked self-attention over the target sequence.
        attn1, self_attn_coef = self.mha1([x, x], mask=look_forward_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # Attention from the target sequence onto the encoder output.
        attn2 = self.mha2(out1, enc_output)  # (batch_size, target_seq_len, d_model)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)

        ffn_output = self.ffn(out2)  # (batch_size, target_seq_len, d_model)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)  # (batch_size, target_seq_len, d_model)

        return out3, self_attn_coef

In [None]:
# Cast the padded token ids to float32 and convert them to a tf.Tensor.
# NOTE(review): from here on Train_target is a tensor, not a NumPy array, so
# this cell presumably cannot be re-run as-is (.astype is a NumPy method) -
# confirm, and consider a separate name for the tensor version.
Train_target = Train_target.astype("float32")
Train_target = tf.convert_to_tensor(Train_target)

# Train_target = Train_target[tf.newaxis,...]
Train_target.dtype

In [None]:
# Smoke test for a single DecoderBlock.
# DecoderBlock takes (d_model, num_heads, dff) and its call needs the mask and
# the training flag; the previous positional call omitted both and raised
# TypeError. A random tensor stands in for embedded targets, and `y` (the
# encoder output) is deliberately left untouched for the cells below.
E = DecoderBlock(d_model=32, num_heads=2, dff=64)
dec_in = tf.random.uniform((y.shape[0], Train_target.shape[1], 32))
dec_out, dec_attn = E(dec_in, y, look_forward_mask, training=False)
dec_out.shape

In [None]:
def get_angles(pos, i, d_model):
    """Return the sinusoid arguments pos / 10000 ** (2 * (i // 2) / d_model).

    Args:
        pos: position indices (broadcast against `i`).
        i: embedding-dimension indices.
        d_model: embedding width used to scale the exponent.
    """
    exponent = (2 * (i // 2)) / np.float32(d_model)
    return pos * (1 / np.power(10000, exponent))

def positional_encoding(position, d_model):
    """Build a sinusoidal positional-encoding table.

    Args:
        position: number of positions (rows) to generate.
        d_model: embedding width.

    Returns:
        float32 tensor of shape (1, position, d_model). The sine terms (taken
        from the even columns of the angle matrix) fill the first half of the
        last axis and the cosine terms (odd columns) the second half.
    """
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]
    angle_rads = get_angles(positions, dims, d_model)

    sin_part = np.sin(angle_rads[:, 0::2])  # columns 2i
    cos_part = np.cos(angle_rads[:, 1::2])  # columns 2i+1

    table = np.concatenate([sin_part, cos_part], axis=-1)
    # Leading axis of size 1 so the table broadcasts over the batch.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [None]:
class Decoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding followed by a stack of DecoderBlocks.

    Args:
        num_layers: number of DecoderBlocks.
        d_model: embedding / model width.
        num_heads: attention heads per block.
        dff: hidden width of each block's feed-forward network.
        target_vocab_size: number of token ids for the embedding table.
        rate: dropout rate.
        max_position_encoding: number of positions in the positional-encoding
            table. Defaults to `target_vocab_size`, the value that was
            previously hard-wired, so existing callers are unaffected.
    """
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,rate=0.1,
                 max_position_encoding=None):
        super(Decoder, self).__init__()

        self.d_model = d_model

        # Embedding table for the target tokens.
        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        # The table must cover the longest target sequence; that length is
        # unrelated to the vocabulary size, hence the separate parameter.
        if max_position_encoding is None:
            max_position_encoding = target_vocab_size
        self.pos_encoding = positional_encoding(max_position_encoding, self.d_model)

        self.dec_layers = [DecoderBlock(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output,look_forward_mask, training = True):
        """Decode token ids `x` against `enc_output`.

        The arguments mirror DecoderBlock.call. Returns a tensor of shape
        (batch_size, tar_seq_len, d_model).
        """
        tar_seq_len = tf.shape(x)[1]

        x = self.embedding(x)  # (batch_size, tar_seq_len, d_model)
#         x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :tar_seq_len, :]
        x = self.dropout(x, training=training)

        # Each block also returns its attention coefficients; they are unused here.
        for dec_layer in self.dec_layers:
            x, _ = dec_layer(x, enc_output, look_forward_mask, training)

        return x

In [None]:
# Smoke test: a 1-layer Decoder (d_model 32, 2 heads, dff 64, vocabulary 36)
# fed the padded targets, with `y` as the encoder output.
D = Decoder(1,32,2,64,36)
X  = D(Train_target,y,look_forward_mask)

In [None]:
class Transformer(tf.keras.Model):
    """Image-to-sequence transformer: ImgPatches -> Encoder -> Decoder -> logits.

    Args:
        patch_size: side length of the square image patches.
        num_layers: number of encoder blocks and of decoder blocks.
        d_model: model width shared by patches, encoder and decoder.
        num_heads: attention heads per block.
        dff: hidden width of the feed-forward networks.
        target_vocab_size: number of token ids; also the size of the output logits.
        rate: dropout rate.
    """
    def __init__(self,patch_size ,num_layers, d_model, num_heads, dff, target_vocab_size, rate=0.1):
        super(Transformer, self).__init__()

        self.ImagePatches = ImgPatches(d_model,patch_size)
        # Encoder's positional order is (num_heads, d_model, dff, num_layers,
        # dropout_rate), which differs from Decoder's. Passing Decoder-style
        # positional arguments built an encoder with `dff` layers and
        # `num_heads` as its feed-forward width, so keywords are used here.
        # Checkpoints saved from the mis-built encoder are not compatible.
        self.encoder = Encoder(num_heads=num_heads, d_model=d_model, dff=dff,
                               num_layers=num_layers, dropout_rate=rate)

        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               target_vocab_size, rate)
        # Final projection: one logit per token id (softmax is applied by the loss).
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar,look_forward_mask, training=True):
        """Return logits of shape (batch_size, tar_seq_len, target_vocab_size).

        Args:
            inp: image batch fed to ImgPatches.
            tar: target token ids fed to the decoder.
            look_forward_mask: mask for the decoder self-attention.
            training: whether dropout is active.
        """
        inp = self.ImagePatches(inp)

        enc_output = self.encoder(inp, training=training)  # (batch_size, inp_seq_len, d_model)

        # dec_output.shape == (batch_size, tar_seq_len, d_model)
        dec_output = self.decoder(tar, enc_output,look_forward_mask, training)

        final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)

        return final_output

In [None]:
# Smoke test: full model (patch 16, 2 layers, d_model 32, 2 heads, dff 64,
# vocabulary 36) on the 0-1 scaled training images.
T = Transformer(16,2,32,2,64,36)
A = T((Train_input/255).astype('float32'),Train_target,look_forward_mask)


# Hyperparameter

In [None]:
# Model hyperparameters passed to Transformer(...) below.
num_layers = 1
d_model = 32
dff = 64
num_heads = 2
patch_size = 16

# Size of the decoder embedding table and of the output logits.
# NOTE(review): presumably must exceed the largest token id printed earlier
# (np.max(Train_target)) - confirm against the tokenizer vocabulary.
num_of_token = 36
dropout_rate = 0.1 

In [None]:
# Loss on raw logits; reduction='none' keeps one loss value per target token
# instead of averaging them.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

# Running metrics updated in train_step and reset at the start of every epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
    name='train_accuracy')

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up then inverse-square-root decay learning-rate schedule.

    lr(step) = d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

    Args:
        d_model: model width used to scale the rate.
        warmup_steps: number of steps over which the rate rises linearly
            (the comment in the original notes the paper's default is 4000).
    """
    def __init__(self, d_model, warmup_steps=4):
        super(CustomSchedule, self).__init__()

        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # Optimizers pass their integer iteration counter; rsqrt needs a float.
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# The custom learning-rate schedule is instantiated here, but it is NOT passed
# to the optimizer: Adam below uses a constant learning rate of 0.003.
# beta_1, beta_2 and epsilon follow the values the original comment attributes to the paper.
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(0.003, beta_1=0.9, beta_2=0.98, 
                                     epsilon=1e-9,decay = 0.00)

In [None]:
# The model that is actually trained, built from the hyperparameters above.
cartransformer = Transformer(patch_size,num_layers, d_model, num_heads, dff,num_of_token, dropout_rate)

In [None]:
# Checkpoints and TensorBoard logs are written to the same directory.
checkpoint_path = (r'C:\Users\User\GGjupyter\checkpoint\Car_transformer_1layer')
log_dir = (r'C:\Users\User\GGjupyter\checkpoint\Car_transformer_1layer')

# Bundle everything that should be saved and restored together:
# the model and the optimizer state.
ckpt = tf.train.Checkpoint(cartransformer=cartransformer, optimizer=optimizer)

# The manager keeps only the 5 most recent checkpoints in checkpoint_path.
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# Restore the newest checkpoint if one exists; otherwise start from scratch.
latest_ckpt = ckpt_manager.latest_checkpoint
if not latest_ckpt:
    last_epoch = 0
    print("沒找到 checkpoint，從頭訓練。")
else:
    ckpt.restore(latest_ckpt)

    # The number after the last "-" in the checkpoint name is the save count,
    # used here as the number of epochs already trained.
    last_epoch = int(latest_ckpt.split("-")[-1])
    print(f'已讀取最新的 checkpoint，模型已訓練 {last_epoch} epochs。')

In [None]:
@tf.function
def train_step(inp, tar):
    """Run one optimisation step on a batch.

    Args:
        inp: image batch fed to the model.
        tar: token-id batch (batch, seq_len); position t is the decoder input
            and position t + 1 is the value to predict.

    Padding positions (token id 0, produced by pad_sequences) are excluded from
    the loss and from both metrics.
    """
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]

    look_forward_mask = create_look_forward_mask(tar_inp.shape[1])

    # 1.0 for real tokens, 0.0 for padding.
    pad_mask = tf.cast(tf.not_equal(tar_real, 0), tf.float32)

    with tf.GradientTape() as tape:
        predictions = cartransformer(inp, tar_inp, look_forward_mask, True)

        # loss_object uses reduction='none', so this is one value per token.
        token_loss = loss_object(tar_real, predictions)
        # Summing matches what tape.gradient did implicitly with the unreduced
        # loss, except that padded positions now contribute nothing.
        loss = tf.reduce_sum(token_loss * pad_mask)

    gradients = tape.gradient(loss, cartransformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, cartransformer.trainable_variables))

    train_loss(token_loss, sample_weight=pad_mask)
    train_accuracy(tar_real, predictions, sample_weight=pad_mask)

In [None]:
EPOCHS = 80
from tqdm import tqdm


# Writes metrics to TensorBoard; optional but recommended.
summary_writer = tf.summary.create_file_writer(log_dir)

# Resume after the epochs already covered by the restored checkpoint, so that
# re-running this cell trains up to EPOCHS in total instead of EPOCHS more.
for epoch in range(last_epoch, EPOCHS):

    # Start every epoch with fresh running metrics.
    train_loss.reset_states()
    train_accuracy.reset_states()

    # One optimisation step per sample (batch size 1).
    # NOTE(review): images are fed as raw float32 pixel values here, whereas
    # the smoke-test cells divide by 255 - confirm which scale is intended and
    # use the same one when calling evaluate().
    for inp, tar in zip((Train_input).astype('float32'),Train_target):
        inp = tf.convert_to_tensor(inp)[tf.newaxis, ...]
        tar = tf.convert_to_tensor(tar)[tf.newaxis, ...]

        train_step(inp, tar)

    # Save a checkpoint after every epoch.
    ckpt_save_path = ckpt_manager.save()
    print ('Saving checkpoint for epoch {} at {}'.format(epoch+1,
                                                         ckpt_save_path))

    # Log loss and accuracy to TensorBoard.
    with summary_writer.as_default():
        tf.summary.scalar("train_loss", train_loss.result(), step=epoch + 1)
        tf.summary.scalar("train_acc", train_accuracy.result(), step=epoch + 1)

    print('Epoch {} Loss {:.4f} Accuracy {:.4f}'.format(epoch + 1,
                                            train_loss.result(),
                                            train_accuracy.result()))

In [None]:
# Print the layer / parameter summary of the trained model.
cartransformer.summary()

In [None]:
def evaluate(inp_sentence, max_length=10):
    """Greedily decode the plate text of a single image.

    Args:
        inp_sentence: one image without a batch axis. It must use the same
            dtype and value scale as the images the model was trained on.
        max_length: maximum number of decoding steps.

    Returns:
        1-D int32 tensor of token ids, starting with the BOS id. The EOS id is
        not appended. The partial decoding is printed at every step.
    """
    # Look the special ids up instead of hard-coding them: the Tokenizer orders
    # ids by frequency, so they depend on the data. Tokens are stored
    # lower-cased by the Tokenizer's default settings.
    start_token = tokenizer.word_index['bos']
    end_token = tokenizer.word_index['eos']

    def _decode(ids):
        # Id 0 (padding) has no entry in index_word.
        return [tokenizer.index_word.get(int(idx), '<pad>').upper() for idx in ids]

    inp_sentence = inp_sentence[tf.newaxis, ...]

    # The decoder starts from a sequence holding only the BOS token.
    output = tf.constant([[start_token]], dtype=tf.int32)  # (1, 1)

    # Auto-regressive loop: predict one token, append it, feed the sequence back.
    for _ in range(max_length):
        # The mask has to grow with the sequence.
        look_forward_mask = create_look_forward_mask(output.shape[1])
        # predictions.shape == (batch_size, seq_len, vocab_size)
        predictions = cartransformer(inp_sentence, output, look_forward_mask, False)
        # Most likely token at the last position.
        last_step = predictions[:, -1:, :]  # (batch_size, 1, vocab_size)
        predicted_id = tf.cast(tf.argmax(last_step, axis=-1), tf.int32)

        print(_decode(output.numpy()[0]))

        # Stop as soon as the model emits EOS.
        if int(predicted_id[0, 0]) == end_token:
            break

        output = tf.concat([output, predicted_id], axis=-1)

    # Drop the batch axis before returning.
    return tf.squeeze(output, axis=0)

In [None]:
# NOTE(review): in the visible notebook `Carplate` is only assigned as a local
# variable inside evaluate(), so this cell presumably raises NameError on a
# fresh kernel - confirm, and decode the value returned by evaluate() instead.
print([tokenizer.index_word[idx].upper() for idx in Carplate])

In [None]:
# Train_target may already be a float tf.Tensor at this point; tensors cannot
# be used as dict keys, so go through NumPy and plain ints. Padding id 0 has no
# entry in index_word and is shown as '<PAD>'.
print([tokenizer.index_word.get(int(idx), '<pad>').upper() for idx in np.asarray(Train_target[0])])

In [None]:
# Token ids of the first training target.
Train_target[0]

In [None]:
# Pick one training image (raw pixel values, not divided by 255) for a manual test.
test_img = Train_input[22]
# Train_input[22]/255

In [None]:
# Visual check of every training pair: print the full target, the decoder input
# (target without its last token) and the expected output (target without its
# first token), then show the image.
def _decode_tokens(ids):
    # Padding id 0 has no entry in index_word; show it instead of failing.
    # This KeyError is presumably what the previous bare `except: pass` hid.
    return [tokenizer.index_word.get(int(idx), '<pad>').upper() for idx in ids]


# np.asarray accepts both the NumPy array and the tf.Tensor form of Train_target.
for inp, tar in zip(Train_input, np.asarray(Train_target)):
    tar_inp = tar[:-1]
    tar_real = tar[1:]

    print(_decode_tokens(tar))
    print(_decode_tokens(tar_inp))
    print(_decode_tokens(tar_real))

    # Show the stored image directly; a float image scaled to 0-255 is outside
    # the range expected for float RGB data.
    io.imshow(inp)
    io.show()