In [None]:
import os
import string
import threading

import ffmpeg
from pytube import YouTube

In [None]:
# Working directories for the downloaded video and audio parts, the muxed
# result and the subtitle files, all relative to the current working directory.
video_path = './video_files/'
audio_path = './audio_files/'
final_path = './final_files/'
caption_path = './captions/'

# Create the directories up front: later cells write a caption file into
# caption_path, list its contents, and hand FFmpeg an output path inside
# final_path, all of which fail when the directory does not exist.
for _directory in (video_path, audio_path, final_path, caption_path):
    os.makedirs(_directory, exist_ok=True)

In [None]:
# Ask for the video URL interactively and wrap it in a pytube YouTube object;
# the following cells read metadata, streams and captions from yt_obj.
link = input("Enter a YouTube link: ")
yt_obj = YouTube(link)

In [None]:
# Print a short metadata summary of the video. Each row is
# (label, attribute read from yt_obj, extra values printed after it).
summary_rows = (
    ("Title: ", "title", ()),                    # Title of video
    ("Number of views: ", "views", ()),          # Number of views of video
    ("Length of video: ", "length", ("seconds",)),  # Length of the video
    ("Ratings: ", "rating", ()),                 # Rating
)

for label, attribute, trailing in summary_rows:
    print(label, getattr(yt_obj, attribute), *trailing)

In [None]:
# Characters allowed in generated file names: ASCII letters, digits and the
# space (runs of spaces are turned into single underscores below).
filter_set = string.ascii_letters + string.digits + ' '

# Drop every character outside filter_set, then join the remaining words
# with underscores.
title = '_'.join(''.join(ch for ch in yt_obj.title if ch in filter_set).split())

# A title consisting only of filtered-out characters (for example one written
# entirely in a non-Latin script) would leave an empty name, and every output
# file would be called just ".<extension>". Fall back to the video id then.
if not title:
    title = yt_obj.video_id

print(title)

In [None]:
# Split the adaptive streams into video and audio candidates. Video streams
# are limited to min_height and above when at least one such stream exists.
min_height = 1080

video_files = list()
audio_files = list()
lower_res_videos = list()  # video streams below min_height, kept as a fallback

for file in yt_obj.streams.filter(adaptive=True):

    if 'video' in file.mime_type:
        # A video stream without a reported resolution cannot be ranked.
        if not file.resolution:
            continue

        # resolution looks like "1080p"; strip the trailing "p" before comparing
        if int(file.resolution[:-1]) >= min_height:
            video_files.append(file)
        else:
            lower_res_videos.append(file)
    else:
        audio_files.append(file)

# Many videos have nothing at min_height or above. Rather than leaving
# video_files empty (which makes the selection step fail), fall back to the
# lower-resolution streams.
if not video_files and lower_res_videos:
    print(f'No video stream at {min_height}p or above; falling back to lower resolutions')
    video_files = lower_res_videos

In [None]:
def _leading_int(value):
    """Return the integer formed by the leading digits of ``value``.

    Works for strings such as ``'1080p'`` or ``'128kbps'`` and for plain
    integers; a value without leading digits (including ``None``) yields 0.
    """
    digits = ''
    for ch in str(value):
        if not '0' <= ch <= '9':
            break
        digits += ch
    return int(digits) if digits else 0


def _video_rank(stream):
    """Sort key for video streams: resolution first, frame rate as tie-breaker."""
    # fps is read defensively: depending on the pytube version it may be an
    # int, a string, or absent -- TODO confirm against the installed version.
    return (_leading_int(stream.resolution), _leading_int(getattr(stream, 'fps', None)))


if not video_files or not audio_files:
    raise ValueError('No usable video and/or audio stream was found for this link')

# NOTE: the loop variables keep the names `video` / `audio`; they remain
# defined at module level after the loops and other cells may read them.
best_video = video_files[0]

for video in video_files[1:]:

    # Tuple comparison: a higher resolution always wins, and a higher frame
    # rate wins between streams of equal resolution. Ties keep the earlier stream.
    if _video_rank(video) > _video_rank(best_video):
        best_video = video

best_audio = audio_files[0]

for audio in audio_files[1:]:

    # abr looks like "128kbps"; compare the numeric part only
    if _leading_int(audio.abr) > _leading_int(best_audio.abr):
        best_audio = audio

print(best_video, best_audio, sep='\n')

In [None]:
# Text printed by complete_callback when a download finishes. It is updated
# by progress_callback to match the stream currently being downloaded.
message = ''

def progress_callback(stream, chunk, bytes_remaining):
    """Print a one-line running total for the stream being downloaded.

    Also records, in the module-level ``message``, which completion text
    belongs to this stream.

    Args:
        stream: the stream being downloaded; its ``filesize`` and
            ``mime_type`` attributes are read.
        chunk: the data just received (unused).
        bytes_remaining: number of bytes still to be downloaded.
    """
    # Without this declaration the assignments below would only create a
    # local variable and the module-level message would stay empty.
    global message

    bytes_per_mb = 1000000

    if 'video' in stream.mime_type:
        message = 'Video File Downloaded'
    else:
        message = 'Audio File Downloaded'

    # Use the stream passed to the callback, not whatever stream objects
    # happen to be lying around at module level.
    size = stream.filesize
    progress = (size - bytes_remaining) // bytes_per_mb

    # end='\r' keeps the counter on a single, continuously rewritten line
    print(f'Total: {size // bytes_per_mb}MB | {progress}MB Completed', end='\r')

