Merged
2 changes: 1 addition & 1 deletion README.md
@@ -27,7 +27,7 @@ Qhronos (v0.1.0) is a developer-first scheduling and notification platform. It l
 **Event → Schedule → Occurrence Flow:**
 - **Event:** User-defined intent and configuration, stored in the database.
 - **Schedule:** Actionable plan (stored in Redis) for when the event should be executed. Created by the expander.
-- **Occurrence:** Created only after a scheduled event is executed (dispatched by the dispatcher). Represents the actual attempt to process (dispatch) the event at a specific time, with status and result information.
+- **Occurrence:** A record representing a specific scheduled instance of an Event. For recurring events, these are generated by the Expander. Each Occurrence tracks its lifecycle (e.g., pending, scheduled, dispatched, failed) and the outcome of its execution attempt by the Dispatcher.
 
 For system architecture and in-depth design, see [design.md](./design.md).

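The Occurrence lifecycle described in the updated README bullet can be sketched as a small Go type. The field and status names below are illustrative only, not the repository's actual model:

```go
package main

import (
	"fmt"
	"time"
)

// OccurrenceStatus models the lifecycle states named in the README
// (pending, scheduled, dispatched, failed).
type OccurrenceStatus string

const (
	StatusPending    OccurrenceStatus = "pending"
	StatusScheduled  OccurrenceStatus = "scheduled"
	StatusDispatched OccurrenceStatus = "dispatched"
	StatusFailed     OccurrenceStatus = "failed"
)

// Occurrence is one scheduled instance of an Event, tracking when it
// should run and how its execution attempt turned out.
type Occurrence struct {
	EventID     string
	ScheduledAt time.Time
	Status      OccurrenceStatus
	Attempts    int
	LastError   string
}

func main() {
	o := Occurrence{EventID: "evt-123", ScheduledAt: time.Now(), Status: StatusPending}
	// The Dispatcher would move the occurrence through its lifecycle.
	o.Status = StatusDispatched
	fmt.Println(o.Status) // prints "dispatched"
}
```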
22 changes: 12 additions & 10 deletions design.md
@@ -44,9 +44,9 @@ Qhronos is a developer-first, event scheduling and notification platform. It ena
 [Scheduler Layer] ──► [Redis]
 (Core Scheduler, Expander, Dispatcher, Archival Scheduler)
-[Background Jobs: Expander, Dispatcher, Cleanup]
+[Background Jobs: Expander, Dispatcher, Archival/Cleanup]
 [Webhook Delivery]
@@ -101,6 +101,8 @@ Qhronos uses JWT tokens and a master token for API authentication. Access contro
 ### Middleware
 - Cross-cutting concerns: logging, error handling, authentication, rate limiting
 - Ensures consistent request/response processing and security
+- **Ginzap Logger:** Uses `zap` for structured logging of HTTP requests and responses (latency, status, etc.) via `ginzap.Ginzap` and `ginzap.RecoveryWithZap`.
+- **RequestIDMiddleware:** Injects a unique request ID into the request context and log entries for improved traceability.
 
 ### Services
 - Encapsulate business logic not directly tied to HTTP or data access
@@ -112,11 +114,12 @@ Qhronos uses JWT tokens and a master token for API authentication. Access contro
 - Maps Go models to database tables
 
 ### Scheduler Layer
-- Background jobs for event expansion, webhook dispatch, and cleanup
-- Expander: generates occurrences for recurring events
-- Dispatcher: delivers webhooks for due occurrences, handles retries
-- Cleanup: archives old data and enforces retention policies
-- Uses Redis for coordination and scheduling
+- Background jobs for event expansion, webhook dispatch, and data archival.
+- Uses Redis for coordination and scheduling of tasks.
+- **Core Scheduler (`scheduler.Scheduler`):** Manages the underlying scheduling mechanism in Redis, providing a foundation for components such as the Expander and Dispatcher to enqueue and manage scheduled tasks.
+- **Expander (`scheduler.Expander`):** Periodically scans for recurring events. For each event, it computes the next set of occurrences based on the `look_ahead_duration` and `expansion_interval` settings and stores them (e.g., in Redis via the Core Scheduler) for the Dispatcher to pick up.
+- **Dispatcher (`scheduler.Dispatcher`):** Retrieves due occurrences from the schedule (e.g., Redis, managed by the Core Scheduler) and attempts to deliver the webhook to the event's configured URL, signing with HMAC if configured. Retries with backoff on failure, up to a maximum attempt count.
+- **Archival Scheduler (`scheduler.ArchivalScheduler`):** Periodically moves old events, occurrences, and webhook attempts from the primary tables to archive tables, based on the configured retention policies and archival interval, to keep database size and query performance in check.
 
 ### Models
 - Defines data structures for events, occurrences, tokens, and errors
@@ -163,7 +166,7 @@ Qhronos uses JWT tokens and a master token for API authentication. Access contro
 - Exceeding the limit results in a 429 response.
 
 ### Archiving & Retention
-- The Cleanup job periodically archives old events, occurrences, and webhook attempts based on retention policies.
+- The **Archival Scheduler** job periodically archives old events, occurrences, and webhook attempts based on retention policies.
 - Archived data is moved to separate tables for long-term storage.
 
 ### Health & Status Endpoints
@@ -184,7 +187,6 @@ While the schema includes tables for analytics and performance metrics, the curr
 | +----< archived_occurrences >
 +----< archived_events >
 
-[system_config] (global config)
 [analytics_daily], [analytics_hourly], [performance_metrics] (aggregates)
 ```
 
@@ -194,7 +196,6 @@ While the schema includes tables for analytics and performance metrics, the curr
 - **occurrences:** Each row represents a scheduled execution of an event, with status and delivery tracking.
 - **webhook_attempts:** Logs each attempt to deliver a webhook for an occurrence, including status and response.
 - **archived_events / archived_occurrences / archived_webhook_attempts:** Long-term storage for data past retention windows.
-- **system_config:** Stores global configuration and retention policies.
 - **analytics_daily / analytics_hourly / performance_metrics:** Tables for aggregated statistics and system performance. **Note:** These tables are present in the schema for future use. As of the current implementation, they are not yet populated or queried by the application logic. Enhanced analytics and reporting are planned for a future release.
 
 ## 7. API Reference
@@ -213,6 +214,7 @@ While the schema includes tables for analytics and performance metrics, the curr
 | POST | /tokens | Create a new JWT token (admin) |
 | GET | /status | Service status and health info |
 | GET | /health | Simple health check |
+| WS | /ws | Real-time event delivery via WebSocket |
 
 ### Request/Response Patterns
 - All endpoints use JSON for request and response bodies.
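The Dispatcher behavior described in design.md (HMAC-sign the payload if a secret is configured, deliver it, retry with backoff up to a maximum attempt count) can be sketched roughly as follows. The signature header name, HMAC-SHA256 scheme, and backoff constants here are assumptions for illustration, not Qhronos's actual wire format:

```go
package main

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/http"
	"time"
)

// sign computes a hex-encoded HMAC-SHA256 of the payload with the
// event's configured secret.
func sign(secret, payload []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hex.EncodeToString(mac.Sum(nil))
}

// dispatch POSTs the payload to the event's URL, retrying with
// exponential backoff until maxAttempts is reached.
func dispatch(url string, secret, payload []byte, maxAttempts int) error {
	backoff := time.Second
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
		if err != nil {
			return err
		}
		req.Header.Set("Content-Type", "application/json")
		if len(secret) > 0 {
			// Header name is illustrative, not the service's actual header.
			req.Header.Set("X-Qhronos-Signature", sign(secret, payload))
		}
		resp, err := http.DefaultClient.Do(req)
		if err == nil && resp.StatusCode < 300 {
			resp.Body.Close()
			return nil // delivered
		}
		if resp != nil {
			resp.Body.Close()
		}
		lastErr = fmt.Errorf("delivery attempt %d failed", attempt)
		time.Sleep(backoff)
		backoff *= 2 // exponential backoff between attempts
	}
	return lastErr
}

func main() {
	// Signing is deterministic for a given secret and payload.
	fmt.Println(sign([]byte("secret"), []byte(`{"event":"demo"}`)))
}
```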
6 changes: 3 additions & 3 deletions internal/scheduler/archiver.go
@@ -2,7 +2,6 @@ package scheduler

 import (
 	"context"
-	"encoding/json"
 	"time"
 
 	"github.com/feedloop/qhronos/internal/config"
@@ -56,7 +55,7 @@ func updateLastArchivalTimeRedis(ctx context.Context, rdb *redis.Client, checkPe
 }
 
 func syncRetentionConfigToDB(db *sqlx.DB, durations config.RetentionDurations) error {
-	value := map[string]interface{}{
+	/*value := map[string]interface{}{
 		"events": map[string]interface{}{
 			"max_past_occurrences": durations.Events.String(),
 		},
@@ -73,7 +72,8 @@ func syncRetentionConfigToDB(db *sqlx.DB, durations config.RetentionDurations) e
 	VALUES ($1, $2, $3, $4)
 	ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now(), updated_by = EXCLUDED.updated_by
 	`, "retention_policies", valueJSON, "Data retention policies", "system")
-	return err
+	return err*/
+	return nil // Prevent writing to system_config
 }
 
 func StartArchivalScheduler(db *sqlx.DB, rdb *redis.Client, checkPeriod time.Duration, durations config.RetentionDurations, stopCh <-chan struct{}, logger *zap.Logger) {
6 changes: 3 additions & 3 deletions internal/testutils/db.go
@@ -38,10 +38,10 @@ func TestDB(t testing.TB) *sqlx.DB {
 
 	// Check if migrations have already been run
 	var count int
-	err = db.GetContext(context.Background(), &count, "SELECT COUNT(*) FROM pg_tables WHERE tablename = 'system_config'")
+	err = db.GetContext(context.Background(), &count, "SELECT COUNT(*) FROM pg_tables WHERE tablename = 'events'")
 	if err != nil || count == 0 {
 		// Run migrations
-		migration, err := os.ReadFile("migrations/001_initial_schema.sql")
+		migration, err := os.ReadFile("../../migrations/001_initial_schema.sql")
 		if err != nil {
 			t.Fatalf("Could not read migration file: %s", err)
 		}
@@ -53,4 +53,4 @@
 	}
 
 	return db
-}
+}
54 changes: 27 additions & 27 deletions migrations/001_initial_schema.sql
@@ -6,35 +6,35 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
 -- Configuration Tables
 
 -- System configuration including retention policies
-CREATE TABLE system_config (
-    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-    key TEXT UNIQUE NOT NULL,
-    value JSONB NOT NULL,
-    description TEXT,
-    updated_at TIMESTAMPTZ DEFAULT now(),
-    updated_by TEXT NOT NULL
-);
+-- CREATE TABLE system_config (
+--     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+--     key TEXT UNIQUE NOT NULL,
+--     value JSONB NOT NULL,
+--     description TEXT,
+--     updated_at TIMESTAMPTZ DEFAULT now(),
+--     updated_by TEXT NOT NULL
+-- );
 
 -- Insert default retention policies
-INSERT INTO system_config (key, value, description, updated_by) VALUES
-('retention_policies', '{
-    "logs": {
-        "webhook_attempts": "30d",
-        "api_requests": "90d",
-        "error_logs": "180d",
-        "performance_metrics": "365d"
-    },
-    "events": {
-        "max_future_scheduling": "365d",
-        "max_past_occurrences": "30d",
-        "archived_events": "5y"
-    },
-    "analytics": {
-        "hourly_metrics": "30d",
-        "daily_metrics": "365d",
-        "monthly_metrics": "5y"
-    }
-}', 'Data retention policies in days (d) or years (y)', 'system');
+-- INSERT INTO system_config (key, value, description, updated_by) VALUES
+-- ('retention_policies', '{
+--     "logs": {
+--         "webhook_attempts": "30d",
+--         "api_requests": "90d",
+--         "error_logs": "180d",
+--         "performance_metrics": "365d"
+--     },
+--     "events": {
+--         "max_future_scheduling": "365d",
+--         "max_past_occurrences": "30d",
+--         "archived_events": "5y"
+--     },
+--     "analytics": {
+--         "hourly_metrics": "30d",
+--         "daily_metrics": "365d",
+--         "monthly_metrics": "5y"
+--     }
+-- }', 'Data retention policies in days (d) or years (y)', 'system');
 
 -- Core Tables with Retention Policies
 