In [1]:
from asyncio import Protocol
from contextlib import AbstractContextManager
from typing import Optional, Sequence, Tuple, cast
from mediapipe.python.solutions.hands import Hands
import numpy as np


# The cell-level ``from asyncio import Protocol`` binds asyncio's network
# protocol base class, which is unrelated to structural typing. Rebind the
# name to ``typing.Protocol`` so the class below is a real structural type.
from typing import Protocol


class SafeHandsResult(Protocol):
    """Structural type describing the result object of ``Hands.process``.

    Only the three attributes that ``SafeHands.process`` reads are declared.
    Each one is either ``None`` or a sequence with one entry per detected hand.
    """

    multi_hand_landmarks: Optional[Sequence]
    multi_hand_world_landmarks: Optional[Sequence]
    multi_handedness: Optional[Sequence]

class SafeHands(AbstractContextManager):
	"""Context-managed wrapper around ``Hands`` that returns fixed-size arrays.

	The wrapped detector is created in ``__init__`` and closed when the
	``with`` block exits.
	"""

	def __init__(
		self,
		static_image_mode: bool = False,
		max_num_hands: int = 2,
		model_complexity: int = 1,
		min_detection_confidence: float = 0.5,
		min_tracking_confidence: float = 0.5,
	):
		# Remembered because it fixes the first dimension of every output array.
		self.max_num_hands = max_num_hands
		self.hands = Hands(
			static_image_mode=static_image_mode,
			max_num_hands=max_num_hands,
			model_complexity=model_complexity,
			min_detection_confidence=min_detection_confidence,
			min_tracking_confidence=min_tracking_confidence,
		)

	def __enter__(self) -> "SafeHands":
		return self

	def __exit__(self, exc_type, exc_value, traceback) -> None:
		self.hands.close()

	@staticmethod
	def _copy_xyz(dest: np.ndarray, detected_hands) -> None:
		"""Write the x/y/z of every landmark of every hand into ``dest[hand, point]``."""
		for hand_idx, hand in enumerate(detected_hands):
			for point_idx, point in enumerate(hand.landmark):
				dest[hand_idx, point_idx, 0] = point.x
				dest[hand_idx, point_idx, 1] = point.y
				dest[hand_idx, point_idx, 2] = point.z

	def process(self, frame_rgb: np.ndarray) -> Optional[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
		"""Run detection on one frame and pack the result into numpy arrays.

		Returns:
			``None`` when the detector reported no landmarks, no world
			landmarks and no handedness. Otherwise a tuple
			``(landmarks, world_landmarks, handedness)`` of float32 arrays with
			shapes ``(max_num_hands, 21, 3)``, ``(max_num_hands, 21, 3)`` and
			``(max_num_hands, 2)``. Rows for hands that were not reported stay
			zero. ``handedness[:, 0]`` is 0 for the label ``"Left"`` and 1 for
			any other label; ``handedness[:, 1]`` is the classification score.
		"""
		mp_result = cast(SafeHandsResult, self.hands.process(frame_rgb))
		anything_detected = (
			mp_result.multi_hand_landmarks
			or mp_result.multi_hand_world_landmarks
			or mp_result.multi_handedness
		)
		if not anything_detected:
			return None

		limit = self.max_num_hands
		landmarks = np.zeros((limit, 21, 3), dtype=np.float32)
		world_landmarks = np.zeros((limit, 21, 3), dtype=np.float32)
		handedness = np.zeros((limit, 2), dtype=np.float32)

		if mp_result.multi_hand_landmarks:
			self._copy_xyz(landmarks, mp_result.multi_hand_landmarks[:limit])

		if mp_result.multi_hand_world_landmarks:
			self._copy_xyz(world_landmarks, mp_result.multi_hand_world_landmarks[:limit])

		if mp_result.multi_handedness:
			for row, entry in enumerate(mp_result.multi_handedness[:limit]):
				handedness[row, 0] = 0 if entry.classification[0].label == "Left" else 1
				handedness[row, 1] = entry.classification[0].score

		return landmarks, world_landmarks, handedness


2025-12-10 04:21:10.081650: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-12-10 04:21:10.085805: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-12-10 04:21:10.097539: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1765336870.116167  218808 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1765336870.121504  218808 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1765336870.136538  218808 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linkin

In [4]:
import logging


# Root logging configuration. The process name is part of the record format
# so that messages coming from different processes can be told apart.
logging.basicConfig(
	format="%(asctime)s [%(processName)s] %(levelname)s - %(message)s",
	level=logging.INFO,
)
# Logger named after the current module; later cells log through it.
logger = logging.getLogger(__name__)

In [13]:
from abc import ABC, abstractmethod
from multiprocessing import Event, Process, Queue
from os import getpid
from typing import Callable, Generic, TypeVar, Optional
from multiprocessing.synchronize import Event as EventType
from queue import Empty


# Type variables for the generic classes defined in this cell.
I = TypeVar("I")  # item type a worker receives (see IWorker.work)
O = TypeVar("O")  # result type a worker returns (see IWorker.work)
Q = TypeVar("Q")  # element type carried by a GenMPQueue


class GenMPQueue(Generic[Q]):
    """Generic, type-annotated wrapper around ``multiprocessing.Queue``.

    Every method forwards to the wrapped queue. Used as a context manager,
    the wrapped queue is closed when the ``with`` block exits.
    """

    def __init__(self, maxsize: int = 0):
        self._queue = Queue(maxsize)

    # --- transfer ---------------------------------------------------------

    def put(self, obj: Q, block: bool = True, timeout: Optional[float] = None) -> None:
        """Forward ``obj`` to the wrapped queue's ``put``."""
        self._queue.put(obj, block=block, timeout=timeout)

    def get(self, block: bool = True, timeout: Optional[float] = None) -> Q:
        """Return the next item from the wrapped queue's ``get``."""
        return self._queue.get(block=block, timeout=timeout)

    def put_nowait(self, obj: Q) -> None:
        """Forward to the wrapped queue's ``put_nowait``."""
        self._queue.put_nowait(obj)

    def get_nowait(self) -> Q:
        """Forward to the wrapped queue's ``get_nowait``."""
        return self._queue.get_nowait()

    # --- inspection -------------------------------------------------------

    def empty(self) -> bool:
        """Forward to the wrapped queue's ``empty``."""
        return self._queue.empty()

    def full(self) -> bool:
        """Forward to the wrapped queue's ``full``."""
        return self._queue.full()

    def qsize(self) -> int:
        """Forward to the wrapped queue's ``qsize``."""
        return self._queue.qsize()

    # --- lifecycle --------------------------------------------------------

    def close(self) -> None:
        """Forward to the wrapped queue's ``close``."""
        self._queue.close()

    def join_thread(self) -> None:
        """Forward to the wrapped queue's ``join_thread``."""
        self._queue.join_thread()

    def cancel_join_thread(self) -> None:
        """Forward to the wrapped queue's ``cancel_join_thread``."""
        self._queue.cancel_join_thread()

    def __enter__(self) -> "GenMPQueue":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Leaving the ``with`` block closes the wrapped queue.
        self._queue.close()


class IWorker(ABC, Generic[I, O]):
    """Abstract unit of work that turns one input item into one result.

    The two queues are only stored on the instance; this base class never
    reads from or writes to them itself.
    """

    def __init__(self, in_queue: GenMPQueue[I], out_queue: GenMPQueue[O]) -> None:
        self.in_queue: GenMPQueue[I] = in_queue
        self.out_queue: GenMPQueue[O] = out_queue

    @abstractmethod
    def work(self, item: I) -> O:
        """Process ``item`` and return its result; subclasses must override."""


class IBatchProcessor(Generic[I, O], AbstractContextManager):
    """Abstract, context-managed processor with an explicit start/stop lifecycle."""

    @abstractmethod
    def start(self) -> None:
        """Begin processing; subclasses must override."""

    @abstractmethod
    def stop(self) -> None:
        """End processing; subclasses must override."""


class BatchProcessor(IBatchProcessor[I, O]):
    """Pool of worker processes that move items from ``in_queue`` to ``out_queue``.

    Each worker process builds its own ``IWorker`` through ``worker_generator``
    and calls ``work`` on every item it takes from ``in_queue``, putting the
    result on ``out_queue``.

    Args:
        in_queue: Queue the workers read items from.
        out_queue: Queue the workers write results to.
        n_workers: Number of worker processes started by ``start``.
        worker_generator: Factory called once inside each worker process.
        worker_timeout: Seconds ``stop`` waits for each worker before
            terminating it. ``None`` (or any other falsy value, as before)
            means wait without a time limit.
    """

    def __init__(
        self,
        in_queue: GenMPQueue[I],
        out_queue: GenMPQueue[O],
        n_workers: int,
        worker_generator: Callable[[GenMPQueue[I], GenMPQueue[O]], IWorker[I, O]],
        worker_timeout: float | None = None,
    ) -> None:
        self.in_queue: GenMPQueue[I] = in_queue
        self.out_queue: GenMPQueue[O] = out_queue
        self.n_workers = n_workers
        self.worker_generator = worker_generator
        self.stop_event: EventType = Event()
        self.workers: list[Process] = []
        self.worker_timeout = worker_timeout

    def _worker(self) -> None:
        """Main loop of one worker process.

        Runs until a stop has been requested AND ``in_queue`` stayed empty for
        one poll interval, so items that were queued before ``stop`` are still
        processed instead of being dropped.
        """
        worker = self.worker_generator(self.in_queue, self.out_queue)

        while True:
            try:
                item: I = self.in_queue.get(timeout=0.5)
            except Empty:
                # Only an empty queue lets a stop request end the loop;
                # checking the event before every ``get`` would abandon
                # whatever is still waiting in ``in_queue``.
                if self.stop_event.is_set():
                    break
                continue
            try:
                result: O = worker.work(item)
                self.out_queue.put(result)
            except Exception:
                # A failing item is logged and skipped; the worker keeps going.
                logger.exception("Worker crashed while processing item")

    def start(self) -> None:
        """Clear the stop flag and launch ``n_workers`` worker processes."""
        self.stop_event.clear()
        for _ in range(self.n_workers):
            p = Process(target=self._worker)
            p.start()
            self.workers.append(p)

    def stop(self) -> None:
        """Request shutdown and wait for the workers to finish.

        Workers drain ``in_queue`` before exiting, so something must keep
        consuming ``out_queue`` while this call waits; otherwise a full
        ``out_queue`` blocks them until ``worker_timeout`` (if any) expires.
        Workers still alive after the timeout are terminated.
        """
        self.stop_event.set()
        if not self.worker_timeout:
            for p in self.workers:
                p.join()
        else:
            # The timeout applies to each join in turn, not to the loop as a whole.
            for p in self.workers:
                p.join(timeout=self.worker_timeout)
            for p in self.workers:
                if p.is_alive():
                    p.terminate()
                    # Reap the terminated process so it does not linger as a zombie.
                    p.join()
        self.workers.clear()

    def __enter__(self) -> "BatchProcessor":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.stop()

In [15]:
from typing import Iterator, Optional, Generic
from abc import ABC, abstractmethod
from multiprocessing import Event
import asyncio

# Type variables for the classes in this cell. ``O`` and ``I`` rebind names
# that the BatchProcessor cell already defined.
T = TypeVar("T")  # item type yielded by a dataset source
O = TypeVar("O")  # result type produced by the batch processor

I = TypeVar("I")  # item type handed to a collector
C = TypeVar("C")  # type of the collection a collector returns

class IDatasetSource(ABC, Generic[T]):
	"""Abstract iterable source of dataset items of type ``T``."""

	@abstractmethod
	def __iter__(self) -> Iterator[T]:
		"""Return an iterator over the items; subclasses must override."""

	@abstractmethod
	def __len__(self) -> Optional[int]:
		"""Return the number of items, annotated as optional; subclasses must override."""
	
class IDataCollector(ABC, Generic[I, C]):
	"""Abstract sink that receives items of type ``I`` and exposes a collection ``C``."""

	@abstractmethod
	def collect(self, item: I) -> None:
		"""Receive one item; subclasses must override."""

	@abstractmethod
	def get_collection(self) -> C:
		"""Return the collection; subclasses must override."""


class IterBatchProcessor(Generic[T, O, C]):
	"""Feed a dataset source through a batch processor and collect the results.

	A separate process pushes items from ``data_source`` into ``in_queue``
	while the calling thread moves results from ``out_queue`` into
	``collector``.

	NOTE(review): ``in_queue`` and ``out_queue`` are created here and are never
	handed to ``batch_processor``; confirm that the processor's workers are
	wired to these same queues, otherwise no result can ever arrive.

	Args:
		batch_processor: Processor that is started and stopped around the run.
		data_source: Iterable source of input items.
		collector: Sink that receives every result.
		max_samples: Upper bound on the number of items fed; ``None`` means
			no bound.
		max_queue_size: ``maxsize`` of both queues.
	"""

	def __init__(
		self,
		batch_processor: IBatchProcessor[T, O],
		data_source: IDatasetSource[T],
		collector: IDataCollector[O,C],
		max_samples: Optional[int]  = None,
		max_queue_size: int = 200,
	) -> None:
		self.in_queue: GenMPQueue[T] = GenMPQueue(maxsize=max_queue_size)
		self.out_queue: GenMPQueue[O] = GenMPQueue(maxsize=max_queue_size)
		self.batch_processor: IBatchProcessor[T, O] = batch_processor
		self.data_source: IDatasetSource[T] = data_source
		self.max_samples: Optional[int] = max_samples
		self.collector = collector

	def _collect_pending(self, timeout: float) -> None:
		"""Collect results until ``out_queue`` stays empty for ``timeout`` seconds."""
		while True:
			try:
				item = self.out_queue.get(timeout=timeout)
			except Empty:
				return
			self.collector.collect(item)

	def _run_blocking(self) -> Optional[C]:
		"""Run the whole pipeline synchronously and return the collection."""
		def in_routine(in_queue, datasource, max_items):
			for i, item in enumerate(datasource):
				# ``max_items`` is None when no cap was requested; comparing
				# an int with None would raise TypeError.
				if max_items is not None and i >= max_items:
					break
				in_queue.put(item)

		in_p = Process(target=in_routine, args=(self.in_queue, self.data_source, self.max_samples))
		in_p.start()
		self.batch_processor.start()
		# Poll with a timeout instead of a bare blocking ``get``: once the
		# feeder has exited there may be no further result, and an un-timed
		# ``get`` would then block forever.
		while in_p.is_alive():
			self._collect_pending(timeout=0.5)
		self.batch_processor.stop()
		in_p.join()
		# Pick up whatever the workers produced while shutting down.
		self._collect_pending(timeout=0.5)
		return self.collector.get_collection()

	async def process(self):
		"""Run ``_run_blocking`` in the event loop's default executor.

		On cancellation the batch processor is stopped and the cancellation
		is re-raised.
		"""
		try:
			loop = asyncio.get_running_loop()
			return await loop.run_in_executor(None, self._run_blocking)
		except asyncio.CancelledError:
			self.batch_processor.stop()
			raise
	

In [None]:
# Variant: asyncio-task-based rewrite of the IterBatchProcessor cell above (redefines the same names).
from typing import Iterator, Optional, Generic
from abc import ABC, abstractmethod
from multiprocessing import Event
import asyncio

# Type variables for the classes in this cell (same names as in the cells above).
T = TypeVar("T")  # item type yielded by a dataset source
O = TypeVar("O")  # result type produced by the batch processor

I = TypeVar("I")  # item type handed to a collector
C = TypeVar("C")  # type of the collection a collector returns

class IDatasetSource(ABC, Generic[T]):
	"""Interface of an iterable dataset whose items have type ``T``."""

	@abstractmethod
	def __iter__(self) -> Iterator[T]:
		"""Iterate over the dataset items; must be overridden."""

	@abstractmethod
	def __len__(self) -> Optional[int]:
		"""Report the dataset size (annotated as optional); must be overridden."""
	
class IDataCollector(ABC, Generic[I, C]):
	"""Interface of a sink that takes items of type ``I`` and yields a collection ``C``."""

	@abstractmethod
	def collect(self, item: I) -> None:
		"""Accept a single item; must be overridden."""

	@abstractmethod
	def get_collection(self) -> C:
		"""Hand back the collection; must be overridden."""


class IterBatchProcessor(Generic[T, O, C]):
	"""Feed a dataset source through a batch processor using asyncio tasks.

	A producer task pushes items into ``in_queue`` and a consumer task moves
	results from ``out_queue`` into ``collector``; blocking queue calls run in
	worker threads via ``asyncio.to_thread``.

	NOTE(review): ``in_queue`` and ``out_queue`` are created here and are never
	handed to ``batch_processor``; confirm that the processor's workers are
	wired to these same queues, otherwise no result can ever arrive.

	Args:
		batch_processor: Processor that is started and stopped around the run.
		data_source: Iterable source of input items.
		collector: Sink that receives every result.
		max_samples: Upper bound on the number of items fed; ``None`` means
			no bound.
		max_queue_size: ``maxsize`` of both queues.
	"""

	def __init__(
		self,
		batch_processor: IBatchProcessor[T, O],
		data_source: IDatasetSource[T],
		collector: IDataCollector[O,C],
		max_samples: Optional[int]  = None,
		max_queue_size: int = 200,
	) -> None:
		self.in_queue: GenMPQueue[T] = GenMPQueue(maxsize=max_queue_size)
		self.out_queue: GenMPQueue[O] = GenMPQueue(maxsize=max_queue_size)
		self.batch_processor: IBatchProcessor[T, O] = batch_processor
		self.data_source: IDatasetSource[T] = data_source
		self.max_samples: Optional[int] = max_samples
		self.collector = collector

	async def _producer(self, done_evt: asyncio.Event):
		"""Put up to ``max_samples`` items on ``in_queue``; always set ``done_evt`` at the end."""
		try:
			for i, item in enumerate(self.data_source):
				if self.max_samples is not None and i >= self.max_samples:
					break
				await asyncio.to_thread(self.in_queue.put, item)
		finally:
			done_evt.set()

	async def _consumer(self, done_evt: asyncio.Event):
		"""Collect results until ``done_evt`` is set and a poll of ``out_queue`` times out.

		``done_evt`` must only be set once no more results can be produced;
		``process`` sets it after the batch processor has stopped.
		"""
		while True:
			try:
				item = await asyncio.to_thread(
					self.out_queue.get, True, 0.5
				)
			except Empty:
				if done_evt.is_set():
					break
				continue
			self.collector.collect(item)

	async def process(self) -> C:
		"""Run producer, workers and consumer to completion and return the collection.

		The consumer keeps running while the batch processor is being stopped,
		so results produced during shutdown are still collected. Whether items
		still waiting in ``in_queue`` at that point get processed depends on
		the batch processor's ``stop`` implementation.
		"""
		producer_done = asyncio.Event()
		workers_done = asyncio.Event()
		tasks: list = []
		stop_requested = False

		try:
			self.batch_processor.start()

			producer = asyncio.create_task(self._producer(producer_done))
			consumer = asyncio.create_task(self._consumer(workers_done))
			tasks = [producer, consumer]

			# The consumer can only finish this early by failing (its exit
			# event is not set yet); surface that instead of feeding on.
			await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
			if consumer.done():
				consumer.result()
			await producer

			# ``stop`` blocks while workers shut down; run it in a thread so
			# the consumer can keep emptying ``out_queue`` meanwhile.
			stop_requested = True
			await asyncio.to_thread(self.batch_processor.stop)
			workers_done.set()

			await consumer
			return self.collector.get_collection()
		finally:
			# Covers failure and cancellation: leave no task pending and make
			# sure the batch processor is stopped exactly once.
			for task in tasks:
				if not task.done():
					task.cancel()
			await asyncio.gather(*tasks, return_exceptions=True)
			if not stop_requested:
				self.batch_processor.stop()
	

In [None]:
import asyncio
from typing import Any, Iterable
from datasets import load_dataset


class HugDataSource(IDatasetSource[Any]):
	"""Dataset source backed by ``load_dataset(..., streaming=True)``.

	Args:
		dataset_name: Name passed to ``load_dataset``.
		split: Split passed to ``load_dataset``.
	"""

	def __init__(
		self,
		dataset_name: str = "Vincent-luo/hagrid-mediapipe-hands",
		split: str = "train",
	) -> None:
		self.dataset: Iterable[Any] = load_dataset(
			dataset_name,
			split=split,
			streaming=True,
		)

	def __iter__(self) -> Iterator[Any]:
		return iter(self.dataset)

	def __len__(self) -> Optional[int]:
		# No length is reported for this source. Note that the built-in
		# ``len()`` rejects a ``None`` result with TypeError, so callers must
		# invoke ``__len__`` directly if they want the optional value.
		return None

class Collector(IDataCollector):
	"""Collector that keeps every received item in a list, in arrival order."""

	def __init__(self) -> None:
		self._items: list = []

	def collect(self, item: Any) -> None:
		"""Append ``item`` to the internal list."""
		self._items.append(item)

	def get_collection(self) -> list:
		"""Return the list of items collected so far."""
		return self._items

# Entry point of this cell: builds an IterBatchProcessor from the classes above.
async def main():
	# NOTE(review): ``BatchProcessor()`` is called without arguments, but the
	# ``BatchProcessor.__init__`` defined in this notebook requires in_queue,
	# out_queue, n_workers and worker_generator, so this raises TypeError.
	# NOTE(review): ``processor`` is only constructed; ``processor.process()``
	# is never awaited, so no data would be processed even once the call above
	# is fixed.
	processor = IterBatchProcessor(BatchProcessor(), HugDataSource(), Collector(), 100, 200)

# NOTE(review): inside a notebook kernel an event loop is usually already
# running, in which case ``asyncio.run`` raises RuntimeError - confirm, and
# consider ``await main()`` in the cell instead.
asyncio.run(main())

In [None]:
from queue import Empty
from typing import Iterable
from datasets import load_dataset
import numpy as np
from multiprocessing import Process, Event
from multiprocessing.synchronize import Event as EventType
import pandas as pd


class NoHands:
	"""Marker result: a worker processed an image but no hands were found.

	``dataset_saver`` skips instances of this class instead of storing them.
	"""

def dataset_saver(
	in_queue: GenMPQueue[np.ndarray | NoHands],
	stop_event: EventType,
	output_path: str,
	flush_every: int = 1000,
) -> None:
	"""Collect landmark arrays from ``in_queue`` and write them to one parquet file.

	The loop ends once ``stop_event`` is set and ``in_queue`` stayed empty for
	one poll interval. ``NoHands`` items are skipped. Every array is flattened
	to one row per entry of its first axis; columns are named
	``lm<i><axis>`` from the second axis of the first array received.

	Args:
		in_queue: Queue delivering landmark arrays or ``NoHands`` markers.
		stop_event: Set by the caller when no more items will be queued.
		output_path: Destination of the parquet file.
		flush_every: Accepted for backward compatibility but currently unused:
			all rows are held in memory and written once at the end.
	"""
	logger.info("Saver started")

	# Raw 2-D arrays are buffered and turned into a single DataFrame at the
	# end, which avoids building one tiny DataFrame per sample.
	rows: list[np.ndarray] = []
	column_names: list[str] | None = None

	while True:
		try:
			item = in_queue.get(timeout=0.5)
		except Empty:
			# Only stop on an empty queue so queued results are not lost.
			if stop_event.is_set():
				break
			continue

		if isinstance(item, NoHands):
			continue

		if column_names is None:
			column_names = [
				f"lm{i}{axis}" for i in range(item.shape[1]) for axis in ("x", "y", "z")
			]

		rows.append(item.reshape(item.shape[0], -1))

	if rows:
		df = pd.DataFrame(np.concatenate(rows, axis=0), columns=column_names)
		df.to_parquet(output_path, index=False)
	else:
		# Make the "nothing written" case visible; a stale file at
		# ``output_path`` could otherwise be mistaken for fresh output.
		logger.warning("Saver received no landmark arrays; %s was not written", output_path)

	logger.info("Saver finished cleanly")



def worker_generator(
	in_queue: GenMPQueue[np.ndarray], out_queue: GenMPQueue[np.ndarray | NoHands]
) -> IWorker[np.ndarray, np.ndarray | NoHands]:
	"""Build a worker that runs ``SafeHands`` on frames and returns the landmarks.

	The worker class is defined inside this factory and instantiated with the
	two queues it is given.
	"""

	class MPWorker(IWorker[np.ndarray, np.ndarray | NoHands]):
		"""Worker holding one single-hand, static-image ``SafeHands`` instance."""

		def __init__(
			self, in_queue: GenMPQueue[np.ndarray], out_queue: GenMPQueue[np.ndarray | NoHands]
		) -> None:
			super().__init__(in_queue, out_queue)
			self.safe_hands = SafeHands(static_image_mode=True, max_num_hands=1)

		def work(self, item: np.ndarray) -> np.ndarray | NoHands:
			"""Return the landmark array for ``item``, or ``NoHands`` if nothing was detected."""
			detection = self.safe_hands.process(item)
			if detection is not None:
				# First element of the (landmarks, world_landmarks, handedness) tuple.
				return detection[0]
			return NoHands()

	return MPWorker(in_queue, out_queue)


# Pipeline driver: streams images, feeds them to the worker pool and lets a
# saver process write the resulting landmarks to a parquet file.
if __name__ == "__main__":
	DATASET_NAME = "Vincent-luo/hagrid-mediapipe-hands"
	SPLIT = "train"
	MAX_SAMPLES = 100
	OUTPUT_PATH = "a.parquet"

	dataset: Iterable = load_dataset(
		DATASET_NAME,
		split=SPLIT,
		streaming=True,
	)

	in_queue: GenMPQueue[np.ndarray] = GenMPQueue(maxsize=200)
	out_queue: GenMPQueue[np.ndarray | NoHands] = GenMPQueue(maxsize=200)

	processor = BatchProcessor(
		in_queue=in_queue,
		out_queue=out_queue,
		n_workers=30,
		worker_generator=worker_generator,
		worker_timeout=5,
	)

	saver_stop_event: EventType = Event()
	saver = Process(
		target=dataset_saver,
		args=(out_queue, saver_stop_event, OUTPUT_PATH, 10),
	)

	try:
		processor.start()
		saver.start()

		logger.info("Stream started")

		for i, item in enumerate(dataset):
			if i >= MAX_SAMPLES:
				break

			if i % 500 == 0:
				logger.info("Processed %d samples", i)

			img = item["image"]
			frame = np.array(img.convert("RGB"))
			in_queue.put(frame)
	finally:
		# Always shut the child processes down, even when streaming fails;
		# otherwise the workers and the saver keep running in the background.
		# Workers are stopped first so their results are queued before the
		# saver is told that no more items will arrive.
		processor.stop()
		saver_stop_event.set()
		# ``join`` is only valid for a started process; ``is_alive`` is False
		# for one that never started or has already exited.
		if saver.is_alive():
			saver.join()

	logger.info("Pipeline finished successfully ✅")

2025-12-10 04:36:11,191 [MainProcess] INFO - HTTP Request: HEAD https://huggingface.co/datasets/Vincent-luo/hagrid-mediapipe-hands/resolve/main/README.md "HTTP/1.1 307 Temporary Redirect"
2025-12-10 04:36:11,200 [MainProcess] INFO - HTTP Request: HEAD https://huggingface.co/api/resolve-cache/datasets/Vincent-luo/hagrid-mediapipe-hands/d4a37fd0729013021dcd1d5bef6a172c0f18b914/README.md "HTTP/1.1 200 OK"
2025-12-10 04:36:11,382 [MainProcess] INFO - HTTP Request: HEAD https://huggingface.co/datasets/Vincent-luo/hagrid-mediapipe-hands/resolve/d4a37fd0729013021dcd1d5bef6a172c0f18b914/hagrid-mediapipe-hands.py "HTTP/1.1 404 Not Found"
2025-12-10 04:36:11,697 [MainProcess] INFO - HTTP Request: HEAD https://s3.amazonaws.com/datasets.huggingface.co/datasets/datasets/Vincent-luo/hagrid-mediapipe-hands/Vincent-luo/hagrid-mediapipe-hands.py "HTTP/1.1 404 Not Found"
2025-12-10 04:36:11,830 [MainProcess] INFO - HTTP Request: HEAD https://huggingface.co/datasets/Vincent-luo/hagrid-mediapipe-hands/res

In [11]:
# Read back the parquet file written by the pipeline cell and print it.
landmarks_df = pd.read_parquet("a.parquet")
print(landmarks_df)

        lm0x      lm0y          lm0z      lm1x      lm1y      lm1z      lm2x  \
0   0.225915  0.850638 -8.855681e-07  0.218813  0.687800 -0.007930  0.253913   
1   0.515303  0.477451 -3.362758e-07  0.541747  0.516120 -0.013345  0.584433   
2   0.470155  0.576103 -1.621919e-06  0.599694  0.523961 -0.067335  0.709539   
3   0.474171  0.830082  5.295099e-07  0.419266  0.795439 -0.030467  0.402109   
4   0.735487  0.542231 -9.114277e-08  0.723344  0.429138 -0.022489  0.678152   
..       ...       ...           ...       ...       ...       ...       ...   
91  0.455137  0.722885  6.719982e-07  0.415284  0.646005  0.006014  0.422603   
92  0.797456  0.714044 -4.436324e-07  0.733435  0.626247  0.005714  0.644629   
93  0.501803  0.725136 -9.860157e-07  0.603402  0.633062 -0.028235  0.647440   
94  0.616090  0.725264 -1.170247e-07  0.530562  0.698483 -0.009025  0.495261   
95  0.447762  0.659644 -9.137238e-08  0.402772  0.593632 -0.042988  0.393378   

        lm2y      lm2z      lm3x  ...  