In [1]:
import os
import math
import glob
from moviepy import VideoFileClip
from pydub import AudioSegment
from openai import OpenAI


def transcribe_mp4(video_path, chunk_duration_minutes=5):
    """
    Extract the audio track of a video, split it into fixed-length chunks,
    transcribe each chunk with the Whisper API, and return the joined text.

    Side effects: writes ``<basename>.mp3`` and a ``<basename>_chunks/`` folder
    (one ``<basename>_<i>.mp3`` per chunk) into the *current working directory*
    (not next to the video). These files are left in place after the call.

    Errors are never raised to the caller: each stage prints a message and the
    function returns an empty string on failure.

    :param video_path: Path to the input .mp4 video file.
    :param chunk_duration_minutes: Duration of each chunk in minutes (default 5).
        Must be greater than zero; fractional values are accepted.
    :return: The chunk transcripts joined by single spaces in chronological
        order, or "" if any stage failed.
    """

    # Validate the chunk size up front so a bad value fails before the
    # (slow) audio extraction instead of after it.
    try:
        chunk_length_ms = int(chunk_duration_minutes * 60 * 1000)
    except (TypeError, ValueError, OverflowError):
        chunk_length_ms = 0
    if chunk_length_ms <= 0:
        print(
            "Error creating audio chunks:",
            f"chunk_duration_minutes must be a positive number, got {chunk_duration_minutes!r}",
        )
        return ""

    # -------------------------------------------------------------------
    # 1) EXTRACT AUDIO FROM VIDEO AND SAVE AS .MP3
    # -------------------------------------------------------------------
    try:
        # Only the basename is kept, so the MP3 lands in the current working
        # directory regardless of where the video lives.
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_file = f"{base_name}.mp3"

        video = VideoFileClip(video_path)
        try:
            if video.audio is None:
                raise ValueError("video has no audio track")
            video.audio.write_audiofile(audio_file)
        finally:
            # Release the reader resources held by the clip even when
            # extraction fails.
            video.close()
        print(f"Audio extracted and saved as {audio_file}")
    except Exception as e:
        print("Error extracting audio from video:", e)
        return ""

    # -------------------------------------------------------------------
    # 2) LOAD THE AUDIO AND SPLIT INTO CHUNKS
    # -------------------------------------------------------------------
    # Paths are recorded in creation order. Re-discovering them with
    # sorted(glob(...)) would order "_10" before "_2" and would also pick up
    # stale chunks left in the folder by a previous run.
    chunk_files = []
    try:
        audio = AudioSegment.from_file(audio_file, format="mp3")
        print("Audio Imported ✅")

        total_length_ms = len(audio)

        # Create a folder to store chunks, e.g. "myvideo_chunks"
        chunk_folder = f"{base_name}_chunks"
        os.makedirs(chunk_folder, exist_ok=True)

        # Export each chunk as .mp3; the last chunk may be shorter.
        for i, start_ms in enumerate(range(0, total_length_ms, chunk_length_ms)):
            end_ms = min(start_ms + chunk_length_ms, total_length_ms)
            chunk_file = os.path.join(chunk_folder, f"{base_name}_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_file, format="mp3")
            chunk_files.append(chunk_file)

        print("Chunks created ✅")

    except Exception as e:
        print("Error creating audio chunks:", e)
        return ""

    # -------------------------------------------------------------------
    # 3) TRANSCRIBE AUDIO CHUNKS WITH OPENAI WHISPER
    # -------------------------------------------------------------------
    transcripts = []
    try:
        # No arguments are passed, so credentials must come from the
        # environment (the openai client reads OPENAI_API_KEY).
        client = OpenAI()

        for chunk_file in chunk_files:
            with open(chunk_file, "rb") as f:
                transcription = client.audio.transcriptions.create(
                    model="whisper-1", file=f
                )
            transcripts.append(transcription.text)

        full_transcription = " ".join(transcripts)
        print("Full transcription done ✅")

    except Exception as e:
        print("Error transcribing chunks:", e)
        return ""

    # -------------------------------------------------------------------
    # 4) RETURN THE COMBINED TRANSCRIPTION
    # -------------------------------------------------------------------
    return full_transcription

In [2]:
# Run the full extract -> chunk -> transcribe pipeline on the sample clip.
# The relative path is resolved against the kernel's working directory.
video_path = "../test/podcast.MP4"
transcription_text = transcribe_mp4(video_path)
# Bare last expression so the notebook displays the transcript
# (an empty string means one of the pipeline stages failed).
transcription_text

{'video_found': True, 'audio_found': True, 'metadata': {'major_brand': 'mp42', 'minor_version': '0', 'compatible_brands': 'mp42isom'}, 'inputs': [{'streams': [{'input_number': 0, 'stream_number': 0, 'stream_type': 'video', 'language': None, 'default': True, 'size': [480, 848], 'bitrate': 1348, 'fps': 29.63, 'codec_name': 'h264', 'profile': '(Baseline)', 'metadata': {'Metadata': '', 'vendor_id': '[0][0][0][0]'}}, {'input_number': 0, 'stream_number': 1, 'stream_type': 'audio', 'language': None, 'default': True, 'fps': 44100, 'bitrate': 60, 'metadata': {'Metadata': '', 'vendor_id': '[0][0][0][0]'}}], 'input_number': 0}], 'duration': 64.1, 'bitrate': 1412, 'start': 0.0, 'default_video_input_number': 0, 'default_video_stream_number': 0, 'video_codec_name': 'h264', 'video_profile': '(Baseline)', 'video_size': [480, 848], 'video_bitrate': 1348, 'video_fps': 29.63, 'default_audio_input_number': 0, 'default_audio_stream_number': 1, 'audio_fps': 44100, 'audio_bitrate': 60, 'video_duration': 64.1

                                                                      

MoviePy - Done.
Audio extracted and saved as podcast.mp3
Audio Imported ✅
Chunks created ✅
Full transcription done ✅


"If you're going to try, go all the way. Otherwise, don't even start. If you're going to try, go all the way. This can mean losing girlfriends, wives, relatives, jobs, and maybe your mind. Go all the way. It can mean not eating for three or four days. It can mean freezing on a park bench. It can mean jail. It can mean derision, mockery, isolation. Isolation is the gift. All the others are a test of your endurance. How much you really want to do it. And you'll do it. Despite rejection and the worst odds, it will be better than anything you can imagine. If you're going to try, go all the way. There's no other feeling like that. You'll be alone with the gods, and the knights will flame with fire. Do it. Do it. All the way. All the way. You will ride life straight to perfect laughter. It's the only good fight there is."

In [None]:
# NOTE(review): the result of transcribe_mp4 is overwritten by the placeholder
# "test" on the last line, so the transcription call's output is discarded.
# Presumably a stub for exercising the database cells - confirm which of the
# two assignments is meant to stay.
video_path = "../test/podcast.MP4"
transcription_text = transcribe_mp4(video_path)
transcription_text = "test"

In [6]:
# File-name component of the path: the text after the last "/".
video_path.split("/")[-1]

'podcast.MP4'

In [7]:
# Display the current value of transcription_text held by the kernel.
transcription_text

'test'

In [14]:
from datetime import datetime
import os


def generate_idkey(filename: str) -> str:
    """
    Build an ID key of the form ``ID<YYYYMMDDHHMMSS>_<prefix>``.

    The timestamp is the current local time and the prefix is the first two
    characters of the file's base name (fewer if the base name is shorter).

    Args:
        filename (str): A file name or a full path to a file.

    Returns:
        str: The generated ID key.
    """
    # Current local time rendered as a 14-digit stamp
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # Last path component; slicing tolerates names shorter than two characters
    leaf = os.path.split(filename)[1]
    return f"ID{stamp}_{leaf[:2]}"


# Example usage: the printed key embeds the current time, so it changes on
# every run; only the "ID" lead-in and the "_vi" suffix are stable.
filename = "/path/to/video.mp4"
print("Generated ID key:", generate_idkey(filename))

Generated ID key: ID20250212114138_vi


In [8]:
from pymongo import MongoClient
from dotenv import load_dotenv

# Smoke test: connect to MongoDB and insert one transcript document.
# Relies on `video_path` and `transcription_text` already being defined by
# earlier cells, and on `os` having been imported earlier in the notebook.

# Load variables from the .env file into the process environment
load_dotenv()

# os.getenv returns None when the variable is unset.
# NOTE(review): unlike insert_transcript, no check is made here for a missing value.
connection_string = os.getenv("connection_string")

# Create a MongoClient
client = MongoClient(connection_string)

# Access a database (if it doesn’t exist, MongoDB will create it on first write)
db = client["CanvasDB"]

# Access a collection within the database
collection = db["Transcripts"]


# Test by inserting a document
# (every execution of this cell adds a new document to the collection)
result = collection.insert_one(
    {"file_name": video_path.split("/")[-1], "transcript": transcription_text}
)
print("Inserted document with id:", result.inserted_id)

  client = MongoClient(connection_string)


Inserted document with id: 67acd082331d15f2fbc41941


In [None]:
def insert_transcript(transcript: str, file_name: str, course_name: str) -> str:
    """
    Insert one transcript document into the CanvasDB.Transcripts collection.

    Nothing is uploaded when the transcript is empty or only whitespace.
    The MongoDB connection string is read from the ``connection_string``
    environment variable (after loading the .env file). A client is opened
    for the call and closed again before returning.

    Errors are not raised to the caller: a message is printed and an empty
    string is returned instead.

    Parameters:
        transcript (str): The transcript text.
        file_name (str): The name of the file.
        course_name (str): The name of the course.

    Returns:
        str: The inserted document's id as a string if successful; an empty string otherwise.
    """
    # Check if the transcript is empty or only whitespace
    if not transcript or not transcript.strip():
        print("Transcript is empty. Aborting upload.")
        return ""

    try:
        # Load environment variables from the .env file
        load_dotenv()
        connection_string = os.getenv("connection_string")
        if not connection_string:
            raise ValueError("Connection string not found in environment variables.")

        # Prepare the document to insert
        document = {
            "file_name": file_name,
            "transcript": transcript,
            "course_name": course_name,
        }

        # A new client is created per call, so it must be closed here;
        # the context manager closes it on both success and failure.
        with MongoClient(connection_string) as client:
            result = client["CanvasDB"]["Transcripts"].insert_one(document)

        print("Inserted document with id:", result.inserted_id)
        return str(result.inserted_id)

    except Exception as e:
        # Print error message to help with debugging
        print("Error inserting transcript:", e)
        return ""


# End-to-end example: transcribe a sample video, then store the transcript.
if __name__ == "__main__":
    # Example usage
    video_path = "test/podcast.MP4"
    transcription_text = transcribe_mp4(video_path)
    # Only the first 100 characters are shown as a preview
    print("\nFINAL TRANSCRIPTION:\n", transcription_text[:100])

    # insert_transcript skips the upload if the transcription came back empty.
    # NOTE(review): file_name is the literal "test", not derived from video_path.
    insert_transcript(transcription_text, file_name="test", course_name="test_course")