In [None]:
# Mount Google Drive at /content/drive. Later cells load the custom EasyOCR
# network from /content/drive/MyDrive and write the final CSV there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Print the current working directory.
!pwd

In [None]:
# Work from /content; relative paths used later in the notebook
# (e.g. "./dataedu_py.csv") resolve against this directory.
%cd /content

In [None]:
# Alternative install (disabled): pafy from the Cupcakus fork with an unpinned youtube-dl.
# !pip install git+https://github.com/Cupcakus/pafy
# !pip install youtube-dl

# Install pafy and a pinned youtube-dl release.
!pip install pafy
!pip install youtube-dl==2020.12.2

In [None]:
# NOTE(review): none of the packages below are version-pinned, while later cells
# import specific scenedetect submodules (video_stream, platform, ...) — consider pinning.

# Install PySceneDetect with its OpenCV extra.
!pip install scenedetect[opencv]

# Install EasyOCR.
!pip install easyocr

# Install the py-hanspell library straight from its GitHub repository.
!pip install git+https://github.com/ssut/py-hanspell.git

# Image utilities: imutils, OpenCV and scikit-image.
!pip install imutils opencv-python scikit-image

In [None]:
# Import modules
import easyocr
import cv2
import math
from hanspell import spell_checker
import pandas as pd
import numpy as np
import re
from skimage.metrics import structural_similarity as ssim
import pafy
import matplotlib.pyplot as plt
from glob import glob

from scenedetect import VideoManager, SceneManager, StatsManager
from scenedetect.detectors import ContentDetector
from scenedetect.scene_manager import save_images, write_scene_list_html

In [None]:
import csv
from string import Template
from typing import Iterable, List, Tuple, Optional, Dict, Callable, Union, TextIO
import threading
import queue
import logging
import math
import sys

import cv2
import numpy as np

from scenedetect.frame_timecode import FrameTimecode
from scenedetect.platform import (tqdm, get_and_create_path, get_cv2_imwrite_params)
from scenedetect.video_stream import VideoStream
from scenedetect.stats_manager import StatsManager, FrameMetricRegistered
from scenedetect.scene_detector import SceneDetector, SparseSceneDetector
from scenedetect.thirdparty.simpletable import (SimpleTableCell, SimpleTableImage, SimpleTableRow,
                                                SimpleTable, HTMLPage)

logger = logging.getLogger('pyscenedetect')

