**This is version 4 of the Gradio app, updated on 15th Oct 2024.**

*   Please change the beep sound file path to match your local directory in **"Beeped_Audio_Path": line 254**

*  Change the Whisper model at **whisper.load_model** (medium/large/small).
* The Whisper version should be the Nov 06, 2023 release: **pip install openai-whisper==20231106**



In [None]:
# Mount Google Drive at /content/drive so later cells can use paths under it.
from google.colab import drive
drive.mount('/content/drive')

**Other Notes:**

*   Input audio: TEST_9.mp3,TEST_10.wav,TEST_11.mp3
*   Spacy transformer based Models
1.   Final_augmented_data_base_sim_0.6_trf.zip or
2.   Final_augmented_data_base_sim_0.3_trf.zip
* Output files are stored in **pii_beep_audio_uploads**.
For example, *new_1083801646TEST_11.wav* for TEST_11.mp3

**Code changelog**
1.   Minor change in whisper.transcribe function parameters
2.   Removed the full stop [.] or comma [,] from the transcription_text, except
     within emails (line 83). This improves the accuracy of the detection.


3.   The Submit button is enabled **only when both the audio file and the model are loaded**; otherwise it is disabled.
4.   The output beeped audio file name includes a timestamp, for example
    **beeped_audio_2024_10_14_02_15_21_TEST_10**.wav



In [None]:
#@markdown **GPU check and Python version check** (you typically need at least an A100 GPU)
# List the GPUs visible to this runtime (one line per device).
!nvidia-smi -L
#!nvidia-smi
# Print the version of the `python3` interpreter found on PATH.
!python3 --version


In [None]:
#@markdown **Google Drive mount**
# Mount Google Drive at /content/drive; the path-setup cell below expects
# the project files to live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Project folder on the mounted Drive; it becomes the working directory so
# relative paths used later (e.g. output folders) resolve inside it.
directory = '/content/drive/MyDrive/Colab Notebooks/Pipeline/NER'
os.chdir(directory)
print("Current directory:", os.getcwd())

# Location of the beep sound that is overlaid on detected PII.
# NOTE: `filename` is an absolute path, so os.path.join() discards `directory`
# and BeepAudiofileName ends up equal to `filename` itself.
filename = '/content/drive/MyDrive/Colab Notebooks/beep2.mp3'
BeepAudiofileName = os.path.join(directory, filename)
print("Beep Audio File path:", BeepAudiofileName)

In [None]:
# Print the current working directory (sanity check after the chdir above).
!pwd

In [None]:
# ! and % command similar function. % handles well if multiple version of a library is detected in the system
#%pip uninstall openai-whisper -y
#%pip install gradio

!pip install openai-whisper==20231106
!sudo apt install python3-pip

In [None]:
# Print the installed openai-whisper version to confirm the pinned release
# (20231106) from the install cell is the one in use.
import whisper
print(whisper.__version__)

In [None]:
# Installed to avoid the warning
# "WARNING:whisper_timestamped:Please install onnxruntime to use more efficiently silero VAD".
!pip install silero-vad

In [None]:
!sudo apt update && sudo apt install ffmpeg
!sudo pip3 install setuptools-rust
%pip install openai
%pip install openai-whisper
%pip install whisper-timestamped
%pip install gradio
!pip install gradio-rich-textbox

In [None]:
import gradio as gr
import os
import random
import whisper_timestamped as whisper
from pydub import AudioSegment
import numpy as np
import spacy
import torch
import threading
import zipfile
import shutil
from pathlib import Path
from werkzeug.utils import secure_filename
import time
from gradio_rich_textbox import RichTextbox
import re
from datetime import datetime

