In [22]:
import pytube
import pydub
import cv2
import imagehash
import PIL
import pytesseract
import zhon
import string
import os
import re


# Tell imageio/moviepy which ffmpeg binary to use. This must run before the
# moviepy imports below (presumably the binary is resolved while moviepy is
# being imported -- confirm against the installed version). setdefault keeps
# a value that is already present in the environment, so the Homebrew path is
# only a fallback and the notebook can run on machines with ffmpeg elsewhere.
os.environ.setdefault("IMAGEIO_FFMPEG_EXE", "/opt/homebrew/bin/ffmpeg")

import moviepy.editor as mpeditor
import moviepy.video.fx.all as mpfx

target_dir = "./all_data"  # root folder for every file the notebook writes
video_fps = 3              # frame rate used when the cropped videos are written

In [23]:

def isSimilar(img1, img2, cutoff = 5):
    """Tell whether two frames are close enough according to their average hash.

    Each frame is converted with ``cv2.COLOR_BGR2RGB``, wrapped in a PIL image
    and hashed with ``imagehash.average_hash``.

    Args:
        img1: first frame, an array accepted by ``cv2.cvtColor``.
        img2: second frame, same kind of array.
        cutoff: exclusive upper bound for the difference between the hashes.

    Returns:
        True when ``hash(img1) - hash(img2)`` is strictly below ``cutoff``,
        False otherwise.
    """
    first_hash, second_hash = (
        imagehash.average_hash(
            PIL.Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        )
        for frame in (img1, img2)
    )
    return bool(first_hash - second_hash < cutoff)


def sliceVideo(clip, fps):
    """Cut a clip into time ranges during which consecutive frames look alike.

    Frames are pulled with ``clip.iter_frames(fps)`` and each one is compared
    with a reference frame through ``isSimilar``.

    Args:
        clip: object exposing ``iter_frames(fps)`` and a ``duration`` attribute.
        fps: sampling rate; frame ``i`` is treated as time ``i / fps`` seconds.

    Returns:
        A list of ``[start, end]`` pairs in seconds, in chronological order.
    """
    segments = []    # accepted [start, end] ranges
    seg_start = 0    # start time of the range currently being tracked
    reference = ""   # frame every new frame is compared against

    for index, frame in enumerate(clip.iter_frames(fps)):
        if index == 0:
            reference = frame
        now = index / fps

        if isSimilar(reference, frame):
            # Same picture: just slide the reference forward.
            reference = frame
            continue

        # The picture changed, so the tracked range ended one frame earlier.
        seg_end = (index - 1) / fps
        if seg_end - seg_start < 0.5:
            # Range too short to keep: neither the reference frame nor the
            # start time is touched, and the next frame is examined.
            continue
        if seg_start != seg_end:
            # Zero-length ranges are excluded because moviepy raises on them.
            segments.append([seg_start, seg_end])
        seg_start = now
        reference = frame

    # The loop never closes the range that runs up to the end of the clip,
    # so it is added here.
    seg_end = clip.duration
    if seg_start != seg_end:
        segments.append([seg_start, seg_end])

    return segments

def cleartext(text):
    """Normalise OCR output so that only the subtitle characters remain.

    Steps, in order:
      1. remove every run of whitespace;
      2. remove spans enclosed in ASCII parentheses (shortest match);
      3. remove every character found in ``string.punctuation`` or
         ``zhon.hanzi.punctuation``.

    Args:
        text: raw string to clean.

    Returns:
        The cleaned string (possibly empty).
    """
    # Whitespace goes first, so a parenthesised note that OCR split across
    # spaces or line breaks is still matched by the next pattern.
    text = re.sub(r"\s+", "", text)
    # Raw string: in a plain literal "\(" is an invalid escape sequence, which
    # recent Python versions warn about.
    text = re.sub(r"\(.*?\)", "", text)
    # One translate pass deletes all punctuation characters at once instead of
    # rescanning the text once per punctuation character.
    unwanted = dict.fromkeys(map(ord, string.punctuation + zhon.hanzi.punctuation))
    return text.translate(unwanted)


def lcs(s1, s2):
    """Return the length of the longest common subsequence of ``s1`` and ``s2``.

    Args:
        s1: first sequence (for example a string).
        s2: second sequence.

    Returns:
        Number of elements in the longest subsequence shared by both inputs;
        0 when either input is empty.
    """
    rows, cols = len(s1), len(s2)

    # table[r][c] is the LCS length of s1[:r] and s2[:c]. Row 0 and column 0
    # describe an empty prefix and therefore stay at 0.
    table = [[0] * (cols + 1) for _ in range(rows + 1)]

    for r, left in enumerate(s1, start=1):
        for c, right in enumerate(s2, start=1):
            if left == right:
                table[r][c] = table[r - 1][c - 1] + 1
            else:
                table[r][c] = max(table[r - 1][c], table[r][c - 1])

    return table[rows][cols]

