In [None]:
import tensorflow as tf
import numpy as np
from data_prep import load_tfrecord_dataset, TextTransform

# Reload the acoustic model for inference only. 'stt_custom_model.h5' is the
# file written by the `model.save(...)` call in the training cell of this
# notebook, so that cell must have been run at least once before this one.
# compile=False skips restoring optimizer/loss state, which decoding does not need.
model = tf.keras.models.load_model('stt_custom_model.h5', compile=False)

# Project helper that owns the character <-> index tables used below.
text_transform = TextTransform()
# Unshuffled test batches so repeated runs decode the same utterances.
test_dataset = load_tfrecord_dataset(
    'test.tfrecord',
    batch_size=12,
    shuffle=False
)

# index -> character lookup handed to the beam-search decoder.
idx2char = text_transform.index_map
# The CTC blank is taken to be the class right after the last mapped index.
# NOTE(review): assumes index_map keys are 0..len-1 and that the model emits
# len(idx2char) + 1 classes with blank last - confirm against build_model.
blank_index = len(idx2char)
# Number of test batches to decode in the demo loop below.
num_samples = 5


def ctc_beam_search(log_probs, beam_width=10, blank_idx=None, idx2char=None):
    """Decode one utterance with CTC prefix beam search.

    Each candidate prefix carries two log-scores: the mass of alignments that
    currently end in a blank and the mass of those that end in the prefix's
    last character. Keeping them apart is what lets a repeated character be
    either merged (no blank in between) or emitted twice (blank in between).
    Prefixes are ranked, pruned and finally selected by the SUM of both
    scores, i.e. by the total probability of the labelling rather than by the
    score of a single ending state.

    Args:
        log_probs: array-like of shape (T, V) with per-frame log-probabilities.
        beam_width: number of prefixes kept after every frame (>= 1).
        blank_idx: class index of the CTC blank; defaults to the last class.
        idx2char: dict mapping class index -> character. Classes that are
            missing from the mapping or map to an empty string are skipped.

    Returns:
        The most probable decoded string ("" when there are no frames).

    Raises:
        ValueError: if ``idx2char`` is not given or ``beam_width`` < 1.
    """
    if idx2char is None:
        raise ValueError("idx2char is required: a dict mapping class index -> character")
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    log_probs = np.asarray(log_probs)
    T, V = log_probs.shape
    if blank_idx is None:
        blank_idx = V - 1

    neg_inf = -np.inf

    # Resolve the emittable (class index, character) pairs once instead of
    # repeating the dictionary lookup for every frame and every beam.
    symbols = []
    for c in range(V):
        if c == blank_idx:
            continue
        ch = idx2char.get(c, "")
        if ch:
            symbols.append((c, ch))

    def _total(scores):
        # Total log-probability of a prefix over both ending states.
        return np.logaddexp(scores[0], scores[1])

    # prefix -> (log P(ends in blank), log P(ends in non-blank)).
    # The empty prefix starts in the "ends in blank" state with probability 1.
    beams = {"": (0.0, neg_inf)}
    for t in range(T):
        frame = log_probs[t]
        next_beams = {}
        for pref, (p_blank, p_char) in beams.items():
            p_any = np.logaddexp(p_blank, p_char)

            # Emitting blank keeps the prefix, whichever state we came from.
            same = next_beams.setdefault(pref, [neg_inf, neg_inf])
            same[0] = np.logaddexp(same[0], p_any + frame[blank_idx])

            for c, ch in symbols:
                lp = frame[c]
                extended = next_beams.setdefault(pref + ch, [neg_inf, neg_inf])
                if pref and ch == pref[-1]:
                    # Same character with no blank in between: CTC merges it,
                    # so the prefix is unchanged.
                    same[1] = np.logaddexp(same[1], p_char + lp)
                    # Same character right after a blank: a genuine repeat.
                    extended[1] = np.logaddexp(extended[1], p_blank + lp)
                else:
                    extended[1] = np.logaddexp(extended[1], p_any + lp)

        # Prune on the combined score so a prefix is not discarded just
        # because its mass is split between its two ending states.
        ranked = sorted(next_beams.items(), key=lambda kv: _total(kv[1]), reverse=True)
        beams = {pref: (scores[0], scores[1]) for pref, scores in ranked[:beam_width]}

    best = max(beams.items(), key=lambda kv: _total(kv[1]))[0]
    return best