In [20]:
# Worker class to process the audio file and load models
class Worker(threading.Thread):
    """Thread that transcribes one audio file, finds PII and beeps it out.

    ``run()`` loads the spaCy pipeline and the Whisper model, then calls
    ``processData()``, which transcribes the audio, runs NER on the text,
    overlays a beep on every spoken word that belongs to a detected entity
    and exports the result. Results are handed back through ``callback``.

    Args:
        audio_file_path: Path of the audio file to process.
        model_directory: Path (or name) passed to ``spacy.load``.
        callback: Callable invoked with a result ``dict`` once the audio has
            been exported, and afterwards with a plain status string.
    """

    # Crude character-offset -> milliseconds factor used for the *reported*
    # PII timestamps only (the beeps use Whisper's per-word timings).
    # NOTE(review): this is a heuristic, not a measured time; confirm whether
    # the reported timestamps should be derived from word timings instead.
    _MS_PER_CHAR = 200

    # File extensions that can be passed as the export ``format`` as-is.
    _EXPORT_FORMATS = ("wav", "mp3", "ogg", "flac")

    def __init__(self, audio_file_path, model_directory, callback):
        super().__init__()
        self._AudiofileName = audio_file_path
        self._ModelDirectory = model_directory
        #self._BeepAudiofileName = "beep2.mp3"
        # Module-level path of the beep sound, set in the path-setup cell.
        self._BeepAudiofileName = BeepAudiofileName
        self.callback = callback

        # Accumulated outputs, filled in by processData().
        self._PII_text_and_Timestamp = ""
        self._Transcribe_Text_With_Entities = ""
        self._Metrics = ""
        self._BeepedAudiofileName = ""

        print(f"Audio File: {self._AudiofileName}")
        print(f"Model Directory: {self._ModelDirectory}")
        print(f"Beep Audio File: {self._BeepAudiofileName}")

    def run(self):
        """Load both models, process the audio and notify the callback.

        Any exception is printed rather than propagated, so a failure never
        escapes the thread.
        """
        try:
            print("loading SpaCy model with custom model ", str(self._ModelDirectory))
            # Load spaCy model from directory or a known model name
            self.nlp = spacy.load(str(self._ModelDirectory))
            print("SpaCy model loaded.")

            # Load Whisper model on the GPU when one is available
            devices = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            print(devices)
            time.sleep(0.2)
            self.model = whisper.load_model("medium", device=devices)

            print("Whisper model loaded.")

            self.processData()
            self.callback("callback Processing complete!")

        except Exception as e:
            print(f"Error during processing: {str(e)}")

    def count_entities(self, entities):
        """Count how many entities carry each label.

        Args:
            entities: Iterable of ``(text, label)`` pairs.

        Returns:
            Dict mapping each label to its number of occurrences, in order of
            first appearance.
        """
        entity_counts = {}
        for _, entity_type in entities:
            entity_counts[entity_type] = entity_counts.get(entity_type, 0) + 1
        return entity_counts

    def colorize_entities(self, data, entities):
        """Wrap every entity occurrence in ``data`` in a coloured ``<span>``.

        All entities are substituted in a single pass, longest text first.
        This keeps an entity that is part of another one ("John" inside
        "John Smith"), or that matches a word of the inserted markup, from
        being wrapped a second time inside an already generated span.

        Args:
            data: The transcription text.
            entities: Iterable of ``(text, label)`` pairs. When the same text
                appears with several labels, the first one wins.

        Returns:
            ``data`` with each entity replaced by
            ``<span style="color: C;">text label</span>``.
        """
        # Define color mappings (you can customize these)
        color_map = {
            'PERSON': 'blue',
            'GPE': 'green',
            'LOC': 'purple',
            'PHONE': 'orange',
            'EMAIL': 'blue',
            'CAR_PLATE': 'red',
            'ORG': 'purple',
            'NRIC': 'red',
            'PASSPORT_NUM': 'green'
        }

        print("entities", entities)

        label_by_text = {}
        for entity, entity_type in entities:
            if entity:  # an empty pattern would match at every position
                label_by_text.setdefault(entity, entity_type)
        if not label_by_text:
            return data

        # Longest alternatives first so the regex prefers the widest entity.
        ordered = sorted(label_by_text, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(text) for text in ordered))

        def _wrap(match):
            entity = match.group(0)
            entity_type = label_by_text[entity]
            color = color_map.get(entity_type, 'blue')  # Default to blue if type not found
            return f'<span style="color: {color};">{entity} {entity_type}</span>'

        return pattern.sub(_wrap, data)

    def _overlay_beeps(self, audio_to_beep, output, words_to_beep):
        """Silence and beep every transcribed word found in ``words_to_beep``."""
        beep_sound = AudioSegment.from_file(self._BeepAudiofileName)

        for segment in output["segments"]:
            for word in segment["words"]:
                if word["text"] not in words_to_beep:
                    continue
                print("*******")
                print(word)

                # Word boundaries in milliseconds
                start_index = float(word["start"] * 1000)
                end_index = float(word["end"] * 1000 + 100)  # Add 100ms buffer
                word_duration = end_index - start_index
                print(word_duration)

                # Replace the spoken word with silence of the same duration
                silent_segment = AudioSegment.silent(duration=word_duration)
                audio_to_beep = (
                    audio_to_beep[:int(start_index)]
                    + silent_segment
                    + audio_to_beep[int(end_index):]
                )

                # Trim a *copy* of the beep for this word (word length plus
                # 200ms). The master beep must stay untouched: trimming it in
                # place would cap every later beep at the shortest one so far.
                word_beep = beep_sound[0:word_duration + 200]

                # Overlay the beep sound on the silent segment
                audio_to_beep = audio_to_beep.overlay(word_beep, position=int(start_index))

        return audio_to_beep

    def _export_audio(self, audio_to_beep):
        """Write the beeped audio under ``pii_beep_audio_uploads`` and return its path."""
        formatted_datetime = datetime.now().strftime("%Y_%m_%d_%H_%M_%S_")
        random_filename = str(formatted_datetime) + secure_filename(Path(self._AudiofileName).name)
        os.makedirs("pii_beep_audio_uploads", exist_ok=True)
        output_path = os.path.join("pii_beep_audio_uploads", f"beeped_audio_{random_filename}")

        # Encode according to the file extension; without an explicit format
        # the exporter's default codec would be written even into a file
        # named ``*.wav``. Unknown extensions keep the previous "mp3" default.
        extension = Path(output_path).suffix.lower().lstrip(".")
        export_format = extension if extension in self._EXPORT_FORMATS else "mp3"
        audio_to_beep.export(output_path, format=export_format)
        return output_path

    def processData(self):
        """Transcribe the audio, detect PII, beep it out and report results.

        Fills the ``_PII_text_and_Timestamp``, ``_Transcribe_Text_With_Entities``,
        ``_Metrics`` and ``_BeepedAudiofileName`` attributes and passes them to
        ``callback`` as a dict. Any exception is printed and swallowed; in that
        case the callback is not invoked with a result dict.
        """
        try:
            # Load audio and transcribe it with word-level timestamps
            audio = whisper.load_audio(self._AudiofileName)
            output = whisper.transcribe(self.model, audio, beam_size=5, best_of=5, temperature=(0.0, 0.2, 0.4, 0.6, 0.8, 1.0), vad=True, language="en", remove_punctuation_from_words=True, refine_whisper_precision=0.6, min_word_duration=0.01)
            transcription_text = output['text']
            # Blank out full stops that are followed by whitespace or the end
            # of the text; dots inside tokens (e.g. e-mail addresses) stay.
            transcription_text = re.sub(r"\.(?!\S)", " ", transcription_text)
            print("~~~~~~~~~~~~~~~~")
            print(transcription_text)

            self._PII_text_and_Timestamp += transcription_text + "\n"

            # Run NER with spaCy
            doc = self.nlp(transcription_text)
            entities = [(ent.text, ent.label_) for ent in doc.ents]
            # De-duplicate while keeping first-seen order (deterministic colouring)
            uniqueentities = list(dict.fromkeys(entities))

            for entity_type, count in self.count_entities(entities).items():
                self._Metrics += (entity_type + " : " + str(count)) + "\n"

            self._Transcribe_Text_With_Entities = self.colorize_entities(transcription_text, uniqueentities)

            print(f"Transcription: {transcription_text}")
            print(f"Entities: {entities}")

            # Audio that will receive the beeps
            audio_to_beep = AudioSegment.from_file(self._AudiofileName)

            # Character spans of the entities and their reported timestamps
            self.segments_to_beep = []
            pii_Text_TimeStamp = []
            for ent in doc.ents:
                self.segments_to_beep.append((ent.start_char, ent.end_char))
                pii_Text_TimeStamp.append((
                    ent.text,
                    ent.start_char * self._MS_PER_CHAR,
                    ent.end_char * self._MS_PER_CHAR,
                ))
                print("=======")
                print("ent.text", ent.text)
                print("ent.start", ent.start_char)
                print("ent.end", ent.end_char)
                print(pii_Text_TimeStamp)

            for text, start_ms, end_ms in pii_Text_TimeStamp:
                self._PII_text_and_Timestamp += ("Timestamp: " + str(start_ms / 1000) + " --- " + str(end_ms / 1000) + " sec") + "\n"
                self._PII_text_and_Timestamp += ("Text: " + text) + "\n"

            # Convert character offsets to time (1 character = _MS_PER_CHAR ms)
            segments_in_ms = [
                (start * self._MS_PER_CHAR, end * self._MS_PER_CHAR)
                for start, end in self.segments_to_beep
            ]
            print("Segments:", segments_in_ms)

            # Entity texts with dots removed, then split into single words so
            # they can be compared with Whisper's per-word output.
            words_to_beepify = [ent.text.replace('.', '') for ent in doc.ents]
            print(words_to_beepify)

            individual_words_to_beepify = list(dict.fromkeys(
                word for phrase in words_to_beepify for word in phrase.split()
            ))
            print(individual_words_to_beepify)

            audio_to_beep = self._overlay_beeps(
                audio_to_beep, output, set(individual_words_to_beepify)
            )

            # Save the beeped audio file
            output_path = self._export_audio(audio_to_beep)
            self._BeepedAudiofileName = output_path
            print(f"Beeped audio file saved at: {output_path}")

            # NOTE: the first two keys are crossed relative to the attribute
            # names. This is kept as-is because the UI wiring in this notebook
            # sends "PII_text_and_Timestamp" to the rich-text box shown under
            # the "Transcribe Text and Entities" heading, and vice versa.
            self.callback({
                "PII_text_and_Timestamp": self._Transcribe_Text_With_Entities,
                "Transcribe_Text_With_Entities": self._PII_text_and_Timestamp,
                "Metrics": self._Metrics,
                "Beeped_Audio_Path": self._BeepedAudiofileName
            })
        except Exception as e:
            print(f"An error occurred during transcription: {str(e)}")