def get_subtitle(path):
    """OCR the frames of a video and save the recognised lines next to it.

    Every fourth frame is binarised, inverted, eroded and handed to Tesseract
    as a single text line (``--psm 7``, languages ``chi_tra+eng``). The result
    is normalised with ``cleartext`` and written, one line per subtitle, to a
    UTF-8 text file that has the same name as the video but a ``.txt``
    extension. A line is skipped when it is empty or too close to the line
    written just before it.

    Args:
        path: path of the video file to read.

    Returns:
        None. The subtitle file is the only result.
    """
    # splitext only swaps the real extension. The former str.replace could
    # also rewrite ".mp4" inside a directory name, or return the video path
    # itself (no ".mp4" in it) and so truncate the video when opened for
    # writing.
    subtitle_path = os.path.splitext(path)[0] + ".txt"

    # Loop-invariant structuring element for the erosion step.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))

    video = cv2.VideoCapture(path)
    try:
        total_frame = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

        # Explicit UTF-8: the recognised text is Chinese and must not depend
        # on the platform's default encoding.
        with open(subtitle_path, "w", encoding="utf-8") as subtitle:
            pre_text = ""

            for i in range(total_frame):
                success, img = video.read()
                if i % 4 != 0:
                    # Only every fourth frame is sent to OCR.
                    continue
                if not success:
                    break

                # Keep only near-white pixels (presumably the subtitle
                # glyphs), then invert so the text is dark on a light
                # background.
                gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                _, binary = cv2.threshold(gray, 230, 255, cv2.THRESH_BINARY)
                binary = cv2.bitwise_not(binary)
                binary = cv2.erode(binary, kernel)

                # The image is single-channel at this point, so the matching
                # conversion is GRAY2RGB; BGR2RGB expects 3 or 4 channels.
                ocr_input = PIL.Image.fromarray(cv2.cvtColor(binary, cv2.COLOR_GRAY2RGB))
                text = pytesseract.image_to_string(ocr_input, lang='chi_tra+eng', config="--psm 7")

                text = cleartext(text)
                # Skip empty results and near-duplicates of the previous line.
                # NOTE(review): for text shorter than 5 characters
                # ``len(text) - 5`` is negative, so such lines are always
                # skipped -- confirm that this is intended.
                if text == "" or lcs(text, pre_text) > len(text) - 5:
                    continue
                pre_text = text
                subtitle.write(text + "\n")
    finally:
        # Release the decoder even when OCR or file writing fails.
        video.release()



    

In [24]:
def _discard_partial(*paths):
    """Delete half-written outputs so they are not mistaken for finished files."""
    for leftover in paths:
        if os.path.isfile(leftover):
            os.remove(leftover)


# Output folders for the cropped videos and the extracted audio.
os.makedirs(target_dir + "/mp4", exist_ok=True)
os.makedirs(target_dir + "/wav", exist_ok=True)

list_url = "https://www.youtube.com/playlist?list=PL96kIIcXJpMtmsQGlsNVqWduASZnh4HnE"
playlist = pytube.Playlist(list_url)

# Number of videos to process before stopping (the original cell stopped
# after the first successful one).
max_videos = 1

# The with-statement closes both report files even if a video raises midway.
with open(target_dir + "/train.csv", "w", encoding="utf-8") as traincsv, \
        open(target_dir + "/fail.txt", "w", encoding="utf-8") as failfile:
    # NOTE(review): only the header is ever written to train.csv in this cell.
    traincsv.write("\"file\",\"Description\"\n")

    # idx counts successfully processed videos; a failed video does not
    # advance it, so the next playlist entry reuses the same file names.
    idx = 0
    for video in playlist.videos:

        if idx == max_videos:
            break

        tmp_vidoe = target_dir + "/mp4/tmp" + str(idx) + ".mp4"
        mp4_path = target_dir + "/mp4/" + str(idx) + ".mp4"
        wav_path = target_dir + "/wav/" + str(idx) + ".wav"

        if not os.path.isfile(mp4_path):
            try:
                video.streams.filter().get_highest_resolution().download(filename="tmp" + str(idx) + ".mp4", output_path=target_dir + "/mp4")
                source_clip = mpeditor.VideoFileClip(tmp_vidoe)
                try:
                    # Keep only the 900x50 strip starting at (200, 565) and
                    # drop the last 5 seconds of the video.
                    mp4 = mpfx.crop(source_clip, x1=200, y1=565, width=900, height=50)
                    mp4 = mp4.subclip(0, mp4.duration - 5)
                    mp4.write_videofile(filename=mp4_path, fps=video_fps, logger=None)
                finally:
                    # Close the reader before the temporary download is removed.
                    source_clip.close()
                os.remove(tmp_vidoe)

            except Exception as exc:
                # Because idx is reused, a leftover tmp/mp4 file would be
                # picked up as the finished output of the next video.
                _discard_partial(tmp_vidoe, mp4_path)
                print(idx, video.title, "cant downlaod", repr(exc))
                failfile.write(str(idx) + ". " + "cant downlaod.\n")
                continue

        if not os.path.isfile(wav_path):
            try:
                audio = mpeditor.AudioFileClip(mp4_path)
                try:
                    audio.write_audiofile(filename=wav_path, fps=16000, nbytes=2, logger=None)
                finally:
                    audio.close()
                # Re-export the 16 kHz / 16-bit file as mono.
                sound = pydub.AudioSegment.from_wav(wav_path)
                sound = sound.set_channels(1)
                sound.export(wav_path, format="wav")
            except Exception as exc:
                _discard_partial(wav_path)
                print(idx, "cant convert", repr(exc))
                failfile.write(str(idx) + ". " + "cant convert.\n")
                continue

        get_subtitle(mp4_path)

        print(idx, "success")
        idx += 1
    



0 success
