In [None]:
# ============================= FORM ============================= #
# @markdown #### ⬅️ Extract Files
# Extraction tool selected by extractFiles(); any value other than
# "UNZIP", "UNRAR" or "UNTAR" is handled by the 7z branch.
MODE = "7Z"  # @param ["UNZIP", "UNTAR", "UNRAR", "7Z"]
# Path of the archive to extract; must be non-empty and exist on disk.
PATH_TO_FILE = ""  # @param {type:"string"}
# Destination directory; extractFiles() falls back to "/content/extract" when empty.
extractPath = ""  # @param {type:"string"}
# Optional archive password, passed to the extractor on the command line
# (the UNTAR command does not use it).
ARCHIVE_PASSWORD = "" #@param {type:"string"}
# Delete-after-extraction flag; read by extractFiles() and echoed in the start-up banner.
DELETE_ORIGINAL = False #@param {type:"boolean"}

# ================================================================ #
import os
import urllib.request
import shutil
import tempfile
import time
import sys
import threading
import subprocess
from google.colab import drive
import re

# Home directory of the current user; the downloaded helper module is stored beneath it.
HOME = os.path.expanduser("~")

# Install tqdm if not already installed
try:
    from tqdm.notebook import tqdm  # Use notebook version for better visuals
except ImportError:
    # Install into the interpreter that runs this kernel, then retry the import.
    subprocess.run([sys.executable, "-m", "pip", "install", "tqdm"], check=True)
    from tqdm.notebook import tqdm

# Setting up helper functions
# ocr.py is downloaded only when no copy exists yet, so later runs reuse the cached file.
# NOTE(review): assumes ~/.ipython already exists and is on sys.path (needed for the
# `from ocr import ...` below to resolve) — confirm for non-Colab runtimes.
if not os.path.exists(f"{HOME}/.ipython/ocr.py"):
    hCode = "https://raw.githubusercontent.com/biplobsd/OneClickRun/master/res/ocr.py"
    urllib.request.urlretrieve(hCode, f"{HOME}/.ipython/ocr.py")

# NOTE(review): checkAvailable is not referenced in the visible cells — confirm the
# remote helper (unpinned code fetched from GitHub) is still required.
from ocr import checkAvailable

def runSh(cmd, output=True):
    """Run a shell command, showing a progress bar for extraction tools.

    Args:
        cmd: Command line passed to the shell (``shell=True``). Callers are
            responsible for quoting any untrusted parts.
        output: When True, stdout is captured. For commands starting with
            ``unzip``/``unrar``/``7z``/``tar`` this also enables the progress bar.

    Returns:
        * extraction command with ``output=True``: the exit code (always 0).
        * any other command with ``output=True``: the captured stdout text.
        * ``output=False``: the exit code.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            status. For extraction commands the exception's ``output`` holds
            the last lines the tool printed.
    """
    from collections import deque

    extraction_commands = ('unzip', 'unrar', '7z', 'tar')
    is_extraction = cmd.startswith(extraction_commands)

    if not (is_extraction and output):
        process = subprocess.run(cmd, shell=True, check=True, text=True,
                                 stdout=subprocess.PIPE if output else None)
        return process.stdout if output else process.returncode

    process = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        universal_newlines=True, bufsize=1
    )

    pbar = tqdm(total=100, desc="Extracting", unit='%', colour='blue',
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}% [{elapsed}<{remaining}]')

    # Set once the extractor has finished so the ticker stops promptly and
    # never touches the bar after it has been closed.
    finished = threading.Event()

    def update_spinner():
        # The tools report no overall progress, so the bar is a time-based
        # activity indicator: one tick per 0.5 s, capped at 99.
        progress = 0
        while progress < 99 and not finished.wait(0.5):
            progress += 1
            pbar.update(1)

    spinner_thread = threading.Thread(target=update_spinner, daemon=True)
    spinner_thread.start()

    # Keep only the tail of the tool's output: enough to explain a failure
    # (wrong password, corrupt archive) without buffering a full file listing.
    tail = deque(maxlen=20)
    try:
        # Draining stdout also keeps the child from blocking on a full pipe.
        for line in process.stdout:
            tail.append(line)
        returncode = process.wait()
    finally:
        finished.set()
        spinner_thread.join()
        if process.poll() == 0:
            # Success: jump to 100% instead of stopping at an arbitrary tick.
            pbar.update(pbar.total - pbar.n)
        pbar.close()

    if returncode != 0:
        captured = "".join(tail)
        print(captured, end="")
        raise subprocess.CalledProcessError(returncode, cmd, output=captured)

    return returncode

