# 無制限クラウド

In [5]:
from yt_dlp import YoutubeDL
import re
import os
from typing import Generator
from typing import List,Tuple
import cv2
import numpy

EXPANSION_RATE = 8

class Encoder:
    OUTPUT_FMT = cv2.VideoWriter_fourcc(*'mp4v') # 作る動画のコーデック
    FRAME_RATE = 60 # 作る動画のフレームレート

    # バイトをピクセルの値に変換
    @staticmethod
    def _byte2pixels(byte:numpy.uint8) -> List[numpy.uint8]:
        pixels = []
        for _ in range(8):
            pixels.append((byte & 1) * 255)
            byte //= 2
        
        return pixels
    
    def __init__(
        self,
        save_path:str, # 動画の保存先
        output_shape:Tuple[int,int,int]=(270, 480, 3) # 動画サイズ
    ):
        self.batch_size:int = numpy.prod(output_shape) // 8 # 画像1枚あたりのバイト数
        self.output_shape = output_shape
        
        self.save_path = save_path

    # バイト列を画像に変換
    def _get_frame(self, batch:numpy.ndarray):
        frame = []
        for byte in batch:
            frame.extend(Encoder._byte2pixels(byte))

        # バイト列が画像サイズより小さかったら0で埋める
        if self.batch_size * 8 > len(frame):
            frame = numpy.concatenate([
                frame,
                numpy.zeros(self.batch_size * 8 - len(frame))
            ])
        
        frame = numpy.asarray(frame, dtype=numpy.uint8).reshape(self.output_shape)
        return frame

    # ファイルから画像を1枚ずつ生成
    def _frames(self) -> Generator[numpy.ndarray, None, None]:
        while True:
            batch = numpy.frombuffer(
                self.input_file.read(self.batch_size),
                dtype=numpy.uint8
            )
            yield self._get_frame(batch)

            # データがなくなったら終わり
            if self.batch_size != batch.shape[0]:
                return

    # 実行
    def run(
        self,
        input_path:str # 入力パス
    ):
        # 保存先
        output_file = os.path.join(
            self.save_path,
            input_path.split('/')[-1] + '.mp4'
        )

        # 動画作成
        writer = cv2.VideoWriter(
            output_file,
            Encoder.OUTPUT_FMT, Encoder.FRAME_RATE,
            (
                self.output_shape[1] * EXPANSION_RATE,
                self.output_shape[0] * EXPANSION_RATE
            ),
        )
        with open(input_path, 'rb') as self.input_file:
            for frame in self._frames():
                # 画像の大きさを8倍にして動画書き出し
                frame = cv2.resize(
                    frame,
                    dsize=None,
                    fx=EXPANSION_RATE, fy=EXPANSION_RATE,
                    interpolation=cv2.INTER_NEAREST
                )
                writer.write(frame)
        writer.release()

class Decoder:
    THRESHOLD = 128 # 元のビットを計算するときの閾値

    # 画像からバイト列に変換
    @staticmethod
    def _decode(frame:numpy.ndarray) -> bytes:
        # 画像の大きさを1/8倍にする
        frame = cv2.resize(
            frame,
            dsize=None,
            fx=1/EXPANSION_RATE, fy=1/EXPANSION_RATE,
            interpolation=cv2.INTER_NEAREST
        )
        frame = numpy.where(Decoder.THRESHOLD < frame, 1, 0) # 閾値超えてたら1
        
        frame = frame.ravel()
        buffer = []
        byte = 0
        count = 1
        for value in frame:
            byte += value * count
            count *= 2
            
            if 256 == count:
                buffer.append(byte)
                byte = 0
                count = 1
        
        return bytes(buffer)

    # 実行
    def run(
        self,
        input_path:str, # 入力パス
        output_folder:str # 保存先
    ):
        video = cv2.VideoCapture(input_path)
        
        buffer = b''
        while True:
            ret, frame = video.read()
            # 動画が終わったら終わり
            if not ret:
                break
            
            buffer += Decoder._decode(frame)
        buffer = re.sub(b'\x00+$', b'', buffer) # 末尾の0埋めを消す
        
        with open(os.path.join(
            output_folder,
            input_path.split('/')[-1][:-4]
        ), 'wb') as output_file:
            output_file.write(buffer)
            
        video.release()

class YTCloud:# tempは一時的なファイル置き場。中は最後には空になる。
    def __init__(self, temp_path:str='temp', save_path:str='encoded'):
        self.encoder = Encoder(save_path)
        self.decoder = Decoder()
        
        self.ytdl = YoutubeDL({
            'outtmpl': os.path.join(temp_path, '%(title)s.mp4'),
            'format': 'bestvideo'
        })
        
        self.temp_path = temp_path

    def encode(
        self,
        input_path:str # 入力パス
    ):
        self.encoder.run(input_path)

    def decode(
        self,
        input_link:str, # Youtubeリンク
        output_folder:str="復元完了" # 保存先。デフォルトは復元完了ディレクトリ
    ):
        self.ytdl.download([input_link])
        
        input_path = os.path.join(self.temp_path, os.listdir(self.temp_path)[-1])
        self.decoder.run(input_path, output_folder)
        os.remove(input_path)


# 実行

In [3]:
ytc = YTCloud()
ytc.encode("net.txt") # 引数は入力パス。encodedディレクトリに出力される。

Deprecated Feature: Support for Python version 3.7 has been deprecated. See  https://github.com/yt-dlp/yt-dlp/issues/7803  for details.
                    You may stop receiving updates on this version at any time! Please update to Python 3.8 or above


In [4]:
ytc.decode("https://youtu.be/cbqm5QepZu0") # 引数はyoutubeリンク, 保存先。デフォルト保存先は復元完了ディレクトリ。

TypeError: decode() missing 1 required positional argument: 'output_folder'