<a href="https://colab.research.google.com/github/galencky/whisper-stt-project/blob/main/Whisper_STT_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
# Imports used by this setup cell
import os
from google.colab import drive

# 1) Mount Google Drive under /content/drive
drive.mount('/content/drive')

# 2) Make sure the "Whisper-STT-project" folder exists inside MyDrive
PROJECT_SUBFOLDER = "/content/drive/MyDrive/Whisper-STT-project"
os.makedirs(PROJECT_SUBFOLDER, exist_ok=True)
print(f"✅ Project folder ready at: {PROJECT_SUBFOLDER}")

# 3) Install dependencies. `%pip` (instead of `!pip`) installs into the
#    environment of the running kernel.
%pip install --upgrade openai-whisper tqdm google-generativeai requests


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Project folder ready at: /content/drive/MyDrive/Whisper-STT-project


In [4]:
import os
from pathlib import Path

# ─── Root of the project on Drive; change it if your folder is named differently ───
PROJECT_SUBFOLDER = "/content/drive/MyDrive/Whisper-STT-project"

# Every working directory hangs off BASE_DIR
BASE_DIR        = Path(PROJECT_SUBFOLDER)
INBOX_DIR       = BASE_DIR / "inbox"
PROCESSED_DIR   = BASE_DIR / "processed"
TRANSCRIPTS_DIR = BASE_DIR / "transcripts"
PARSED_DIR      = BASE_DIR / "parsed"
MARKDOWN_DIR    = BASE_DIR / "markdown"
UPLOADED_DIR    = BASE_DIR / "uploaded"

# Label → path table; it drives both directory creation and the report below
_PROJECT_DIRS = {
    "BASE_DIR": BASE_DIR,
    "INBOX_DIR": INBOX_DIR,
    "PROCESSED_DIR": PROCESSED_DIR,
    "TRANSCRIPTS_DIR": TRANSCRIPTS_DIR,
    "PARSED_DIR": PARSED_DIR,
    "MARKDOWN_DIR": MARKDOWN_DIR,
    "UPLOADED_DIR": UPLOADED_DIR,
}

# Create whatever is missing (existing folders are left alone)
for folder in _PROJECT_DIRS.values():
    folder.mkdir(parents=True, exist_ok=True)

print("✅ Folder structure under Drive is ready:")
for label, folder in _PROJECT_DIRS.items():
    # Labels are padded to 15 characters so the "=" signs line up
    print(f"  {label.ljust(15)}= {folder}")


✅ Folder structure under Drive is ready:
  BASE_DIR       = /content/drive/MyDrive/Whisper-STT-project
  INBOX_DIR      = /content/drive/MyDrive/Whisper-STT-project/inbox
  PROCESSED_DIR  = /content/drive/MyDrive/Whisper-STT-project/processed
  TRANSCRIPTS_DIR= /content/drive/MyDrive/Whisper-STT-project/transcripts
  PARSED_DIR     = /content/drive/MyDrive/Whisper-STT-project/parsed
  MARKDOWN_DIR   = /content/drive/MyDrive/Whisper-STT-project/markdown
  UPLOADED_DIR   = /content/drive/MyDrive/Whisper-STT-project/uploaded


In [1]:
import datetime, shutil, sys, time
from pathlib import Path
from tqdm import tqdm
import whisper

# Lower-case file suffixes (leading dot included) that are treated as audio
AUDIO_EXT = {
    '.flac',
    '.m4a',
    '.mp3',
    '.ogg',
    '.wav',
    '.webm',
}

