From 078728d711b7ed67d54975a64e09944093c35de7 Mon Sep 17 00:00:00 2001 From: Ahmad Rizqi Meydiarso Date: Thu, 8 May 2025 09:36:59 +0700 Subject: [PATCH] Refactor: Remove system_config table and usage --- README.md | 2 +- design.md | 22 +++++++------ internal/scheduler/archiver.go | 6 ++-- internal/testutils/db.go | 6 ++-- migrations/001_initial_schema.sql | 54 +++++++++++++++---------------- 5 files changed, 46 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 585b0e8..d9fb493 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Qhronos (v0.1.0) is a developer-first scheduling and notification platform. It l **Event → Schedule → Occurrence Flow:** - **Event:** User-defined intent and configuration, stored in the database. - **Schedule:** Actionable plan (stored in Redis) for when the event should be executed. Created by the expander. -- **Occurrence:** Created only after a scheduled event is executed (dispatched by the dispatcher). Represents the actual attempt to process (dispatch) the event at a specific time, with status and result information. +- **Occurrence:** A record representing a specific scheduled instance of an Event. For recurring events, these are generated by the Expander. Each Occurrence tracks its lifecycle (e.g., pending, scheduled, dispatched, failed) and the outcome of its execution attempt by the Dispatcher. For system architecture and in-depth design, see [design.md](./design.md). diff --git a/design.md b/design.md index a55700c..7d1d81f 100644 --- a/design.md +++ b/design.md @@ -44,9 +44,9 @@ Qhronos is a developer-first, event scheduling and notification platform. It ena │ ▼ [Scheduler Layer] ──► [Redis] - │ + │ (Core Scheduler, Expander, Dispatcher, Archival Scheduler) ▼ - [Background Jobs: Expander, Dispatcher, Cleanup] + [Background Jobs: Expander, Dispatcher, Archival/Cleanup] │ ▼ [Webhook Delivery] @@ -101,6 +101,8 @@ Qhronos uses JWT tokens and a master token for API authentication. Access contro ### Middleware - Cross-cutting concerns: logging, error handling, authentication, rate limiting - Ensures consistent request/response processing and security +- **Ginzap Logger:** Uses `zap` for structured logging of HTTP requests and responses, including latency, status, etc., via `ginzap.Ginzap` and `ginzap.RecoveryWithZap`. +- **RequestIDMiddleware:** Injects a unique request ID into the context and logs for improved traceability. ### Services - Encapsulate business logic not directly tied to HTTP or data access @@ -112,11 +114,12 @@ Qhronos uses JWT tokens and a master token for API authentication. Access contro - Maps Go models to database tables ### Scheduler Layer -- Background jobs for event expansion, webhook dispatch, and cleanup -- Expander: generates occurrences for recurring events -- Dispatcher: delivers webhooks for due occurrences, handles retries -- Cleanup: archives old data and enforces retention policies -- Uses Redis for coordination and scheduling +- Background jobs for event expansion, webhook dispatch, and data archival. +- Uses Redis for coordination and scheduling of tasks. +- **Core Scheduler (`scheduler.Scheduler`):** Manages the underlying scheduling mechanism in Redis, providing a foundation for other scheduler components like the Expander and Dispatcher to enqueue and manage scheduled tasks. +- **Expander (`scheduler.Expander`):** Periodically scans for recurring events. For each event, it computes the next set of occurrences based on the `look_ahead_duration` and `expansion_interval` settings and stores them (e.g., in Redis via the Core Scheduler, for the Dispatcher to pick up). +- **Dispatcher (`scheduler.Dispatcher`):** Retrieves due occurrences from the schedule (e.g., Redis, managed by Core Scheduler). It attempts to deliver the webhook to the event's configured URL, signing with HMAC if configured. Handles retries with backoff on failure, up to a maximum attempt count. +- **Archival Scheduler (`scheduler.ArchivalScheduler`):** Periodically archives old events, occurrences, and webhook attempts from the primary tables to archive tables based on configured retention policies and archival interval. This helps manage database size and performance. ### Models - Defines data structures for events, occurrences, tokens, and errors @@ -163,7 +166,7 @@ Qhronos uses JWT tokens and a master token for API authentication. Access contro - Exceeding the limit results in a 429 response. ### Archiving & Retention -- The Cleanup job periodically archives old events, occurrences, and webhook attempts based on retention policies. +- The **Archival Scheduler** job periodically archives old events, occurrences, and webhook attempts based on retention policies. - Archived data is moved to separate tables for long-term storage. ### Health & Status Endpoints @@ -184,7 +187,6 @@ While the schema includes tables for analytics and performance metrics, the curr | +----< archived_occurrences > +----< archived_events > -[system_config] (global config) [analytics_daily], [analytics_hourly], [performance_metrics] (aggregates) ``` @@ -194,7 +196,6 @@ While the schema includes tables for analytics and performance metrics, the curr - **occurrences:** Each row represents a scheduled execution of an event, with status and delivery tracking. - **webhook_attempts:** Logs each attempt to deliver a webhook for an occurrence, including status and response. - **archived_events / archived_occurrences / archived_webhook_attempts:** Long-term storage for data past retention windows. -- **system_config:** Stores global configuration and retention policies. - **analytics_daily / analytics_hourly / performance_metrics:** Tables for aggregated statistics and system performance. **Note:** These tables are present in the schema for future use. As of the current implementation, they are not yet populated or queried by the application logic. Enhanced analytics and reporting are planned for a future release. ## 7. API Reference @@ -213,6 +214,7 @@ While the schema includes tables for analytics and performance metrics, the curr | POST | /tokens | Create a new JWT token (admin) | | GET | /status | Service status and health info | | GET | /health | Simple health check | +| WS | /ws | Real-time event delivery via WebSocket | ### Request/Response Patterns - All endpoints use JSON for request and response bodies. diff --git a/internal/scheduler/archiver.go b/internal/scheduler/archiver.go index 4710635..421a3d0 100644 --- a/internal/scheduler/archiver.go +++ b/internal/scheduler/archiver.go @@ -2,7 +2,6 @@ package scheduler import ( "context" - "encoding/json" "time" "github.com/feedloop/qhronos/internal/config" @@ -56,7 +55,7 @@ func updateLastArchivalTimeRedis(ctx context.Context, rdb *redis.Client, checkPe } func syncRetentionConfigToDB(db *sqlx.DB, durations config.RetentionDurations) error { - value := map[string]interface{}{ + /*value := map[string]interface{}{ "events": map[string]interface{}{ "max_past_occurrences": durations.Events.String(), }, @@ -73,7 +72,8 @@ func syncRetentionConfigToDB(db *sqlx.DB, durations config.RetentionDurations) e VALUES ($1, $2, $3, $4) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now(), updated_by = EXCLUDED.updated_by `, "retention_policies", valueJSON, "Data retention policies", "system") - return err + return err*/ + return nil // Prevent writing to system_config } func StartArchivalScheduler(db *sqlx.DB, rdb *redis.Client, checkPeriod time.Duration, durations config.RetentionDurations, stopCh <-chan struct{}, logger *zap.Logger) { diff --git a/internal/testutils/db.go b/internal/testutils/db.go index a72d067..47fb9fe 100644 --- a/internal/testutils/db.go +++ b/internal/testutils/db.go @@ -38,10 +38,10 @@ func TestDB(t testing.TB) *sqlx.DB { // Check if migrations have already been run var count int - err = db.GetContext(context.Background(), &count, "SELECT COUNT(*) FROM pg_tables WHERE tablename = 'system_config'") + err = db.GetContext(context.Background(), &count, "SELECT COUNT(*) FROM pg_tables WHERE tablename = 'events'") if err != nil || count == 0 { // Run migrations - migration, err := os.ReadFile("migrations/001_initial_schema.sql") + migration, err := os.ReadFile("../../migrations/001_initial_schema.sql") if err != nil { t.Fatalf("Could not read migration file: %s", err) } @@ -53,4 +53,4 @@ func TestDB(t testing.TB) *sqlx.DB { } return db -} \ No newline at end of file +} diff --git a/migrations/001_initial_schema.sql b/migrations/001_initial_schema.sql index 3cc0a18..514d2f1 100644 --- a/migrations/001_initial_schema.sql +++ b/migrations/001_initial_schema.sql @@ -6,35 +6,35 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- Configuration Tables -- System configuration including retention policies -CREATE TABLE system_config ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - key TEXT UNIQUE NOT NULL, - value JSONB NOT NULL, - description TEXT, - updated_at TIMESTAMPTZ DEFAULT now(), - updated_by TEXT NOT NULL -); +-- CREATE TABLE system_config ( +-- id UUID PRIMARY KEY DEFAULT gen_random_uuid(), +-- key TEXT UNIQUE NOT NULL, +-- value JSONB NOT NULL, +-- description TEXT, +-- updated_at TIMESTAMPTZ DEFAULT now(), +-- updated_by TEXT NOT NULL +-- ); -- Insert default retention policies -INSERT INTO system_config (key, value, description, updated_by) VALUES -('retention_policies', '{ - "logs": { - "webhook_attempts": "30d", - "api_requests": "90d", - "error_logs": "180d", - "performance_metrics": "365d" - }, - "events": { - "max_future_scheduling": "365d", - "max_past_occurrences": "30d", - "archived_events": "5y" - }, - "analytics": { - "hourly_metrics": "30d", - "daily_metrics": "365d", - "monthly_metrics": "5y" - } -}', 'Data retention policies in days (d) or years (y)', 'system'); +-- INSERT INTO system_config (key, value, description, updated_by) VALUES +-- ('retention_policies', '{ +-- "logs": { +-- "webhook_attempts": "30d", +-- "api_requests": "90d", +-- "error_logs": "180d", +-- "performance_metrics": "365d" +-- }, +-- "events": { +-- "max_future_scheduling": "365d", +-- "max_past_occurrences": "30d", +-- "archived_events": "5y" +-- }, +-- "analytics": { +-- "hourly_metrics": "30d", +-- "daily_metrics": "365d", +-- "monthly_metrics": "5y" +-- } +-- }', 'Data retention policies in days (d) or years (y)', 'system'); -- Core Tables with Retention Policies