<a href="https://colab.research.google.com/github/alexanderkrauck/alexanderkrauck.github.io/blob/main/TranscribedVideoSlides.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install moviepy SpeechRecognition pydub opencv-python-headless numpy openai opencv-contrib-python fpdf python-docx moviepy

Collecting SpeechRecognition
  Downloading SpeechRecognition-3.11.0-py2.py3-none-any.whl.metadata (28 kB)
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting fpdf
  Downloading fpdf-1.7.2.tar.gz (39 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting python-docx
  Downloading python_docx-1.1.2-py3-none-any.whl.metadata (2.0 kB)
Downloading SpeechRecognition-3.11.0-py2.py3-none-any.whl (32.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m32.8/32.8 MB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Downloading python_docx-1.1.2-py3-none-any.whl (244 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m244.3/244.3 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: fpdf
  Building wheel for fpdf (setup.py) ... [?25l[?25hdone
  Created wheel for fpdf: filename=fpdf-1.7.2-py2.py3-none-any.whl size=40704 sh

In [None]:
# Read the OpenAI API key from the Colab `userdata` store under the name
# 'OPENAI_KEY' so the key itself never appears in the notebook source.
from google.colab import userdata
openai_api_key = userdata.get('OPENAI_KEY')

In [None]:
import cv2
import os
import numpy as np
from moviepy.editor import VideoFileClip
import openai
from tqdm import tqdm
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from openai import OpenAI
from skimage.metrics import structural_similarity as ssim
import base64
from moviepy.editor import VideoFileClip

# Initialize OpenAI client.
# `client` is the object the transcription and summary functions below call.
client = OpenAI(api_key=openai_api_key)
# Also set the module-level key; no function visible in this notebook reads it
# (they all go through `client`).
openai.api_key = openai_api_key

def detect_slide_changes(video_path, sample_rate=10, ssim_threshold=0.9):
    """Sample a video at a fixed interval and report where the picture changes.

    One frame is read every ``sample_rate`` seconds and compared (in
    grayscale, via SSIM) with the previously *sampled* frame. When the
    similarity score drops below ``ssim_threshold`` the sampled frame is
    recorded as the start of a new slide.

    Args:
        video_path: Path to the video file, passed to ``cv2.VideoCapture``.
        sample_rate: Seconds between two sampled frames.
        ssim_threshold: Scores strictly below this value count as a change.

    Returns:
        A tuple ``(slide_changes, frames, timestamps)``:
        * ``slide_changes`` - start times in seconds, beginning with 0 and
          ending with the video duration, so consecutive entries delimit
          segments.
        * ``frames`` - the BGR frame captured at the start of each segment.
        * ``timestamps`` - the start time of each entry in ``frames``.
        All three are empty when the frame rate cannot be determined or the
        first frame cannot be read.
    """
    print("Starting slide change detection...")
    video = cv2.VideoCapture(video_path)
    try:
        fps = video.get(cv2.CAP_PROP_FPS)
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

        # An unreadable file reports an FPS of 0; bail out instead of
        # dividing by zero below.
        if not fps > 0:
            print("Could not determine the video frame rate; is the file a readable video?")
            return [], [], []

        # Never let the step collapse to 0 frames (tiny sample_rate), which
        # would make the sampling loop spin forever on the same frame.
        frame_interval = max(1, int(fps * sample_rate))
        duration = total_frames / fps

        print(f"Video FPS: {fps}")
        print(f"Total frames: {total_frames}")
        print(f"Video duration: {duration:.2f} seconds")
        print(f"Frame interval: {frame_interval} frames (every {sample_rate} seconds)")

        slide_changes = []
        frames = []
        timestamps = []

        success, prev_frame = video.read()
        if not success:
            print("Failed to read the first frame of the video.")
            return [], [], []

        prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        frame_count = frame_interval
        current_time = 0

        slide_changes.append(0)  # Start with the first frame
        frames.append(prev_frame)
        timestamps.append(current_time)

        print("Analyzing frames for slide changes...")
        with tqdm(total=total_frames, desc="Frames Processed", unit="frame") as pbar:
            while frame_count < total_frames:
                video.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
                success, frame = video.read()
                if not success:
                    print(f"Failed to read frame at position {frame_count}.")
                    break

                current_time = frame_count / fps

                frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                score, _ = ssim(prev_frame_gray, frame_gray, full=True)

                if score < ssim_threshold:
                    # Significant change detected
                    print(f"Slide change detected at {current_time:.2f} seconds (frame {frame_count}). SSIM: {score:.4f}")
                    slide_changes.append(current_time)
                    frames.append(frame)
                    timestamps.append(current_time)

                # The reference is always the latest sampled frame, whether
                # or not it started a new slide.
                prev_frame_gray = frame_gray
                # Clamp the progress step so the bar never overshoots the total.
                increment = min(frame_interval, total_frames - frame_count)
                frame_count += frame_interval
                pbar.update(increment)
    finally:
        # Release the capture on every exit path, including early returns.
        video.release()

    # Append the last timestamp if not included
    if slide_changes[-1] != duration:
        slide_changes.append(duration)
        print(f"Adding final slide change at end of video ({duration:.2f} seconds).")

    print(f"Total slide changes detected: {len(slide_changes) - 1}")
    return slide_changes, frames, timestamps


# MoviePy replacement for extracting audio segment
def extract_audio_with_moviepy(video_path, start_time, end_time, output_audio_path):
    """Write the audio between two timestamps of a video to a WAV file.

    Best effort: any failure is printed rather than raised, so the output
    file may not exist when this function returns.

    Args:
        video_path: Path of the source video.
        start_time: Segment start in seconds.
        end_time: Segment end in seconds.
        output_audio_path: Destination path for the 16 kHz, 16-bit PCM audio.
    """
    clip = None
    try:
        clip = VideoFileClip(video_path)
        segment = clip.subclip(start_time, end_time)
        if segment.audio is None:
            # Give a readable reason instead of an AttributeError on None.
            raise ValueError("video has no audio track")
        segment.audio.write_audiofile(output_audio_path, codec="pcm_s16le", fps=16000, nbytes=2, buffersize=2000)
        print(f"Audio extracted to {output_audio_path}.")
    except Exception as e:
        print(f"Error extracting audio with moviepy: {e}")
    finally:
        # Close the clip on every path; this function runs once per segment,
        # so an unclosed clip would accumulate across a long video.
        if clip is not None:
            clip.close()

def transcribe_audio(start_time, end_time, segment_index, video_path):
    """Transcribe one time segment of a video with the ``whisper-1`` model.

    The segment's audio is written to ``temp_audio_<segment_index>.wav`` in
    the current working directory, sent to the transcription endpoint of the
    module-level ``client``, and deleted again.

    Args:
        start_time: Segment start in seconds.
        end_time: Segment end in seconds.
        segment_index: Index used in log messages and the temp file name.
        video_path: Path of the source video.

    Returns:
        The transcribed text, or ``""`` when extraction or transcription
        failed (the error is printed, not raised).
    """
    print(f"Transcribing audio for segment {segment_index} from {start_time:.2f}s to {end_time:.2f}s...")
    temp_audio_file = f"temp_audio_{segment_index}.wav"
    extract_audio_with_moviepy(video_path, start_time, end_time, temp_audio_file)

    transcript = None
    try:
        with open(temp_audio_file, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file
            )
        print(f"Transcription for segment {segment_index} completed.")
    except Exception as e:
        print(f"Error during transcription of segment {segment_index}: {e}")
    finally:
        # The extraction step swallows its own errors, so the file may never
        # have been created; only remove it when it is actually there.
        if os.path.exists(temp_audio_file):
            os.remove(temp_audio_file)

    return transcript.text if transcript else ""

# Turn a video frame into a base64 JPEG string
def encode_image_from_frame(frame):
    """Encode ``frame`` as JPEG via ``cv2.imencode`` and return it as base64 text."""
    print("Encoding slide image to base64...")
    # imencode returns (success_flag, encoded_buffer); only the buffer is used.
    jpeg_buffer = cv2.imencode('.jpg', frame)[1]
    encoded_text = base64.b64encode(jpeg_buffer).decode('utf-8')
    print("Image encoding completed.")
    return encoded_text

def summarize_slide(transcription, base64_image, segment_index):
    """Ask ``gpt-4o-mini`` for a summary of a slide image plus its transcription.

    Args:
        transcription: Text spoken while the slide was shown.
        base64_image: Base64-encoded JPEG of the slide.
        segment_index: Index used only in log messages.

    Returns:
        The model's reply text, or ``""`` if the request raised (the error is
        printed).
    """
    print(f"Generating summary for segment {segment_index}...")

    # The user message has three parts: instruction, slide image, transcription.
    instruction_part = {
        "type": "text",
        "text": "Please provide a comprehensive summary that combines the information from the slide image and the transcription."
    }
    image_part = {
        "type": "image_url",
        "image_url": {
            "url": f"data:image/jpeg;base64,{base64_image}"
        },
    }
    transcription_part = {
        "type": "text",
        "text": f"Transcription:\n{transcription}"
    }
    messages = [
        {
            "role": "user",
            "content": [instruction_part, image_part, transcription_part],
        }
    ]

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        summary = response.choices[0].message.content
        print(f"Summary generation for segment {segment_index} completed.")
        return summary
    except Exception as e:
        print(f"Error during summary generation for segment {segment_index}: {e}")
        return ""

def main(video_path, sample_rate=10, ssim_threshold=0.9):
    """Run the full pipeline on one video: detect slides, transcribe, summarize.

    Args:
        video_path: Path of the video to process.
        sample_rate: Seconds between sampled frames, forwarded to
            ``detect_slide_changes``.
        ssim_threshold: Similarity threshold, forwarded to
            ``detect_slide_changes``.

    Returns:
        One dict per detected segment with the keys ``'timestamp'`` (segment
        start in seconds), ``'frame'`` (the slide image), ``'transcription'``
        and ``'summary'``. An empty list when no slide changes were detected.
    """
    print("Starting main processing...")
    print("Step 1: Detecting slide changes...")
    slide_changes, frames, timestamps = detect_slide_changes(video_path, sample_rate, ssim_threshold)

    if not slide_changes:
        print("No slide changes detected. Exiting.")
        return []

    print("\nStep 2: Processing slides and audio...")
    results = []

    # Consecutive change times delimit a segment; each segment is paired with
    # the frame captured at its start.
    segments = list(zip(slide_changes, slide_changes[1:], frames))

    for i, (start_time, end_time, frame) in enumerate(
        tqdm(segments, desc="Segments Processed", unit="segment")
    ):
        print(f"\nProcessing segment {i}: {start_time:.2f}s to {end_time:.2f}s")

        # Transcribe audio
        transcription = transcribe_audio(start_time, end_time, i, video_path)

        # Encode the slide image to base64
        base64_image = encode_image_from_frame(frame)

        # Summarize transcription and slide image
        summary = summarize_slide(transcription, base64_image, i)

        # Collect results
        results.append({
            'timestamp': start_time,
            'frame': frame,
            'transcription': transcription,
            'summary': summary
        })

    print("\nProcessing completed.")
    return results




  if event.key is 'enter':



In [None]:
from docx import Document
from docx.shared import Inches
import matplotlib.pyplot as plt
import cv2
import os
import tempfile

def get_filename_without_extension(path):
    """Return the last component of ``path`` with its final extension removed."""
    # basename drops the directories; splitext then peels off the last suffix.
    stem, _suffix = os.path.splitext(os.path.basename(path))
    return stem

def export_results(results, output_filename="my_analysis.docx"):
    """
    Write the per-slide results into a DOCX report.

    For each result the document gets a timestamp heading, the slide image
    and the summary; every result except the last is followed by a "---"
    paragraph and a page break. The transcription is not written.

    Args:
        results: List of dictionaries containing timestamp, frame, transcription, and summary
        output_filename: Name of output file (should end with .docx)
    """
    final_position = len(results) - 1

    # Slide images only need to exist until the document has been saved.
    with tempfile.TemporaryDirectory() as scratch_dir:
        document = Document()

        for position, entry in enumerate(results):
            slide = entry['frame']
            image_file = os.path.join(scratch_dir, f'frame_{position}.png')

            # Render the frame through matplotlib (BGR -> RGB) into a PNG file.
            plt.figure(figsize=(10, 6))
            rgb_slide = cv2.cvtColor(slide, cv2.COLOR_BGR2RGB)
            plt.imshow(rgb_slide)
            plt.axis('off')
            plt.savefig(image_file, bbox_inches='tight', pad_inches=0)
            plt.close()

            # Heading, picture, then the summary section.
            document.add_heading(f"Timestamp: {entry['timestamp']:.2f} seconds", level=1)
            document.add_picture(image_file, width=Inches(6))
            document.add_heading("Summary:", level=2)
            document.add_paragraph(entry['summary'])

            # Separator between slides, but not after the final one.
            if position < final_position:
                document.add_paragraph("---")
                document.add_page_break()

        document.save(output_filename)

        print(f"Document saved as {output_filename}")

# Example usage:
# export_results(results, "my_analysis.docx")

In [None]:
import os
from pathlib import Path
# Create the folder for the generated reports. The path is relative to the
# current working directory, while the processing cell writes to
# "/content/outputs/" - the two only coincide when the cwd is /content.
os.makedirs("outputs", exist_ok=True)

In [None]:
# Get all MP4 files in the directory and its subdirectories.
# Sorted so the processing order is the same on every run.
directory = "/content"
mp4_files = sorted(Path(directory).rglob("*.mp4"))

if not mp4_files:
    print(f"No MP4 files found in {directory} or its subdirectories")
else:
    print(f"Found {len(mp4_files)} MP4 files to process")

    # Make sure the absolute folder the reports are written to exists,
    # independent of the current working directory.
    os.makedirs("/content/outputs", exist_ok=True)

    # Process each video; a failure in one video must not stop the batch.
    for video_path in mp4_files:
        try:
            print(f"\nProcessing: {str(video_path)}")

            # Generate output filename. `.stem` strips only the final
            # extension, so "lecture.v2.mp4" becomes "lecture.v2.docx"
            # rather than being truncated at the first dot.
            output_file = "/content/outputs/" + video_path.stem + ".docx"

            # Process the video
            results = main(str(video_path), sample_rate=10, ssim_threshold=0.9)

            # Export results
            export_results(results, output_file)

            print(f"Successfully processed: {str(video_path)}")
            print(f"Output saved as: {output_file}")

        except Exception as e:
            print(f"Error processing {str(video_path)}: {str(e)}")

    print("\nProcessing complete!")

In [None]:
import shutil
import os

# Replace these paths with your folder and output file paths.
# `output_zip` is given WITHOUT the ".zip" suffix: the print below appends
# ".zip" itself to show the resulting archive name.
folder_to_compress = "/content/outputs"
output_zip = "/content/words"

# Ensure the parent directory of the output exists
os.makedirs(os.path.dirname(output_zip), exist_ok=True)

# Compress the folder: the contents of `folder_to_compress` become the
# contents of the zip archive.
shutil.make_archive(output_zip, 'zip', folder_to_compress)

print(f"Folder '{folder_to_compress}' has been compressed to '{output_zip}.zip'")


Folder '/content/outputs' has been compressed to '/content/words.zip'
Folder '/content/outputs' has been compressed to '/content/words.zip'


In [None]:
import shutil
import os

def move_file(source_path, destination_path):
    """
    Move a file from source to destination path.
    Creates destination directory if it doesn't exist.

    Args:
        source_path: Path of the existing file to move.
        destination_path: Full target path, including the file name.
    """
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError - only create the directory when there is one.
    destination_dir = os.path.dirname(destination_path)
    if destination_dir:
        os.makedirs(destination_dir, exist_ok=True)

    # Move the file
    shutil.move(source_path, destination_path)

In [None]:
# Move the archive created above to '/content/drive/MyDrive/file.zip'.
# NOTE(review): no cell in this notebook mounts Google Drive; presumably it
# must be mounted at /content/drive beforehand - confirm, otherwise the file
# just lands in a newly created local folder.
move_file('/content/words.zip', '/content/drive/MyDrive/file.zip')