# Libraries

In [1]:
!pip -q install --upgrade openai python-dotenv

In [20]:
import os, getpass, json, io, math, json, tempfile
from openai import OpenAI
from google.colab import userdata
from google.colab import files
from pydub import AudioSegment
from datetime import timedelta

# OpenAI key

In [4]:
# Fetch the API key through google.colab's `userdata` helper; the entry must be
# stored under the name `open_ai_key`.
open_ai_key = userdata.get('open_ai_key')

In [6]:
# Publish the key held in `open_ai_key` through the OPENAI_API_KEY environment
# variable, then build the client. OpenAI() is called without an explicit key,
# so it presumably reads it from that variable.
os.environ["OPENAI_API_KEY"] = open_ai_key
client = OpenAI()

# Import audio file

In [8]:
# ==== 2) Upload your .m4a file ====
# Opens the Colab upload widget; the result maps each uploaded file name to its content.
print("Upload your .m4a file…")
uploaded = files.upload()
assert uploaded, "No file uploaded."

# Only the first uploaded file is processed by the rest of the notebook.
audio_path = next(iter(uploaded))
audio_path

Upload your .m4a file…


Saving Order_Batching_Warehousing_podcast_notebook_LM.m4a to Order_Batching_Warehousing_podcast_notebook_LM.m4a


'Order_Batching_Warehousing_podcast_notebook_LM.m4a'

# SRT transcript

In [22]:
# Parameters you can tweak
CHUNK_MINUTES = 5           # chunk size
TARGET_FRAME_RATE = 16000    # 16 kHz
TARGET_BITRATE = "64k"       # MP3 bitrate to keep chunks small
MONO_CHANNELS = 1
CHUNK_OVERLAP_MS = 500       # small overlap to avoid cutting words at boundaries

# Load original audio (any common format, including m4a)
original = AudioSegment.from_file(audio_path)
duration_ms = len(original)

# Work in whole milliseconds. A non-positive chunk length would never advance
# through the audio, so reject it up front instead of looping forever.
chunk_len_ms = int(CHUNK_MINUTES * 60 * 1000)
if chunk_len_ms <= 0:
    raise ValueError(f"CHUNK_MINUTES must be positive, got {CHUNK_MINUTES!r}")

chunks_meta = []  # one dict per exported chunk: index, start_ms, end_ms, nominal_start_ms, path

for i, start_ms in enumerate(range(0, duration_ms, chunk_len_ms)):
    end_ms = min(start_ms + chunk_len_ms, duration_ms)

    # Every chunk after the first starts CHUNK_OVERLAP_MS early so a word that
    # straddles the boundary is fully contained in at least one chunk.
    cut_start = max(0, start_ms - CHUNK_OVERLAP_MS) if i > 0 else start_ms
    cut_end = end_ms

    # Downmix and resample before export to keep the uploaded files small.
    piece = original[cut_start:cut_end]
    piece = piece.set_channels(MONO_CHANNELS).set_frame_rate(TARGET_FRAME_RATE)

    tmp_name = f"chunk_{i:03d}.mp3"
    piece.export(tmp_name, format="mp3", bitrate=TARGET_BITRATE)

    chunks_meta.append({
        "index": i,
        "start_ms": cut_start,         # where the exported audio really begins (overlap included)
        "end_ms": cut_end,
        "nominal_start_ms": start_ms,  # the intended chunk start (no overlap)
        "path": tmp_name
    })

print(f"Created {len(chunks_meta)} chunk(s).")

Created 3 chunk(s).


In [25]:
# --- 4) Transcribe each chunk with timestamps (whisper-1, verbose_json) ---