In [None]:
def easyocr_image(scene_list: 'List[Tuple[FrameTimecode, FrameTimecode]]',
                video: 'VideoStream',
                num_images: int = 3,
                frame_margin: int = 1,
                image_extension: str = 'jpg',
                encoder_param: int = 95,
                image_name_template: str = '$VIDEO_NAME-Scene-$SCENE_NUMBER-$IMAGE_NUMBER',
                output_dir: Optional[str] = None,
                show_progress: Optional[bool] = False,
                scale: Optional[float] = None,
                height: Optional[int] = None,
                width: Optional[int] = None,
                video_manager=None,
                crop_rows: Optional[Tuple[int, int]] = (65, 570)
                ) -> Tuple[Dict[int, List[str]], List[np.ndarray]]:
    """Collect representative frames for every scene, in memory, for OCR.

    For each scene, `num_images` frame numbers are chosen (first / middle /
    last portions of the scene), the video is seeked to each one, and the
    decoded frame is optionally resized and cropped before being appended to
    the returned frame list. Nothing is written to disk.

    Args:
        scene_list: (start, end) timecode pairs, one per scene.
        video: Video source providing `reset`, `seek`, `read`, `aspect_ratio`
            and `name`.
        num_images: Number of frames to sample per scene (must be >= 1).
        frame_margin: Frames to step inwards from the scene boundaries for the
            first and last sample (must be >= 0).
        image_extension: Extension appended to the generated file names.
        encoder_param: Unused; kept so the signature stays call-compatible.
        image_name_template: `string.Template` used to build each file name
            from $VIDEO_NAME, $SCENE_NUMBER, $IMAGE_NUMBER and $FRAME_NUMBER.
        output_dir: Unused; kept so the signature stays call-compatible.
        show_progress: Show a tqdm progress bar when tqdm is available.
        scale: Uniform scale factor, applied only if neither `height` nor
            `width` is given.
        height: Target height in pixels; if `width` is omitted it is derived
            from the frame's proportions.
        width: Target width in pixels; if `height` is omitted it is derived
            from the frame's proportions.
        video_manager: Deprecated alias for `video`.
        crop_rows: (top, bottom) row slice kept from every frame after
            resizing. Defaults to (65, 570), the crop previously hard-coded
            here. Pass None to keep the full frame.

    Returns:
        A tuple `(image_filenames, image_frames)`: a dict mapping the scene
        index to the generated file names, and a flat list of the frames in
        scene order. Both are empty when `scene_list` is empty.

    Raises:
        ValueError: If `num_images` <= 0 or `frame_margin` < 0.
    """
    # TODO: Remove `video_manager`.
    if video_manager is not None:
        logger.error('video_manager is deprecated, use video instead.')
        video = video_manager

    if not scene_list:
        # Same shape as the normal return value so callers can always unpack two values.
        return {}, []
    if num_images <= 0 or frame_margin < 0:
        raise ValueError('num_images must be >= 1 and frame_margin must be >= 0.')

    video.reset()

    # Setup flags and init progress bar if available.
    completed = True
    logger.info('Generating output images (%d per scene)...', num_images)
    progress_bar = None
    if show_progress and tqdm:
        progress_bar = tqdm(total=len(scene_list) * num_images, unit='images', dynamic_ncols=True)

    filename_template = Template(image_name_template)

    # Zero-padded number formats: at least 3 digits for scenes, and one digit
    # more than num_images needs for images.
    scene_num_format = '%0' + str(max(3, math.floor(math.log(len(scene_list), 10)) + 1)) + 'd'
    image_num_format = '%0' + str(math.floor(math.log(num_images, 10)) + 2) + 'd'

    framerate = scene_list[0][0].framerate

    # Pick the frame numbers to sample for each scene.
    timecode_list = []
    for start, end in scene_list:
        frame_numbers = range(start.get_frames(), end.get_frames())
        if len(frame_numbers) < num_images:
            # Scene is shorter than the number of samples: pad by repeating its last frame.
            frame_numbers = (list(frame_numbers)
                             + [frame_numbers[-1]] * (num_images - len(frame_numbers)))
        scene_timecodes = []
        # Split the scene evenly and take one frame from each chunk.
        for j, chunk in enumerate(np.array_split(frame_numbers, num_images)):
            if (0 < j < num_images - 1) or num_images == 1:
                # Middle chunks (or the only chunk): take the centre frame.
                frame_num = chunk[len(chunk) // 2]
            elif j == 0:
                # First chunk: step in from the scene start by the margin.
                frame_num = min(chunk[0] + frame_margin, chunk[-1])
            else:
                # Last chunk: step back from the scene end by the margin.
                frame_num = max(chunk[-1] - frame_margin, chunk[0])
            scene_timecodes.append(FrameTimecode(int(frame_num), fps=framerate))
        timecode_list.append(scene_timecodes)

    image_frames = []
    image_filenames = {i: [] for i in range(len(timecode_list))}
    aspect_ratio = video.aspect_ratio
    if abs(aspect_ratio - 1.0) < 0.01:
        aspect_ratio = None

    for i, scene_timecodes in enumerate(timecode_list):
        for j, image_timecode in enumerate(scene_timecodes):
            video.seek(image_timecode)
            frame_im = video.read()
            # NOTE(review): some PySceneDetect versions appear to return False
            # instead of None at end of video — both are treated as a failed read.
            if frame_im is None or frame_im is False:
                completed = False
                break

            file_path = '%s.%s' % (filename_template.safe_substitute(
                VIDEO_NAME=video.name,
                SCENE_NUMBER=scene_num_format % (i + 1),
                IMAGE_NUMBER=image_num_format % (j + 1),
                FRAME_NUMBER=image_timecode.get_frames()), image_extension)
            image_filenames[i].append(file_path)

            if aspect_ratio is not None:
                frame_im = cv2.resize(
                    frame_im, (0, 0), fx=aspect_ratio, fy=1.0, interpolation=cv2.INTER_CUBIC)

            # Get frame dimensions prior to resizing or scaling.
            frame_height = frame_im.shape[0]
            frame_width = frame_im.shape[1]

            # Work out the resize target in locals so the `height` / `width`
            # arguments are never overwritten between frames.
            if height and width:
                frame_im = cv2.resize(frame_im, (width, height), interpolation=cv2.INTER_CUBIC)
            elif height and not width:
                factor = height / float(frame_height)
                target_width = int(factor * frame_width)
                frame_im = cv2.resize(
                    frame_im, (target_width, height), interpolation=cv2.INTER_CUBIC)
            elif width and not height:
                factor = width / float(frame_width)
                target_height = int(factor * frame_height)
                frame_im = cv2.resize(
                    frame_im, (width, target_height), interpolation=cv2.INTER_CUBIC)
            elif scale:
                frame_im = cv2.resize(
                    frame_im, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)

            # Keep only the requested band of rows instead of saving the image.
            if crop_rows is not None:
                top, bottom = crop_rows
                frame_im = frame_im[top:bottom, :]
            image_frames.append(frame_im)

            if progress_bar is not None:
                progress_bar.update(1)

    if progress_bar is not None:
        progress_bar.close()

    if not completed:
        logger.error('Could not generate all output images.')

    return image_filenames, image_frames

In [None]:
def video_to_frame(url) :
    """Run scene detection and OCR on a YouTube video.

    The best available mp4 stream of `url` is resolved through pafy, scenes
    are detected with a `ContentDetector`, one frame per scene is extracted
    with `easyocr_image`, and each frame is read with a custom EasyOCR model.

    Args:
        url: URL of the video, passed to `pafy.new`.

    Returns:
        Dict mapping a 'M:SS' scene start time to the list of text paragraphs
        read from that scene's frame. Empty when no scene is detected.
    """
    video = pafy.new(url)
    best_stream = video.getbest(preftype="mp4")

    video_manager = VideoManager([best_stream.url])
    stats_manager = StatsManager()
    scene_manager = SceneManager(stats_manager)

    # threshold: value between 0 and 100; the closer to 0, the more sensitive the detector.
    scene_manager.add_detector(ContentDetector(threshold=1))

    try:
        # Downscale frames to speed up processing.
        video_manager.set_downscale_factor()

        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)

        scene_list = scene_manager.get_scene_list()
        if not scene_list:
            # Nothing to OCR; also avoids loading the OCR model for no reason.
            return {}

        _, image_frames = easyocr_image(
            scene_list,
            video_manager,
            num_images=1,
            image_name_template='$SCENE_NUMBER',
            output_dir='./result/scenes'
        )
    finally:
        # Always free the underlying capture, even if detection or extraction fails.
        video_manager.release()

    # reader = easyocr.Reader(['ko','en'])
    reader = easyocr.Reader(['ko'], gpu=True,
                            # Location of the custom-trained recognition model.
                            model_storage_directory='/content/drive/MyDrive/user_network_dir',
                            user_network_directory='/content/drive/MyDrive/user_network_dir',
                            recog_network='None-VGG-BiLSTM-CTC')

    result = {}
    # NOTE(review): frames are paired with scenes by position; if a frame could
    # not be read, the remaining pairs are shifted — confirm whether this matters.
    for (start, _end), frame in zip(scene_list, image_frames):
        minutes, seconds = divmod(math.trunc(start.get_seconds()), 60)
        key = '{}:{:02d}'.format(minutes, seconds)

        # Scenes starting within the same second share a key; the later one wins.
        result[key] = reader.readtext(frame, detail = 0, paragraph=True, batch_size = 10)

    return result

