<a href="https://colab.research.google.com/github/choarauc/form/blob/main/reco_vocale_WaneNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://datascientest.fr/train/assets/python_nlp_speech_text_correspondance.png

Modèle pour la reconnaissance manuscrite

L'idée est de séparer l'image en fragments découpés de gauche à droite, puis d'extraire des features de chaque fragment à l'aide d'un réseau convolutionnel. Puisque le texte manuscrit est écrit de gauche à droite, le résultat peut alors être représenté sous forme d'une séquence temporelle.



Afficher un aperçu du modèle de reconnaissance optique.

In [1]:
# P(y|X) avec y le caractère et X le fragment de l'image

import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, BatchNormalization, MaxPooling2D, LeakyReLU, Lambda, Dense, Dropout
from tensorflow.keras.layers import GRU, Bidirectional
from keras.utils.vis_utils import plot_model
numHidden = 256
alphabet = ' abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

# One entry per convolutional stage: (filters, kernel_size, pool_size).
# The pooling stride is always equal to the pool size.
conv_stages = [
    (32, (5, 5), (2, 2)),   # Layer 1
    (64, (5, 5), (2, 2)),   # Layer 2
    (128, (3, 3), (1, 2)),  # Layer 3
    (128, (3, 3), (1, 2)),  # Layer 4
    (256, (3, 3), (1, 2)),  # Layer 5
]

model_ocr = tf.keras.Sequential()

# Convolutional part: feature extraction.
# Each stage is Conv2D -> BatchNormalization -> LeakyReLU -> MaxPooling2D.
for stage_index, (n_filters, kernel, pool) in enumerate(conv_stages):
    # Only the very first layer declares the input shape (128 x 32, 1 channel).
    first_layer_kwargs = {'input_shape': (128, 32, 1)} if stage_index == 0 else {}
    model_ocr.add(Conv2D(filters=n_filters, kernel_size=kernel, padding='SAME',
                         **first_layer_kwargs))
    model_ocr.add(BatchNormalization())
    model_ocr.add(LeakyReLU())
    model_ocr.add(MaxPooling2D(pool_size=pool, strides=pool))

# The five poolings halve axis 2 each time (32 -> 16 -> 8 -> 4 -> 2 -> 1),
# so it can be squeezed away, leaving a (batch, 32, 256) sequence.
model_ocr.add(Lambda(lambda x: tf.squeeze(x, axis=2)))

# Bidirectional recurrent layer over the resulting sequence.
model_ocr.add(Bidirectional(GRU(numHidden, return_sequences=True)))

# Per-step character scores: one unit per alphabet symbol plus one extra
# class (presumably the CTC blank -- TODO confirm).
model_ocr.add(Dense(len(alphabet) + 1))

