In [3]:
import functools
import json
import operator
import os
import random
import re
import subprocess
import tempfile
import time
from os import path

import google_auth_oauthlib.flow
from google.oauth2.credentials import Credentials
from google.cloud import texttospeech

import googleapiclient.discovery
from googleapiclient.http import MediaFileUpload

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

from webdriver_manager.chrome import ChromeDriverManager

from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter

from aeneas.executetask import ExecuteTask
from aeneas.task import Task

In [11]:
def scrape_reddit(reddit_url: str, transcript_path: str, screenshot_path: str, max_attempts: int = 5) -> str:
    """Scrape a Reddit post into a transcript file and a screenshot of its header.

    A fresh headless Chrome session is started for every attempt and is always
    shut down afterwards, so failed attempts do not leave browser processes
    behind.

    Args:
        reddit_url: URL of the Reddit post to open.
        transcript_path: File that receives the post title, a newline, and the
            post text.
        screenshot_path: File that receives a screenshot of the post-content
            element after its text body has been removed from the page.
        max_attempts: How many times to try before giving up.

    Returns:
        The post title.

    Raises:
        RuntimeError: If every attempt failed; the last underlying error is
            attached as the cause.
    """
    last_error = None

    for attempt in range(1, max_attempts + 1):
        driver = None
        try:
            options = webdriver.ChromeOptions()
            options.add_argument('headless')
            options.add_argument('user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.50 Safari/537.36')
            options.add_argument('--disable-blink-features=AutomationControlled')

            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
            title = _scrape_reddit_page(driver, reddit_url, transcript_path, screenshot_path)

            print('Scraped transcript from Reddit')
            return title

        # `Exception` (not a bare except) so Ctrl-C / kernel interrupt still works.
        except Exception as error:
            last_error = error
            print(f'Reddit scrape attempt {attempt}/{max_attempts} failed: {error!r}')

        finally:
            # Runs on success and failure alike; releases the Chrome process.
            if driver is not None:
                driver.quit()

    raise RuntimeError(f'Could not scrape {reddit_url} after {max_attempts} attempts') from last_error


def _scrape_reddit_page(driver, reddit_url: str, transcript_path: str, screenshot_path: str) -> str:
    """Run one scrape pass with an already-started driver and return the post title."""
    driver.get(reddit_url)

    # Open the user dropdown, click the "Dark Mode" entry, then close the dropdown.
    user_dropdown = WebDriverWait(driver, 2.5).until(EC.presence_of_element_located((By.ID, 'USER_DROPDOWN_ID')))
    user_dropdown.click()

    dark = WebDriverWait(driver, 2.5).until(EC.presence_of_element_located((By.XPATH, "//div[text()='Dark Mode']")))
    dark.click()
    user_dropdown.click()

    title = driver.find_element(By.CSS_SELECTOR, 'div[data-adclicklocation="title"]').text
    transcript = title + '\n' + driver.find_element(By.CSS_SELECTOR, 'div[data-click-id="text"]').text

    # Remove the text body from the DOM so the screenshot shows the post
    # without it. The text was already captured above.
    driver.execute_script(
        """
        const post_content = document.querySelector('[data-test-id="post-content"]')
        const content = document.querySelector('div[data-click-id="text"]')
        post_content.removeChild(content)
        """
    )

    driver.find_element(By.CSS_SELECTOR, 'div[data-test-id="post-content"]').screenshot(screenshot_path)

    with open(transcript_path, 'w') as transcript_file:
        transcript_file.write(transcript)

    return title


def pick_random_background_video(backgrounds_folder_path) -> str:
    """Return the full path of a randomly chosen entry in the backgrounds folder.

    Entries whose name starts with a dot (hidden files) are never chosen.
    """
    visible_entries = [
        entry
        for entry in os.listdir(backgrounds_folder_path)
        if not entry.startswith('.')
    ]
    chosen_entry = random.choice(visible_entries)
    print('Picked background video')
    return path.join(backgrounds_folder_path, chosen_entry)