def complete_callback(stream, file_handle):
    """Announce that the download of ``stream`` has finished.

    The text is derived from the finished stream itself instead of shared
    module state, so it cannot be empty or belong to a different stream.

    Args:
        stream: the stream that finished downloading; its ``mime_type``
            attribute is read.
        file_handle: second argument supplied to the callback (unused).
    """
    kind = 'Video' if 'video' in stream.mime_type else 'Audio'
    print(f'{kind} File Downloaded')

# Report progress and completion for every stream downloaded from this video.
yt_obj.register_on_progress_callback(progress_callback)
yt_obj.register_on_complete_callback(complete_callback)

caption_track = None

# The caption file is written below; make sure its directory exists.
os.makedirs(caption_path, exist_ok=True)

# Save English captions as <title>.srt, skipping tracks whose name marks them
# as auto-generated. If several tracks match, the last one overwrites the file.
for caption in yt_obj.caption_tracks:

    if 'en' in caption.code and 'auto' not in caption.name:
        caption_track = yt_obj.captions[caption.code]

        # Captions routinely contain non-ASCII characters; write UTF-8
        # explicitly instead of relying on the platform default encoding,
        # which can raise UnicodeEncodeError on some systems.
        with open(caption_path+title+'.srt', 'w', encoding='utf-8') as fp:
            fp.write(caption_track.generate_srt_captions())

# mime_type looks like "video/mp4" / "audio/webm"; [6:] keeps the part after
# the slash, which is used as the file extension.
best_video.download(output_path=video_path, filename=title+'.'+best_video.mime_type[6:])
print()
best_audio.download(output_path=audio_path, filename=title+'.'+best_audio.mime_type[6:])

In [None]:
def start_ffmpeg_thread(audio_part, video_part, path, caption_part):
    """Run ``start_ffmpeg`` on a background thread so the caller is not blocked.

    All four arguments are forwarded to ``start_ffmpeg`` unchanged.
    """
    worker = threading.Thread(
        target=start_ffmpeg,
        args=(audio_part, video_part, path, caption_part),
    )
    worker.start()

def start_ffmpeg(audio_part, video_part, path, caption_part):
    """Launch ``ffmpeg_func`` on a new thread unless FFmpeg is still running.

    The four arguments are forwarded to ``ffmpeg_func`` unchanged. Nothing
    happens when the process stored in ``ffmpeg_process`` has not exited yet.
    """
    global ffmpeg_process

    # Allow only one instance of FFmpeg at a time. ffmpeg_process is None
    # before the first run; afterwards poll() returns None while the process
    # is alive and its exit code once it has finished. A successful run exits
    # with code 0, so the result must be compared against None -- a plain
    # truth test would treat "finished successfully" as "still running".
    if (ffmpeg_process is None) or (ffmpeg_process.poll() is not None):
        threading.Thread(target=ffmpeg_func, args=(audio_part, video_part, path, caption_part)).start()

def ffmpeg_func(audio_part, video_part, path, caption_part):
    """Start FFmpeg asynchronously to combine the given inputs into ``path``.

    Audio and video are copied without re-encoding and an existing output
    file is overwritten. ``caption_part`` is added as a third input only when
    it is truthy. The started process is stored in the module-level
    ``ffmpeg_process``; its stdin is piped so it can be written to later.
    """
    global ffmpeg_process

    # The caption input is optional
    inputs = [audio_part, video_part]
    if caption_part:
        inputs.append(caption_part)

    muxer = ffmpeg.output(*inputs, path, acodec='copy', vcodec='copy')
    ffmpeg_process = muxer.overwrite_output().run_async(pipe_stdin=True)
    
def cancel_ffmpeg():
    """Ask a running FFmpeg process to quit and wait until it has exited.

    Does nothing when no process is stored in ``ffmpeg_process`` or the
    stored process has already finished. After a successful cancel,
    ``ffmpeg_process`` is reset to None.
    """
    global ffmpeg_process

    # Nothing to cancel: FFmpeg was never started, or it is no longer running
    if ffmpeg_process is None or ffmpeg_process.poll() is not None:
        return

    # Terminate FFmpeg gracefully
    quit_key = 'q'.encode("GBK")  # Simulate user pressing 'q' key
    ffmpeg_process.stdin.write(quit_key)
    ffmpeg_process.communicate()
    ffmpeg_process.wait()
    ffmpeg_process = None

# No FFmpeg process has been started yet.
ffmpeg_process = None

# The files downloaded earlier; mime_type[6:] is the part after "video/" or
# "audio/" and was used as the file extension when downloading.
video_stream = ffmpeg.input(video_path+title+'.'+best_video.mime_type[6:])
audio_stream = ffmpeg.input(audio_path+title+'.'+best_audio.mime_type[6:])

# Make sure the output directory exists before FFmpeg is asked to write into it.
os.makedirs(final_path, exist_ok=True)

# The combined file reuses the video's extension.
# NOTE(review): with '-c copy' the audio codec must be allowed in that
# container (e.g. a webm video with mp4 audio may be rejected) -- confirm.
path = final_path+title+'.'+best_video.mime_type[6:]

# Add the subtitle file as an extra input only when one exists for this title.
# os.path.isfile is used instead of listing the directory so that a missing
# captions directory simply means "no captions" rather than an exception.
caption_stream = None
caption_file = caption_path+title+'.srt'

if os.path.isfile(caption_file):
    caption_stream = ffmpeg.input(caption_file)

start_ffmpeg_thread(audio_stream, video_stream, path, caption_stream)