model_ocr.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 128, 32, 32)       832       
                                                                 
 batch_normalization (BatchN  (None, 128, 32, 32)      128       
 ormalization)                                                   
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 128, 32, 32)       0         
                                                                 
 max_pooling2d (MaxPooling2D  (None, 64, 16, 32)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 64, 16, 64)        51264     
                                                                 
 batch_normalization_1 (Batc  (None, 64, 16, 64)       2

https://datascientest.fr/train/assets/exam_tensorflow_all_model.png

Modèle de reconnaissance vocale : WaveNet



In [2]:
import tensorflow as tf
import numpy as np
# Toy input: the values 0..10 as a single-channel sequence of shape (11, 1).
X = np.arange(11, dtype=float).reshape(-1, 1)

# One filter of width 2 with a dilation of 2, no bias, 'same' padding.
layer_conv1d = tf.keras.layers.Conv1D(
    filters=1,
    kernel_size=2,
    padding='same',
    use_bias=False,
    dilation_rate=2,
)
# Create the kernel for a (batch=1, steps=11, channels=1) input, then set it
# to all ones so every output is the plain sum of the two taps.
layer_conv1d.build([1, 11, 1])
layer_conv1d.weights[0].assign(np.ones([2, 1, 1]))

with tf.device('/cpu:0'):
    # Add the batch axis before calling the layer.
    y = layer_conv1d(X[np.newaxis, ...]).numpy()

print('Input :')
print(X, '\n')
print('Output :')
print(y)

Input :
[[ 0.]
 [ 1.]
 [ 2.]
 [ 3.]
 [ 4.]
 [ 5.]
 [ 6.]
 [ 7.]
 [ 8.]
 [ 9.]
 [10.]] 

Output :
[[[ 1.]
  [ 2.]
  [ 4.]
  [ 6.]
  [ 8.]
  [10.]
  [12.]
  [14.]
  [16.]
  [18.]
  [ 9.]]]


Définir les classes de notre modèle de reconnaissance vocale.

In [None]:
class AtrousConv1D(tf.keras.layers.Layer):
    """Dilated ("atrous") 1-D convolution with optional causal padding.

    When ``causal`` is true the wrapped convolution uses ``'valid'`` padding
    and the input is left-padded along the time axis with
    ``(kernel_size - 1) * dilation_rate`` zeros, so the output keeps the
    input length. Otherwise ``'same'`` padding is used.

    Args:
        filters: Number of output channels.
        kernel_size: Integer width of the convolution kernel.
        dilation_rate: Integer spacing between kernel taps.
        use_bias: Whether the convolution has a bias term.
        kernel_initializer: Initializer for the kernel. ``None`` (default)
            creates a fresh ``GlorotNormal`` for this layer, so layers do not
            share a single initializer object built at class-definition time.
        causal: Whether to apply causal (left-only) padding.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self,
                 filters,
                 kernel_size,
                 dilation_rate,
                 use_bias=True,
                 kernel_initializer=None,
                 causal=True,
                 **kwargs
                ):
        super(AtrousConv1D, self).__init__(**kwargs)

        # Build the default initializer per instance instead of once in the
        # signature, where it would be shared by every layer.
        if kernel_initializer is None:
            kernel_initializer = tf.keras.initializers.GlorotNormal()

        self.filters = filters
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.causal = causal

        # Convolution with dilation
        self.conv1d = tf.keras.layers.Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            dilation_rate=dilation_rate,
            padding='valid' if causal else 'same',
            use_bias=use_bias,
            kernel_initializer=kernel_initializer
        )

    def call(self, inputs):
        """Apply the dilated convolution to ``inputs``.

        The padding below touches axis 1 only, so ``inputs`` is expected to
        be rank 3 with time on axis 1.
        """
        # With 'valid' padding the time axis would shrink, so pad on the left
        # by exactly the amount the convolution removes.
        if self.causal:
            padding = (self.kernel_size - 1) * self.dilation_rate
            inputs = tf.pad(inputs, [[0, 0], [padding, 0], [0, 0]])

        return self.conv1d(inputs)
    
    
class ResidualBlock(tf.keras.layers.Layer):
    """Gated residual block built from two dilated convolutions.

    The input is batch-normalized, then fed to two ``AtrousConv1D`` layers:
    one passed through ``tanh`` (the "filters"), one through ``sigmoid``
    (the "gates"). Their element-wise product goes through a 1x1 convolution
    and a ``tanh`` to produce the block output.

    Args:
        filters: Number of channels of every convolution in the block.
        kernel_size: Kernel width of the dilated convolutions.
        dilation_rate: Dilation rate of the dilated convolutions.
        causal: Whether the dilated convolutions are causal.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, filters, kernel_size, dilation_rate, causal, **kwargs):
        super(ResidualBlock, self).__init__(**kwargs)

        self.batch_normalization = tf.keras.layers.BatchNormalization()

        # First dilated convolution of the block (the "filters" branch)
        self.dilated_conv1 = AtrousConv1D(
            filters=filters,
            kernel_size=kernel_size,
            dilation_rate=dilation_rate,
            causal=causal
        )

        # Second dilated convolution of the block (the "gates" branch)
        self.dilated_conv2 = AtrousConv1D(
            filters=filters,
            kernel_size=kernel_size,
            dilation_rate=dilation_rate,
            causal=causal
        )

        # 1x1 convolution applied to the gated activation
        self.out = tf.keras.layers.Conv1D(
            filters=filters,
            kernel_size=1
        )

    def call(self, inputs, training=True):
        """Run the block.

        Args:
            inputs: Input tensor. It is added to the block output, so its
                shape must be compatible with a ``filters``-channel output.
            training: Forwarded to the batch normalization layer.

        Returns:
            A pair ``(residual, skip)``: ``skip`` is the block output and
            ``residual`` is that output plus ``inputs``.
        """
        # Normalize the data; pass the training flag explicitly rather than
        # relying on Keras propagating it implicitly.
        data = self.batch_normalization(inputs, training=training)

        # Dilated convolution: filters branch
        filters = tf.nn.tanh(self.dilated_conv1(data))

        # Dilated convolution: gates branch
        gates = tf.nn.sigmoid(self.dilated_conv2(data))

        # Element-wise gating followed by the 1x1 convolution
        out = tf.nn.tanh(self.out(filters * gates))

        return out + inputs, out
    
        
