# Qwen3-TTS Voice Clone Demo

This notebook demonstrates how to run Qwen3-TTS Voice Cloning.

In [None]:
# 1. Install Dependencies
# Install system dependencies first (fixes 'sox: not found' errors)
!sudo apt-get update && sudo apt-get install -y sox libsox-dev ffmpeg

!pip install -U qwen-tts
# flash-attn is recommended for performance
!pip install -U flash-attn --no-build-isolation
!pip install pyngrok
!pip install modelscope
!pip install boto3 requests beautifulsoup4 pysbd


In [None]:
# 2. Imports
import torch
import soundfile as sf
from IPython.display import Audio
from qwen_tts import Qwen3TTSModel
import os
import threading
import time

In [None]:
# 3. Load Model (Voice Clone Base 1.7B)
print("Loading Model...")
# Loader options are gathered in one place so the from_pretrained call stays short.
_model_load_kwargs = {
    "device_map": "cuda:0",
    "dtype": torch.bfloat16,
    "attn_implementation": "flash_attention_2",
}
model = Qwen3TTSModel.from_pretrained("Qwen/Qwen3-TTS-12Hz-1.7B-Base", **_model_load_kwargs)
print("Model loaded.")

In [None]:
# 5. Blog Scraper (Updated for Headers & Structure)
import requests
from bs4 import BeautifulSoup
import re
import pysbd

# Page to narrate. When this is an index-style URL (contains "index.html", ends
# with "/blog/", or is the bare ".../blog" address), the code below replaces it
# with the first dated post link found on that page.
TARGET_URL = "https://www.hung-truong.com/blog/" 
# Or set a specific post URL:
# TARGET_URL = "https://www.hung-truong.com/blog/2026/01/22/how-low-can-wegovy/"

