# Transcription and summarization notebook with AIs

---



Repository: https://github.com/martinopiaggi/summarize

In [44]:
Source = "" #@param {type:"string"}
Type_of_source = "Youtube video or playlist" #@param ['Youtube video or playlist', 'Google Drive video link','Dropbox video link']
Type = Type_of_source
URL = Source

#@markdown ---
#@markdown Insert your API key depending on which endpoint you want to use (by default Groq). Leave it empty to read it from the `OPENAI_API_KEY` / `GROQ_API_KEY` environment variable.
# SECURITY: never commit a real key in this field. A key was previously
# hardcoded here; it must be treated as compromised and rotated.
api_key = "" #@param {type:"string"}


#@markdown Do you want to use OpenAI endpoint ?
OpenAI_endpoint = False  #@param {type:"boolean"}

#@markdown Do you want to use Youtube captions ?
use_Youtube_captions = True #@param {type:"boolean"}

#@markdown ---
#@markdown ### Other settings

#@markdown You want timestamps in final text?
Timestamps = True #@param {type:"boolean"}

# When the form field is left empty, fall back to the environment variable
# that matches the selected endpoint so the key never has to live in the file.
import os
if not api_key:
    api_key = os.environ.get("OPENAI_API_KEY" if OpenAI_endpoint else "GROQ_API_KEY", "")


In [7]:
%%capture
#@markdown ## Installation of libraries
#@markdown Installation of libraries

import subprocess
import re

import torch
from torch.utils.data import Dataset, DataLoader

# Each dependency is installed right before it is imported, so the order of
# the `!pip` line and the following import inside every branch matters.

# Caption download path: only needed when captions are requested.
if use_Youtube_captions:
  !pip install youtube-transcript-api
  from youtube_transcript_api import YouTubeTranscriptApi

# Whisper is required whenever the source is not YouTube, or captions are disabled.
# NOTE(review): torch / Dataset / DataLoader are already imported unconditionally
# earlier in this cell, so the two torch imports below are redundant.
if (not Type == "Youtube video or playlist") or (not use_Youtube_captions):
  import torch
  from torch.utils.data import Dataset, DataLoader
  !pip install faster-whisper
  from faster_whisper import WhisperModel


# Build the chat-completion client for the selected endpoint; both branches
# bind it to the same global name `client`.
if OpenAI_endpoint:
  !pip install openai
  import openai
  client = openai.OpenAI(api_key=api_key)
else:
  !pip install groq
  from groq import Groq
  client = Groq(api_key=api_key)

# pytube is installed for every YouTube source, including when captions are used.
if Type == "Youtube video or playlist":
  !pip install git+https://github.com/pytube/pytube
  from pytube import YouTube

# Mount the user's Drive at /gdrive (Colab only).
if Type == "Google Drive video link":
  from google.colab import drive
  drive.mount('/gdrive')

# ffmpeg is installed explicitly only for Dropbox sources.
# NOTE(review): the Google Drive path also invokes ffmpeg later in the notebook —
# presumably it relies on ffmpeg being preinstalled; confirm.
if Type == ("Dropbox video link"):
  !sudo apt update && sudo apt install ffmpeg


In [45]:
#@markdown ## Video downloads
#@markdown Downloading video sources
# Local audio/video files queued for transcription.
video_path_local_list = list()
# Set to True once a transcript is available without running Whisper.
skip_transcription = False

# Transcript accumulators: plain text and the "[hh:mm:ss]"-prefixed variant.
Text = TextTimestamps = ""

def seconds_to_time_format(s):
    """Format a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    Fractional seconds are truncated; hours are not wrapped at 24.
    """
    hours, remainder = divmod(s, 3600)
    minutes, remainder = divmod(remainder, 60)
    whole_seconds = remainder // 1
    return "{:02d}:{:02d}:{:02d}".format(int(hours), int(minutes), int(whole_seconds))