def preprocess_transcript(transcript_path: str):
    """Rewrite the transcript file in place as one short caption fragment per line.

    Steps:
      1. Expand known Reddit acronyms (case-insensitive, whole words only).
      2. Keep the first line (the title) on its own line.
      3. Break the body after every ',', '!', '?' or '.'.
      4. Recursively halve any fragment of 10 or more words.
      5. Drop fragments that are empty after stripping.

    A transcript without a newline is treated as a title with an empty body.

    Args:
        transcript_path: Text file to read and overwrite.
    """
    slang = {
        'AITA': 'Am I The Asshole',
        'AMA': 'Ask Me Anything',
        'NSFW': 'Not Safe For Work',
        'SFW': 'Safe For Work',
        'IMO': 'In My Opinion',
        'IIRC': 'If I Recall Correctly',
        'SJW': 'Social Justice Warrior',
        'TIL': 'Today I Learned',
        'TLDR': "Too Long Didn't Read"
    }

    # Word boundaries let acronyms match next to punctuation or newlines
    # ("AITA?", "TLDR:", last word of the title) without matching inside
    # longer words.
    acronym_pattern = re.compile(r'\b(?:' + '|'.join(map(re.escape, slang)) + r')\b', re.IGNORECASE)

    def halve(sentence: str):
        """Split a sentence in half, recursively, until each piece has fewer than 10 words."""
        words = sentence.split()

        if len(words) < 10:
            return [sentence]

        mid = len(words) // 2
        return halve(' '.join(words[:mid])) + halve(' '.join(words[mid:]))

    def swap_acronyms(s: str) -> str:
        """Replace every known acronym in `s` with its spelled-out form."""
        # `.get` with a fallback guards against exotic case-folding matches
        # whose upper-cased form is not a dictionary key.
        return acronym_pattern.sub(lambda match: slang.get(match.group(0).upper(), match.group(0)), s)

    with open(transcript_path) as transcript_file:
        transcript = transcript_file.read()

    # 1) Replace acronyms
    transcript = swap_acronyms(transcript)

    # 2) Keep the title on its own separate line
    title, _, body = transcript.partition('\n')

    # 3) Split the body after every punctuation mark
    body_split = re.sub('[,!?.]', lambda punc: punc.group(0) + '\n', body).split('\n')

    # 4) Halve long fragments, then 5) discard the empty ones
    fragments = [piece.strip() for sentence in body_split for piece in halve(sentence)]
    processed_body = '\n'.join(fragment for fragment in fragments if fragment)
    processed_transcript = title + '\n' + processed_body

    with open(transcript_path, 'w') as processed_transcript_file:
        processed_transcript_file.write(processed_transcript)

    print('Transcript preprocessing completed')



def synthesize_voice(client: texttospeech.TextToSpeechClient, transcript_path: str, voice_path: str):
    """Turn the transcript file into an MP3 voiceover.

    The whole transcript is sent in a single synthesis request using the
    'en-US-Wavenet-D' voice, a speaking rate of 1.20 and a pitch of -2.4.

    Args:
        client: Text-to-speech client used to issue the request.
        transcript_path: Text file whose full content is spoken.
        voice_path: Destination file for the returned audio bytes.
    """
    with open(transcript_path) as source:
        spoken_text = source.read()
        synthesis_input = texttospeech.SynthesisInput(text=spoken_text)

    voice_params = texttospeech.VoiceSelectionParams(
        name='en-US-Wavenet-D',
        language_code='en-US',
        ssml_gender=texttospeech.SsmlVoiceGender.MALE,
    )
    audio_params = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3,
        speaking_rate=1.20,
        pitch=-2.4,
    )
    synthesis_request = {'input': synthesis_input, 'voice': voice_params, 'audio_config': audio_params}
    response = client.synthesize_speech(request=synthesis_request)

    with open(voice_path, 'wb') as sink:
        sink.write(response.audio_content)

    print('Voice generated')


def generate_srt_from_force_align(transcript_path: str, voice_path: str, captions_path: str):
    """Force-align the transcript with the voiceover and write the result as SRT.

    Args:
        transcript_path: Plain-text transcript handed to the alignment task.
        voice_path: Audio file handed to the alignment task.
        captions_path: Where the task writes its sync map (SRT format).
    """
    alignment = Task(config_string="task_language=eng|is_text_type=plain|os_task_file_format=srt")

    # Inputs first, then the output location.
    alignment.audio_file_path_absolute = voice_path
    alignment.text_file_path_absolute = transcript_path
    alignment.sync_map_file_path_absolute = captions_path

    executor = ExecuteTask(alignment)
    executor.execute()
    alignment.output_sync_map_file()


