# Youtube Video Downloader

Download a video from YouTube and generate English and Chinese subtitles using the iFLYTEK (科大讯飞) API.

* ffmpeg https://ffmpeg.org/
* ImageMagick https://imagemagick.org/
* pip install youtube-dl yt-dlp ffmpeg-python pysrt moviepy srtmerge pandas requests


In [1]:
import requests
import json
import ffmpeg
import pysrt
import re
from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip
from srtmerge import srtmerge
import pandas as pd
from downloader import *
from downloader.DownloaderFactory import DownloaderFactory
from utils import *

* 科大讯飞平台API https://www.xfyun.cn/
* 创建应用，复制APPID，作为 video_carrier.py 25行 XF_APP_ID 的值
* 语音识别->语音转写，开通，复制 SecretKey，作为 video_carrier.py 27行 XF_LFASR_SECRET_KEY 的值
* 自然语言处理->机器翻译，开通，复制 APISecret，作为 video_carrier.py 29行 XF_MT_API_SECRET 的值；复制 APIKey，作为 video_carrier.py 30行 XF_MT_API_KEY 的值

In [None]:
# iFLYTEK (xfyun) platform application ID (APPID).
XF_APP_ID = ''
# SecretKey of the iFLYTEK speech transcription service.
XF_LFASR_SECRET_KEY = ''
# APISecret and APIKey of the iFLYTEK machine translation service.
XF_MT_API_SECRET = ''
XF_MT_API_KEY = ''
# Optional proxy settings (replace the port with the one of your VPN):
# !set http_proxy=http://127.0.0.1:25379/ #your vpn port
# !set https_proxy=https://127.0.0.1:25379/

In [13]:
def speech_to_text(outfile, duration):
    """
    Transcribe an audio file to text through the iFLYTEK speech transcription API.

    :param outfile: path of the audio file to upload
    :param duration: audio length in milliseconds
    :return: the value returned by ``RequestApi.recognize`` for the uploaded file
    """
    request_options = {
        'appid': XF_APP_ID,
        'secret_key': XF_LFASR_SECRET_KEY,
        'upload_file_path': outfile,
    }
    recognizer = xf_lfasr.RequestApi(**request_options)
    recognition_result = recognizer.recognize(duration)
    return recognition_result

def translate(text, from_lang='en', to_lang='cn'):
    """
    Translate every entry of ``text`` in place.

    :param text: list of entries; the ``'text'`` value of each entry is
        replaced by its translation
    :param from_lang: source language
    :param to_lang: target language
    :return: None
    """
    for entry in text:
        source_sentence = entry['text']
        translated_sentence = xf_mt.translate(
            XF_APP_ID,
            XF_MT_API_SECRET,
            XF_MT_API_KEY,
            source_sentence,
            from_lang,
            to_lang,
        )
        entry['text'] = translated_sentence
def file_name(file):
    """
    Return ``file`` with its extension removed.

    Only a dot in the last path component counts as the start of an
    extension, so a path without extension (for example ``'./videos/abc'``)
    is returned unchanged instead of being cut at a dot of a directory part
    or raising ``ValueError``.

    :param file: file name or path
    :return: the same name or path without the trailing extension
    """
    # Local import keeps this helper independent of the star imports at the
    # top of the file.
    import os

    return os.path.splitext(file)[0]

def text_to_srt(text, outfile):
    """
    Write recognised sentences to a SubRip (.srt) subtitle file.

    :param text: entries carrying ``'begin_at'`` and ``'end_at'`` (used as
        milliseconds) and the sentence ``'text'``
    :param outfile: path of the srt file to write (saved as UTF-8)
    :return: None
    """
    def to_srt_time(milliseconds):
        # The timestamps are converted with int() before being handed to pysrt.
        return pysrt.SubRipTime(milliseconds=int(milliseconds))

    # Subtitle indexes are the zero-based positions of the entries in ``text``.
    entries = [
        pysrt.SubRipItem(
            position,
            to_srt_time(sentence['begin_at']),
            to_srt_time(sentence['end_at']),
            sentence['text'],
        )
        for position, sentence in enumerate(text)
    ]
    pysrt.SubRipFile(entries).save(outfile, encoding='utf-8')

