In [1]:
import time
import re
import os
import ipywidgets as widgets
from threading import Timer
from IPython.display import display
from faster_whisper import WhisperModel


In [2]:
class TextBufferWidget:
    def __init__(self):
        self.text_area = widgets.Textarea(layout=widgets.Layout(width='600px', height='600px'))
        self.original_sentences = []
        self.modified_sentences_info = []
        self.edit_timer = None
        self.lock = False
        
        display(self.text_area)
    
        # Event listener for text changes
        self.text_area.observe(self.on_text_change, names='value')
        
    def do_lock(self, mode):
        """Lock or unlock the text area."""
        self.lock = mode

    def update(self, new_content):
        """Append new content and update original sentences."""
        self.lock = True
        self.text_area.value += new_content
        sentences = new_content.split('. ')
        self.original_sentences.extend(sentences)
        for sent in sentences:
            self.modified_sentences_info.append({
                'original': sent,
                'modified': False,
                'changes': []
            })
        self.lock = False
            
    def on_text_change(self, change):
        """Handle text modifications and potentially trigger transcribe."""
        if not self.lock:
            if self.edit_timer is not None:
                self.edit_timer.cancel()

            self.edit_timer = Timer(0.5, self.process_modification, [change])
            self.edit_timer.start()

    def process_modification(self, change):
        """Handle text modifications."""
        current_text = change['new'].split('. ')
        for i, (orig_dict, curr_sent) in enumerate(zip(self.modified_sentences_info, current_text)):
            if orig_dict['original'] != curr_sent:
                orig_dict['modified'] = True
                orig_dict['changes'] = self.compare_sentences(orig_dict['original'], curr_sent)
                
        print(self.modified_sentences_info)
        self.pre_retranscribe()

    def compare_sentences(self, original, current):
        """Compare two sentences and return a list of changes."""
        orig_words = original.split()
        curr_words = current.split()
        changes = []
        for orig_word, curr_word in zip(orig_words, curr_words):
            if orig_word != curr_word:
                changes.append({'original': orig_word, 'modified': curr_word})
        return changes

    def get_modified_sentences(self):
        """Return detailed info about modified sentences."""
        return [info for info in self.modified_sentences_info if info['modified']]
    
    def pre_retranscribe(self):
        """Process the text up to the point of the first modification."""
        start_index_of_modification = self.find_first_modification_index()
        print('Mods from ', start_index_of_modification)

        if start_index_of_modification is not None:
            # Delete text from the index of the first modification
            # self.delete_from_index(start_index_of_modification)
            # Extract modifications for processing (if needed)
            modifications = ' '.join([' '.join(d['modified'] for d in info['changes']) for info in self.modified_sentences_info if info['modified']])
            print('PROMPT:', modifications)
        else:
            print('PROMPT: ❌')

    def find_first_modification_index(self):
        """
        Find the index of the first modification in the text.
        Returns the index of the first character of the first modified sentence in the full text,
        or None if there are no modifications.
        """
        last_original_index = 0
        for i, info in enumerate(self.modified_sentences_info):
            if info['modified']:
                break
            else:
                last_original_index = i
                
        return last_original_index + 1 if last_original_index is not None and last_original_index + 1 < len(self.modified_sentences_info) else None
                    
    def find_start_index_of_sentence(self, text, sentence_index):
        """Find the start index of a sentence in the full text."""
        sentences = text.split('. ')
        return len('. '.join(sentences[:sentence_index]))
    
    def delete_from_index(self, index):
        """Delete text starting from the specified index."""
        if index < 0 or index >= len(self.text_area.value):
            print("Invalid index")
            return

        # Delete the text from the TextArea
        self.text_area.value = self.text_area.value[:index]

        # Update original_sentences and modified_sentences_info
        updated_text = self.text_area.value
        updated_sentences = updated_text.split('. ')
        self.original_sentences = updated_sentences
        self.modified_sentences_info = [{'original': sent, 'modified': False, 'changes': []} for sent in updated_sentences]

In [3]:
class Processor:
    def __init__(self):
        self.model_size = "large-v2"
        self.model = WhisperModel(self.model_size, device="cpu", compute_type="float32")
        self.textBufferWidget = TextBufferWidget()

    def extract_number(self, filename):
        match = re.search(r'audio-(\d+)\.wav', filename)
        if match:
            return int(match.group(1))
        return -1

    def concat_previous(self, str_list):
        result = ""
        for s in str_list:
            clean_str = s.strip()
            if result and not result.endswith((' ', '.', ',')):
                result += " "
            result += clean_str
        return result

    def process_record(self, d, numSents=2):
        previous = None
        for file in sorted(os.listdir(d), key=self.extract_number):
            pattern = r'audio-(\d+)\.wav'
            match = re.match(pattern, file)
            if match:
                data = os.path.join(d, file)
                prompt = self.concat_previous(previous) if previous is not None else None
                segments, info = self.model.transcribe(data, beam_size=5, initial_prompt=prompt)
                concat = ""
                for segment in segments:
                    concat += segment.text

                if (not isinstance(previous, list)):
                        previous = [concat]
                else:
                    if (len(previous) == numSents):
                        previous = [concat] + previous[:-1]
                    else:
                        previous.append(concat)
                self.textBufferWidget.update(concat)

Textarea(value='', layout=Layout(height='200px', width='400px'))

[{'original': ' On behalf of the Green Zephyr Group, I would like to thank Mr', 'modified': False, 'changes': []}, {'original': 'Brake for his great report.', 'modified': True, 'changes': [{'original': 'report.', 'modified': 'report'}]}, {'original': ' And we appreciate his point that consulting the VISA using the number of the VISA sticker in combination with the verification of fingerprints will create a lot of problems.', 'modified': True, 'changes': [{'original': 'problems.', 'modified': 'problems'}]}, {'original': ' And therefore, we welcome introducing of derogation in exceptional cases, consulting the VISA without verification of fingerprints.', 'modified': True, 'changes': [{'original': 'fingerprints.', 'modified': 'fingerprints'}]}, {'original': ' Nevertheless, in our opinion, the report is not as ambitious as it should be.', 'modified': True, 'changes': [{'original': 'be.', 'modified': 'be'}]}, {'original': ' The derogation must be a general rule instead.', 'modified': True, 

In [4]:
process_record("/Users/karelvlk/Developer/mff/ufal/whisper-prompting/ESICv1.0/dev/20080901/018_006_EN_Ždanoka")