def download_youtube_audio_only(url, output_dir=".", filename=None, filename_prefix=None, skip_existing=True, timeout=None, max_retries=0):
    """Download the audio-only stream of a YouTube video and return the saved path.

    If ``filename_prefix`` is given it is prepended to ``filename`` or, when no
    filename was supplied, to the stream's default filename. The remaining
    arguments are forwarded unchanged to the stream's ``download`` call.
    """
    stream = YouTube(url).streams.get_audio_only()

    # A prefix always produces an explicit filename; without one, `filename`
    # is passed through as given (possibly None).
    if filename_prefix:
        base_name = filename if filename else stream.default_filename
        filename = f"{filename_prefix}{base_name}"

    return stream.download(
        output_path=output_dir,
        filename=filename,
        skip_existing=skip_existing,
        timeout=timeout,
        max_retries=max_retries,
    )


def download_youtube_captions(url):
    """Fetch English captions for a YouTube video and save them to disk.

    English captions are requested first; if that fails, the first
    translatable transcript is translated to English instead.

    Args:
        url: A YouTube URL containing an 11-character video id.

    Returns:
        A tuple ``(text, text_with_timestamps, plain_file_name,
        timestamps_file_name)``. Both files are written to the current
        working directory as UTF-8.

    Raises:
        ValueError: if no video id can be extracted from ``url``.
        RuntimeError: if neither English nor a translatable transcript exists.
    """
    regex = r'(?:https?:\/\/)?(?:www\.)?(?:youtube\.com\/(?:[^\/\n\s]+\/\S+\/|(?:v|e(?:mbed)?)\/|\S*?[?&]v=)|youtu\.be\/)([a-zA-Z0-9_-]{11})'
    match = re.search(regex, url)
    if match is None:
        raise ValueError(f"Could not extract a YouTube video id from: {url!r}")
    video_id = match.group(1)

    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
    except Exception:
        # No direct English transcript: fall back to translating the first
        # transcript that supports translation.
        transcript = None
        for available_transcript in YouTubeTranscriptApi.list_transcripts(video_id):
            if available_transcript.is_translatable:
                transcript = available_transcript.translate('en').fetch()
                break
        if transcript is None:
            raise RuntimeError(
                f"No English or translatable transcript available for video {video_id}"
            )

    plain_parts = []
    stamped_lines = []
    for entry in transcript:
        caption = entry['text'].strip()
        start_time = seconds_to_time_format(entry['start'])
        plain_parts.append(caption + " ")
        stamped_lines.append(f"[{start_time}] {caption}\n")
    text = "".join(plain_parts)
    textTimestamps = "".join(stamped_lines)

    transcript_file_name = f"{video_id}_captions.txt"
    transcript_file_name_timestamps = f"{video_id}_captions_with_timestamps.txt"

    # Write the locally built strings (not the notebook-level globals, which
    # are still empty while this function runs).
    with open(transcript_file_name, 'w', encoding='utf-8') as f:
        f.write(text)

    with open(transcript_file_name_timestamps, 'w', encoding='utf-8') as ft:
        ft.write(textTimestamps)

    return text, textTimestamps, transcript_file_name, transcript_file_name_timestamps

# Obtain either a ready transcript (YouTube captions) or a local audio file
# that the transcription cell will process.
if Type == "Youtube video or playlist":
    if use_Youtube_captions:
      Text, TextTimestamps, transcript_file_name , transcript_file_name_timestamps = download_youtube_captions(URL)
      skip_transcription=True
    else:
      # Keep the path returned by the downloader so it can be queued.
      audio_path = download_youtube_audio_only(URL)
      video_path_local_list.append(audio_path)

elif Type == "Google Drive video link":
  # URL is used as a path relative to the mounted "My Drive" folder.
  # Extract mono 16 kHz PCM audio with ffmpeg.
  subprocess.run(['ffmpeg', '-y', '-i', '/gdrive/My Drive/' + URL, '-vn', '-acodec', 'pcm_s16le',
                  '-ar', '16000', '-ac', '1', 'gdrive_audio.wav'], check=True)
  video_path_local_list.append("gdrive_audio.wav")

elif Type == "Dropbox video link":
    # Download the file at URL, then extract mono 16 kHz PCM audio.
    subprocess.run(['wget', '-O', 'dropbox_video.mp4', URL], check=True)
    subprocess.run(['ffmpeg', '-y', '-i', 'dropbox_video.mp4', '-vn', '-acodec', 'pcm_s16le',
                    '-ar', '16000', '-ac', '1', 'dropbox_video_audio.wav'], check=True)
    video_path_local_list.append("dropbox_video_audio.wav")


