In [2]:
# Importing required libraries and dependencies

from pytubefix import YouTube
from moviepy.editor import VideoFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import logging
import cv2
import os
import random
import shutil

In [3]:
# Configuring the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create a formatter
formatter = logging.Formatter("%(asctime)s - %(message)s")

# Create handlers
info_handler = logging.FileHandler("info_logs.txt")
info_handler.setLevel(logging.INFO)
info_handler.setFormatter(formatter)

error_handler = logging.FileHandler("error_logs.txt")
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(info_handler)
logger.addHandler(error_handler)

In [4]:
def download(vid_link: str):
  """
  This function downloaded any video from YouTube by passing it the link as an argument, it then return the file path of the downloaded video
  """
  try:
    yt = YouTube(vid_link)
    vid_title = yt.title.split('|')[0] + "- Full Documentary Film"
    logger.info(f"Retrieved video: {vid_title}")

    file_path = f"/content/{vid_title}.mp4"

    if os.path.isfile(file_path):
      logger.info("Video already downloaded!")
      return file_path

    else:
      try:
        video = yt.streams.get_highest_resolution().download(filename=vid_title)
        logging.info("Downloading video...")
        os.rename(video, file_path)  # Rename the file to ensure consistent naming
        logging.info("Download is completed successfully!")
        return file_path

      except Exception as e:
        logger.error(f"Failed to download video: {vid_title}. Error: {e}")
        return None

  except Exception as e:
    logger.error(f"Could not find the video. Please double-check the link provided. Error: {e}")
    return None



In [5]:
def split_video(filepath: str, num_parts: int = 4):
  """
  Split the video into specified number of parts.
  """
  video = VideoFileClip(filepath)
  duration = video.duration
  video.close()
  part_duration = duration / num_parts

  def process_part(i):
    part_path= f"/content/part{i+1}.mp4"
    target_name = f"part{i + 1}.mp4"
    if os.path.isfile(target_name):
      logger.info(f"{target_name} already exists, skipping...")
    else:
      start_time = i * part_duration
      end_time = (i + 1) * part_duration
      ffmpeg_extract_subclip(filepath, start_time, end_time, targetname=target_name)
      logging.info(f"{target_name} created successfully.")
    return part_path

  with ThreadPoolExecutor() as executor:
    part_paths = list(tqdm(executor.map(process_part, range(num_parts)), total=num_parts))
  return part_paths

In [6]:
def frame_capture(video_path, output_folder, start_frame=1):
  """
  Capture frames from the video and save them in the output folder.
  Frames are counted starting from start_frame.
  """
  # Creating a global variable to store count of frames
  frame_counter = start_frame

  # Checking if the output folder is created or not
  if os.path.exists(output_folder):
    logger.info(f"Output folder already exists: {output_folder}")
  else:
    logger.info(f"Creating the output folder: {output_folder}")
    os.makedirs(output_folder)

    # Trying to open the video
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
          logger.error("Error opening video file")
          return None

    while True:
      ret, frame = vid.read()

      if not ret:
        logger.error(" Exiting the loop if no frames are left.")
        break


      frame_filename = os.path.join(output_folder, f"frame_{frame_counter}.jpg")
      cv2.putText(frame, f"Frame_{frame_counter}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
      cv2.imwrite(frame_filename, frame)
      frame_counter += 1

    vid.release()
    cv2.destroyAllWindows()
    logger.info(f"Extracted {frame_counter} frames to '{output_folder}'")

  return frame_counter

In [7]:
filepath = download("https://www.youtube.com/watch?v=6-ADixXflJw")
parts_path = split_video(filepath, 4)

total_frames = 0
for video_path in parts_path:
  output = video_path.split('/')[-1].split('.')[0]
  total_frames = frame_capture(video_path, output, start_frame=total_frames + 1)

INFO:__main__:Retrieved video: “Whatever it Takes”  Full Documentary Film – 4K Original Version- Full Documentary Film


Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Running:
>>> "+ " ".join(cmd)


100%|██████████| 4/4 [00:02<00:00,  1.59it/s]
INFO:__main__:Creating the output folder: part1


Moviepy - Command successful
Moviepy - Command successful
Moviepy - Command successful
Moviepy - Command successful


ERROR:__main__: Exiting the loop if no frames are left.
INFO:__main__:Extracted 33361 frames to 'part1'
INFO:__main__:Creating the output folder: part2
ERROR:__main__: Exiting the loop if no frames are left.
INFO:__main__:Extracted 66721 frames to 'part2'
INFO:__main__:Creating the output folder: part3
ERROR:__main__: Exiting the loop if no frames are left.
INFO:__main__:Extracted 100082 frames to 'part3'
INFO:__main__:Creating the output folder: part4
ERROR:__main__: Exiting the loop if no frames are left.
INFO:__main__:Extracted 133440 frames to 'part4'


In [10]:
directories = [
    "/content/part1",
    "/content/part2",
    "/content/part3",
    "/content/part4"
]
destination_folder = "/content/random_selection"
drive_folder = "/content/drive/MyDrive/Frames_video"
os.makedirs(destination_folder, exist_ok=True)

for path in directories:
  try:
    files = os.listdir(path)
    random_file = random.choice(files)
    random_file_path = os.path.join(path, random_file)
    shutil.copy(random_file_path, destination_folder)
  except (FileNotFoundError, NotADirectoryError) as e:
    print(f"Error accessing {path}: {e}")
  except IndexError:
    print(f"No files found in {path}")

In [None]:
#!pip install pytubefix

Collecting pytubefix
  Downloading pytubefix-6.8.1-py3-none-any.whl.metadata (4.6 kB)
Downloading pytubefix-6.8.1-py3-none-any.whl (73 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.8/73.8 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytubefix
Successfully installed pytubefix-6.8.1


In [9]:
#import shutil


#shutil.rmtree('/content/part1')
#shutil.rmtree('/content/part2')
#shutil.rmtree('/content/part3')
#shutil.rmtree('/content/part4')


In [None]:
# for i in range(len(parts_path)):
#   filepath = f"/content/part{i+1}.mp4"
#   video = VideoFileClip(filepath)
#   duration = video.duration
#   print(duration)

In [None]:
#######################################################

In [None]:
# shutil.rmtree('/content/sample_data')

NameError: name 'pytube_path' is not defined

In [1]:
pwd()

'/content'