### ML Model Process for Level 1

#### Import Library

In [55]:
import os
import numpy as np
import tensorflow as tf
import librosa
import pydub
from pydub import AudioSegment
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, Flatten, Dense, Lambda, Dropout, MaxPooling1D
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import save_model
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split


#### Audio Data Processing Functions

In [56]:
# Function to load and preprocess audio files
def load_and_preprocess(file_path, target_length=16000):
    """Load an audio file as a fixed-length, peak-normalised mono waveform.

    Args:
        file_path: Path of the audio file; passed to ``librosa.load``.
        target_length: Number of samples in the returned waveform.

    Returns:
        Array of shape ``(target_length, 1)``. Longer clips are truncated,
        shorter clips are zero-padded at the end, and the result is scaled so
        its largest absolute sample is 1. A clip whose samples are all zero
        is returned unscaled (all zeros).
    """
    # Resample to 16 kHz and down-mix to mono while loading.
    audio, _ = librosa.load(file_path, sr=16000, mono=True)

    # Force the clip to exactly target_length samples.
    if len(audio) > target_length:
        audio = audio[:target_length]
    else:
        audio = np.pad(audio, (0, target_length - len(audio)), mode='constant')

    # Peak-normalise. A silent clip has a peak of 0; dividing by it would fill
    # the array with NaN, so such clips are left as zeros instead.
    peak = np.max(np.abs(audio)) if audio.size else 0.0
    if peak > 0:
        audio = audio / peak

    # Append a trailing channel axis: (target_length,) -> (target_length, 1).
    return np.expand_dims(audio, axis=-1)

In [57]:
# Function to load augmented dataset
def load_data(base_dir):
    """Collect ``.wav`` files from a one-folder-per-sentence directory tree.

    Every immediate sub-directory of ``base_dir`` is treated as one sentence
    label, and every ``.wav`` file directly inside it as a sample of that
    label.

    Args:
        base_dir: Root directory containing one sub-directory per sentence.

    Returns:
        Tuple ``(file_paths, sentences)`` of equal-length NumPy arrays:
        the path of each ``.wav`` file and the name of the sub-directory it
        was found in. Entries are ordered by sentence name, then file name.
    """
    file_paths = []
    sentences = []

    # os.listdir returns entries in arbitrary order; sorting keeps the result
    # (and any seeded split performed on it) identical across runs and machines.
    for sentence in sorted(os.listdir(base_dir)):
        sentence_dir = os.path.join(base_dir, sentence)
        # Skip stray files in base_dir (e.g. .DS_Store); listing them as a
        # directory would raise NotADirectoryError.
        if not os.path.isdir(sentence_dir):
            continue
        for file in sorted(os.listdir(sentence_dir)):
            if file.endswith(".wav"):
                file_paths.append(os.path.join(sentence_dir, file))
                sentences.append(sentence)

    return np.array(file_paths), np.array(sentences)

In [58]:
# Function to create pairs of audio samples with their labels
def create_pairs(files, sentences):
    """Enumerate every unordered pair of samples with a same-sentence label.

    Args:
        files: Sequence of samples; only its length is used.
        sentences: Sequence of sentence labels, aligned with ``files``.

    Returns:
        Tuple ``(pairs, labels)`` of ``int32`` arrays. ``pairs`` has shape
        ``(n_pairs, 2)`` and holds index pairs ``(i, j)`` with ``i < j`` in
        the order ``(0, 1), (0, 2), ..., (1, 2), ...``. ``labels`` has shape
        ``(n_pairs,)`` and is 1 where both samples share a sentence, else 0.
        With fewer than two samples the arrays are empty but keep those
        shapes, so ``pairs[:, 0]`` remains valid.
    """
    num_samples = len(files)

    # Indices strictly above the diagonal of an n x n matrix are exactly the
    # pairs i < j, produced in row-major order.
    left, right = np.triu_indices(num_samples, k=1)
    pairs = np.stack([left, right], axis=1).astype(np.int32)

    sentence_array = np.asarray(sentences)
    labels = (sentence_array[left] == sentence_array[right]).astype(np.int32)

    return pairs, labels

#### Data Generator for Training

In [59]:
def data_generator(files, sentences, batch_size=32, target_length=16000):
    """Yield shuffled batches of audio pairs and same-sentence labels forever.

    Args:
        files: Sequence of audio file paths.
        sentences: Sequence of sentence labels, aligned with ``files``.
        batch_size: Maximum number of pairs per batch; the last batch of a
            pass over the pairs may be smaller.
        target_length: Number of samples each clip is cut or padded to.

    Yields:
        ``([audio_1, audio_2], labels)`` where ``audio_1`` and ``audio_2`` are
        arrays stacking the preprocessed first and second clip of each pair,
        and ``labels`` is 1 for pairs from the same sentence and 0 otherwise.

    Raises:
        ValueError: On first iteration, if no pair can be formed from the
            inputs (fewer than two files).
    """
    # The pair list depends only on the inputs, so build it once rather than
    # on every pass.
    pairs, labels = create_pairs(files, sentences)
    num_pairs = len(pairs)
    if num_pairs == 0:
        # Without this check the endless loop below would spin without ever
        # yielding a batch.
        raise ValueError("data_generator needs at least two files to form a pair")

    while True:
        # Shuffle over the pairs. Permuting len(files) instead would restrict
        # every pass to the first len(files) pairs, nearly all of which
        # involve file 0.
        order = np.random.permutation(num_pairs)
        for batch_start in range(0, num_pairs, batch_size):
            batch_indices = order[batch_start:batch_start + batch_size]
            batch_pairs = pairs[batch_indices]
            batch_labels = labels[batch_indices]

            audio_1 = np.array([load_and_preprocess(files[i], target_length) for i in batch_pairs[:, 0]])
            audio_2 = np.array([load_and_preprocess(files[i], target_length) for i in batch_pairs[:, 1]])

            yield [audio_1, audio_2], batch_labels