In [21]:
# Callback function for Gradio
def start_worker(audio_file_path, model_directory):
    """Run a ``Worker`` for the given inputs and return its four outputs.

    Args:
        audio_file_path: Path of the uploaded audio file.
        model_directory: Path of the extracted spaCy model directory.

    Returns:
        A 4-tuple ``(PII_text_and_Timestamp, Transcribe_Text_With_Entities,
        Metrics, Beeped_Audio_Path)``, one value per output component wired
        to the Submit button. On invalid input or a failed run the first
        item is an error message, the two middle items are empty strings and
        the audio path is ``None``.
    """
    # Every exit path returns four values: the event handler is wired to
    # four output components.
    if (
        not audio_file_path
        or not os.path.isfile(audio_file_path)
        or os.path.getsize(audio_file_path) == 0
    ):
        return "Error: No input provided. Please upload a audio file", "", "", None

    # The model value may be an error message rather than a path when model
    # loading failed, so check for a real directory instead of calling
    # os.stat() on it (which would raise).
    if not model_directory or not os.path.isdir(model_directory):
        return "Error: No input provided. Please upload model(.zip)file", "", "", None

    result = {
        "PII_text_and_Timestamp": "Processing...",
        "Transcribe_Text_With_Entities": "Processing...",
        "Metrics": "Processing...",
        "Beeped_Audio_Path": None
    }
    state = {"done": False}

    def update_result(message):
        # The worker sends a dict with the results, then a plain status string.
        if isinstance(message, dict):
            beeped_path = message.get('Beeped_Audio_Path')
            result.update({
                "PII_text_and_Timestamp": str(message.get("PII_text_and_Timestamp")),
                "Transcribe_Text_With_Entities": message.get("Transcribe_Text_With_Entities"),
                "Metrics": str(message.get('Metrics')),
                "Beeped_Audio_Path": str(beeped_path) if beeped_path else None
            })
            state["done"] = True
        print("Processing complete.")

    # Start worker in a separate thread
    worker = Worker(audio_file_path, model_directory, update_result)
    worker.start()

    # Wait for the worker to finish
    worker.join()

    # The worker prints and swallows its own exceptions; if no result dict
    # arrived, report the failure instead of returning placeholder text and
    # a non-existent audio path.
    if not state["done"]:
        return "Error: Processing failed. Check the notebook output for details.", "", "", None

    # returning result to called function
    return result["PII_text_and_Timestamp"], result["Transcribe_Text_With_Entities"], result["Metrics"], result["Beeped_Audio_Path"]