def srt_timestamp(seconds: float) -> str:
    """Format an offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        seconds: Offset from the start of the media, in seconds. It is rounded
            to the nearest millisecond. Negative values are clamped to zero,
            because SRT has no representation for negative times.

    Returns:
        The zero-padded timestamp string. The hour field widens beyond two
        digits if needed.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    hrs, remainder = divmod(total_ms, 3_600_000)
    mins, remainder = divmod(remainder, 60_000)
    secs, ms = divmod(remainder, 1_000)
    return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"

def safe_get(obj, name, default=None):
    """Read ``name`` from either a dict or an attribute-style object.

    Dicts are looked up by key only. Checking attributes first would return a
    bound dict method for keys such as ``"items"`` or ``"keys"`` instead of
    the stored value (or the default).

    Args:
        obj: A dict, or any object exposing the field as an attribute
            (for example a Pydantic model).
        name: Key or attribute name to read.
        default: Value returned when the key/attribute is absent.

    Returns:
        The stored value, or ``default`` when it is missing.
    """
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)

In [26]:
all_segments = []

for meta in chunks_meta:
    # Transcribe the chunk
    with open(meta["path"], "rb") as f:
        resp = client.audio.transcriptions.create(
            model="whisper-1",
            file=f,
            response_format="verbose_json",  # required for segments
            temperature=0.0,
        )

    # Try to get segments regardless of SDK shape
    segments = getattr(resp, "segments", None)
    if segments is None:
        try:
            # Some SDK versions support model_dump() to get a dict
            segments = resp.model_dump().get("segments", [])
        except Exception:
            segments = []

    # Segment times are relative to the start of the file we uploaded. That file
    # begins at "start_ms" (overlap included), not at "nominal_start_ms", so the
    # real export start is the offset to add.
    global_offset_sec = meta["start_ms"] / 1000.0
    nominal_start_sec = meta["nominal_start_ms"] / 1000.0

    for seg in segments or []:
        # Clamp the cue start to the nominal chunk start: audio before it is the
        # overlap, which the previous chunk already covers.
        start_sec = max(
            float(safe_get(seg, "start", 0.0) or 0.0) + global_offset_sec,
            nominal_start_sec,
        )
        end_sec   = float(safe_get(seg, "end", 0.0) or 0.0) + global_offset_sec
        text      = (safe_get(seg, "text", "") or "").strip()

        # Drops empty cues and cues lying entirely inside the overlap region.
        if not text or end_sec <= start_sec:
            continue

        all_segments.append({"start": start_sec, "end": end_sec, "text": text})

print(f"Collected {len(all_segments)} segments across all chunks.")

Collected 342 segments across all chunks.


In [27]:
# Put every cue in chronological order (in place) before removing overlap duplicates.
all_segments.sort(key=lambda cue: (cue["start"], cue["end"]))

TOL = 0.25  # seconds of slack before a cue counts as starting inside the previous one
merged_segments = []
last_end = -1.0

for seg in all_segments:
    # A cue that begins well before the last kept cue ended is treated as a
    # repeat caused by the chunk overlap. The very first cue is always kept.
    starts_inside_previous = bool(merged_segments) and seg["start"] < last_end - TOL
    if not starts_inside_previous:
        merged_segments.append(seg)
        last_end = seg["end"]

print(f"Merged into {len(merged_segments)} SRT segments after de-dup.")

Merged into 341 SRT segments after de-dup.


In [28]:
# Each cue contributes four lines: running number, time range, text, blank separator.
srt_lines = []
for cue_number, seg in enumerate(merged_segments, start=1):
    srt_lines.extend([
        f"{cue_number}",
        f"{srt_timestamp(seg['start'])} --> {srt_timestamp(seg['end'])}",
        seg["text"],
        "",  # blank line between cues
    ])

srt_text = "\n".join(srt_lines)

# Write the subtitle file into the working directory.
out_srt = "transcript.srt"
with open(out_srt, "w", encoding="utf-8") as srt_file:
    srt_file.write(srt_text)

# Simple text transcript

In [30]:
audio = AudioSegment.from_file(audio_path)

# Split into fixed-length chunks of CHUNK_MINUTES minutes (5 min = 300,000 ms)
CHUNK_MINUTES = 5
chunk_length_ms = CHUNK_MINUTES * 60 * 1000

chunks = []
for offset_ms in range(0, len(audio), chunk_length_ms):
    chunks.append(audio[offset_ms:offset_ms + chunk_length_ms])

# Export each chunk as MP3 and remember the file names for the transcription step
chunk_files = []
for i, chunk in enumerate(chunks):
    filename = f"chunk_{i}.mp3"
    chunk.export(filename, format="mp3")
    chunk_files.append(filename)

print(f"Created {len(chunk_files)} chunks.")

Created 3 chunks.


In [31]:
all_text = []
for chunk_path in chunk_files:
    # The context manager closes each chunk file once the upload finishes;
    # passing a bare open(...) left every handle open.
    with open(chunk_path, "rb") as audio_file:
        resp = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
        )
    all_text.append(resp.text)

# One line of text per chunk, in chunk order.
final_transcript = "\n".join(all_text)

In [32]:
# Save the plain-text transcript into the working directory.
with open("transcript.txt", mode="w", encoding="utf-8") as txt_file:
    txt_file.write(final_transcript)