def remove_title_from_srt(captions_path: str) -> float:
    """Drop the first cue (the title) from an SRT file and return when it ends.

    Assumes the first cue occupies exactly four lines: index, timing, one line
    of text, and a blank separator. Remaining cues keep their original index
    numbers and timings.

    Args:
        captions_path: SRT file to rewrite in place.

    Returns:
        End time of the removed cue in seconds, taken from the full
        ``hh:mm:ss,mmm`` timestamp after ``-->``.

    Raises:
        ValueError: If the file is too short to contain a timing line.
    """
    with open(captions_path) as captions_file:
        lines = captions_file.readlines()

    if len(lines) < 2:
        raise ValueError(f'{captions_path} does not contain a complete SRT cue')

    # Second line looks like "00:00:00,000 --> 00:00:03,520"; take the end stamp.
    end_stamp = lines[1].split('-->')[-1].strip().replace(',', '.')

    # Fold hours/minutes/seconds into seconds so titles longer than a minute work.
    overlay_duration = 0.0
    for component in end_stamp.split(':'):
        overlay_duration = overlay_duration * 60 + float(component)

    # readlines() keeps each line's newline, so join with '' to avoid doubling them.
    captions_without_title = ''.join(lines[4:])

    with open(captions_path, 'w') as captions_file:
        captions_file.write(captions_without_title)

    return overlay_duration