def collapse_repeats(text):
    """Drop consecutive duplicate characters from ``text``.

    Spaces are exempt: every space is kept even when it directly follows
    another space. Leading and trailing whitespace is stripped from the result.
    """
    chars = list(text)
    # Pair every character with its predecessor; '' stands in for "nothing
    # before the first character" and never equals a real character.
    predecessors = [''] + chars
    kept = [
        current
        for before, current in zip(predecessors, chars)
        if current == ' ' or current != before
    ]
    return ''.join(kept).strip()

# Decode a few test utterances with beam search and print the transcripts.
# test_dataset yields batches, and `[0]` below keeps only the first item of
# each prediction, so "Sample i" is the first utterance of the i-th batch.
for i, (spectrogram, _, _, _) in enumerate(test_dataset.take(num_samples)):
    preds = model.predict(spectrogram, verbose=0)[0]
    # Clip before the log so zero entries do not turn into -inf.
    # NOTE(review): presumes `preds` holds per-frame probabilities (softmax
    # output) rather than logits - confirm against build_model.
    log_probs = np.log(np.clip(preds, 1e-10, 1.0))

    decoded_beam = ctc_beam_search(
        log_probs,
        beam_width=10,
        blank_idx=blank_index,
        idx2char=idx2char
    )

    # NOTE(review): ctc_beam_search already merges repeats that are not
    # separated by a blank, so this second collapse also removes genuine
    # double letters from the transcript; treat it as a cosmetic variant only.
    decoded_clean = collapse_repeats(decoded_beam)
    print(f"\n🟩 Sample {i+1}")
    print(f"Beam search raw: {decoded_beam}")
    print(f"Collapsed clean: {decoded_clean}")

In [1]:
import tensorflow as tf
import numpy as np
from data_prep import load_tfrecord_dataset, TextTransform
from model import build_model, ctc_loss
from tensorflow.keras import layers, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay

# Training configuration shared by the cells below.
# TFRecord files read through load_tfrecord_dataset.
train_tfrecord = 'train.tfrecord'
test_tfrecord = 'test.tfrecord'
# Per-example input shape handed to build_model.
# NOTE(review): presumably (mel bins, time frames, channels) - confirm
# against data_prep; the plotting cell transposes the spectrogram before
# labelling its x-axis as time.
input_shape = (128, 1200, 1)
# Batch size used for both the training and the test pipeline.
batch_size = 12
# Number of passes of the manual training loop.
epochs = 15

In [2]:
# Input pipelines: shuffled batches for training, fixed order for testing.
train_dataset = load_tfrecord_dataset(train_tfrecord, batch_size=batch_size, shuffle=True)
test_dataset = load_tfrecord_dataset(test_tfrecord, batch_size=batch_size, shuffle=False)

# Character tables; the vocabulary size is the number of mapped characters.
text_transform = TextTransform()
vocab_size = len(text_transform.char_map)

# Base acoustic model (the one that is later saved and used for prediction).
# NOTE(review): decoding elsewhere in this notebook uses len(char_map) as the
# blank index, which presumes build_model adds one extra output class for the
# CTC blank - confirm in model.py.
model = build_model(input_shape, vocab_size)

In [3]:
# Extra graph inputs that only exist for training: the target label sequence
# and the per-example lengths that ctc_loss receives next to the predictions.
labels = layers.Input(name='label', shape=(None,), dtype='int32')
input_length = layers.Input(name='input_length', shape=(1,), dtype='int64')
label_length = layers.Input(name='label_length', shape=(1,), dtype='int64')

# Compute the CTC loss inside the graph; ctc_loss is called positionally as
# ctc_loss(labels, model.output, input_length, label_length).
loss_out = layers.Lambda(lambda x: ctc_loss(*x))([labels, model.output, input_length, label_length])
# Wrapper model whose single output is the loss itself. It shares its layers
# with `model`, so training it updates the weights `model` predicts with.
training_model = Model(inputs=[model.input, labels, input_length, label_length], outputs=loss_out)

