Skip to content

eddiethedean/bulbasaur

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Bulbasaur

Bidirectional Unified Library Bridge And Schema Adaptation Utility Runtime

Python Version License PyPI Version Code Style

Convert between PySpark schemas and SQLAlchemy/SQLModel classes with ease.

Bulbasaur provides simple, bidirectional conversion functions to transform schemas between PySpark and SQLAlchemy, as well as SQLModel (optional dependency), supporting all common types. Perfect for data engineering workflows that need to bridge distributed data processing with ORM capabilities.

Table of Contents

Features

  • Bidirectional Conversion: Convert schemas in both directions seamlessly
  • Comprehensive Type Support: Supports all common primitive types with precision preservation
  • SQLModel Support: Optional SQLModel integration for modern Python type hints
  • Type Safety: Clear error messages for unsupported types and invalid schemas
  • Simple API: Functional, stateless functions with minimal dependencies
  • Schema Validation: Automatic validation of schemas before conversion
  • Explicit Primary Keys: Specify primary key fields explicitly for clarity and control

Installation

Basic Installation

pip install bulbasaur

With SQLModel Support

For SQLModel integration (optional):

pip install bulbasaur[sqlmodel]

Development Installation

git clone https://github.com/eddiethedean/bulbasaur.git
cd bulbasaur
pip install -e ".[dev]"

Requirements

  • Python >= 3.8
  • pyspark >= 3.0.0
  • sqlalchemy >= 1.4.0
  • sqlmodel >= 0.0.8 (optional, for SQLModel support)

Quick Start

Get started with Bulbasaur in just a few lines of code. Here are the most common conversion patterns:

Converting PySpark Schema to SQLAlchemy Model

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from bulbasaur import to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# Define a PySpark schema
pyspark_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("score", DoubleType(), True),
])

# Convert to SQLAlchemy model
Person = to_sqlalchemy_model(pyspark_schema, primary_key="name", class_name="Person", base=Base)
print(Person.__tablename__)  # person
print(Person.name)  # Person.name

Converting SQLAlchemy Model to PySpark Schema

from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase
from bulbasaur import to_pyspark_schema

class Base(DeclarativeBase):
    pass

class Person(Base):
    __tablename__ = "person"
    
    name = Column(String, primary_key=True)
    age = Column(Integer)
    score = Column(Float)

# Convert to PySpark schema
pyspark_schema = to_pyspark_schema(Person)
print(pyspark_schema)
# StructType([StructField('name', StringType(), False), StructField('age', IntegerType(), True), StructField('score', DoubleType(), True)])

Converting PySpark Schema to SQLModel Class

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from bulbasaur import to_sqlmodel_class

# Define a PySpark schema
pyspark_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("score", DoubleType(), True),
])

# Convert to SQLModel class
Person = to_sqlmodel_class(pyspark_schema, primary_key="id", class_name="Person")
print(Person.__name__)  # Person
print(Person.__annotations__)
# {'id': <class 'int'>, 'name': <class 'str'>, 'age': <class 'int'>, 'score': typing.Optional[float]}

Converting SQLModel Class to PySpark Schema

from sqlmodel import SQLModel
from bulbasaur import to_pyspark_schema

class Person(SQLModel):
    name: str
    age: int
    score: float | None = None

# Convert to PySpark schema
pyspark_schema = to_pyspark_schema(Person)
print(pyspark_schema)
# StructType([StructField('name', StringType(), False),
#             StructField('age', IntegerType(), False),
#             StructField('score', StringType(), True)])

Use Cases

Bulbasaur is perfect for:

  • Data Pipeline Integration: Convert PySpark schemas to SQLAlchemy models for database operations
  • Schema Synchronization: Keep schemas consistent between Spark jobs and database models
  • API Development: Generate SQLAlchemy models from PySpark DataFrames for REST APIs
  • Data Validation: Use SQLModel classes for validation while working with PySpark DataFrames
  • Migration Tools: Convert existing PySpark schemas to ORM models for legacy system migrations

Supported Types

Bulbasaur supports comprehensive type mappings between PySpark and SQLAlchemy/SQLModel. Precision and scale are preserved for decimal types, and nullability is maintained across conversions.

PySpark → SQLAlchemy