In [22]:
def reset():
    """Return five ``None`` values, one per component cleared by the Reset button."""
    cleared = [None] * 5
    return tuple(cleared)
def get_audio_file_path(audio):
    """Pass the value of the audio input through unchanged.

    Used to copy the audio component's value into a separate text component.
    """
    audio_path = audio
    return audio_path

def load_model(files):
    """Extract an uploaded model zip and locate the model directory in it.

    The archive is unpacked into ``extracted_model`` (relative to the current
    working directory, replacing any previous contents). The model directory
    is the first directory found by ``os.walk`` that contains ``meta.json``.

    Args:
        files: The uploaded file: either an object exposing the path as
            ``.name`` or a plain path string. Falsy when nothing is selected.

    Returns:
        The path of the directory containing ``meta.json`` on success,
        otherwise a human-readable status message.
    """
    if not files:
        return "No directory selected"

    # Accept both an upload object with a ``.name`` path and a bare path.
    zip_file_path = getattr(files, "name", files)

    # Directory that receives the extracted archive
    extract_dir = "extracted_model"

    # Start from a clean directory so files of an earlier upload cannot leak in
    if os.path.exists(extract_dir):
        shutil.rmtree(extract_dir)
    os.makedirs(extract_dir, exist_ok=True)

    try:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
    except zipfile.BadZipFile:
        # Report a non-zip upload as a status message instead of raising.
        return "Invalid model file: not a valid zip archive"

    # Single walk: list everything that was extracted (debug output) and
    # remember the first directory that holds meta.json.
    print("Extracted files:")
    base_dir = None
    for root, _subdirs, names in os.walk(extract_dir):
        for name in names:
            print(os.path.join(root, name))
        if base_dir is None and 'meta.json' in names:
            base_dir = root

    if base_dir is None:
        return "Invalid model directory: meta.json not found"
    return base_dir