def merge_background_and_voice(background_path: str, voice_path: str, video_voice_path: str):
    """Lay the voiceover on top of the background video with ffmpeg.

    The background is looped indefinitely and the output is cut at the shorter
    stream (``-shortest``). Video is taken from the first input and audio from
    the second. An existing output file is overwritten.

    Arguments are passed to ffmpeg as a list, not through a shell, so paths
    containing spaces or shell metacharacters are handled correctly.

    Args:
        background_path: Video file used for the picture.
        voice_path: Audio file used for the sound.
        video_voice_path: Output file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    command = [
        'ffmpeg',
        '-stream_loop', '-1',
        '-i', background_path,
        '-i', voice_path,
        '-shortest',
        '-map', '0:v:0',
        '-map', '1:a:0',
        '-y', video_voice_path,
    ]
    subprocess.run(command, check=True)
    print('Voiceover applied to background video')



def get_google_credentials(client_secret_path: str, credentials_path: str) -> Credentials:
    """Load cached Google credentials, or run the OAuth flow and cache the result.

    Args:
        client_secret_path: OAuth client-secrets file used when no cache exists.
        credentials_path: JSON file holding (or receiving) the authorized-user
            credentials.

    Returns:
        The credentials read from the cache, or the freshly obtained ones.
    """
    # Set unconditionally, as the first step, before either branch runs.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    if not path.exists(credentials_path):
        requested_scopes = ['https://www.googleapis.com/auth/youtube.force-ssl', 'https://www.googleapis.com/auth/cloud-platform']
        oauth_flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(client_secret_path, requested_scopes)
        fresh_credentials = oauth_flow.run_local_server()

        # Cache the result so the next run takes the early-return path below.
        with open(credentials_path, 'w') as cache_file:
            cache_file.write(fresh_credentials.to_json())

        return fresh_credentials

    return Credentials.from_authorized_user_file(credentials_path)


def upload_video_to_youtube(youtube: any, title: str, video_path: str) -> str:
    """Upload a video file as an unlisted YouTube video.

    Args:
        youtube: API client object exposing ``videos().insert(...)``.
        title: Title stored in the video's snippet.
        video_path: Local video file to upload.

    Returns:
        The ``id`` field of the API response.
    """
    videos_resource = youtube.videos()

    metadata = {
        'snippet': {'title': title},
        'status': {'privacyStatus': 'unlisted'},
    }
    upload = MediaFileUpload(video_path)

    insert_request = videos_resource.insert(part='snippet,status', body=metadata, media_body=upload)
    result = insert_request.execute()

    video_id = result['id']
    print('Video uploaded with ID:', video_id)
    return video_id


def upload_captions_to_youtube(youtube: any, video_id: str, transcript_path: str) -> str:
    """Upload a transcript as an English caption track named 'Default'.

    The request is sent with ``sync=True``.

    Args:
        youtube: API client object exposing ``captions().insert(...)``.
        video_id: Video the caption track is attached to.
        transcript_path: Local transcript file to upload.

    Returns:
        The ``id`` field of the API response.
    """
    captions_resource = youtube.captions()

    upload = MediaFileUpload(transcript_path)
    track_metadata = {
        'snippet': {
            'language': 'en',
            'name': 'Default',
            'videoId': video_id,
        }
    }

    insert_request = captions_resource.insert(part='snippet', sync=True, media_body=upload, body=track_metadata)
    result = insert_request.execute()

    caption_id = result['id']
    print('Captions uploaded with ID:', caption_id)
    return caption_id


def get_srt_from_youtube(video_id: str, captions_path: str, max_attempts: int = 10, poll_interval: float = 60) -> float:
    """Download the manually created English captions of a video as an SRT file.

    Each attempt waits ``poll_interval`` seconds and then tries to fetch the
    captions. The first caption entry is removed and its duration returned
    (presumably that entry is the title; verify against the uploader). All
    '.' characters are stripped from the formatted SRT before it is written.

    Args:
        video_id: YouTube video ID.
        captions_path: Destination SRT file.
        max_attempts: Maximum number of wait-then-fetch attempts.
        poll_interval: Seconds to sleep before each attempt.

    Returns:
        The ``duration`` of the first caption entry.

    Raises:
        RuntimeError: If the captions could not be fetched within
            ``max_attempts`` attempts; the last error is attached as the cause.
    """
    last_error = None

    for attempt in range(1, max_attempts + 1):
        print('Waiting for YouTube captions to be available')
        time.sleep(poll_interval)

        try:
            transcripts = YouTubeTranscriptApi.list_transcripts(video_id)
            captions = transcripts.find_manually_created_transcript(['en']).fetch()

            header = captions.pop(0)
            overlay_duration = header['duration']

            formatter = SRTFormatter()
            srt_captions = formatter.format_transcript(captions).replace('.', '')  # Remove periods for aesthetic purposes

            with open(captions_path, 'w') as captions_file:
                captions_file.write(srt_captions)

        # Bounded retry instead of unbounded recursion; interrupts still propagate.
        except Exception as error:
            last_error = error
            print(f'Caption fetch attempt {attempt}/{max_attempts} failed: {error!r}')
            continue

        print('SRT File extracted')
        return overlay_duration

    raise RuntimeError(f'Captions for video {video_id} were not available after {max_attempts} attempts') from last_error


def overlay_screenshot_and_captions(video_voice_path: str, screenshot_path: str, captions_path: str, video_voice_image_captions_path: str, overlay_duration: float):
    """Overlay the screenshot and burn the subtitles into the video with ffmpeg.

    The screenshot is centered and shown from t=0 until ``overlay_duration``
    seconds; the subtitles are rendered by the ``subtitles`` filter. Audio is
    copied unchanged.

    Arguments are passed to ffmpeg as a list, not through a shell, so file
    paths containing spaces are handled correctly. The captions path is still
    embedded in the filter graph and is subject to ffmpeg's own filter
    escaping rules.

    Args:
        video_voice_path: Input video that already carries the voiceover.
        screenshot_path: Image to overlay.
        captions_path: SRT file to render.
        video_voice_image_captions_path: Output file.
        overlay_duration: Seconds the screenshot stays visible.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    filter_graph = (
        f"overlay=(W-w)/2:(H-h)/2:enable='between(t,0,{overlay_duration})', "
        f"subtitles={captions_path}:force_style='Alignment=10,Fontname=Consolas,BackColour=&H80000000,Spacing=0.2,Outline=0,Shadow=0.75'"
    )
    command = [
        'ffmpeg',
        '-i', video_voice_path,
        '-i', screenshot_path,
        '-filter_complex', filter_graph,
        '-preset', 'fast',
        '-c:a', 'copy',
        video_voice_image_captions_path,
    ]
    subprocess.run(command, check=True)
    print('Reddit screenshot overlayed and subtitles added')

    # def main():

    # if len(sys.argv) != 2:
    #     print('generate.py <REDDIT_URL>')
    #     sys.exit(0)
    
    # _, reddit_url = sys.argv

