In [2]:
!pip install requests tqdm trimesh thingi10k numpy-stl numpy



In [2]:
import os
import requests
import zipfile
import shutil
import trimesh
import thingi10k
import numpy as np
from stl import mesh
from tqdm import tqdm
from urllib.parse import urlencode
import warnings
!pip install scipy



In [None]:
import os
import scipy.io
import requests
import zipfile
import shutil
import trimesh
import numpy as np
import thingi10k
from stl import mesh
from tqdm import tqdm
import warnings
from urllib.parse import urlencode
from huggingface_hub import login, snapshot_download

class DatasetsManager:
    """
    A manager class to download, process, and structure 3D datasets for ML tasks.
    Follows a consistent structure: data/[dataset]/[category]/[type]/[file]
    """
    def __init__(self, root_dir="data"):
        self.root_dir = root_dir
        self.thingi10k_dir = os.path.join(root_dir, "thingi10k")
        self.modelnet_dir = os.path.join(root_dir, "ModelNet40")
        self.abc_dir = os.path.join(root_dir, "abc_dataset")
        self.objectnet_dir = os.path.join(root_dir, "objectnet3d")
        self.shapenet_dir = os.path.join(root_dir, "shapenet")
        self.custom_data_dir = os.path.join(root_dir, "custom_dataset")
        
        os.makedirs(self.root_dir, exist_ok=True)
        print(f"DatasetsManager initialized. All data will be stored in '{self.root_dir}'")

    def prepare_all_datasets(self):
        print("\n--- Starting all dataset preparation processes ---")
        self.prepare_thingi10k()
        self.prepare_modelnet40()
        self.prepare_abc_dataset()
        self.prepare_objectnet3d()
        self.prepare_shapenet()
        self.prepare_custom_dataset()
        print("\n--- All dataset preparation processes are complete! ---")
    
    # NOTE: the remaining methods (prepare_thingi10k, prepare_modelnet40, etc.) are unchanged from the previous version.
    def prepare_thingi10k(self):
        """[1/6] Export every Thingi10k entry as ``<file_id>.stl`` under ``thingi10k/models``.

        Entries whose STL already exists are skipped. An entry that cannot be
        loaded or saved is reported and skipped. The method returns early when
        the dataset cannot be initialised.
        """
        print("\n[1/6] Processing Thingi10k...")
        stl_dir = os.path.join(self.thingi10k_dir, "models")
        os.makedirs(stl_dir, exist_ok=True)

        # Silence UserWarnings that originate from the thingi10k module.
        warnings.filterwarnings("ignore", category=UserWarning, module="thingi10k")
        try:
            thingi10k.init()
        except Exception as init_error:
            print(f"Could not initialize Thingi10k dataset. Maybe servers are down? Error: {init_error}")
            return

        for record in tqdm(thingi10k.dataset(), desc="Converting Thingi10k"):
            file_id = record["file_id"]
            target_path = os.path.join(stl_dir, f"{file_id}.stl")
            if os.path.exists(target_path):
                continue

            try:
                with np.load(record["file_path"]) as arrays:
                    points = np.asarray(arrays["vertices"], dtype=np.float64)
                    faces = np.asarray(arrays["facets"], dtype=np.int32)

                # Need at least three vertices and one facet to write anything.
                has_geometry = points.shape[0] >= 3 and faces.shape[0] != 0
                if not has_geometry:
                    continue

                # Index the vertex array by the facet array to get per-facet corner coordinates.
                triangles = points[faces]
                stl_out = mesh.Mesh(np.zeros(triangles.shape[0], dtype=mesh.Mesh.dtype))
                stl_out.vectors = triangles
                stl_out.save(target_path)
            except Exception as convert_error:
                print(f"Skipping Thingi10k file_id {file_id}. Error: {convert_error}")
        print("Thingi10k preparation complete.")

    def prepare_modelnet40(self):
        print("\n[2/6] Processing ModelNet40...")
        url = "http://modelnet.cs.princeton.edu/ModelNet40.zip"
        zip_path = os.path.join(self.root_dir, "ModelNet40.zip")
        
        if os.path.exists(self.modelnet_dir) and any(os.scandir(self.modelnet_dir)):
            is_processed = True
            for item in os.scandir(self.modelnet_dir):
                if item.is_dir():
                    # Check if 'train' or 'test' folders are gone
                    if 'train' in os.listdir(item.path) or 'test' in os.listdir(item.path):
                        is_processed = False
                        break
            if is_processed:
                print("ModelNet40 appears to be processed already. Skipping.")
                return

        if not os.path.exists(os.path.join(self.root_dir, "ModelNet40")):
             self._download_file(url, zip_path)
             print(f"Extracting {zip_path}...")
             with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                 zip_ref.extractall(self.root_dir)
             os.remove(zip_path)

        for category_d in tqdm(os.scandir(self.modelnet_dir), desc="Processing ModelNet40 categories"):
            if not category_d.is_dir() or category_d.name.startswith('__'):
                continue
            
            for subfolder in ["train", "test"]:
                subfolder_path = os.path.join(category_d.path, subfolder)
                if not os.path.exists(subfolder_path):
                    continue
                
                for off_file in os.scandir(subfolder_path):
                    if off_file.name.endswith(".off"):
                        stl_filename = off_file.name.replace(".off", ".stl")
                        stl_filepath = os.path.join(category_d.path, stl_filename)
                        if os.path.exists(stl_filepath):
                            continue
                        try:
                            mesh = trimesh.load_mesh(off_file.path)
                            mesh.export(stl_filepath)
                        except Exception as e:
                            print(f"Failed to process {off_file.path}: {e}")
                shutil.rmtree(subfolder_path)
        print("ModelNet40 preparation complete.")
    
    def prepare_abc_dataset(self):
        """[3/6] Prepares ABC Dataset: downloads, extracts, filters for '512' .obj, and converts to .stl."""
        print("\n[3/6] Processing ABC Dataset...")
        url = "https://archive.nyu.edu/retrieve/120666/abc_full_100k_v00.zip"
        zip_path = os.path.join(self.root_dir, "abc_full_100k_v00.zip")
        
        # Correct path to the extracted folder, as seen in your screenshot.
        extracted_path = os.path.join(self.root_dir, "100k") 
        
        models_out_dir = os.path.join(self.abc_dir, "models")
        os.makedirs(models_out_dir, exist_ok=True)

        if os.path.exists(models_out_dir) and len(os.listdir(models_out_dir)) > 1000:
             print("ABC Dataset appears to be processed already. Skipping.")
             return

        if not os.path.exists(extracted_path):
            self._download_file(url, zip_path)
            print(f"Extracting {zip_path} (this may take a while)...")
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(self.root_dir)
            os.remove(zip_path)

        # Paths to the '512' folders are built from the correct base path '100k'.
        dirs_to_process = [
            os.path.join(extracted_path, "train", "512"),
            os.path.join(extracted_path, "test", "512")
        ]

        for source_dir in tqdm(dirs_to_process, desc="Processing ABC splits"):
            if not os.path.exists(source_dir):
                print(f"Warning: Source directory not found: {source_dir}")
                continue
            
            for obj_file in tqdm(os.scandir(source_dir), desc=f"Converting from {os.path.basename(source_dir)}", leave=False):
                if not obj_file.name.endswith(".obj"):
                    continue
                
                stl_filename = obj_file.name.replace(".obj", ".stl")
                stl_filepath = os.path.join(models_out_dir, stl_filename)
                
                if os.path.exists(stl_filepath):
                    continue
                    
                try:
                    mesh = trimesh.load_mesh(obj_file.path)
                    mesh.export(stl_filepath)
                except Exception as e:
                    print(f"Failed to process {obj_file.path}: {e}")

        # Cleanup will now target the correct folder.
        print(f"Cleaning up original extracted folder: {extracted_path}")
        shutil.rmtree(extracted_path)
        print("ABC Dataset preparation complete.")
        
    def prepare_objectnet3d(self):
        """[4/6] Prepares ObjectNet3D: downloads, links images to models via .mat files, and converts to .stl.

        Output layout: ``objectnet3d/<category>/models/*.stl`` for the CAD
        models and ``objectnet3d/<category>/images/*`` for the images.

        The three archives (annotations, CAD models, images) are downloaded
        and unpacked into a temporary folder below ``root_dir``. That folder
        is always removed when the method exits, whether or not processing
        succeeded. Errors from ``_download_file`` or from zip extraction are
        not caught here and propagate to the caller (after the cleanup).
        Failures on individual CAD files or annotations are printed and
        skipped.
        """
        print("\n[4/6] Processing ObjectNet3D...")

        os.makedirs(self.objectnet_dir, exist_ok=True)
        # Any existing content in the output folder counts as "already processed".
        # NOTE(review): a run that fails midway leaves partial output behind, and
        # this check then skips every later run -- confirm that is acceptable.
        if len(os.listdir(self.objectnet_dir)) > 0:
            print("ObjectNet3D appears to be processed already. Skipping.")
            return

        temp_dir = os.path.join(self.root_dir, "ObjectNet3D_temp_processing")
        os.makedirs(temp_dir, exist_ok=True)

        try:
            urls = {
                "annotations": "ftp://cs.stanford.edu/cs/cvgl/ObjectNet3D/ObjectNet3D_annotations.zip",
                "cads": "ftp://cs.stanford.edu/cs/cvgl/ObjectNet3D/ObjectNet3D_cads.zip",
                "images": "ftp://cs.stanford.edu/cs/cvgl/ObjectNet3D/ObjectNet3D_images.zip"
            }
            # The code below expects all archives to unpack under "<temp_dir>/ObjectNet3D".
            unpacked_base_path = os.path.join(temp_dir, "ObjectNet3D")

            for name, url in urls.items():
                # Only the annotations archive has an "already unpacked" shortcut.
                if name == "annotations" and os.path.exists(os.path.join(unpacked_base_path, "Annotations")):
                     print(f"{name} data already exists. Skipping download and extraction.")
                     continue

                zip_path = os.path.join(temp_dir, os.path.basename(url))
                self._download_file(url, zip_path)
                print(f"Extracting {name}...")
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall(temp_dir)
                os.remove(zip_path)

            print("Processing CAD models (.off -> .stl)...")
            source_cad_dir = os.path.join(unpacked_base_path, "CAD", "off")
            if os.path.exists(source_cad_dir):
                for category_dir in tqdm(os.scandir(source_cad_dir), desc="Converting CAD categories"):
                    if not category_dir.is_dir(): continue

                    category_name = category_dir.name
                    dest_model_dir = os.path.join(self.objectnet_dir, category_name, "models")
                    os.makedirs(dest_model_dir, exist_ok=True)

                    for off_file in os.scandir(category_dir.path):
                        # Keep only ".off" files whose name before the first "." is exactly 6 characters long.
                        if not (off_file.name.endswith(".off") and len(off_file.name.split('.')[0]) == 6): continue

                        stl_filename = off_file.name.replace(".off", ".stl")
                        stl_filepath = os.path.join(dest_model_dir, stl_filename)

                        try:
                            mesh = trimesh.load_mesh(off_file.path)
                            mesh.export(stl_filepath)
                        except Exception as e:
                            print(f"Failed to process CAD {off_file.path}: {e}")

            print("Linking images to categories using .mat annotations...")
            source_ann_dir = os.path.join(unpacked_base_path, "Annotations")
            source_img_dir = os.path.join(unpacked_base_path, "Images")

            if os.path.exists(source_ann_dir) and os.path.exists(source_img_dir):
                for mat_file in tqdm(os.scandir(source_ann_dir), desc="Processing annotations"):
                    # Same 6-character base-name filter as for the CAD files.
                    if not (mat_file.name.endswith(".mat") and len(mat_file.name.split('.')[0]) == 6): continue

                    try:
                        # Assumes each .mat holds a 'record' struct with 'filename' and
                        # 'objects' fields -- TODO confirm against the dataset's annotation format.
                        mat = scipy.io.loadmat(mat_file.path)
                        record = mat['record'][0, 0]
                        img_filename = str(record['filename'][0])

                        # Only the first entry of 'objects' decides the image's category.
                        category_name = str(record['objects'][0, 0]['class'][0])

                        source_img_path = os.path.join(source_img_dir, img_filename)
                        if not os.path.exists(source_img_path): continue

                        dest_img_dir = os.path.join(self.objectnet_dir, category_name, "images")
                        os.makedirs(dest_img_dir, exist_ok=True)
                        dest_img_path = os.path.join(dest_img_dir, os.path.basename(source_img_path))

                        shutil.copy(source_img_path, dest_img_path)
                    except Exception as e:
                        print(f"Could not process annotation {mat_file.name}. Error: {e}")

        finally:
            # Runs on success and on failure alike: the unpacked archives are never kept.
            print(f"Cleaning up temporary directory: {temp_dir}")
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
        print("ObjectNet3D preparation complete.")
        
    # --- FINALLY CORRECTED METHOD FOR SHAPENET ---
    def prepare_shapenet(self):
        """[5/6] Prepares ShapeNetCore: downloads from HF, filters, and structures models with their screenshots."""
        print("\n[5/6] Processing ShapeNetCore from Hugging Face...")
        os.makedirs(self.shapenet_dir, exist_ok=True)
        
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            print("Error: Hugging Face token not found. Please set the HF_TOKEN environment variable.")
            return
            
        try:
            login(token=hf_token)
            print("Hugging Face login successful.")
            repo_id = "ShapeNet/ShapeNetCore"
            hf_cache_dir = snapshot_download(repo_id=repo_id, repo_type="dataset")
            print(f"ShapeNet data available in cache: {hf_cache_dir}")
        except Exception as e:
            print(f"Failed to download from Hugging Face Hub: {e}")
            return
            
        temp_obj_path = os.path.join(self.root_dir, "shapenet_temp_model.obj")

        for zip_filename in tqdm(os.listdir(hf_cache_dir), desc="Processing ShapeNet Categories"):
            if not zip_filename.endswith('.zip'):
                continue
            
            category_id = zip_filename.replace('.zip', '')
            category_path = os.path.join(self.shapenet_dir, category_id)
            
            # --- FIX START ---
            # This is the new, robust check.
            # We check if the final destination 'models' folder exists AND is not empty.
            dest_models_dir = os.path.join(category_path, "models")
            if os.path.exists(dest_models_dir) and len(os.listdir(dest_models_dir)) > 0:
                # print(f"Skipping already processed category: {category_id}")
                continue
            # --- FIX END ---
            
            dest_images_dir = os.path.join(category_path, "images")
            os.makedirs(dest_models_dir, exist_ok=True)
            os.makedirs(dest_images_dir, exist_ok=True)
            
            zip_filepath = os.path.join(hf_cache_dir, zip_filename)
            
            try:
                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                    all_files = zip_ref.namelist()
                    
                    model_ids = sorted(list(set(
                        [f.split('/')[1] for f in all_files if f.count('/') >= 2]
                    )))
                    print(model_ids)


                    for model_id in tqdm(model_ids, desc=f"Models in {category_id}", leave=False):
                        internal_prefix = f"{category_id}/{model_id}"
                        screenshot_path_prefix = f"{internal_prefix}/screenshots/"

                        if not any(f.startswith(screenshot_path_prefix) for f in all_files):
                            continue
                        
                        model_path_prefix = f"{internal_prefix}/models/"
                        model_file_path_norm = f"{model_path_prefix}model_normalized.obj"
                        
                        model_file_path = model_file_path_norm if model_file_path_norm in all_files else next((f for f in all_files if f.startswith(model_path_prefix) and f.endswith(".obj")), None)
                        
                        if not model_file_path:
                            continue

                        stl_filepath = os.path.join(dest_models_dir, f"{model_id}.stl")
                        if os.path.exists(stl_filepath):
                            continue

                        try:
                            with zip_ref.open(model_file_path) as source_file, open(temp_obj_path, "wb") as target_file:
                                shutil.copyfileobj(source_file, target_file)
                            
                            mesh = trimesh.load_mesh(temp_obj_path)
                            mesh.export(stl_filepath)
                        finally:
                            if os.path.exists(temp_obj_path):
                                os.remove(temp_obj_path)

                        screenshot_files = [f for f in all_files if f.startswith(screenshot_path_prefix) and (f.endswith('.png') or f.endswith('.jpg'))]
                        for i, screenshot_file_path in enumerate(screenshot_files):
                            ext = ".png" if screenshot_file_path.endswith(".png") else ".jpg"
                            img_dest_path = os.path.join(dest_images_dir, f"{model_id}_{i}{ext}")
                            
                            with zip_ref.open(screenshot_file_path) as source, open(img_dest_path, "wb") as target:
                                shutil.copyfileobj(source, target)
            except Exception as e:
                print(f"An error occurred while processing {zip_filename}: {e}")
        
        print("ShapeNetCore preparation complete.")

    def prepare_custom_dataset(self):
        """[6/6] Downloads and structures the custom dataset from Yandex.Disk."""
        print("\n[6/6] Processing Custom Dataset from Yandex.Disk...")
        os.makedirs(self.custom_data_dir, exist_ok=True)
        
        if os.path.exists(self.custom_data_dir) and len(os.listdir(self.custom_data_dir)) > 0:
            print("Custom dataset appears to be prepared. Skipping.")
            return

        files_to_download = {
            "train_data": "https://disk.yandex.ru/d/RRXJu9ZtEmSXzQ",
            "test_data": "https://disk.yandex.ru/d/TmbB7BsGzg1dQQ",
        }
        for key, val in files_to_download.items():
            self.download_and_unzip_yandex_disk(val, self.custom_data_dir)

        print("Custom dataset preparation complete.")
        
    def _download_file(self, url, filename):
        print(f"Downloading {url} to {os.path.basename(filename)}...")
        try:
            if url.startswith('ftp://'):
                import urllib.request
                urllib.request.urlretrieve(url, filename)
                print(f"FTP download of {os.path.basename(filename)} complete.")
            else:
                with requests.get(url, stream=True) as r:
                    r.raise_for_status()
                    total_size = int(r.headers.get('content-length', 0))
                    with open(filename, 'wb') as f, tqdm(
                        total=total_size, unit='iB', unit_scale=True, desc=os.path.basename(filename)
                    ) as bar:
                        for chunk in r.iter_content(chunk_size=8192):
                            size = f.write(chunk)
                            bar.update(size)
        except Exception as e:
            print(f"Error downloading file {url}: {e}")
            if os.path.exists(filename): os.remove(filename)
            raise

    def download_and_unzip_yandex_disk(self, public_url, extract_to_folder):
        print(f"Starting to process Yandex.Disk link: {public_url}")
        base_api_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?"
        api_url = base_api_url + urlencode(dict(public_key=public_url))
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            download_url = response.json().get("href")
            if not download_url:
                print(f"Error: Could not get a direct download link for {public_url}"); return

            zip_filename = os.path.join(self.root_dir, "temp_yandex_download.zip")
            self._download_file(download_url, zip_filename)

            with zipfile.ZipFile(zip_filename, "r") as zip_ref:
                zip_ref.extractall(extract_to_folder)
            
            os.remove(zip_filename)
        except requests.exceptions.RequestException as e:
            print(f"A network or API error occurred: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

# Entry point: build a manager rooted at "my_3d_datasets" and run every preparation step.
if __name__ == "__main__":
    # `manager` is bound at module level, so it stays available after this block runs.
    manager = DatasetsManager(root_dir="my_3d_datasets")
    manager.prepare_all_datasets()

DatasetsManager initialized. All data will be stored in 'my_3d_datasets'

[5/6] Processing ShapeNetCore from Hugging Face...


Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Hugging Face login successful.


Fetching 58 files:   0%|          | 0/58 [00:00<?, ?it/s]

ShapeNet data available in cache: /home/dima/.cache/huggingface/hub/datasets--ShapeNet--ShapeNetCore/snapshots/0efb24cbe6828a85771a28335c5f7b5626514d9b


Processing ShapeNet Categories: 100%|██████████| 58/58 [05:33<00:00,  5.75s/it]  

ShapeNetCore preparation complete.





In [3]:
# datasets_manager_fixed_for_jupyter.py
import os
import sys
import zipfile
import shutil
import requests
import tempfile
import logging
from urllib.parse import urlencode
from functools import partial
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from tqdm.notebook import tqdm
import multiprocessing
import platform

# mesh libs
import trimesh
import numpy as np
from stl import mesh
import thingi10k
import scipy.io

# Module-level side effect: configure logging at INFO level with a "[LEVEL] message" format.
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")

# -----------------------
# Worker functions (MUST be module-level)
# -----------------------
def _download_stream(url, filename, timeout=30, max_retries=3, chunk_size=8192):
    """Stream ``url`` into ``filename`` and return ``filename``.

    Args:
        url: HTTP(S) URL to download.
        filename: Destination path; overwritten if it exists.
        timeout: Passed to ``session.get`` as the request timeout (seconds).
        max_retries: Passed to ``HTTPAdapter(max_retries=...)`` for both schemes.
        chunk_size: Size in bytes of each streamed chunk.

    Returns:
        ``filename`` on success.

    Raises:
        RuntimeError: on any failure; the original error is attached as
            ``__cause__`` and a partially written file is removed first.
    """
    try:
        # The session owns a connection pool; the ``with`` block closes it on every path.
        with requests.Session() as session:
            adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
            session.mount("https://", adapter)
            session.mount("http://", adapter)
            with session.get(url, stream=True, timeout=timeout) as r:
                r.raise_for_status()
                with open(filename, "wb") as f:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
        return filename
    except Exception as e:
        if os.path.exists(filename):
            try:
                os.remove(filename)
            except OSError:
                # Best effort: failing to delete must not hide the download error.
                pass
        raise RuntimeError(f"Download failed: {url} -> {e}") from e

def _convert_to_stl_worker(src_path, dst_path):
    """Convert one mesh file to STL with trimesh.

    Returns:
        ``(dst_path, None)`` when the STL already exists or was written,
        ``(dst_path, error_text)`` when the mesh is empty or anything raised.
    """
    error = None
    try:
        if not os.path.exists(dst_path):
            loaded = trimesh.load_mesh(src_path, force='mesh')
            if loaded.is_empty:
                error = "Empty mesh"
            else:
                os.makedirs(os.path.dirname(dst_path), exist_ok=True)
                loaded.export(dst_path)
    except Exception as exc:
        error = str(exc)
    return (dst_path, error)

def _thingi10k_np_to_stl_worker(npz_path, _id, dst_path):
    """Write the mesh stored in a Thingi10k ``.npz`` file as an STL (numpy-stl).

    ``_id`` is accepted only so the call signature matches what the
    submitter passes; it is not used.

    Returns:
        ``(dst_path, None)`` when the STL already exists or was written,
        ``(dst_path, error_text)`` when the geometry is insufficient or
        anything raised.
    """
    error = None
    try:
        if not os.path.exists(dst_path):
            with np.load(npz_path) as arrays:
                points = np.asarray(arrays["vertices"], dtype=np.float64)
                faces = np.asarray(arrays["facets"], dtype=np.int32)

            if points.shape[0] < 3 or faces.shape[0] == 0:
                error = "Insufficient geometry"
            else:
                # Index the vertex array by the facet array to get per-facet corner coordinates.
                triangles = points[faces]
                solid = mesh.Mesh(np.zeros(triangles.shape[0], dtype=mesh.Mesh.dtype))
                solid.vectors = triangles
                os.makedirs(os.path.dirname(dst_path), exist_ok=True)
                solid.save(dst_path)
    except Exception as exc:
        error = str(exc)
    return (dst_path, error)

# -----------------------
# Manager
# -----------------------
class DatasetsManager:
    def __init__(self, root_dir="data", max_workers_cpu=None, max_workers_io=None):
        self.root_dir = root_dir
        self.paths = {
            "thingi10k": os.path.join(root_dir, "thingi10k"),
            "modelnet": os.path.join(root_dir, "ModelNet40"),
            "abc": os.path.join(root_dir, "abc_dataset"),
            "objectnet": os.path.join(root_dir, "objectnet3d"),
            "shapenet": os.path.join(root_dir, "shapenet"),
            "custom": os.path.join(root_dir, "custom_dataset"),
        }
        os.makedirs(root_dir, exist_ok=True)

        cpu_count = max(1, (os.cpu_count() or 2) - 1)
        self.max_workers_cpu = max_workers_cpu or min(max(1, cpu_count), 12)
        self.max_workers_io = max_workers_io or min(16, (os.cpu_count() or 4) * 2)

        print(f"DatasetsManager(root_dir='{self.root_dir}')")
        print(f"CPU workers: {self.max_workers_cpu}, IO workers: {self.max_workers_io}")

    # ---------------- Parallel utilities ----------------
    def _get_process_context(self):
        """
        Prefer 'fork' on Unix-like systems (avoids pickling problems in many Jupyter setups).
        If not available (Windows), return default context.
        """
        try:
            if platform.system() != "Windows":
                return multiprocessing.get_context("fork")
        except Exception:
            pass
        return multiprocessing.get_context()  # default (spawn on Windows/Mac depending on Python)

    def _use_process_pool(self):
        """
        Try to create a ProcessPoolExecutor with a 'fork' context where available.
        If that fails (e.g., environment forbids), indicate fallback to threads.
        """
        ctx = self._get_process_context()
        try:
            # Quick test: create an executor and shut it down immediately to verify it's allowed
            with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as ex:
                pass
            return ctx
        except Exception as e:
            # Can't use process pool (spawn in notebook often fails). We'll fallback to threads.
            logging.warning(f"Process pool unavailable (will fallback to ThreadPool): {e}")
            return None

    def _parallel_mesh_convert(self, jobs, desc="Converting meshes"):
        """
        jobs: list of (src, dst, job_type)
        job_type in {"general", "thingi10k"}
        Returns dict {dst: error_or_None}
        """
        results = {}
        if not jobs:
            return results

        # Try process pool first
        ctx = self._use_process_pool()
        if ctx is not None:
            # Use ProcessPoolExecutor with chosen context
            with ProcessPoolExecutor(max_workers=self.max_workers_cpu, mp_context=ctx) as exe:
                futures = {}
                for src, dst, jtype in jobs:
                    if jtype == "thingi10k":
                        fut = exe.submit(_thingi10k_np_to_stl_worker, src, os.path.basename(src), dst)
                    else:
                        fut = exe.submit(_convert_to_stl_worker, src, dst)
                    futures[fut] = dst

                for fut in tqdm(as_completed(futures), total=len(futures), desc=desc):
                    dst = futures[fut]
                    try:
                        dst_path, err = fut.result()
                        results[dst_path] = err
                    except Exception as e:
                        results[dst] = str(e)
            return results

        # Fallback: use ThreadPoolExecutor (works in notebooks but may be slower for CPU-bound)
        logging.warning("Falling back to ThreadPoolExecutor for conversions (less speed for heavy CPU work).")
        with ThreadPoolExecutor(max_workers=max(2, min(self.max_workers_cpu, 8))) as exe:
            futures = {}
            for src, dst, jtype in jobs:
                if jtype == "thingi10k":
                    fut = exe.submit(_thingi10k_np_to_stl_worker, src, os.path.basename(src), dst)
                else:
                    fut = exe.submit(_convert_to_stl_worker, src, dst)
                futures[fut] = dst
            for fut in tqdm(as_completed(futures), total=len(futures), desc=desc):
                dst = futures[fut]
                try:
                    dst_path, err = fut.result()
                    results[dst_path] = err
                except Exception as e:
                    results[dst] = str(e)
        return results

    def _parallel_downloads(self, download_tasks, desc="Downloading"):
        """
        download_tasks: list of (url, dest_path)
        returns list of (dest_path, error_or_None)
        """
        results = []
        if not download_tasks:
            return results

        with ThreadPoolExecutor(max_workers=self.max_workers_io) as exe:
            futures = {exe.submit(_download_stream, url, out): (url, out) for (url, out) in download_tasks}
            for fut in tqdm(as_completed(futures), total=len(futures), desc=desc):
                url, out = futures[fut]
                try:
                    res = fut.result()
                    results.append((out, None))
                except Exception as e:
                    results.append((out, str(e)))
        return results

    # ---------------- Dataset preparation (same structure as before) ----------------
    def prepare_all_datasets(self):
        order = [
            self.prepare_thingi10k,
            self.prepare_modelnet40,
            self.prepare_abc_dataset,
            self.prepare_objectnet3d,
            self.prepare_shapenet,
            self.prepare_custom_dataset
        ]
        for i, fn in enumerate(order, start=1):
            print(f"\n--- [{i}/{len(order)}] {fn.__name__} ---")
            fn()
        print("\nAll dataset preparations finished.")

    def prepare_thingi10k(self):
        out_models = os.path.join(self.paths["thingi10k"], "models")
        os.makedirs(out_models, exist_ok=True)
        try:
            thingi10k.init()
        except Exception as e:
            print(f"Thingi10k init failed: {e}")
            return
        jobs = []
        for entry in tqdm(thingi10k.dataset(), desc="Collect Thingi10k entries"):
            fid = entry.get("file_id")
            npz_path = entry.get("file_path")
            if not npz_path or not fid:
                continue
            dst = os.path.join(out_models, f"{fid}.stl")
            if os.path.exists(dst):
                continue
            jobs.append((npz_path, dst, "thingi10k"))
        if not jobs:
            print("No new Thingi10k jobs.")
            return
        print(f"Converting {len(jobs)} Thingi10k entries using up to {self.max_workers_cpu} workers...")
        res = self._parallel_mesh_convert(jobs, desc="Thingi10k -> STL")
        failed = [p for p, e in res.items() if e]
        if failed:
            print(f"Thingi10k: {len(failed)} conversions failed.")
        print("Thingi10k done.")

    def prepare_modelnet40(self):
        """Download ModelNet40 (if needed) and flatten each category into STL files.

        Every ``<category>/train/*.off`` and ``<category>/test/*.off`` file is queued
        for conversion to ``<category>/<name>.stl``. The ``train``/``test`` folders are
        deleted only AFTER the conversion pass has run: the jobs reference the ``.off``
        files by path, so removing the folders while still collecting jobs would delete
        every source before it could be converted.

        The dataset is treated as already processed when at least one category holds an
        ``.stl`` file and no category still has a ``train``/``test`` folder. A leftover
        split folder means an earlier run was interrupted, so the remaining files are
        converted on the next call.
        """
        url = "http://modelnet.cs.princeton.edu/ModelNet40.zip"
        zip_path = os.path.join(self.root_dir, "ModelNet40.zip")
        out_root = self.paths["modelnet"]
        splits = ("train", "test")
        os.makedirs(self.root_dir, exist_ok=True)

        if os.path.exists(out_root):
            has_stl = False
            has_pending_splits = False
            for entry in os.scandir(out_root):
                if not entry.is_dir():
                    continue
                for item in os.scandir(entry.path):
                    if item.name.endswith(".stl"):
                        has_stl = True
                    elif item.name in splits and item.is_dir():
                        has_pending_splits = True
            if has_stl and not has_pending_splits:
                print("ModelNet40 seems processed; skipping.")
                return

        if not os.path.exists(out_root):
            print("Downloading ModelNet40 zip...")
            self._parallel_downloads([(url, zip_path)], desc="Downloading ModelNet40")
            if not os.path.exists(zip_path):
                print("ModelNet40 zip missing after download.")
                return
            print("Extracting ModelNet40...")
            with zipfile.ZipFile(zip_path, "r") as z:
                z.extractall(self.root_dir)
            os.remove(zip_path)

        if not os.path.isdir(out_root):
            # The archive did not unpack to the expected folder; nothing to convert.
            print(f"ModelNet40 folder not found at {out_root} after extraction.")
            return

        jobs = []
        split_dirs = []  # removed once conversion has consumed their files
        for category_d in tqdm(os.scandir(out_root), desc="Collecting ModelNet categories"):
            if not category_d.is_dir() or category_d.name.startswith("__"):
                continue
            for sub in splits:
                subdir = os.path.join(category_d.path, sub)
                if not os.path.isdir(subdir):
                    continue
                split_dirs.append(subdir)
                for file in os.scandir(subdir):
                    if not file.name.endswith(".off"):
                        continue
                    # splitext swaps only the trailing extension, unlike str.replace.
                    stem = os.path.splitext(file.name)[0]
                    dst = os.path.join(category_d.path, stem + ".stl")
                    if os.path.exists(dst):
                        continue
                    jobs.append((file.path, dst, "general"))

        if jobs:
            print(f"Converting {len(jobs)} ModelNet files...")
            res = self._parallel_mesh_convert(jobs, desc="ModelNet40 conversions")
            failed = sum(1 for e in res.values() if e)
            print(f"ModelNet40 conversions finished. Failed: {failed}")
        else:
            print("No ModelNet conversions needed.")

        # Best-effort cleanup of the original split folders, now that nothing reads them.
        for subdir in split_dirs:
            try:
                shutil.rmtree(subdir)
            except Exception as e:
                print(f"Could not remove {subdir}: {e}")
        print("ModelNet40 done.")

    def prepare_abc_dataset(self):
        """Download the ABC archive, convert its OBJ meshes to STL, then drop the raw files.

        Processing is skipped when the output ``models`` folder already lists more than
        1000 entries. Otherwise the archive is downloaded and unpacked into
        ``self.root_dir`` (unless the ``100k`` folder is already there), every ``.obj``
        under ``train/512`` and ``test/512`` without an existing STL is queued for
        ``self._parallel_mesh_convert``, and finally the ``100k`` folder is removed.
        """
        archive_url = "https://archive.nyu.edu/retrieve/120666/abc_full_100k_v00.zip"
        archive_path = os.path.join(self.root_dir, "abc_full_100k_v00.zip")
        unpacked_root = os.path.join(self.root_dir, "100k")
        stl_dir = os.path.join(self.paths["abc"], "models")
        os.makedirs(stl_dir, exist_ok=True)

        already_processed = os.path.exists(stl_dir) and len(os.listdir(stl_dir)) > 1000
        if already_processed:
            print("ABC dataset seems processed; skipping.")
            return

        if not os.path.exists(unpacked_root):
            print("Downloading ABC dataset (large)...")
            self._parallel_downloads([(archive_url, archive_path)], desc="Downloading ABC")
            if not os.path.exists(archive_path):
                print("ABC zip not present after download.")
                return
            print("Extracting ABC (this will take a while)...")
            with zipfile.ZipFile(archive_path, "r") as archive:
                archive.extractall(self.root_dir)
            os.remove(archive_path)

        pending = []
        for split_name in ("train/512", "test/512"):
            split_dir = os.path.join(unpacked_root, split_name)
            if not os.path.exists(split_dir):
                continue
            for item in os.scandir(split_dir):
                if not item.name.endswith(".obj"):
                    continue
                target_stl = os.path.join(stl_dir, item.name.replace(".obj", ".stl"))
                if not os.path.exists(target_stl):
                    pending.append((item.path, target_stl, "general"))

        if not pending:
            print("No ABC conversions needed.")
        else:
            print(f"Converting {len(pending)} ABC models...")
            outcome = self._parallel_mesh_convert(pending, desc="ABC conversions")
            failure_count = len([err for err in outcome.values() if err])
            print(f"ABC conversions complete. Failed: {failure_count}")

        # The unpacked sources are no longer needed once conversion has been attempted.
        if os.path.exists(unpacked_root):
            try:
                shutil.rmtree(unpacked_root)
            except Exception as e:
                print(f"Could not remove ABC extracted folder: {e}")

    def prepare_objectnet3d(self):
        """Download ObjectNet3D and lay it out as ``<category>/models`` and ``<category>/images``.

        Steps, all performed inside a temporary ``ObjectNet3D_temp`` folder under
        ``self.root_dir`` that is removed at the end (even on error):

        1. Download the annotation, CAD and image archives that are not present yet.
        2. Extract every non-empty ``.zip`` found in the temp folder.
        3. Queue each ``CAD/off/<category>/*.off`` file whose stem is 6 characters long
           for conversion to ``<category>/models/<stem>.stl``.
        4. For each ``.mat`` annotation with a 6-character stem, read the image file
           name and the class of the first object, and copy the image to
           ``<category>/images``.

        The method does nothing when the output folder already has content.

        NOTE(review): the download URLs use the ``ftp://`` scheme; confirm that
        ``_parallel_downloads`` supports it.
        NOTE(review): only 6-character file stems are accepted in steps 3 and 4;
        confirm this matches the naming used inside the archives.
        """
        out_root = self.paths["objectnet"]
        os.makedirs(out_root, exist_ok=True)
        if os.listdir(out_root):
            print("ObjectNet3D directory not empty; skipping.")
            return

        temp_dir = os.path.join(self.root_dir, "ObjectNet3D_temp")
        os.makedirs(temp_dir, exist_ok=True)
        try:
            urls = {
                "annotations": "ftp://cs.stanford.edu/cs/cvgl/ObjectNet3D/ObjectNet3D_annotations.zip",
                "cads": "ftp://cs.stanford.edu/cs/cvgl/ObjectNet3D/ObjectNet3D_cads.zip",
                "images": "ftp://cs.stanford.edu/cs/cvgl/ObjectNet3D/ObjectNet3D_images.zip"
            }
            dl_tasks = []
            for url in urls.values():
                target_zip = os.path.join(temp_dir, os.path.basename(url))
                if not os.path.exists(target_zip):
                    dl_tasks.append((url, target_zip))
            if dl_tasks:
                print("Downloading ObjectNet3D archives (parallel)...")
                self._parallel_downloads(dl_tasks, desc="ObjectNet3D downloads")

            # Snapshot the archive list first: extraction adds entries to temp_dir, and
            # what a live scandir iterator yields after such changes is unspecified.
            with os.scandir(temp_dir) as entries:
                archives = [
                    entry.path for entry in entries
                    if entry.name.endswith(".zip") and os.path.getsize(entry.path) > 0
                ]
            for archive_path in archives:
                try:
                    with zipfile.ZipFile(archive_path, "r") as z:
                        z.extractall(temp_dir)
                except Exception as e:
                    print(f"Extraction failed {archive_path}: {e}")

            cad_off_dir = os.path.join(temp_dir, "ObjectNet3D", "CAD", "off")
            jobs = []
            if os.path.exists(cad_off_dir):
                for cat in os.scandir(cad_off_dir):
                    if not cat.is_dir():
                        continue
                    dst_cat_models = os.path.join(out_root, cat.name, "models")
                    os.makedirs(dst_cat_models, exist_ok=True)
                    for off_file in os.scandir(cat.path):
                        if off_file.name.endswith(".off") and len(off_file.name.split(".")[0]) == 6:
                            dst = os.path.join(dst_cat_models, off_file.name.replace(".off", ".stl"))
                            if os.path.exists(dst):
                                continue
                            jobs.append((off_file.path, dst, "general"))

            if jobs:
                print(f"Converting {len(jobs)} ObjectNet3D CAD files...")
                res = self._parallel_mesh_convert(jobs, desc="ObjectNet3D CAD -> STL")
                failed = sum(1 for e in res.values() if e)
                print(f"ObjectNet3D CAD conversions done. Failed: {failed}")

            ann_dir = os.path.join(temp_dir, "ObjectNet3D", "Annotations")
            img_dir = os.path.join(temp_dir, "ObjectNet3D", "Images")
            if os.path.exists(ann_dir) and os.path.exists(img_dir):
                for mat_file in tqdm(os.scandir(ann_dir), desc="ObjectNet3D annotations"):
                    if not (mat_file.name.endswith(".mat") and len(mat_file.name.split('.')[0]) == 6):
                        continue
                    try:
                        mat = scipy.io.loadmat(mat_file.path)
                        record = mat['record'][0, 0]
                        img_filename = str(record['filename'][0])
                        # The image is filed under the class of the first annotated object.
                        category_name = str(record['objects'][0, 0]['class'][0])
                        src_img = os.path.join(img_dir, img_filename)
                        if not os.path.exists(src_img):
                            continue
                        dest_img_dir = os.path.join(out_root, category_name, "images")
                        os.makedirs(dest_img_dir, exist_ok=True)
                        shutil.copy(src_img, os.path.join(dest_img_dir, os.path.basename(src_img)))
                    except Exception as e:
                        print(f"Annotation processing error {mat_file.name}: {e}")
        finally:
            # Best-effort cleanup; a failure is reported but never masks an earlier error.
            if os.path.exists(temp_dir):
                try:
                    shutil.rmtree(temp_dir)
                except Exception as e:
                    print(f"Could not remove ObjectNet3D temp folder: {e}")
        print("ObjectNet3D done.")

    def prepare_shapenet(self, batch_size=256):
        """Fetch ShapeNetCore from the Hugging Face Hub and convert its models to STL.

        Requires ``HF_TOKEN`` in the environment (a ``.env`` file is loaded first);
        without it the method prints a notice and returns. For each ``<cat_id>.zip`` in
        the downloaded snapshot whose ``<cat_id>/models`` output folder is still empty:

        * each model's ``models/model_normalized.obj`` (or, failing that, the first
          ``.obj`` under its ``models/`` folder) is converted to
          ``<cat_id>/models/<model_id>.stl``;
        * every ``.png``/``.jpg`` member is copied to
          ``<cat_id>/images/<model_id>_<file name>``.

        Models are extracted and converted in batches of at most ``batch_size`` so the
        scratch space in use at any time stays bounded. Extracting every model of every
        category before converting anything exhausts the temp volume.

        Args:
            batch_size: Maximum number of models extracted to scratch space at a time.
                Values below 1 are treated as 1.
        """
        out_root = self.paths["shapenet"]
        os.makedirs(out_root, exist_ok=True)
        from dotenv import load_dotenv
        load_dotenv()
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            print("HF_TOKEN not set; skipping ShapeNet.")
            return
        try:
            from huggingface_hub import login, snapshot_download
            login(token=hf_token)
            repo_id = "ShapeNet/ShapeNetCore"
            cache_dir = snapshot_download(repo_id=repo_id, repo_type="dataset")
        except Exception as e:
            print(f"Failed to fetch ShapeNet: {e}")
            return

        batch_size = max(1, int(batch_size))
        converted = 0
        failed = 0
        for zip_filename in tqdm(os.listdir(cache_dir), desc="Enumerate ShapeNet zips"):
            if not zip_filename.endswith(".zip"):
                continue
            cat_id = zip_filename[:-4]
            models_dir = os.path.join(out_root, cat_id, "models")
            images_dir = os.path.join(out_root, cat_id, "images")
            if os.path.exists(models_dir) and len(os.listdir(models_dir)) > 0:
                continue
            os.makedirs(models_dir, exist_ok=True)
            os.makedirs(images_dir, exist_ok=True)
            zip_filepath = os.path.join(cache_dir, zip_filename)

            try:
                with zipfile.ZipFile(zip_filepath, "r") as z:
                    all_files = z.namelist()
                    members = set(all_files)  # O(1) lookups instead of scanning the list
                    model_ids = sorted({f.split("/")[1] for f in all_files if f.count("/") >= 2})

                    pending = []
                    for mid in model_ids:
                        candidate = f"{cat_id}/{mid}/models/model_normalized.obj"
                        if candidate not in members:
                            # Fall back to the first .obj stored under this model's folder.
                            prefix = f"{cat_id}/{mid}/models/"
                            candidate = next(
                                (f for f in all_files if f.startswith(prefix) and f.endswith(".obj")),
                                None,
                            )
                        if not candidate:
                            continue
                        dst = os.path.join(models_dir, f"{mid}.stl")
                        if not os.path.exists(dst):
                            pending.append((candidate, dst))

                    if pending:
                        print(f"Converting {len(pending)} ShapeNet models from {zip_filename}...")
                    for start in range(0, len(pending), batch_size):
                        n_jobs, n_failed = self._convert_shapenet_batch(
                            z,
                            zip_filename,
                            pending[start:start + batch_size],
                            desc=f"ShapeNet conversions ({cat_id})",
                        )
                        converted += n_jobs
                        failed += n_failed

                    image_errors = 0
                    for shot in all_files:
                        if not shot.endswith((".png", ".jpg")):
                            continue
                        parts = shot.split("/")
                        if len(parts) < 3:
                            continue
                        outpath = os.path.join(images_dir, f"{parts[1]}_{os.path.basename(shot)}")
                        if os.path.exists(outpath):
                            continue
                        try:
                            with z.open(shot) as src, open(outpath, "wb") as out:
                                shutil.copyfileobj(src, out)
                        except Exception:
                            image_errors += 1
                            # Do not leave a truncated image that a later run would keep.
                            try:
                                os.remove(outpath)
                            except OSError:
                                pass
                    if image_errors:
                        print(f"{zip_filename}: {image_errors} images could not be copied.")
            except Exception as e:
                print(f"Error processing ShapeNet zip {zip_filename}: {e}")

        if converted:
            print(f"ShapeNet conversions done. Failed: {failed}")
        print("ShapeNet done.")

    def _convert_shapenet_batch(self, archive, zip_filename, batch, desc="ShapeNet conversions"):
        """Extract one batch of OBJ members to a scratch folder, convert them, clean up.

        Args:
            archive: Open ``zipfile.ZipFile`` to extract from.
            zip_filename: Archive name, used only in error messages.
            batch: List of ``(member_name, destination_stl_path)`` pairs.
            desc: Label passed to ``self._parallel_mesh_convert``.

        Returns:
            ``(jobs_submitted, jobs_failed)``. Members that could not be extracted are
            reported and not submitted.
        """
        tmp_dir = tempfile.mkdtemp(prefix="shapenet_")
        try:
            jobs = []
            for member, dst in batch:
                try:
                    archive.extract(member, path=tmp_dir)
                except Exception as ex:
                    print(f"Error extracting {member} from {zip_filename}: {ex}")
                    continue
                jobs.append((os.path.join(tmp_dir, member), dst, "general"))
            if not jobs:
                return 0, 0
            res = self._parallel_mesh_convert(jobs, desc=desc)
            return len(jobs), sum(1 for e in res.values() if e)
        finally:
            # Runs on success, error and interrupt, so scratch files never pile up.
            shutil.rmtree(tmp_dir, ignore_errors=True)

    def prepare_custom_dataset(self):
        """Download the custom train/test archives from Yandex.Disk and unpack them.

        Skipped when the output folder already has content. Each public share link is
        resolved to a direct download URL through the Yandex.Disk public API; the
        resolved archives are fetched with ``self._parallel_downloads`` and every
        successfully downloaded zip is extracted into the output folder and deleted.
        """
        out_root = self.paths["custom"]
        os.makedirs(out_root, exist_ok=True)
        if os.listdir(out_root):
            print("Custom dataset folder not empty; skipping.")
            return

        shared_links = {
            "train_data": "https://disk.yandex.ru/d/RRXJu9ZtEmSXzQ",
            "test_data": "https://disk.yandex.ru/d/TmbB7BsGzg1dQQ",
        }

        queued = []
        for label, share_url in shared_links.items():
            endpoint = "https://cloud-api.yandex.net/v1/disk/public/resources/download?"
            resolve_url = endpoint + urlencode({"public_key": share_url})
            try:
                response = requests.get(resolve_url, timeout=10)
                response.raise_for_status()
                direct_link = response.json().get("href")
                if not direct_link:
                    print(f"Could not resolve Yandex link {share_url}")
                else:
                    local_zip = os.path.join(self.root_dir, f"yandex_{label}.zip")
                    queued.append((direct_link, local_zip))
            except Exception as e:
                print(f"Yandex API error for {share_url}: {e}")

        if queued:
            print("Downloading custom dataset zips from Yandex.Disk (parallel)...")
            results = self._parallel_downloads(queued, desc="Yandex downloads")
            for archive_path, failure in results:
                if failure:
                    print(f"Download failed for {archive_path}: {failure}")
                    continue
                try:
                    with zipfile.ZipFile(archive_path, "r") as archive:
                        archive.extractall(out_root)
                    os.remove(archive_path)
                except Exception as e:
                    print(f"Failed to extract {archive_path}: {e}")
        print("Custom dataset done.")

    # compatibility wrapper kept
    def _download_file(self, url, filename):
        """Forward ``url`` and ``filename`` to the module-level ``_download_stream``.

        Anything ``_download_stream`` returns is discarded; this method returns ``None``.
        """
        _download_stream(url, filename)

# -----------------------
# Usage note:
# -----------------------
# IMPORTANT: In Jupyter, restart the kernel before running this cell to clear any partially-created pools.
# Then run:
# manager = DatasetsManager(root_dir="my_3d_datasets")
# manager.prepare_all_datasets()
#
# If you still hit pickling/spawn errors in your environment, set mp start method to 'fork' in a fresh Python script:
# multiprocessing.set_start_method('fork')  # only on Unix-like systems and only at program start

if __name__ == "__main__":
    # When executed as a script, this will run nicely (spawn/fork semantics handled by _get_process_context).
    # Builds a manager rooted at "my_3d_datasets" and runs one preparation step.
    manager = DatasetsManager(root_dir="my_3d_datasets")
    # Alternative steps, currently disabled; only prepare_objectnet3d() is active:
    # manager.prepare_all_datasets()
    # manager.prepare_shapenet()
    manager.prepare_objectnet3d()


DatasetsManager(root_dir='my_3d_datasets')
CPU workers: 12, IO workers: 16


Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Fetching 58 files:   0%|          | 0/58 [00:00<?, ?it/s]

Enumerate ShapeNet zips:   0%|          | 0/58 [00:00<?, ?it/s]

Error extracting 02958343/235392f8419bb5006a34aa94ca8a3355/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracting 02958343/2359486974efded33121f82ae456ac81/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracting 02958343/238c6d6da1c8ce2970097c1b40e1ea6/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracting 02958343/23ae5d6c60ab3f60ee3362a8d2d8318f/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracting 02958343/23bfcd49917919006a34aa94ca8a3355/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracting 02958343/23e7ae83601f893b575116d39d0ffbba/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracting 02958343/23e8adb3bf1961f85332d3b92481b499/models/model_normalized.obj from 02958343.zip: [Errno 28] No space left on device
Error extracti

KeyboardInterrupt: 