def write_srt_to_video(video_file, srt_file):
    """
    Render the subtitles of ``srt_file`` on top of ``video_file``.

    :param video_file: path of the input video
    :param srt_file: path of the subtitle (srt) file
    :return: path of the written video, ``<video_file without extension>_srt.mp4``
    """
    input_video = VideoFileClip(video_file)

    def make_caption(text):
        # Font size and caption box are derived from the frame size of the
        # input video.
        frame_width, frame_height = input_video.size[0], input_video.size[1]
        return TextClip(
            text,
            font='Microsoft-YaHei-Bold-&-Microsoft-YaHei-UI-Bold',
            fontsize=round(frame_width / 38.4),
            color='White',
            stroke_color='black',
            stroke_width=1.5,
            bg_color='transparent',
            transparent=True,
            method='caption',
            align='South',
            size=(frame_width * 0.98, frame_height * 0.99),
        )

    # NOTE: the ``encoding`` argument needs a recent moviepy
    # (pip install moviepy==2.0.0.dev2).
    subtitles = SubtitlesClip(srt_file, make_caption, encoding='utf-8')
    positioned_subtitles = subtitles.set_position((0.01, 0), relative=True)
    composite = CompositeVideoClip([input_video, positioned_subtitles])
    final_video = composite.set_duration(input_video.duration)

    out_file = file_name(video_file) + '_srt.mp4'
    # ``threads`` is the number of threads FFmpeg may use while encoding;
    # tune it to the number of cores of the machine.
    final_video.write_videofile(out_file, audio=False, threads=24)
    return out_file

def f_conver(file):
    """
    Split a media file into a separate audio file and a separate video file.

    The outputs are named ``<name>_audio.wav`` and ``<name>_video.mp4`` where
    ``<name>`` is ``file`` without its extension. An output that already
    exists is not generated again.

    :param file: path of the media file to split
    :return: tuple ``(audio path, video path)``
    """
    base = file_name(file)
    source = ffmpeg.input(file)
    video_only = source.video
    audio_only = source.audio

    # Extract the audio stream.
    ori_audio = base + '_audio.wav'
    print(ori_audio)
    if os.path.exists(ori_audio):
        print('file already exist: ' + ori_audio)
    else:
        ffmpeg.output(audio_only, ori_audio).run()

    # Extract the video stream.
    ori_video = base + '_video.mp4'
    if os.path.exists(ori_video):
        print('file already exist: ' + ori_video)
    else:
        ffmpeg.output(video_only, ori_video).run()

    return ori_audio, ori_video

def tran_srt(ori_audio,video_id,out_dir,**opts):
    """
    Transcribe an audio file to subtitles and optionally translate and merge them.

    :param ori_audio: path of the audio file to transcribe
    :param video_id: identifier used as prefix of the generated file names
    :param out_dir: output directory; it is concatenated with ``video_id``
        as-is, so it must end with a path separator
    :param opts: optional keyword settings:
        ``lang`` / ``translate`` - source and target language; the translated
        subtitle file is only produced when both are given;
        ``merge`` - when truthy, merge the translated and the original
        subtitles into ``<out_dir><video_id>_fn.srt``
    :return: tuple ``(original srt path, translated srt path)``; the second
        element is ``None`` when no translation was requested
    """
    # The probed duration is scaled by 1000 because speech_to_text expects
    # milliseconds.
    duration_ms = int(float(ffmpeg.probe(ori_audio)['streams'][0]['duration']) * 1000)
    en_srt = speech_to_text(ori_audio, duration_ms)
    srt_file1 = out_dir + video_id + '_en.srt'
    text_to_srt(en_srt, srt_file1)

    # Stays None unless a translation is produced, so the return statement
    # below never hits an unbound name.
    srt_file2 = None
    if opts.get('lang') and opts.get('translate'):
        # Shallow copy: the entries are shared with en_srt and are rewritten
        # in place by translate(). This is harmless here because the original
        # subtitles have already been written to srt_file1.
        cn_srt = en_srt.copy()
        translate(cn_srt, opts['lang'], opts['translate'])
        srt_file2 = out_dir + video_id + '_cn.srt'
        text_to_srt(cn_srt, srt_file2)

    if opts.get('merge'):
        if srt_file2 is None:
            # Merging needs both subtitle files.
            print('merge skipped: no translated subtitle file was generated')
        else:
            final_srt = out_dir + video_id + '_fn.srt'
            srtmerge([srt_file2, srt_file1], final_srt, offset=1000)
    return srt_file1, srt_file2

def produce_video(ori_audio,ori_video, srt_file,ratio,video_id,out_dir):
    """
    Render the subtitles into the video and attach the audio track.

    :param ori_audio: path of the audio file to attach
    :param ori_video: path of the (silent) video file
    :param srt_file: path of the subtitle file to render into the video
    :param ratio: currently unused; kept so the signature matches ``process_video``
    :param video_id: identifier used as prefix of the result file name
    :param out_dir: output directory; it is concatenated with ``video_id``
        as-is, so it must end with a path separator
    :return: path of the written file, ``<out_dir><video_id>_result.mp4``
    """
    srt_video = write_srt_to_video(ori_video, srt_file)
    final_name = out_dir + video_id + '_result.mp4'

    # Use the subtitled video produced above; reading ``ori_video`` here
    # would throw the rendered subtitles away.
    video_stream = VideoFileClip(srt_video)
    try:
        audio_stream = AudioFileClip(ori_audio)
        try:
            # Attach the extracted audio to the subtitled video.
            finalvideo = video_stream.set_audio(audio_stream)
            finalvideo.write_videofile(final_name)
        finally:
            audio_stream.close()
    finally:
        # Release the readers held by the clips even if writing fails.
        video_stream.close()
    return final_name

