In [3]:
import json
import os
import queue
import re
import subprocess
import tempfile
import threading
import tkinter as tk
import uuid
from tkinter import filedialog, messagebox, simpledialog, scrolledtext
from tkinter.simpledialog import askstring

import numpy as np
import sounddevice as sd
import whisper
from deepmultilingualpunctuation import PunctuationModel
from scipy.io.wavfile import write
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# === Load Models ===
# Every model is created once, when this code runs, and is shared by the helper
# functions below through these module-level names.
# Punctuation restorer used by clean_and_punctuate_text.
punctuation_model = PunctuationModel()
# Tokenizer/model pair used by classify_sentence and generate_justification.
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-large")
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-large")
# Text-classification pipeline used by detect_toxicity.
toxicity_classifier = pipeline("text-classification", model="unitary/toxic-bert")
# Speech-to-text model used by transcribe_with_whisper.
whisper_model = whisper.load_model("base")  # Try "medium" or "large" for better quality if needed

# === Helper Functions ===

def extract_audio_ffmpeg(input_path, output_path):
    """Convert a media file to a mono, 16 kHz, 16-bit PCM WAV file with ffmpeg.

    Args:
        input_path: Path of the audio/video file to read.
        output_path: Path of the WAV file to write; an existing file is
            overwritten (``-y``).

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status. The message
            carries the tail of ffmpeg's stderr.
        FileNotFoundError: Propagated from ``subprocess.run`` when the
            ``ffmpeg`` executable cannot be found.
    """
    command = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-ac", "1",
        "-ar", "16000",
        "-acodec", "pcm_s16le",
        output_path
    ]
    completed = subprocess.run(command, capture_output=True)

    # Without this check a failed conversion would be reported as a success and
    # only surface later as a confusing error while transcribing.
    if completed.returncode != 0:
        stderr_text = (completed.stderr or b"").decode("utf-8", errors="replace").strip()
        # ffmpeg prints a long banner first; the last lines hold the real error.
        error_tail = "\n".join(stderr_text.splitlines()[-5:])
        raise RuntimeError(
            f"ffmpeg failed with exit code {completed.returncode}:\n{error_tail}"
        )

def transcribe_with_whisper(audio_path):
    """Transcribe ``audio_path`` with the shared Whisper model and return the ``"text"`` field."""
    return whisper_model.transcribe(audio_path)["text"]

def clean_and_punctuate_text(text):
    """Return ``text`` after passing it through the punctuation model's ``restore_punctuation``."""
    punctuated = punctuation_model.restore_punctuation(text)
    return punctuated

def split_into_sentences(text):
    """Split punctuated text into trimmed, non-empty sentences.

    Sentences are cut at runs of ``.``, ``!`` or ``?``; the terminating
    punctuation itself is dropped. Splitting only on ``.`` would glue every
    question or exclamation onto the sentence that follows it.

    Args:
        text: Punctuated text; ``None`` or an empty string yields ``[]``.

    Returns:
        list[str]: Sentences in their original order, without surrounding
        whitespace.
    """
    if not text:
        return []
    fragments = re.split(r"[.!?]+", text)
    return [fragment.strip() for fragment in fragments if fragment.strip()]

def classify_sentence(sentence, labels):
    """Ask the seq2seq model which of ``labels`` best fits ``sentence``.

    Args:
        sentence: Sentence to classify.
        labels: Iterable of topic names offered to the model.

    Returns:
        str: The decoded model output with surrounding whitespace removed. The
        model generates free text, so the value is not guaranteed to be one of
        ``labels``.
    """
    topic_list = ", ".join(labels)
    prompt = (
        "Classify the following sentence into one of these topics based on their intent and meaning: "
        f"{topic_list}.\n"
        f"Sentence: {sentence}\n"
        "Answer:"
    )
    encoded = tokenizer(prompt, return_tensors="pt")
    generated = model.generate(**encoded, max_new_tokens=20)
    answer = tokenizer.decode(generated[0], skip_special_tokens=True)
    return answer.strip()

def detect_toxicity(sentence):
    """Score ``sentence`` with the toxicity classifier.

    The pipeline is called with ``top_k=None`` so that it returns a score for
    every label. With the default call the result for one string is a list
    holding a single ``{"label", "score"}`` dict; indexing ``[0]`` and then
    iterating walked over the dict's keys and raised ``TypeError``.

    Args:
        sentence: Text to score.

    Returns:
        tuple[str, float]: ``("toxic" | "non-toxic", score)`` where ``score`` is
        the value reported for the ``"toxic"`` label (``0.0`` if that label is
        missing) and the label is ``"toxic"`` when the score is at least 0.5.
    """
    raw = toxicity_classifier(sentence, top_k=None)

    # Depending on the transformers version the per-label scores arrive as a
    # flat list of dicts, a list nested inside a list, or a lone dict.
    while isinstance(raw, list) and raw and isinstance(raw[0], list):
        raw = raw[0]
    if isinstance(raw, dict):
        raw = [raw]

    toxic_score = next(
        (float(entry["score"]) for entry in raw if entry.get("label") == "toxic"),
        0.0,
    )
    verdict = "toxic" if toxic_score >= 0.5 else "non-toxic"
    return verdict, toxic_score


