<a href="https://colab.research.google.com/github/MBayezid/data-into-video-demo/blob/main/data_convertion(Main).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 🧩 How It Works (Simplified)

### 1. **Your Files → ZIP Package**  
   *"Bundles everything like a digital suitcase"*

### 2. **ZIP → Pixel Art**  
   *"Converts 1s/0s to colorful pixels (think Minecraft data art!)"*

### 3. **Pixels → Video**  
   *"Makes a 'slideshow' that secretly holds your files"*

### 4. **Video → Original Files**  
   *"Reverse the magic to get your stuff back!"*

---

## 🚀 Try It Yourself

### Encode Files (3 Lines!)
```python
# 1. Choose files: you are prompted to upload them when the next line runs
#    (for example "cat.jpg" and "diary.txt")

# 2. Make video
files_to_video("secret_cat_video.avi")

# 3. Upload to YouTube! (Caution: decoding needs the exact lossless file; a re-encoded copy will not decode.)
```

### Decode Anytime
```python
# 1. Download video
# 2. Run this:
video_to_files("secret_cat_video.avi", "recovered_files")

```



---



---



# ========== Imports ========== #

In [None]:
# !pip uninstall ffmpeg ffmpeg-python -y
!pip install ffmpeg-python
# !pip install PIL

Collecting ffmpeg-python
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl.metadata (1.7 kB)
Downloading ffmpeg_python-0.2.0-py3-none-any.whl (25 kB)
Installing collected packages: ffmpeg-python
Successfully installed ffmpeg-python-0.2.0


In [None]:
import os
import zipfile
import numpy as np
from PIL import Image
import ffmpeg


# ========== ENCODING ========== #

## === 1. Import Files === ##

In [None]:
from google.colab import files

def select_files():
    """
    Ask where uploads should be stored, run the Colab upload widget, and
    return the paths of the uploaded files.

    An empty answer selects the default directory "uploaded_files". When the
    named directory does not exist the user is asked whether to create it
    ("y" or just pressing Enter confirms); any other answer restarts the
    directory prompt.

    Returns:
        list[str]: One entry per uploaded file. Each entry is the file's path
        inside the chosen directory when that path exists on disk; otherwise
        the key reported by `files.upload` is returned unchanged.
    """

    # Get user input for directory path
    while True:
        uploaded_dir = input("Enter directory to store files (or leave empty): ")

        # Use default if input is empty
        if not uploaded_dir:
            uploaded_dir = "uploaded_files"
            break

        if os.path.exists(uploaded_dir):
            break

        # Directory is missing: only proceed once the user agrees to create it
        create_dir = input(f"Directory '{uploaded_dir}' does not exist. Create it? (y/n): ")
        if create_dir.lower() in ('y', ''):
            break
        print("Please enter a valid directory or choose to create one.")

    # Create the directory if needed (the working directory is NOT changed)
    os.makedirs(uploaded_dir, exist_ok=True)
    print(f"Uploading to: {uploaded_dir}")

    # Upload files
    print("Select files to upload:")
    uploaded = files.upload(target_dir = uploaded_dir)

    # The keys of the upload result are expected to be the original file names
    # without the target directory (confirm against the installed google.colab
    # version), so they must be joined with `uploaded_dir` to be usable as
    # paths from the current working directory. Falling back to the raw key
    # keeps the result valid if the key is already a usable path.
    file_paths = []
    for name in uploaded:
        stored_path = os.path.join(uploaded_dir, name)
        file_paths.append(stored_path if os.path.exists(stored_path) else name)
    return file_paths

## === 2. Files ->  ZIP === ##