def extractFiles():
    """Extract PATH_TO_FILE into extractPath with the tool selected by MODE.

    Reads the module-level form values PATH_TO_FILE, MODE, ARCHIVE_PASSWORD
    and DELETE_ORIGINAL. ``extractPath`` is a global that is rewritten to
    "/content/extract" when it was left empty.

    Existing files in the destination are overwritten in every mode.

    Raises:
        ValueError: PATH_TO_FILE is empty.
        FileNotFoundError: PATH_TO_FILE does not exist.
        subprocess.CalledProcessError: the extraction tool failed (re-raised
            after the error has been printed).
    """
    global extractPath
    import shlex

    if not PATH_TO_FILE:
        raise ValueError("❌ Please provide the path to the archive file")

    if not os.path.exists(PATH_TO_FILE):
        raise FileNotFoundError(f"❌ Archive file not found: {PATH_TO_FILE}")

    archive_size = os.path.getsize(PATH_TO_FILE)
    print(f"📦 Archive size: {archive_size / (1024*1024):.2f} MB")

    if not extractPath:
        extractPath = "/content/extract"

    os.makedirs(extractPath, exist_ok=True)

    # Everything below is interpolated into a shell command line, so quote it:
    # paths or passwords containing quotes, spaces, `$` or backticks would
    # otherwise break (or alter) the command.
    archive_arg = shlex.quote(PATH_TO_FILE)
    target_arg = shlex.quote(extractPath)
    secret_arg = shlex.quote(ARCHIVE_PASSWORD) if ARCHIVE_PASSWORD else ''

    try:
        if MODE == "UNZIP":
            print(f"🔄 Extracting ZIP archive to {extractPath}...")
            # unzip takes the password via -P; lowercase -p means "extract to
            # stdout". -o overwrites without prompting, matching the other modes.
            pass_flag = f'-P {secret_arg}' if secret_arg else ''
            runSh(f'unzip -o {pass_flag} {archive_arg} -d {target_arg}', output=True)
        elif MODE == "UNRAR":
            print(f"🔄 Extracting RAR archive to {extractPath}...")
            pass_flag = f'-p{secret_arg}' if secret_arg else ''
            # Trailing separator marks the last argument as the destination directory.
            rar_target = shlex.quote(os.path.join(extractPath, ""))
            runSh(f'unrar x {pass_flag} -o+ {archive_arg} {rar_target}', output=True)
        elif MODE == "UNTAR":
            print(f"🔄 Extracting TAR archive to {extractPath}...")
            runSh(f'tar -C {target_arg} -xvf {archive_arg}', output=True)
        else:  # 7Z
            print(f"🔄 Extracting 7Z archive to {extractPath}...")
            pass_flag = f'-p{secret_arg}' if secret_arg else ''
            runSh(f'7z x {archive_arg} -o{target_arg} {pass_flag} -y', output=True)

        print(f"✅ Extraction completed to: {extractPath}")

    except Exception as e:
        print(f"❌ Error during extraction: {str(e)}")
        raise

    # Only reached after a successful extraction, so a failed run never
    # destroys the source archive.
    if DELETE_ORIGINAL:
        try:
            os.remove(PATH_TO_FILE)
            print(f"🗑️ Deleted original archive: {PATH_TO_FILE}")
        except OSError as e:
            print(f"⚠️ Could not delete original archive: {e}")

# Cell entry point: announce the run, time extractFiles(), and print a banner
# for either outcome. Failures are reported here instead of propagating.
try:
    print(
        f"🚀 Starting extraction of {PATH_TO_FILE} using {MODE} mode",
        f"🗑️ Delete original file after extraction: {DELETE_ORIGINAL}",
        "\n" + "=" * 60,
        " 🚀 EXTRACTION PROCESS STARTED ".center(60, "="),
        "=" * 60 + "\n",
        sep="\n",
    )

    start_time = time.time()
    extractFiles()
    total_time = time.time() - start_time

    print(
        "\n" + "=" * 60,
        " ✅ EXTRACTION COMPLETED SUCCESSFULLY ".center(60, "="),
        f" ⏱️ Total time: {total_time:.2f} seconds ".center(60, "="),
        "=" * 60 + "\n",
        sep="\n",
    )