def process_video(ori_audio,ori_video, srt_file,ratio,video_id,out_dir,ifsrt=True):
    """
    Combine a video file and an audio file into ``<out_dir><video_id>_result.mp4``.

    :param ori_audio: path of the audio file
    :param ori_video: path of the video file
    :param srt_file: path of the subtitle file; only read when ``ifsrt`` is true
    :param ratio: timestamp factor; when it is not 1 the video timestamps are
        multiplied by ``ratio`` (``setpts``) and the audio tempo is set to
        ``1/ratio`` (``atempo``)
    :param video_id: identifier used as prefix of the result file name
    :param out_dir: output directory, concatenated with ``video_id`` as-is
    :param ifsrt: when true, the subtitles are rendered into the video first
    :return: path of the written file
    """
    # Optionally render the subtitles into the video before muxing.
    video_source = write_srt_to_video(ori_video, srt_file) if ifsrt else ori_video

    audio_in = ffmpeg.input(ori_audio)
    video_in = ffmpeg.input(video_source)

    if ratio != 1:
        # Apply the matching speed change to both streams.
        video_in = video_in.filter('setpts', str(ratio) + "*PTS")
        audio_in = audio_in.filter('atempo', str(1/ratio))

    # Join the two inputs into one video stream and one audio stream.
    joined = ffmpeg.concat(video_in, audio_in, v=1, a=1).node

    final_name = out_dir + video_id + '_result.mp4'
    ffmpeg.run(ffmpeg.output(joined[0], joined[1], final_name))
    return final_name
def save_csv(rs,video_id,out_dir):
    """
    Save the key/value pairs of ``rs`` as a two-column CSV file.

    The file is written to ``<out_dir><video_id>.csv`` in UTF-8, one
    ``key,value`` row per entry. Values that contain commas, quotes or line
    breaks are quoted by the csv module so the file stays parseable.

    :param rs: mapping whose items are written, one per row
    :param video_id: identifier used as the file name (without extension)
    :param out_dir: output directory; it is concatenated with ``video_id``
        as-is, so it must end with a path separator
    :return: None
    """
    # Local import keeps this helper independent of the file-level imports.
    import csv

    # newline='' is what the csv module expects for files it writes to.
    with open(out_dir + video_id + '.csv', 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerows((str(key), str(value)) for key, value in rs.items())

In [None]:
if __name__ == '__main__':
    from urllib.parse import parse_qs, urlparse

    # URL of the YouTube video to download, copied from the address bar of
    # the watch page.
    url='https://www.youtube.com/watch?v=gSjrUPMLkZY'
    # Take the ID from the "v" query parameter so that extra parameters
    # (for example &t=... or &list=...) do not end up in the ID and in the
    # file names built from it. URLs without a "v" parameter fall back to
    # the text after "watch?v=".
    query_ids = parse_qs(urlparse(url).query).get('v')
    video_id = query_ids[0] if query_ids else url.split('watch?v=')[-1]
    outdir='./videos/'
    # Address of the proxy server.
    proxy_url = 'http://127.0.0.1:25379/'

    # Download the video from YouTube to the local disk.
    downloader = DownloaderFactory.create_downloader('YouTube', video_id, proxy_url)
    # Without a proxy:
    # downloader = DownloaderFactory.create_downloader('YouTube', video_id)
    rs = downloader.download()
    # Keep the download result next to the video as <outdir><video_id>.csv.
    save_csv(rs,video_id,outdir)
    print(rs)

In [14]:
# After the download, the VPN has to be disconnected before the iFLYTEK API can be used.
# The downloaded file is expected at <outdir><video_id>.webm.
file=outdir+video_id+'.webm'
# Path of the merged subtitle file that tran_srt writes when merge=True.
final_srt=outdir+video_id + '_fn.srt'
# Split the download into a separate audio file and a separate video file.
ori_audio,ori_video =f_conver(file)
# # en_srt=outdir+video_id+'_en.srt'
# # cn_srt=outdir+video_id+'_cn.srt'
# Transcribe the audio, translate it from 'en' to 'cn' and merge both subtitle files.
en_srt,cn_srt=tran_srt(ori_audio,video_id,outdir,lang='en', translate='cn',merge=True)
# ori_audio=outdir+video_id+'_audio.wav'
# ori_video=outdir+video_id+'_video.mp4'
# ifsrt=False: the subtitles are not rendered into the video, so final_srt is not read
# by this call; ratio=0.9 applies the setpts/atempo speed change inside process_video.
process_video(ori_audio,ori_video,final_srt,0.9,video_id,outdir,ifsrt=False)

'./videos/gSjrUPMLkZY_result.mp4'