## Module 1: Dowload Video

In [None]:
!pip install pytube

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0


In [None]:
from multiprocessing import Pool
from pytube import YouTube
from pytube import Playlist
import os
from concurrent.futures import ThreadPoolExecutor
import re

def download_video(video_url, download_path):
    try:
        yt = YouTube(video_url)
        if yt.length <= 300:
            video_title = re.sub(r'[^\w\s]', '', yt.title).replace(' ', '_')
            video_filename = f"{video_title}.mp4"

            if video_filename in [f for f in os.listdir(download_path) if f.endswith('.mp4')]:
                print(f"Video '{video_title}' đã tồn tại, bỏ qua.")
                return

            video_stream = yt.streams.get_highest_resolution()
            video_stream.download(output_path=download_path, filename=video_filename)
            print(f"Tải xuống {video_title} hoàn thành.")
        else:
            print(f"Bỏ qua {yt.title} vì độ dài lớn hơn 5 phút.")
    except Exception as e:
        print(f"Lỗi khi tải xuống {video_url}: {e}")

In [None]:
def main():
    playlist_url = 'https://youtube.com/playlist?list=PLH6o6WQeJwt2Z83ClxRkEsQwi6B1X6IdO&si=pg6G15KukaQK2Bo5'
    download_path = '/content/Giáo dục'

    if not os.path.exists(download_path):
        os.makedirs(download_path)

    playlist = Playlist(playlist_url)
    video_urls = playlist.video_urls

    with ThreadPoolExecutor(max_workers=300) as executor:
        for x in range(2):
            for video_url in video_urls:
                executor.submit(download_video, video_url, download_path)

if __name__ == "__main__":
    main()

# Module 2: Convert Video to Text

In [None]:
pip install pydub

In [None]:
pip install SpeechRecognition

In [None]:
pip install moviepy

In [None]:
import os
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
import speech_recognition as sr
import concurrent.futures

In [None]:
def mp3_to_text(mp3_path, language="vi-VN"):
    audio = AudioSegment.from_mp3(mp3_path)
    wav_path = mp3_path.replace(".mp3", ".wav")
    audio.export(wav_path, format="wav")

    recognizer = sr.Recognizer()
    with sr.AudioFile(wav_path) as source:
        audio_data = recognizer.record(source)

    try:
        text = recognizer.recognize_google(audio_data, language=language)
        return text
    except sr.UnknownValueError:
        return "Không nhận dạng được giọng nói."
    except sr.RequestError:
        return "Lỗi kết nối tới API nhận dạng giọng nói."

def process_video(video_filename, video_folder, audio_output_folder, text_output_folder):
    video_path = os.path.join(video_folder, video_filename)
    audio_output_path = os.path.join(audio_output_folder, f"{os.path.splitext(video_filename)[0]}.wav")
    text_output_path = os.path.join(text_output_folder, f"{os.path.splitext(video_filename)[0]}.txt")

    convert_mp4_to_wav(video_path, audio_output_path)
    mp3_to_text(audio_output_path)

    # Kiểm tra và xác thực kết quả nhận dạng giọng nói
    recognized_text = mp3_to_text(audio_output_path)
    if "Lỗi kết nối tới API" in recognized_text:
        recognized_text = "Không nhận dạng được giọng nói."
    with open(text_output_path, "w", encoding="utf-8") as text_file:
        text_file.write(recognized_text)

    print(f"Đã xử lý video {video_filename}, lưu âm thanh vào {audio_output_path}, và lưu văn bản vào {text_output_path}")

def convert_mp4_to_wav(input_path, output_path):
    video_clip = VideoFileClip(input_path)
    audio_clip = video_clip.audio
    audio_clip.write_audiofile(output_path)


def process_audio(audio_path, text_output_path):
    recognized_text = mp3_to_text(audio_path)
    if "Lỗi kết nối tới API" in recognized_text:
        recognized_text = "Không nhận dạng được giọng nói."
    with open(text_output_path, "w", encoding="utf-8") as text_file:
        text_file.write(recognized_text)

        #  video_folder = "//content/drive/MyDrive/Data_Video/data"

In [None]:
if __name__ == "__main__":
    video_folder = "/content/Giáo dục"
    audio_output_folder = "audio_path1"
    output_text_folder = "/content/Giaoduc"

    if not os.path.exists(audio_output_folder):
        os.makedirs(audio_output_folder)

    if not os.path.exists(output_text_folder):
        os.makedirs(output_text_folder)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        video_filenames = [filename for filename in os.listdir(video_folder) if filename.endswith(".mp4")]
        for video_filename in video_filenames:
            video_path = os.path.join(video_folder, video_filename)
            audio_output_path = os.path.join(audio_output_folder, f"{os.path.splitext(video_filename)[0]}.wav")
            text_output_path = os.path.join(output_text_folder, f"{os.path.splitext(video_filename)[0]}.txt")

            # Kiểm tra nếu tệp văn bản đã tồn tại, bỏ qua nếu đã tồn tại
            if os.path.exists(text_output_path):
                print(f"Tệp văn bản {text_output_path} đã tồn tại. Bỏ qua xử lý.")
            else:
                try:
                    convert_mp4_to_wav(video_path, audio_output_path)
                    executor.submit(process_audio, audio_output_path, text_output_path)

                    print(f"Đã xử lý video {video_filename}, lưu âm thanh vào {audio_output_path}, và lưu văn bản vào {text_output_path}")
                except OSError as e:
                    print(f"Lỗi khi xử lý video {video_filename}: {e}. Bỏ qua xử lý.")


In [None]:
import os

folder_path = '/content/Giaoduc'  # Đường dẫn đến thư mục chứa các tệp txt

def process_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:  # Sửa thành encoding='utf-8'
            content = file.read()
            if "Không nhận dạng được giọng nói." in content or "Lỗi kết nối tới API." in content or "Lỗi kết nối tới API nhận dạng giọng nói." in content:
                os.remove(file_path)
                print(f"Đã xóa tệp: {file_path}")
    except Exception as e:
        print(f"Lỗi khi xử lý tệp {file_path}: {e}")

def main():
    # Lặp qua tất cả các tệp trong thư mục
    for filename in os.listdir(folder_path):
        if filename.endswith('.txt'):
            file_path = os.path.join(folder_path, filename)
            process_text_file(file_path)

if __name__ == "__main__":
    main()


In [None]:
import os
import zipfile

# Thư mục bạn muốn nén
source_folder = "/content/Giaoduc"  # Thay thế bằng đường dẫn thư mục bạn muốn nén

# Tạo tên file zip và đường dẫn lưu trữ file zip
zip_file_name = "GD.zip"
zip_file_path = "/content/" + zip_file_name

# Nén thư mục
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
    for root, _, files in os.walk(source_folder):
        for file in files:
            file_path = os.path.join(root, file)
            zipf.write(file_path, os.path.relpath(file_path, source_folder))

print("Đã nén thư mục thành công.")