def generate_justification(sentence, topic, severity, toxicity, toxicity_score):
    """Generate a short free-text justification for a sentence's labels.

    Args:
        sentence: The sentence that was classified.
        topic: Topic label to mention in the prompt.
        severity: Severity label to mention in the prompt.
        toxicity: Toxicity label to mention in the prompt.
        toxicity_score: Toxicity score to mention in the prompt.

    Returns:
        str: The decoded model output with surrounding whitespace removed.
    """
    quoted_sentence = f'Sentence: "{sentence}"'
    instruction = (
        'Classify the sentence into a topic, severity level, and toxicity category. '
        'Then, generate a concise and neutral explanation justifying each label.'
    )
    label_summary = (
        f'→ Topic: {topic} | Severity: {severity} | Toxicity: {toxicity} (Score: {toxicity_score})'
    )
    # The four prompt sections are separated by one blank line each.
    prompt = "\n\n".join([quoted_sentence, instruction, label_summary, 'Justification:'])

    encoded = tokenizer(prompt, return_tensors="pt", truncation=True)
    generated = model.generate(**encoded, max_length=100)
    decoded = tokenizer.decode(generated[0], skip_special_tokens=True)
    return decoded.strip()

def redact_sentence(sentence, severity, toxicity_label):
    """Return ``"[REDACTED]"`` for flagged sentences, otherwise the sentence itself.

    A sentence counts as flagged when ``toxicity_label`` is ``"toxic"`` or when
    ``severity`` is ``"Warning"`` or ``"Critical"``.
    """
    if toxicity_label == "toxic":
        return "[REDACTED]"
    if severity in ("Warning", "Critical"):
        return "[REDACTED]"
    return sentence


def save_results_to_json(results, output_path):
    """Write ``results`` to ``output_path`` as JSON indented by two spaces.

    The file is opened in text mode with UTF-8 encoding and overwritten if it
    already exists.
    """
    handle = open(output_path, mode="w", encoding="utf-8")
    with handle:
        json.dump(obj=results, fp=handle, indent=2)

# === GUI App ===