In [None]:
%%capture
# @markdown ## Transcription using Faster Whisper
# @markdown Manually specifying the language can increase speed.

if not skip_transcription:
  import os  # local import: only this branch needs path manipulation

  language = "en" # @param {type:"string"}
  # @markdown An initial prompt with specific context-aware words and names can improve accuracy.

  initial_prompt = "" # @param {type:"string"}

  video_path_local = str(video_path_local_list[0])


  model = WhisperModel('small', device="cuda", compute_type='int8')
  segments, info = model.transcribe(video_path_local, beam_size=5,
                                    language=None if language == "auto" else language,
                                    task="translate",
                                    initial_prompt=initial_prompt)

  # Derive both output names from the path without its extension. A plain
  # `.replace(".mp4", ".txt")` leaves ".wav" inputs unchanged, which made the
  # transcript overwrite the audio file itself.
  base_path, _ = os.path.splitext(video_path_local)
  transcript_file_name = base_path + ".txt"
  transcript_file_name_timestamps = base_path + "Timestamps" + ".txt"

  # Accumulate the plain and the timestamped transcript segment by segment.
  for segment in segments:
    start_time = seconds_to_time_format(segment.start)
    Text += segment.text.strip() + " "
    TextTimestamps += f"[{start_time}] {segment.text.strip()} "

  with open(transcript_file_name, 'w', encoding='utf-8') as f:
    f.write(Text)
  with open(transcript_file_name_timestamps, 'w', encoding='utf-8') as ft:
    ft.write(TextTimestamps)

In [None]:
# @markdown ## Summarization and elaboration

prompt_type =  "Only grammar correction with highlights" #@param ['Summarization', 'Only grammar correction', 'Only grammar correction with highlights']

# Define your prompts (Keep them together for easier editing)
summary_prompt_1 = """Summarize the video transcript excerpt including a concise title that reflects the content. Wrap the title with **markdown bold notation**. Write the summary as if you are continuing a conversation without needing to signal a beginning. Here is the transcript: """
summary_prompt_2 = """Repeat the following text correcting any grammatical errors  or mis-transcripted word. Focus solely on the essence of the content as if you are continuing a conversation without using any form of introduction like 'Here's the corrected text:' . Here is the text to fix: """
summary_prompt_3 = """Repeat the following text correcting any grammatical errors or mis-transcripted word and **highlight the important quote with markdown bold notation**. Focus solely on the essence of the content as if you are continuing a conversation without using any form of introduction like 'Here's the corrected text:' . Here is the text to fix: """

# Pick the prompt matching the form selection; any other value falls back to
# the "grammar correction with highlights" prompt.
if prompt_type == 'Summarization':
  summary_prompt = summary_prompt_1
elif prompt_type == 'Only grammar correction':
  summary_prompt = summary_prompt_2
else:
  summary_prompt = summary_prompt_3


def query_openai_gpt(prompt, model="gpt-3.5-turbo", max_tokens=4096):
    """Send ``prompt`` as a single user message through the global ``client``.

    Returns the content of the first choice. Any exception raised while
    calling the API or reading the response is caught and reported as an
    ``"An error occurred: ..."`` string instead of being propagated.
    """
    conversation = [{"role": "user", "content": prompt}]
    try:
        response = client.chat.completions.create(
            model=model,
            messages=conversation,
            max_tokens=max_tokens,
        )
        first_choice = response.choices[0]
        return first_choice.message.content
    except Exception as err:  # best-effort: surface the failure as text
        return f"An error occurred: {err!s}"


def query_groq_api(prompt, model="mixtral-8x7b-32768", max_tokens=4096):
    """Ask the global ``client`` to process ``prompt`` and return the reply text.

    The global ``summary_prompt`` is sent as the system message and ``prompt``
    as the user message. Errors from the client propagate to the caller.
    """
    conversation = [
        {"role": "system", "content": summary_prompt},
        {"role": "user", "content": prompt},
    ]
    response = client.chat.completions.create(
        messages=conversation,
        model=model,
        max_tokens=max_tokens,
    )
    first_choice = response.choices[0]
    return first_choice.message.content


def summarize(prompt):
    """Route ``prompt`` to the backend selected by the ``OpenAI_endpoint`` flag.

    The OpenAI path receives the instruction text prepended to the prompt;
    the Groq path receives the bare prompt.
    """
    if not OpenAI_endpoint:
        return query_groq_api(prompt)
    return query_openai_gpt(summary_prompt + prompt)