except Exception as err:
    print(
        "\n" + "=" * 60,
        " ❌ EXTRACTION FAILED ".center(60, "="),
        "=" * 60,
        f"Failed to complete extraction: {err}",
        sep="\n",
    )


In [None]:
# Data Archiver Tool
# Supports archiving in TAR, ZIP, and 7z formats

from IPython.display import clear_output
import subprocess
import sys


def install_packages():
    """Silently pip-install py7zr and pyminizip into this kernel, then clear the cell output.

    Raises:
        subprocess.CalledProcessError: pip exited with a non-zero status.
    """
    pip_command = [sys.executable, "-m", "pip", "install", "py7zr", "pyminizip"]
    subprocess.run(
        pip_command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=True,
    )
    clear_output(wait=True)


# Dependencies must be present before the imports that follow.
install_packages()

import os
import tarfile
import zipfile
import py7zr
import shutil
from pathlib import Path
from tqdm.notebook import tqdm
import getpass

# Input parameters (Colab form fields read by main())
# Archive format; selects which create_*_archive function main() calls.
ARCHIVE_FORMAT = "7Z" #@param ["TAR", "ZIP", "7Z"]
# File or directory to archive; used only when USE_MANUAL_INPUT is False.
FILE_PATH = "" #@param {type:"string"}
# When True, main() prompts for paths interactively until "exit" is entered.
USE_MANUAL_INPUT = False #@param {type:"boolean"}
# Output directory (created if missing); empty means a path relative to the working directory.
SAVE_PATH = "" #@param {type:"string"}
# Optional password for ZIP/7Z output; TAR ignores it.
# Note: this reuses the name of the extraction cell's ARCHIVE_PASSWORD and overwrites it.
ARCHIVE_PASSWORD = "" #@param {type:"string"}

def get_archive_extension(format_type):
    """Map an archive format label to its file suffix.

    Returns ".tar", ".zip" or ".7z" for "TAR", "ZIP" or "7Z" respectively,
    and an empty string for any other value.
    """
    known_suffixes = (("TAR", ".tar"), ("ZIP", ".zip"), ("7Z", ".7z"))
    for label, suffix in known_suffixes:
        if format_type == label:
            return suffix
    return ""

def get_archive_name(path):
    """Derive the archive's base name from the path being archived.

    An existing file yields its name with the last extension removed; anything
    else (a directory or a non-existent path) yields the final path component.
    """
    if not os.path.isfile(path):
        # normpath drops a trailing separator so basename is not empty.
        return os.path.basename(os.path.normpath(path))

    stem, _extension = os.path.splitext(os.path.basename(path))
    return stem

def get_total_size(paths):
    """Return the combined size in bytes of every regular file under ``paths``.

    Each entry is either a file (counted directly) or is walked recursively as
    a directory. Entries that are neither contribute nothing.
    """
    def tree_size(root):
        # Sum the regular files found anywhere below root.
        return sum(
            os.path.getsize(candidate)
            for folder, _subdirs, names in os.walk(root)
            for candidate in (os.path.join(folder, name) for name in names)
            if os.path.isfile(candidate)
        )

    return sum(
        os.path.getsize(entry) if os.path.isfile(entry) else tree_size(entry)
        for entry in paths
    )

def create_tar_archive(sources, output_path):
    """Write an uncompressed TAR archive containing the regular files in ``sources``.

    A file source is stored under its base name. A directory source is walked
    and each regular file is stored under ``<directory name>/<relative path>``.
    Only files are added, so empty directories do not appear in the archive.
    Progress is reported in bytes.
    """
    def iter_members(source):
        # Yield (path on disk, name inside the archive) for one source entry.
        if os.path.isfile(source):
            yield source, os.path.basename(source)
            return

        top_level = os.path.basename(os.path.normpath(source))
        for folder, _subdirs, names in os.walk(source):
            for name in names:
                member = os.path.join(folder, name)
                if os.path.isfile(member):
                    yield member, os.path.join(top_level, os.path.relpath(member, source))

    total_size = get_total_size(sources)

    with tarfile.open(output_path, "w") as tar:
        with tqdm(total=total_size, unit='B', unit_scale=True, desc="Creating TAR archive") as pbar:
            for source in sources:
                for member, arcname in iter_members(source):
                    tar.add(member, arcname=arcname)
                    pbar.update(os.path.getsize(member))