#### Define Siamese CNN Model

In [60]:
# Function to create the Siamese CNN model
def create_siamese_model(input_shape):
    """Build a two-input Siamese network on top of a shared 1-D CNN encoder.

    Both inputs pass through the same encoder instance (so its weights are
    shared), the element-wise absolute difference of the two encodings is
    taken, and a single sigmoid unit produces the output.

    Args:
        input_shape: Shape of one input sample, without the batch dimension.

    Returns:
        An uncompiled Keras ``Model`` taking ``[left, right]`` inputs and
        returning one sigmoid value per pair.
    """
    def cnn_network(input_shape):
        """Return the encoder: two Conv1D/MaxPooling1D/Dropout stages and a 256-unit Dense layer."""
        return tf.keras.Sequential([
            Conv1D(64, 5, activation='relu', input_shape=input_shape),
            MaxPooling1D(pool_size=2),
            Dropout(0.2),
            Conv1D(128, 5, activation='relu'),
            MaxPooling1D(pool_size=2),
            Dropout(0.2),
            Flatten(),
            Dense(256, activation='relu'),
            Dropout(0.2),
        ])

    left_in = Input(shape=input_shape)
    right_in = Input(shape=input_shape)

    # One encoder instance applied to both branches.
    shared_encoder = cnn_network(input_shape)
    left_encoding = shared_encoder(left_in)
    right_encoding = shared_encoder(right_in)

    # Element-wise |left - right| of the two encodings.
    abs_difference = Lambda(lambda x: K.abs(x[0] - x[1]))([left_encoding, right_encoding])

    prediction = Dense(1, activation='sigmoid')(abs_difference)

    return Model(inputs=[left_in, right_in], outputs=prediction)

#### Load and Split Data

In [61]:
# Load augmented dataset
# Directory holding one sub-folder of .wav files per sentence
output_dir = "tts_output_words"

file_paths, sentences = load_data(output_dir)

# Split data into training and validation sets (20% held out, fixed seed)
train_files, val_files, train_sentences, val_sentences = train_test_split(file_paths, sentences, test_size=0.2, random_state=42)


#### Create Generators and Compile Model

In [62]:
# Create generators
# Both generators loop forever, so fit() must be given explicit step counts.
# target_length=16000 must match the model's input length defined below.
train_gen = data_generator(train_files, train_sentences, batch_size=32, target_length=16000)
val_gen = data_generator(val_files, val_sentences, batch_size=32, target_length=16000)

In [63]:
# Define input shape
# 16000 samples per clip with one trailing channel axis, matching the
# (target_length, 1) arrays returned by load_and_preprocess
input_shape = (16000, 1)
siamese_model = create_siamese_model(input_shape)