class ResidualStack(tf.keras.layers.Layer):
    """Chain of ``ResidualBlock`` layers, one per dilation rate.

    Args:
        filters: Number of channels used by every block.
        kernel_size: Kernel width used by every block.
        dilation_rates: Iterable of dilation rates; one block is created for
            each rate, in order.
        causal: Whether the blocks use causal convolutions.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, filters, kernel_size, dilation_rates, causal, **kwargs):
        super().__init__(**kwargs)

        # One residual block per requested dilation rate.
        self.blocks = [
            ResidualBlock(
                filters=filters,
                kernel_size=kernel_size,
                dilation_rate=rate,
                causal=causal
            )
            for rate in dilation_rates
        ]

    def call(self, inputs, training=True):
        """Feed ``inputs`` through every block and sum the skip outputs.

        Each block receives the residual output of the previous one. The
        return value is the sum of all skip outputs (``0`` if there are no
        blocks).
        """
        hidden = inputs
        skip_outputs = []

        for residual_block in self.blocks:
            # A block returns (residual output, skip output).
            hidden, skip_out = residual_block(hidden, training=training)
            skip_outputs.append(skip_out)

        # sum() starts from 0, exactly like accumulating into `skip = 0`.
        return sum(skip_outputs)


class SpeechNet(tf.keras.Model):
    """Speech recognition network made of stacked dilated residual blocks.

    Pipeline: batch normalization -> 1x1 "expand" convolution -> a sequence
    of ``ResidualStack`` layers -> batch normalization -> 1x1 output
    convolution with ``len(params['alphabet']) + 1`` channels.

    Args:
        params: Dictionary providing ``'stack_filters'``,
            ``'stack_kernel_size'``, ``'stack_dilation_rates'``,
            ``'causal_convolutions'``, ``'stacks'`` and ``'alphabet'``.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, params, **kwargs):
        super(SpeechNet, self).__init__(**kwargs)

        self.batchnormalization1 = tf.keras.layers.BatchNormalization()

        # Expand convolution: extract features
        self.expand = tf.keras.layers.Conv1D(
            filters=params['stack_filters'],
            kernel_size=1,
            padding='same'
        )
        # Definition of all residual stacks
        self.stacks = [
            ResidualStack(
                filters=params['stack_filters'],
                kernel_size=params['stack_kernel_size'],
                dilation_rates=params['stack_dilation_rates'],
                causal=params['causal_convolutions']
            )
            for _ in range(params['stacks'])
        ]
        # Definition of the last convolution: one channel per alphabet
        # symbol plus one extra class.
        self.out = tf.keras.layers.Conv1D(
            filters=len(params['alphabet']) + 1,
            kernel_size=1,
            padding='same'
        )

        self.batchnormalization2 = tf.keras.layers.BatchNormalization()

    def call(self, inputs, training=True):
        """Compute per-step class scores for ``inputs``.

        Args:
            inputs: Feature tensor; a rank-2 input gets a leading batch axis
                added after the first normalization.
            training: Forwarded to the batch normalization layers and the
                residual stacks. The default is ``True``, so pass
                ``training=False`` explicitly when running inference.

        Returns:
            The output of the last convolution plus ``1e-8``.
        """
        # Data normalization; the training flag is forwarded explicitly.
        data = self.batchnormalization1(inputs, training=training)

        # Right shape for convolution: add the batch axis if it is missing.
        if len(data.shape) == 2:
            data = tf.expand_dims(data, 0)

        # Extract features
        data = self.expand(data)

        # Residual stacks
        for stack in self.stacks:
            data = stack(data, training=training)

        # Data normalization
        data = self.batchnormalization2(data, training=training)

        return self.out(data) + 1e-8

Modèle de reconnaissance vocale

