Speedy Utils

Speedy Utils is a Python utility library designed to streamline common programming tasks such as caching, parallel processing, file I/O, and data manipulation. It provides a collection of decorators, functions, and classes to enhance productivity and performance in your Python projects.

🚀 Recent Updates (January 27, 2026)

Enhanced Error Handling in Parallel Processing:

  • Rich-formatted error tracebacks with code context and syntax highlighting
  • Three error handling modes: 'raise', 'ignore', and 'log'
  • Filtered tracebacks focusing on user code (hiding infrastructure)
  • Real-time progress reporting with error/success statistics
  • Automatic error logging to timestamped files
  • Caller frame information showing where parallel functions were invoked

Quick Start

Parallel Processing with Error Handling

from speedy_utils import multi_thread, multi_process

# Simple parallel processing
results = multi_thread(lambda x: x * 2, [1, 2, 3, 4, 5])
# Results: [2, 4, 6, 8, 10]

# Robust processing with error handling
def process_item(item):
    if item == 3:
        raise ValueError(f"Cannot process item {item}")
    return item * 2

# Continue processing despite errors
results = multi_thread(process_item, [1, 2, 3, 4, 5], error_handler='log')
# Results: [2, 4, None, 8, 10] - errors logged automatically

Features

  • Caching Mechanisms: Disk-based and in-memory caching to optimize function calls.
  • Parallel Processing: Multi-threading, multi-processing, and asynchronous multi-threading utilities with enhanced error handling.
  • File I/O: Simplified JSON, JSONL, and pickle file handling with support for various file extensions.
  • Data Manipulation: Utilities for flattening lists and dictionaries, converting data types, and more.
  • Timing Utilities: Tools to measure and log execution time of functions and processes.
  • Pretty Printing: Enhanced printing functions for structured data, including HTML tables for Jupyter notebooks.
  • Enhanced Error Handling: Rich error tracebacks with code context, configurable error handling modes ('raise', 'ignore', 'log'), and detailed progress reporting.

Installation

You can install Speedy Utils via PyPI:

pip install speedy-utils
# or
uv pip install speedy-utils

Alternatively, install directly from the repository:

pip install git+https://github.com/anhvth/speedy
# or
uv pip install git+https://github.com/anhvth/speedy

For local development:

git clone https://github.com/anhvth/speedy
cd speedy
uv sync

Extras

Optional dependencies can be installed via extras. For Ray backend support (requires Python >= 3.9):

pip install 'speedy-utils[ray]'
# or
uv pip install 'speedy-utils[ray]'

# developing this repo
uv sync --extra ray

Updating from previous versions

To update from previous versions or switch to v1.x, first uninstall any old packages, then install the latest version:

pip uninstall speedy_llm_utils speedy_utils
pip install -e ./  # for local development
# or
pip install speedy-utils -U  # for PyPI upgrade

Usage

Below are examples demonstrating how to utilize various features of Speedy Utils.

Caching

Memoize Decorator

Cache the results of function calls to disk to avoid redundant computations.

from speedy_utils import memoize

@memoize
def expensive_function(x):
    # Simulate an expensive computation
    import time
    time.sleep(2)
    return x * x

result = expensive_function(4)  # Takes ~2 seconds
result = expensive_function(4)  # Retrieved from cache instantly
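
To make the idea of disk-based memoization concrete, here is a minimal standard-library sketch. It is not how speedy_utils implements memoize; the decorator name disk_memoize, the SHA-256 key scheme, and the temporary cache directory are all assumptions made for illustration.

```python
import functools
import hashlib
import pickle
import tempfile
from pathlib import Path


def disk_memoize(cache_dir):
    """Cache a function's results as pickle files under cache_dir (illustrative only)."""
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a key from the function name and its arguments.
            raw = pickle.dumps((func.__qualname__, args, sorted(kwargs.items())))
            path = cache_dir / (hashlib.sha256(raw).hexdigest() + ".pkl")
            if path.exists():
                return pickle.loads(path.read_bytes())  # cache hit
            result = func(*args, **kwargs)
            path.write_bytes(pickle.dumps(result))  # cache miss: store the result
            return result

        return wrapper

    return decorator


calls = []  # records every real execution of square()


@disk_memoize(tempfile.mkdtemp())
def square(x):
    calls.append(x)
    return x * x


first = square(4)   # computed, then written to disk
second = square(4)  # read back from disk; the function body does not run again
```

Because the cache lives on disk, results survive across interpreter runs as long as the cache directory is kept, which is the main difference from in-memory caching.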

In-Memory Memoization

Cache function results in memory for faster access within the same runtime.

from speedy_utils import imemoize

@imemoize
def compute_sum(a, b):
    return a + b

result = compute_sum(5, 7)  # Computed and cached
result = compute_sum(5, 7)  # Retrieved from in-memory cache
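
For comparison, the standard library offers the same in-memory pattern through functools.lru_cache. The sketch below only illustrates the concept and makes no claim about how imemoize is built; the calls list is a hypothetical counter added to show that the second call does not execute the function.

```python
import functools

calls = []  # records every real execution of compute_sum()


@functools.lru_cache(maxsize=None)
def compute_sum(a, b):
    calls.append((a, b))
    return a + b


compute_sum(5, 7)  # computed and cached
compute_sum(5, 7)  # served from the in-memory cache
info = compute_sum.cache_info()
```

The cache is lost when the process exits, so this approach fits values that are cheap to recompute on the next run.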

Parallel Processing

Multi-threading with Enhanced Error Handling

Execute functions concurrently using multiple threads with comprehensive error handling. The enhanced error handling provides three modes: 'raise' (default), 'ignore', and 'log'. When errors occur, you'll see rich-formatted tracebacks with code context and caller information.

from speedy_utils import multi_thread

def process_item(item):
    # Simulate processing that might fail
    if item == 3:
        raise ValueError(f"Invalid item: {item}")
    return item * 2

items = [1, 2, 3, 4, 5]

# Default behavior: raise on first error with rich traceback
try:
    results = multi_thread(process_item, items, workers=3)
except SystemExit:
    print("Error occurred and was displayed with rich formatting")

# Continue processing on errors, return None for failed items
results = multi_thread(process_item, items, workers=3, error_handler='ignore')
print(results)  # [2, 4, None, 8, 10]

# Log errors to files and continue processing
results = multi_thread(process_item, items, workers=3, error_handler='log', max_error_files=10)
print(results)  # [2, 4, None, 8, 10] - errors logged to .cache/speedy_utils/error_logs/

Multi-processing with Error Handling

Process items across multiple processes with the same enhanced error handling capabilities.

from speedy_utils import multi_process

def risky_computation(x):
    """Computation that might fail for certain inputs."""
    if x % 5 == 0:
        raise RuntimeError(f"Cannot process multiples of 5: {x}")
    return x ** 2

data = list(range(12))

# Process with error logging (continues on errors)
results = multi_process(
    risky_computation, 
    data, 
    backend='mp',
    error_handler='log',
    max_error_files=5
)
print(results)  # [0, 1, 4, 9, 16, None, 36, 49, 64, 81, None, 121]

Multi-Process with Inner Thread Pools

For maximum parallelism, multi_process supports nested parallelism, where each process runs its own thread pool. This suits workloads that mix CPU-bound computation with I/O waits.

import time

from speedy_utils import multi_process

def process_data(item):
    """Process a data item with potential I/O operations."""
    # Simulate CPU work
    result = item ** 2
    # Simulate I/O (each thread can do I/O in parallel)
    time.sleep(0.01)
    return result

data = list(range(100))

# 4 processes, each with 4 threads = 16 concurrent workers
results = multi_process(
    process_data,
    data,
    num_procs=4,      # Number of processes
    num_threads=4,    # Threads per process
    backend='mp',
    progress=True,
)

Backend Options:

| Backend | Description | Use Case |
|---------|-------------|----------|
| 'mp' | Multi-processing with optional inner threads | CPU-bound work, bypasses GIL |
| 'safe' | In-process thread pool (for testing) | Debugging, unit tests |
| 'seq' | Sequential execution | Debugging, reproducibility |
| 'ray' | Ray distributed backend | Distributed computing |

When to use num_procs vs num_threads:

  • CPU-bound tasks: Use num_procs > 1, num_threads = 1 (processes bypass GIL)
  • I/O-bound tasks: Use num_procs = 1, num_threads > 1 (threads are lighter)
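  • Mixed workloads: Use both (e.g., num_procs=4, num_threads=4)

# Example: Web scraping with multi-process + multi-thread
import requests
from speedy_utils import multi_process

# parse_content and urls are placeholders for your own parser and URL list
def fetch_and_parse(url):
    response = requests.get(url)  # I/O bound
    return parse_content(response.text)  # CPU bound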
# 4 processes for parsing, 8 threads per process for fetching
results = multi_process(
    fetch_and_parse,
    urls,
    num_procs=4,
    num_threads=8,
    backend='mp',
    error_handler='log',  # Continue on failed URLs
)

mpython (CLI Tool)

mpython is a CLI tool for running Python scripts in multiple tmux windows with automatic GPU/CPU allocation for parallel processing.

Basic Usage:

# Run script.py with 16 parallel processes across GPUs 0-7
mpython script.py

# Run with 8 processes
mpython -t 8 script.py

# Run on specific GPUs only
mpython --gpus 0,1,2 script.py

Multi-Process Script Setup:

Your script must use MP_ID and MP_TOTAL environment variables for sharding:

import os

MP_ID = int(os.getenv("MP_ID", "0"))
MP_TOTAL = int(os.getenv("MP_TOTAL", "1"))

# Shard your data - each process gets its slice
inputs = list(range(1000))
my_inputs = inputs[MP_ID::MP_TOTAL]

for item in my_inputs:
    process(item)
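
The strided slice used above gives every worker a disjoint share of the input, and together the shares cover every item exactly once. A small sketch that checks this property (the helper name shard is hypothetical and exists only for this illustration):

```python
def shard(items, mp_id, mp_total):
    """Return the strided slice that worker mp_id would process."""
    return items[mp_id::mp_total]


inputs = list(range(10))
mp_total = 3
shards = [shard(inputs, i, mp_total) for i in range(mp_total)]
# shards == [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

# Merging all shards gives back the original items with no duplicates or gaps.
merged = sorted(x for s in shards for x in s)
```

Strided slicing also balances the load when item cost correlates with position, since neighboring items land on different workers.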

Managing Sessions:

  • Sessions are named incrementally: mpython, mpython-1, mpython-2, etc.
  • Kill all sessions: kill-mpython
  • Attach to session: tmux attach -t mpython

Enhanced Error Handling

Speedy Utils now provides comprehensive error handling for parallel processing with rich formatting and detailed diagnostics.

Rich Error Tracebacks

When an error occurs, the formatted traceback includes:

  • Code context: Lines of code around the error location
  • Caller information: Shows where the parallel function was invoked
  • Filtered frames: Focuses on user code, hiding infrastructure details
  • Color coding: Easy-to-read formatting with syntax highlighting

Error Handling Modes

Choose how to handle errors in parallel processing:

  • 'raise' (default): Stop on first error with detailed traceback
  • 'ignore': Continue processing, return None for failed items
  • 'log': Log errors to files and continue processing
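
The three modes can be pictured with a small thread-pool sketch built on the standard library. This is an assumption-laden stand-in, not the speedy_utils implementation: the function run_all is hypothetical, the 'log' branch writes to the logging module rather than to files, and the 'raise' branch re-raises the original exception instead of printing a rich traceback.

```python
import logging
from concurrent.futures import ThreadPoolExecutor


def run_all(func, items, error_handler="raise", workers=4):
    """Map func over items in threads (illustrative stand-in only)."""
    if error_handler not in ("raise", "ignore", "log"):
        raise ValueError(f"unknown error_handler: {error_handler!r}")
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(func, item) for item in items]
        for item, future in zip(items, futures):
            try:
                results.append(future.result())
            except Exception as exc:
                if error_handler == "raise":
                    raise  # propagate the first failure in input order
                if error_handler == "log":
                    logging.getLogger("sketch").warning("item %r failed: %s", item, exc)
                results.append(None)  # keep input order; the failed slot is None
    return results


def process_item(item):
    if item == 3:
        raise ValueError(f"Invalid item: {item}")
    return item * 2


ignored = run_all(process_item, [1, 2, 3, 4, 5], error_handler="ignore")
logged = run_all(process_item, [1, 2, 3, 4, 5], error_handler="log")
```

Keeping a None placeholder for each failed item preserves the alignment between inputs and results, so failures can be retried by index afterwards.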

Error Logging

When using error_handler='log', errors are automatically saved to timestamped files in .cache/speedy_utils/error_logs/ with full context and stack traces.
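
As a rough picture of what a timestamped error log can look like, the sketch below writes one traceback per file using only the standard library. The helper log_error, the file name pattern, and the temporary directory are assumptions for illustration; the actual file layout used by speedy_utils may differ.

```python
import tempfile
import traceback
from datetime import datetime
from pathlib import Path


def log_error(exc, log_dir):
    """Write the traceback of exc to a timestamped file and return its path."""
    log_dir = Path(log_dir)
    log_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = log_dir / f"error_{stamp}.log"
    text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    path.write_text(text, encoding="utf-8")
    return path


log_dir = tempfile.mkdtemp()  # stand-in for a real log directory
try:
    1 / 0
except ZeroDivisionError as exc:
    log_path = log_error(exc, log_dir)
```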

Progress Reporting with Error Statistics

Progress bars now show real-time error and success counts:

Multi-thread [8/10] [00:02<00:00, 3.45it/s, success=8, errors=2, pending=0]

This makes it easy to monitor processing health at a glance.

Example: Robust Data Processing

from speedy_utils import multi_thread

def process_data_record(record):
    """Process a data record that might have issues."""
    try:
        # Your processing logic here
        value = record['value'] / record['divisor']
        return {'result': value, 'status': 'success'}
    except KeyError as e:
        raise ValueError(f"Missing required field in record: {e}")
    except ZeroDivisionError:
        raise ValueError("Division by zero in record")

# Sample data with some problematic records
data = [
    {'value': 10, 'divisor': 2},     # OK
    {'value': 15, 'divisor': 0},     # Will error
    {'value': 20, 'divisor': 4},     # OK
    {'value': 25},                   # Missing divisor - will error
]

# Process with error logging - continues despite errors
results = multi_thread(
    process_data_record, 
    data, 
    workers=4,
    error_handler='log',
    max_error_files=10
)

print("Results:", results)
# Output: Results: [{'result': 5.0, 'status': 'success'}, None, {'result': 5.0, 'status': 'success'}, None]
# Errors are logged to files for later analysis

File I/O

Dumping Data

Save data in JSON, JSONL, or pickle formats.

from speedy_utils import dump_json_or_pickle, dump_jsonl

data = {"name": "Alice", "age": 30}

# Save as JSON
dump_json_or_pickle(data, "data.json")

# Save as JSONL
dump_jsonl([data, {"name": "Bob", "age": 25}], "data.jsonl")

# Save as Pickle
dump_json_or_pickle(data, "data.pkl")

Loading Data

Load data based on file extensions.

from speedy_utils import load_json_or_pickle, load_by_ext

# Load JSON
data = load_json_or_pickle("data.json")

# Load JSONL
data_list = load_json_or_pickle("data.jsonl")

# Load Pickle
data = load_json_or_pickle("data.pkl")

# Load based on extension with parallel processing
loaded_data = load_by_ext(["data.json", "data.pkl"])
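
Extension-based loading boils down to choosing a parser from the file suffix. The following standard-library sketch shows that dispatch; load_any is a hypothetical helper, and the real load_by_ext may support more formats and options than shown here.

```python
import json
import pickle
import tempfile
from pathlib import Path


def load_any(path):
    """Pick a loader from the file suffix (hypothetical helper for illustration)."""
    path = Path(path)
    suffix = path.suffix.lower()
    if suffix == ".json":
        return json.loads(path.read_text(encoding="utf-8"))
    if suffix == ".jsonl":
        # One JSON document per non-empty line.
        lines = path.read_text(encoding="utf-8").splitlines()
        return [json.loads(line) for line in lines if line.strip()]
    if suffix in (".pkl", ".pickle"):
        return pickle.loads(path.read_bytes())
    raise ValueError(f"unsupported extension: {suffix}")


# Write one sample file per format, then load each one back.
tmp = Path(tempfile.mkdtemp())
record = {"name": "Alice", "age": 30}
(tmp / "data.json").write_text(json.dumps(record), encoding="utf-8")
(tmp / "data.jsonl").write_text(
    json.dumps(record) + "\n" + json.dumps(record) + "\n", encoding="utf-8"
)
(tmp / "data.pkl").write_bytes(pickle.dumps(record))

loaded = [load_any(tmp / name) for name in ("data.json", "data.jsonl", "data.pkl")]
```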

Data Manipulation

Flattening Lists and Dictionaries

from speedy_utils import flatten_list, flatten_dict

nested_list = [[1, 2], [3, 4], [5]]
flat_list = flatten_list(nested_list)
print(flat_list)  # [1, 2, 3, 4, 5]

nested_dict = {"a": {"b": 1, "c": 2}, "d": 3}
flat_dict = flatten_dict(nested_dict)
print(flat_dict)  # {'a.b': 1, 'a.c': 2, 'd': 3}
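
The flattening behavior shown above can be reproduced in a few lines of plain Python. The functions below are illustrative sketches with hypothetical names, assuming a "." separator for nested keys and a single level of list nesting; they are not the library's own code.

```python
def flatten_dict_sketch(d, parent_key="", sep="."):
    """Recursively join nested keys with sep."""
    flat = {}
    for key, value in d.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            flat.update(flatten_dict_sketch(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


def flatten_list_sketch(nested):
    """Flatten one level of nesting."""
    return [item for sub in nested for item in sub]


flat_dict = flatten_dict_sketch({"a": {"b": 1, "c": 2}, "d": 3})
flat_list = flatten_list_sketch([[1, 2], [3, 4], [5]])
```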

Converting to Built-in Python Types

from speedy_utils import convert_to_builtin_python
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

user = User(name="Charlie", age=28)
builtin_user = convert_to_builtin_python(user)
print(builtin_user)  # {'name': 'Charlie', 'age': 28}

Utility Functions

Pretty Printing

from speedy_utils import fprint, print_table

data = {"name": "Dana", "age": 22, "city": "New York"}

# Pretty print as table
fprint(data)

# Print as table using tabulate
print_table(data)

Timing Utilities

from speedy_utils import timef, Clock

@timef
def slow_function():
    import time
    time.sleep(3)
    return "Done"

result = slow_function()  # Prints execution time

# Using Clock
clock = Clock()
# ... your code ...
clock.log()
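
A timing decorator of this kind is typically a thin wrapper around time.perf_counter. The sketch below shows the pattern with a hypothetical decorator named timed; the last_elapsed attribute is an addition for this example and is not part of timef.

```python
import functools
import time


def timed(func):
    """Print how long each call to func takes (illustrative sketch)."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs whether func returned or raised.
            wrapper.last_elapsed = time.perf_counter() - start
            print(f"{func.__name__} took {wrapper.last_elapsed:.3f}s")

    wrapper.last_elapsed = None
    return wrapper


@timed
def slow_function():
    time.sleep(0.05)
    return "Done"


result = slow_function()
```

Measuring in a finally block means the elapsed time is still reported when the wrapped function raises.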

LLM

The LLM class provides a unified interface for language model interactions with structured input/output handling. It supports text completion, structured outputs, caching, streaming, and vLLM integration.

Basic Text Completion

from llm_utils import LLM

llm = LLM(
    instruction="You are a helpful assistant.",
    model="gpt-4o-mini"
)

# Simple text completion
results = llm("What is Python?")
print(results[0]["parsed"])  # The text response
print(results[0]["messages"])  # Full conversation history

Structured Output with Pydantic

from pydantic import BaseModel
from llm_utils import LLM

class Sentiment(BaseModel):
    sentiment: str
    confidence: float

llm = LLM(
    instruction="Analyze the sentiment of the input.",
    output_model=Sentiment,
    model="gpt-4o-mini"
)

results = llm("I love this product!")
parsed: Sentiment = results[0]["parsed"]
print(f"Sentiment: {parsed.sentiment}, Confidence: {parsed.confidence}")

Streaming Responses

from llm_utils import LLM

llm = LLM(model="gpt-4o-mini")

# Stream text responses
stream = llm("Tell me a story", stream=True)
for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

Client Configuration

The LLM class accepts various client configurations:

from llm_utils import LLM
from openai import OpenAI

# Using a custom OpenAI client
custom_client = OpenAI(base_url="http://localhost:8000/v1", api_key="your-key")
llm = LLM(client=custom_client, model="llama-2-7b")

# Using a port number (for VLLM servers)
llm = LLM(client=8000, model="llama-2-7b")

# Using a base URL string
llm = LLM(client="http://localhost:8000/v1", model="llama-2-7b")

vLLM Integration

Start and manage vLLM servers automatically:

from llm_utils import LLM

llm = LLM(
    vllm_cmd="vllm serve meta-llama/Llama-2-7b-chat-hf --port 8000",
    model="meta-llama/Llama-2-7b-chat-hf"
)

# The server starts automatically and is cleaned up on exit
results = llm("Hello!")

# Cleanup is automatic when using context manager
with LLM(vllm_cmd="vllm serve ...") as llm:
    results = llm("Hello!")

Caching

Enable response caching to avoid redundant API calls:

from llm_utils import LLM

# Enable caching (default: True)
llm = LLM(model="gpt-4o-mini", cache=True)

# First call hits the API
result1 = llm("What is 2+2?")

# Second call returns cached result
result2 = llm("What is 2+2?")  # Instant response from cache

# Disable caching for a specific call
result3 = llm("What is 2+2?", cache=False)
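
Response caching of this kind usually keys each request on a hash of its inputs. The sketch below illustrates that idea with a fake backend so it runs offline; CachedClient, fake_backend, and the key scheme are hypothetical and say nothing about how the LLM class stores its cache.

```python
import hashlib
import json


class CachedClient:
    """Wrap any callable backend with an in-memory response cache (hypothetical)."""

    def __init__(self, backend, model):
        self.backend = backend
        self.model = model
        self._cache = {}

    def __call__(self, prompt, cache=True):
        # Identical (model, prompt) pairs map to the same key.
        payload = json.dumps({"model": self.model, "prompt": prompt}, sort_keys=True)
        key = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        if cache and key in self._cache:
            return self._cache[key]
        response = self.backend(prompt)
        if cache:
            self._cache[key] = response
        return response


backend_calls = []  # records every request that reaches the backend


def fake_backend(prompt):
    backend_calls.append(prompt)
    return f"echo: {prompt}"


client = CachedClient(fake_backend, model="demo-model")
r1 = client("What is 2+2?")               # reaches the backend
r2 = client("What is 2+2?")               # served from the cache
r3 = client("What is 2+2?", cache=False)  # bypasses the cache
```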

Reasoning Models

Handle reasoning models that provide thinking traces:

from llm_utils import LLM

# For models like DeepSeek-R1 that output reasoning
llm = LLM(model="deepseek-reasoner")

results = llm("Solve this math problem: 15 * 23")

# Access the final answer
answer = results[0]["parsed"]

# Access reasoning content when the response includes it
reasoning = results[0].get("reasoning_content")

Conversation History

Inspect previous conversations:

from llm_utils import LLM

llm = LLM(model="gpt-4o-mini")

# Make some calls
llm("Hello")
llm("How are you?")

# Inspect the last conversation
history = llm.inspect_history(idx=-1)
print(history)

# Get last 3 messages from the conversation
history = llm.inspect_history(idx=-1, k_last_messages=3)

Testing

The test suite uses pytest:

uv sync
uv run pytest