In [None]:
# Run scene detection + OCR on the sample video. `result` maps each scene's
# 'M:SS' start time to the text read from it.
result = video_to_frame('https://www.youtube.com/watch?v=pmnP26UIIEM')

In [None]:
# Display the raw OCR dictionary.
result

In [None]:
## Remove duplicates (step 1)
def duplicated(result, threshold = 0.5) :
  """Drop OCR entries whose words largely reappear in the next entry.

  Each entry is compared with the one that follows it in `result`'s order.
  The score is the share of the entry's distinct words that also occur in the
  following entry; when it reaches `threshold`, the earlier entry is dropped
  and the later one kept.

  Args:
    result: Dict mapping a timestamp string to a list of text strings.
    threshold: Minimum overlap score (0-1) for an entry to be treated as a
      duplicate of its successor. Lower values remove more rows.

  Returns:
    DataFrame with columns 'timestamps' and 'original', indexed by the
    entries' original positions in descending order, without the duplicate
    rows. Empty (but with both columns) when `result` is empty.
  """
  records = [{'timestamps': key, 'original': val} for key, val in result.items()]

  # Build the frame once instead of once per entry.
  df = pd.DataFrame(records, columns = ['timestamps', 'original'])
  df_sort = df.sort_index(ascending=False)

  # Whitespace-separated words of every entry, in the original order.
  words = [' '.join(rec['original']).split() for rec in records]

  row_index = []
  for i in range(len(words) - 1) :
    current, following = words[i], words[i + 1]
    if not current :
      # No text was read for this entry, so there is nothing to score; keep the row.
      continue

    intersection = set(current) & set(following)
    score = round(len(intersection) / len(current), 2)

    # Tune `threshold` to fine-tune how aggressively duplicates are removed.
    if score >= threshold :
      row_index.append(i)

  return df_sort.drop(index = row_index, axis = 0)