In [None]:
# Configuration shared by the model and the audio preprocessing.
params = {
    # Multiplied by the sampling rate below and in load_wave to get a maximum
    # number of samples, i.e. a duration in seconds.
    'max_wave_length': 20,
    # The network outputs len(alphabet) + 1 classes.
    'alphabet': ' !"&\',-.01234:;\\abcdefghijklmnopqrstuvwxyz',
    # Architecture of the residual stacks.
    'causal_convolutions': False,
    'stack_dilation_rates': [1, 3, 9, 27],
    'stacks': 6,
    'stack_kernel_size': 7,
    'stack_filters': 3*128,
    # Spectrogram settings.
    'sampling_rate': 16000,
    'n_fft': 160*8,
    'frame_step': 160*4,
    'lower_edge_hertz': 0,
    'upper_edge_hertz': 8000,
    'num_mel_bins': 160,
}

model = SpeechNet(params)

# Number of frames obtained from a clip of maximal length:
# (total samples - n_fft) / frame_step + 1.
number_exploited_data = (
    params['max_wave_length'] * params['sampling_rate'] - params['n_fft']
)
lengths = int(number_exploited_data / params['frame_step'] + 1)

# Call the model once on random data of the expected shape so that its
# variables exist before the saved weights are loaded.
dummy_batch = np.random.uniform(size=[1, lengths, params['num_mel_bins']])
model(dummy_batch)
model.load_weights('model/model.h5')

In [None]:
# load_wave reads one audio file and zero-pads it to a fixed length; it is
# used below to load the first ten audio files into X_audio.

def load_wave(path_audio, params):
    """Load an audio file and zero-pad it to the maximum allowed length.

    Args:
        path_audio: Path handed to ``load_audio``.
        params: Configuration dictionary; ``params['max_wave_length']`` times
            the sampling rate returned by ``load_audio`` is the target
            number of samples.

    Returns:
        The waveform followed by zeros up to the target length, or ``None``
        (after printing ``'Shape invalid'``) when the file is longer than the
        target length.
    """
    wave, fe = load_audio(path_audio)
    max_samples = params['max_wave_length'] * fe
    n_samples = len(wave)

    # Reject recordings that are too long.
    if n_samples > max_samples:
        print('Shape invalid')
        return None

    # Append zeros so that every returned waveform has the same length.
    silence = np.zeros(max_samples - n_samples)
    return np.concatenate([wave, silence])
    
# Load (and zero-pad) the first ten recordings listed in df, and keep the
# matching transcriptions. An entry of X_audio is None when load_wave
# rejects the file as too long.
first_audio_paths = df.audio_path[:10]
X_audio = [load_wave(audio_path, params) for audio_path in first_audio_paths]
y = df.text[:10]

Inférence - CTC Decoder

Convertir les fichiers audio de X_audio en un tableau (array) de log-mel spectrogrammes, stocké dans la variable X.

In [None]:
# Rate passed to logMelSpectrogram and, later, to the audio player.
fe = 16000

# One spectrogram per recording, gathered into a single array.
spectrograms = [logMelSpectrogram(audio, params, fe) for audio in X_audio]
X = np.array(spectrograms)

Appliquer la méthode de best path decoding à la matrice p_matrix.

In [None]:
# Toy probability matrix with 10 rows (time steps) and 5 columns (classes).
# Two distributions alternate from one step to the next, so the matrix is
# built by repeating that pair five times.
_even_step = [0.3, 0.1, 0.05, 0.05, 0.5]
_odd_step = [0.5, 0.05, 0.05, 0.1, 0.3]
p_matrix = np.array([_even_step, _odd_step] * 5)

# Display it transposed: one row per class, one column per time step.
print('Transpose probabilty matrix')
print(p_matrix.T)
# Example of a greedy (best path) decoder
def greedy_decoder(data):
    """Return, for each row of *data*, the index of its largest value."""
    return list(map(np.argmax, data))

# Best path decoding of the toy matrix: index of the most probable class at
# each time step (shown as the cell output).
greedy_decoder(p_matrix)

Beam search decoder

In [None]:
# Example of beam search decoder
def beam_search_decoder(data, k):
    """Keep the *k* most probable index sequences through *data*.

    Args:
        data: Iterable of rows; each row holds one probability per class.
        k: Number of candidate sequences kept after every row.

    Returns:
        A list of ``[sequence, score]`` pairs ordered by decreasing score,
        where ``sequence`` is a list of class indices (one per row) and
        ``score`` is the product of the probabilities along it.
    """
    # Start from a single empty sequence with a neutral score.
    beams = [[list(), 1.0]]

    for probs in data:
        # Extend every surviving sequence with every possible class.
        expanded = [
            [prefix + [class_id], prefix_score * prob]
            for prefix, prefix_score in beams
            for class_id, prob in enumerate(probs)
        ]
        # Best scores first; the sort is stable, so ties keep creation order.
        expanded.sort(key=lambda candidate: candidate[1], reverse=True)
        beams = expanded[:k]

    return beams
 

