# AI powered Highlight Clipper
Use the Whisper API to get transcripts, then use ChatGPT for sentiment analysis to decide whether each segment is interesting or not.

In [None]:
%pip install openai
%pip install natsort
%pip install yt-dlp

In [None]:
import config

import pytube
import yt_dlp

import requests
import subprocess
import json
import os
import shutil
import ast

import openai

from natsort import natsorted

In [None]:
# Hand the API token stored in the local `config` module to the openai library.
openai.api_key = config.OAI_API_TOKEN

In [None]:
# YouTube video URL
vid_url = "https://www.youtube.com/watch?v=oUrdoDs4q2Y"

# Length, in seconds, of each chunk the downloaded audio is split into
# (passed to split_video_by_seconds as chunk_duration)
testing_clips_time = 20

# Minimum score a segment needs (score >= sentiment_score) to be cut into the video
sentiment_score = 0.6
# Upper bound on how much text goes into one ChatGPT request. The batching cell
# compares it against a whitespace-split word count, so it is a word budget
# rather than an exact token count.
max_token_count = 3000
# Inserted verbatim into the scoring prompt sent to ChatGPT
custom_prompt = "evaluate each sentence's entertainment value for a casual YouTube viewer"

# debug: when True, the final cell removes the intermediate directories and downloads
delete_directories = True

In [None]:
# Ask yt-dlp which format it would select (without downloading) so the file can
# be named with a matching extension. Audio-only is preferred; if that selector
# fails, retry with one that also accepts merged video+audio.
try:
    video_info = yt_dlp.YoutubeDL({"format": "bestaudio[ext!=webm]/best[ext!=webm]"}).extract_info(vid_url, download=False)
except Exception:
    # `Exception` rather than a bare except, so interrupting the kernel still works.
    video_info = yt_dlp.YoutubeDL({"format": "bestvideo[ext!=webm]+bestaudio[ext!=webm]/best[ext!=webm]"}).extract_info(vid_url, download=False)

# yt-dlp copies the fields of the selected format onto the top-level info dict,
# so prefer that. `formats[0]` is only the first format listed for the video
# (not the selected one) and is kept purely as a fallback.
video_stream = video_info if video_info.get('ext') else video_info['formats'][0]

# Keep the extension only if it is a typical audio file format
file_ext = video_stream['ext']
if file_ext not in ['mp3', 'm4a', 'aac', 'wav', 'flac', 'ogg', 'opus']:
    file_ext = 'mp3'