# Compile the model (binary cross-entropy on the single sigmoid output)
siamese_model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
siamese_model.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_5 (InputLayer)        [(None, 16000, 1)]           0         []                            
                                                                                                  
 input_6 (InputLayer)        [(None, 16000, 1)]           0         []                            
                                                                                                  
 sequential_2 (Sequential)   (None, 256)                  1310154   ['input_5[0][0]',             
                                                          24         'input_6[0][0]']             
                                                                                                  
 lambda_2 (Lambda)           (None, 256)                  0         ['sequential_2[0][0]',  

#### Train the Model

In [52]:
# Callbacks for training
# Write 'siamese_model.h5' only when val_loss improves on the best value so far
checkpoint = ModelCheckpoint('siamese_model.h5', monitor='val_loss', verbose=1, save_best_only=True, mode='min')
# Stop after 5 epochs without val_loss improvement and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)

In [53]:
# Train the model
# Number of batches drawn from each (endless) generator per epoch.
# Clamp to at least 1: with fewer than 32 files the integer division yields 0,
# which would hand Keras a zero-step epoch / validation run.
steps_per_epoch = max(1, len(train_files) // 32)
validation_steps = max(1, len(val_files) // 32)

In [None]:
# Train for up to 20 epochs; the callbacks checkpoint the best model and may
# stop training early. The returned History object is kept in `history`.
history = siamese_model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_gen,
    validation_steps=validation_steps,
    epochs=20,
    callbacks=[checkpoint, early_stopping],
    verbose=1
)

In [None]:
# Save the trained model to an .h5 file so the conversion and prediction
# cells below can reload it.
# (To continue from a previously saved model instead, use:
#  siamese_model = load_model('siamese_model.h5'))
# The call must start at column 0: an indented top-level statement raises
# "IndentationError: unexpected indent".
save_model(siamese_model, 'siamese_model.h5')


In [None]:
# Load the trained Siamese model
siamese_model = tf.keras.models.load_model('siamese_model.h5', compile=False)

# Get the input shape of each model input. The leading (batch) dimension is
# reported as None because it is not fixed.
input_shapes = [input_layer.shape.as_list() for input_layer in siamese_model.inputs]

# Calibrate with one sample per step: replace unknown (None) dimensions with 1.
# (224 is an image-size default and would create batches of 224 clips.)
for shape in input_shapes:
    for i, dim in enumerate(shape):
        if dim is None:
            shape[i] = 1

# Verify the input shapes
print(f"Input shapes: {input_shapes}")

# Define a function to generate representative dataset
def representative_dataset_gen():
    """Yield pairs of real training clips used to calibrate int8 quantization.

    Quantization ranges are derived from the activations these samples
    produce, so they must look like real inputs; random noise in [0, 1)
    does not match audio normalised by load_and_preprocess.

    Yields:
        ``[clip_1, clip_2]``, two float32 arrays each matching one model input
        with a batch dimension of 1.
    """
    num_files = len(train_files)
    num_calibration_steps = min(100, num_files)
    clip_length = input_shapes[0][1]
    for step in range(num_calibration_steps):
        # train_files was shuffled by train_test_split, so neighbouring
        # entries form an arbitrary mix of same- and different-sentence pairs.
        clip_1 = load_and_preprocess(train_files[step], target_length=clip_length)
        clip_2 = load_and_preprocess(train_files[(step + 1) % num_files], target_length=clip_length)
        yield [
            np.expand_dims(clip_1, axis=0).astype(np.float32),
            np.expand_dims(clip_2, axis=0).astype(np.float32),
        ]

# Configure full-integer (int8) post-training quantization
converter = tf.lite.TFLiteConverter.from_keras_model(siamese_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

# Set the representative dataset for full integer quantization
converter.representative_dataset = representative_dataset_gen

# Convert the model to TensorFlow Lite format
tflite_model = converter.convert()

# Save the TensorFlow Lite model
with open('siamese_model_lite.tflite', 'wb') as f:
    f.write(tflite_model)


In [None]:
# Load the Keras model from the .h5 file
siamese_model = tf.keras.models.load_model('siamese_model.h5')

# Create the TFLite converter
converter = tf.lite.TFLiteConverter.from_keras_model(siamese_model)

# Set the optimization to float16 quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]

# Convert the model
tflite_model = converter.convert()

# Save the TFLite model to a file
with open('model_quant_float16.tflite', 'wb') as f:
    f.write(tflite_model)

print("Model berhasil dikonversi dan disimpan sebagai model_quant_float16.tflite")


INFO:tensorflow:Assets written to: C:\Users\MUHAMM~1\AppData\Local\Temp\tmpym5ey2sl\assets


INFO:tensorflow:Assets written to: C:\Users\MUHAMM~1\AppData\Local\Temp\tmpym5ey2sl\assets


Model berhasil dikonversi dan disimpan sebagai model_quant_float16.tflite


#### Test and Predict

In [96]:
# Paths to the test audio files (wav format)
# The two recordings to compare
test_audio_file_1 = 'converted_kemarin_jelas.wav'
test_audio_file_2 = 'converted_kemarin_mayan.wav'

# Load and preprocess the test audio files (same preprocessing and length as in training)
test_audio_1 = load_and_preprocess(test_audio_file_1, target_length=16000)
test_audio_2 = load_and_preprocess(test_audio_file_2, target_length=16000)

# Add batch dimension: (16000, 1) -> (1, 16000, 1)
test_audio_1 = np.expand_dims(test_audio_1, axis=0)
test_audio_2 = np.expand_dims(test_audio_2, axis=0)

In [None]:
# Load the trained Siamese model
# Reload the saved model; compile=False because it is used for inference only
siamese_model = tf.keras.models.load_model('siamese_model.h5', compile=False)

# Predict the similarity
similarity_score = siamese_model.predict([test_audio_1, test_audio_2])

# Define the min and max scores for normalization
# NOTE(review): the model's sigmoid output lies in [0, 1], so with
# max_score = 0.5 every raw score of 0.5 or above is reported as 100% after
# clipping. Presumably a deliberate leniency setting; confirm it is intended.
min_score = 0.0
max_score = 0.5

# Normalize the similarity score to the range 0-100%
normalized_similarity_score = (similarity_score[0][0] - min_score) / (max_score - min_score) * 100

# Clip the value to ensure it stays within the 0-100% range
normalized_similarity_score = np.clip(normalized_similarity_score, 0, 100)

# Output the normalized similarity score
print(f'Similarity score: {normalized_similarity_score:.2f}%')