# Alternative post to try:
# reddit_url = "https://www.reddit.com/r/AmItheAsshole/comments/12m7nsu/aita_for_not_serving_my_husband_leftovers/"
reddit_url = 'https://www.reddit.com/r/AmItheAsshole/comments/100z9la/aita_for_defending_my_cat_after_my_mil_was/'

# The output file is named after the post slug (last non-empty URL segment)
# plus a timestamp so repeated runs never overwrite each other.
post_slug = reddit_url.rstrip('/').split('/')[-1]
output_path = path.join('.', f"{post_slug}-{int(time.time())}.mp4")

# Static assets (background videos, fonts, Google secrets) live under ./required.
required_path = path.join(os.getcwd(), 'required')
backgrounds_folder_path = path.join(required_path, 'backgrounds')
fonts_folder_path = required_path
credentials_path = path.join(required_path, 'credentials.json')
client_secret_path = path.join(required_path, 'client_secret.json')

# All intermediate artifacts go into a temp dir that is removed afterwards.
with tempfile.TemporaryDirectory() as temp_dir:
    transcript_path = path.join(temp_dir, 'transcript.txt')
    captions_path = path.join(temp_dir, 'captions.srt')
    voice_path = path.join(temp_dir, 'voice.mp3')
    video_voice_path = path.join(temp_dir, 'video-with-voice.mp4')
    screenshot_path = path.join(temp_dir, 'screenshot.png')

    credentials = get_google_credentials(client_secret_path, credentials_path)
    client = texttospeech.TextToSpeechClient(credentials=credentials)

    background_path = pick_random_background_video(backgrounds_folder_path)
    title = scrape_reddit(reddit_url, transcript_path, screenshot_path)

    preprocess_transcript(transcript_path)
    synthesize_voice(client, transcript_path, voice_path)
    generate_srt_from_force_align(transcript_path, voice_path, captions_path)
    overlay_duration = remove_title_from_srt(captions_path)

    # Step 1: loop the background under the voiceover (same ffmpeg invocation
    # that was previously duplicated inline here).
    merge_background_and_voice(background_path, voice_path, video_voice_path)

    # Step 2: overlay the screenshot for the duration of the title and burn in
    # the captions using the bundled font. Arguments go to ffmpeg as a list so
    # paths with spaces are passed intact and a failure raises.
    filter_graph = (
        f"overlay=(W-w)/2:(H-h)/2:enable='between(t,0,{overlay_duration})', "
        f"subtitles={captions_path}:fontsdir='{fonts_folder_path}':force_style='Fontname=Roboto Slab Black,Outline=1.25,Alignment=10'"
    )
    subprocess.run(
        [
            'ffmpeg',
            '-i', video_voice_path,
            '-i', screenshot_path,
            '-filter_complex', filter_graph,
            '-preset', 'fast',
            '-c:a', 'copy',
            output_path,
        ],
        check=True,
    )


# if __name__ == "__main__":
#     main()

Picked background video
Scraped transcript from Reddit
Transcript preprocessing completed


E0419 04:53:31.356174000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356200000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356209000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356213000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356217000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356221000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356225000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Illegal keepalive throttling value 
E0419 04:53:31.356978000 140704328837312 client_channel.cc:625]        chand=0x7fc17cea1f50: Ille

Voice generated


ffmpeg version 5.1.2 Copyright (c) 2000-2022 the FFmpeg developers
  built with Apple clang version 14.0.0 (clang-1400.0.29.202)
  configuration: --prefix=/usr/local/Cellar/ffmpeg/5.1.2_6 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags= --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex --enable-libsoxr --enable-libzmq --enable-libzimg --disable-libjack --

: 