PySpark Type SQLAlchemy Type Notes
ByteType SmallInteger 8-bit integer
ShortType SmallInteger 16-bit integer
IntegerType Integer 32-bit integer
LongType BigInteger 64-bit integer
FloatType Float 32-bit floating point
DoubleType Float 64-bit floating point
BooleanType Boolean Boolean value
StringType String Variable-length string
DateType Date Date only
TimestampType DateTime Date and time with timezone
TimestampNTZType DateTime Date and time without timezone
DecimalType(p,s) Numeric(p,s) Precision and scale preserved
BinaryType LargeBinary Binary data
NullType String Fallback to String

SQLAlchemy → PySpark

SQLAlchemy Type PySpark Type Notes
SmallInteger ShortType 16-bit integer
Integer IntegerType 32-bit integer
BigInteger LongType 64-bit integer
Float DoubleType 64-bit floating point
Boolean BooleanType Boolean value
String StringType Variable-length string
Text StringType Long text as string
Date DateType Date only
DateTime TimestampType Date and time
Time TimestampType Time as timestamp
Numeric(p,s) DecimalType(p,s) Precision and scale preserved
LargeBinary BinaryType Binary data

Limitations

Unsupported Types

The following PySpark types are not directly supported in SQLAlchemy and will raise UnsupportedTypeError:

Type Reason Workaround
ArrayType SQLAlchemy doesn't have native array support Use JSON or String type
MapType SQLAlchemy doesn't have native map support Use JSON or String type
Nested StructType SQLAlchemy doesn't support nested structures Use JSON or String type

Type Conversions

Precision Preservation:

  • DecimalType(p, s)Numeric(p, s): Precision and scale are fully preserved
  • FloatDoubleType: Both represent 64-bit floating point numbers

Nullability Handling:

  • PySpark → SQLAlchemy: Nullability is preserved from StructField.nullable
  • SQLAlchemy → PySpark: Nullability is preserved from Column.nullable
  • SQLModel: Optional types (| None or Optional[T]) are converted to nullable fields

Input Validation

Bulbasaur performs comprehensive schema validation before conversion:

Validation Rule Error Type Description
Duplicate field names SchemaError Each field must have a unique name
Empty field names SchemaError Field names must be non-empty strings
Invalid field types (None) SchemaError All fields must have a valid data type
Invalid field name types SchemaError Field names must be strings
Empty schema SchemaError Schema must contain at least one field

Advanced Examples

Custom Base Class

from sqlalchemy.orm import DeclarativeBase
from bulbasaur import to_sqlalchemy_model

class CustomBase(DeclarativeBase):
    pass

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
])

Model = to_sqlalchemy_model(schema, primary_key="id", class_name="MyModel", base=CustomBase)

Round-Trip Conversion

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_pyspark_schema, to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# Start with PySpark schema
original = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Convert to SQLAlchemy and back
model = to_sqlalchemy_model(original, primary_key="name", base=Base)
converted_back = to_pyspark_schema(model)

# Verify types match
assert len(converted_back.fields) == len(original.fields)
assert converted_back.fields[0].name == original.fields[0].name

Composite Primary Keys

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# Define schema with multiple fields for composite key
schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("role_id", IntegerType(), False),
    StructField("permission", StringType(), True),
])

# Convert with composite primary key
UserRole = to_sqlalchemy_model(
    schema, 
    primary_key=["user_id", "role_id"], 
    class_name="UserRole", 
    base=Base
)

# Both fields are marked as primary keys
assert UserRole.__table__.columns["user_id"].primary_key is True
assert UserRole.__table__.columns["role_id"].primary_key is True

Error Handling

Bulbasaur provides clear error messages through custom exceptions:

from bulbasaur import ConversionError, UnsupportedTypeError, SchemaError

try:
    schema = to_sqlalchemy_model(invalid_schema)
except SchemaError as e:
    print(f"Invalid schema: {e}")
except UnsupportedTypeError as e:
    print(f"Unsupported type: {e}")
except ConversionError as e:
    print(f"Conversion error: {e}")

API Reference

to_sqlalchemy_model(pyspark_schema, primary_key, class_name="GeneratedModel", base=None)

Convert a PySpark StructType to a SQLAlchemy model class.

Parameters:

Parameter Type Default Description
pyspark_schema pyspark.sql.types.StructType required PySpark schema to convert
primary_key str or list[str] required Field name(s) to use as primary key. Can be a single string or list for composite keys.
class_name str "GeneratedModel" Name for the generated model class
base Type[DeclarativeBase] DeclarativeBase Base class for the model (optional)

Returns:

  • Type[DeclarativeBase]: SQLAlchemy model class with __tablename__ attribute