# Exponentially decaying learning rate plus gradient-norm clipping.
lr_schedule = ExponentialDecay(initial_learning_rate=3e-5, decay_steps=500, decay_rate=0.95)
opt = Adam(learning_rate=lr_schedule, clipnorm=5.0, epsilon=1e-7)
# The graph output already is the loss, so the compiled "loss" just passes
# y_pred through and ignores y_true (callers feed dummy targets).
training_model.compile(optimizer=opt, loss=lambda y_true, y_pred: y_pred)





In [None]:
# Sanity check: pull one training batch, report its shapes and lengths, then
# decode and plot its first example.
for spectrogram, label, input_len, label_len in train_dataset.take(1):
    print("Spectrogram shape:", spectrogram.shape)
    print("Label shape:", label.shape)
    print("Input lengths:", input_len.numpy()[:5])
    print("Label lengths:", label_len.numpy()[:5])

# Trim the first label to its true length before converting it back to text.
true_len = int(label_len[0].numpy())
first_label = label[0]
sample_label = first_label[:true_len].numpy()
decoded_text = text_transform.int_to_text(sample_label)
print("Decoded text:", decoded_text)

import matplotlib.pyplot as plt

# Drop singleton axes so the first spectrogram is a plain 2-D image.
sample_spec = spectrogram[0].numpy().squeeze()

fig, ax = plt.subplots(figsize=(12, 5))
image = ax.imshow(sample_spec.T, aspect='auto', origin='lower')
ax.set_title(f"Mel-spektrogram\nTekst: \"{decoded_text.strip()}\"")
ax.set_xlabel("Vreme (frame-ovi)")
ax.set_ylabel("Mel frekvencije")
fig.colorbar(image, ax=ax, label='Intenzitet')
plt.show()

In [None]:
# Manual training loop: `epochs` rounds of `steps_per_epoch` batches each.
steps_per_epoch = 300

# Use ONE iterator for the whole run. Iterating the dataset afresh in every
# epoch would restart the input pipeline from its first record, so with the
# step cap below training could keep revisiting only the head of the data.
# `train_dataset` itself is left untouched so re-running this cell does not
# stack another `.repeat()` on top of it.
train_batches = iter(train_dataset.repeat())

for epoch in range(epochs):
    print(f"\nEpoha {epoch + 1}/{epochs}")
    total_loss = 0
    num_batches = 0

    for step in range(steps_per_epoch):
        spectrogram, label, input_len, label_len = next(train_batches)
        # The length inputs of training_model are declared with shape (1,),
        # so give each per-example length a trailing axis.
        input_len_b = tf.expand_dims(tf.cast(input_len, tf.int64), axis=-1)
        label_len_b = tf.expand_dims(tf.cast(label_len, tf.int64), axis=-1)
        # Targets are dummy zeros: the compiled loss returns the model output
        # (the in-graph CTC loss) and ignores y_true.
        loss = training_model.train_on_batch(
            [spectrogram, label, input_len_b, label_len_b],
            np.zeros(len(spectrogram))
        )
        total_loss += loss
        num_batches += 1
        if (step + 1) % 20 == 0:
            print(f"  Korak {step + 1}, gubitak(loss): {loss:.4f}")

    # Guard the division in case steps_per_epoch is set to 0.
    avg_loss = total_loss / max(num_batches, 1)
    print(f"Prosečan gubitak: {avg_loss:.4f}")

# Persist the base acoustic model (`model`), not the CTC training wrapper
# (`training_model`). The decoding cell of this notebook reloads this same
# file with load_model(..., compile=False).
model.save('stt_custom_model.h5')
print("Model je sačuvan kao stt_custom_model.h5")

# Quick greedy (best-path) CTC preview on three test batches: take the argmax
# class per frame, drop frames that repeat their predecessor, drop blanks, and
# map what is left to characters. Only the first item of each batch is shown.
blank_index = len(text_transform.char_map)
for spectrogram, _, _, _ in test_dataset.take(3):
    preds = model.predict(spectrogram, verbose=0)
    pred_indices = np.argmax(preds, axis=-1)
    best_path = pred_indices[0]
    # Predecessor of every frame; -1 is a sentinel no class index can equal.
    previous = [-1, *best_path[:-1]]
    deduped = [
        idx
        for before, idx in zip(previous, best_path)
        if not (idx == before or idx == blank_index)
    ]
    tekst = ''.join(text_transform.index_map.get(i, '') for i in deduped)
    print("\nPrimer predikcije:", tekst)