def get_latest_post_url(index_url):
    """Return the absolute URL of the first dated blog-post link on an index page.

    The page at ``index_url`` is downloaded and its ``<a href>`` values are
    scanned in document order. The first href that contains a
    ``/blog/YYYY/MM/DD/<slug>`` path wins.

    Args:
        index_url: URL of the blog index page to scan.

    Returns:
        The post URL as an absolute string, or ``None`` when the page cannot be
        fetched or no link matches. Fetch/parse errors are printed, not raised.
    """
    # Imported locally so the function does not rely on another cell's imports.
    from urllib.parse import urljoin

    post_href = re.compile(r'.*/blog/\d{4}/\d{2}/\d{2}/.+')
    try:
        # A timeout keeps the notebook from hanging forever on a stalled server.
        response = requests.get(index_url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Find the first link that looks like a blog post
        for a in soup.find_all('a', href=True):
            href = a['href']
            if post_href.match(href):
                # Resolve against the index page itself: absolute hrefs pass
                # through unchanged, while root-relative, document-relative and
                # protocol-relative hrefs all become valid absolute URLs.
                return urljoin(index_url, href)
    except Exception as e:
        print(f"Error fetching index: {e}")
    return None

# Resolve the page to scrape: an index-style URL is exchanged for its newest post.
_points_at_index = any((
    "index.html" in TARGET_URL,
    TARGET_URL.endswith("/blog/"),
    TARGET_URL == "https://www.hung-truong.com/blog",
))
if _points_at_index:
    print("Searching for latest post...")
    latest_url = get_latest_post_url(TARGET_URL)
    if not latest_url:
        print("Could not find latest post link. Using original URL.")
    else:
        print(f"Found latest post: {latest_url}")
        TARGET_URL = latest_url

print(f"Scraping: {TARGET_URL}")

# Shared state filled in by the scraper and read again by the generation cell.
scraped_data = []
scraped_filename = None

# The base filename is the last path component of the URL, with any trailing
# slashes removed first. It is derived before scraping starts.
clean_url = TARGET_URL.rstrip('/')
scraped_filename = clean_url.rsplit('/', 1)[-1]
print(f"Target Base Filename: {scraped_filename}")

# Scrape Content
# Downloads TARGET_URL and appends narration segments to `scraped_data`. Each
# segment is a dict holding the spoken 'text' and a 'type' of 'header',
# 'sentence' or 'paragraph_end'. Any failure is printed and leaves
# `scraped_data` with whatever was collected up to that point.
try:
    # A timeout keeps the cell from hanging indefinitely on a stalled server.
    response = requests.get(TARGET_URL, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    content_div = soup.find('div', class_='content')

    if content_div:
        # Initialize PySBD Segmenter
        seg = pysbd.Segmenter(language="en", clean=False)

        # A. Remove code blocks and figure captions so they are never narrated.
        for unwanted in content_div.find_all(['figcaption', 'pre']):
            unwanted.decompose()

        # B. Extract Title
        title_tag = soup.find('h1', class_='postTitle')
        title_text = None
        if title_tag:
            title_text = title_tag.get_text().strip()
            print(f"Title: {title_text}")
            scraped_data.append({'text': title_text, 'type': 'header'})

        # C. Extract Date
        meta_tag = soup.find('p', class_='meta')
        if meta_tag:
            meta_text = meta_tag.get_text().strip()
            # For a "date | ..." meta line only the part before the first "|" is announced.
            date_text = meta_text.split('|')[0].strip() if "|" in meta_text else meta_text
            scraped_data.append({'text': f"Published on {date_text}.", 'type': 'paragraph_end'})

        # D. Extract Text and Headers in Order
        # We include li and blockquote to avoid missing content.
        tags_to_find = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'blockquote']
        tag_names = set(tags_to_find)
        all_elements = content_div.find_all(tags_to_find)

        # Keep only outermost matches: an element nested inside another matched
        # tag (e.g. a <p> inside a <blockquote>) would otherwise be read twice.
        # Walking the ancestors up to content_div and checking tag names gives
        # the same result as testing membership in all_elements, without
        # comparing every ancestor against every matched element.
        elements = []
        for el in all_elements:
            nested = False
            for parent in el.parents:
                if parent is content_div:
                    break
                if parent.name in tag_names:
                    nested = True
                    break
            if not nested:
                elements.append(el)

        for el in elements:
            if 'meta' in el.get('class', []):
                continue

            # Use separator=' ' and normalize whitespace to fix "broken" sentences caused by newlines or tags
            text = el.get_text(separator=' ', strip=True)
            text = re.sub(r'\s+', ' ', text)
            if not text:
                continue

            if el.name.startswith('h'):
                if "Leave a Comment" in text:
                    print("Reached comment section. Stopping.")
                    break
                # The title was already emitted in step B; do not repeat it.
                if title_text and text == title_text:
                    continue
                scraped_data.append({'text': text, 'type': 'header'})
            else:
                # Split into sentences, then merge fragments shorter than three
                # words into the following sentence so each clip has enough text.
                sentences = seg.segment(text)
                processed_sentences = []
                current_sentence = ""
                for sent in sentences:
                    sent = sent.strip()
                    if not sent:
                        continue
                    if current_sentence:
                        current_sentence += " " + sent
                    else:
                        current_sentence = sent
                    if len(current_sentence.split()) >= 3:
                        processed_sentences.append(current_sentence)
                        current_sentence = ""
                # A short leftover is attached to the previous sentence, or kept
                # on its own when it is the only text in the element.
                if current_sentence:
                    if processed_sentences:
                        processed_sentences[-1] += " " + current_sentence
                    else:
                        processed_sentences.append(current_sentence)

                # The final sentence of each element is tagged 'paragraph_end'.
                last_index = len(processed_sentences) - 1
                for i, sent in enumerate(processed_sentences):
                    segment_type = 'paragraph_end' if i == last_index else 'sentence'
                    scraped_data.append({'text': sent, 'type': segment_type})

        print(f"Scraped {len(scraped_data)} segments.")
    else:
        print("❌ Could not find <div class='content'>")

except Exception as e:
    print(f"❌ Scraping Error: {e}")


In [None]:
from kaggle_secrets import UserSecretsClient
# Client whose get_secret() calls below supply the S3 and GitHub credentials,
# so no key material is written into the notebook itself.
user_secrets = UserSecretsClient()
# 6. Batch Generation & S3 Upload (MP3 & VTT)
import requests
import boto3
import os
import numpy as np
import soundfile as sf
import subprocess
from IPython.display import Audio
from datetime import datetime

# --- CONFIGURATION ---
# Storage settings: every value is read from the secret that shares its name.
_read_secret = user_secrets.get_secret
S3_ACCESS_KEY = _read_secret("S3_ACCESS_KEY")
S3_BUCKET_NAME = _read_secret("S3_BUCKET_NAME")
S3_ENDPOINT_URL = _read_secret("S3_ENDPOINT_URL")
S3_SECRET_KEY = _read_secret("S3_SECRET_KEY")

# GitHub Dispatch Config
GITHUB_REPO = "hungtruong/jekyll-blog"
# The token needs the 'repo' scope.
GITHUB_TOKEN = _read_secret("GITHUB_TOKEN")
# Prefix of the public download URLs sent in the dispatch payload.
# Consider moving this into a secret if the storage is private.
PUBLIC_URL_BASE = "https://pub-2289fc0aae4245debaa2fd741bdf5605.r2.dev/blogaudio/"

# Input Config
# Segments come from the scraper cell when it has run and produced data;
# otherwise a plain-text file is downloaded and each non-empty line becomes
# one 'sentence' segment.
lines_to_process = []
# At notebook top level locals() is the global namespace, so this detects
# whether the scraper cell has defined (and filled) `scraped_data`.
if 'scraped_data' in locals() and scraped_data:
    print(f"Using {len(scraped_data)} items from scraper.")
    lines_to_process = scraped_data
else:
    # Fallback for manual text URL
    TEXT_FILE_URL = "https://gist.githubusercontent.com/hungtruong/5a6f3a7d835784a2f8e6bf9120272f8e/raw/5f567eddd7b09089186aa07c393544b96d65031c/blog.txt"
    print(f"Scraped data not found. Fetching from URL: {TEXT_FILE_URL}")
    try:
        # A timeout keeps the cell from hanging indefinitely on a stalled server.
        response = requests.get(TEXT_FILE_URL, timeout=30)
        response.raise_for_status()
        text_content = response.text
        # Convert plain text to simple sentence structure
        raw_lines = [line.strip() for line in text_content.split('\n') if line.strip()]
        lines_to_process = [{'text': raw_line, 'type': 'sentence'} for raw_line in raw_lines]
    except Exception as e:
        # Best effort: report the problem and continue with nothing to process.
        print(f"Error fetching text file: {e}")
        lines_to_process = []

# Output Filenames
# Use the slug derived by the scraper when available; otherwise a timestamped name.
if 'scraped_filename' in locals() and scraped_filename:
    BASE_FILENAME = scraped_filename
    print(f"Using base filename: {BASE_FILENAME}")
else:
    BASE_FILENAME = f"generated_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    print(f"Using default filename: {BASE_FILENAME}")

OUTPUT_FILENAME_MP3 = f"{BASE_FILENAME}.mp3"
OUTPUT_FILENAME_VTT = f"{BASE_FILENAME}.vtt"

# Reference Audio
# Voice sample passed to model.generate_voice_clone() as `ref_audio`.
local_ref_audio = "/kaggle/input/voice-cloning-dataset/longblog2.wav"
# Text passed alongside the sample as `ref_text`.
# NOTE(review): presumably a verbatim transcript of the recording above — confirm against the audio.
ref_text_content = "I was at Whole Foods today getting some groceries when I came across this mini food testing area at the end of an aisle. There were two nice sales people (one lady and one dude) who were hawking cereal. The type of cereal was super organic and it came in a pouch. The lady bragged that all of the ingredients were on the front of the bag in large type. The cereal was available for testing in cereal form, baked into a cookie, and blended into a smoothie (which was apparently made with apple cider and yogurt or something). Sidenote: While I was deciding what to taste test (I eventually went with the smoothie and it was not bad, and followed up with a chunk of cookie), an old Asian lady walked up to me and started talking in Chinese. I tried to tell her that I don’t really speak Chinese, but I forgot how to say “I don’t know Chinese” in Chinese. It’s kind of absurd, anyway, to say you don’t speak a language in that very language you’re saying you don't speak. Anyway, she mumbled some more stuff and then said “Chinese.” Like, yeah, lady, we're both Chinese. I guess she walked away after that. So anyway, here's the real part of the story. I'm tasting the cookie and am about to leave when another woman walks up to the food tasting area. The sales guy asks if she wants to buy some cereal and she's like “oh, I already have some at home! I love it! I'm just going to have some samples.”"

# Helper for VTT Time
def format_vtt_time(seconds):
    """Format a time offset in seconds as a WebVTT timestamp (``HH:MM:SS.mmm``).

    The value is rounded to whole milliseconds before it is split into fields,
    so rounding carries into the minutes and hours. For example 59.9996 becomes
    ``00:01:00.000`` instead of the invalid ``00:00:60.000``.

    Args:
        seconds: Offset from the start of the audio, in seconds. Negative
            values are clamped to zero.

    Returns:
        The timestamp string with two-digit (or wider) hours, two-digit minutes
        and seconds, and three-digit milliseconds.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"

def get_audio_duration(filename):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        filename: Path of the audio file to inspect.

    Returns:
        The duration as a float. 0.0 is returned (and a message printed) when
        ffprobe cannot be started, exits with a non-zero status, or prints
        something that is not a number.
    """
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", filename],
            stdout=subprocess.PIPE,
            # stderr is captured separately so a diagnostic line can never be
            # mixed into the number that is parsed from stdout.
            stderr=subprocess.PIPE,
            text=True
        )
        if result.returncode != 0:
            details = (result.stderr or "").strip()
            print(f"Error checking duration: ffprobe exited with status {result.returncode}: {details}")
            return 0.0
        return float(result.stdout.strip())
    except Exception as e:
        print(f"Error checking duration: {e}")
        return 0.0