In [23]:
# Function to load and return the audio file path
def load_audio(beep_audio_file_output):
    """Return the ``.name`` of the given file object, or ``None`` if there is none."""
    if beep_audio_file_output is None:
        return None
    # Return the path to the uploaded file
    return beep_audio_file_output.name

# Function to enable the button based on the text input status
def check_texts(audio_output, model_output_path):
    """Build an update that makes the target interactive only when both values are truthy."""
    both_present = bool(audio_output and model_output_path)
    return gr.update(interactive=both_present)


In [24]:
# Gradio UI
# Builds the whole interface inside one Blocks context named `demo`:
# two upload inputs, Reset/Submit buttons, three text result areas and the
# beeped-audio download/player, followed by the event wiring.
with gr.Blocks(css="""
    .centered {
        display: flex;
        justify-content: center;
        align-items: center;    }

        .custom-label {
            font-size: 14px;
            font-weight: bold;
            text-align: left;
            height: 100px;
            border: 0px solid black;
        }
""") as demo:

    gr.Markdown("# Speech De-Identification Framework ver-3.0", elem_classes="centered")

    with gr.Column():

        with gr.Row():

            # Audio upload; its value is mirrored into a hidden textbox so the
            # path can be used as an input of the Submit handler.
            audio_input = gr.Audio(label="Upload Audio File", type="filepath")
            audio_output = gr.Textbox(label="Audio File Path", interactive=False, visible = False)
            audio_input.change(fn=get_audio_file_path, inputs=audio_input, outputs=audio_output)




        # Model directory input (as a zip file)
            # The hidden textbox receives whatever load_model returns: the
            # extracted model directory, or a status message on failure.
            model_dir_input = gr.File(label="Select ML Model as zip file", file_count="single")
            model_output_path = gr.Textbox(label="Model Load Status", interactive=False, visible = False)
            model_dir_input.change(fn=load_model, inputs=model_dir_input, outputs=model_output_path)



        with gr.Row():
            # Empty Markdown cells placed before the two buttons in this row.
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")

            reset_button = gr.Button("Reset")
            # Starts disabled; check_texts toggles it (see below).
            submit_button = gr.Button("Submit",interactive = False)


        # Enable or disable the Submit button based on file upload status:
        # re-evaluated whenever either hidden textbox changes.
        audio_output.change(check_texts, inputs=[audio_output, model_output_path], outputs=submit_button)
        model_output_path.change(check_texts, inputs=[audio_output, model_output_path], outputs=submit_button)



        # NOTE(review): the variable names below are crossed relative to the
        # headings they sit under (`pii_text_output` is shown under
        # "Transcribe Text and Entities", `transcribe_text_output` under
        # "PII Text and Time Stamps").
        gr.Markdown("### Transcribe Text and Entities:")
        pii_text_output = RichTextbox(show_label=False , interactive=False)
        gr.Markdown("### PII Text and Time Stamps:")
        transcribe_text_output = gr.Textbox(show_label=False , interactive=False)
        gr.Markdown("### Metrics:")
        metrics_output = gr.Textbox(show_label=False , interactive=False)

        with gr.Row():
            # File component offering the beeped audio for download
            beep_audio_file_output = gr.File(label="Download Beeped Audio", interactive=False)

            # Audio player component to play the selected audio file
            audio_player = gr.Audio(label="Play Beeped Audio", type="filepath")

            # Automatically update the audio player when the file component changes
            beep_audio_file_output.change(load_audio, inputs=beep_audio_file_output, outputs=audio_player)


    # Event Handlers
    # Reset clears the five listed components (reset() returns five values);
    # the beeped-audio file and player are not part of this list.
    reset_button.click(reset, [], [audio_input, model_dir_input, pii_text_output, transcribe_text_output, metrics_output])
    # Submit passes the two hidden textbox values to start_worker and expects
    # four return values, one per listed output component.
    submit_button.click(start_worker, [audio_output, model_output_path], [pii_text_output, transcribe_text_output, metrics_output,beep_audio_file_output])

In [None]:
# Start the Gradio app defined above.
demo.launch(
    inbrowser=True,
    show_error=True,
    share=True,
    debug=True,
)

In [None]:
# Uncomment to snapshot the installed package versions into requirements.txt.
#!pip freeze > requirements.txt
# Print the current working directory.
!pwd