def _now():
    """Log-friendly timestamp."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def _fmt_ts(seconds: float) -> str:
    """float seconds → HH:MM:SS.mmm string."""
    h, m = divmod(int(seconds), 3600)
    m, s = divmod(m, 60)
    ms = int((seconds - int(seconds)) * 1000)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"

def save_transcript(result: dict, out_path: Path):
    """
    Dump the segments of a Whisper result into a UTF-8 text file.

    One line is written per entry of ``result["segments"]``:
    ``[start → end] text``, with both times rendered by ``_fmt_ts`` and the
    segment text stripped of surrounding whitespace.
    """
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.writelines(
            "[{} → {}] {}\n".format(
                _fmt_ts(segment['start']),
                _fmt_ts(segment['end']),
                segment['text'].strip(),
            )
            for segment in result["segments"]
        )

# ─── Load Whisper Model (no persistent cache) ──────────────────────────
# `start` is a wall-clock reference used only to report how long loading took.
start = time.time()
print(f"[{_now()}] Loading Whisper large-v3 …")
# No download_root is passed, so openai-whisper falls back to its own default
# cache directory (documented as ~/.cache/whisper — not a Hugging Face cache and
# not the Drive project folder).
model = whisper.load_model("large-v3")
print(f"[{_now()}] Model ready (took {time.time() - start:.1f} s)\n")

[2025-05-31 03:12:05] Loading Whisper large-v3 …


100%|█████████████████████████████████████| 2.88G/2.88G [00:39<00:00, 77.9MiB/s]


[2025-05-31 03:13:24] Model ready (took 79.5 s)



In [5]:
# ─── Set a preferred language if you want (or set to None for autodetect) ───
PREFERRED_LANGUAGE = "zh"  # or None to let Whisper auto-detect

if PREFERRED_LANGUAGE:
    print(f"🌐 Preferred language set to '{PREFERRED_LANGUAGE}'")
else:
    print("🌐 Using Whisper's automatic language detection")

# ─── Gather all audio files from INBOX_DIR ─────────────────────────────
# sorted(): iterdir() yields entries in arbitrary order, so sort for a stable
# processing order. is_file(): skip directories whose name merely ends in an
# audio suffix.
audio_files = sorted(
    p for p in INBOX_DIR.iterdir()
    if p.is_file() and p.suffix.lower() in AUDIO_EXT
)
total = len(audio_files)

# Whisper options are identical for every file, so build them once.
kwargs = dict(word_timestamps=True, verbose=True)
if PREFERRED_LANGUAGE:
    kwargs["language"] = PREFERRED_LANGUAGE

if total == 0:
    print("📂 No audio in inbox. Drop files there and re-run this cell.")
else:
    failed = []  # names of files that could not be processed in this run
    for idx, audio in enumerate(audio_files, 1):
        print(f"\n[{_now()}] ▶️  ({idx}/{total})  {audio.name}")

        if PREFERRED_LANGUAGE:
            print(f"[{_now()}] 🔧 Forcing language = '{PREFERRED_LANGUAGE}'")
        else:
            print(f"[{_now()}] 🔍 Auto language detection")

        try:
            # Transcribe – this prints segments as it goes
            result = model.transcribe(str(audio), **kwargs)

            # Save transcript to TRANSCRIPTS_DIR
            out_txt = TRANSCRIPTS_DIR / f"{audio.stem}.txt"
            save_transcript(result, out_txt)
            print(f"[{_now()}] 📝 Saved transcript → {out_txt.name}")

            # Move original audio into PROCESSED_DIR
            shutil.move(str(audio), PROCESSED_DIR / audio.name)
            print(f"[{_now()}] ✔️ Moved audio to processed/")
        except Exception as e:
            # One bad file must not abort the batch; it stays in the inbox so a
            # later run can retry it.
            failed.append(audio.name)
            print(f"[{_now()}] ❌ Failed on {audio.name}: {e}")

    if failed:
        print(f"\n⚠️ {len(failed)} of {total} file(s) failed: {', '.join(failed)}")
    print("\n🎉 All transcription jobs finished!")

🌐 Preferred language set to 'zh'

[2025-05-31 03:13:59] ▶️  (1/2)  錄製 (5).m4a
[2025-05-31 03:13:59] 🔧 Forcing language = 'zh'




[00:00.000 --> 00:02.020] 錄音測試
[00:02.020 --> 00:03.340] 錄音測試
[00:04.000 --> 00:05.660] 123 123
[00:06.360 --> 00:07.840] 錄音測試
[2025-05-31 03:16:05] 📝 Saved transcript → 錄製 (5).txt
[2025-05-31 03:16:05] ✔️ Moved audio to processed/

[2025-05-31 03:16:05] ▶️  (2/2)  錄製 (6).m4a
[2025-05-31 03:16:05] 🔧 Forcing language = 'zh'




[00:00.000 --> 00:07.000] Audio record test, audio record test, test, 1, 2, 3, test, 1, 2, 3
[2025-05-31 03:17:57] 📝 Saved transcript → 錄製 (6).txt
[2025-05-31 03:17:57] ✔️ Moved audio to processed/

🎉 All transcription jobs finished!


In [6]:
import re
from datetime import timedelta

# Matches one transcript line of the form: [HH:MM:SS.mmm → HH:MM:SS.mmm] text
# Capture groups: 1 = start timestamp, 2 = end timestamp, 3 = rest of the line.
# Both timestamps must have exactly two digits per field and three for milliseconds.
timestamp_pattern = re.compile(
    r"\[(\d{2}:\d{2}:\d{2}\.\d{3})\s*→\s*(\d{2}:\d{2}:\d{2}\.\d{3})\]\s*(.*)"
)

def parse_time(s: str) -> timedelta:
    """Turn an 'HH:MM:SS.mmm' string into the equivalent timedelta."""
    hours, minutes, tail = s.split(":")
    whole_seconds, millis = tail.split(".")
    field_names = ("hours", "minutes", "seconds", "milliseconds")
    field_values = [int(part) for part in (hours, minutes, whole_seconds, millis)]
    return timedelta(**dict(zip(field_names, field_values)))

def process_transcript(text: str, chunk_seconds: int = 300) -> str:
    """
    Merge timestamped transcript lines into fixed-length time windows.

    Lines that match ``timestamp_pattern`` are assigned to the window
    ``start // chunk_seconds``; all other lines are ignored. Each time the
    window index changes, a new block is started. Output format per block:

      [HH:MM:SS.mmm]      (start timestamp of the first segment in the block)
      the block's segment texts joined by single spaces

    Blocks are separated by one blank line.

    Args:
        text: Transcript text, one ``[start → end] text`` segment per line.
        chunk_seconds: Window length in seconds (default 300, i.e. 5 minutes).

    Returns:
        The grouped transcript, or "" when no line carries a timestamp.

    Raises:
        ValueError: If ``chunk_seconds`` is not positive.
    """
    if chunk_seconds <= 0:
        raise ValueError("chunk_seconds must be positive")

    blocks = []           # list of (marker line, [segment texts])
    current_chunk = None  # window index of the block being filled

    for line in text.splitlines():
        match = timestamp_pattern.match(line)
        if match is None:
            continue
        start_ts, _end_ts, content = match.groups()
        chunk_idx = int(parse_time(start_ts).total_seconds() // chunk_seconds)
        if chunk_idx != current_chunk:
            # First segment of a new window: its start time labels the block
            blocks.append((f"[{start_ts}]", []))
            current_chunk = chunk_idx
        blocks[-1][1].append(content.strip())

    rendered = []
    for marker, parts in blocks:
        paragraph = " ".join(parts).strip()
        rendered.append(marker + "\n" + paragraph)

    # strip() drops the dangling newline left when the last paragraph is empty
    return "\n\n".join(rendered).strip()

# ─── Batch processing: read every .txt in TRANSCRIPTS_DIR ─────────────
# sorted() gives a stable processing order regardless of how glob() yields files.
for txtfile in sorted(TRANSCRIPTS_DIR.glob("*.txt")):
    text = txtfile.read_text(encoding="utf-8")

    processed = process_transcript(text)
    # Build the output name from the stem: str.replace(".txt", ...) would rewrite
    # every ".txt" occurrence in the file name, not only the extension.
    out_path = PARSED_DIR / f"{txtfile.stem}_parsed.txt"

    out_path.write_text(processed, encoding="utf-8")
    print(f"🔧 Processed {txtfile.name} → {out_path.name}")

print("\n✅ All transcripts parsed.")


🔧 Processed 錄製 (5).txt → 錄製 (5)_parsed.txt
🔧 Processed 錄製 (6).txt → 錄製 (6)_parsed.txt

✅ All transcripts parsed.


In [7]:
# ─── Instruction text placed in front of each transcript sent to Gemini ───
# The prompt is a markdown template describing the required summary layout and
# ends by asking for the summary in the speech's own language.
# Note: this is a normal (non-raw) string, so each "\\[" below yields a literal "\[".
SYSTEM_PROMPT = """
## System Prompt