# --- EXECUTION ---
# Pipeline: synthesize every segment with the cloned voice, join the clips with
# type-dependent pauses, write WAV -> MP3 plus a matching WebVTT file, upload
# both to the S3 bucket, and finally notify the blog repository through a
# GitHub repository_dispatch event. Any failure is printed and stops the run.
if not lines_to_process:
    print("❌ No lines to process. Aborting.")
elif S3_ACCESS_KEY == "YOUR_ACCESS_KEY":
    print("⚠️ PLEASE SET YOUR S3 CONFIGURATIONS IN SECRETS ⚠️")
else:
    print(f"Processing {len(lines_to_process)} items...")
    try:
        all_wavs = []
        vtt_lines = ["WEBVTT\n"]
        total_samples = 0 # Track samples for precision
        sr = None  # Sample rate; only known once the first clip has been generated

        for i, item in enumerate(lines_to_process):
            text = item.get('text', '').strip()
            item_type = item.get('type', 'sentence')

            # Skip empty or single-character segments.
            if not text or len(text) < 2:
                continue

            if i % 5 == 0:
                # Until the first clip exists the sample rate is unknown, so
                # 24 kHz is used as a display-only placeholder (the sample count
                # is still zero at that point anyway).
                print(f"Progress: {i}/{len(lines_to_process)} - Current Logical Duration: {total_samples / (sr or 24000):.2f}s")

            # Generate Audio
            wavs, sr = model.generate_voice_clone(
                text=text,
                language="English",
                ref_audio=local_ref_audio,
                ref_text=ref_text_content,
            )
            audio_chunk = wavs[0]

            # VTT Timestamp: the cue spans the spoken clip only, not the pause after it.
            start_time_str = format_vtt_time(total_samples / sr)
            total_samples += len(audio_chunk)
            end_time_str = format_vtt_time(total_samples / sr)

            vtt_lines.append(f"{start_time_str} --> {end_time_str}")
            vtt_lines.append(f"{text}\n")

            all_wavs.append(audio_chunk)

            # Silence Logic: longer pauses after headers and at paragraph ends.
            if item_type == 'header':
                silence_dur = 1.5
            elif item_type == 'paragraph_end':
                silence_dur = 1.0
            else: # sentence
                silence_dur = 0.5

            silence_samples = int(silence_dur * sr)
            all_wavs.append(np.zeros(silence_samples, dtype=np.float32))
            total_samples += silence_samples

        if all_wavs:
            # 1. Save Temp WAV
            temp_wav = "temp_output.wav"
            final_wav = np.concatenate(all_wavs)

            final_duration_wav = len(final_wav) / sr
            sf.write(temp_wav, final_wav, sr)
            print(f"WAV saved. Samples: {len(final_wav)}, Duration: {final_duration_wav:.3f}s")

            # 2. Convert to MP3
            print("Converting to MP3...")
            # Use -map_metadata -1 to avoid tag-related shifts.
            # ffmpeg is run with an argument list instead of a shell line, so the
            # URL-derived output name is never interpreted by a shell, and a
            # failed conversion aborts the run instead of letting a stale MP3
            # from an earlier run be uploaded.
            ffmpeg_cmd = [
                "ffmpeg", "-y", "-i", temp_wav,
                "-codec:a", "libmp3lame", "-qscale:a", "2",
                "-map_metadata", "-1",
                OUTPUT_FILENAME_MP3,
            ]
            ffmpeg_result = subprocess.run(
                ffmpeg_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            if ffmpeg_result.returncode != 0:
                print(ffmpeg_result.stdout)
                raise RuntimeError(f"ffmpeg exited with status {ffmpeg_result.returncode}")

            # 3. Validation: compare the encoded length with the WAV length,
            # because the VTT cue times were computed from WAV sample counts.
            final_duration_mp3 = get_audio_duration(OUTPUT_FILENAME_MP3)
            drift = final_duration_mp3 - final_duration_wav
            print(f"Encoded MP3 Duration: {final_duration_mp3:.3f}s")
            print(f"Total Drift (MP3 vs WAV): {drift:.4f}s")

            if abs(drift) > 0.1:
                print(f"⚠️ Warning: Significant drift detected ({drift:.4f}s). This may be due to MP3 encoder padding.")

            # 4. Save VTT
            print("Saving VTT...")
            with open(OUTPUT_FILENAME_VTT, "w", encoding="utf-8") as f:
                f.write("\n".join(vtt_lines))

            # 5. Upload Both
            print(f"Uploading to S3 bucket: {S3_BUCKET_NAME}...")
            s3 = boto3.client(
                's3',
                endpoint_url=S3_ENDPOINT_URL,
                aws_access_key_id=S3_ACCESS_KEY,
                aws_secret_access_key=S3_SECRET_KEY
            )

            s3_key_mp3 = os.path.basename(OUTPUT_FILENAME_MP3)
            s3_key_vtt = os.path.basename(OUTPUT_FILENAME_VTT)

            with open(OUTPUT_FILENAME_MP3, "rb") as f:
                s3.upload_fileobj(f, S3_BUCKET_NAME, s3_key_mp3)
            print(f"✅ Uploaded: {s3_key_mp3}")

            with open(OUTPUT_FILENAME_VTT, "rb") as f:
                s3.upload_fileobj(f, S3_BUCKET_NAME, s3_key_vtt)
            print(f"✅ Uploaded: {s3_key_vtt}")

            # 6. GitHub Dispatch
            if GITHUB_TOKEN and GITHUB_TOKEN != "YOUR_GITHUB_PERSONAL_ACCESS_TOKEN":
                print(f"Triggering GitHub Workflow for {GITHUB_REPO}...")
                dispatch_url = f"https://api.github.com/repos/{GITHUB_REPO}/dispatches"
                headers = {
                    "Accept": "application/vnd.github.v3+json",
                    "Authorization": f"token {GITHUB_TOKEN}"
                }
                payload = {
                    "event_type": "audio-ready",
                    "client_payload": {
                        "slug": BASE_FILENAME,
                        "mp3_url": f"{PUBLIC_URL_BASE}{s3_key_mp3}",
                        "vtt_url": f"{PUBLIC_URL_BASE}{s3_key_vtt}"
                    }
                }
                # A timeout keeps the cell from hanging if the API does not answer.
                dispatch_response = requests.post(dispatch_url, headers=headers, json=payload, timeout=30)
                print(f"GitHub Dispatch Status Code: {dispatch_response.status_code}")
                # This code treats HTTP 204 as the success response.
                if dispatch_response.status_code == 204:
                    print("✅ GitHub Workflow triggered successfully!")
                else:
                    print(f"❌ GitHub Dispatch Failed: {dispatch_response.text}")
            else:
                print("⏭️ Skipping GitHub Dispatch: GITHUB_TOKEN not set or placeholder.")

    except Exception as e:
        print(f"❌ Error: {e}")