# Show the toy probability matrix again (one row per class).
print(p_matrix.T)


# Decode the sequence, keeping the 3 best candidates at every time step.
result = beam_search_decoder(p_matrix, 3)

# Print each surviving candidate as [class index sequence, score].
for seq in result:
    print(seq)

La fonction ctc_beam_search_decoder de tf.nn retourne un tuple (decoded, log_probabilities) à l'aide d'un décodeur beam search. L'élément decoded est un SparseTensor contenant la sortie décodée, et log_probabilities est le log de la probabilité présentée dans la partie précédente.

La fonction a comme argument :

inputs: Tensor de forme [max_time x batch_size x num_classes] représentant la sortie de notre réseau de neurones.
sequence_length: Vecteur de taille [batch_size] représentant la longueur de la séquence pour chaque élément dans le batch de données.
beam_width: Nombre de meilleurs chemins uniques.

Définir le beam search decoder de TensorFlow.

In [None]:
def decode_codes(codes, charList):
    """Translate integer character codes into characters.

    A static hash table maps ``0 .. len(charList) - 1`` to the entries of
    ``charList``; any other code maps to the empty string.

    Args:
        codes: Integer codes to look up (passed to ``table.lookup``).
        charList: Sequence of characters, indexed by code.

    Returns:
        The result of looking ``codes`` up in the table.
    """
    id_to_char = tf.lookup.KeyValueTensorInitializer(
        keys=np.arange(len(charList)),
        values=charList,
        key_dtype=tf.int32,
    )
    table = tf.lookup.StaticHashTable(
        id_to_char,
        default_value='',
        name='id2char',
    )
    return table.lookup(codes)

def greedy_decoder(logits, params):
    """Decode network outputs into strings with a CTC beam search.

    Despite its name, this function calls ``tf.nn.ctc_beam_search_decoder``
    (beam width 100) and keeps only the best path. It replaces the earlier
    single-argument ``greedy_decoder`` defined in this notebook.

    Args:
        logits: Network output indexed as (batch, time, classes); every
            sequence is decoded over its full time length.
        params: Configuration dictionary; ``params['alphabet']`` maps class
            indices to characters.

    Returns:
        A list with one decoded string per batch element.
    """
    batch_size = logits.shape[0]
    n_steps = logits.shape[1]

    # The decoder expects time-major input: (time, batch, classes).
    time_major_logits = tf.transpose(logits, perm=(1, 0, 2))
    decoded_paths, _ = tf.nn.ctc_beam_search_decoder(
        inputs=time_major_logits,
        sequence_length=[n_steps] * batch_size,
        beam_width=100,
        top_paths=1,
    )

    # Best path only, as int32 codes for the lookup table.
    best_path = tf.cast(decoded_paths[0], tf.int32)

    # Map every code to its character.
    chars = decode_codes(best_path, list(params['alphabet']))

    # Densify the SparseTensor and convert it to an array of str.
    dense_chars = tf.sparse.to_dense(chars).numpy().astype(str)

    return [''.join(row) for row in dense_chars]

Résultat

In [None]:
# Run inference. SpeechNet.call defaults to training=True, so the flag is set
# explicitly: batch normalization must use its stored moving statistics
# rather than the statistics of this batch, and must not update them.
y_logit = model(X, training=False)

# Decode the network output into text and display the transcriptions.
transcriptions = greedy_decoder(y_logit, params)
transcriptions

In [None]:
# Prediction and true transcription of the first audio file in X_audio.

In [None]:
# Index of the recording to inspect.
sample_id = 0
# Compare the model output with the reference text for that recording.
print('Prediction :\n', transcriptions[sample_id], '\n')
print('Real Transcription :\n', y[sample_id])
# Play the (zero-padded) waveform at rate fe.
# NOTE(review): Audio is not imported in the visible cells -- presumably
# IPython.display.Audio; confirm.
Audio(X_audio[sample_id], rate=fe)