# dbbasic-pipe

SQL-style query optimization for Unix pipes.

`dbbasic-pipe` brings bidirectional flow control to Unix pipelines, allowing downstream commands to signal upstream commands to optimize data production—just like SQL query optimizers push predicates down and apply limits early.
Traditional Unix pipes are one-way: data flows left-to-right with no way for downstream commands to tell upstream "I only need 100 items" or "stop after the first match."

```bash
# Inefficient: cat reads the entire 10GB file
cat huge.log | grep ERROR | head -10
```
`dbbasic-pipe` adds an out-of-band coordination channel via environment variables and Unix sockets, letting commands communicate control messages while data still flows through normal pipes.

```bash
# Efficient: pcat stops reading after producing enough matches
dbbasic-pipe pcat huge.log | pfilter ERROR | plimit 10
```
```bash
pip install dbbasic-pipe
```
```bash
# Create test data
cat > data.json << EOF
{"name": "Alice", "age": 25}
{"name": "Bob", "age": 17}
{"name": "Charlie", "age": 30}
{"name": "David", "age": 15}
{"name": "Eve", "age": 22}
EOF

# Run with coordination (optimized)
dbbasic-pipe pcat data.json | pfilter 'age > 18' | plimit 2
```
Output:

```json
{"name": "Alice", "age": 25}
{"name": "Charlie", "age": 30}
```
What happened:

1. `plimit` sent a backpressure signal: "I only need 2 items"
2. `pfilter` received it and forwarded it upstream
3. `pcat` stopped reading after producing 2 matching records (instead of reading all 5)
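The same early-exit behavior can be sketched in-process with Python generators (illustrative only: the real commands coordinate across separate processes via the control socket):

```python
import json
from itertools import islice

LINES = [
    '{"name": "Alice", "age": 25}',
    '{"name": "Bob", "age": 17}',
    '{"name": "Charlie", "age": 30}',
    '{"name": "David", "age": 15}',
    '{"name": "Eve", "age": 22}',
]

reads = 0  # how many lines the "producer" actually read

def pcat(lines):
    # Lazily yield lines; stops as soon as the consumer stops pulling.
    global reads
    for line in lines:
        reads += 1
        yield line

def pfilter(records, predicate):
    # Keep only the JSON lines matching the predicate.
    for line in records:
        if predicate(json.loads(line)):
            yield line

# "plimit 2": islice pulls exactly 2 matches, so pcat stops early
out = list(islice(pfilter(pcat(LINES), lambda r: r['age'] > 18), 2))
print(out)    # the Alice and Charlie records
print(reads)  # 3, not 5: pcat never read David or Eve
```

Generators give this for free inside one process; `dbbasic-pipe` recreates the same demand-driven pull across process boundaries.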
### dbbasic-pipe

Wrapper that enables coordination for a pipeline.

```bash
dbbasic-pipe <command>
```

Example:

```bash
dbbasic-pipe bash -c 'pcat data.json | pfilter "age > 18" | plimit 10'
```
### pcat

Smart `cat` - reads files and respects backpressure signals.

```bash
pcat data.json
pcat large.log | plimit 100
```
### pfilter

Filter JSON lines by Python expressions.

```bash
cat data.json | pfilter 'age > 18'
cat data.json | pfilter 'name == "Alice"'
cat data.json | pfilter 'status == "active" and score > 50'
```
### plimit

Output the first N lines and signal backpressure upstream.

```bash
cat data.json | plimit 10
```
```
Data flow:    pcat → pipe → pfilter → pipe → plimit
Control flow: pcat ← socket ← pfilter ← socket ← plimit
                ↓                                  ↓
                └─────────→ coordinator ←──────────┘
```
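A toy in-process model of the control path above (the `Coordinator` class is illustrative only; the real coordinator is a separate Unix-socket service):

```python
class Coordinator:
    """Toy model: commands register in pipeline order, and a
    backpressure signal reaches every command upstream of its sender."""

    def __init__(self):
        self.pipeline = []  # (pid, command) pairs, left to right
        self.signals = {}   # pid -> "only N more items needed"

    def register(self, pid, command):
        self.pipeline.append((pid, command))

    def backpressure(self, from_pid, count):
        # Relay the limit to everything upstream of the sender.
        for pid, _ in self.pipeline:
            if pid == from_pid:
                break
            self.signals[pid] = count

coord = Coordinator()
coord.register(101, "pcat")
coord.register(102, "pfilter")
coord.register(103, "plimit")
coord.backpressure(103, 2)  # plimit: "I only need 2 items"
print(coord.signals)        # {101: 2, 102: 2}
```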
- `dbbasic-pipe` sets the `PIPE_CONTROL_SOCKET` environment variable
- Starts the `pipe-coordinator` service in the background
- Commands detect the env var and connect to the coordinator socket
- Commands exchange JSON control messages:
  - `register`: announce presence
  - `backpressure`: signal "I only need N more items"
  - `complete`: notify when done

```json
{"type": "register", "pid": 12345, "command": "pfilter", "predicate": "age > 18"}
{"type": "backpressure", "count": 100}
{"type": "complete", "pid": 12345, "processed": 150, "output": 100}
```
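A minimal framing helper for such newline-delimited JSON messages might look like this (the helper functions are assumptions for illustration, not the package's API):

```python
import json

def encode_msg(msg: dict) -> bytes:
    # One JSON object per line on the control socket.
    return json.dumps(msg).encode() + b'\n'

def decode_stream(buf: bytes):
    # Split a received buffer into complete messages, keeping
    # any trailing partial line for the next recv().
    *lines, rest = buf.split(b'\n')
    msgs = [json.loads(line) for line in lines if line]
    return msgs, rest

# A complete message followed by a partial one, as recv() might return
data = encode_msg({"type": "backpressure", "count": 100}) + b'{"type": "comp'
msgs, rest = decode_stream(data)
print(msgs)  # [{'type': 'backpressure', 'count': 100}]
```

Keeping the leftover bytes matters because a `recv()` call can return a message split across reads.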
All commands work without coordination too:

```bash
# Works fine, just no optimization
pcat data.json | pfilter 'age > 18' | plimit 10
```
```bash
# Stop after finding 10 errors (doesn't read the entire file)
dbbasic-pipe pcat /var/log/huge.log | pfilter 'ERROR' | plimit 10
```
With a future `pcurl` command:

```bash
# pcurl could add ?limit=100 to the API request based on backpressure
dbbasic-pipe pcurl api.com/users | pfilter 'age > 18' | plimit 100
```
With a future `psql` wrapper:

```bash
# psql-smart could build an optimized query: WHERE age>18 LIMIT 100
dbbasic-pipe psql-smart -c 'SELECT * FROM users' | pfilter 'age>18' | plimit 100
```
| Scenario | Traditional | With dbbasic-pipe |
|---|---|---|
| `cat 10GB.log \| grep ERROR \| head -10` | Reads entire 10GB | Stops after ~10 matches |
| `cat users.json \| filter age>18 \| head -100` | Processes all records | Stops at ~100 matches |
| API calls | Downloads all data | Could request `?limit=100` |
This approach is inspired by:
- SQL query optimizers - Push predicates down, apply limits early
- Reactive Streams - Explicit backpressure protocol
- Apache Beam/Flink - Distributed query planning
- Rust iterators - Lazy evaluation with `take()`
- PRQL/Kusto - Pipe-style query languages
```bash
# Clone repo
git clone https://github.com/askrobots/dbbasic-pipe.git
cd dbbasic-pipe

# Install in development mode
pip install -e .

# Run tests
python -m pytest
```
Create your own coordination-aware commands:

```python
#!/usr/bin/env python3
import os, sys, socket, json

ctrl_socket = os.getenv('PIPE_CONTROL_SOCKET')
if ctrl_socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(ctrl_socket)

    # Register with the coordinator
    sock.send(json.dumps({
        'type': 'register',
        'pid': os.getpid(),
        'command': 'my-tool'
    }).encode() + b'\n')

    # Listen for backpressure
    # ... check sock with select()

# Process data via stdin/stdout as normal
```
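The elided `select()` check could look like the sketch below, assuming the coordinator delivers newline-delimited JSON control messages as described above (`poll_backpressure` is a hypothetical helper, not part of the package):

```python
import json
import select
import socket

def poll_backpressure(sock, timeout=0.0):
    # Non-blocking poll: return the remaining-items count if a
    # backpressure message is waiting on the control socket, else None.
    ready, _, _ = select.select([sock], [], [], timeout)
    if not ready:
        return None
    for line in sock.recv(4096).split(b'\n'):
        if line:
            msg = json.loads(line)
            if msg.get('type') == 'backpressure':
                return msg.get('count')
    return None
```

A producer like `pcat` could call this between records and stop reading once the count is exhausted.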
- pcurl - HTTP client that adds query params based on backpressure
- psql-smart - Builds optimal SQL queries from downstream filters
- pgrep - Stops searching after limit reached
- ptail - Stops tailing after condition met
- Type system - Commands advertise schema
- Distributed mode - Coordinator as network service
- Query optimization - Merge adjacent filters, reorder operations
Contributions welcome! Please open issues or PRs at: https://github.com/askrobots/dbbasic-pipe
MIT License - see LICENSE file
Dan Quellhorst
- GitHub: @askrobots
- Website: quellhorst.com
- dbbasic-tsv - TSV data processing
- dbbasic-content - Content management tools
- dbbasic-video - Video processing utilities