# Download the audio stream; later cells refer to the file through output_file_name
output_file_name = 'input.' + file_ext
ydl_opts = {
    'format': 'bestaudio[ext!=webm]/best[ext!=webm]',
    'outtmpl': output_file_name,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download([vid_url])

Break the audio down into chunks of `testing_clips_time` seconds each, small enough to stay under the Whisper API's 25 MB upload limit

In [None]:
def split_video_by_seconds(input_file_path, chunk_duration, output_dir="downloads"):
    """
    Splits a media file into consecutive fixed-length chunks using ffmpeg.

    The total duration is read with ffprobe; each chunk is then cut with a
    stream copy (no re-encoding) and written as ``input_<i>.mp4``.

    Args:
        input_file_path: Path of the file to split.
        chunk_duration: Chunk length in seconds. Start offsets advance by
            ``int(chunk_duration)``.
        output_dir: Directory that receives the chunks; created if missing.

    Raises:
        ValueError: If ``chunk_duration`` is shorter than one second.
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits with an error.
    """
    step = int(chunk_duration)
    if step < 1:
        raise ValueError("chunk_duration must be at least 1 second")

    # Get the duration of the input file
    duration_command = ["ffprobe", "-i", input_file_path, "-show_entries", "format=duration", "-v", "quiet", "-of", "csv=p=0"]
    duration_output = subprocess.check_output(duration_command, encoding="utf-8")
    input_duration = float(duration_output.strip())

    os.makedirs(output_dir, exist_ok=True)

    # One ffmpeg call per chunk; the last chunk is clamped to the end of the input
    for i, start_time in enumerate(range(0, int(input_duration), step)):
        end_time = min(start_time + chunk_duration, input_duration)
        output_file_path = os.path.join(output_dir, f"input_{i}.mp4")
        # -y: overwrite chunks left over from a previous run instead of prompting
        split_command = ["ffmpeg", "-y", "-i", input_file_path, "-ss", str(start_time), "-to", str(end_time), "-c", "copy", output_file_path]
        subprocess.run(split_command, check=True)

# Make sure the chunk directory is present, then cut the downloaded audio
# into pieces of testing_clips_time seconds.
newpath = "".join((os.getcwd(), "/downloads/"))
if os.path.exists(newpath) is False:
    os.makedirs(newpath)

split_video_by_seconds(input_file_path=output_file_name, chunk_duration=testing_clips_time)

Create metadata for the files <br/>
<sub>Start time and duration</sub>

In [None]:
newpath = os.getcwd() + "/metadata/"
if not os.path.exists(newpath):
    os.makedirs(newpath)

# Running offset of the current chunk from the beginning of the input
start_time = 0.0

# natsorted keeps the chunks in numeric order (input_2 before input_10), so the
# accumulated start_time and the metadata index both line up with the chunk number.
for i, filename in enumerate(natsorted(os.listdir('downloads'))):
    # Read the chunk duration with ffprobe, the same way split_video_by_seconds
    # reads the duration of the whole input.
    duration_command = ["ffprobe", "-i", 'downloads/' + filename, "-show_entries", "format=duration", "-v", "quiet", "-of", "csv=p=0"]
    duration_output = subprocess.check_output(duration_command, encoding="utf-8")
    duration = float(duration_output.strip())

    md_output_file_path = f"metadata/input_{i}.txt"
    with open(md_output_file_path, 'w') as output_file:
        output_file.write(f"[{start_time},{duration}]")
    start_time += duration

Input the audio into Whisper and get transcription


In [None]:
newpath = os.getcwd() + "/sentences/"
if not os.path.exists(newpath):
    os.makedirs(newpath)

url = 'https://api.openai.com/v1/audio/transcriptions'
headers = {
    'Authorization': f'Bearer {config.OAI_API_TOKEN}'
}
data = {
    'model':'whisper-1',
    'prompt':'Woah! I for a fact don\'t believe that... Okay, I know. I love those~ [silence]. [music]. [ingame_sound]'
}
# natsorted keeps the chunks in numeric order so sentences/input_<n>.txt belongs
# to the same chunk as metadata/input_<n>.txt.
for chunk_num, filename in enumerate(natsorted(os.listdir('downloads'))):
    with open("downloads/" + filename, "rb") as input_file:
        files = {
            'file':(filename,input_file)
        }
        transcription = requests.post(url=url, headers=headers, files=files, data=data)

    # Fail with the HTTP error itself instead of a TypeError from writing None
    transcription.raise_for_status()
    text = transcription.json().get('text')
    if text is None:
        raise RuntimeError(f"Whisper returned no 'text' field for {filename}: {transcription.text}")

    output_file_path = f"sentences/input_{chunk_num}.txt"
    with open(output_file_path, 'w', encoding="utf-8") as output_file:
        output_file.write(text)

### ChatGPT Implementation

In [None]:
# Chat Completions endpoint and bearer-token header used by the ChatGPT requests
url = 'https://api.openai.com/v1/chat/completions'
headers = dict(Authorization='Bearer {}'.format(config.OAI_API_TOKEN))

Break the text down into batches of at most `max_token_count` words

In [None]:
newpath = os.getcwd() + "/temporary/"
if not os.path.exists(newpath):
    os.makedirs(newpath)

# Each batch is a list of [start_time, duration, sentence] entries, the shape
# the ChatGPT prompt describes. A new batch is started whenever adding the next
# sentence would push the running word count above max_token_count.
batches = [[]]
word_count = 0

for filename in natsorted(os.listdir("sentences")):
    with open(os.path.join("sentences", filename), "r", encoding="utf-8") as chunk_file, \
         open(os.path.join("metadata", filename), "r", encoding="utf-8") as timestamp_file:
        # Collapse newlines and repeated whitespace so the transcript is one string
        sentence = " ".join(chunk_file.read().split())
        # The metadata file holds a single "[start_time,duration]" list
        clip_start, clip_duration = json.loads(timestamp_file.read())

    # Nothing was transcribed for this clip, so there is nothing to score
    if not sentence:
        continue

    sentence_words = len(sentence.split())

    # The count carries over from one clip to the next so the limit is actually
    # enforced; the `batches[-1]` guard avoids emitting an empty batch when a
    # single sentence is longer than the limit.
    if batches[-1] and word_count + sentence_words > max_token_count:
        batches.append([])
        word_count = 0

    batches[-1].append([clip_start, clip_duration, sentence])
    word_count += sentence_words

for chunk_num, batch in enumerate(batches):
    if not batch:
        # Only possible when there were no transcribed sentences at all
        continue
    temporary_file_path = os.path.join(newpath, f"input_{chunk_num}.txt")
    # "w" (not append) keeps the cell idempotent on re-run; json.dump writes a
    # well-formed list and escapes any quotes inside the transcript.
    with open(temporary_file_path, "w", encoding="utf-8") as temp_file:
        json.dump(batch, temp_file, ensure_ascii=False)

Use ChatGPT to break down the text and metadata into workable chunks

In [None]:
newpath = os.getcwd() + "/video_transcription/"
if not os.path.exists(newpath):
    os.makedirs(newpath)

# One request per batch file; files are addressed by index (input_0, input_1, ...)
for chunk_num in range(len(os.listdir('temporary'))):
    with open(f"temporary/input_{chunk_num}.txt","r", encoding="utf-8") as input_file:
        chunk_text = input_file.read()

    data = {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": f"Using this list of [start_time, duration, video_transcription]. Respond with only a new list of chunks of the text you as an AI model find appropriately fits in speaking english by combinding list elements and then getting the new start_time by getting the first start_time of that chunk, then get the duration by adding the duration times. Get the new start and duration together to create the new list of [start_time, duration, chunk]. DO NOT give any answer outisde of the format. \n\"{chunk_text}\""}]
    }
    response = requests.post(url=url, headers=headers, json=data)
    # Surface API failures as the HTTP error rather than a TypeError on a missing 'choices'
    response.raise_for_status()
    created_chunks = response.json()['choices'][0]['message']['content']

    with open(f"video_transcription/input_{chunk_num}.txt", "w", encoding="utf-8") as vidtrans:
        vidtrans.write(created_chunks)