def create_zip_archive(sources, output_path, password=None):
    """Create a ZIP archive with optional password protection.

    Args:
        sources: Files and/or directories to archive. A file is stored under
            its base name, a directory under ``<directory name>/<relative path>``.
        output_path: Path of the ZIP file to write.
        password: When truthy, the archive is written with pyminizip using
            this password; otherwise the standard ``zipfile`` module is used.

    The password path stages a copy of every file in a uniquely named
    temporary directory next to ``output_path`` and removes it afterwards.
    """
    total_size = get_total_size(sources)

    if not password:
        # Regular ZIP without password
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            with tqdm(total=total_size, unit='B', unit_scale=True, desc="Creating ZIP archive") as pbar:
                for source in sources:
                    if os.path.isfile(source):
                        arcname = os.path.basename(source)
                        zipf.write(source, arcname=arcname)
                        pbar.update(os.path.getsize(source))
                    else:
                        parent_dir = os.path.basename(os.path.normpath(source))
                        for dirpath, dirnames, filenames in os.walk(source):
                            for f in filenames:
                                fp = os.path.join(dirpath, f)
                                if os.path.isfile(fp):
                                    arcname = os.path.join(parent_dir, os.path.relpath(fp, source))
                                    zipf.write(fp, arcname=arcname)
                                    pbar.update(os.path.getsize(fp))
        return

    # Imported only here so unencrypted archives do not depend on pyminizip
    # (a compiled extension) being installed.
    import tempfile
    import pyminizip

    # A unique staging directory: a fixed name would be shared between runs and
    # the cleanup below would delete a pre-existing directory of that name.
    staging_parent = os.path.dirname(output_path) or "."
    os.makedirs(staging_parent, exist_ok=True)
    temp_dir = tempfile.mkdtemp(prefix="temp_zip_dir_", dir=staging_parent)

    try:
        # Prepare files in temp directory
        source_files = []
        dest_names = []

        with tqdm(total=total_size, unit='B', unit_scale=True, desc="Preparing files") as pbar:
            for source in sources:
                if os.path.isfile(source):
                    basename = os.path.basename(source)
                    dest = os.path.join(temp_dir, basename)
                    shutil.copy2(source, dest)
                    source_files.append(dest)
                    dest_names.append(basename)
                    pbar.update(os.path.getsize(source))
                else:
                    parent_dir = os.path.basename(os.path.normpath(source))

                    for dirpath, dirnames, filenames in os.walk(source):
                        for d in dirnames:
                            rel_path = os.path.relpath(os.path.join(dirpath, d), source)
                            os.makedirs(os.path.join(temp_dir, parent_dir, rel_path), exist_ok=True)

                        for f in filenames:
                            src_file = os.path.join(dirpath, f)
                            if os.path.isfile(src_file):
                                rel_path = os.path.relpath(dirpath, source)
                                dest_file_dir = os.path.join(temp_dir, parent_dir, rel_path)
                                os.makedirs(dest_file_dir, exist_ok=True)
                                dest_file = os.path.join(dest_file_dir, f)
                                shutil.copy2(src_file, dest_file)

                                # Add to our list for zipping
                                source_files.append(dest_file)
                                dest_names.append(os.path.join(parent_dir, rel_path, f))
                                pbar.update(os.path.getsize(src_file))

        # Create the password-protected ZIP
        print("Creating password-protected ZIP archive...")
        # NOTE(review): pyminizip's README describes the second argument as a list
        # of *prefix paths* (directories) rather than full member names — confirm
        # the resulting archive layout is the intended one.
        pyminizip.compress_multiple(source_files, dest_names, output_path, password, 5)
        print("ZIP archive with password protection created successfully!")

    finally:
        # Clean up the staging copy; ignore cleanup errors so they cannot mask
        # an exception raised while building the archive.
        shutil.rmtree(temp_dir, ignore_errors=True)

