From 784a036edeaeb89f00719237c145db7945f83df1 Mon Sep 17 00:00:00 2001 From: saptarshi Date: Thu, 12 Mar 2026 23:47:16 +0530 Subject: [PATCH 1/2] added rollout and configurable heartbeat for sdk --- README.md | 182 +++-------- cmd/configctl/README.md | 234 ++++++++++++++ cmd/configctl/main.go | 2 + include/configclient/config_client.h | 13 +- include/configclient/config_client_impl.h | 10 +- .../distribution_service/database_manager.h | 16 + .../distribution_service.h | 23 ++ internal/commands/promote.go | 85 ++++++ internal/commands/rollout.go | 106 +++++++ src/api-service/README.md | 2 +- src/api-service/api_service.cpp | 8 +- src/client-sdk/README.md | 105 +++++++ src/client-sdk/config_client.cpp | 6 +- src/client-sdk/config_client_impl.cpp | 62 +++- src/distribution-service/README.md | 39 ++- src/distribution-service/config.cpp | 9 +- src/distribution-service/database_manager.cpp | 184 +++++++++++ .../distribution_service.cpp | 289 +++++++++++++++++- 18 files changed, 1210 insertions(+), 165 deletions(-) create mode 100644 cmd/configctl/README.md create mode 100644 internal/commands/promote.go create mode 100644 internal/commands/rollout.go create mode 100644 src/client-sdk/README.md diff --git a/README.md b/README.md index 49e3265..337aa7d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Konfig - Dynamic Configuration Service -A distributed configuration management system built with C++17, gRPC, Kafka, and modern observability tools. Upload, validate, distribute, and roll out configuration changes across services in real-time. +A distributed configuration management system built with C++17, gRPC, Kafka, and modern observability tools. Upload, validate, and roll out configuration changes to thousands of services in real-time. ## Quick Start @@ -15,33 +15,37 @@ make services make cli # 4. Upload a config -./bin/konfig upload --service my-service --file config.json --format json +./bin/configctl upload my-service --file config.yaml --format yaml -# 5. Retrieve it -./bin/konfig get --id my-service-v1 +# 5. Roll it out to all instances +./bin/configctl rollout my-service-v1 --strategy ALL_AT_ONCE ``` ## Architecture ``` ┌──────────────────┐ - │ CLI (konfig) │ + │ CLI (configctl) │ │ Go / gRPC │ └────────┬─────────┘ - │ + │ :8081 + ┌────────▼─────────┐ + │ API Service │ + │ Upload/Rollout │ + └────────┬─────────┘ + │ Kafka ┌──────────────┼──────────────┐ │ │ │ ┌──────▼──────┐ ┌────▼─────┐ ┌─────▼──────┐ - │ API Service │ │ Dist │ │ Validation │ - │ :8081 │ │ Service │ │ Service │ - │ (gRPC) │ │ :8082 │ │ :8083 │ - └──────┬──────┘ └────┬─────┘ └─────┬──────┘ - │ │ │ - ┌───────┬───┴───┬────────┤ │ - │ │ │ │ │ - ┌────▼──┐ ┌─▼───┐ ┌─▼────┐ ┌─▼───┐ ┌────▼──┐ - │Postgres│ │Redis│ │Kafka │ │Redis│ │Postgres│ - └───────┘ └─────┘ └──────┘ └─────┘ └───────┘ + │ PostgreSQL │ │ Redis │ │Distribution│ + │ (config, │ │ (cache) │ │ Service │ + │ rollouts) │ └──────────┘ │ :8082 │ + └─────────────┘ └─────┬──────┘ + │ gRPC stream + ┌──────▼──────┐ + │ Client SDK │ + │ (C++ lib) │ + └─────────────┘ ``` ### Services @@ -51,16 +55,14 @@ make cli | **API Service** | 8081 | Config upload, retrieval, deletion, rollout management | | **Distribution Service** | 8082 | Real-time config push to clients via gRPC streaming | | **Validation Service** | 8083 | Config syntax/schema/rule validation (JSON & YAML) | -| **CLI (`konfig`)** | - | Command-line tool for interacting with the API | -| **Client SDK** | - | C++ library for services to receive config updates | ### Infrastructure | Component | Port | Purpose | |-----------|------|---------| -| PostgreSQL | 5432 | Config metadata, data, audit logs, validation rules | -| Redis | 6379 | Caching, validation result cache | -| Kafka | 9092/9093 | Event streaming, config update notifications | +| PostgreSQL | 5432 | Config metadata, data, audit logs, rollout state | +| Redis | 6379 | Caching layer | +| Kafka | 9092/9093 | Event streaming (rollout triggers, config notifications) | | Prometheus | 9090 | Metrics collection | | Grafana | 3000 | Metrics dashboards (admin/admin) | | Kafka UI | 8080 | Topic and message inspection | @@ -72,43 +74,21 @@ make cli ``` Konfig/ ├── cmd/configctl/ # CLI tool (Go) -│ └── main.go ├── config/ # Service configuration files -│ ├── api-service.yml # Docker config -│ ├── api-service-local.yml # Local dev config -│ ├── distribution-service.yml -│ ├── distribution-service-local.yml -│ └── validation-service.yml -├── db/migrations/ # PostgreSQL schema migrations (000-008) -├── docker/ -│ ├── postgres/init.sql # DB initialization (runs migrations) -│ ├── grafana/ # Grafana provisioning -│ └── services/ # Per-service Dockerfiles -│ ├── api-service.Dockerfile -│ ├── distribution-service.Dockerfile -│ └── validation-service.Dockerfile +├── db/migrations/ # PostgreSQL schema migrations +├── docker/ # Dockerfiles and init scripts +├── examples/ # Example configs and client ├── include/ # C++ headers -│ ├── api_service/ -│ ├── distribution_service/ -│ ├── validation_service/ -│ ├── configclient/ # Client SDK headers -│ └── statsdclient/ # StatsD client +├── internal/commands/ # CLI command implementations ├── proto/ # Protocol Buffer definitions -│ ├── api.proto # ConfigAPIService RPCs -│ ├── distribution.proto # DistributionService RPCs -│ ├── validation.proto # ValidationService RPCs -│ └── config.proto # Shared message types -├── prometheus/ # Prometheus & StatsD config -├── scripts/ # Build helper scripts ├── src/ -│ ├── api-service/ # API Service implementation -│ ├── distribution-service/ # Distribution Service implementation -│ ├── validation-service/ # Validation Service implementation -│ ├── client-sdk/ # Client SDK library -│ └── common/ # Shared utilities (StatsD client) -├── docker-compose.yml # All containers -├── Dockerfile.dev # Development build container -└── Makefile # Build automation +│ ├── api-service/ +│ ├── distribution-service/ +│ ├── validation-service/ +│ ├── client-sdk/ +│ └── common/ +├── docker-compose.yml +└── Makefile ``` ## Makefile Commands @@ -127,7 +107,6 @@ make services-local # Build binaries locally (no Docker) make dev # Complete setup (dirs + infrastructure + verify) make infra-up # Start infrastructure containers make infra-down # Stop infrastructure containers -make infra-restart # Restart infrastructure make verify # Health check all services ``` @@ -135,12 +114,9 @@ make verify # Health check all services ```bash make proto # Generate protobuf/gRPC code -make api-service # Build API service locally -make distribution-service # Build distribution service locally -make validation-service # Build validation service locally make sdk # Build client SDK (shared + static) make cache-test # Build disk cache test binary (bin/cache_test) -make cli # Build CLI tool (bin/konfig) +make cli # Build CLI tool (bin/configctl) make all # Build everything locally make clean # Remove build artifacts make rebuild # Clean + build all @@ -154,55 +130,18 @@ make redis-shell # Redis CLI make kafka-topics # List Kafka topics make kafka-ui # Open Kafka UI in browser make grafana # Open Grafana in browser -make pgadmin # Open pgAdmin in browser ``` -See [COMMANDS.md](COMMANDS.md) for the complete command reference. - -## CLI Usage +### Dev Container ```bash -# Upload configuration -./bin/konfig upload --service payment-service --file config.json --format json - -# Get config by ID -./bin/konfig get --id payment-service-v1 - -# List configs for a service -./bin/konfig list --service payment-service - -# Validate without uploading -./bin/konfig validate --service payment-service --file config.json --format json - -# Delete a config -./bin/konfig delete --id payment-service-v1 - -# Check rollout status -./bin/konfig status --id payment-service-v1 - -# Rollback to previous version -./bin/konfig rollback --service payment-service --version 1 +make dev-up # Start dev container +make dev-shell # Enter interactive shell +make dev-build # Build inside container +make dev-sdk # Build client SDK inside container ``` -## Database Schema - -Managed via migrations in `db/migrations/` (000-008): - -| Table | Purpose | -|-------|---------| -| `config_metadata` | Service name, version, format, timestamps | -| `config_data` | Actual config content and hashes (FK to metadata) | -| `rollout_state` | Gradual rollout tracking | -| `service_instances` | Connected client instances | -| `audit_log` | All config actions with JSONB details | -| `health_checks` | Service health status | -| `validation_schemas` | Registered validation schemas | -| `validation_rules` | Custom validation rules per service | -| `validation_history` | Validation result audit trail | - -## Development - -### Local Development +## Local Development ```bash # Start infrastructure @@ -217,33 +156,9 @@ make services-local ./bin/distribution-service config/distribution-service-local.yml ``` -### Docker Development - -```bash -# Start everything -make infra-up && make services - -# View logs -docker compose logs -f api-service -docker compose logs -f validation-service - -# Rebuild after code changes -make services -``` - -### Dev Container (for building inside Linux) - -```bash -make dev-up # Start dev container -make dev-shell # Enter interactive shell -make dev-build # Build inside container -make dev-sdk # Build client SDK inside container -make dev-cache-test # Build disk cache test binary inside container -``` - ## Client SDK -The C++ client SDK (`libconfigclient`) lets services subscribe to real-time config updates with automatic reconnection and disk caching. +The C++ client SDK (`libconfigclient`) lets services subscribe to real-time config updates with automatic reconnection, configurable heartbeating, and disk caching. See [Client SDK](src/client-sdk/README.md) for the full reference including heartbeat configuration and advanced options. ```cpp #include "configclient/config_client.h" @@ -281,7 +196,7 @@ make dev-sdk && make dev-cache-test ./bin/cache_test distribution-service:8082 payment-service # Step 2 — upload a config (separate terminal) -./bin/konfig upload --service payment-service --file examples/configs/valid-config.json --format json +./bin/configctl upload payment-service --file examples/configs/valid-config.json --format json # cache_test prints: >>> CONFIG UPDATE <<< and [DiskCache] Saved config v1 ... # Step 3 — restart: cache loads before gRPC connects @@ -318,18 +233,19 @@ Headers are in `include/configclient/`. Libraries are in `lib/` after `make sdk` | PostgreSQL | `localhost:5432` | `postgres:5432` | | Redis | `localhost:6379` | `redis:6379` | | Kafka | `localhost:9093` | `kafka:9092` | -| StatsD | `localhost:9125` | `statsd-exporter:9125` | | API Service | `localhost:8081` | `api-service:8081` | | Validation Service | `localhost:8083` | `validation-service:8083` | | Distribution Service | `localhost:8082` | `distribution-service:8082` | **Database credentials:** `configuser` / `configpass` / `configservice` -## Service Documentation +## Documentation -- [API Service](src/api-service/README.md) -- [Distribution Service](src/distribution-service/README.md) -- [Validation Service](src/validation-service/README.md) +- [CLI Reference](cmd/configctl/README.md) — all commands, flags, rollout strategies +- [Client SDK](src/client-sdk/README.md) — C++ SDK usage, heartbeat config, disk cache +- [API Service](src/api-service/README.md) — gRPC API, upload flow, components +- [Distribution Service](src/distribution-service/README.md) — streaming, rollout execution, heartbeat monitor +- [Validation Service](src/validation-service/README.md) — schema validation, rules ## License diff --git a/cmd/configctl/README.md b/cmd/configctl/README.md new file mode 100644 index 0000000..80d536b --- /dev/null +++ b/cmd/configctl/README.md @@ -0,0 +1,234 @@ +# configctl — CLI Reference + +`configctl` is the command-line tool for managing configurations in Konfig. + +```bash +make cli # Build to ./bin/configctl +``` + +## Global Options + +``` +--server, -s API server address (default: localhost:8081) +--output, -o Output format: table | json | yaml (default: table) +--verbose, -v Verbose output +``` + +The server address can also be set via the `KONFIG_SERVER` environment variable: + +```bash +export KONFIG_SERVER=api-service:8081 +``` + +--- + +## upload + +Upload a new config version for a service. + +```bash +configctl upload --file --format +``` + +Each upload increments the version counter and creates a new config ID in the form `-v`. The config is validated before storage. + +```bash +./bin/configctl upload payment-service --file config.yaml --format yaml +# → Created: payment-service-v3 +``` + +--- + +## rollout + +Start a rollout to distribute a config to connected instances. + +```bash +configctl rollout --strategy [--percentage ] +``` + +| Flag | Default | Description | +|------|---------|-------------| +| `--strategy` | `ALL_AT_ONCE` | `ALL_AT_ONCE`, `CANARY`, or `PERCENTAGE` | +| `--percentage` | `100` | Target percentage for `PERCENTAGE` strategy | +| `--server` | `localhost:8081` | API server address | + +### Strategies + +| Strategy | Behaviour | +|----------|-----------| +| `ALL_AT_ONCE` | Pushes to all connected instances immediately. Completes when all instances receive the config. | +| `CANARY` | Pushes to ~10% of instances and stays `IN_PROGRESS`. Use `promote` to push to the rest after verification. | +| `PERCENTAGE` | Pushes to the specified percentage of instances (1–100). | + +```bash +# Push to everything at once +./bin/configctl rollout payment-service-v3 --strategy ALL_AT_ONCE + +# Canary — 10% first +./bin/configctl rollout payment-service-v3 --strategy CANARY + +# Half of instances +./bin/configctl rollout payment-service-v3 --strategy PERCENTAGE --percentage 50 +``` + +--- + +## promote + +Promote a `CANARY` rollout that is `IN_PROGRESS` to all remaining instances. + +```bash +configctl promote +``` + +Validates that the rollout is `CANARY` and `IN_PROGRESS` before proceeding. Use this after verifying that the canary instances look healthy. + +```bash +./bin/configctl promote payment-service-v3 +# → Pushing to all instances → COMPLETED +``` + +--- + +## rollback + +Rollback a service to a previous config version. + +```bash +configctl rollback --to-version +``` + +| Flag | Description | +|------|-------------| +| `--to-version` | Target version number (`0` = previous version) | + +```bash +# Rollback to version 2 +./bin/configctl rollback payment-service --to-version 2 + +# Rollback to previous version +./bin/configctl rollback payment-service --to-version 0 +``` + +The rollback creates a new config version copying the content from the target version, then triggers an `ALL_AT_ONCE` rollout. + +--- + +## status + +Show the rollout status of a config. + +```bash +configctl status +``` + +```bash +./bin/configctl status payment-service-v3 +``` + +Output: +``` +Rollout Status +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Config ID: payment-service-v3 +Strategy: CANARY +Progress: 10% / 10% +Status: IN_PROGRESS +Started: 2024-01-15T10:00:00Z + +Instances: +INSTANCE ID VERSION STATUS +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +instance-abc 3 UPDATED +instance-xyz 2 PENDING +``` + +--- + +## get + +Retrieve a config by ID. + +```bash +configctl get +``` + +```bash +./bin/configctl get payment-service-v3 +``` + +--- + +## list + +List all config versions for a service. + +```bash +configctl list +``` + +```bash +./bin/configctl list payment-service +``` + +--- + +## validate + +Validate a config file without uploading it. + +```bash +configctl validate --file --format +``` + +```bash +./bin/configctl validate payment-service --file config.yaml --format yaml +``` + +--- + +## delete + +Delete a config by ID. + +```bash +configctl delete +``` + +```bash +./bin/configctl delete payment-service-v1 +``` + +--- + +## version + +Print the CLI version. + +```bash +./bin/configctl version +``` + +--- + +## Typical Workflow + +```bash +# 1. Upload new config +./bin/configctl upload payment-service --file config.yaml --format yaml +# → payment-service-v5 + +# 2. Canary rollout to 10% +./bin/configctl rollout payment-service-v5 --strategy CANARY + +# 3. Check status +./bin/configctl status payment-service-v5 +# → IN_PROGRESS, 10% + +# 4a. Looks good — promote to all +./bin/configctl promote payment-service-v5 + +# 4b. Something wrong — rollback +./bin/configctl rollback payment-service --to-version 0 +``` diff --git a/cmd/configctl/main.go b/cmd/configctl/main.go index e78d061..0a0c277 100644 --- a/cmd/configctl/main.go +++ b/cmd/configctl/main.go @@ -36,6 +36,8 @@ Push configuration changes to thousands of services in seconds.`, rootCmd.AddCommand(commands.NewListCommand()) rootCmd.AddCommand(commands.NewDeleteCommand()) rootCmd.AddCommand(commands.NewValidateCommand()) + rootCmd.AddCommand(commands.NewRolloutCommand()) + rootCmd.AddCommand(commands.NewPromoteCommand()) rootCmd.AddCommand(commands.NewRollbackCommand()) rootCmd.AddCommand(commands.NewStatusCommand()) rootCmd.AddCommand(commands.NewVersionCommand()) diff --git a/include/configclient/config_client.h b/include/configclient/config_client.h index c4c9a00..4515cdb 100644 --- a/include/configclient/config_client.h +++ b/include/configclient/config_client.h @@ -42,13 +42,16 @@ class ConfigClient { /** * @brief Construct a new Config Client * - * @param server_address Distribution service address (e.g., "localhost:8082") - * @param service_name Name of this service - * @param instance_id Unique instance identifier (auto-generated if empty) - * @param cache_dir Directory for disk cache (default: ~/.konfig/cache/) + * @param server_address Distribution service address (e.g., "localhost:8082") + * @param service_name Name of this service + * @param instance_id Unique instance identifier (auto-generated if empty) + * @param cache_dir Directory for disk cache (default: ~/.konfig/cache/) + * @param heartbeat_interval_seconds How often to send heartbeats (default: 30s) + * @param max_heartbeat_failures Consecutive failures before reconnecting (default: 3) */ ConfigClient(const std::string& server_address, const std::string& service_name, - const std::string& instance_id = "", const std::string& cache_dir = ""); + const std::string& instance_id = "", const std::string& cache_dir = "", + int heartbeat_interval_seconds = 30, int max_heartbeat_failures = 3); ~ConfigClient(); diff --git a/include/configclient/config_client_impl.h b/include/configclient/config_client_impl.h index 15f79cb..ce93012 100644 --- a/include/configclient/config_client_impl.h +++ b/include/configclient/config_client_impl.h @@ -19,7 +19,8 @@ namespace configservice { class ConfigClientImpl { public: ConfigClientImpl(const std::string& server_address, const std::string& service_name, - const std::string& instance_id, const std::string& cache_dir = ""); + const std::string& instance_id, const std::string& cache_dir = "", + int heartbeat_interval_seconds = 30, int max_heartbeat_failures = 3); ~ConfigClientImpl(); @@ -39,6 +40,7 @@ class ConfigClientImpl { private: void StreamLoop(); void ConnectAndSubscribe(); + void HeartbeatLoop(); void HandleConfigUpdate(const ConfigUpdate& update); void SetConnectionStatus(bool connected); @@ -73,6 +75,12 @@ class ConfigClientImpl { std::mutex shutdown_mutex_; std::condition_variable shutdown_cv_; + std::unique_ptr heartbeat_thread_; + std::mutex heartbeat_mutex_; + std::condition_variable heartbeat_cv_; + int heartbeat_interval_seconds_; + int max_heartbeat_failures_; + static constexpr int kReconnectDelaySeconds = 5; }; diff --git a/include/distribution_service/database_manager.h b/include/distribution_service/database_manager.h index 1b3e414..c126e6a 100644 --- a/include/distribution_service/database_manager.h +++ b/include/distribution_service/database_manager.h @@ -9,6 +9,13 @@ namespace configservice { +struct RolloutInfo { + int strategy = 0; // 0=ALL_AT_ONCE, 1=CANARY, 2=PERCENTAGE + int32_t target_percentage = 100; + std::string status; // "PENDING", "IN_PROGRESS", "COMPLETED", "FAILED" + bool found = false; +}; + class DatabaseManager { public: explicit DatabaseManager(const PostgresConfig& config); @@ -19,7 +26,9 @@ class DatabaseManager { // Config operations ConfigData GetLatestConfig(const std::string& service_name); + ConfigData GetLatestRolledOutConfig(const std::string& service_name); ConfigData GetConfigByVersion(const std::string& service_name, int64_t version); + ConfigData GetConfigById(const std::string& config_id); std::vector ListConfigs(const std::string& service_name, int limit); // Client status operations @@ -29,6 +38,13 @@ class DatabaseManager { bool RecordConfigDelivery(const std::string& service_name, const std::string& instance_id, int64_t version); + // Rollout operations + RolloutInfo GetRolloutInfo(const std::string& config_id); + bool UpdateRolloutProgress(const std::string& config_id, int32_t current_pct, + const std::string& status); + // Returns list of (config_id, service_name) for all IN_PROGRESS rollouts + std::vector> GetPendingRollouts(); + private: PostgresConfig config_; std::unique_ptr conn_; diff --git a/include/distribution_service/distribution_service.h b/include/distribution_service/distribution_service.h index 678ff88..40419d7 100644 --- a/include/distribution_service/distribution_service.h +++ b/include/distribution_service/distribution_service.h @@ -4,11 +4,14 @@ #include +#include #include +#include #include #include #include #include +#include #include "cache_manager.h" #include "database_manager.h" @@ -22,9 +25,11 @@ struct ClientInfo { std::string service_name; std::string instance_id; int64_t current_version; + grpc::ServerContext* context; // needed to cancel stream on timeout grpc::ServerReaderWriter* stream; std::chrono::steady_clock::time_point last_heartbeat; std::atomic active; + std::mutex write_mutex; // serializes all stream->Write() calls }; // Note: Class name is DistributionServiceImpl to avoid conflict with proto-generated @@ -61,20 +66,38 @@ class DistributionServiceImpl final : public DistributionService::Service { std::atomic running_; std::unique_ptr heartbeat_thread_; + // Rollout consumer + std::unique_ptr rollout_consumer_; + std::unique_ptr rollout_thread_; + // Helper methods ConfigData FetchConfig(const std::string& service_name, int64_t version); bool SendConfigToClient(std::shared_ptr client, const ConfigData& config); void RegisterClient(const std::string& key, std::shared_ptr client); void UnregisterClient(const std::string& key); size_t GetActiveClientCount(); + std::vector> GetClientsForService(const std::string& service_name); // Heartbeat monitoring void StartHeartbeatMonitor(); void StopHeartbeatMonitor(); void HeartbeatMonitorLoop(); + // Rollout consumer + void StartRolloutConsumer(); + void StopRolloutConsumer(); + void RolloutConsumerLoop(); + + // Rollout execution + void ExecuteRollout(const std::string& service_name, const std::string& config_id); + void PollPendingRollouts(); + // Metrics void UpdateMetrics(); + + // Utilities + static std::string ExtractJsonString(const std::string& json, const std::string& key); + static int64_t ExtractJsonInt(const std::string& json, const std::string& key); }; } // namespace configservice \ No newline at end of file diff --git a/internal/commands/promote.go b/internal/commands/promote.go new file mode 100644 index 0000000..1e2bdf2 --- /dev/null +++ b/internal/commands/promote.go @@ -0,0 +1,85 @@ +package commands + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/codec404/Konfig/pkg/apiclient" + pb "github.com/codec404/Konfig/pkg/pb" + "github.com/spf13/cobra" +) + +func NewPromoteCommand() *cobra.Command { + var server string + + cmd := &cobra.Command{ + Use: "promote [config-id]", + Short: "Promote a CANARY rollout to all instances", + Long: `Promote a CANARY rollout that is IN_PROGRESS to all connected instances. + +This pushes the configuration to all remaining instances and marks the rollout +as COMPLETED. Use this after verifying the canary instances look healthy. + +Example: + configctl promote my-service-v5`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + configID := args[0] + + if server == "" { + server = os.Getenv("KONFIG_SERVER") + if server == "" { + server = "localhost:8081" + } + } + + client, err := apiclient.NewClient(server) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Check current status first + statusResp, err := client.GetRolloutStatus(ctx, configID) + if err != nil { + return fmt.Errorf("failed to get rollout status: %w", err) + } + if statusResp.Success && statusResp.RolloutState != nil { + s := statusResp.RolloutState.Status.String() + if s != "IN_PROGRESS" { + return fmt.Errorf("rollout is %s, can only promote IN_PROGRESS rollouts", s) + } + if statusResp.RolloutState.Strategy.String() != "CANARY" { + return fmt.Errorf("rollout strategy is %s, promote is only for CANARY rollouts", + statusResp.RolloutState.Strategy.String()) + } + } + + fmt.Printf("Promoting %s to all instances...\n", configID) + + resp, err := client.StartRollout(ctx, &pb.StartRolloutRequest{ + ConfigId: configID, + Strategy: pb.RolloutStrategy_ALL_AT_ONCE, + TargetPercentage: 100, + }) + if err != nil { + return fmt.Errorf("promote failed: %w", err) + } + if !resp.Success { + return fmt.Errorf("promote failed: %s", resp.Message) + } + + fmt.Println("✓ Promotion started — pushing to all instances") + fmt.Printf(" Check progress with: configctl status %s\n", configID) + return nil + }, + } + + cmd.Flags().StringVar(&server, "server", "", "API server address (default: localhost:8081)") + return cmd +} diff --git a/internal/commands/rollout.go b/internal/commands/rollout.go new file mode 100644 index 0000000..e8179ed --- /dev/null +++ b/internal/commands/rollout.go @@ -0,0 +1,106 @@ +package commands + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/codec404/Konfig/pkg/apiclient" + pb "github.com/codec404/Konfig/pkg/pb" + "github.com/spf13/cobra" +) + +func NewRolloutCommand() *cobra.Command { + var ( + strategy string + percentage int32 + server string + ) + + cmd := &cobra.Command{ + Use: "rollout [config-id]", + Short: "Start a rollout for a configuration", + Long: `Start a rollout to distribute a configuration to service instances. + +Strategies: + ALL_AT_ONCE Push to all connected instances immediately (default) + CANARY Push to ~10% of instances first; stays IN_PROGRESS for review + PERCENTAGE Push to a specific percentage of instances + +Examples: + configctl rollout my-service-v2 --strategy ALL_AT_ONCE + configctl rollout my-service-v2 --strategy CANARY + configctl rollout my-service-v2 --strategy PERCENTAGE --percentage 50`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + configID := args[0] + + if server == "" { + server = os.Getenv("KONFIG_SERVER") + if server == "" { + server = "localhost:8081" + } + } + + client, err := apiclient.NewClient(server) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + defer client.Close() + + // Map strategy string to proto enum + var rolloutStrategy pb.RolloutStrategy + switch strings.ToUpper(strategy) { + case "ALL_AT_ONCE", "": + rolloutStrategy = pb.RolloutStrategy_ALL_AT_ONCE + case "CANARY": + rolloutStrategy = pb.RolloutStrategy_CANARY + case "PERCENTAGE": + rolloutStrategy = pb.RolloutStrategy_PERCENTAGE + if percentage <= 0 || percentage > 100 { + return fmt.Errorf("--percentage must be between 1 and 100") + } + default: + return fmt.Errorf("unknown strategy %q (valid: ALL_AT_ONCE, CANARY, PERCENTAGE)", strategy) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + fmt.Printf("Starting rollout for %s (strategy: %s", configID, strings.ToUpper(strategy)) + if rolloutStrategy == pb.RolloutStrategy_PERCENTAGE { + fmt.Printf(", target: %d%%", percentage) + } + fmt.Println(")") + + resp, err := client.StartRollout(ctx, &pb.StartRolloutRequest{ + ConfigId: configID, + Strategy: rolloutStrategy, + TargetPercentage: percentage, + }) + if err != nil { + return fmt.Errorf("rollout failed: %w", err) + } + + if !resp.Success { + return fmt.Errorf("rollout failed: %s", resp.Message) + } + + fmt.Println("✓ Rollout started") + fmt.Printf(" Rollout ID: %s\n", resp.RolloutId) + fmt.Printf(" %s\n", resp.Message) + fmt.Println() + fmt.Printf("Check progress with: configctl status %s\n", configID) + + return nil + }, + } + + cmd.Flags().StringVar(&strategy, "strategy", "ALL_AT_ONCE", "Rollout strategy (ALL_AT_ONCE|CANARY|PERCENTAGE)") + cmd.Flags().Int32Var(&percentage, "percentage", 100, "Target percentage for PERCENTAGE strategy") + cmd.Flags().StringVar(&server, "server", "", "API server address (default: localhost:8081)") + + return cmd +} diff --git a/src/api-service/README.md b/src/api-service/README.md index f340ed8..4fc26f5 100644 --- a/src/api-service/README.md +++ b/src/api-service/README.md @@ -178,4 +178,4 @@ include/api_service/ - [Database Schema](../../db/migrations/) - [Validation Service](../validation-service/README.md) - [CLI Tool](../../cmd/configctl/) -- [Commands Reference](../../COMMANDS.md) +- [CLI Reference](../../cmd/configctl/README.md) diff --git a/src/api-service/api_service.cpp b/src/api-service/api_service.cpp index 8c76d0d..2fc3391 100644 --- a/src/api-service/api_service.cpp +++ b/src/api-service/api_service.cpp @@ -542,7 +542,13 @@ bool ApiServiceImpl::PublishEvent(const std::string& event_type, const std::stri return false; } - kafka_producer_->poll(0); + // poll(0) triggers internal send; for rollout/rollback events flush immediately + // so the distribution service receives them with minimal delay + if (event_type == "config.rollout_started" || event_type == "config.rolled_back") { + kafka_producer_->flush(100 /*ms*/); + } else { + kafka_producer_->poll(0); + } return true; } diff --git a/src/client-sdk/README.md b/src/client-sdk/README.md new file mode 100644 index 0000000..d06b353 --- /dev/null +++ b/src/client-sdk/README.md @@ -0,0 +1,105 @@ +# Client SDK + +The C++ client SDK (`libconfigclient`) lets services subscribe to real-time config updates with automatic reconnection, heartbeating, and disk caching. + +## Building + +```bash +make sdk # builds lib/libconfigclient.a and lib/libconfigclient.so +make dev-sdk # build inside the dev container (Linux) +``` + +Headers are in `include/configclient/`. Libraries are in `lib/` after building. + +## Constructor + +``` +ConfigClient(server_address, service_name, instance_id, cache_dir, + heartbeat_interval_seconds, max_heartbeat_failures) +``` + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `server_address` | — | Distribution service address, e.g. `distribution-service:8082` | +| `service_name` | — | Name of the service subscribing to configs | +| `instance_id` | `""` | Unique instance identifier (defaults to hostname) | +| `cache_dir` | `""` | Directory for disk cache (defaults to `~/.konfig/cache/`) | +| `heartbeat_interval_seconds` | `30` | How often to send keep-alive heartbeats | +| `max_heartbeat_failures` | `3` | Consecutive failures before reconnecting | + +## Lifecycle + +| Method | Description | +|--------|-------------| +| `Start()` | Loads disk cache, then connects and subscribes. Returns `false` if already running. | +| `Stop()` | Cancels the stream, joins threads, shuts down cleanly. | +| `IsConnected()` | Returns `true` when the gRPC stream is active. | +| `GetCurrentConfig()` | Thread-safe access to the latest `ConfigData`. | +| `GetCurrentVersion()` | Thread-safe access to the current config version. | + +## Callbacks + +Register callbacks before calling `Start()`: + +| Callback | Signature | When called | +|----------|-----------|-------------| +| `OnConfigUpdate` | `void(const ConfigData&)` | Immediately from disk cache on startup, then on every live update | +| `OnConnectionStatus` | `void(bool connected)` | When the connection state changes | + +## Heartbeat + +The client sends periodic heartbeats over the gRPC stream to keep the connection alive. If `max_heartbeat_failures` consecutive writes fail, the stream is cancelled and reconnection starts automatically. + +**Conservative** (long interval, tolerant of transient failures): +- `heartbeat_interval_seconds = 60`, `max_heartbeat_failures = 5` + +**Aggressive** (detect dead connections faster): +- `heartbeat_interval_seconds = 10`, `max_heartbeat_failures = 2` + +## Disk Cache + +On every config update the SDK writes a binary cache to `~/.konfig/cache/.cache`. On the next startup the cached config is served **before** the gRPC connection is established. + +| Scenario | Behaviour | +|----------|-----------| +| First start, server up | No cache — waits for live stream | +| Restart, server up | Serves cache immediately, then receives live updates | +| Restart, server down | Serves cache, retries connection every 5 s | +| Corrupted cache | Discards file, falls back to live stream | + +## Reconnection + +The SDK runs a background stream thread. On disconnect (server restart, network issue, or heartbeat timeout) it waits 5 seconds and reconnects automatically. On reconnect, it sends its current version so the server only pushes the config if a newer one exists. + +## Linking + +```makefile +$(CXX) $(CXXFLAGS) $(INCLUDES) $(LDFLAGS) my_service.cpp \ + lib/libconfigclient.a -lgrpc++ -lgrpc -lprotobuf -lpthread -o bin/my_service +``` + +## Code Structure + +``` +src/client-sdk/ +├── config_client.cpp # Public ConfigClient wrapper +├── config_client_impl.cpp # Stream thread + heartbeat thread +└── disk_cache.cpp # Binary cache read/write + +include/configclient/ +├── config_client.h # Public API +├── config_client_impl.h # Implementation header +└── disk_cache.h # DiskCache header +``` + +## Example + +See `examples/simple_client/` for a complete working example. + +```bash +# Build (inside dev container) +make dev-build + +# Run against a local distribution service +./bin/simple_client localhost:8082 my-service +``` diff --git a/src/client-sdk/config_client.cpp b/src/client-sdk/config_client.cpp index e331ff3..f265020 100644 --- a/src/client-sdk/config_client.cpp +++ b/src/client-sdk/config_client.cpp @@ -22,11 +22,13 @@ std::string GenerateInstanceId() { } // anonymous namespace ConfigClient::ConfigClient(const std::string& server_address, const std::string& service_name, - const std::string& instance_id, const std::string& cache_dir) + const std::string& instance_id, const std::string& cache_dir, + int heartbeat_interval_seconds, int max_heartbeat_failures) : server_address_(server_address), service_name_(service_name), instance_id_(instance_id.empty() ? GenerateInstanceId() : instance_id) { impl_ = - std::make_unique(server_address_, service_name_, instance_id_, cache_dir); + std::make_unique(server_address_, service_name_, instance_id_, cache_dir, + heartbeat_interval_seconds, max_heartbeat_failures); } ConfigClient::~ConfigClient() { diff --git a/src/client-sdk/config_client_impl.cpp b/src/client-sdk/config_client_impl.cpp index 78af8de..ec5fc04 100644 --- a/src/client-sdk/config_client_impl.cpp +++ b/src/client-sdk/config_client_impl.cpp @@ -7,9 +7,12 @@ namespace configservice { ConfigClientImpl::ConfigClientImpl(const std::string& server_address, const std::string& service_name, const std::string& instance_id, - const std::string& cache_dir) + const std::string& cache_dir, int heartbeat_interval_seconds, + int max_heartbeat_failures) : server_address_(server_address), service_name_(service_name), instance_id_(instance_id), - current_version_(0), running_(false), connected_(false) { + current_version_(0), running_(false), connected_(false), + heartbeat_interval_seconds_(heartbeat_interval_seconds), + max_heartbeat_failures_(max_heartbeat_failures) { // Create gRPC channel channel_ = grpc::CreateChannel(server_address_, grpc::InsecureChannelCredentials()); stub_ = DistributionService::NewStub(channel_); @@ -46,6 +49,9 @@ bool ConfigClientImpl::Start() { // Start stream thread stream_thread_ = std::make_unique(&ConfigClientImpl::StreamLoop, this); + // Start heartbeat thread + heartbeat_thread_ = std::make_unique(&ConfigClientImpl::HeartbeatLoop, this); + return true; } @@ -62,13 +68,17 @@ void ConfigClientImpl::Stop() { context_->TryCancel(); } - // Wake up thread + // Wake up threads shutdown_cv_.notify_all(); + heartbeat_cv_.notify_all(); - // Wait for thread + // Wait for threads if (stream_thread_ && stream_thread_->joinable()) { stream_thread_->join(); } + if (heartbeat_thread_ && heartbeat_thread_->joinable()) { + heartbeat_thread_->join(); + } SetConnectionStatus(false); std::cout << "[ConfigClient] Client stopped" << std::endl; @@ -117,6 +127,50 @@ void ConfigClientImpl::StreamLoop() { } } +void ConfigClientImpl::HeartbeatLoop() { + int consecutive_failures = 0; + + while (running_) { + // Wait for the heartbeat interval (or until stopped) + { + std::unique_lock lock(heartbeat_mutex_); + heartbeat_cv_.wait_for(lock, std::chrono::seconds(heartbeat_interval_seconds_), + [this] { return !running_.load(); }); + } + + if (!running_) + break; + + // Only send heartbeats when connected + if (!connected_) { + consecutive_failures = 0; + continue; + } + + SubscribeRequest heartbeat; + heartbeat.set_service_name(service_name_); + heartbeat.set_instance_id(instance_id_); + heartbeat.set_current_version(GetCurrentVersion()); + + if (stream_ && stream_->Write(heartbeat)) { + consecutive_failures = 0; + } else { + consecutive_failures++; + std::cerr << "[ConfigClient] Heartbeat failed (" << consecutive_failures << "/" + << max_heartbeat_failures_ << ")" << std::endl; + + if (consecutive_failures >= max_heartbeat_failures_) { + std::cerr << "[ConfigClient] Max heartbeat failures reached, reconnecting..." + << std::endl; + consecutive_failures = 0; + if (context_) { + context_->TryCancel(); + } + } + } + } +} + void ConfigClientImpl::ConnectAndSubscribe() { // Create new context context_ = std::make_unique(); diff --git a/src/distribution-service/README.md b/src/distribution-service/README.md index 6e5a293..68fa673 100644 --- a/src/distribution-service/README.md +++ b/src/distribution-service/README.md @@ -9,12 +9,12 @@ The Distribution Service acts as the push mechanism in the configuration managem ### Key Features - Real-time bidirectional gRPC streaming for instant config delivery +- Rollout strategy execution: `ALL_AT_ONCE`, `CANARY`, `PERCENTAGE` +- Kafka consumer for rollout events (with DB polling as an idempotent catch-up) - Redis-based caching to reduce database load -- Client health monitoring with heartbeat mechanism -- Kafka event publishing for lifecycle events -- StatsD metrics for monitoring -- Audit logging for all config deliveries -- Graceful client disconnection handling +- Heartbeat monitor evicts timed-out clients and cancels their streams +- Version ordering: clients are never downgraded to an older config +- StatsD metrics, audit logging, and graceful disconnection handling ## gRPC API @@ -84,14 +84,34 @@ Defined in `proto/distribution.proto` as `DistributionService`: → client_disconnect event published to Kafka ``` +## Rollout Execution + +Rollouts are triggered in two ways: + +1. **Kafka event** — The API service publishes a `config.rollout_started` event after `StartRollout`. The distribution service consumes this immediately and begins pushing configs to the appropriate set of instances. +2. **DB polling** — Every 30 seconds, the service polls `rollout_state` for any `IN_PROGRESS` or `PENDING` rollouts. This handles the Kafka consumer rebalance window (2–3 s gap at startup) and acts as an idempotent catch-up mechanism. + +### Version Ordering + +When pushing a config to a client, the distribution service skips any client whose `current_version` is already equal to or greater than the rollout version. This prevents clients from being downgraded when a periodic poll re-executes an older rollout. + +### Rollout Strategies + +| Strategy | Behaviour | +|----------|-----------| +| `ALL_AT_ONCE` | Pushed to all connected instances. Completed when all instances receive the config (or no instances are connected). | +| `CANARY` | Pushed to ~10% of instances. Stays `IN_PROGRESS` until `configctl promote` is called. If no clients are connected, stays `IN_PROGRESS` (does not auto-complete). | +| `PERCENTAGE` | Pushed to the specified fraction of instances. Completed when the target percentage is reached. | + ## Components ### `distribution_service.cpp` Core gRPC service: -- `Subscribe()` - Bidirectional streaming, config delivery, heartbeat monitoring -- `ReportHealth()` - Processes health reports for rollout decisions -- `Heartbeat()` - Connection keepalive with timeout detection (90s) +- `Subscribe()` — Bidirectional streaming. Registers the client, sends the latest rolled-out config, then reads heartbeats +- `ExecuteRollout()` — Pushes a config to the appropriate subset of instances based on strategy +- `PollPendingRollouts()` — DB catch-up: re-runs any open rollouts on startup and every 30 s +- `HeartbeatMonitorLoop()` — Evicts clients that have not sent a heartbeat within the timeout, cancels their stream context ### `database_manager.cpp` @@ -255,7 +275,8 @@ include/distribution_service/ ## Related - [Proto Definition](../../proto/distribution.proto) -- [Client SDK](../client-sdk/) +- [Client SDK](../client-sdk/README.md) - [Database Schema](../../db/migrations/) - [API Service](../api-service/README.md) - [Commands Reference](../../COMMANDS.md) +- [CLI Reference](../../cmd/configctl/README.md) diff --git a/src/distribution-service/config.cpp b/src/distribution-service/config.cpp index 21a2079..d690b4b 100644 --- a/src/distribution-service/config.cpp +++ b/src/distribution-service/config.cpp @@ -58,13 +58,16 @@ ServiceConfig ServiceConfig::LoadFromFile(const std::string& config_file) { config.statsd.prefix = statsd["prefix"].as("distribution"); } - // Monitoring + // Monitoring — parse "30s" / "90s" duration strings if (yaml["monitoring"]) { auto mon = yaml["monitoring"]; + auto parse_seconds = [](const std::string& s) -> int { + return std::stoi(s); // stoi stops at the first non-digit ('s') + }; config.monitoring.heartbeat_interval_seconds = - mon["heartbeat_interval"].as("30s")[0] - '0'; + parse_seconds(mon["heartbeat_interval"].as("30s")); config.monitoring.heartbeat_timeout_seconds = - mon["heartbeat_timeout"].as("90s")[0] - '0'; + parse_seconds(mon["heartbeat_timeout"].as("90s")); } // Logging diff --git a/src/distribution-service/database_manager.cpp b/src/distribution-service/database_manager.cpp index 1bdd0fd..ddb91c5 100644 --- a/src/distribution-service/database_manager.cpp +++ b/src/distribution-service/database_manager.cpp @@ -98,6 +98,63 @@ ConfigData DatabaseManager::GetLatestConfig(const std::string& service_name) { } } +ConfigData DatabaseManager::GetLatestRolledOutConfig(const std::string& service_name) { + std::lock_guard lock(mutex_); + + if (!initialized_) { + throw std::runtime_error("Database not initialized"); + } + + try { + pqxx::work txn(*conn_); + + // Latest version that has a COMPLETED rollout + pqxx::result r = + txn.exec_params("SELECT m.config_id, m.service_name, m.version, m.format, d.content, " + " m.created_at, m.created_by " + "FROM config_metadata m " + "JOIN config_data d ON m.config_id = d.config_id " + "JOIN rollout_state rs ON rs.config_id = m.config_id " + "WHERE m.service_name = $1 AND rs.status = 'COMPLETED' " + "ORDER BY m.version DESC LIMIT 1", + service_name); + + if (r.empty()) { + // No completed rollout — fall back to absolute latest + // (handles first-time setup before any rollout has been run) + pqxx::result r2 = txn.exec_params( + "SELECT m.config_id, m.service_name, m.version, m.format, d.content, " + " m.created_at, m.created_by " + "FROM config_metadata m " + "JOIN config_data d ON m.config_id = d.config_id " + "WHERE m.service_name = $1 " + "ORDER BY m.version DESC LIMIT 1", + service_name); + + if (r2.empty()) { + ConfigData config; + config.set_service_name(service_name); + config.set_version(0); + return config; + } + auto result = ParseConfigRow(r2[0]); + txn.commit(); + return result; + } + + auto result = ParseConfigRow(r[0]); + txn.commit(); + + std::cout << "[DB] Latest rolled-out config: " << service_name << " v" << result.version() + << std::endl; + return result; + + } catch (const std::exception& e) { + std::cerr << "[DB] GetLatestRolledOutConfig failed: " << e.what() << std::endl; + throw; + } +} + ConfigData DatabaseManager::GetConfigByVersion(const std::string& service_name, int64_t version) { std::lock_guard lock(mutex_); @@ -238,6 +295,133 @@ bool DatabaseManager::RecordConfigDelivery(const std::string& service_name, } } +ConfigData DatabaseManager::GetConfigById(const std::string& config_id) { + std::lock_guard lock(mutex_); + + if (!initialized_) { + throw std::runtime_error("Database not initialized"); + } + + try { + pqxx::work txn(*conn_); + + pqxx::result r = + txn.exec_params("SELECT m.config_id, m.service_name, m.version, m.format, d.content, " + " m.created_at, m.created_by " + "FROM config_metadata m " + "JOIN config_data d ON m.config_id = d.config_id " + "WHERE m.config_id = $1", + config_id); + + if (r.empty()) { + ConfigData config; + config.set_config_id(config_id); + config.set_version(0); + return config; + } + + auto result = ParseConfigRow(r[0]); + txn.commit(); + return result; + + } catch (const std::exception& e) { + std::cerr << "[DB] GetConfigById failed: " << e.what() << std::endl; + throw; + } +} + +RolloutInfo DatabaseManager::GetRolloutInfo(const std::string& config_id) { + std::lock_guard lock(mutex_); + RolloutInfo info; + + if (!initialized_) { + return info; + } + + try { + pqxx::work txn(*conn_); + + pqxx::result r = txn.exec_params("SELECT strategy, target_percentage, status " + "FROM rollout_state WHERE config_id = $1", + config_id); + + if (!r.empty()) { + info.strategy = r[0]["strategy"].as(); + info.target_percentage = r[0]["target_percentage"].as(); + info.status = r[0]["status"].as(); + info.found = true; + } + + txn.commit(); + + } catch (const std::exception& e) { + std::cerr << "[DB] GetRolloutInfo failed: " << e.what() << std::endl; + } + + return info; +} + +bool DatabaseManager::UpdateRolloutProgress(const std::string& config_id, int32_t current_pct, + const std::string& status) { + std::lock_guard lock(mutex_); + + if (!initialized_) { + return false; + } + + try { + pqxx::work txn(*conn_); + + std::string sql = "UPDATE rollout_state " + "SET current_percentage = $2, status = $3"; + if (status == "COMPLETED" || status == "FAILED") { + sql += ", completed_at = NOW()"; + } + sql += " WHERE config_id = $1"; + + txn.exec_params(sql, config_id, current_pct, status); + txn.commit(); + + std::cout << "[DB] Rollout progress: " << config_id << " → " << current_pct << "% (" + << status << ")" << std::endl; + return true; + + } catch (const std::exception& e) { + std::cerr << "[DB] UpdateRolloutProgress failed: " << e.what() << std::endl; + return false; + } +} + +std::vector> DatabaseManager::GetPendingRollouts() { + std::lock_guard lock(mutex_); + std::vector> result; + + if (!initialized_) + return result; + + try { + pqxx::work txn(*conn_); + + pqxx::result r = txn.exec("SELECT rs.config_id, cm.service_name " + "FROM rollout_state rs " + "JOIN config_metadata cm ON rs.config_id = cm.config_id " + "WHERE rs.status = 'IN_PROGRESS' " + "ORDER BY rs.started_at ASC"); + + for (const auto& row : r) { + result.emplace_back(row["config_id"].as(), + row["service_name"].as()); + } + + txn.commit(); + + } catch (const std::exception& e) { + std::cerr << "[DB] GetPendingRollouts failed: " << e.what() << std::endl; + } + + return result; +} + ConfigData DatabaseManager::ParseConfigRow(const pqxx::row& row) { ConfigData config; diff --git a/src/distribution-service/distribution_service.cpp b/src/distribution-service/distribution_service.cpp index 3503146..d107cd9 100644 --- a/src/distribution-service/distribution_service.cpp +++ b/src/distribution-service/distribution_service.cpp @@ -1,7 +1,9 @@ #include "distribution_service/distribution_service.h" +#include #include #include +#include namespace configservice { @@ -57,6 +59,9 @@ bool DistributionServiceImpl::Initialize() { // Start heartbeat monitor StartHeartbeatMonitor(); + // Start rollout consumer + StartRolloutConsumer(); + std::cout << "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" << std::endl; std::cout << "[DistributionService] ✓ Service initialized successfully" << std::endl; std::cout << std::endl; @@ -70,6 +75,9 @@ void DistributionServiceImpl::Shutdown() { // Stop heartbeat monitor StopHeartbeatMonitor(); + // Stop rollout consumer + StopRolloutConsumer(); + // Disconnect all clients { std::lock_guard lock(clients_mutex_); @@ -116,6 +124,7 @@ grpc::Status DistributionServiceImpl::Subscribe( client->service_name = service_name; client->instance_id = instance_id; client->current_version = current_version; + client->context = context; client->stream = stream; client->last_heartbeat = std::chrono::steady_clock::now(); client->active = true; @@ -143,7 +152,10 @@ grpc::Status DistributionServiceImpl::Subscribe( // Fetch and send config if needed try { auto start = std::chrono::steady_clock::now(); - ConfigData config = FetchConfig(service_name, -1); // -1 = latest + // Only send the latest *rolled-out* version on connect, not the latest uploaded. + // This ensures uploads don't bypass rollout strategies. + ConfigData config = + db_ ? db_->GetLatestRolledOutConfig(service_name) : FetchConfig(service_name, -1); auto end = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(end - start); @@ -190,9 +202,13 @@ grpc::Status DistributionServiceImpl::Subscribe( ConfigUpdate heartbeat; heartbeat.set_update_type(HEARTBEAT_ACK); - if (!stream->Write(heartbeat)) { - std::cout << "[DistributionService] Client disconnected: " << instance_id << std::endl; - break; + { + std::lock_guard write_lock(client->write_mutex); + if (!stream->Write(heartbeat)) { + std::cout << "[DistributionService] Client disconnected: " << instance_id + << std::endl; + break; + } } } @@ -275,6 +291,7 @@ bool DistributionServiceImpl::SendConfigToClient(std::shared_ptr cli update.set_update_type(NEW_CONFIG); update.set_force_reload(config.version() > client->current_version); + std::lock_guard write_lock(client->write_mutex); if (client->stream->Write(update)) { std::cout << "[DistributionService] Sent config v" << config.version() << " to " << client->instance_id << std::endl; @@ -363,6 +380,12 @@ void DistributionServiceImpl::HeartbeatMonitorLoop() { metrics_->RecordHeartbeatTimeout(); } + // Cancel the gRPC context so stream->Read() unblocks in Subscribe + auto it = active_clients_.find(key); + if (it != active_clients_.end() && it->second->context) { + it->second->context->TryCancel(); + } + active_clients_.erase(key); } } @@ -378,9 +401,263 @@ void DistributionServiceImpl::UpdateMetrics() { size_t active_count = GetActiveClientCount(); metrics_->SetActiveClients(active_count); +} + +// ─── Rollout consumer ──────────────────────────────────────────────────────── + +void DistributionServiceImpl::StartRolloutConsumer() { + std::string errstr; + + // Build broker list + std::ostringstream brokers; + for (size_t i = 0; i < config_.kafka.brokers.size(); ++i) { + if (i > 0) + brokers << ","; + brokers << config_.kafka.brokers[i]; + } - // Update cache hit rate (simplified - would need counters for real implementation) - // metrics_->SetCacheHitRate(0.85f); + RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + conf->set("bootstrap.servers", brokers.str(), errstr); + conf->set("group.id", "distribution-service-rollout", errstr); + conf->set("auto.offset.reset", "latest", errstr); + conf->set("enable.auto.commit", "true", errstr); + + rollout_consumer_.reset(RdKafka::KafkaConsumer::create(conf, errstr)); + delete conf; + + if (!rollout_consumer_) { + std::cerr << "[DistributionService] ✗ Failed to create rollout consumer: " << errstr + << std::endl; + return; + } + + rollout_consumer_->subscribe({config_.kafka.topic}); + std::cout << "[DistributionService] ✓ Rollout consumer subscribed to topic: " + << config_.kafka.topic << std::endl; + + rollout_thread_ = + std::make_unique(&DistributionServiceImpl::RolloutConsumerLoop, this); +} + +void DistributionServiceImpl::StopRolloutConsumer() { + if (rollout_consumer_) { + rollout_consumer_->close(); + rollout_consumer_.reset(); + } + if (rollout_thread_ && rollout_thread_->joinable()) { + rollout_thread_->join(); + } + std::cout << "[DistributionService] Rollout consumer stopped" << std::endl; +} + +void DistributionServiceImpl::PollPendingRollouts() { + if (!db_) + return; + + auto pending = db_->GetPendingRollouts(); + if (pending.empty()) + return; + + std::cout << "[DistributionService] Found " << pending.size() + << " pending rollout(s) — executing now" << std::endl; + + for (const auto& [config_id, service_name] : pending) { + ExecuteRollout(service_name, config_id); + } +} + +void DistributionServiceImpl::RolloutConsumerLoop() { + // Catch up any rollouts that were IN_PROGRESS before this process started + // (covers the Kafka rebalance timing gap and service restarts) + PollPendingRollouts(); + + constexpr int kPollIntervalMs = 30000; // re-poll DB every 30s as safety net + int elapsed_ms = 0; + + while (running_) { + if (!rollout_consumer_) + break; + + RdKafka::Message* msg = rollout_consumer_->consume(100 /*ms*/); + elapsed_ms += 100; + if (elapsed_ms >= kPollIntervalMs) { + elapsed_ms = 0; + PollPendingRollouts(); + } + + if (msg->err() == RdKafka::ERR_NO_ERROR) { + std::string payload(static_cast(msg->payload()), msg->len()); + + std::string event_type = ExtractJsonString(payload, "event_type"); + if (event_type == "config.rollout_started" || event_type == "config.rolled_back") { + std::string service_name = ExtractJsonString(payload, "service_name"); + int64_t version = ExtractJsonInt(payload, "version"); + + if (!service_name.empty() && version > 0) { + // Reconstruct config_id (matches GenerateConfigId in api-service) + std::string config_id = service_name + "-v" + std::to_string(version); + std::cout << "[DistributionService] Rollout event received: " << event_type + << " config=" << config_id << std::endl; + ExecuteRollout(service_name, config_id); + } + } + } else if (msg->err() != RdKafka::ERR__TIMED_OUT && + msg->err() != RdKafka::ERR__PARTITION_EOF) { + std::cerr << "[DistributionService] Kafka consume error: " << msg->errstr() + << std::endl; + } + + delete msg; + } +} + +// ─── Rollout execution ──────────────────────────────────────────────────────── + +std::vector> DistributionServiceImpl::GetClientsForService( + const std::string& service_name) { + std::lock_guard lock(clients_mutex_); + std::vector> result; + for (auto& [key, client] : active_clients_) { + if (client->service_name == service_name && client->active) { + result.push_back(client); + } + } + // Sort by instance_id for deterministic canary/percentage selection + std::sort(result.begin(), result.end(), + [](const auto& a, const auto& b) { return a->instance_id < b->instance_id; }); + return result; +} + +void DistributionServiceImpl::ExecuteRollout(const std::string& service_name, + const std::string& config_id) { + if (!db_) + return; + + // Fetch rollout parameters + RolloutInfo rollout = db_->GetRolloutInfo(config_id); + if (!rollout.found) { + // No rollout record — treat as ALL_AT_ONCE (e.g. for rollback events) + rollout.strategy = 0; + rollout.target_percentage = 100; + } + + // Fetch the config to push + ConfigData config; + try { + config = db_->GetConfigById(config_id); + } catch (...) { + std::cerr << "[DistributionService] ExecuteRollout: failed to fetch config " << config_id + << std::endl; + return; + } + + if (config.version() == 0) { + std::cerr << "[DistributionService] ExecuteRollout: config not found: " << config_id + << std::endl; + return; + } + + auto clients = GetClientsForService(service_name); + size_t total = clients.size(); + + if (total == 0) { + std::cout << "[DistributionService] ExecuteRollout: no connected clients for " + << service_name << std::endl; + // CANARY stays IN_PROGRESS — wait for clients to connect + // ALL_AT_ONCE / PERCENTAGE with 0 clients: complete trivially (nothing to push) + if (rollout.strategy != 1) { + db_->UpdateRolloutProgress(config_id, 100, "COMPLETED"); + } + return; + } + + size_t target_count = total; // default: ALL_AT_ONCE + + if (rollout.strategy == 1) { + // CANARY: push to ~10% (minimum 1 instance) + target_count = std::max(size_t(1), total * 10 / 100); + std::cout << "[DistributionService] CANARY rollout: pushing to " << target_count << "/" + << total << " instances of " << service_name << std::endl; + } else if (rollout.strategy == 2) { + // PERCENTAGE: push to target_percentage% of instances + target_count = total * static_cast(rollout.target_percentage) / 100; + target_count = std::max(size_t(1), target_count); + std::cout << "[DistributionService] PERCENTAGE rollout: pushing to " << target_count << "/" + << total << " instances (" << rollout.target_percentage << "%) of " + << service_name << std::endl; + } else { + std::cout << "[DistributionService] ALL_AT_ONCE rollout: pushing to all " << total + << " instances of " << service_name << std::endl; + } + + // Push config to selected clients + size_t pushed = 0; + for (size_t i = 0; i < target_count && i < clients.size(); ++i) { + // Skip clients that already have this version or newer + if (clients[i]->current_version >= config.version()) { + pushed++; // count as delivered — they already have it + continue; + } + if (SendConfigToClient(clients[i], config)) { + clients[i]->current_version = config.version(); + pushed++; + if (db_) { + db_->UpdateClientStatus(service_name, clients[i]->instance_id, config.version(), + "connected"); + db_->RecordConfigDelivery(service_name, clients[i]->instance_id, config.version()); + } + if (events_) { + events_->PublishConfigUpdate(service_name, clients[i]->instance_id, + config.version()); + } + } + } + + // Update rollout progress + int32_t current_pct = total > 0 ? static_cast(pushed * 100 / total) : 100; + + std::string new_status; + if (rollout.strategy == 1) { + // CANARY stays IN_PROGRESS — operator promotes or rolls back + new_status = "IN_PROGRESS"; + } else if (rollout.strategy == 2 && current_pct < rollout.target_percentage) { + new_status = "IN_PROGRESS"; + } else { + new_status = "COMPLETED"; + } + + db_->UpdateRolloutProgress(config_id, current_pct, new_status); + + std::cout << "[DistributionService] ✓ Rollout executed: " << pushed << "/" << total + << " instances updated (" << current_pct << "%) status=" << new_status << std::endl; +} + +// ─── JSON utilities ─────────────────────────────────────────────────────────── + +std::string DistributionServiceImpl::ExtractJsonString(const std::string& json, + const std::string& key) { + std::string search = "\"" + key + "\":\""; + auto pos = json.find(search); + if (pos == std::string::npos) + return ""; + pos += search.size(); + auto end = json.find('"', pos); + if (end == std::string::npos) + return ""; + return json.substr(pos, end - pos); +} + +int64_t DistributionServiceImpl::ExtractJsonInt(const std::string& json, const std::string& key) { + std::string search = "\"" + key + "\":"; + auto pos = json.find(search); + if (pos == std::string::npos) + return 0; + pos += search.size(); + try { + return std::stoll(json.substr(pos)); + } catch (...) { + return 0; + } } } // namespace configservice \ No newline at end of file From a16db15ecf81b7947318ef4bb2071a7dbeb6db66 Mon Sep 17 00:00:00 2001 From: saptarshi Date: Fri, 13 Mar 2026 00:32:00 +0530 Subject: [PATCH 2/2] made write call async --- .../distribution_service.cpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/distribution-service/distribution_service.cpp b/src/distribution-service/distribution_service.cpp index d107cd9..50f2d8e 100644 --- a/src/distribution-service/distribution_service.cpp +++ b/src/distribution-service/distribution_service.cpp @@ -286,6 +286,11 @@ bool DistributionServiceImpl::SendConfigToClient(std::shared_ptr cli return false; } + // Fail fast if the stream is already known to be dead. + if (client->context && client->context->IsCancelled()) { + return false; + } + ConfigUpdate update; *update.mutable_config() = config; update.set_update_type(NEW_CONFIG); @@ -305,6 +310,11 @@ bool DistributionServiceImpl::SendConfigToClient(std::shared_ptr cli return true; } + // Write failed — cancel the context so future calls on this stream fail instantly. + if (client->context) { + client->context->TryCancel(); + } + if (metrics_) { metrics_->RecordConfigFailed(); } @@ -498,7 +508,11 @@ void DistributionServiceImpl::RolloutConsumerLoop() { std::string config_id = service_name + "-v" + std::to_string(version); std::cout << "[DistributionService] Rollout event received: " << event_type << " config=" << config_id << std::endl; - ExecuteRollout(service_name, config_id); + // Run in a detached thread so the consumer loop is never blocked + // by a slow rollout (e.g. dead clients, slow DB writes). + std::thread([this, service_name, config_id]() { + ExecuteRollout(service_name, config_id); + }).detach(); } } } else if (msg->err() != RdKafka::ERR__TIMED_OUT &&