<a href="https://colab.research.google.com/github/LifeHackInnovationsLLC/whisper-video-transcription/blob/main/LHI_WhisperVideoDrive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

If you're looking at this on GitHub and new to Python Notebooks or Colab, click the Google Colab badge above 👆

# 📼 OpenAI Whisper + Google Drive Video Transcription

📺 Getting started video: https://youtu.be/YGpYinji7II

### This application extracts audio from every video file in a Google Drive folder and creates a high-quality transcription with OpenAI's Whisper automatic speech recognition system.

*Note: This requires giving the application permission to connect to your drive. Only you will have access to the contents of your drive, but please read the warnings carefully.*

This notebook application:
1. Connects to your Google Drive when you give it permission.
2. Creates a WhisperVideo folder and three subfolders (ProcessedVideo, AudioFiles, and TextFiles).
3. When you run it, searches your WhisperVideo folder for video files (.mp4, .mov, .mkv, and .avi), transcribes each one, moves the video to WhisperVideo/ProcessedVideo, and saves the transcript to WhisperVideo/TextFiles. It also saves a copy of the extracted audio to WhisperVideo/AudioFiles.
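
The steps above can be sketched as follows. This is a minimal, standard-library-only illustration rather than the notebook's actual code: `plan_outputs` is a hypothetical helper, and a temporary directory stands in for the WhisperVideo folder on Drive. Nothing is transcribed or moved.

```python
import tempfile
from pathlib import Path

# Extensions the notebook treats as video files.
VIDEO_EXTENSIONS = {".mp4", ".mov", ".mkv", ".avi"}


def plan_outputs(root):
    """Map each video in `root` to the paths its outputs would be written to.

    Hypothetical helper for illustration only.
    """
    root = Path(root)
    plan = {}
    for path in sorted(root.iterdir()):
        if path.is_file() and path.suffix.lower() in VIDEO_EXTENSIONS:
            plan[path.name] = {
                "audio": root / "AudioFiles" / (path.stem + ".wav"),
                "text": root / "TextFiles" / (path.stem + ".txt"),
                "processed": root / "ProcessedVideo" / path.name,
            }
    return plan


# Demo on a throwaway folder standing in for Drive.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "WhisperVideo"
    for sub in ("ProcessedVideo", "AudioFiles", "TextFiles"):
        (root / sub).mkdir(parents=True)
    (root / "demo.mp4").touch()
    (root / "notes.txt").touch()  # ignored: not a video extension
    plan = plan_outputs(root)
    planned_names = sorted(plan)
    demo_text_name = plan["demo.mp4"]["text"].name

print(planned_names)
print(demo_text_name)
```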

### **For faster performance, set your runtime to "GPU"**
*Click on "Runtime" in the menu and click "Change runtime type". Select "GPU".*
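
To confirm that the GPU runtime is actually in use, a check along these lines can help. It is only a sketch: `pick_device` is a hypothetical helper, and it assumes PyTorch (installed as a Whisper dependency) is importable, falling back to `"cpu"` when it is not.

```python
def pick_device():
    """Return "cuda" when PyTorch reports a usable GPU, otherwise "cpu"."""
    try:
        import torch  # normally present once Whisper is installed
    except ImportError:
        return "cpu"
    return "cuda" if torch.cuda.is_available() else "cpu"


device = pick_device()
print(f"Whisper would run on: {device}")
```

Whisper selects the GPU by default when one is available, so this is mainly a diagnostic. `whisper.load_model` also accepts a `device` argument if you want to pass the result explicitly.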


**Note: If you add new files after running this application, you'll need to remount the drive in step 1 to make them searchable.**

## 1. Setup & Mount Drive

In [3]:
import os

# Reusable function to check and mount Google Drive
def check_and_mount_drive():
    print("Checking /content/drive status...")
    if os.path.exists("/content/drive"):
        print("Mount directory exists. Checking contents...")
        if os.listdir("/content/drive"):
            print("Mountpoint already contains files. Attempting to unmount...")
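            # A `!` shell escape never raises a Python exception, so run the
            # command through subprocess to make a failed unmount catchable.
            import subprocess
            try:
                # Unmount the existing mountpoint
                subprocess.run(["fusermount", "-u", "/content/drive"], check=True)
                print("Unmounted successfully.")
            except (subprocess.CalledProcessError, OSError) as e:
                print(f"Failed to unmount: {e}")
                print("If the issue persists, please select 'Runtime > Disconnect and delete runtime' and try again.")
                return False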

    # Mount Google Drive
    print("Mounting Google Drive...")
    from google.colab import drive
    try:
        drive.mount("/content/drive", force_remount=True)
        print("Google Drive mounted successfully.")
    except ValueError as e:
        print(f"Mounting failed: {e}")
        print("If the issue persists, please select 'Runtime > Disconnect and delete runtime' and try again.")
        return False

    # Verify mount
    if os.path.exists("/content/drive/MyDrive"):
        print("Drive is mounted and ready.")
        return True
    else:
        print("Mounting seems incomplete. Please check your drive configuration.")
        return False

# Attempt to check and mount the drive
if check_and_mount_drive():
    print("Proceeding with folder creation...")
else:
    print("Drive mount failed. Cannot proceed.")
    raise SystemExit("Drive mount failed. Exiting.")

# Predefined options for client folders
clients = {
    "1": "/content/drive/MyDrive/Clients/WCBradley/Videos/",
    "2": "/content/drive/MyDrive/Clients/SiriusXM/Videos/",
    "3": "/content/drive/MyDrive/Clients/LHI/Videos/"
}

# Display options to the user
print("Select a client folder:")
print("1: WCBradley")
print("2: SiriusXM")
print("3: LHI")
print("4: Enter a custom folder path")

# Get user input
choice = input("Enter the number corresponding to your choice (default: 1): ").strip()

# Determine the root folder for the client
if choice in clients:
    client_videos_folder = clients[choice]
elif choice == "4":
    client_videos_folder = input("Enter the full path to your Videos folder: ").strip()
else:
    # Default to WCBradley if no valid input
    client_videos_folder = clients["1"]

# Define the WhisperVideo root folder within the client's Videos folder.
# os.path.join tolerates a custom path entered without a trailing slash.
rootFolder = os.path.join(client_videos_folder, "WhisperVideo", "")

# Define subfolder paths relative to the WhisperVideo root folder
audio_folder = rootFolder + "AudioFiles/"
text_folder = rootFolder + "TextFiles/"
processed_folder = rootFolder + "ProcessedVideo/"

# Ensure WhisperVideo folder and its subfolders exist
folders = [rootFolder, audio_folder, text_folder, processed_folder]
for folder in folders:
    try:
        print(f"Checking folder: {folder}")
        if not os.path.exists(folder):
            os.makedirs(folder)
            print(f"Created folder: {folder}")
        else:
            print(f"Folder already exists: {folder}")
    except Exception as e:
        print(f"Error ensuring folder {folder}: {e}")

print("WhisperVideo folder and subfolders initialized for client:")
print(f"WhisperVideo folder: {rootFolder}")
print(f"Audio files folder: {audio_folder}")
print(f"Text files folder: {text_folder}")
print(f"Processed videos folder: {processed_folder}")


Checking /content/drive status...
Mounting Google Drive...
Mounted at /content/drive
Google Drive mounted successfully.
Drive is mounted and ready.
Proceeding with folder creation...
Select a client folder:
1: WCBradley
2: SiriusXM
3: LHI
4: Enter a custom folder path
Enter the number corresponding to your choice (default: 1): 1
Checking folder: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/
Folder already exists: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/
Checking folder: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/AudioFiles/
Folder already exists: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/AudioFiles/
Checking folder: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/TextFiles/
Folder already exists: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/TextFiles/
Checking folder: /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/ProcessedVideo/
Folder already exists: /content/drive/MyD

## 2. Dependencies & Model Loading


In [4]:
!pip install git+https://github.com/openai/whisper.git
!sudo apt update && sudo apt install -y ffmpeg
!pip install librosa
!pip install audioread

import whisper
import time
import librosa
import soundfile as sf
import re
import os

# model = whisper.load_model("tiny.en")
model = whisper.load_model("base.en")
# model = whisper.load_model("small.en") # load the small model
# model = whisper.load_model("medium.en")
# model = whisper.load_model("large")

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-hjw6ifmt
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-hjw6ifmt
  Resolved https://github.com/openai/whisper.git to commit 90db0de1896c23cbfaf0c58bc2d30665f709f170
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting tiktoken (from openai-whisper==20240930)
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting triton>=2.0.0 (from openai-whisper==20240930)
  Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (209.5 MB)

100%|███████████████████████████████████████| 139M/139M [00:08<00:00, 16.4MiB/s]
  checkpoint = torch.load(fp, map_location=device)


## 3. Processing Logic (Transcription)
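
Each transcript is written as timestamped blocks, one per Whisper segment. The sketch below mimics that layout on made-up segments. `format_segments` is a hypothetical helper, not a function from this notebook, and the sample dictionaries only imitate the `start`, `end`, and `text` keys found in `result["segments"]`.

```python
from datetime import timedelta


def format_segments(segments):
    """Render Whisper-style segments as "[start - end] text" blocks."""
    lines = []
    for seg in segments:
        start = str(timedelta(seconds=int(seg["start"])))
        end = str(timedelta(seconds=int(seg["end"])))
        lines.append(f"[{start} - {end}] {seg['text'].strip()}")
    return "\n\n".join(lines)


# Made-up segments shaped like the output of model.transcribe().
fake_segments = [
    {"start": 0.0, "end": 4.2, "text": " Hello and welcome."},
    {"start": 4.2, "end": 65.9, "text": " Let's get started."},
]
transcript = format_segments(fake_segments)
print(transcript)
```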


In [5]:
# # Mount Google Drive
# from google.colab import drive
# drive.mount("/content/drive", force_remount=True)  # This will prompt for authorization.

# import os

# # Ensure WhisperVideo folder and its subfolders exist
# folders = [rootFolder, audio_folder, text_folder, processed_folder]
# for folder in folders:
#     try:
#         if not os.path.exists(folder):
#             os.makedirs(folder)
#             print(f"Created folder: {folder}")
#         else:
#             print(f"Folder already exists: {folder}")
#     except Exception as e:
#         print(f"Error ensuring folder {folder}: {e}")

# print(f"All folders verified and ready under: {rootFolder}")


## 4. Reporting & CSV Logging
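
As a sketch of how the CSV log could be inspected after a few runs, the snippet below counts rows per status. The column names (`Timestamp`, `FileName`, `Status`, `Notes`) match the header written by the logging code in this notebook; `summarize_log` is a hypothetical helper, and the sample rows are made up for the demo.

```python
import csv
import io
from collections import Counter


def summarize_log(csvfile):
    """Count log rows per Status value from an open CSV file object."""
    reader = csv.DictReader(csvfile)
    return Counter(row["Status"] for row in reader)


# Made-up sample rows in the same layout as processing_log.csv.
sample = io.StringIO(
    "Timestamp,FileName,Status,Notes\n"
    "2024-01-01 10:00:00,a.mp4,Processed,\n"
    "2024-01-01 10:00:00,notes.txt,Skipped,Invalid format\n"
    "2024-01-02 09:30:00,b.mov,Processed,\n"
)
counts = summarize_log(sample)
print(dict(counts))
```

For the real log, open the file with `open(csv_path, newline="", encoding="utf-8")` and pass the handle to `summarize_log`.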


## 5. (Future) Drive API Integration


In [6]:
import csv
import logging
import os
import shutil
import subprocess
from datetime import datetime, timedelta

# Helper function to format time
def format_time(seconds):
    """Convert seconds to H:MM:SS format (for example, 75 -> '0:01:15')."""
    return str(timedelta(seconds=int(seconds)))


# Setup logging
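logging.basicConfig(
    filename="processing_log.txt",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    # Notebook runtimes often pre-configure the root logger, in which case
    # basicConfig silently does nothing; force=True (Python 3.8+) replaces
    # the existing handlers so the log file is actually written.
    force=True,
)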

# Ensure folders are created
folders = [rootFolder, audio_folder, text_folder, processed_folder]
for folder in folders:
    if not os.path.exists(folder):
        os.makedirs(folder)

# Initialize logs
success_log = []
error_log = []
skipped_log = []

# Process all video files in WhisperVideo folder
video_files = [f for f in os.listdir(rootFolder) if os.path.isfile(os.path.join(rootFolder, f))]

for video_file in video_files:
    # Exclude this notebook's own report and CSV log from processing
    if video_file in ("processing_report.txt", "processing_log.csv"):
        continue

    # Skip non-video files (case-insensitive, so ".MP4" and ".MOV" are accepted too)
    if not video_file.lower().endswith((".mp4", ".mov", ".avi", ".mkv")):
        skipped_log.append((video_file, "Invalid format"))
        print(f"Skipped {video_file}: Invalid format.")
        continue

    # Define paths (splitext strips the extension regardless of its length)
    base_name = os.path.splitext(video_file)[0]
    video_path = os.path.join(rootFolder, video_file)
    audio_path = os.path.join(audio_folder, base_name + ".wav")
    text_path = os.path.join(text_folder, base_name + ".txt")
    processed_path = os.path.join(processed_folder, video_file)

    try:
        print(f"Extracting audio for {video_file} to {audio_path}")
        # Extract audio
        try:
            # Attempt to load the audio using librosa
            y, sr = librosa.load(video_path, sr=16000)  # Load audio with 16 kHz sampling rate
            sf.write(audio_path, y, sr)  # Save audio as a WAV file
            print(f"Audio extraction successful using librosa for {video_file}")
        except Exception as e_librosa:
            print(f"Librosa extraction failed for {video_file}: {e_librosa}")
            print(f"Falling back to ffmpeg for {video_file}")
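            # Use ffmpeg as a fallback; -y overwrites any partial WAV left by
            # the failed attempt instead of waiting on an interactive prompt
            subprocess.run(["ffmpeg", "-y", "-i", video_path, "-ar", "16000", "-ac", "1", audio_path], check=True)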
            print(f"Audio extraction successful using ffmpeg for {video_file}")

        print(f"Starting transcription for {audio_path}")
        # Transcribe the audio using Whisper
        result = model.transcribe(audio_path)
        print(f"Transcription completed for {audio_path}")

        # Create initial transcription
        text = ""
        for segment in result["segments"]:
            start_time = format_time(segment["start"])
            end_time = format_time(segment["end"])
            text_segment = segment["text"].strip()
            text += f"[{start_time} - {end_time}] {text_segment}\n\n"

        print(f"Saving transcription to {text_path}")
        # Save the transcription
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(text)
        print(f"Transcription saved successfully for {video_file}")

        print(f"Moving file {video_file} to processed folder")
        # Move the video to ProcessedVideo folder
        shutil.move(video_path, processed_path)
        print(f"File moved to processed folder: {processed_path}")

        # Log success
        success_log.append(video_file)
        logging.info(f"Successfully processed {video_file}")
        print(f"Successfully processed {video_file}")

    except subprocess.CalledProcessError as e:
        error_message = f"FFmpeg error for {video_file}: {e}"
        print(error_message)
        error_log.append((video_file, error_message))
        logging.error(error_message)
    except Exception as e:
        error_message = f"General error for {video_file}: {e}"
        print(error_message)
        error_log.append((video_file, error_message))
        logging.error(error_message)

# Perform folder parity check
def get_file_bases(folder):
    return {os.path.splitext(f)[0] for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f))}

videos = get_file_bases(processed_folder)
audios = get_file_bases(audio_folder)
texts = get_file_bases(text_folder)

all_match = videos == audios == texts

# Generate completion report
report = "Processing Report\n"
report += f"\nSuccessfully Processed Files ({len(success_log)}):\n"
report += "\n".join(success_log)

report += f"\n\nSkipped Files ({len(skipped_log)}):\n"
report += "\n".join([f"{file} - {reason}" for file, reason in skipped_log])

report += f"\n\nErrors ({len(error_log)}):\n"
report += "\n".join([f"{file} - {reason}" for file, reason in error_log])

report += "\n\nFolder Parity Check:\n"
report += f"All folders have matching files: {'Yes' if all_match else 'No'}\n"
report += f"Processed Videos: {len(videos)}\n"
report += f"Audio Files: {len(audios)}\n"
report += f"Text Files: {len(texts)}\n"

# Save the report
report_path = os.path.join(rootFolder, "processing_report.txt")
with open(report_path, "w", encoding="utf-8") as f:
    f.write(report)

# Display completion report
print(report)

csv_path = os.path.join(rootFolder, "processing_log.csv")
file_exists = os.path.isfile(csv_path)

# We'll store a timestamp for each run and each file processed/skipped/error
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

with open(csv_path, "a", newline="", encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile)
    # If file doesn't exist, write header
    if not file_exists:
        writer.writerow(["Timestamp", "FileName", "Status", "Notes"])

    # Append rows for each processed file
    for fname in success_log:
        writer.writerow([current_time, fname, "Processed", ""])

    # Append rows for each skipped file
    for (fname, reason) in skipped_log:
        writer.writerow([current_time, fname, "Skipped", reason])

    # Append rows for each error file
    for (fname, reason) in error_log:
        writer.writerow([current_time, fname, "Error", reason])

print("\nCurrent CSV log entries:")
with open(csv_path, "r", encoding="utf-8") as csvfile:
    print(csvfile.read())


Extracting audio for Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.mov to /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/AudioFiles/Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.wav


  y, sr = librosa.load(video_path, sr=16000)  # Load audio with 16 kHz sampling rate
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Audio extraction successful using librosa for Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.mov
Starting transcription for /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/AudioFiles/Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.wav
Transcription completed for /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/AudioFiles/Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.wav
Saving transcription to /content/drive/MyDrive/Clients/WCBradley/Videos/WhisperVideo/TextFiles/Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.txt
Transcription saved successfully for Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.mov
Moving file Production Deployment API Cosmos Telemetry Etc Allyssa Screen Recording 2024-06-28 at 2.22.03 PM.mov to pr