Get sentiment from ChatGPT and create sentiment analysis file

In [None]:
newpath = os.getcwd() + "/sentiment_analysis/"
if not os.path.exists(newpath):
    os.makedirs(newpath)

# One request per re-chunked transcript; files are addressed by index (input_0, input_1, ...)
for chunk_num in range(len(os.listdir('video_transcription'))):
    with open(f"video_transcription/input_{chunk_num}.txt", "r", encoding="utf-8") as input_file:
        segments_text = input_file.read()

    data = {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": f"Given a list of video segments in the format [start_time, duration, sentence]. {custom_prompt}. Assign a float score representing the entertainment value, with higher values indicating more entertaining content. Provide the results as a list in the format [start_time, duration, sentence, float_score]. DO NOT give any answer outisde of the format. \n\"{segments_text}\""}]
    }
    response = requests.post(url=url, headers=headers, json=data)
    # Surface API failures as the HTTP error rather than a TypeError on a missing 'choices'
    response.raise_for_status()
    sentiment = response.json()['choices'][0]['message']['content']

    with open(f"sentiment_analysis/input_{chunk_num}.txt","w", encoding="utf-8") as sentiment_file:
        sentiment_file.write(sentiment)

Create video from chosen segments

In [None]:
# Download the full video that the highlight clips are cut from
ydl_opts = {
    'format': 'bestvideo[ext=avi]+bestaudio[ext=m4a]/best[ext=avi]/best',
    'outtmpl': 'input_video.avi'
}