def create_7z_archive(sources, output_path, password=None):
    """Create a 7z archive with optional password protection.

    Args:
        sources: Files and/or directories to archive. A file is stored under
            its base name, a directory under its own name with its tree below.
        output_path: Path of the 7z file to write.
        password: When truthy, file data and the archive header are encrypted
            with this password.

    All inputs are first copied into a uniquely named staging directory next
    to ``output_path``; the directory is removed afterwards.
    """
    import tempfile

    total_size = get_total_size(sources)

    # A unique staging directory: a fixed name would be shared between runs and
    # the cleanup below would delete a pre-existing directory of that name.
    staging_parent = os.path.dirname(output_path) or "."
    os.makedirs(staging_parent, exist_ok=True)
    temp_dir = tempfile.mkdtemp(prefix="temp_7z_dir_", dir=staging_parent)

    try:
        with tqdm(total=total_size, unit='B', unit_scale=True, desc="Preparing files") as pbar:
            for source in sources:
                if os.path.isfile(source):
                    dest = os.path.join(temp_dir, os.path.basename(source))
                    shutil.copy2(source, dest)
                    pbar.update(os.path.getsize(source))
                else:
                    parent_dir = os.path.basename(os.path.normpath(source))
                    dest_dir = os.path.join(temp_dir, parent_dir)
                    os.makedirs(dest_dir, exist_ok=True)

                    for dirpath, dirnames, filenames in os.walk(source):
                        # Recreate sub-directories first so empty ones are kept.
                        for d in dirnames:
                            rel_path = os.path.relpath(os.path.join(dirpath, d), source)
                            os.makedirs(os.path.join(dest_dir, rel_path), exist_ok=True)

                        for f in filenames:
                            src_file = os.path.join(dirpath, f)
                            if os.path.isfile(src_file):
                                rel_path = os.path.relpath(dirpath, source)
                                dest_file_dir = os.path.join(dest_dir, rel_path)
                                os.makedirs(dest_file_dir, exist_ok=True)
                                dest_file = os.path.join(dest_file_dir, f)
                                shutil.copy2(src_file, dest_file)
                                pbar.update(os.path.getsize(src_file))

        print("Creating 7z archive...")
        filters = [{"id": py7zr.FILTER_LZMA2, "preset": 9}]
        if password:
            # py7zr only adds its AES filter automatically when no custom
            # filter chain is given. With an explicit chain the crypto filter
            # has to be listed, otherwise file contents are stored unencrypted.
            # NOTE(review): verify against the installed py7zr version.
            filters.append({"id": py7zr.FILTER_CRYPTO_AES256_SHA256})

        with py7zr.SevenZipFile(output_path, 'w', filters=filters,
                                password=password or None,
                                header_encryption=bool(password)) as archive:
            for item in os.listdir(temp_dir):
                archive.writeall(os.path.join(temp_dir, item), item)

        print("7z archive created successfully!")

    finally:
        # Clean up the staging copy; ignore cleanup errors so they cannot mask
        # an exception raised while building the archive.
        shutil.rmtree(temp_dir, ignore_errors=True)

def main():
    """Collect the source paths, derive the output path and build the archive.

    Sources come from interactive prompts (USE_MANUAL_INPUT) or from FILE_PATH.
    The archive is named after the single source, or "Archive" when several
    were entered, and is written to SAVE_PATH or relative to the working directory.
    Errors raised while archiving are printed, not propagated.
    """
    # Gather the paths to archive.
    if USE_MANUAL_INPUT:
        source_paths = []
        print("Enter file/directory paths (type 'Exit' when done):")
        while True:
            candidate = input("Path: ")
            if candidate.lower() == 'exit':
                break
            if not os.path.exists(candidate):
                print(f"Path does not exist: {candidate}")
                continue
            source_paths.append(candidate)
    elif FILE_PATH and os.path.exists(FILE_PATH):
        source_paths = [FILE_PATH]
    else:
        print(f"Path does not exist: {FILE_PATH}")
        return

    if not source_paths:
        print("No valid paths provided.")
        return

    # Work out where the archive goes.
    name_source = source_paths[0] if len(source_paths) == 1 else "Archive"
    archive_file = get_archive_name(name_source) + get_archive_extension(ARCHIVE_FORMAT)

    if SAVE_PATH:
        os.makedirs(SAVE_PATH, exist_ok=True)
        output_path = os.path.join(SAVE_PATH, archive_file)
    else:
        output_path = archive_file

    password = ARCHIVE_PASSWORD if ARCHIVE_PASSWORD else None

    try:
        # format -> (builder, message printed afterwards when a password was given)
        builders = {
            "TAR": (
                lambda: create_tar_archive(source_paths, output_path),
                "Note: TAR format does not support password protection.",
            ),
            "ZIP": (
                lambda: create_zip_archive(source_paths, output_path, password),
                "ZIP archive is password protected.",
            ),
            "7Z": (
                lambda: create_7z_archive(source_paths, output_path, password),
                "7Z archive is password protected.",
            ),
        }
        if ARCHIVE_FORMAT in builders:
            build, password_note = builders[ARCHIVE_FORMAT]
            build()
            print(f"{ARCHIVE_FORMAT} archive created at: {output_path}")
            if password:
                print(password_note)
    except Exception as e:
        print(f"Error creating archive: {e}")