In [None]:
def create_archive(file_paths, ziped_out_dir="temp_archives", zip_filename="original.zip"):
  """
  Zips multiple files into a single DEFLATE-compressed zip archive.

  Every file is stored under its base name (directories are dropped). When two
  inputs share a base name, later ones get a numeric suffix ("note.txt",
  "note_1.txt", ...) so that no entry overwrites another on extraction.
  Paths that do not exist are skipped with a printed warning.

  Args:
    file_paths: A list of file paths to be zipped.
    ziped_out_dir: The directory where the zip file will be saved (created if
      missing).
    zip_filename: The name of the output zip file; ".zip" is appended when
      it is missing.

  Returns:
    The path of the created archive, or None when an error occurred (the
    error is printed rather than raised).
  """
  try:
    if not zip_filename.endswith(".zip"):
      zip_filename = zip_filename + ".zip"

    full_zip_path = os.path.join(ziped_out_dir, zip_filename)
    os.makedirs(ziped_out_dir, exist_ok=True)

    used_names = set()
    with zipfile.ZipFile(full_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
      for file_path in file_paths:
        if not os.path.exists(file_path):
          print(f"Warning: File not found: {file_path}")
          continue

        # Disambiguate clashing base names instead of writing duplicate members
        arcname = os.path.basename(file_path)
        if arcname in used_names:
          stem, ext = os.path.splitext(arcname)
          counter = 1
          while f"{stem}_{counter}{ext}" in used_names:
            counter += 1
          arcname = f"{stem}_{counter}{ext}"
        used_names.add(arcname)

        zipf.write(file_path, arcname=arcname)
    print(f"Successfully created {full_zip_path}")
    return full_zip_path

  except FileNotFoundError:
    print("Error: One or more files not found.")
  except Exception as e:
    print(f"An error occurred: {e}")

  # Reached only after a handled error; callers must check for None
  return None


## === 3. ZIP -> Images/Frames  === ##

In [None]:
def create_frames(data, frame_size):
    """
    Split a byte string into fixed-size uint8 arrays, one per video frame.

    Each frame holds frame_size[0] * frame_size[1] * 3 bytes (three bytes per
    pixel). The input is right-padded with zero bytes so that the last frame
    is full; input whose length is already a multiple of the frame size is
    not padded.

    Returns:
        list of 1-D numpy uint8 arrays, each exactly one frame long.
    """
    chunk_len = frame_size[0] * frame_size[1] * 3

    # Number of zero bytes needed to reach the next multiple of chunk_len
    remainder = len(data) % chunk_len
    pad_len = 0 if remainder == 0 else chunk_len - remainder
    buffer = data + b'\x00' * pad_len

    frames = []
    for offset in range(0, len(buffer), chunk_len):
        chunk = buffer[offset:offset + chunk_len]
        frames.append(np.frombuffer(chunk, dtype=np.uint8))
    return frames


## === 4. Frames -> Video  === ##

In [None]:
def encode_video(frames, output_path, frame_size):
    """
    Stream raw RGB frames into ffmpeg and encode them with the FFV1 codec.

    Args:
        frames: Iterable of objects exposing `tobytes()`; each is written to
            ffmpeg's stdin as one rgb24 frame.
        output_path: Destination video file (overwritten if it exists).
        frame_size: (width, height) passed to ffmpeg as the raw frame size.

    Raises:
        RuntimeError: If the ffmpeg process exits with a non-zero status.
    """
    process = (
        ffmpeg
        .input('pipe:', format='rawvideo', pix_fmt='rgb24',
              s=f'{frame_size[0]}x{frame_size[1]}')
        .output(output_path, vcodec='ffv1')
        .overwrite_output()
        .run_async(pipe_stdin=True)
    )

    try:
        try:
            for frame in frames:
                process.stdin.write(frame.tobytes())
        finally:
            # Always signal end-of-input so ffmpeg can finish, even when
            # producing or writing a frame failed part-way through.
            process.stdin.close()
    except BrokenPipeError:
        # ffmpeg exited before reading all input; the exit status checked
        # below is the meaningful error, so it is reported instead.
        pass

    # run_async only starts the process and does not raise on failure, so the
    # exit status has to be inspected explicitly.
    return_code = process.wait()
    if return_code != 0:
        raise RuntimeError(
            f"FFmpeg exited with status {return_code} while encoding {output_path}"
        )

# ========== DECODING ========== #

## === 4. Video -> Frames  === ##

In [None]:
def extract_frames(input_video):
    """
    Decode a video into a list of flat rgb24 frames.

    The first video stream reported by `ffmpeg.probe` supplies the width and
    height. The whole video is then decoded to raw rgb24 bytes in memory and
    cut into chunks of width * height * 3 bytes.

    Args:
        input_video: Path of the video file to decode.

    Returns:
        list of 1-D numpy uint8 arrays, one per chunk of decoded output.

    Raises:
        ValueError: If the file contains no video stream.
    """
    probe = ffmpeg.probe(input_video)
    video_info = next(
        (s for s in probe['streams'] if s['codec_type'] == 'video'),
        None,
    )
    if video_info is None:
        # A bare next() would leak StopIteration, which is hard to diagnose
        raise ValueError(f"No video stream found in: {input_video}")

    width = int(video_info['width'])
    height = int(video_info['height'])
    frame_size = width * height * 3  # RGB (3 channels)

    out, _ = (
        ffmpeg
        .input(input_video)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run(capture_stdout=True)
    )

    return [
        np.frombuffer(out[i:i+frame_size], dtype=np.uint8)
        for i in range(0, len(out), frame_size)
    ]


## === 3. Frames -> ZIP === ##

In [None]:
def extract_archive(zip_path, output_dir="extracted_files"):
    """
    Unpack a zip archive into a directory, reporting failures by printing.

    No exception escapes this function: a missing file, an invalid archive
    or any other error is printed and turned into a False return value.

    Args:
        zip_path (str): Location of the archive to unpack
        output_dir (str): Destination directory (created when missing)

    Returns:
        bool: True when extraction completed, False on any failure
    """
    try:
        archive_missing = not os.path.exists(zip_path)
        if archive_missing:
            raise FileNotFoundError(f"Zip file not found: {zip_path}")

        os.makedirs(output_dir, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(output_dir)
    except zipfile.BadZipFile:
        print("Error: File is not a valid ZIP archive")
        return False
    except Exception as err:
        # Includes the FileNotFoundError raised above for a missing archive
        print(f"Extraction failed: {str(err)}")
        return False
    else:
        return True

# ========== CORE WORKFLOWS ========== #

In [None]:
# ========== ENCODING WORKFLOWS ========== #
def files_to_video(output_video='data_video.avi', frame_size=(640, 480), input_files=None):
    """
    Main encoding workflow: files -> zip -> size-prefixed bytes -> frames -> video.

    The zip archive is prefixed with a 4-byte big-endian length header so the
    decoder can strip the zero padding added to fill the last frame.

    Args:
        output_video: Path of the video file to write.
        frame_size: (width, height) of every frame in pixels.
        input_files: Optional list of file paths to encode. When None (the
            default) the interactive upload prompt is used instead.

    Raises:
        RuntimeError: If the archive could not be created.
        OverflowError: If the archive is too large for the 4-byte header.
    """
    if input_files is None:
        input_files = select_files()

    zip_path = create_archive(input_files)
    if zip_path is None:
        # create_archive reports errors by printing and returning None
        raise RuntimeError("Archive creation failed; no video was written.")

    with open(zip_path, 'rb') as f:
        original_data = f.read()

    # Add size header
    header_bytes = 4
    max_payload = 2 ** (8 * header_bytes) - 1
    if len(original_data) > max_payload:
        raise OverflowError(
            f"Archive is {len(original_data)} bytes; the {header_bytes}-byte "
            f"size header supports at most {max_payload} bytes."
        )
    header = len(original_data).to_bytes(header_bytes, 'big')
    data_with_header = header + original_data

    frames = create_frames(data_with_header, frame_size)
    encode_video(frames, output_video, frame_size)


# ========== DECODING WORKFLOWS ========== #
def video_to_files(input_video, output_dir):
    """
    Main decoding workflow: video -> frames -> size-prefixed bytes -> zip -> files.

    The first 4 bytes of the decoded stream are read as a big-endian payload
    length; that many following bytes are written to `output_dir/output.zip`,
    which is then extracted into `output_dir`.

    Args:
        input_video: Path of the video to decode.
        output_dir: Directory for output.zip and the extracted files
            (created when missing).

    Raises:
        ValueError: If the decoded stream is shorter than the header, or
            shorter than the payload length the header announces.
    """
    frames = extract_frames(input_video)
    all_data = b''.join(frame.tobytes() for frame in frames)

    # Extract original size from header
    header_bytes = 4
    if len(all_data) < header_bytes:
        raise ValueError(f"No size header found in decoded video: {input_video}")
    data_size = int.from_bytes(all_data[:header_bytes], 'big')

    available = len(all_data) - header_bytes
    if data_size > available:
        # Truncated, re-encoded, or not produced by the matching encoder
        raise ValueError(
            f"Header announces {data_size} bytes but only {available} were decoded"
        )
    original_data = all_data[header_bytes:header_bytes + data_size]

    # The archive is written before extraction, so the directory must exist
    os.makedirs(output_dir, exist_ok=True)
    zip_path = os.path.join(output_dir, 'output.zip')
    with open(zip_path, 'wb') as f:
        f.write(original_data)

    extract_archive(zip_path, output_dir)

# ========== USAGE ========== #

In [None]:
# Encode: prompts for an upload directory and files, then writes the video to
# the default output path 'data_video.avi' using 256x256 frames.
files_to_video(frame_size=(256, 256))

Enter directory to store files (or leave empty): 
Uploading to: uploaded_files
Select files to upload:


Saving How to get started with Drive.pdf to uploaded_files/How to get started with Drive.pdf
Saving DSC_0813.JPG to uploaded_files/DSC_0813.JPG
Saving MUKTIJUDDH- JAHANGIR HOSSAIN.doc to uploaded_files/MUKTIJUDDH- JAHANGIR HOSSAIN.doc
Saving Ahahara of Hindu Leaders.docx to uploaded_files/Ahahara of Hindu Leaders.docx
Saving AOUDIT  REPORT.docx to uploaded_files/AOUDIT  REPORT.docx
Saving 2096.doc to uploaded_files/2096.doc
Saving image2.jpg to uploaded_files/image2.jpg
Saving image1.jpg to uploaded_files/image1.jpg
Saving image3.jpg to uploaded_files/image3.jpg
Saving New data plan.jpg to uploaded_files/New data plan.jpg
Saving TEACHAR NAME & MOBIL.doc to uploaded_files/TEACHAR NAME & MOBIL.doc
Saving BANGLA.docx to uploaded_files/BANGLA.docx
Saving Saroni .docx to uploaded_files/Saroni .docx
Saving IMG_20170808_0001.pdf to uploaded_files/IMG_20170808_0001.pdf
Saving short stories.doc to uploaded_files/short stories.doc
Saving অ্যাসাইনমেন্ট (সংশোধিত), মোল্লাহাট, বাগেরহাট.docx to uploa

In [None]:
# Decode: read 'data_video.avi', write the recovered archive to
# 'temp_archives/output.zip', and extract its contents into 'temp_archives'.
video_to_files('data_video.avi', 'temp_archives')

In [None]:
# Compare the archive that was encoded with the one recovered from the video.
# Context managers make sure both file handles are closed after reading.
with open('temp_archives/original.zip', 'rb') as original_file:
    original = original_file.read()
with open('temp_archives/output.zip', 'rb') as extracted_file:
    extracted = extracted_file.read()
print("Match:", original == extracted)  # Should print True

Match: True