with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download([vid_url])

# Run ffprobe command to get video metadata
ffprobe_command = [
    'ffprobe',
    '-v', 'quiet',
    '-print_format', 'json',
    '-show_streams',
    'input_video.avi'
]

# Capture the output of the ffprobe command; check=True turns a failed probe
# into a CalledProcessError instead of a confusing parsing error further down
result = subprocess.run(ffprobe_command, capture_output=True, text=True, check=True)

# Parse the JSON output
metadata = json.loads(result.stdout)

# Extract the fps from the first video stream that reports a usable rate.
# fps stays None when no such stream exists.
fps = None
for stream in metadata.get('streams', []):
    if stream.get('codec_type') == 'video':
        # Calculate fps from r_frame_rate (numerator/denominator)
        numerator, denominator = map(int, stream['r_frame_rate'].split('/'))
        if denominator != 0:  # a rate of "0/0" would otherwise raise ZeroDivisionError
            fps = numerator / denominator
            break

In [None]:
newpath = os.getcwd() + "/videos/"
if not os.path.exists(newpath):
    os.makedirs(newpath)

for chunk_num, filename in enumerate(os.listdir('sentiment_analysis')):
    with open(f"sentiment_analysis/input_{chunk_num}.txt","r", encoding="utf-8") as sentanal:
        # literal_eval only accepts Python literals, so the model's answer is
        # parsed without executing it; a malformed answer raises here.
        scored_segments = ast.literal_eval(sentanal.read())

    # Each entry is expected to be [start_time, duration, sentence, score]
    for i, sentlist in enumerate(scored_segments):
        if float(sentlist[3]) < sentiment_score:
            continue

        # Include chunk_num in the name: the inner index restarts at 0 for every
        # analysis file, so clips from different files would otherwise collide.
        output_file_path = f"videos/output_{chunk_num}_{i}.avi"

        # -y: overwrite clips left over from a previous run instead of prompting
        split_command = ["ffmpeg", "-y", "-i", "input_video.avi"]
        if fps is not None:
            # Skip the option entirely rather than passing the string "None"
            split_command += ["-framerate", str(fps)]
        split_command += ["-ss", str(sentlist[0]), "-t", str(sentlist[1]), "-c:v", "libx264", "-crf", "23", "-c:a", "copy", output_file_path]

        try:
            subprocess.run(split_command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            # Best effort: report the failed clip and carry on with the rest
            print(f"Error occurred while running command: {e}")
            print(f"Output: {e.output}")
            print(f"Error output: {e.stderr}")

Combine the videos

In [None]:
# Create a list of commands to concatenate the videos
commands = ['ffmpeg', '-i', f'concat:{"|".join(["videos/" + x for x in natsorted(os.listdir("videos"))])}', '-c', 'copy', 'output.avi']

# Run the ffmpeg command and capture the output and error messages
subprocess.run(commands, capture_output=True, text=True)

Delete directories

In [None]:
# Remove the intermediate artifacts; output.avi (the final result) is kept.
if delete_directories:
    working_dir = os.getcwd()

    intermediate_dirs = ["metadata", "downloads", "sentences", "videos", "video_transcription", "sentiment_analysis", "temporary"]
    for dir_name in intermediate_dirs:
        dir_path = os.path.join(working_dir, dir_name)
        if os.path.isdir(dir_path):
            shutil.rmtree(dir_path)

    # os.path.join supplies the separator; plain concatenation of the working
    # directory and output_file_name yields a path that never exists, so the
    # downloaded audio would be left behind.
    downloaded_files = [output_file_name, "input_video.avi"]
    for file_name in downloaded_files:
        file_path = os.path.join(working_dir, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)