if __name__ == "__main__":
    main()

## <img src='https://upload.wikimedia.org/wikipedia/commons/thumb/e/e4/Rclone_wide_logo.svg/1920px-Rclone_wide_logo.svg.png' height="50" alt="rclone-logo"/>
## **Mount Other Drives to Google Colab**

In [None]:
#@markdown <center> <strong> <h1> ⚙️ RUN THIS CELL TO INSTALL DEPENDENCIES ⚙️ </strong> </center>
import os
import subprocess
import re

# Install fuse3 (if not already installed); rclone mount depends on it.
print("Installing fuse3...")
if os.system("apt-get -y install fuse3 > /dev/null 2>&1") != 0:
    print("⚠️ fuse3 installation reported an error; mounting may not work.")

# Fetch the latest rclone version dynamically
print("Fetching the latest rclone version...")
version_info = subprocess.run(["curl", "-s", "https://downloads.rclone.org/version.txt"], capture_output=True, text=True).stdout
latest_version_match = re.search(r"rclone v([\d.]+)", version_info)

if not latest_version_match:
    # Raise instead of calling exit(): in a notebook kernel exit() only
    # schedules a shutdown, so the rest of the cell would still run and fail
    # on the undefined latest_version.
    raise RuntimeError("Failed to fetch the latest rclone version.")

latest_version = latest_version_match.group(1)
print(f"Latest rclone version found: {latest_version}")

# Download and install the latest rclone version.
# latest_version only contains digits and dots (see the regex above), so it is
# safe to interpolate into the shell commands below.
# NOTE(review): the package is hard-coded to linux-amd64 — confirm the runtime architecture.
print(f"Installing rclone v{latest_version}...")
deb_file = f"rclone-v{latest_version}-linux-amd64.deb"
# -O keeps re-runs from piling up "<name>.deb.1" copies next to a stale package.
if os.system(f"wget -q -O {deb_file} https://downloads.rclone.org/v{latest_version}/{deb_file}") != 0:
    raise RuntimeError(f"Failed to download {deb_file}.")
if os.system(f"apt install -y ./{deb_file} > /dev/null 2>&1") != 0:
    raise RuntimeError(f"Failed to install {deb_file}.")

# Verify the installation
print("Verifying rclone installation...")
os.system("rclone version")

In [None]:
#@markdown <h1><strong><center> Authenticate 🔐
# Runs `rclone config` in a shell. The remote name configured there is the value
# the mount cell expects in `drive_name`.
!rclone config

In [None]:
#@markdown <h1> <strong> <center> Mount Drive 🔥
import os

import shlex
import time

# User input for the drive name (Google Colab's @param style).
# This must be the name of a remote created with `rclone config`.
drive_name = ""  #@param {type:"string"}

# An empty name would mount the nameless remote ":" on top of /content/.
if not drive_name.strip():
    raise ValueError("Please set drive_name to the name of a configured rclone remote.")

# Create mount point dynamically
mount_point = f"/content/{drive_name}"
os.makedirs(mount_point, exist_ok=True)

# Mount the drive in the background. Both arguments are quoted so remote names
# containing spaces or shell metacharacters are passed through literally.
os.system(
    f"nohup rclone --vfs-cache-mode writes mount "
    f"{shlex.quote(drive_name + ':')} {shlex.quote(mount_point)} &"
)

# The command above returns immediately, so wait briefly for the mount to
# appear before reporting the outcome.
mount_deadline = time.time() + 10
while not os.path.ismount(mount_point) and time.time() < mount_deadline:
    time.sleep(0.5)

if os.path.ismount(mount_point):
    print(f"✅ Drive '{drive_name}' successfully mounted at '{mount_point}'.")
else:
    print(f"❌ Drive '{drive_name}' is not mounted at '{mount_point}'. "
          "Check the remote name with `rclone listremotes` and try again.")