# Define the TextDataset
class TextDataset(Dataset):
    """Expose a sequence of text chunks as a ``Dataset`` subclass."""

    def __init__(self, texts):
        # Store a reference to the caller's sequence; nothing is copied.
        self.texts = texts

    def __getitem__(self, idx):
        """Return the chunk stored at position ``idx``."""
        chunk = self.texts[idx]
        return chunk

    def __len__(self):
        """Return the number of stored chunks."""
        count = len(self.texts)
        return count

# Function to extract timestamp ranges
def extract_timestamp_ranges(text_timestamp_chunks):
    """Return the first ``[HH:MM:SS]`` marker found in each chunk.

    Chunks that contain no marker contribute nothing, so the result can be
    shorter than the input.
    """
    stamp_re = re.compile(r'\[(\d{2}:\d{2}:\d{2})\]')
    first_hits = (stamp_re.search(piece) for piece in text_timestamp_chunks)
    return [f"[{hit.group(1)}]" for hit in first_hits if hit]

# Process and summarize text
def process_and_summarize(Text, TextTimestamps=None, chunk_size=4096, overlap_size=40):
    """Summarize ``Text`` chunk by chunk and write the result to a file.

    ``Text`` is cut into overlapping character windows, each window is sent
    to ``summarize`` and the replies are concatenated. When ``TextTimestamps``
    is given, it is cut into proportionally sized windows and the first
    ``[HH:MM:SS]`` marker found in window *i* is prefixed to summary *i*.

    Args:
        Text: Plain transcript text.
        TextTimestamps: Optional transcript containing ``[HH:MM:SS]`` markers;
            pass ``None`` or ``""`` to omit timestamps from the output.
        chunk_size: Window length in characters.
        overlap_size: Number of characters shared by consecutive windows.

    Raises:
        ValueError: if ``chunk_size`` is not greater than ``overlap_size``.

    Side effects:
        Prints every summarized piece and writes the full result to a file
        whose name is derived from the global ``transcript_file_name`` (or a
        fixed name for Dropbox sources).
    """
    import os  # function-scope import keeps this block self-contained

    if chunk_size <= overlap_size:
        raise ValueError("chunk_size must be greater than overlap_size")

    step = chunk_size - overlap_size
    texts = [Text[i:i+chunk_size] for i in range(0, len(Text), step)]
    dataset = TextDataset(texts)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    # Stays empty when timestamps are not requested or there is no text, so
    # the loop below needs a single bounds check (and never divides by zero).
    timestamp_ranges = []
    if TextTimestamps and Text:
        ratio = len(TextTimestamps) / len(Text)
        timestamps_chunk_size = int(chunk_size * ratio)
        timestamps_overlap_size = int(overlap_size * ratio)
        # Guard against a zero step, which range() rejects.
        timestamps_step = max(1, timestamps_chunk_size - timestamps_overlap_size)
        text_timestamps_chunks = [TextTimestamps[i:i+timestamps_chunk_size] for i in range(0, len(TextTimestamps), timestamps_step)]
        timestamp_ranges = extract_timestamp_ranges(text_timestamps_chunks)

    summaryTimestamps = ''

    for idx, batch in enumerate(dataloader):
        # batch_size=1, so each batch holds exactly one text chunk.
        text_chunk = batch[0]
        summarized_chunk = summarize(text_chunk)

        if idx < len(timestamp_ranges):
            newPiece = timestamp_ranges[idx] + " " + summarized_chunk + "\n\n"
        else:
            newPiece = summarized_chunk + "\n"
        print(newPiece)
        summaryTimestamps += newPiece

    # Save the final summary. The name is built from the transcript path
    # without its extension so it can never collide with the transcript file.
    if Type == "Dropbox video link":
        final_name = "summary_dropbox_video_audio.txt"
    else:
        base_name, _ = os.path.splitext(transcript_file_name)
        final_name = base_name + "_SUMMARY.txt"
    with open(final_name, 'w', encoding='utf-8') as f:
        f.write(summaryTimestamps)

# Honour the "timestamps in final text" setting from the configuration cell.
process_and_summarize(Text, TextTimestamps if Timestamps else None)