Compare schemas, detect drift, generate safe migration SQL with risk analysis across PostgreSQL, MySQL, and SQLite.
Database schema migrations are dangerous. Tools like Alembic and Flyway focus on running migrations but leave the hardest part to you: figuring out what changed and whether it's safe. You end up writing migration SQL by hand, guessing at risk levels, and hoping your rollback scripts actually work.
migra takes a different approach. Give it two schemas -- your current state and your target state -- and it will:
- Diff them at the column, index, and constraint level
- Assess risk for every change (SAFE through DESTRUCTIVE)
- Generate migration SQL with proper up/down scripts
- Validate your schema against best practices before you ship
- Track drift so you know when production diverges from your codebase
No database connection required. Pure schema analysis.
- Column-level change detection (type, nullable, default, constraints)
- Index and constraint tracking
- Multi-table dependency-aware ordering
- Risk assessment for every change
- PostgreSQL: `CREATE INDEX CONCURRENTLY`, proper quoting, `COMMENT ON`
- MySQL: `ENGINE=InnoDB`, backtick quoting, `MODIFY COLUMN`
- SQLite: limitation-aware (warns about `ALTER COLUMN` and constraint changes)
- Missing primary key detection
- Foreign key reference validation
- Reserved word warnings
- Duplicate index detection
- Naming convention checks
- Column type best practices
- Timestamped migration creation
- Checksum validation (detect tampered migrations)
- Dependency ordering
- Dry-run mode
- Rollback support
- History tracking
- Point-in-time schema snapshots
- Fingerprint-based change detection
- Detailed drift reports with risk analysis
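The fingerprint-based change detection above can be sketched as hashing a canonical serialization of the schema; if the hash of the live schema no longer matches the baseline snapshot, the schema has drifted. This is a minimal illustration of the idea (the `schema_fingerprint` helper is hypothetical, not migra's actual internals):

```python
import hashlib
import json

def schema_fingerprint(schema: dict) -> str:
    """Hash a schema's canonical JSON form (illustrative sketch only)."""
    # Sorting keys makes semantically identical schemas hash identically
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

baseline = {"tables": [{"name": "users", "columns": ["id", "email"]}]}
current = {"tables": [{"name": "users", "columns": ["id", "email", "age"]}]}

# Any structural change produces a different fingerprint
drifted = schema_fingerprint(current) != schema_fingerprint(baseline)
print(drifted)  # True: a column was added since the baseline snapshot
```

Because only hashes need to be stored, a baseline can be checked cheaply without keeping a full copy of every historical schema.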
```shell
pip install migra
migra init
```

```shell
# JSON schema files
migra diff schema_v1.json schema_v2.json

# SQL DDL files
migra diff current.sql target.sql

# With a specific dialect
migra diff current.json target.json -d mysql
```

```shell
# Generate and save migration SQL
migra plan current.json target.json -n "add_user_profiles"

# Preview without saving
migra plan current.json target.json --dry-run
```

```shell
migra validate schema.json
```

```shell
# Save a baseline snapshot
migra snapshot schema.json -l "v1.0"

# Later, check for drift
migra drift schema_current.json
```

```shell
# Create an empty migration
migra create "add_audit_columns"

# View status
migra status

# Apply pending migrations
migra apply

# Roll back the last migration
migra rollback

# Roll back the last 3
migra rollback -n 3
```

Schemas can be defined as JSON:

```json
{
  "name": "myapp",
  "tables": [
    {
      "name": "users",
      "schema": "public",
      "columns": [
        {"name": "id", "type": "INTEGER", "primary_key": true, "nullable": false},
        {"name": "email", "type": "VARCHAR", "max_length": 255, "unique": true},
        {"name": "created_at", "type": "TIMESTAMP", "default": "NOW()"}
      ],
      "indexes": [
        {"name": "idx_email", "columns": ["email"], "unique": true}
      ]
    }
  ]
}
```

or as SQL DDL:

```sql
CREATE TABLE users (
    id INTEGER PRIMARY KEY NOT NULL,
    email VARCHAR(255) UNIQUE,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE UNIQUE INDEX idx_email ON users (email);
```

The same operations are available from Python:

```python
from migra import Schema, Table, Column, SchemaDiff
from migra.planner import MigrationPlanner

# Define schemas
source = Schema(tables=[
    Table(name="users", columns=[
        Column(name="id", type="INTEGER", primary_key=True),
        Column(name="email", type="VARCHAR", max_length=255),
    ]),
])

target = Schema(tables=[
    Table(name="users", columns=[
        Column(name="id", type="INTEGER", primary_key=True),
        Column(name="email", type="VARCHAR", max_length=255),
        Column(name="name", type="TEXT"),
        Column(name="age", type="INTEGER", nullable=False, default="0"),
    ]),
])

# Compute the diff
diff = SchemaDiff(source=source, target=target)
changes = diff.compute()
for change in changes:
    print(f"[{change.risk_level.value}] {change.description}")

# Generate migration SQL
planner = MigrationPlanner(dialect="postgresql")
plan = planner.plan(diff)
print(plan.up_sql)
print(plan.down_sql)
print(f"Max risk: {plan.max_risk.value}")
```

Validate a schema against best practices:

```python
from migra.validator import SchemaValidator

validator = SchemaValidator()
issues = validator.validate(schema)
for issue in issues:
    print(issue)  # [WARNING] NO_PRIMARY_KEY [logs]: Table logs has no primary key
```

Track drift with snapshots:

```python
from migra.snapshot import SnapshotManager

mgr = SnapshotManager()
mgr.save(current_schema, label="v1.0")

# Later...
if mgr.has_drift(current_schema):
    details = mgr.drift_details(current_schema)
    print(f"Schema drifted! {details['changes']} changes detected")
```

| Level | Description | Example |
|---|---|---|
| SAFE | No data impact | Add nullable column, create table |
| LOW | Minimal impact | Add index, add constraint |
| MEDIUM | Moderate impact | Change column type, drop index |
| HIGH | Significant risk | Add NOT NULL without default, shrink VARCHAR |
| DESTRUCTIVE | Data loss possible | Drop table, drop column |
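A risk model like the table above is essentially a mapping from change kinds to ordered severity levels, with a plan's overall risk being the worst risk among its changes. The sketch below illustrates that idea with hypothetical names (`Risk`, `CHANGE_RISK`, `max_risk`); it is not migra's internal model:

```python
from enum import IntEnum

class Risk(IntEnum):
    """Ordered so max() picks the most severe level."""
    SAFE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    DESTRUCTIVE = 4

# Hypothetical mapping of change kinds to risk, mirroring the table above
CHANGE_RISK = {
    "add_nullable_column": Risk.SAFE,
    "create_table": Risk.SAFE,
    "add_index": Risk.LOW,
    "alter_column_type": Risk.MEDIUM,
    "add_not_null_without_default": Risk.HIGH,
    "drop_column": Risk.DESTRUCTIVE,
    "drop_table": Risk.DESTRUCTIVE,
}

def max_risk(changes: list[str]) -> Risk:
    """A plan's overall risk is the worst risk among its changes."""
    return max((CHANGE_RISK[c] for c in changes), default=Risk.SAFE)

plan = ["create_table", "add_index", "drop_column"]
print(max_risk(plan).name)  # DESTRUCTIVE
```

A single destructive change dominates the whole plan, which is what makes a setting like `allow_destructive: false` enforceable as a hard gate.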
Create a `.migra.yml` file in your project root:

```yaml
migrations_dir: db/migrations
dialect: postgresql
allow_destructive: false
naming_convention: timestamp

environments:
  production:
    dialect: postgresql
    host: prod-db
  development:
    dialect: sqlite
    host: localhost
```

migra includes a schema linter that checks for common issues:

```shell
migra lint schema.json
```

Example output:

```
[ERROR] NO_PRIMARY_KEY [logs]: Table has no primary key
[WARNING] RESERVED_WORD [users.order]: Column name is a SQL reserved word
[INFO] NAMING [UserProfiles]: Table name should be snake_case
[WARNING] DUPLICATE_INDEX [users]: idx_email and idx_users_email cover same columns
```
| Rule | Severity | Description |
|---|---|---|
| `NO_PRIMARY_KEY` | ERROR | Table missing primary key |
| `MISSING_FK_TARGET` | ERROR | Foreign key references non-existent table |
| `RESERVED_WORD` | WARNING | Column/table uses SQL reserved word |
| `DUPLICATE_INDEX` | WARNING | Multiple indexes cover same columns |
| `WIDE_VARCHAR` | INFO | VARCHAR > 4000 chars (consider TEXT) |
| `NAMING_CONVENTION` | INFO | Non-snake_case naming detected |
| `MISSING_TIMESTAMP` | INFO | Table lacks created_at/updated_at |
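To make one of these rules concrete, the `DUPLICATE_INDEX` check amounts to grouping indexes by their column list and flagging collisions. This is a minimal sketch of that logic (the `find_duplicate_indexes` helper is illustrative, not migra's implementation):

```python
def find_duplicate_indexes(indexes: list[dict]) -> list[tuple[str, str]]:
    """Flag index pairs that cover the same columns in the same order."""
    seen: dict[tuple, str] = {}   # column tuple -> first index with those columns
    dupes: list[tuple[str, str]] = []
    for idx in indexes:
        key = tuple(idx["columns"])
        if key in seen:
            dupes.append((seen[key], idx["name"]))
        else:
            seen[key] = idx["name"]
    return dupes

indexes = [
    {"name": "idx_email", "columns": ["email"]},
    {"name": "idx_users_email", "columns": ["email"]},
    {"name": "idx_created", "columns": ["created_at"]},
]
print(find_duplicate_indexes(indexes))  # [('idx_email', 'idx_users_email')]
```

Note that column order matters: an index on `(a, b)` is not redundant with one on `(b, a)`, so the tuple key deliberately preserves order.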
Normalize schemas to a canonical form for reliable comparison:

```python
from migra.normalizer import SchemaNormalizer

normalizer = SchemaNormalizer()
normalized = normalizer.normalize(schema)

# Normalizes:
# - Column type aliases (INT -> INTEGER, BOOL -> BOOLEAN)
# - Default value expressions
# - Index naming
# - Constraint ordering
```

Merge multiple schema files into one:

```python
from migra.merge import SchemaMerger

merger = SchemaMerger()
merged = merger.merge([schema_users, schema_orders, schema_products])
# Validates cross-schema foreign key references
# Detects naming conflicts
```

Collapse multiple migrations into a single optimized migration:

```shell
migra squash --from 001 --to 010 -n "consolidated_v1"
```

```python
from migra.squasher import MigrationSquasher

squasher = MigrationSquasher()
squashed = squasher.squash(migrations[0:10])
# Removes redundant operations (add column then drop same column)
# Optimizes ordering for minimal locks
```

Generate HTML or Markdown reports of schema changes:
```shell
migra report current.json target.json --format html -o report.html
migra report current.json target.json --format markdown
```

```
migra/
    schema.py      # Schema, Table, Column, Index, Constraint models
    diff.py        # Schema comparison engine with risk assessment
    planner.py     # SQL generation (PostgreSQL, MySQL, SQLite)
    migration.py   # Migration lifecycle management
    parser.py      # SQL DDL parser
    snapshot.py    # Point-in-time schema snapshots
    validator.py   # Schema best-practice validation
    normalizer.py  # Schema normalization to canonical form
    merge.py       # Multi-schema merging
    squasher.py    # Migration squashing and optimization
    linter.py      # Schema linting rules
    report.py      # HTML/Markdown report generation
    graph.py       # Table dependency graph
    formatter.py   # SQL formatting utilities
    loader.py      # File loading (JSON, SQL DDL)
    config.py      # Configuration management
    cli.py         # Click-based CLI
```
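The dependency-aware ordering mentioned earlier (and the table dependency graph in `graph.py`) boils down to a topological sort over foreign-key references: a table must be created after every table it points to, and dropped in the reverse order. A minimal sketch using the standard library, assuming a simple `table -> referenced tables` mapping (the `creation_order` helper and `deps` data are hypothetical):

```python
from graphlib import TopologicalSorter

def creation_order(fk_deps: dict[str, set[str]]) -> list[str]:
    """Order tables so each table is created after the tables it references.

    fk_deps maps a table name to the set of tables its foreign keys target.
    """
    # TopologicalSorter treats the mapped values as predecessors,
    # so referenced tables come first in static_order()
    return list(TopologicalSorter(fk_deps).static_order())

deps = {
    "order_items": {"orders", "products"},
    "orders": {"users"},
    "products": set(),
    "users": set(),
}
order = creation_order(deps)
assert order.index("users") < order.index("orders") < order.index("order_items")
print(order)
```

Dropping tables uses `reversed(order)`, and a cycle in the graph (mutually referencing tables) raises `graphlib.CycleError`, which is exactly the case where a planner must fall back to deferred constraints.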