You are tasked with summarizing a speech provided in its original language. Create a concise, structured summary using clear and informative markdown formatting. Follow the outline and format precisely. You may use markdown tables, bullet points, paragraphs, or a combination as appropriate.

## Summary Structure

### Title

Provide a concise, relevant title reflecting the key theme or message of the speech.
Please make sure the title starts with a # to be recognized as title.

# Title

### Speaker

* **Name**: \\[Speaker's Name]
* **Affiliation/Role**: \\[Speaker’s Affiliation or Role, if known]
* **Event**: \\[Event or occasion where the speech was given, if applicable]
* **Date**: \\[Date of the speech, if available]

### Overview

Provide a short paragraph summarizing the overall purpose and main points of the speech.

### Key Points

Summarize each major point clearly. You may use markdown tables, bullet points, or paragraphs as needed:

* **Key Point 1**: Brief description with supporting details.
* **Key Point 2**: Brief description with supporting details.
* Additional points as necessary.

Or alternatively, use a markdown table.

### Notable Quotes

Include one or two significant quotes from the speech, if available, highlighting central themes or key statements made by the speaker:

* *"Quote 1..."*
* *"Quote 2..."*

### Audience Reaction

Briefly describe audience reactions, if mentioned or apparent (e.g., applause, questions raised, notable silence).

### Conclusion

Summarize briefly how the speaker concluded their speech and highlight any key takeaway messages.

---

Ensure clarity, accuracy, and conciseness in the summary, preserving essential context and meaning.
Please summarize using the native language of the speech.
"""

# ─── Retrieve Gemini API key from Colab secrets ─────────────────────────
from google.colab import userdata
import google.generativeai as genai

# NOTE(review): userdata.get() appears to raise (SecretNotFoundError /
# NotebookAccessError) for a missing or inaccessible secret instead of
# returning None — confirm. The truthiness check below also rejects an empty
# value and matches the check used for the email secrets.
api_key = userdata.get('GEMINI_API_KEY')
if not api_key:
    raise ValueError("GEMINI_API_KEY not found in Colab secrets! Add it via Settings → Secrets.")

genai.configure(api_key=api_key)


In [8]:
from pathlib import Path
from typing import Optional

def generate_summary_with_gemini(
    speech_text: str,
    system_prompt: str,
    model_name: str = "gemini-2.5-flash-preview-05-20",
    temperature: float = 0.5,
) -> Optional[str]:
    """
    Ask a Gemini model to summarize a transcript.

    The request is a single prompt made of the stripped ``system_prompt``, a
    blank line, and the stripped ``speech_text``.

    Args:
        speech_text: Transcript to summarize.
        system_prompt: Instructions placed in front of the transcript.
        model_name: Gemini model identifier passed to ``genai.GenerativeModel``.
        temperature: Sampling temperature passed in the generation config.

    Returns:
        The response text (Markdown), or None if the call or reading
        ``response.text`` raised; the error is printed in that case.
    """
    model = genai.GenerativeModel(model_name)
    full_prompt = system_prompt.strip() + "\n\n" + speech_text.strip()
    try:
        response = model.generate_content(
            full_prompt,
            generation_config=genai.types.GenerationConfig(temperature=temperature),
            stream=False,
        )
        return response.text
    except Exception as e:
        # Deliberately broad: a failed summary is reported as None so the batch
        # caller can skip this file and continue with the next one.
        print(f"[ERROR] Gemini API error: {e}")
        return None

def process_all_txt_files(parsed_dir: Path, markdown_dir: Path, system_prompt: str):
    """
    Summarize every ``*.txt`` file of ``parsed_dir`` into ``markdown_dir``.

    Each non-empty file is passed to ``generate_summary_with_gemini``; the
    returned text is written to ``markdown_dir`` under the same stem with an
    ``.md`` suffix. Unreadable files, empty files and failed summaries are
    reported and skipped. ``markdown_dir`` is created when missing.
    """
    source_dir = Path(parsed_dir)
    target_dir = Path(markdown_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    candidates = list(source_dir.glob("*.txt"))
    print(f"[DEBUG] Found {len(candidates)} .txt files in {source_dir}")

    for source in candidates:
        print(f"\n[DEBUG] Processing: {source.name}")

        # Step 1: load the transcript
        try:
            with source.open("r", encoding="utf-8") as reader:
                speech_text = reader.read().strip()
        except Exception as err:
            print(f"[ERROR] Could not read {source}: {err}")
            continue

        if speech_text == "":
            print(f"[WARNING] {source.name} is empty, skipping.")
            continue

        # Step 2: request the summary (None signals a failed request)
        summary = generate_summary_with_gemini(speech_text, system_prompt)
        if summary is None:
            print(f"[ERROR] Gemini API failed for {source.name}, skipping.")
            continue

        # Step 3: store the summary next to its siblings
        destination = target_dir / f"{source.stem}.md"
        try:
            with destination.open("w", encoding="utf-8") as writer:
                writer.write(summary)
            print(f"[INFO] Saved summary → {destination.name}")
        except Exception as err:
            print(f"[ERROR] Could not save {destination}: {err}")

# ─── Summarize every parsed transcript into MARKDOWN_DIR ────────────────
process_all_txt_files(
    parsed_dir=PARSED_DIR,
    markdown_dir=MARKDOWN_DIR,
    system_prompt=SYSTEM_PROMPT,
)
print("\n✅ All summaries generated.")


[DEBUG] Found 2 .txt files in /content/drive/MyDrive/Whisper-STT-project/parsed

[DEBUG] Processing: 錄製 (5)_parsed.txt
[INFO] Saved summary → 錄製 (5)_parsed.md

[DEBUG] Processing: 錄製 (6)_parsed.txt
[INFO] Saved summary → 錄製 (6)_parsed.md

✅ All summaries generated.


In [9]:
import requests
import shutil

# ─── Retrieve HackMD token from Colab secrets ───────────────────────────
# Truthiness check (not `is None`) so an empty secret is rejected too, matching
# the check used for the email secrets.
hackmd_token = userdata.get('HACKMD_TOKEN')
if not hackmd_token:
    raise ValueError("HACKMD_TOKEN not found in Colab secrets! Add it via Settings → Secrets.")

def upload_to_hackmd(md_content: str, filename: str, api_token: str,
                     timeout: float = 30.0) -> Optional[dict]:
    """
    Create a HackMD note from a markdown string.

    The note title comes from ``filename``: a trailing ".md" and every
    "_parsed" are removed and underscores become spaces. If the content already
    starts with a level-1 heading, that line is replaced by the title heading;
    otherwise the heading is prepended. The hashtag "#whisper-stt-project" is
    appended unless one of the last three lines already equals it.

    Args:
        md_content: Markdown body of the note.
        filename: File name the title is derived from.
        api_token: HackMD API token sent as a Bearer token.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        {"title": ..., "url": ...} on success. None if the request raised, the
        response status was not OK, or the response carried no note id; the
        reason is printed in each case.
    """
    # Derive a clean title from the filename
    if filename.endswith('.md'):
        filename = filename[:-3]
    title = filename.replace('_parsed', '').replace('_', ' ').strip()

    # Ensure there's a top-level heading carrying the title
    md_lines = md_content.lstrip().splitlines()
    if md_lines and md_lines[0].strip().startswith("# "):
        md_lines[0] = f"# {title}"
        md_content = "\n".join(md_lines)
    else:
        md_content = f"# {title}\n\n" + md_content.lstrip()

    # Append hashtag if missing
    hashtag = "#whisper-stt-project"
    tail_lines = md_content.rstrip().splitlines()[-3:]
    if not any(line.strip() == hashtag for line in tail_lines):
        md_content = md_content.rstrip() + "\n\n" + hashtag + "\n"

    url = "https://api.hackmd.io/v1/notes"
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    data = {
        "title": title,
        "content": md_content,
        "readPermission": "guest",
        "writePermission": "signed_in"
    }

    # Without a timeout a stalled connection would block the whole batch; network
    # errors are reported the same way as HTTP errors (return None).
    try:
        response = requests.post(url, headers=headers, json=data, timeout=timeout)
    except requests.RequestException as e:
        print(f"[ERROR] HackMD request failed for {filename}: {e}")
        return None

    if not response.ok:
        print(f"[ERROR] HackMD upload failed for {filename}: {response.status_code} {response.text}")
        return None

    try:
        note_id = response.json().get("id")
    except ValueError:
        # Body was not valid JSON
        note_id = None
    if not note_id:
        # Guard against building a ".../None" link from an unexpected response
        print(f"[ERROR] HackMD response for {filename} contained no note id.")
        return None

    shared_url = f"https://hackmd.io/{note_id}"
    print(f"[INFO] Uploaded to HackMD: {shared_url}")
    return {"title": title, "url": shared_url}

def batch_upload_markdown_and_move(markdown_dir: Path, uploaded_dir: Path, hackmd_token: str) -> list:
    """
    Upload every ``*.md`` file of ``markdown_dir`` and archive the successes.

    Each file is read and handed to ``upload_to_hackmd``. When that returns a
    truthy result, the result is collected and the file is moved into
    ``uploaded_dir`` (created when missing). Files that cannot be read or
    uploaded stay where they are.

    Returns:
        The collected upload results ({"title":..., "url":...} per note).
    """
    source_dir = Path(markdown_dir)
    archive_dir = Path(uploaded_dir)
    archive_dir.mkdir(parents=True, exist_ok=True)

    pending = list(source_dir.glob("*.md"))
    print(f"[DEBUG] Found {len(pending)} markdown files to upload.")

    collected = []
    for note_path in pending:
        print(f"[DEBUG] Processing: {note_path.name}")

        try:
            with note_path.open("r", encoding="utf-8") as reader:
                note_text = reader.read()
        except Exception as err:
            print(f"[ERROR] Could not read {note_path.name}: {err}")
            continue

        outcome = upload_to_hackmd(note_text, note_path.name, hackmd_token)
        if not outcome:
            # Upload failed: leave the file in place
            continue

        collected.append(outcome)
        archived_path = archive_dir / note_path.name
        try:
            shutil.move(str(note_path), archived_path)
            print(f"[INFO] Moved {note_path.name} → {archived_path}")
        except Exception as err:
            # The link is still reported even if archiving failed
            print(f"[ERROR] Failed to move {note_path.name}: {err}")

    return collected

# ─── Upload all summaries; keep the returned links for the next cell ────
shared_links = batch_upload_markdown_and_move(
    markdown_dir=MARKDOWN_DIR,
    uploaded_dir=UPLOADED_DIR,
    hackmd_token=hackmd_token,
)
print("\n✅ All markdown files uploaded to HackMD.")


[DEBUG] Found 2 markdown files to upload.
[DEBUG] Processing: 錄製 (5)_parsed.md
[INFO] Uploaded to HackMD: https://hackmd.io/S0MSTpwkSrWeZYie42EOWw
[INFO] Moved 錄製 (5)_parsed.md → /content/drive/MyDrive/Whisper-STT-project/uploaded/錄製 (5)_parsed.md
[DEBUG] Processing: 錄製 (6)_parsed.md
[INFO] Uploaded to HackMD: https://hackmd.io/W_P6WdSRTsuJOU84wcOfVA
[INFO] Moved 錄製 (6)_parsed.md → /content/drive/MyDrive/Whisper-STT-project/uploaded/錄製 (6)_parsed.md

✅ All markdown files uploaded to HackMD.


In [10]:
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header

# ─── Retrieve email credentials from Colab secrets ────────────────────
email_user = userdata.get('EMAIL_USER')  # e.g. "youremail@gmail.com"
email_pass = userdata.get('EMAIL_PASS')  # application-specific password or OAUTH token
email_to   = userdata.get('EMAIL_TO')    # comma-separated list or single recipient

if not (email_user and email_pass and email_to):
    raise ValueError("EMAIL_USER, EMAIL_PASS, or EMAIL_TO not found in Colab secrets!")

subject = "📝 Your Uploaded HackMD Speech Summaries"

if not shared_links:
    # Nothing was uploaded in this run; a mail announcing links would list none.
    print("[INFO] No HackMD links to report; skipping email.")
else:
    # Build email body lines: intro, one bullet per uploaded note, sign-off
    body_lines = [
        "Hello,",
        "",
        "Your audio files have been automatically transcribed using Whisper AI,",
        "and the speech content was summarized using Gemini Flash 2.5.",
        "",
        "Here are the links to your uploaded speech summaries on HackMD:",
        ""
    ]
    body_lines.extend(f"- {link['title']}: {link['url']}" for link in shared_links)
    body_lines += [
        "",
        "All documents are shared and accessible to anyone with the link.",
        "",
        "If you have any questions or encounter any problems, feel free to reply to this email!",
        "",
        "Best regards,",
        "Whisper-STT-Project Bot"
    ]
    body = "\n".join(body_lines)

    # Compose the email (UTF-8 subject and body)
    msg = MIMEMultipart()
    msg['From'] = email_user
    msg['To']   = email_to
    msg['Subject'] = Header(subject, 'utf-8')
    msg.attach(MIMEText(body, 'plain', 'utf-8'))

    # Use Gmail's SMTP (SSL) on port 465; the timeout keeps a stalled
    # connection from hanging the cell indefinitely.
    with smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=30) as server:
        server.login(email_user, email_pass)
        server.send_message(msg)
        print("[INFO] Email sent successfully.")


[INFO] Email sent successfully.