class AudioAnalyzerApp:
    """Tkinter front-end for transcribing media and flagging sensitive sentences.

    Workflow: attach a media file or record from the microphone to obtain a
    transcript, run "Analyze Content" to classify every sentence against
    user-supplied topics, then optionally save the results as JSON.

    Threading: microphone capture and its transcription run on a background
    thread. That thread never touches Tk directly; it hands callables to
    ``_post_to_ui`` and the main thread executes them from ``_drain_ui_queue``.

    Attributes:
        root: The Tk root window passed to the constructor.
        transcript: Most recent transcript text ("" until one exists).
        results: List of per-sentence result dicts from the last analysis.
        recording: True while microphone capture is active.
        audio_data: Audio chunks collected during the current recording.
    """

    SAMPLE_RATE = 16000  # Hz; used for microphone capture and the WAV header
    UI_POLL_MS = 100     # how often the main thread drains the UI queue

    def __init__(self, root):
        """Configure the window, initialise state, build widgets, start UI polling."""
        self.root = root
        self.root.title("Audio/Video Content Analyzer (Whisper Edition)")
        self.root.geometry("850x600")

        self.transcript = ""
        self.results = []
        self.recording = False
        self.audio_data = []
        self._record_thread = None
        self._ui_queue = queue.Queue()

        self.create_widgets()
        self.root.after(self.UI_POLL_MS, self._drain_ui_queue)

    def create_widgets(self):
        """Create the scrolling output box and the row of action buttons."""
        self.output_box = scrolledtext.ScrolledText(self.root, wrap=tk.WORD, width=100, height=25)
        self.output_box.pack(padx=10, pady=10)

        btn_frame = tk.Frame(self.root)
        btn_frame.pack(pady=5)

        tk.Button(btn_frame, text="Attach Audio/Video File", command=self.choose_file_gui, width=30).grid(row=0, column=0, padx=10)
        tk.Button(btn_frame, text="🎙️ Record Live Audio", command=self.record_audio_live, width=30).grid(row=0, column=1, padx=10)
        tk.Button(btn_frame, text="Analyze Content", command=self.analyze_text, width=30).grid(row=0, column=2, padx=10)
        tk.Button(btn_frame, text="Save to JSON", command=self.save_to_json, width=30).grid(row=0, column=3, padx=10)
        tk.Button(btn_frame, text="⏹️ Stop Recording", command=self.stop_recording, width=30).grid(row=0, column=4, padx=10)

    # --- UI helpers (main thread only unless stated otherwise) ---

    def _log(self, text):
        """Append ``text`` to the output box."""
        self.output_box.insert(tk.END, text)

    def _post_to_ui(self, func, *args):
        """Queue ``func(*args)`` for execution on the main thread (thread-safe)."""
        self._ui_queue.put((func, args))

    def _drain_ui_queue(self):
        """Run every queued UI action, then schedule the next poll."""
        try:
            while True:
                try:
                    func, args = self._ui_queue.get_nowait()
                except queue.Empty:
                    break
                func(*args)
        finally:
            # Re-arm even if an action raised, otherwise worker output would
            # silently stop appearing.
            self.root.after(self.UI_POLL_MS, self._drain_ui_queue)

    def _show_transcript(self, transcript):
        """Store ``transcript`` and print it to the output box."""
        self.transcript = transcript
        self._log(f"📝 Transcript:\n{self.transcript}\n")

    @staticmethod
    def _remove_temp_file(path):
        """Delete a temporary file, ignoring the case where it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup: the file may never have been created.
            pass

    # --- Transcript sources ---

    def choose_file_gui(self):
        """Let the user pick a media file, convert it to WAV and transcribe it.

        Runs on the main thread, so the window does not respond while ffmpeg
        and Whisper are working. The intermediate WAV file is removed when the
        method finishes.
        """
        self.output_box.delete(1.0, tk.END)
        file_path = filedialog.askopenfilename(title="Select Audio or Video File",
                                               filetypes=[("Media files", "*.mp3 *.wav *.m4a *.mp4 *.mov *.mkv *.avi")])
        if not file_path:
            return

        temp_audio = os.path.join(tempfile.gettempdir(), f"converted_{uuid.uuid4().hex}.wav")

        try:
            self._log(f"Processing: {file_path}\n")
            self.root.update_idletasks()  # paint the message before blocking
            extract_audio_ffmpeg(file_path, temp_audio)
            self._log(f"✅ Audio extracted to: {temp_audio}\n")
            self.root.update_idletasks()

            self._show_transcript(transcribe_with_whisper(temp_audio))
        except Exception as e:
            messagebox.showerror("Error", str(e))
        finally:
            self._remove_temp_file(temp_audio)

    def record_audio_live(self):
        """Start capturing microphone audio on a background thread.

        Does nothing except inform the user when a previous recording is still
        being captured or transcribed.
        """
        worker = self._record_thread
        if self.recording or (worker is not None and worker.is_alive()):
            messagebox.showinfo("Recording", "A recording is already in progress.")
            return

        self.output_box.delete(1.0, tk.END)
        self._log("\n🎙️ Recording... Press 'Stop Recording' to finish.\n")
        self.root.update()

        self.recording = True
        self.audio_data = []

        # Daemon thread: closing the window must not leave the process hanging.
        self._record_thread = threading.Thread(target=self._record_worker, daemon=True)
        self._record_thread.start()

    def _record_worker(self):
        """Capture audio until ``self.recording`` is cleared, then transcribe it.

        Runs on the background thread; all UI work goes through ``_post_to_ui``.
        """
        fs = self.SAMPLE_RATE
        chunks = self.audio_data

        def callback(indata, frames, time_info, status):
            if self.recording:
                chunks.append(indata.copy())
            else:
                raise sd.CallbackStop()

        try:
            with sd.InputStream(samplerate=fs, channels=1, dtype='int16', callback=callback):
                while self.recording:
                    sd.sleep(100)
        except Exception as e:
            # Clear the flag so a new recording can be started afterwards.
            self.recording = False
            self._post_to_ui(messagebox.showerror, "Recording Error", str(e))

        if not chunks:
            return

        temp_audio = os.path.join(tempfile.gettempdir(), f"recording_{uuid.uuid4().hex}.wav")
        try:
            write(temp_audio, fs, np.concatenate(chunks, axis=0))
            self._post_to_ui(self._log, f"🎧 Audio saved: {temp_audio}\n")
            transcript = transcribe_with_whisper(temp_audio)
        except Exception as e:
            self._post_to_ui(messagebox.showerror, "Transcription Error", str(e))
            return
        finally:
            self._remove_temp_file(temp_audio)

        self._post_to_ui(self._show_transcript, transcript)

    def stop_recording(self):
        """Signal the recording thread to stop; a no-op when nothing is recording."""
        if not self.recording:
            return
        self.recording = False
        self._log("\n⏹️ Recording stopped. Processing...\n")
        self.root.update()

    # --- Analysis ---

    @staticmethod
    def _severity_for(tox_label, tox_score):
        """Map a toxicity label/score to "Critical" (>= 0.75), "Warning" or "Safe"."""
        is_toxic = tox_label == "toxic" and tox_score >= 0.5
        if is_toxic and tox_score >= 0.75:
            return "Critical"
        if is_toxic:
            return "Warning"
        return "Safe"

    @staticmethod
    def _match_topic(topic, labels):
        """Return the user label matching ``topic`` or "Uncertain".

        The comparison ignores case, surrounding whitespace and a trailing
        period, because the model's free-text answer does not always reproduce
        the label exactly.
        """
        def normalise(value):
            return value.strip().strip(".").strip().casefold()

        wanted = normalise(topic)
        for label in labels:
            if normalise(label) == wanted:
                return label
        return "Uncertain"

    def analyze_text(self):
        """Ask for topic labels and classify every sentence of the transcript."""
        if not self.transcript:
            messagebox.showinfo("No Transcript", "No transcript found. Please process a file first.")
            return

        # Schedule the dialog to run on the main thread after the current event completes
        self.root.after(0, self._prompt_labels_and_analyze)

    def _prompt_labels_and_analyze(self):
        """Prompt for labels, run the per-sentence pipeline and display the results."""
        label_input = askstring("Topic Labels", "Enter comma-separated sensitive topics (e.g., Mental Health, NDA, Harassment):")
        if not label_input:
            return
        labels = [l.strip() for l in label_input.split(",") if l.strip()]
        if not labels:
            # Input such as "," leaves nothing to classify against.
            messagebox.showinfo("No Labels", "Please enter at least one topic label.")
            return

        self._log("\n🧠 Analyzing content...\n")
        self.root.update_idletasks()  # paint the message before blocking

        try:
            punctuated = clean_and_punctuate_text(self.transcript)
            sentences = split_into_sentences(punctuated)

            results = []
            for sentence in sentences:
                topic = classify_sentence(sentence, labels)
                tox_label, tox_score = detect_toxicity(sentence)

                severity = self._severity_for(tox_label, tox_score)
                # Severity is anything but "Safe" exactly when the sentence is toxic.
                redaction_label = "toxic" if severity != "Safe" else "non-toxic"
                redacted = redact_sentence(sentence, severity, redaction_label)
                justification = generate_justification(sentence, topic, severity, tox_label, tox_score)

                results.append({
                    "sentence": sentence,
                    "matched_topic": self._match_topic(topic, labels),
                    "severity": severity,
                    "toxicity": tox_label,
                    "toxicity_score": round(tox_score, 3),
                    "redacted_sentence": redacted,
                    "justification": justification
                })
        except Exception as e:
            messagebox.showerror("Analysis Error", str(e))
            return

        self.results = results
        self.display_results()

    def display_results(self):
        """Print every entry of ``self.results`` to the output box."""
        self._log("\n✅ Classification Results:\n")
        for r in self.results:
            self._log(f"\n📌 Sentence: {r['sentence']}\n")
            self._log(f"   → Redacted: {r['redacted_sentence']}\n")
            self._log(f"   → Topic: {r['matched_topic']} | Severity: {r['severity']}\n")
            self._log(f"   → Toxicity: {r['toxicity']} (Score: {r['toxicity_score']})\n")
            self._log(f"   → Justification: {r['justification']}\n")

    def save_to_json(self):
        """Ask for a destination and write ``self.results`` to it as JSON."""
        if not self.results:
            messagebox.showinfo("Nothing to Save", "Run an analysis first.")
            return

        save_path = filedialog.asksaveasfilename(defaultextension=".json",
                                                 filetypes=[("JSON files", "*.json")])
        if not save_path:
            return
        try:
            save_results_to_json(self.results, save_path)
        except OSError as e:
            messagebox.showerror("Save Error", str(e))
            return
        messagebox.showinfo("Saved", f"Results saved to:\n{save_path}")


# === Launch App ===
# Runs only when this code is executed as the main module: create the Tk root
# window, attach the analyzer UI to it and enter the Tk event loop.
if __name__ == "__main__":
    root = tk.Tk()
    app = AudioAnalyzerApp(root)
    # mainloop() does not return until the window is closed.
    root.mainloop()


Device set to use cpu
Device set to use cpu


In [2]:
import os