In [None]:
# Remove near-duplicate OCR entries and display the remaining rows.
df_dup = duplicated(result)
df_dup

In [None]:
## Spell check
def spell_check(df_dup) :
  """Strip special characters from each OCR entry and spell-check the text.

  The strings in each row's 'original' list are concatenated, special
  characters are removed, and the result is sent to `spell_checker.check`.
  If the check reports success its corrected text is stored in 'checked';
  otherwise the cleaned text is stored unchanged. Rows whose 'checked' text
  is empty are dropped. The input frame is not modified.

  Args:
    df_dup: DataFrame with an 'original' column holding lists of strings.

  Returns:
    New DataFrame with the input rows renumbered from 0, then sorted by that
    index in descending order, with an added 'checked' column.
  """
  # Special characters to remove. Raw string so the backslashes reach the
  # regex engine without relying on invalid string escapes.
  special_chars = re.compile(r'[-=+,#/\?:^$.@*"※~&%ㆍ!』\‘|\(\)\[\]\<\>`\'…》]')

  # reset_index returns a new frame, so the caller's DataFrame stays untouched.
  df_work = df_dup.reset_index(drop=True)

  checked_texts = []
  for paragraphs in df_work['original'] :
    cont = special_chars.sub('', ''.join(paragraphs))

    if not cont :
      # Nothing left to check; the row is dropped below.
      checked_texts.append('')
      continue

    # Spell check. NOTE(review): presumably index 0 is the success flag and
    # index 2 the corrected text — confirm against py-hanspell's result type.
    spell_ck = spell_checker.check(cont)

    if spell_ck[0] :
      checked_texts.append(spell_ck[2])
    else :
      # Check failed: fall back to the cleaned text (already a plain string).
      checked_texts.append(cont)

  df_work['checked'] = checked_texts

  df_check = df_work.sort_index(ascending=False)

  # Final clean-up: drop rows with no text left.
  empty_rows = df_check[df_check['checked'] == ''].index
  return df_check.drop(empty_rows)

In [None]:
# Clean and spell-check the de-duplicated entries, then display the result.
df_check = spell_check(df_dup)
df_check

In [None]:
# Save the spell-checked table to a CSV in the current working directory.
df_check.to_csv("./dataedu_py.csv")

In [None]:
# Save the same table to Google Drive (requires the Drive mount from the first cell).
df_check.to_csv("/content/drive/MyDrive/preprocessing_NA_py.csv")