## **0️⃣ Table Of Contents**
1. [Install & Download](#install-download)  
2. [Initialize Models](#initialize-models)  
3. [Pipeline](#pipeline)  
4. [Backend](#backend)  

## **1️⃣ Install & Download** <a id="install-download"></a>

In [1]:
from IPython.display import clear_output

!pip install -U yt-dlp
!pip install -U openai-whisper
!pip install scenedetect
!pip install -q transformers accelerate pillow
!pip install -U bitsandbytes
!pip install -U transformers
!pip install accelerate
!pip install nbformat
!pip install sentence_transformers
!pip install huggingface_hub[hf_xet]
!pip install flask flask-ngrok
!pip install pyngrok flask
!pip install flask-cors
!pip install flask flask-ngrok
!pip install pyngrok flask
!pip install flask-cors
!pip install mysql-connector-python

clear_output()

In [2]:
import subprocess
import os
import whisper
import pandas as pd
from glob import glob
from huggingface_hub import login
from PIL import Image
from transformers import AutoProcessor, AutoModelForVision2Seq
import torch
import json
import re
import datetime
from typing import List, Dict, Any, Protocol
from tqdm import tqdm
import nbformat
from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell
from transformers import AutoTokenizer, AutoModelForCausalLM
!export TRANSFORMERS_VERBOSITY=error
import copy
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import nltk
nltk.download('stopwords')
from pyngrok import ngrok
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask import Flask, send_file, request, jsonify
from flask_cors import CORS
from pyngrok import ngrok
import time, threading
import mysql.connector

clear_output()

## **2️⃣ Initialize Models** <a id="initialize-models"></a>

In [None]:
jf_token = '' # write your Hugging Face access token here

# Register the token with huggingface_hub. This sometimes doesn't take effect, so the
# token is also passed explicitly to every download call below.
login(jf_token)

# Whisper "base" checkpoint used for speech-to-text, moved to the GPU
asr_model = whisper.load_model("base").to("cuda")

# Small vision-language model used to describe the extracted keyframes.
# `token=` replaces the deprecated `use_auth_token=` keyword and matches the
# SentenceTransformer call further down in this cell.
vlm_model_id = "HuggingFaceTB/SmolVLM-256M-Instruct"
vlm_processor = AutoProcessor.from_pretrained(vlm_model_id, token=jf_token)
vlm_model = AutoModelForVision2Seq.from_pretrained(vlm_model_id, torch_dtype=torch.float16, device_map="auto", token=jf_token)

class LLMModelAPI:
    """Thin wrapper around a Hugging Face causal language model and its tokenizer.

    This is a concrete class, so it must not inherit from ``typing.Protocol``:
    protocol classes are interface declarations and, on Python <= 3.10, raise
    ``TypeError: Protocols cannot be instantiated`` when constructed.
    """

    def __init__(self, model_name):
        """Download/load the tokenizer and the fp16 model named ``model_name``.

        The module-level ``jf_token`` is passed as the Hugging Face access token.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True, token=jf_token)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            #load_in_4bit=True,
            torch_dtype=torch.float16,
            token=jf_token
        )
        # device of the first parameter; the tokenized prompt is moved there
        self.device = next(self.model.parameters()).device

    def generate_text(self, prompt: str, max_new_tokens: int = 512, do_sample: bool = False) -> str:
        """Generate a completion for ``prompt`` and return it as text.

        Args:
            prompt: the full prompt text.
            max_new_tokens: upper bound on the number of generated tokens.
            do_sample: sample instead of greedy decoding when True.

        Returns:
            The decoded output. When the decoded text starts with the prompt,
            the prompt is cut off and the remainder is stripped; otherwise the
            decoded text is returned unchanged.
        """
        enc = self.tokenizer(prompt, return_tensors="pt").to(self.device)

        # some tokenizers define no padding token; fall back to EOS so that
        # generate() always receives a valid pad id
        pad_token_id = self.tokenizer.pad_token_id
        if pad_token_id is None:
            pad_token_id = self.tokenizer.eos_token_id

        out = self.model.generate(
            input_ids=enc["input_ids"],
            attention_mask=enc.get("attention_mask"),
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            eos_token_id=self.tokenizer.eos_token_id,
            pad_token_id=pad_token_id,
        )
        text = self.tokenizer.decode(out[0], skip_special_tokens=True)
        if text.startswith(prompt):
            text = text[len(prompt):].strip()
        return text

    def close(self):
        """Drop the model reference and release cached GPU memory.

        Safe to call more than once.
        """
        import gc

        if hasattr(self, "model"):
            del self.model
        # deleting the attribute alone does not return memory to the GPU
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

# LLM used by rewrite_transcription(), blend() and write_notebook()
llm_model = LLMModelAPI("Qwen/Qwen2.5-14B-Instruct-1M")
#llm_model = LLMModelAPI("Qwen/Qwen2.5-7B-Instruct-1M")

# sentence-embedding model used by segment_blended_text() to compare consecutive texts
embedder = SentenceTransformer("all-MiniLM-L6-v2", token=jf_token)

# hide the download/loading output of this cell
clear_output()

## **3️⃣ Pipeline** <a id="pipeline"></a>

In [4]:
def download_video(url):
  """
    Download the video behind `url` with yt-dlp into the "downloaded" directory:
        downloaded/video.mp4 : the full video (used by the keyframe extraction)
        downloaded/audio.mp3 : mono audio only (used by the ASR model)

    Any previous "downloaded" directory is removed first.
    Returns True when both downloads succeeded, False otherwise.
  """
  import shutil # local import: only needed here

  # plain Python instead of the IPython "!rm -rf" magic, so the function does not
  # depend on running inside a notebook cell
  shutil.rmtree('downloaded', ignore_errors=True)

  # keep only the part before the first '&' (drops playlist/time parameters)
  video_url = url.split('&')[0]

  # download full video for the VLM model
  # '--' ends option parsing: the URL comes from an HTTP request and must never be
  # interpreted as a yt-dlp option
  video_result = subprocess.run([
      'yt-dlp',
      '-f', 'best[ext=mp4]',
      '-o', os.path.join('downloaded', 'video.mp4'),
      '--', video_url
  ])
  if video_result.returncode != 0:
    # no point in fetching the audio when the video is unavailable
    return False

  # download audio only for the ASR model
  audio_result = subprocess.run([
      'yt-dlp',
      '-x', '--audio-format', 'mp3',
      '-o', os.path.join('downloaded', 'audio.mp3'),
      '--postprocessor-args', '-ac 1',# mono
      '--', video_url
  ])

  return audio_result.returncode == 0

def transcript_extraction():
  """
    This method converts the downloaded audio file into transcripts and returns a list of
    segments, similar to the following format:
        [
          {
            'start' : 0.5,
            'end' : 2.7,
            'text' : 'LMH is a beautiful place.'
          },
          {
            'start' : 3.3,
            'end' : 5,
            'text' : 'It is filled with interesting activities.'
          }
        ]
  """
  # same cwd-relative location download_video() writes to (the previous absolute
  # "/content/..." path only matched when the working directory was /content)
  audio_path = os.path.join('downloaded', 'audio.mp3')
  return asr_model.transcribe(audio_path, word_timestamps=True)['segments']

def keyframes_extraction():
  """
    This method extracts the keyframes of the downloaded video with the scenedetect CLI,
    and returns one entry per detected scene as follows:
        [
          {'time': 0.0, 'image_path': 'keyframes/video-Scene-001-01.jpg'},
          {'time': 7.875, 'image_path': 'keyframes/video-Scene-002-01.jpg'},
          {'time': 19.987, 'image_path': 'keyframes/video-Scene-003-01.jpg'}
        ]

    Raises subprocess.CalledProcessError when scenedetect exits with an error.
  """
  import shutil # local import: only needed here

  # same cwd-relative location download_video() writes to
  video_path = os.path.join('downloaded', 'video.mp4')
  output_dir = 'keyframes'

  # delete the previous keyframes
  shutil.rmtree(output_dir, ignore_errors=True)

  # extract the keyframes (plain subprocess instead of the IPython "!" magic);
  # check=True surfaces a scenedetect failure here instead of a confusing
  # missing-CSV error below
  subprocess.run([
      'scenedetect', '-i', video_path,
      'detect-content',
      'list-scenes', '--output', output_dir,
      'save-images', '--output', output_dir
  ], check=True)

  # the first CSV row is skipped (it is not part of the per-scene table)
  df = pd.read_csv(os.path.join(output_dir, 'video-Scenes.csv'), skiprows=1)
  keyframes = []

  # attach an approximate time for each keyframe
  for _, row in df.iterrows():
      current_scene = int(row['Scene Number']) # int() guards against a float-typed column
      start = row['Start Time (seconds)'] # the beginning time of the current scene
      length = row['Length (seconds)'] # the duration of the current scene
      # sorted so "the first keyframe" is deterministic (glob order is arbitrary)
      frames = sorted(glob(os.path.join(output_dir, f'video-Scene-{current_scene:03}-*')))
      if not frames:
          continue

      # Linear interpolation (LERP) spreads a scene's keyframes over its duration:
      # frame i of n gets start + length * i / (n - 1). For simplicity only the first
      # keyframe (i = 0) of each scene is kept, i.e. the scene start.
      # NOTE(review): a scene with a single image keeps the original behaviour and is
      # timed at the END of the scene (factor 1) - confirm this is intended.
      fraction = 0.0 if len(frames) > 1 else 1.0
      keyframes.append({
          'time' : start + length * fraction,
          'image_path' : frames[0]
      })

  return keyframes

def chunkig(segments, keyframes):
  """
    This method takes inputs from both the "transcript_extraction()" and "keyframes_extraction()" methods
    and attaches each keyframe to the group of text segments that were spoken in a close time period.
    Every segment is assigned to the keyframe whose time is nearest to the segment's midpoint
    (ties go to the earlier keyframe); keyframes that receive no segment produce no chunk.
    Returns an empty list when either input is empty.
    the output is as follows:
        [
          {
            'start': np.float64(0.0),
            'end': np.float64(5.14),
            'speech': ' Python, a high-level, interpreted programming language famous for its zen-like code.',
            'keyframe_path': 'keyframes/video-Scene-001-03.jpg'
          },
          {
            'start': np.float64(5.32),
            'end': np.float64(14.44),
            'speech': " It's arguably the most popular language in the world because it's easy to learn,  yet practical for serious projects. In fact, you're watching this YouTube video in a Python web",
            'keyframe_path': 'keyframes/video-Scene-002-02.jpg'
          }
        ]
  """

  # nothing to align: avoids IndexError / ValueError on empty inputs
  if not segments or not keyframes:
      return []

  # if there is only one keyframe, return one chunk
  if len(keyframes) == 1:
      return [{
          "start": segments[0]['start'],
          "end": segments[-1]['end'],
          "speech": " ".join(s['text'] for s in segments),
          "keyframe_path": keyframes[0]['image_path']
      }]

  # create a list for each keyframe to hold its corresponding text segments
  segments_per_keyframe = [[] for _ in keyframes]
  for segment in segments:
      midpoint = (segment['start'] + segment['end']) / 2.0
      # min() returns the first minimal index, so ties go to the earlier keyframe
      closest_idx = min(
          range(len(keyframes)),
          key=lambda idx: abs(midpoint - keyframes[idx]['time'])
      )
      segments_per_keyframe[closest_idx].append(segment)

  # merge the text segments that correspond to the same keyframe
  final_chunks = []
  for keyframe, assigned in zip(keyframes, segments_per_keyframe):
      if not assigned:
          continue
      assigned.sort(key=lambda s: s['start'])
      final_chunks.append({
          "start": assigned[0]['start'],
          "end": assigned[-1]['end'],
          "speech": " ".join(s['text'] for s in assigned),
          "keyframe_path": keyframe['image_path']
      })

  return final_chunks

def visual_information_extraction(chunks):
  """
    This method takes the list of chunks produced by the "chunkig(segments, keyframes)" method
    as input, asks the vision-language model to describe each chunk's keyframe, and stores the
    answer under the 'keyframe_description' key.

    The chunks are modified IN PLACE and the same list is returned.
    the output format is as follows:
        [
          {
            'start': np.float64(0.0),
            'end': np.float64(5.14),
            'speech': ' Python, a high-level, interpreted programming language famous for its zen-like code.',
            'keyframe_path': 'keyframes/video-Scene-001-03.jpg',
            'keyframe_description': 'The visual content in this keyframe is a VIBE (Visual Basic) learning chart. ...'
          },
          {
            'start': np.float64(5.32),
            'end': np.float64(14.44),
            'speech': " It's arguably the most popular language in the world because it's easy to learn, ...",
            'keyframe_path': 'keyframes/video-Scene-002-02.jpg',
            'keyframe_description': 'The keyframe shows a monitor screen displaying a sequence of numbers and letters. ...'
          }
        ]
  """
  # Loop through keyframes and get description
  for item in chunks:
    # the context manager closes the file handle; convert() returns an independent image
    with Image.open(item["keyframe_path"]) as raw_image:
      img = raw_image.convert("RGB")

    messages = [{
        "role": "user",
        "content": [
            {"type": "image", "image": img},
            {"type": "text", "text": "Describe the visual content in this educational keyframe. Mention diagrams, equations, and the main topic."}
        ],
    }]

    text = vlm_processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # follow the device the model was actually placed on (it is loaded with
    # device_map="auto") instead of assuming 'cuda'
    inputs = vlm_processor(text=[text], images=[img], return_tensors="pt").to(vlm_model.device)

    out = vlm_model.generate(**inputs, max_new_tokens=200)
    decoded = vlm_processor.batch_decode(out, skip_special_tokens=True)[0]

    # keep only the answer after the "Assistant:" marker; when the marker is
    # missing, fall back to the whole decoded text instead of raising IndexError
    _, marker, answer = decoded.partition("Assistant:")
    item["keyframe_description"] = (answer if marker else decoded).strip()

  return chunks

def rewrite_transcription(_chunks):
  """
    Cleans up every chunk's transcript with the llm model.
    Expects the output of "visual_information_extraction(chunks)" and returns a deep copy
    with the exact same format, where each 'speech' value is replaced by the rewritten
    text. The input list is left untouched.
  """
  template = 'You are given a text transcription from a lecture video, clean it if needed and reorganize it. Do not add anything extra, just the clearified text. The transcription: "<TEXT>".\n\n Your clearified version:'
  rewritten_chunks = copy.deepcopy(_chunks)
  for chunk in rewritten_chunks:
      # fill the placeholder with this chunk's raw transcript
      filled_prompt = template.replace("<TEXT>", chunk["speech"])
      chunk["speech"] = llm_model.generate_text(filled_prompt)
  return rewritten_chunks

def blend(_chunks):
  """
    Blends each chunk's transcription with its keyframe description using the llm model.
    Expects the output of "rewrite_transcription(_chunks)" and returns a deep copy in which
    every chunk additionally carries a 'blended_text' key. The input list is left untouched.
    the output is as follows:
        [
          {
            'start': np.float64(0.0),
            'end': np.float64(5.14),
            'speech': 'Python is a high-level, interpreted programming language known for its clean and elegant code.',
            'keyframe_path': 'keyframes/video-Scene-001-03.jpg',
            'keyframe_description': 'The visual content in this keyframe is a VIBE (Visual Basic) learning chart. ...',
            'blended_text': '"Python is a high-level, interpreted programming language known for its clean and elegant code. In the context of the VIBE learning chart displayed, ..."'
          },
          {
            'start': np.float64(5.32),
            'end': np.float64(14.44),
            'speech': '"It is arguably the most popular language in the world because it is easy to learn yet practical for serious projects. ..."',
            'keyframe_path': 'keyframes/video-Scene-002-02.jpg',
            'keyframe_description': 'The keyframe shows a monitor screen displaying a sequence of numbers and letters. ...',
            'blended_text': '"It is arguably the most popular language in the world ... which is displayed on the monitor showing a sequence of numbers and letters."'
          }
        ]
  """
  template = 'You are given a text transcription from a lecture video, alongside the visual description of the lecture, your task is to blend both of these texts and generate a useful new text that emphasizes both the transcription and the visual description. Do not add anything extra, just the clearified text. The transcription: "<TRANSCRIPTION>".\n\n The visual descriptoin: "<VISUAL_DESCRIPTION>".\n\n The blended version: '
  blended_chunks = copy.deepcopy(_chunks)
  for chunk in blended_chunks:
      # the transcription is substituted first, then the visual description
      with_speech = template.replace("<TRANSCRIPTION>", chunk["speech"])
      filled_prompt = with_speech.replace("<VISUAL_DESCRIPTION>", chunk["keyframe_description"])
      chunk["blended_text"] = llm_model.generate_text(filled_prompt)
  return blended_chunks

def segment_blended_text(obj, alpha=0.6):
  """
    Groups consecutive blended texts into topical segments.

    obj   : list of chunks as returned by "blend(_chunks)" (each needs a 'blended_text' key)
    alpha : cosine-similarity threshold; a text whose embedding similarity to the PREVIOUS
            text is greater than alpha joins the current segment, otherwise it starts a new one

    Returns a list of strings, one per segment (the member texts joined by a space).
    Returns an empty list for empty input.
  """
  blended_texts = [chunk["blended_text"] for chunk in obj]
  if not blended_texts:
      # nothing to segment (previously an IndexError)
      return []

  embeddings = embedder.encode(blended_texts, convert_to_tensor=True)

  # the first text always opens the first segment
  groups = [[blended_texts[0]]]
  for i in range(1, len(blended_texts)):
      sim = util.cos_sim(embeddings[i], embeddings[i - 1]).item()
      if sim > alpha:
          groups[-1].append(blended_texts[i])
      else:
          groups.append([blended_texts[i]])

  return [' '.join(group) for group in groups]

# One generated cell: "start+++<type> ... end+++<type>" with <type> = markdown or code.
# The closing tag uses a real alternation group; the previous "[markdown|code]" was a
# character class that matched a single letter and only worked by accident.
_GENERATED_CELL_PATTERN = re.compile(
    r"start\+\+\+(markdown|code)\s*(.*?)end\+\+\+(?:markdown|code)",
    re.DOTALL,
)


def _append_generated_cells(nb, generated_text):
  """Parse every delimited cell out of `generated_text` and append it to `nb` with the right cell type."""
  for cell_type, cell_source in _GENERATED_CELL_PATTERN.findall(generated_text):
    if cell_type == 'markdown':
      nb["cells"].append(new_markdown_cell(cell_source))
    else:
      nb["cells"].append(new_code_cell(cell_source))


def write_notebook(segments, notebook_name):
  """
    Generates a jupyter notebook from the text segments using the llm model and saves it.

    segments      : list of strings, as returned by "segment_blended_text(obj, alpha)"
    notebook_name : path of the .ipynb file to write

    The notebook is built in three stages: an introduction generated from all segments,
    one part per segment (each prompt also contains everything generated so far), and a
    conclusion. Only text wrapped in start+++markdown/code ... end+++markdown/code
    delimiters ends up in the notebook; anything else the model writes is ignored.
  """
  # NOTE(review): the prompts below only show the model the markdown delimiter format;
  # the "start+++code ... end+++code" format is never described to it.
  INTRO_PROMPT = r"""
  Write the introduction part of a jupyter notebook given the following text:

  <SEGMENT>
  {segment_text}
  </SEGMENT>

  Task:
  1) Produce **rich markdown** cells describing:
    - What this notebook is about (summary/purpose).
    - Who it's for and prerequisites.
    - Key learning outcomes (bulleted).
    - A short suggested roadmap (2-4 items).
  2) **Always** write formulas, if there are any, in the correct markdown format using $$.
  3) Use headers to splits topics.
  4) Don't write too much, stop when needed.
  5) If you wrote summary, called it abstract instead.
  6) Each markdown cell should be written like the following:
  start+++markdown
  contents...
  end+++markdown

  Your response:
  """

  SEGMENT_PROMPT = r"""
  <FINISHED PART>
  {finished_part}
  </FINISHED PART>

  Above the finished parts of a jupyter notebook. Complete the notebook by writing cells that explain the following text segment in details with examples and quizes and using formulas, all as needed:

  <SEGMENT>
  {segment_text}
  </SEGMENT>

  Task:
  1) Produce **markdown** or **code** cells describing everything was mentioned in this segment of text.
  2) **Always** write formulas, if there are any, in the correct markdown format using $$.
  3) Use headers to splits topics.
  4) Write codes if needed.
  5) Don't write too much, stop when needed.
  6) Only add more to the notebook, but don't repeat what was previously mentioned and explained.
  7) Do not write summaries at all.
  8) Each markdown cell should be written like the following:
  start+++markdown
  contents...
  end+++markdown

  Your response:
  """

  EXTRO_PROMPT = r"""
  <FINISHED PART>
  {finished_part}
  </FINISHED PART>

  Given the above finished part of a jupyter notebook, produce cells for the conclusion part that summarize all the previous parts.

  Task:
  1) **Always** write formulas, if there are any, in the correct markdown format using $$.
  2) Use headers to splits topics.
  3) Don't write too much, stop when needed.
  4) Each markdown cell should be written like the following:
  start+++markdown
  contents...
  end+++markdown

  Your response:
  """

  # create a notebook object
  nb = new_notebook()
  # timezone-aware replacement for the deprecated datetime.utcnow(); the tzinfo is
  # dropped again so the stored value keeps its "<naive ISO timestamp>Z" format
  created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  nb["metadata"] = {
      "created_by": "The best project ever!",
      "created_at": created_at.isoformat() + "Z",
  }
  nb["cells"] = []

  # combine all the segments to create the introductory section
  joined = "\n\n---\n\n".join(seg.strip() for seg in segments)

  # this is the introduction; `finished` accumulates everything generated so far
  finished = llm_model.generate_text(INTRO_PROMPT.format(segment_text=joined), max_new_tokens=4096, do_sample=True)
  _append_generated_cells(nb, finished)

  # create the notebook part for each segment
  for seg in tqdm(segments, leave=False):
    out = llm_model.generate_text(
        SEGMENT_PROMPT.format(segment_text=seg.strip(), finished_part=finished.strip()),
        max_new_tokens=4096,
        do_sample=True
    )
    finished += '\n' + out
    _append_generated_cells(nb, out)

  # the conclusion summarizes everything generated above
  extro = llm_model.generate_text(EXTRO_PROMPT.format(finished_part=finished.strip()), max_new_tokens=4096, do_sample=True)
  _append_generated_cells(nb, extro)

  # save the generated notebook at disk
  nbformat.write(nb, notebook_name)

## **4️⃣ Backend** <a id="backend"></a>

In [None]:
ngrok.set_auth_token("") # Write your auth token

# Shared job state, returned as JSON by the /status route and updated by the background job.
# "status": "running", "ready", or "idle"
progress = {"status": "idle", "percentage": "0.0%", 'details' : 'downloading the video...'}
# file name of the notebook of the most recently started job (sent by the /download route)
notebook_name = ''

app = Flask(__name__)
# enable cross-origin requests for the app via flask-cors
CORS(app)

# SECURITY NOTE(review): these database credentials are hard-coded in the notebook;
# they should be moved to a secret store / environment variables and rotated.
_CACHE_DB_CONFIG = {
    "host": "srv1798.hstgr.io",
    "user": "u370625083_v2n",
    "password": "v2n_ABCD",
    "database": "u370625083_v2n",
}


def _set_progress(percentage, details):
    """Publish the current pipeline stage through the shared `progress` dict."""
    progress["percentage"] = percentage
    progress["details"] = details


def _load_cached_notebook(url):
    """Return the notebook bytes cached for `url`, or None when there is no cache entry."""
    conn = mysql.connector.connect(**_CACHE_DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT notebook FROM cashed_notebooks WHERE url = %s LIMIT 1", (url,))
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # always release the connection, even when the query fails
        conn.close()
    return row[0] if row else None


def _store_cached_notebook(url, notebook_path):
    """Insert the notebook file at `notebook_path` into the cache table under `url`."""
    with open(notebook_path, "rb") as f:
        file_data = f.read()
    conn = mysql.connector.connect(**_CACHE_DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("INSERT INTO cashed_notebooks (url, notebook) VALUES (%s, %s)", (url, file_data))
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()


def _background_job(url, notebook_name):
    """Run the whole video-to-notebook pipeline for `url` and write the result to `notebook_name`.

    Progress is reported through the shared `progress` dict. On success the status
    becomes "ready"; on a failed download or any exception it goes back to "idle".
    The database cache is best effort: a failing lookup falls through to a fresh
    generation, and a failing store does not discard the generated notebook.
    """
    import traceback

    try:
        progress["status"] = 'running'

        _set_progress('0.0%', 'checking the cash...')
        try:
            cached_notebook = _load_cached_notebook(url)
        except mysql.connector.Error as db_error:
            print(f'cache lookup failed, generating from scratch:\n{db_error}')
            cached_notebook = None

        if cached_notebook is not None:
            _set_progress('50%', 'downloading cash...')
            with open(notebook_name, "wb") as f:
                f.write(cached_notebook)
            _set_progress('100%', 'finished!')
            progress["status"] = "ready"
            return

        _set_progress('9%', 'downloading the video...')
        if not download_video(url):
            print('can\'t download the video!')
            progress["status"] = "idle"
            return

        _set_progress('18%', 'extracting transcripts...')
        segments = transcript_extraction()

        _set_progress('27%', 'extracting keyframes...')
        keyframes = keyframes_extraction()

        _set_progress('36%', 'chunkig...')
        segments = chunkig(segments, keyframes)

        _set_progress('45%', 'extracting visual information...')
        segments = visual_information_extraction(segments)

        _set_progress('54%', 'rewriting transcriptions...')
        segments = rewrite_transcription(segments)

        _set_progress('63%', 'belnding...')
        segments = blend(segments)

        _set_progress('72%', 'resegmenting...')
        segments = segment_blended_text(segments, alpha=0.6)

        _set_progress('81%', 'generating notebook...')
        write_notebook(segments, notebook_name)

        _set_progress('90%', 'cashing...')
        try:
            _store_cached_notebook(url, notebook_name)
        except mysql.connector.Error as db_error:
            # the notebook already exists on disk; a cache failure must not lose it
            print(f'could not cache the notebook:\n{db_error}')

        _set_progress('100%', 'finished!')
        progress["status"] = "ready"

    except Exception as ex:
        progress["status"] = "idle"
        # Python 3 exceptions have no `.message` attribute; format the exception itself
        print(f'ex:\n{ex}')
        traceback.print_exc()


@app.route("/process", methods=["POST"])
def process():
    """Start the notebook generation for the YouTube URL given in the JSON body.

    Expects a JSON object like {"input": "<youtube url containing v=<id>>"}.
    Responds with {"message": ...}: "The server is busy!" while another job is
    running or its result has not been downloaded yet, "Job started!" otherwise.
    A missing/invalid URL is answered with HTTP 400.
    """
    global notebook_name

    if progress["status"] != 'idle':
        return jsonify({"message": "The server is busy!"})

    # silent=True: a missing or malformed JSON body yields None instead of an error page
    data = request.get_json(silent=True)
    user_input = data.get("input", "") if isinstance(data, dict) else ""
    if not isinstance(user_input, str):
        user_input = ""

    # Only the video id becomes part of the file name. Restricting it to id characters
    # keeps request-controlled text such as "../" out of the path.
    video_id_match = re.search(r'[?&]v=([A-Za-z0-9_-]+)', user_input)
    if video_id_match is None:
        return jsonify({"message": "Invalid YouTube URL!"}), 400

    notebook_name = video_id_match.group(1) + '.ipynb'

    # mark the server busy BEFORE the thread starts, so a second request arriving
    # right now cannot start a parallel job
    progress["status"] = 'running'

    # Run job in separate thread
    threading.Thread(target=_background_job, args=(user_input, notebook_name)).start()

    return jsonify({"message": "Job started!"})

@app.route("/status")
def get_status():
    """Return the shared `progress` dict (status, percentage, details) as JSON."""
    return jsonify(progress)

@app.route("/download")
def download():
    """Send the most recently generated notebook as a file attachment.

    Serving the file resets the status to "idle" so a new job can be started.
    Responds with HTTP 409 while a job is still running and with HTTP 404 when
    there is no notebook file to send.
    """
    # never reset the status under a running job: that would let /process start a
    # second pipeline in parallel
    if progress["status"] == 'running':
        return jsonify({"message": "The notebook is not ready yet!"}), 409

    if not notebook_name or not os.path.isfile(notebook_name):
        return jsonify({"message": "No notebook available!"}), 404

    # NOTE(review): the purpose of this one-second delay is not evident from the code;
    # kept as it was
    time.sleep(1)
    progress["status"] = 'idle'
    # absolute path: the notebook is written relative to the working directory
    return send_file(os.path.abspath(notebook_name), as_attachment=True)

# open an ngrok tunnel to local port 5000 (the port the Flask app is started on below)
public_url = ngrok.connect(5000, domain='') # write your public domain
print("Public URL:", public_url)
# start the Flask server on port 5000
app.run(port=5000)