Educational Python application demonstrating local message queuing, stream processing, windowing functions, data aggregation pipelines, and stream analytics with SQLite3 storage.
- FIFO Queue - First-in-first-out message ordering
- Priority Queue - Priority-based message handling
- Queue Persistence - SQLite storage for durability
- Queue Manager - Manage multiple queues
- Queue Statistics - Track enqueue/dequeue metrics
- Stream Processor - Process continuous data streams
- Transformations - Map, filter, reduce operations
- Stream Sources - Sensor data, events, queues
- Stream Sinks - Database, console, queue outputs
- Batch Processing - Process events in batches
- Tumbling Windows - Fixed-size, non-overlapping windows
- Sliding Windows - Overlapping time windows
- Window Aggregation - Aggregate data within windows
- Time-Based Grouping - Group events by time
- Window Management - Open, close, slide windows
- Multi-Stage Pipelines - Chain aggregation operations
- Group By - Aggregate by key
- Aggregation Functions - Sum, avg, count, min, max
- Pipeline Results - Store in SQLite
- Custom Aggregators - Build custom aggregations
- Real-Time Metrics - Track metrics as they happen
- Metric Statistics - Count, sum, avg, min, max
- Rate Calculation - Events per second
- Metric History - Store in SQLite
- Analytics Dashboard - View metrics via API
```bash
# Clone the repository
git clone https://github.com/Amruth22/Python-Stream-Processing-Engine.git
cd Python-Stream-Processing-Engine

# Create and activate a virtual environment
python -m venv venv

# On Windows:
venv\Scripts\activate

# On macOS/Linux:
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run the demonstration
python main.py

# Start the Flask API
python api/app.py

# Run the unit tests
python tests.py
```

Project Structure:

```
Python-Stream-Processing-Engine/
│
├── queue/
│ ├── message_queue.py # Message queue
│ ├── queue_manager.py # Queue management
│ └── queue_store.py # SQLite persistence
│
├── stream/
│ ├── stream_processor.py # Stream engine
│ ├── stream_source.py # Data sources
│ └── stream_sink.py # Data sinks
│
├── windowing/
│ ├── window_functions.py # Base windowing
│ ├── tumbling_window.py # Tumbling windows
│ └── sliding_window.py # Sliding windows
│
├── aggregation/
│ ├── aggregation_pipeline.py # Aggregation pipeline
│ ├── aggregators.py # Aggregator functions
│ └── pipeline_store.py # SQLite storage
│
├── analytics/
│ ├── stream_analytics.py # Stream analytics
│ └── analytics_engine.py # Analytics engine
│
├── api/
│ └── app.py # Flask API
│
├── main.py # Demonstration
├── tests.py # 10 unit tests
└── README.md # This file
```

Message Queue:

```python
from queue.message_queue import MessageQueue
# Create queue
queue = MessageQueue('events', max_size=1000)
# Enqueue messages
queue.enqueue({'event': 'user_login', 'user_id': 1})
queue.enqueue({'event': 'page_view', 'page': '/dashboard'})
# Dequeue messages
message = queue.dequeue()
print(message)
# Get stats
stats = queue.get_stats()
print(f"Queue size: {stats['current_size']}")from stream.stream_processor import StreamProcessor
# Create processor
processor = StreamProcessor('my-processor')
# Add transformations
processor.map(lambda x: {**x, 'value': x['value'] * 2})
processor.filter(lambda x: x['value'] > 10)
# Process events
events = [{'value': 5}, {'value': 10}, {'value': 15}]
results = processor.process_batch(events)
```

Tumbling Windows:

```python
import time

from windowing.tumbling_window import TumblingWindow
# Create 10-second tumbling window
window = TumblingWindow(duration=10)
# Add events
window.add_event({'value': 10, 'timestamp': time.time()})
window.add_event({'value': 20, 'timestamp': time.time()})
# Get closed windows
closed_windows = window.get_closed_windows()
# Aggregate window
total = window.aggregate(lambda events: sum(e['value'] for e in events))
```

Sliding Windows:

```python
import time

from windowing.sliding_window import SlidingWindow
# Create sliding window: 30s size, slide every 10s
window = SlidingWindow(size=30, slide=10)
# Add events
window.add_event({'value': 10, 'timestamp': time.time()})
# Get current window
current = window.get_current_window()
print(f"Events in window: {current['count']}")from aggregation.aggregation_pipeline import AggregationPipeline
# Create pipeline
pipeline = AggregationPipeline('sales')
# Configure
pipeline.group_by('product')
pipeline.aggregate('count', '*')
pipeline.aggregate('sum', 'amount')
pipeline.aggregate('avg', 'amount')
# Process data
data = [
{'product': 'Laptop', 'amount': 999.99},
{'product': 'Laptop', 'amount': 1299.99},
{'product': 'Mouse', 'amount': 29.99}
]
results = pipeline.process(data)
# Results: {'Laptop': {'count': 2, 'sum': 2299.98, 'avg': 1149.99}, 'Mouse': {'count': 1, 'sum': 29.99, 'avg': 29.99}}
```

Stream Analytics:

```python
from analytics.stream_analytics import StreamAnalytics
# Create analytics
analytics = StreamAnalytics()
# Track metrics
analytics.track_metric('page_views', 1)
analytics.track_metric('api_calls', 5)
# Get metrics
metrics = analytics.get_all_metrics()
print(metrics)
# Get specific metric
page_views = analytics.get_metric('page_views')
print(f"Page views: {page_views['count']}")
print(f"Rate: {page_views['rate_per_second']}/sec")Fixed-size, non-overlapping windows
[0-10s] [10-20s] [20-30s] [30-40s]
| | | |
W1 W2 W3 W4
Each event belongs to exactly one window
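As an illustration of that assignment, the window an event falls into can be derived directly from its timestamp. The sketch below is plain Python and does not use the project's `TumblingWindow` class:

```python
# Assign events to 10-second tumbling windows by integer-dividing the timestamp.
DURATION = 10  # seconds

events = [
    {'value': 5,  'timestamp': 3.2},
    {'value': 10, 'timestamp': 9.8},
    {'value': 20, 'timestamp': 14.1},
    {'value': 7,  'timestamp': 31.0},
]

windows = {}
for event in events:
    window_id = int(event['timestamp'] // DURATION)   # 0 -> [0-10s), 1 -> [10-20s), ...
    windows.setdefault(window_id, []).append(event)

for window_id, window_events in sorted(windows.items()):
    start, end = window_id * DURATION, (window_id + 1) * DURATION
    total = sum(e['value'] for e in window_events)
    print(f"W{window_id + 1} [{start}-{end}s): {len(window_events)} events, sum={total}")
```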
Sliding Windows: overlapping windows (here 30s windows sliding every 10s)

```
[0-30s]
    [10-40s]
        [20-50s]
            [30-60s]
```

Events can belong to multiple windows.
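Because the windows overlap, one event can fall into several of them. A minimal standalone sketch of that membership calculation, again independent of the `SlidingWindow` class:

```python
# For a sliding window of SIZE seconds advancing every SLIDE seconds,
# an event at time t belongs to every window whose start s satisfies:
#   s <= t < s + SIZE, with s a multiple of SLIDE.
SIZE = 30   # window length in seconds
SLIDE = 10  # how far the window advances each step

def windows_for(timestamp):
    """Return the start times of all windows that contain `timestamp`."""
    last_start = int(timestamp // SLIDE) * SLIDE       # latest window starting at or before t
    first_start = max(0, last_start - SIZE + SLIDE)    # earliest window still covering t
    return list(range(first_start, last_start + 1, SLIDE))

print(windows_for(25))  # [0, 10, 20] -> windows [0-30s), [10-40s), [20-50s)
```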
Run the comprehensive test suite:
```bash
python tests.py
```

- ✅ Message Queue - Test enqueue/dequeue
- ✅ Queue Persistence - Test SQLite storage
- ✅ Stream Processing - Test pipeline
- ✅ Tumbling Window - Test fixed windows
- ✅ Sliding Window - Test overlapping windows
- ✅ Data Aggregation - Test aggregation functions
- ✅ Stream Analytics - Test real-time metrics
- ✅ Aggregators - Test count, sum, avg
- ✅ Window Aggregation - Test windowed aggregation
- ✅ Queue Manager - Test queue management
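As a rough idea of what one such test could look like — assuming the `MessageQueue` API shown in the usage example above, and that `dequeue()` returns the message payload; the actual assertions in `tests.py` may differ:

```python
import unittest

from queue.message_queue import MessageQueue


class TestMessageQueue(unittest.TestCase):
    def test_fifo_order(self):
        q = MessageQueue('test-queue', max_size=10)
        q.enqueue({'event': 'first'})
        q.enqueue({'event': 'second'})
        # FIFO: the first message enqueued comes out first
        self.assertEqual(q.dequeue()['event'], 'first')
        self.assertEqual(q.dequeue()['event'], 'second')


if __name__ == '__main__':
    unittest.main()
```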
Batch:
- Process data in chunks
- Higher latency
- Simpler to implement
Stream:
- Process data as it arrives
- Low latency
- Real-time results
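A toy comparison in plain Python (not tied to this project's classes): the batch version waits for the whole chunk before producing a single result, while the streaming version emits an updated result as each event arrives.

```python
def batch_average(readings):
    # Batch: collect the whole chunk first, then produce one result
    return sum(readings) / len(readings)

def stream_average(readings):
    # Stream: maintain running state and emit a result per event
    total, count = 0.0, 0
    for value in readings:
        total += value
        count += 1
        yield total / count

readings = [10, 20, 30, 40]
print("Batch:", batch_average(readings))   # single result after all data is in
for running_avg in stream_average(readings):
    print("Stream:", running_avg)          # updated result as each event arrives
```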
Use Cases:
- Calculate metrics per time period
- Detect patterns in time windows
- Aggregate recent data
- Time-based analytics
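For instance, "metrics per time period" can be as simple as bucketing event timestamps by minute before aggregating. A standalone sketch with made-up sample timestamps, independent of the windowing classes:

```python
from collections import defaultdict
from datetime import datetime, timezone

events = [
    {'value': 3, 'timestamp': 1700000005},
    {'value': 7, 'timestamp': 1700000042},
    {'value': 5, 'timestamp': 1700000065},
]

per_minute = defaultdict(list)
for event in events:
    # Truncate each timestamp to the start of its minute
    minute = datetime.fromtimestamp(event['timestamp'], tz=timezone.utc).replace(second=0, microsecond=0)
    per_minute[minute].append(event['value'])

for minute, values in sorted(per_minute.items()):
    print(minute.strftime('%H:%M'), 'count:', len(values), 'sum:', sum(values))
```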
Common Aggregations:
- Count: Number of events
- Sum: Total of values
- Avg: Average value
- Min/Max: Range of values
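Each of these is just a function over a list of values, which is also how a custom aggregator (for example, a median) can sit alongside the built-ins. A standalone sketch, not the project's `aggregators.py` API:

```python
import statistics

AGGREGATORS = {
    'count': len,
    'sum': sum,
    'avg': lambda values: sum(values) / len(values),
    'min': min,
    'max': max,
    # A custom aggregator is just another function applied to the grouped values
    'median': statistics.median,
}

amounts = [999.99, 1299.99, 29.99]
results = {name: round(fn(amounts), 2) for name, fn in AGGREGATORS.items()}
print(results)
# {'count': 3, 'sum': 2329.97, 'avg': 776.66, 'min': 29.99, 'max': 1299.99, 'median': 999.99}
```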
For production use:

Message Broker:
- Use Kafka or RabbitMQ
- Implement distributed queuing
- Add message persistence

Stream Processing:
- Use Apache Flink or Spark Streaming
- Implement exactly-once semantics
- Add checkpointing

Storage:
- Use a time-series database
- Implement data partitioning
- Add data retention policies

Scalability:
- Distribute processing
- Add horizontal scaling
- Implement backpressure
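Of these, backpressure is the easiest to show in miniature: a bounded buffer makes a fast producer block instead of overwhelming a slow consumer. A standard-library sketch, not a production solution (note that stdlib `queue` could be shadowed by this repo's local `queue/` package, so run it outside the project root):

```python
import queue
import threading
import time

buffer = queue.Queue(maxsize=5)   # bounded buffer: the source of backpressure

def producer():
    for i in range(20):
        buffer.put(i)             # blocks while the buffer is full, slowing the producer down
        print(f"produced {i} (buffer size {buffer.qsize()})")

def consumer():
    for _ in range(20):
        buffer.get()
        time.sleep(0.05)          # simulate a slow downstream stage
        buffer.task_done()

threading.Thread(target=producer, daemon=True).start()
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
consumer_thread.join()
```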
- Flask 3.0.0 - Web framework
- python-dotenv 1.0.0 - Environment variables
- pytest 7.4.3 - Testing framework
- requests 2.31.0 - HTTP client
- sqlite3 - Database (built-in)
This project is for educational purposes. Feel free to use and modify as needed.
Happy Streaming! 🚀