Raises:

  • SchemaError: If the schema structure is invalid (duplicate fields, empty names, etc.) or primary key field(s) not found
  • UnsupportedTypeError: If a type cannot be converted (ArrayType, MapType, nested StructType)

Example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_sqlalchemy_model
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

Person = to_sqlalchemy_model(schema, primary_key="name", class_name="Person", base=Base)
# Person is now a SQLAlchemy model class

to_pyspark_schema(model)

Convert a SQLAlchemy model class, instance, or SQLModel class to a PySpark StructType.

Parameters:

Parameter Type Description
model Type or instance SQLAlchemy model class/instance or SQLModel class

Returns:

  • pyspark.sql.types.StructType: PySpark schema with all fields converted

Raises:

  • SchemaError: If the model structure is invalid (no __table__ attribute, etc.)
  • UnsupportedTypeError: If a type cannot be converted

Example:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from bulbasaur import to_pyspark_schema

class Base(DeclarativeBase):
    pass

class Person(Base):
    __tablename__ = "person"
    name = Column(String, primary_key=True)
    age = Column(Integer)

schema = to_pyspark_schema(Person)
# Returns StructType with name and age fields

to_sqlmodel_class(pyspark_schema, primary_key, class_name="GeneratedModel")

Convert a PySpark StructType to a SQLModel class with type annotations.

Parameters:

Parameter Type Default Description
pyspark_schema pyspark.sql.types.StructType required PySpark schema to convert
primary_key str required Field name to use as primary key (must be a single string)
class_name str "GeneratedModel" Name for the generated model class

Returns:

  • Type[SQLModel]: SQLModel class with type annotations and default values

Raises:

  • SchemaError: If the schema structure is invalid or primary key field not found
  • UnsupportedTypeError: If a type cannot be converted
  • ImportError: If SQLModel is not installed

Example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from bulbasaur import to_sqlmodel_class

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
])

Person = to_sqlmodel_class(schema, primary_key="id", class_name="Person")
# Person is now a SQLModel class with type annotations
person = Person(id=1, name="Alice", age=30)

Development

Setup

Clone the repository and install in development mode:

git clone https://github.com/eddiethedean/bulbasaur.git
cd bulbasaur
pip install -e ".[dev]"

Running Tests

Run the full test suite:

pytest

Run tests with coverage:

pytest --cov=bulbasaur --cov-report=html

Code Quality

Format code with Black:

black bulbasaur tests

Lint code with Ruff:

ruff check bulbasaur tests

Project Structure

bulbasaur/
├── bulbasaur/                # Main package
│   ├── __init__.py          # Public API exports
│   ├── converters.py         # Core conversion functions
│   ├── type_mappings.py      # Type mapping dictionaries
│   └── errors.py             # Custom exceptions
├── tests/                    # Test suite
│   ├── test_converters.py    # Conversion function tests
│   ├── test_type_mappings.py # Type mapping tests
│   ├── test_errors.py        # Error handling tests
│   └── test_comprehensive.py # Comprehensive integration tests
├── pyproject.toml            # Package configuration
├── README.md                 # This file
└── LICENSE                   # MIT License

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! We appreciate your help in making Bulbasaur better.

How to Contribute

  1. Fork the repository and create a new branch for your feature or bugfix
  2. Make your changes following the existing code style
  3. Add tests for new functionality or bug fixes
  4. Run the test suite to ensure everything passes
  5. Submit a Pull Request with a clear description of your changes

Development Guidelines

  • Follow the existing code style (Black formatting, 100 character line length)
  • Write tests for all new features and bug fixes
  • Update documentation as needed
  • Ensure all tests pass before submitting

Reporting Issues

If you find a bug or have a feature request, please open an issue on GitHub with:

  • A clear description of the problem or feature
  • Steps to reproduce (for bugs)
  • Expected vs actual behavior
  • Python version and dependency versions

Inspiration

This project is part of a family of schema conversion libraries:

  • 🦎 charmander - Convert between Polars and PySpark schemas
  • 🐢 poldantic - Convert between Pydantic models and Polars schemas
  • 🌱 bulbasaur - Convert between PySpark and SQLAlchemy/SQLModel schemas

About

Bulbasaur provides a bridge between PySpark's distributed data processing and SQLAlchemy's ORM capabilities, enabling seamless schema conversion for data engineering workflows. Whether you're building data pipelines, APIs, or migration tools, Bulbasaur makes it easy to work with schemas across different ecosystems.


Made with ❤️ by Odos Matthews

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages