Simple copy code

In [None]:
import os
import shutil

In [None]:
def copy_files_with_extension(source_directory, destination_directory, file_extension='', chunk_size=1000):
    """Copy matching files into numbered sub-directories of the destination.

    Regular files directly inside ``source_directory`` (no recursion) whose
    names end with ``file_extension`` are sorted by name and split into
    consecutive groups of at most ``chunk_size`` files. Group ``i`` is copied
    into ``destination_directory/<i>`` (``0``, ``1``, ...), preserving file
    metadata via ``shutil.copy2``.

    Args:
        source_directory: Directory to read files from.
        destination_directory: Directory that receives the numbered
            sub-directories; created if missing.
        file_extension: Required file-name suffix. The default empty string
            matches every file.
        chunk_size: Maximum number of files per sub-directory; must be >= 1.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    # Reject bad chunk sizes up front: 0 would otherwise fail with an obscure
    # ZeroDivisionError and a negative value would silently copy nothing.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Create the destination directory if it doesn't exist
    os.makedirs(destination_directory, exist_ok=True)

    # Use os.scandir() for efficient file listing; sorting keeps the
    # assignment of files to sub-directories deterministic.
    with os.scandir(source_directory) as entries:
        files_list = sorted(
            entry.name
            for entry in entries
            if entry.is_file() and entry.name.endswith(file_extension)
        )

    # Walk the sorted list in strides of chunk_size; each stride is one chunk.
    for chunk_index, start in enumerate(range(0, len(files_list), chunk_size)):
        dest_path = os.path.join(destination_directory, str(chunk_index))
        os.makedirs(dest_path, exist_ok=True)

        for file_name in files_list[start:start + chunk_size]:
            source_path = os.path.join(source_directory, file_name)
            destination_path = os.path.join(dest_path, file_name)

            try:
                shutil.copy2(source_path, destination_path)  # Use shutil.copy2 to preserve metadata
            except FileNotFoundError:
                # The file vanished between listing and copying; keep going.
                print(f"Warning: {source_path} not found. Skipped.")

In [None]:
if __name__ == "__main__":
    # Example run: spread the .jpg files of one folder over numbered
    # sub-folders holding at most 100 files each.
    source_dir, dest_dir = (
        '/home/acer/workspace/videos/src',
        '/home/acer/workspace/videos/dest',
    )
    # Desired file extension; an empty string includes all files.
    file_extension = '.jpg'
    copy_files_with_extension(
        source_directory=source_dir,
        destination_directory=dest_dir,
        file_extension=file_extension,
        chunk_size=100,
    )

Advanced copy code

In [None]:
import os
import shutil
import logging
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# Configure the root logger so that messages of level INFO and above are emitted.
logging.basicConfig(level=logging.INFO)

In [None]:
def get_file_hash(file_path, block_size=65536):
    """Return the hexadecimal SHA-256 digest of the file at ``file_path``.

    The file is opened in binary mode and fed to the hasher in pieces
    obtained from ``read(block_size)`` until an empty read signals
    end-of-file.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel keeps calling read() until it returns b''.
        for block in iter(lambda: stream.read(block_size), b''):
            digest.update(block)
    return digest.hexdigest()

def copy_file(source_path, destination_path, deduplicate=False, file_hashes=None):
    """Copy one file, optionally skipping it when the destination is identical.

    Failures are logged rather than raised, so a single bad file does not
    abort a batch of copies.

    Args:
        source_path: Path of the file to copy.
        destination_path: Path the file is copied to.
        deduplicate: When true and ``destination_path`` already exists with
            the same SHA-256 digest as the source, the copy is skipped.
        file_hashes: Accepted for interface compatibility; this function does
            not read or modify it.

    Returns:
        None.
    """
    try:
        if deduplicate:
            # Hash the destination first (EAFP): when it does not exist there
            # is nothing to compare against, so the source need not be hashed.
            try:
                destination_hash = get_file_hash(destination_path)
            except FileNotFoundError:
                destination_hash = None

            if destination_hash is not None and get_file_hash(source_path) == destination_hash:
                logging.info(f"File {source_path} is a duplicate. Skipped.")
                return

        shutil.copy2(source_path, destination_path)  # Use shutil.copy2 to preserve metadata
        logging.info(f"File {source_path} copied to {destination_path}")
    except FileNotFoundError:
        # Covers a missing source both while hashing and while copying.
        logging.warning(f"Warning: {source_path} not found. Skipped.")
    except Exception as e:
        logging.error(f"Error copying {source_path} to {destination_path}: {e}")

def copy_files_with_extension(source_directory, destination_directory, file_extension='', chunk_size=1000, deduplicate=False, parallelism=1):
    """Copy matching files into numbered sub-directories using a thread pool.

    Regular files directly inside ``source_directory`` (no recursion) whose
    names end with ``file_extension`` are sorted by name and split into
    consecutive groups of at most ``chunk_size`` files. Group ``i`` is copied
    into ``destination_directory/<i>``. Each copy is submitted to a
    ``ThreadPoolExecutor`` as a ``copy_file`` call, and a tqdm progress bar
    advances once per finished file.

    Args:
        source_directory: Directory to read files from.
        destination_directory: Directory that receives the numbered
            sub-directories; created if missing.
        file_extension: Required file-name suffix. The default empty string
            matches every file.
        chunk_size: Maximum number of files per sub-directory; must be >= 1.
        deduplicate: Forwarded to ``copy_file`` as its ``deduplicate`` flag.
        parallelism: Number of worker threads (``max_workers`` of the pool).

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    # Reject bad chunk sizes up front: 0 would otherwise fail with an obscure
    # ZeroDivisionError and a negative value would silently copy nothing.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Create the destination directory if it doesn't exist
    os.makedirs(destination_directory, exist_ok=True)

    # Use os.scandir() for efficient file listing; sorting keeps the
    # assignment of files to sub-directories deterministic.
    with os.scandir(source_directory) as entries:
        files_list = sorted(
            entry.name
            for entry in entries
            if entry.is_file() and entry.name.endswith(file_extension)
        )

    def _drain(pending, progress_bar):
        # Wait for every pending task, tick the bar per file, then reset.
        for future in as_completed(pending):
            future.result()
            progress_bar.update(1)
        pending.clear()

    # Both resources are context-managed so the progress bar is closed and the
    # pool shut down even if an error interrupts the loop.
    with tqdm(total=len(files_list), desc="Copying Files") as progress_bar, \
            ThreadPoolExecutor(max_workers=parallelism) as executor:
        futures = set()
        file_hashes = set()  # Shared set handed to every copy_file call

        for chunk_index, start in enumerate(range(0, len(files_list), chunk_size)):
            dest_path = os.path.join(destination_directory, str(chunk_index))
            os.makedirs(dest_path, exist_ok=True)

            for file_name in files_list[start:start + chunk_size]:
                source_path = os.path.join(source_directory, file_name)
                destination_path = os.path.join(dest_path, file_name)
                futures.add(executor.submit(copy_file, source_path, destination_path, deduplicate, file_hashes))

            # Wait for the submitted tasks before moving to the next chunk once
            # at least `parallelism` of them are pending.
            if len(futures) >= parallelism:
                _drain(futures, progress_bar)

        # Wait for the remaining file copying tasks to complete
        _drain(futures, progress_bar)

In [None]:
if __name__ == "__main__":
    # Example run: copy the .jpg files of one folder into numbered sub-folders
    # of at most 100 files each, using two worker threads and skipping files
    # already present with identical content.
    source_dir, dest_dir = (
        '/home/acer/workspace/videos/src',
        '/home/acer/workspace/videos/dest',
    )
    # Desired file extension; an empty string includes all files.
    file_extension = '.jpg'
    copy_files_with_extension(
        source_directory=source_dir,
        destination_directory=dest_dir,
        file_extension=file_extension,
        chunk_size=100,
        deduplicate=True,
        parallelism=2,
    )
