diff --git a/.env b/.env index ea40f63..ff31e82 100644 --- a/.env +++ b/.env @@ -19,7 +19,7 @@ ENABLE_SWAGGER=true # InfluxDB 3 Configuration (for telemetry time-series storage) INFLUX_HOST=http://localhost:8181 -INFLUX_TOKEN=apiv3_fqnVwnfzaVcJMTjm1nPvPAgTOdhoBjHLug6agmOrcdTTt_kIyp6DGnLQv2qWzyZ8WY4gTPGbvBXJtpYpM2bl8A +INFLUX_TOKEN=apiv3_nih6ByqY3a70gCY7k9Mc1w_SEGOWN_N_jEHZZMRqm3N4di-QoCCtMByqNmvcZpJzvdy_4x11ZDcoPMyaytV7vA INFLUX_DATABASE=telemetry # SQLite Configuration (for decision logging) @@ -51,3 +51,13 @@ WEBHOOK_PROCESS_TIMEOUT_MS=15000 # true -> accept legacy body-only signatures when X-Webhook-Timestamp is absent # false -> require timestamp-bound signatures only WEBHOOK_ACCEPT_LEGACY_SIGNATURE=true + +# Drill Director / Kubernetes execution (local dev; kubeconfig copied from VM) +DRILLS_KUBECONFIG_PATH=/Users/teransarathchandra/Development/Sliit/Y04S02/Research New/analysis-engine/.drills-kubeconfig +DRILLS_KUBE_CONTEXT=boutique +# Uses a local SSH tunnel that forwards to the VM's Minikube API (192.168.49.2:8443) +DRILLS_KUBE_API_SERVER=https://127.0.0.1:16443 +DRILLS_LOADGEN_DEPLOYMENT=loadgenerator +DRILLS_LOADGEN_CONTAINER=main +DRILLS_TARGETED_LOAD_RATE_ENV=RATE +DRILLS_TARGETED_LOAD_USERS_ENV=USERS diff --git a/.env.example b/.env.example index 1dd013a..6a65401 100644 --- a/.env.example +++ b/.env.example @@ -52,3 +52,13 @@ WEBHOOK_PROCESS_TIMEOUT_MS=15000 # true -> accept legacy body-only signatures when X-Webhook-Timestamp is absent # false -> require timestamp-bound signatures only WEBHOOK_ACCEPT_LEGACY_SIGNATURE=true + +# Drill Director / Kubernetes execution (optional for local drill runs) +# If not set, the drill engine will try in-cluster config first, then default kubeconfig loading rules. +# DRILLS_KUBECONFIG_PATH=/absolute/path/to/kubeconfig +# DRILLS_KUBE_CONTEXT=your-context +# DRILLS_KUBE_API_SERVER=https://your-cluster-api-server +# DRILLS_LOADGEN_DEPLOYMENT=loadgenerator +# DRILLS_LOADGEN_CONTAINER=main +# DRILLS_TARGETED_LOAD_RATE_ENV=RATE +# DRILLS_TARGETED_LOAD_USERS_ENV=USERS diff --git a/.gitignore b/.gitignore index 3e9f776..9c29ac0 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ analysis-engine # Config .env +.drills-kubeconfig .DS_Store # IDEs @@ -21,4 +22,4 @@ analysis-engine .idea/ # SQLite -data/*.db* \ No newline at end of file +data/*.db* diff --git a/cmd/analysis-engine/main.go b/cmd/analysis-engine/main.go index 8d861fd..c536cf1 100644 --- a/cmd/analysis-engine/main.go +++ b/cmd/analysis-engine/main.go @@ -18,6 +18,7 @@ import ( "predictive-analysis-engine/pkg/clients/graph" "predictive-analysis-engine/pkg/clients/telemetry" "predictive-analysis-engine/pkg/config" + "predictive-analysis-engine/pkg/drills" "predictive-analysis-engine/pkg/simulation" "predictive-analysis-engine/pkg/storage" "predictive-analysis-engine/pkg/worker" @@ -68,6 +69,21 @@ func main() { decisionsHandler := &api.DecisionsHandler{Store: store} telemetryHandler := &api.TelemetryHandler{Client: telemetryClient, Cfg: cfg} + drillEngine := drills.NewEngine(store, graphClient, telemetryClient, drills.EngineOptions{ + K8sClientOptions: drills.K8sClientOptions{ + KubeconfigPath: cfg.Drills.KubeconfigPath, + KubeContext: cfg.Drills.KubeContext, + APIServer: cfg.Drills.KubeAPIServer, + }, + TargetedLoad: drills.TargetedLoadActionOptions{ + DeploymentName: cfg.Drills.LoadGeneratorDeployment, + ContainerName: cfg.Drills.LoadGeneratorContainer, + RateEnvName: cfg.Drills.TargetedLoadRateEnv, + UsersEnvName: cfg.Drills.TargetedLoadUsersEnv, + }, + }) + drillsHandler := &api.DrillsHandler{Engine: drillEngine, Store: store} + r := chi.NewRouter() r.Use(api.CORSMiddleware) @@ -94,6 +110,7 @@ func main() { r.Get("/dependency-graph/snapshot", apiHandler.DependencyGraphHandler) decisionsHandler.RegisterRoutes(r) + drillsHandler.RegisterRoutes(r) r.Mount("/telemetry", telemetryHandler.Routes()) // Webhook endpoint: receives graph updates from service-graph-engine diff --git a/go.mod b/go.mod index 23c6a68..3a868fa 100644 --- a/go.mod +++ b/go.mod @@ -10,14 +10,21 @@ require ( github.com/mattn/go-sqlite3 v1.14.33 github.com/swaggo/http-swagger/v2 v2.0.2 github.com/swaggo/swag/v2 v2.0.0-rc5 + k8s.io/apimachinery v0.35.1 + k8s.io/client-go v0.35.1 ) require ( github.com/KyleBanks/depth v1.2.1 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect + github.com/fxamacker/cbor/v2 v2.9.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/go-openapi/jsonpointer v0.22.4 // indirect github.com/go-openapi/jsonreference v0.21.4 // indirect github.com/go-openapi/spec v0.22.3 // indirect + github.com/go-openapi/swag v0.23.0 // indirect github.com/go-openapi/swag/conv v0.25.4 // indirect github.com/go-openapi/swag/jsonname v0.25.4 // indirect github.com/go-openapi/swag/jsonutils v0.25.4 // indirect @@ -25,15 +32,41 @@ require ( github.com/go-openapi/swag/stringutils v0.25.4 // indirect github.com/go-openapi/swag/typeutils v0.25.4 // indirect github.com/go-openapi/swag/yamlutils v0.25.4 // indirect + github.com/google/gnostic-models v0.7.0 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oapi-codegen/runtime v1.0.0 // indirect + github.com/spf13/pflag v1.0.9 // indirect github.com/sv-tools/openapi v0.4.0 // indirect github.com/swaggo/files/v2 v2.0.2 // indirect github.com/swaggo/swag v1.16.6 // indirect + github.com/x448/float16 v0.8.4 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect + golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/term v0.39.0 // indirect + golang.org/x/text v0.33.0 // indirect + golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.41.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect + gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.35.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect + sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index b21d51e..8137b2e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= +github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= +github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= @@ -7,15 +9,22 @@ github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvF github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= +github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= +github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/go-chi/chi/v5 v5.2.4 h1:WtFKPHwlywe8Srng8j2BhOD9312j9cGUxG1SP4V2cR4= github.com/go-chi/chi/v5 v5.2.4/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-openapi/jsonpointer v0.22.4 h1:dZtK82WlNpVLDW2jlA1YCiVJFVqkED1MegOUy9kR5T4= github.com/go-openapi/jsonpointer v0.22.4/go.mod h1:elX9+UgznpFhgBuaMQ7iu4lvvX1nvNsesQ3oxmYTw80= github.com/go-openapi/jsonreference v0.21.4 h1:24qaE2y9bx/q3uRK/qN+TDwbok1NhbSmGjjySRCHtC8= github.com/go-openapi/jsonreference v0.21.4/go.mod h1:rIENPTjDbLpzQmQWCj5kKj3ZlmEh+EFVbz3RTUh30/4= github.com/go-openapi/spec v0.22.3 h1:qRSmj6Smz2rEBxMnLRBMeBWxbbOvuOoElvSvObIgwQc= github.com/go-openapi/spec v0.22.3/go.mod h1:iIImLODL2loCh3Vnox8TY2YWYJZjMAKYyLH2Mu8lOZs= -github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= +github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= +github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-openapi/swag/conv v0.25.4 h1:/Dd7p0LZXczgUcC/Ikm1+YqVzkEeCc9LnOWjfkpkfe4= github.com/go-openapi/swag/conv v0.25.4/go.mod h1:3LXfie/lwoAv0NHoEuY1hjoFAYkvlqI/Bn5EQDD3PPU= github.com/go-openapi/swag/jsonname v0.25.4 h1:bZH0+MsS03MbnwBXYhuTttMOqk+5KcQ9869Vye1bNHI= @@ -36,8 +45,15 @@ github.com/go-openapi/testify/enable/yaml/v2 v2.0.2 h1:0+Y41Pz1NkbTHz8NngxTuAXxE github.com/go-openapi/testify/enable/yaml/v2 v2.0.2/go.mod h1:kme83333GCtJQHXQ8UKX3IBZu6z8T5Dvy5+CW3NLUUg= github.com/go-openapi/testify/v2 v2.0.2 h1:X999g3jeLcoY8qctY/c/Z8iBHTbwLz7R2WXd6Ub6wls= github.com/go-openapi/testify/v2 v2.0.2/go.mod h1:HCPmvFFnheKK2BuwSA0TbbdxJ3I16pjwMkYkP4Ywn54= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= +github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= @@ -46,18 +62,46 @@ github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7 github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-sqlite3 v1.14.33 h1:A5blZ5ulQo2AtayQ9/limgHEkFreKj1Dv226a1K73s0= github.com/mattn/go-sqlite3 v1.14.33/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= +github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns= +github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= +github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= +github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/sv-tools/openapi v0.4.0 h1:UhD9DVnGox1hfTePNclpUzUFgos57FvzT2jmcAuTOJ4= github.com/sv-tools/openapi v0.4.0/go.mod h1:kD/dG+KP0+Fom1r6nvcj/ORtLus8d8enXT6dyRZDirE= github.com/swaggo/files/v2 v2.0.2 h1:Bq4tgS/yxLB/3nwOMcul5oLEUKa877Ykgz3CJMVbQKU= @@ -68,17 +112,58 @@ github.com/swaggo/swag v1.16.6 h1:qBNcx53ZaX+M5dxVyTrgQ0PJ/ACK+NzhwcbieTt+9yI= github.com/swaggo/swag v1.16.6/go.mod h1:ngP2etMK5a0P3QBizic5MEwpRmluJZPHjXcMoj4Xesg= github.com/swaggo/swag/v2 v2.0.0-rc5 h1:fK7d6ET9rrEsdB8IyuwXREWMcyQN3N7gawGFbbrjgHk= github.com/swaggo/swag/v2 v2.0.0-rc5/go.mod h1:kCL8Fu4Zl8d5tB2Bgj96b8wRowwrwk175bZHXfuGVFI= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= +golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/evanphx/json-patch.v4 v4.13.0 h1:czT3CmqEaQ1aanPc5SdlgQrrEIb8w/wwCvWWnfEbYzo= +gopkg.in/evanphx/json-patch.v4 v4.13.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.35.1 h1:0PO/1FhlK/EQNVK5+txc4FuhQibV25VLSdLMmGpDE/Q= +k8s.io/api v0.35.1/go.mod h1:28uR9xlXWml9eT0uaGo6y71xK86JBELShLy4wR1XtxM= +k8s.io/apimachinery v0.35.1 h1:yxO6gV555P1YV0SANtnTjXYfiivaTPvCTKX6w6qdDsU= +k8s.io/apimachinery v0.35.1/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= +k8s.io/client-go v0.35.1 h1:+eSfZHwuo/I19PaSxqumjqZ9l5XiTEKbIaJ+j1wLcLM= +k8s.io/client-go v0.35.1/go.mod h1:1p1KxDt3a0ruRfc/pG4qT/3oHmUj1AhSHEcxNSGg+OA= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= +sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= +sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco= +sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE= +sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= +sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/pkg/api/demo_mode.go b/pkg/api/demo_mode.go index be3b611..2d1b3b6 100644 --- a/pkg/api/demo_mode.go +++ b/pkg/api/demo_mode.go @@ -41,9 +41,17 @@ type demoSnapshotEdge struct { P99 float64 `json:"p99"` } +const ( + demoSnapshotSeedV1 = "seed-v1" + demoFailureServiceID = "default:checkoutservice" + demoScaleServiceID = "default:recommendationservice" + demoScaleExpectedPods = 2 + demoScaleProjectedPods = 5 +) + var supportedDemoSnapshots = []demoSnapshotInfo{ { - ID: "seed-v1", + ID: demoSnapshotSeedV1, Label: "Seed Snapshot v1", Description: "Deterministic snapshot used for panel demonstration fallback mode.", }, @@ -55,13 +63,13 @@ func listDemoSnapshots() []demoSnapshotInfo { func loadDemoFailureResult(req simulation.FailureSimulationRequest, snapshotID string) (*simulation.FailureSimulationResult, error) { id := normalizeDemoSnapshotID(snapshotID) - if id != "seed-v1" { + if id != demoSnapshotSeedV1 { return nil, fmt.Errorf("unsupported snapshotId %q", snapshotID) } serviceID := normalizeServiceID(req.ServiceId) - if serviceID != "default:checkoutservice" { - return nil, fmt.Errorf("demo mode only supports failure scenario for default:checkoutservice") + if serviceID != demoFailureServiceID { + return nil, fmt.Errorf("demo mode only supports failure scenario for %s", demoFailureServiceID) } filePath := filepath.Join("data", "demo", "scenarios", "failure-checkoutservice.json") @@ -92,20 +100,20 @@ func loadDemoFailureResult(req simulation.FailureSimulationRequest, snapshotID s func loadDemoScalingResult(req simulation.ScalingSimulationRequest, snapshotID string) (*simulation.ScalingSimulationResult, error) { id := normalizeDemoSnapshotID(snapshotID) - if id != "seed-v1" { + if id != demoSnapshotSeedV1 { return nil, fmt.Errorf("unsupported snapshotId %q", snapshotID) } serviceID := normalizeServiceID(req.ServiceId) - if serviceID != "default:recommendationservice" { - return nil, fmt.Errorf("demo mode only supports scaling scenario for default:recommendationservice") + if serviceID != demoScaleServiceID { + return nil, fmt.Errorf("demo mode only supports scaling scenario for %s", demoScaleServiceID) } - if req.CurrentPods > 0 && req.CurrentPods != 2 { - return nil, fmt.Errorf("demo mode scaling scenario expects currentPods=2") + if req.CurrentPods > 0 && req.CurrentPods != demoScaleExpectedPods { + return nil, fmt.Errorf("demo mode scaling scenario expects currentPods=%d", demoScaleExpectedPods) } - if req.NewPods > 0 && req.NewPods != 5 { - return nil, fmt.Errorf("demo mode scaling scenario expects newPods=5") + if req.NewPods > 0 && req.NewPods != demoScaleProjectedPods { + return nil, fmt.Errorf("demo mode scaling scenario expects newPods=%d", demoScaleProjectedPods) } filePath := filepath.Join("data", "demo", "scenarios", "scale-recommendationservice.json") @@ -129,15 +137,15 @@ func loadDemoScalingResult(req simulation.ScalingSimulationRequest, snapshotID s result.SourceMode = "demo" result.SnapshotId = id result.Neighborhood.DepthUsed = depth - result.CurrentPods = 2 - result.NewPods = 5 + result.CurrentPods = demoScaleExpectedPods + result.NewPods = demoScaleProjectedPods return &result, nil } func loadDemoSimulationContext(serviceID string, k int, direction string, snapshotID string) (*simulation.SimulationContextResponse, error) { id := normalizeDemoSnapshotID(snapshotID) - if id != "seed-v1" { + if id != demoSnapshotSeedV1 { return nil, fmt.Errorf("unsupported snapshotId %q", snapshotID) } @@ -156,7 +164,7 @@ func loadDemoSimulationContext(serviceID string, k int, direction string, snapsh return nil, fmt.Errorf("direction must be one of: both, in, out") } - filePath := filepath.Join("data", "demo", "snapshots", "seed-v1.json") + filePath := filepath.Join("data", "demo", "snapshots", demoSnapshotSeedV1+".json") var snapshot demoSnapshotFile if err := loadDemoJSON(filePath, &snapshot); err != nil { return nil, err diff --git a/pkg/api/drills.go b/pkg/api/drills.go new file mode 100644 index 0000000..83c20ee --- /dev/null +++ b/pkg/api/drills.go @@ -0,0 +1,236 @@ +package api + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + + "predictive-analysis-engine/pkg/drills" + "predictive-analysis-engine/pkg/storage" + + "github.com/go-chi/chi/v5" +) + +type DrillsHandler struct { + Engine *drills.Engine + Store *storage.DecisionStore +} + +func (h *DrillsHandler) RegisterRoutes(r chi.Router) { + r.Route("/drills", func(r chi.Router) { + r.Get("/k8s-health", h.K8sHealth) + r.Post("/plan", h.PlanDrill) + r.Post("/run", h.RunDrill) + r.Get("/runs/{id}", h.GetDrillRun) + r.Post("/runs/{id}/abort", h.AbortDrillRun) + r.Post("/runs/{id}/recover", h.RecoverDrillRun) + r.Post("/runs/{id}/accept", h.AcceptDrillRun) + r.Get("/history", h.ListHistory) + }) +} + +type DrillPlanRequest struct { + Type string `json:"type"` + Target string `json:"target"` + Config json.RawMessage `json:"config"` +} + +func (h *DrillsHandler) PlanDrill(w http.ResponseWriter, r *http.Request) { + var req DrillPlanRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request payload", http.StatusBadRequest) + return + } + + run, err := h.Engine.PlanDrill(req.Type, req.Target, req.Config) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(run) +} + +type DrillRunRequest struct { + RunID string `json:"runId"` +} + +func (h *DrillsHandler) RunDrill(w http.ResponseWriter, r *http.Request) { + var req DrillRunRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request payload", http.StatusBadRequest) + return + } + + if err := h.Engine.ExecuteDrill(req.RunID); err != nil { + status := http.StatusInternalServerError + if strings.Contains(strings.ToLower(err.Error()), "drill preflight failed") { + status = http.StatusPreconditionFailed + } + http.Error(w, err.Error(), status) + return + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(map[string]string{"status": "accepted", "runId": req.RunID}) +} + +type drillRunResponse struct { + storage.DrillRun + CanRecover bool `json:"canRecover,omitempty"` + RecoveryDeadline *string `json:"recoveryDeadline,omitempty"` + RecoveryMode string `json:"recoveryMode,omitempty"` + RecoverySource string `json:"recoverySource,omitempty"` +} + +func (h *DrillsHandler) GetDrillRun(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if id == "" { + http.Error(w, "Missing drill rum id", http.StatusBadRequest) + return + } + + run, err := h.Store.GetDrillRun(id) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if run == nil { + http.Error(w, "Run not found", http.StatusNotFound) + return + } + + resp := drillRunResponse{DrillRun: *run} + if h.Engine != nil { + if runtime := h.Engine.RuntimeState(id); runtime != nil { + resp.CanRecover = runtime.CanRecover + resp.RecoveryDeadline = runtime.RecoveryDeadline + resp.RecoveryMode = runtime.RecoveryMode + resp.RecoverySource = runtime.RecoverySource + } + } + if resp.RecoverySource == "" { + resp.RecoverySource = inferRecoverySource(run) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +func (h *DrillsHandler) AbortDrillRun(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if id == "" { + http.Error(w, "Missing drill run id", http.StatusBadRequest) + return + } + + if err := h.Engine.AbortDrill(id); err != nil { + status := http.StatusInternalServerError + if errors.Is(err, drills.ErrDrillNotActive) { + status = http.StatusConflict + } + http.Error(w, err.Error(), status) + return + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(map[string]string{"status": "aborted"}) +} + +func (h *DrillsHandler) RecoverDrillRun(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if id == "" { + http.Error(w, "Missing drill run id", http.StatusBadRequest) + return + } + + if err := h.Engine.RecoverDrill(id); err != nil { + status := http.StatusInternalServerError + if errors.Is(err, drills.ErrDrillNotRecoverable) { + status = http.StatusConflict + } + http.Error(w, err.Error(), status) + return + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(map[string]string{"status": "recovering"}) +} + +func (h *DrillsHandler) AcceptDrillRun(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if id == "" { + http.Error(w, "Missing drill run id", http.StatusBadRequest) + return + } + + if err := h.Engine.AcceptDrill(id); err != nil { + status := http.StatusInternalServerError + if errors.Is(err, drills.ErrDrillNotRecoverable) { + status = http.StatusConflict + } + http.Error(w, err.Error(), status) + return + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(map[string]string{"status": "accepted"}) +} + +func (h *DrillsHandler) K8sHealth(w http.ResponseWriter, r *http.Request) { + if h.Engine == nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]interface{}{ + "reachable": false, + "error": "drill engine not initialized", + }) + return + } + + result := h.Engine.CheckK8sConnectivity() + + w.Header().Set("Content-Type", "application/json") + if !result.Reachable { + w.WriteHeader(http.StatusServiceUnavailable) + } else { + w.WriteHeader(http.StatusOK) + } + json.NewEncoder(w).Encode(result) +} + +func (h *DrillsHandler) ListHistory(w http.ResponseWriter, r *http.Request) { + runs, err := h.Store.ListDrillRuns(50) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(runs) +} + +func inferRecoverySource(run *storage.DrillRun) string { + if run == nil || len(run.Timeline) == 0 { + return "" + } + + for i := len(run.Timeline) - 1; i >= 0; i-- { + step := run.Timeline[i] + if step.Phase != "Recovery" { + continue + } + msg := strings.ToLower(step.Message) + switch { + case strings.Contains(msg, "source: manual"): + return "manual" + case strings.Contains(msg, "source: failsafe"): + return "failsafe" + case strings.Contains(msg, "source: abort"): + return "abort" + } + } + return "" +} diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 7c77117..ed3d6fa 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -536,6 +536,18 @@ func (h *Handler) SimulationCapabilitiesHandler(w http.ResponseWriter, r *http.R respondJSON(w, http.StatusOK, map[string]interface{}{ "enabled": []string{"failure", "scale"}, "experimental": []string{"add-service"}, + "demoConstraints": map[string]interface{}{ + "note": "Demo Snapshot Mode uses deterministic fixtures and supports a curated subset of scenarios.", + "addServiceSupported": false, + "failure": map[string]interface{}{ + "serviceId": demoFailureServiceID, + }, + "scale": map[string]interface{}{ + "serviceId": demoScaleServiceID, + "currentPods": demoScaleExpectedPods, + "newPods": demoScaleProjectedPods, + }, + }, }) } diff --git a/pkg/clients/telemetry/client.go b/pkg/clients/telemetry/client.go index ae6e1f5..83b32c4 100644 --- a/pkg/clients/telemetry/client.go +++ b/pkg/clients/telemetry/client.go @@ -28,9 +28,9 @@ type ServiceMetric struct { Service string `json:"service"` Namespace string `json:"namespace"` RequestRate float64 `json:"requestRate"` - ErrorRate float64 `json:"errorRate"` + ErrorRate *float64 `json:"errorRate"` P50 *float64 `json:"p50"` - P95 float64 `json:"p95"` + P95 *float64 `json:"p95"` P99 *float64 `json:"p99"` Availability *float64 `json:"availability"` } @@ -206,6 +206,7 @@ func (c *TelemetryClient) GetServiceMetrics(ctx context.Context, service string, } var metrics []ServiceMetric + preserveSparseNulls := service == "" for _, result := range res.Results { for _, series := range result.Series { @@ -219,72 +220,93 @@ func (c *TelemetryClient) GetServiceMetrics(ctx context.Context, service string, } for _, row := range series.Values { - - if len(row) != len(series.Columns) { + m, ok := parseServiceMetricRow(series.Columns, colMap, row, svcName, namespace, preserveSparseNulls) + if !ok { continue } + metrics = append(metrics, m) + } + } + } - getFloat := func(name string) float64 { - idx, ok := colMap[name] - if !ok || row[idx] == nil { - return 0 - } + return metrics, nil +} - if f, ok := row[idx].(float64); ok { - return f - } +func parseServiceMetricRow(columns []string, colMap map[string]int, row []interface{}, svcName, namespace string, preserveSparseNulls bool) (ServiceMetric, bool) { + if len(row) != len(columns) { + return ServiceMetric{}, false + } - return 0 - } + getFloat := func(name string) float64 { + idx, ok := colMap[name] + if !ok || row[idx] == nil { + return 0 + } - getOptionalFloat := func(name string) *float64 { - idx, ok := colMap[name] - if !ok || row[idx] == nil { - return nil - } + if f, ok := row[idx].(float64); ok { + return f + } - if f, ok := row[idx].(float64); ok { - v := f - return &v - } + return 0 + } - return nil - } + getOptionalFloat := func(name string) *float64 { + idx, ok := colMap[name] + if !ok || row[idx] == nil { + return nil + } - getTime := func() string { - idx, ok := colMap["time"] - if !ok || row[idx] == nil { - return "" - } + if f, ok := row[idx].(float64); ok { + v := f + return &v + } - if s, ok := row[idx].(string); ok { - return s - } + return nil + } - if f, ok := row[idx].(float64); ok { - t := time.Unix(0, int64(f)) - return t.Format(time.RFC3339) - } - return "" - } + getFloatPtrOrZero := func(name string) *float64 { + if v := getOptionalFloat(name); v != nil { + return v + } + zero := 0.0 + return &zero + } - m := ServiceMetric{ - Timestamp: getTime(), - Service: svcName, - Namespace: namespace, - RequestRate: getFloat("avg_request_rate"), - ErrorRate: getFloat("avg_error_rate"), - P50: getOptionalFloat("avg_p50"), - P95: getFloat("avg_p95"), - P99: getOptionalFloat("avg_p99"), - Availability: getOptionalFloat("avg_availability"), - } - metrics = append(metrics, m) - } + getTime := func() string { + idx, ok := colMap["time"] + if !ok || row[idx] == nil { + return "" } + + if s, ok := row[idx].(string); ok { + return s + } + + if f, ok := row[idx].(float64); ok { + t := time.Unix(0, int64(f)) + return t.Format(time.RFC3339) + } + return "" } - return metrics, nil + errorRate := getFloatPtrOrZero("avg_error_rate") + p95 := getFloatPtrOrZero("avg_p95") + if preserveSparseNulls { + errorRate = getOptionalFloat("avg_error_rate") + p95 = getOptionalFloat("avg_p95") + } + + return ServiceMetric{ + Timestamp: getTime(), + Service: svcName, + Namespace: namespace, + RequestRate: getFloat("avg_request_rate"), + ErrorRate: errorRate, + P50: getOptionalFloat("avg_p50"), + P95: p95, + P99: getOptionalFloat("avg_p99"), + Availability: getOptionalFloat("avg_availability"), + }, true } func (c *TelemetryClient) GetEdgeMetrics(ctx context.Context, fromSvc, toSvc, from, to string, stepSeconds int) ([]EdgeMetric, error) { diff --git a/pkg/clients/telemetry/client_test.go b/pkg/clients/telemetry/client_test.go new file mode 100644 index 0000000..c92e91f --- /dev/null +++ b/pkg/clients/telemetry/client_test.go @@ -0,0 +1,102 @@ +package telemetry + +import "testing" + +func TestParseServiceMetricRow_NullHandlingScope(t *testing.T) { + columns := []string{ + "time", + "avg_request_rate", + "avg_error_rate", + "avg_p50", + "avg_p95", + "avg_p99", + "avg_availability", + } + colMap := map[string]int{ + "time": 0, + "avg_request_rate": 1, + "avg_error_rate": 2, + "avg_p50": 3, + "avg_p95": 4, + "avg_p99": 5, + "avg_availability": 6, + } + + sparseRow := []interface{}{ + "2026-02-23T05:05:00Z", + float64(0), + nil, + nil, + nil, + nil, + float64(100), + } + + t.Run("global query preserves nulls", func(t *testing.T) { + got, ok := parseServiceMetricRow(columns, colMap, sparseRow, "adservice", "onlineboutique", true) + if !ok { + t.Fatalf("expected row to parse") + } + if got.RequestRate != 0 { + t.Fatalf("requestRate = %v, want 0", got.RequestRate) + } + if got.ErrorRate != nil { + t.Fatalf("errorRate = %v, want nil", *got.ErrorRate) + } + if got.P95 != nil { + t.Fatalf("p95 = %v, want nil", *got.P95) + } + if got.Availability == nil || *got.Availability != 100 { + t.Fatalf("availability = %v, want 100", got.Availability) + } + }) + + t.Run("service-specific query keeps legacy zero coercion", func(t *testing.T) { + got, ok := parseServiceMetricRow(columns, colMap, sparseRow, "adservice", "onlineboutique", false) + if !ok { + t.Fatalf("expected row to parse") + } + if got.ErrorRate == nil || *got.ErrorRate != 0 { + t.Fatalf("errorRate = %v, want 0", got.ErrorRate) + } + if got.P95 == nil || *got.P95 != 0 { + t.Fatalf("p95 = %v, want 0", got.P95) + } + }) + + t.Run("non-null values remain intact", func(t *testing.T) { + row := []interface{}{ + "2026-02-23T05:06:00Z", + float64(3.04), + float64(0.25), + nil, + float64(12.91), + nil, + float64(100), + } + + global, ok := parseServiceMetricRow(columns, colMap, row, "adservice", "onlineboutique", true) + if !ok { + t.Fatalf("expected row to parse in global mode") + } + local, ok := parseServiceMetricRow(columns, colMap, row, "adservice", "onlineboutique", false) + if !ok { + t.Fatalf("expected row to parse in service mode") + } + + for _, tc := range []struct { + name string + m ServiceMetric + }{ + {name: "global", m: global}, + {name: "service", m: local}, + } { + if tc.m.ErrorRate == nil || *tc.m.ErrorRate != 0.25 { + t.Fatalf("%s errorRate = %v, want 0.25", tc.name, tc.m.ErrorRate) + } + if tc.m.P95 == nil || *tc.m.P95 != 12.91 { + t.Fatalf("%s p95 = %v, want 12.91", tc.name, tc.m.P95) + } + } + }) +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a725a67..85f6c97 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -16,6 +16,7 @@ type Config struct { TelemetryWorker TelemetryWorkerConfig Telemetry TelemetryConfig Webhook WebhookConfig + Drills DrillsConfig } type SimulationConfig struct { @@ -76,6 +77,16 @@ type WebhookConfig struct { AcceptLegacySignature bool } +type DrillsConfig struct { + KubeconfigPath string + KubeContext string + KubeAPIServer string + LoadGeneratorDeployment string + LoadGeneratorContainer string + TargetedLoadRateEnv string + TargetedLoadUsersEnv string +} + func Load() (*Config, error) { cfg := &Config{ Simulation: SimulationConfig{ @@ -92,7 +103,7 @@ func Load() (*Config, error) { }, GraphAPI: GraphAPIConfig{ BaseURL: getGraphBaseURL(), - TimeoutMs: getEnvInt("GRAPH_API_TIMEOUT_MS", 5000), + TimeoutMs: getEnvInt("GRAPH_API_TIMEOUT_MS", 15000), }, RateLimit: RateLimitConfig{ WindowMs: getEnvInt("RATE_LIMIT_WINDOW_MS", 60000), @@ -127,6 +138,15 @@ func Load() (*Config, error) { ProcessTimeoutMs: getEnvInt("WEBHOOK_PROCESS_TIMEOUT_MS", 15000), AcceptLegacySignature: getEnv("WEBHOOK_ACCEPT_LEGACY_SIGNATURE", "true") != "false", }, + Drills: DrillsConfig{ + KubeconfigPath: getEnv("DRILLS_KUBECONFIG_PATH", getEnv("DRILLS_KUBECONFIG", "")), + KubeContext: getEnv("DRILLS_KUBE_CONTEXT", ""), + KubeAPIServer: getEnv("DRILLS_KUBE_API_SERVER", ""), + LoadGeneratorDeployment: getEnv("DRILLS_LOADGEN_DEPLOYMENT", "loadgenerator"), + LoadGeneratorContainer: getEnv("DRILLS_LOADGEN_CONTAINER", "main"), + TargetedLoadRateEnv: getEnv("DRILLS_TARGETED_LOAD_RATE_ENV", "RATE"), + TargetedLoadUsersEnv: getEnv("DRILLS_TARGETED_LOAD_USERS_ENV", "USERS"), + }, } return cfg, nil diff --git a/pkg/drills/actions.go b/pkg/drills/actions.go new file mode 100644 index 0000000..7035114 --- /dev/null +++ b/pkg/drills/actions.go @@ -0,0 +1,951 @@ +package drills + +import ( + "context" + "encoding/json" + "fmt" + "net" + neturl "net/url" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/networking/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/retry" +) + +// Action defines the interface for drill execution tactics. +type Action interface { + Execute(ctx context.Context, namespace, target string, config json.RawMessage) error + Rollback(ctx context.Context, namespace, target string, config json.RawMessage) error +} + +type K8sClientOptions struct { + KubeconfigPath string + KubeContext string + APIServer string +} + +type K8sClientFactory struct { + opts K8sClientOptions +} + +func NewK8sClientFactory(opts K8sClientOptions) *K8sClientFactory { + return &K8sClientFactory{opts: opts} +} + +func (f *K8sClientFactory) Clientset() (*kubernetes.Clientset, error) { + restCfg, err := f.restConfig() + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(restCfg) +} + +func (f *K8sClientFactory) PreflightConnectivity(ctx context.Context) error { + clientset, restCfg, err := f.clientsetWithConfig() + if err != nil { + return wrapK8sPreflightError("load kubernetes client configuration", "", err) + } + if _, err := clientset.Discovery().ServerVersion(); err != nil { + return wrapK8sPreflightError("probe kubernetes api server", restCfg.Host, err) + } + return nil +} + +func (f *K8sClientFactory) PreflightDeploymentAccess(ctx context.Context, namespace, deployment string) error { + clientset, restCfg, err := f.clientsetWithConfig() + if err != nil { + return wrapK8sPreflightError("load kubernetes client configuration", "", err) + } + if _, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment, metav1.GetOptions{}); err != nil { + resource := fmt.Sprintf("read deployment %s/%s", namespace, deployment) + return wrapK8sPreflightError(resource, restCfg.Host, err) + } + return nil +} + +func (f *K8sClientFactory) PreflightNamespaceAccess(ctx context.Context, namespace string) error { + clientset, restCfg, err := f.clientsetWithConfig() + if err != nil { + return wrapK8sPreflightError("load kubernetes client configuration", "", err) + } + if _, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{Limit: 1}); err != nil { + resource := fmt.Sprintf("read pods in namespace %s", namespace) + return wrapK8sPreflightError(resource, restCfg.Host, err) + } + return nil +} + +func (f *K8sClientFactory) clientsetWithConfig() (*kubernetes.Clientset, *rest.Config, error) { + restCfg, err := f.restConfig() + if err != nil { + return nil, nil, err + } + clientset, err := kubernetes.NewForConfig(restCfg) + if err != nil { + return nil, nil, err + } + return clientset, restCfg, nil +} + +func (f *K8sClientFactory) restConfig() (*rest.Config, error) { + opts := f.resolvedOptions() + + var inClusterErr error + var kubeconfigErr error + + kubeconfigPreferred := opts.KubeconfigPath != "" || opts.KubeContext != "" || os.Getenv("KUBECONFIG") != "" + if kubeconfigPreferred { + if cfg, err := buildKubeconfigRestConfig(opts); err == nil { + return cfg, nil + } else { + kubeconfigErr = err + } + } + + if cfg, err := rest.InClusterConfig(); err == nil { + if opts.APIServer != "" { + cfg.Host = opts.APIServer + } + return cfg, nil + } else { + inClusterErr = err + } + + if !kubeconfigPreferred { + if cfg, err := buildKubeconfigRestConfig(opts); err == nil { + return cfg, nil + } else { + kubeconfigErr = err + } + } + + return nil, fmt.Errorf("failed to load k8s config (in-cluster: %v; kubeconfig: %v)", inClusterErr, kubeconfigErr) +} + +func (f *K8sClientFactory) resolvedOptions() K8sClientOptions { + opts := K8sClientOptions{} + if f != nil { + opts = f.opts + } + if opts.KubeconfigPath == "" { + opts.KubeconfigPath = firstNonEmptyEnv("DRILLS_KUBECONFIG_PATH", "DRILLS_KUBECONFIG") + } + if opts.KubeContext == "" { + opts.KubeContext = os.Getenv("DRILLS_KUBE_CONTEXT") + } + if opts.APIServer == "" { + opts.APIServer = os.Getenv("DRILLS_KUBE_API_SERVER") + } + return opts +} + +func buildKubeconfigRestConfig(opts K8sClientOptions) (*rest.Config, error) { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + if opts.KubeconfigPath != "" { + loadingRules.ExplicitPath = expandHomePath(opts.KubeconfigPath) + } + + overrides := &clientcmd.ConfigOverrides{} + if opts.KubeContext != "" { + overrides.CurrentContext = opts.KubeContext + } + + clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides) + cfg, err := clientConfig.ClientConfig() + if err != nil { + return nil, err + } + if opts.APIServer != "" { + cfg.Host = opts.APIServer + } + return cfg, nil +} + +func wrapK8sPreflightError(operation, apiHost string, err error) error { + if err == nil { + return nil + } + base := fmt.Sprintf("drill preflight failed: unable to %s", operation) + if apiHost != "" { + base = fmt.Sprintf("%s via %s", base, apiHost) + } + if isLoopbackAPIHost(apiHost) && strings.Contains(strings.ToLower(err.Error()), "connect: connection refused") { + return fmt.Errorf( + "%s: %w (detected loopback kubernetes api endpoint; if this environment relies on an SSH tunnel, start the tunnel on the analysis-engine host or set DRILLS_KUBE_API_SERVER / DRILLS_KUBECONFIG_PATH to a reachable cluster endpoint before calling /drills/run)", + base, + err, + ) + } + return fmt.Errorf("%s: %w", base, err) +} + +func isLoopbackAPIHost(raw string) bool { + if strings.TrimSpace(raw) == "" { + return false + } + parsed, err := neturl.Parse(raw) + if err != nil { + return false + } + host := strings.TrimSpace(parsed.Hostname()) + if host == "" { + return false + } + if strings.EqualFold(host, "localhost") { + return true + } + ip := net.ParseIP(host) + return ip != nil && ip.IsLoopback() +} + +func expandHomePath(path string) string { + if path == "" { + return path + } + if strings.HasPrefix(path, "~/") { + if home, err := os.UserHomeDir(); err == nil && home != "" { + return filepath.Join(home, path[2:]) + } + } + return path +} + +func firstNonEmptyEnv(keys ...string) string { + for _, key := range keys { + if value := strings.TrimSpace(os.Getenv(key)); value != "" { + return value + } + } + return "" +} + +// ScaleDeploymentAction handles scaling a deployment to a target replica count. +type ScaleDeploymentAction struct { + clients *K8sClientFactory + OriginalReplicas map[string]int32 +} + +func NewScaleDeploymentAction(clients ...*K8sClientFactory) *ScaleDeploymentAction { + var clientFactory *K8sClientFactory + if len(clients) > 0 { + clientFactory = clients[0] + } + return &ScaleDeploymentAction{ + clients: clientFactory, + OriginalReplicas: make(map[string]int32), + } +} + +type ScaleConfig struct { + Replicas int32 `json:"replicas"` +} + +func (a *ScaleDeploymentAction) Execute(ctx context.Context, namespace, target string, config json.RawMessage) error { + var conf ScaleConfig + if err := json.Unmarshal(config, &conf); err != nil { + return fmt.Errorf("invalid config for scale action: %w", err) + } + + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + deploymentsClient := clientset.AppsV1().Deployments(namespace) + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + result, getErr := deploymentsClient.Get(ctx, target, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get deployment: %w", getErr) + } + + // Save original replicas if not already saved + key := fmt.Sprintf("%s/%s", namespace, target) + if _, exists := a.OriginalReplicas[key]; !exists { + if result.Spec.Replicas != nil { + a.OriginalReplicas[key] = *result.Spec.Replicas + } else { + a.OriginalReplicas[key] = 1 + } + } + + result.Spec.Replicas = &conf.Replicas + _, updateErr := deploymentsClient.Update(ctx, result, metav1.UpdateOptions{}) + return updateErr + }) + + if err != nil { + return fmt.Errorf("failed to scale deployment: %w", err) + } + return nil +} + +func (a *ScaleDeploymentAction) Rollback(ctx context.Context, namespace, target string, config json.RawMessage) error { + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + key := fmt.Sprintf("%s/%s", namespace, target) + originalReplicas, exists := a.OriginalReplicas[key] + if !exists { + originalReplicas = 1 + } + + deploymentsClient := clientset.AppsV1().Deployments(namespace) + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + result, getErr := deploymentsClient.Get(ctx, target, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get deployment: %w", getErr) + } + + result.Spec.Replicas = &originalReplicas + _, updateErr := deploymentsClient.Update(ctx, result, metav1.UpdateOptions{}) + return updateErr + }) + + if err != nil { + return fmt.Errorf("failed to rollback scale deployment: %w", err) + } + delete(a.OriginalReplicas, key) + return nil +} + +// NetworkPolicyAction handles simulating a network cut via K8s NetworkPolicy. +type NetworkPolicyAction struct { + clients *K8sClientFactory + restoreSnapshots map[string]networkPolicyRestoreSnapshot +} + +func NewNetworkPolicyAction(clients ...*K8sClientFactory) *NetworkPolicyAction { + var clientFactory *K8sClientFactory + if len(clients) > 0 { + clientFactory = clients[0] + } + return &NetworkPolicyAction{ + clients: clientFactory, + restoreSnapshots: make(map[string]networkPolicyRestoreSnapshot), + } +} + +func (a *NetworkPolicyAction) Execute(ctx context.Context, namespace, target string, config json.RawMessage) error { + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + policyName := fmt.Sprintf("drill-deny-%s", target) + stateKey := fmt.Sprintf("%s/%s", namespace, target) + + preSnapshotNames, err := listNetworkPolicyNames(ctx, clientset, namespace) + if err != nil { + return fmt.Errorf("failed to capture pre network policy snapshot: %w", err) + } + if containsString(preSnapshotNames, policyName) { + return fmt.Errorf("network cut blocked: policy %q already exists before drill (pre-snapshot count=%d)", policyName, len(preSnapshotNames)) + } + + policy := &v1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Namespace: namespace, + Labels: map[string]string{ + "drill-director": "active", + }, + }, + Spec: v1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": target, + }, + }, + PolicyTypes: []v1.PolicyType{v1.PolicyTypeIngress, v1.PolicyTypeEgress}, + }, + } + + _, err = clientset.NetworkingV1().NetworkPolicies(namespace).Create(ctx, policy, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + return fmt.Errorf("network cut blocked: policy %q appeared during action execution", policyName) + } + return fmt.Errorf("failed to create deny network policy: %w", err) + } + + createdPolicy, err := clientset.NetworkingV1().NetworkPolicies(namespace).Get(ctx, policyName, metav1.GetOptions{}) + if err != nil { + _ = clientset.NetworkingV1().NetworkPolicies(namespace).Delete(context.Background(), policyName, metav1.DeleteOptions{}) + return fmt.Errorf("network cut create verification failed for %q: %w", policyName, err) + } + if createdPolicy.Labels["drill-director"] != "active" { + _ = clientset.NetworkingV1().NetworkPolicies(namespace).Delete(context.Background(), policyName, metav1.DeleteOptions{}) + return fmt.Errorf("network cut create verification failed for %q: expected drill-director=active label", policyName) + } + + postCreateSnapshotNames, err := listNetworkPolicyNames(ctx, clientset, namespace) + if err != nil { + _ = clientset.NetworkingV1().NetworkPolicies(namespace).Delete(context.Background(), policyName, metav1.DeleteOptions{}) + return fmt.Errorf("failed to capture post-create network policy snapshot: %w", err) + } + if !containsString(postCreateSnapshotNames, policyName) { + _ = clientset.NetworkingV1().NetworkPolicies(namespace).Delete(context.Background(), policyName, metav1.DeleteOptions{}) + return fmt.Errorf("network cut create verification failed for %q: policy missing from post-create snapshot", policyName) + } + + a.restoreSnapshots[stateKey] = networkPolicyRestoreSnapshot{ + PolicyName: policyName, + PreNames: preSnapshotNames, + } + return nil +} + +func (a *NetworkPolicyAction) Rollback(ctx context.Context, namespace, target string, config json.RawMessage) error { + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + stateKey := fmt.Sprintf("%s/%s", namespace, target) + policyName := fmt.Sprintf("drill-deny-%s", target) + snapshot, hasSnapshot := a.restoreSnapshots[stateKey] + if !hasSnapshot { + _, getErr := clientset.NetworkingV1().NetworkPolicies(namespace).Get(ctx, policyName, metav1.GetOptions{}) + if getErr == nil { + return fmt.Errorf("cannot safely rollback network cut: missing pre-snapshot state and policy %q still exists", policyName) + } + if !apierrors.IsNotFound(getErr) { + return fmt.Errorf("failed to verify network policy state without snapshot for %q: %w", policyName, getErr) + } + return nil + } + + err = clientset.NetworkingV1().NetworkPolicies(namespace).Delete(ctx, policyName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete deny network policy: %w", err) + } + + _, getErr := clientset.NetworkingV1().NetworkPolicies(namespace).Get(ctx, policyName, metav1.GetOptions{}) + if getErr == nil { + return fmt.Errorf("network cut restore verification failed: policy %q still exists after rollback", policyName) + } + if !apierrors.IsNotFound(getErr) { + return fmt.Errorf("network cut restore verification failed while reading %q after rollback: %w", policyName, getErr) + } + + postRollbackNames, err := listNetworkPolicyNames(ctx, clientset, namespace) + if err != nil { + return fmt.Errorf("failed to capture post-rollback network policy snapshot: %w", err) + } + + if !stringSlicesEqual(snapshot.PreNames, postRollbackNames) { + missing, extra := diffStringSets(snapshot.PreNames, postRollbackNames) + return fmt.Errorf( + "network cut restore verification failed for %q: pre_count=%d post_count=%d missing=%v extra=%v", + policyName, + len(snapshot.PreNames), + len(postRollbackNames), + truncateStrings(missing, 6), + truncateStrings(extra, 6), + ) + } + + delete(a.restoreSnapshots, stateKey) + return nil +} + +type networkPolicyRestoreSnapshot struct { + PolicyName string + PreNames []string +} + +type TargetedLoadActionOptions struct { + DeploymentName string + ContainerName string + RateEnvName string + UsersEnvName string +} + +func DefaultTargetedLoadActionOptions() TargetedLoadActionOptions { + return TargetedLoadActionOptions{ + DeploymentName: "loadgenerator", + ContainerName: "main", + RateEnvName: "RATE", + UsersEnvName: "USERS", + } +} + +func (o TargetedLoadActionOptions) withDefaults() TargetedLoadActionOptions { + d := DefaultTargetedLoadActionOptions() + if o.DeploymentName != "" { + d.DeploymentName = o.DeploymentName + } + if o.ContainerName != "" { + d.ContainerName = o.ContainerName + } + if o.RateEnvName != "" { + d.RateEnvName = o.RateEnvName + } + if o.UsersEnvName != "" { + d.UsersEnvName = o.UsersEnvName + } + return d +} + +type TargetedLoadConfig struct { + RPS int `json:"rps"` + Rate int `json:"rate,omitempty"` + Users int `json:"users,omitempty"` +} + +type envVarSnapshot struct { + Exists bool + Value string +} + +type targetedLoadOriginalState struct { + Rate envVarSnapshot + Users envVarSnapshot +} + +type TargetedLoadAction struct { + clients *K8sClientFactory + opts TargetedLoadActionOptions + originals map[string]targetedLoadOriginalState +} + +func NewTargetedLoadAction(opts TargetedLoadActionOptions, clients ...*K8sClientFactory) *TargetedLoadAction { + var clientFactory *K8sClientFactory + if len(clients) > 0 { + clientFactory = clients[0] + } + return &TargetedLoadAction{ + clients: clientFactory, + opts: opts.withDefaults(), + originals: make(map[string]targetedLoadOriginalState), + } +} + +func (a *TargetedLoadAction) Execute(ctx context.Context, namespace, target string, config json.RawMessage) error { + var conf TargetedLoadConfig + if err := json.Unmarshal(config, &conf); err != nil { + return fmt.Errorf("invalid config for targeted load action: %w", err) + } + + desiredRate := conf.Rate + if desiredRate <= 0 { + desiredRate = conf.RPS + } + if desiredRate <= 0 { + return fmt.Errorf("invalid targeted load rate: expected positive rps/rate") + } + + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + deploymentsClient := clientset.AppsV1().Deployments(namespace) + stateKey := fmt.Sprintf("%s/%s", namespace, a.opts.DeploymentName) + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment, getErr := deploymentsClient.Get(ctx, a.opts.DeploymentName, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get load generator deployment %q: %w", a.opts.DeploymentName, getErr) + } + + containerIdx, findErr := findContainerIndex(deployment, a.opts.ContainerName) + if findErr != nil { + return findErr + } + + container := &deployment.Spec.Template.Spec.Containers[containerIdx] + if _, exists := a.originals[stateKey]; !exists { + rateValue, rateExists := getEnvVar(container.Env, a.opts.RateEnvName) + usersValue, usersExists := getEnvVar(container.Env, a.opts.UsersEnvName) + a.originals[stateKey] = targetedLoadOriginalState{ + Rate: envVarSnapshot{Exists: rateExists, Value: rateValue}, + Users: envVarSnapshot{Exists: usersExists, Value: usersValue}, + } + } + + container.Env = setEnvVar(container.Env, a.opts.RateEnvName, strconv.Itoa(desiredRate)) + if conf.Users > 0 { + container.Env = setEnvVar(container.Env, a.opts.UsersEnvName, strconv.Itoa(conf.Users)) + } + + _, updateErr := deploymentsClient.Update(ctx, deployment, metav1.UpdateOptions{}) + return updateErr + }) + if err != nil { + return fmt.Errorf("failed to apply targeted load action: %w", err) + } + return nil +} + +func (a *TargetedLoadAction) Rollback(ctx context.Context, namespace, target string, config json.RawMessage) error { + stateKey := fmt.Sprintf("%s/%s", namespace, a.opts.DeploymentName) + original, exists := a.originals[stateKey] + if !exists { + return nil + } + + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + deploymentsClient := clientset.AppsV1().Deployments(namespace) + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment, getErr := deploymentsClient.Get(ctx, a.opts.DeploymentName, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get load generator deployment %q for rollback: %w", a.opts.DeploymentName, getErr) + } + + containerIdx, findErr := findContainerIndex(deployment, a.opts.ContainerName) + if findErr != nil { + return findErr + } + + container := &deployment.Spec.Template.Spec.Containers[containerIdx] + container.Env = restoreEnvVar(container.Env, a.opts.RateEnvName, original.Rate) + container.Env = restoreEnvVar(container.Env, a.opts.UsersEnvName, original.Users) + + _, updateErr := deploymentsClient.Update(ctx, deployment, metav1.UpdateOptions{}) + return updateErr + }) + if err != nil { + return fmt.Errorf("failed to rollback targeted load action: %w", err) + } + + delete(a.originals, stateKey) + return nil +} + +type MockAction struct { + Message string +} + +func NewMockAction(msg string) *MockAction { + return &MockAction{Message: msg} +} + +func (m *MockAction) Execute(ctx context.Context, namespace, target string, config json.RawMessage) error { + return nil +} + +func (m *MockAction) Rollback(ctx context.Context, namespace, target string, config json.RawMessage) error { + return nil +} + +func getK8sClient(factory *K8sClientFactory) (*kubernetes.Clientset, error) { + if factory == nil { + factory = NewK8sClientFactory(K8sClientOptions{}) + } + return factory.Clientset() +} + +// MigrateServiceAction migrates a service's pods to a specific target node. +// It uses nodeSelector patching + scale-down/up to force pod rescheduling. +type MigrateServiceAction struct { + clients *K8sClientFactory + OriginalReplicas map[string]int32 + OriginalSelector map[string]map[string]string // saved nodeSelector for rollback +} + +func NewMigrateServiceAction(clients ...*K8sClientFactory) *MigrateServiceAction { + var clientFactory *K8sClientFactory + if len(clients) > 0 { + clientFactory = clients[0] + } + return &MigrateServiceAction{ + clients: clientFactory, + OriginalReplicas: make(map[string]int32), + OriginalSelector: make(map[string]map[string]string), + } +} + +type MigrateConfig struct { + TargetNode string `json:"targetNode"` + Replicas int32 `json:"replicas,omitempty"` // if 0, preserves current replica count +} + +func (a *MigrateServiceAction) Execute(ctx context.Context, namespace, target string, config json.RawMessage) error { + var conf MigrateConfig + if err := json.Unmarshal(config, &conf); err != nil { + return fmt.Errorf("invalid config for migrate action: %w", err) + } + if conf.TargetNode == "" { + return fmt.Errorf("targetNode is required for migration") + } + + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + key := fmt.Sprintf("%s/%s", namespace, target) + if err := a.saveOriginalState(ctx, clientset, namespace, target, key); err != nil { + return err + } + if err := a.patchAndScaleDown(ctx, clientset, namespace, target, conf.TargetNode, key); err != nil { + return err + } + if err := a.waitForPodsTerminated(ctx, clientset, namespace, target); err != nil { + return err + } + return a.scaleUpOnTarget(ctx, clientset, namespace, target, conf.Replicas, key) +} + +func (a *MigrateServiceAction) saveOriginalState(ctx context.Context, clientset *kubernetes.Clientset, namespace, target, key string) error { + deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, target, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment for snapshot: %w", err) + } + if _, exists := a.OriginalReplicas[key]; !exists { + if deployment.Spec.Replicas != nil { + a.OriginalReplicas[key] = *deployment.Spec.Replicas + } else { + a.OriginalReplicas[key] = 1 + } + } + if _, exists := a.OriginalSelector[key]; !exists { + if deployment.Spec.Template.Spec.NodeSelector != nil { + orig := make(map[string]string) + for k, v := range deployment.Spec.Template.Spec.NodeSelector { + orig[k] = v + } + a.OriginalSelector[key] = orig + } else { + a.OriginalSelector[key] = nil + } + } + return nil +} + +func (a *MigrateServiceAction) patchAndScaleDown(ctx context.Context, clientset *kubernetes.Clientset, namespace, target, targetNode, key string) error { + deploymentsClient := clientset.AppsV1().Deployments(namespace) + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment, getErr := deploymentsClient.Get(ctx, target, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get deployment for migration: %w", getErr) + } + deployment.Spec.Template.Spec.NodeSelector = map[string]string{ + "kubernetes.io/hostname": targetNode, + } + zero := int32(0) + deployment.Spec.Replicas = &zero + _, updateErr := deploymentsClient.Update(ctx, deployment, metav1.UpdateOptions{}) + return updateErr + }) +} + +func (a *MigrateServiceAction) waitForPodsTerminated(ctx context.Context, clientset *kubernetes.Clientset, namespace, target string) error { + for i := 0; i < 30; i++ { + pods, listErr := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%s", target), + }) + if listErr != nil { + return nil // best-effort wait + } + if len(pods.Items) == 0 { + return nil + } + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting for pods to terminate") + case <-time.After(1 * time.Second): + } + } + return nil +} + +func (a *MigrateServiceAction) scaleUpOnTarget(ctx context.Context, clientset *kubernetes.Clientset, namespace, target string, replicas int32, key string) error { + desired := replicas + if desired <= 0 { + desired = a.OriginalReplicas[key] + } + deploymentsClient := clientset.AppsV1().Deployments(namespace) + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment, getErr := deploymentsClient.Get(ctx, target, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get deployment for scale up: %w", getErr) + } + deployment.Spec.Replicas = &desired + _, updateErr := deploymentsClient.Update(ctx, deployment, metav1.UpdateOptions{}) + return updateErr + }) +} + +func (a *MigrateServiceAction) Rollback(ctx context.Context, namespace, target string, config json.RawMessage) error { + clientset, err := getK8sClient(a.clients) + if err != nil { + return err + } + + key := fmt.Sprintf("%s/%s", namespace, target) + deploymentsClient := clientset.AppsV1().Deployments(namespace) + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment, getErr := deploymentsClient.Get(ctx, target, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get deployment for rollback: %w", getErr) + } + + // Restore original nodeSelector + if origSelector, exists := a.OriginalSelector[key]; exists { + deployment.Spec.Template.Spec.NodeSelector = origSelector + } else { + deployment.Spec.Template.Spec.NodeSelector = nil + } + + // Restore original replicas + origReplicas := int32(1) + if r, exists := a.OriginalReplicas[key]; exists { + origReplicas = r + } + deployment.Spec.Replicas = &origReplicas + + _, updateErr := deploymentsClient.Update(ctx, deployment, metav1.UpdateOptions{}) + return updateErr + }) + if err != nil { + return fmt.Errorf("failed to rollback migration: %w", err) + } + + delete(a.OriginalReplicas, key) + delete(a.OriginalSelector, key) + return nil +} + +func findContainerIndex(deployment *appsv1.Deployment, preferredName string) (int, error) { + containers := deployment.Spec.Template.Spec.Containers + if len(containers) == 0 { + return 0, fmt.Errorf("deployment %s/%s has no containers", deployment.Namespace, deployment.Name) + } + if preferredName == "" { + return 0, nil + } + for i, container := range containers { + if container.Name == preferredName { + return i, nil + } + } + if len(containers) == 1 { + return 0, nil + } + return 0, fmt.Errorf("container %q not found in deployment %s/%s", preferredName, deployment.Namespace, deployment.Name) +} + +func getEnvVar(envs []corev1.EnvVar, name string) (string, bool) { + for _, env := range envs { + if env.Name == name { + return env.Value, true + } + } + return "", false +} + +func setEnvVar(envs []corev1.EnvVar, name, value string) []corev1.EnvVar { + for i := range envs { + if envs[i].Name == name { + envs[i].Value = value + envs[i].ValueFrom = nil + return envs + } + } + return append(envs, corev1.EnvVar{Name: name, Value: value}) +} + +func restoreEnvVar(envs []corev1.EnvVar, name string, snapshot envVarSnapshot) []corev1.EnvVar { + if snapshot.Exists { + return setEnvVar(envs, name, snapshot.Value) + } + filtered := envs[:0] + for _, env := range envs { + if env.Name != name { + filtered = append(filtered, env) + } + } + return filtered +} + +func listNetworkPolicyNames(ctx context.Context, clientset *kubernetes.Clientset, namespace string) ([]string, error) { + list, err := clientset.NetworkingV1().NetworkPolicies(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + names := make([]string, 0, len(list.Items)) + for _, item := range list.Items { + names = append(names, item.Name) + } + sort.Strings(names) + return names, nil +} + +func containsString(items []string, needle string) bool { + for _, item := range items { + if item == needle { + return true + } + } + return false +} + +func stringSlicesEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func diffStringSets(expected, actual []string) (missing []string, extra []string) { + expectedSet := make(map[string]struct{}, len(expected)) + actualSet := make(map[string]struct{}, len(actual)) + + for _, v := range expected { + expectedSet[v] = struct{}{} + } + for _, v := range actual { + actualSet[v] = struct{}{} + } + for _, v := range expected { + if _, ok := actualSet[v]; !ok { + missing = append(missing, v) + } + } + for _, v := range actual { + if _, ok := expectedSet[v]; !ok { + extra = append(extra, v) + } + } + sort.Strings(missing) + sort.Strings(extra) + return missing, extra +} + +func truncateStrings(items []string, limit int) []string { + if limit <= 0 || len(items) <= limit { + return items + } + out := make([]string, 0, limit+1) + out = append(out, items[:limit]...) + out = append(out, fmt.Sprintf("...+%d more", len(items)-limit)) + return out +} diff --git a/pkg/drills/engine.go b/pkg/drills/engine.go new file mode 100644 index 0000000..e04694e --- /dev/null +++ b/pkg/drills/engine.go @@ -0,0 +1,575 @@ +package drills + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "predictive-analysis-engine/pkg/clients/graph" + "predictive-analysis-engine/pkg/clients/telemetry" + "predictive-analysis-engine/pkg/storage" + + "github.com/google/uuid" +) + +const ( + StatusPlanned = "Planned" + StatusRunning = "Running" + StatusObserving = "Observing" + StatusAwaitingRecovery = "AwaitingRecovery" + StatusRecovering = "Recovering" + StatusCompleted = "Completed" + StatusAccepted = "Accepted" + StatusAborted = "Aborted" + StatusFailed = "Failed" + + recoveryModeManualWithFailsafe = "manual_with_failsafe" + recoveryFailsafeTimeout = 5 * time.Minute +) + +var ( + ErrDrillNotActive = errors.New("drill is not actively running") + ErrDrillNotRecoverable = errors.New("drill is not awaiting recovery") +) + +type recoveryTrigger struct { + Source string + MarkAborted bool + SkipRollback bool +} + +type DrillRuntimeState struct { + CanRecover bool `json:"canRecover,omitempty"` + RecoveryDeadline *string `json:"recoveryDeadline,omitempty"` + RecoveryMode string `json:"recoveryMode,omitempty"` + RecoverySource string `json:"recoverySource,omitempty"` +} + +type drillSession struct { + runID string + cancel context.CancelFunc + recoverCh chan recoveryTrigger + + mu sync.Mutex + action Action + namespace string + awaitingRecovery bool + recoveryStarted bool + recoveryDeadline time.Time + recoverySource string + recoveryAborted bool +} + +func newDrillSession(runID string, cancel context.CancelFunc) *drillSession { + return &drillSession{ + runID: runID, + cancel: cancel, + recoverCh: make(chan recoveryTrigger, 1), + } +} + +func (s *drillSession) setActionContext(action Action, namespace string) { + s.mu.Lock() + defer s.mu.Unlock() + s.action = action + s.namespace = namespace +} + +func (s *drillSession) beginAwaitingRecovery(deadline time.Time) { + s.mu.Lock() + defer s.mu.Unlock() + s.awaitingRecovery = true + s.recoveryStarted = false + s.recoveryDeadline = deadline.UTC() +} + +func (s *drillSession) requestRecovery(trigger recoveryTrigger) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.recoveryStarted { + return false, nil + } + if !s.awaitingRecovery { + return false, ErrDrillNotRecoverable + } + + s.awaitingRecovery = false + s.recoveryStarted = true + s.recoverySource = trigger.Source + s.recoveryAborted = trigger.MarkAborted + + select { + case s.recoverCh <- trigger: + default: + } + + return true, nil +} + +func (s *drillSession) beginFailsafeRecovery() recoveryTrigger { + s.mu.Lock() + defer s.mu.Unlock() + + if s.recoveryStarted { + return recoveryTrigger{Source: s.recoverySource, MarkAborted: s.recoveryAborted} + } + + s.awaitingRecovery = false + s.recoveryStarted = true + s.recoverySource = "failsafe" + s.recoveryAborted = false + return recoveryTrigger{Source: "failsafe"} +} + +func (s *drillSession) runtimeState() DrillRuntimeState { + s.mu.Lock() + defer s.mu.Unlock() + + state := DrillRuntimeState{ + RecoveryMode: recoveryModeManualWithFailsafe, + RecoverySource: s.recoverySource, + } + + if s.awaitingRecovery { + state.CanRecover = true + deadline := s.recoveryDeadline.UTC().Format(time.RFC3339) + state.RecoveryDeadline = &deadline + } + + return state +} + +type Engine struct { + store *storage.DecisionStore + graphClient *graph.Client + telemetryClient *telemetry.TelemetryClient + k8sClients *K8sClientFactory + actionFactories map[string]func() Action + active sync.Map // maps runID -> *drillSession +} + +type EngineOptions struct { + K8sClientOptions K8sClientOptions + TargetedLoad TargetedLoadActionOptions +} + +func NewEngine(store *storage.DecisionStore, graphClient *graph.Client, telemetryClient *telemetry.TelemetryClient, options ...EngineOptions) *Engine { + var opts EngineOptions + if len(options) > 0 { + opts = options[0] + } + + k8sClients := NewK8sClientFactory(opts.K8sClientOptions) + + e := &Engine{ + store: store, + graphClient: graphClient, + telemetryClient: telemetryClient, + k8sClients: k8sClients, + actionFactories: make(map[string]func() Action), + } + e.actionFactories["ServiceShutdown"] = func() Action { return NewScaleDeploymentAction(k8sClients) } + e.actionFactories["ServiceBrownout"] = func() Action { return NewScaleDeploymentAction(k8sClients) } + e.actionFactories["ScaleStress"] = func() Action { return NewScaleDeploymentAction(k8sClients) } + e.actionFactories["PodScaleUp"] = func() Action { return NewScaleDeploymentAction(k8sClients) } + e.actionFactories["PodScaleDown"] = func() Action { return NewScaleDeploymentAction(k8sClients) } + e.actionFactories["NetworkCut"] = func() Action { return NewNetworkPolicyAction(k8sClients) } + e.actionFactories["ExtendedNetworkCut"] = func() Action { return NewNetworkPolicyAction(k8sClients) } + e.actionFactories["TargetedLoad"] = func() Action { return NewTargetedLoadAction(opts.TargetedLoad, k8sClients) } + e.actionFactories["TrafficSpike"] = func() Action { return NewTargetedLoadAction(opts.TargetedLoad, k8sClients) } + e.actionFactories["MigrateService"] = func() Action { return NewMigrateServiceAction(k8sClients) } + return e +} + +type RunConfig struct { + Namespace string `json:"namespace"` + ObserveTokens int `json:"observeTokens"` // seconds to observe before recovery gate +} + +func (e *Engine) PlanDrill(drillType, target string, config json.RawMessage) (*storage.DrillRun, error) { + id := uuid.New().String() + + run := storage.DrillRun{ + ID: id, + Type: drillType, + Target: target, + Status: StatusPlanned, + StartTime: time.Now().UTC().Format(time.RFC3339), + Config: config, + Verdict: "Pending", + } + + if err := e.store.InsertDrillRun(run); err != nil { + return nil, fmt.Errorf("failed to save planned drill: %w", err) + } + + return &run, nil +} + +func (e *Engine) ExecuteDrill(runID string) error { + run, err := e.store.GetDrillRun(runID) + if err != nil || run == nil { + return fmt.Errorf("run not found or error: %w", err) + } + + if err := e.preflightExecuteDrill(run); err != nil { + e.failRun(run, "Validate", err.Error()) + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + session := newDrillSession(runID, cancel) + e.active.Store(runID, session) + + go e.runStateMachine(ctx, run, session) + + return nil +} + +func (e *Engine) preflightExecuteDrill(run *storage.DrillRun) error { + if run == nil { + return fmt.Errorf("drill preflight failed: nil run") + } + if e.k8sClients == nil { + // Use default resolution path so preflight still validates env/kubeconfig state. + e.k8sClients = NewK8sClientFactory(K8sClientOptions{}) + } + + parsedConfig, namespace, targetName, err := e.parseRunConfigAndTarget(run) + if err != nil { + return fmt.Errorf("drill preflight failed: invalid run configuration: %w", err) + } + + _ = parsedConfig // reserved for future preflight checks + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + switch run.Type { + case "ServiceShutdown", "ServiceBrownout", "ScaleStress", "MigrateService", "PodScaleUp", "PodScaleDown": + if targetName == "" { + return fmt.Errorf("drill preflight failed: empty deployment target for %s", run.Type) + } + return e.k8sClients.PreflightDeploymentAccess(ctx, namespace, targetName) + case "NetworkCut", "ExtendedNetworkCut", "TargetedLoad", "TrafficSpike": + return e.k8sClients.PreflightNamespaceAccess(ctx, namespace) + default: + // Unsupported drill types are handled by the state machine validation path. + return nil + } +} + +func (e *Engine) parseRunConfigAndTarget(run *storage.DrillRun) (RunConfig, string, string, error) { + var parsedConfig RunConfig + if run == nil { + return parsedConfig, "", "", fmt.Errorf("nil run") + } + + if err := json.Unmarshal(run.Config, &parsedConfig); err != nil { + return parsedConfig, "", "", err + } + + namespace := parsedConfig.Namespace + target := run.Target + if namespace == "" { + parts := strings.Split(run.Target, "/") + if len(parts) == 2 { + namespace = parts[0] + target = parts[1] + } else { + namespace = "default" + } + } + + return parsedConfig, namespace, target, nil +} + +func (e *Engine) runStateMachine(ctx context.Context, run *storage.DrillRun, session *drillSession) { + defer e.active.Delete(run.ID) + + e.updateStatus(run, StatusRunning) + e.logStep(run.ID, "Validate", "Starting drill validation", "Ok") + + actionFactory, exists := e.actionFactories[run.Type] + if !exists { + e.failRun(run, "Validate", "Unsupported drill type") + return + } + action := actionFactory() + + parsedConfig, namespace, targetName, err := e.parseRunConfigAndTarget(run) + if err != nil { + e.failRun(run, "Validate", "Invalid configuration format") + return + } + run.Target = targetName + session.setActionContext(action, namespace) + + // 1. Warmup Snapshot + e.logStep(run.ID, "Warmup", "Capturing baseline metrics", "Ok") + if snapshot, err := e.captureSnapshot(ctx); err == nil { + run.PreSnapshot = snapshot + } else { + e.logStep(run.ID, "Warmup", fmt.Sprintf("Warning: Snapshot failed: %v", err), "Warn") + } + e.store.UpdateDrillRun(*run) + + // 2. Action + e.logStep(run.ID, "Action", fmt.Sprintf("Executing action: %s on %s", run.Type, run.Target), "Ok") + if err := action.Execute(ctx, namespace, run.Target, run.Config); err != nil { + e.failRun(run, "Action", fmt.Sprintf("Failed to execute action: %v", err)) + return + } + + e.updateStatus(run, StatusObserving) + + // 3. Observe Window + observeTime := time.Duration(parsedConfig.ObserveTokens) * time.Second + if observeTime <= 0 { + observeTime = 15 * time.Second + } + + e.logStep(run.ID, "Observation", fmt.Sprintf("Observing system for %v", observeTime), "Ok") + + recovery := recoveryTrigger{Source: "failsafe"} + select { + case <-ctx.Done(): + // Aborted early -> recover immediately. + e.logStep(run.ID, "Observation", "Drill aborted by user during observation", "Warn") + run.Verdict = "Aborted" + recovery = recoveryTrigger{Source: "abort", MarkAborted: true} + case <-time.After(observeTime): + run.Verdict = "Success" + recovery = e.awaitRecoveryAuthorization(ctx, run, session) + switch { + case recovery.MarkAborted: + run.Verdict = "Aborted" + case recovery.SkipRollback: + run.Verdict = "Accepted" + } + } + + // 4. Recovery + if recovery.SkipRollback { + // Operator accepted the current cluster state — skip rollback entirely. + e.updateStatus(run, StatusAccepted) + e.logStep(run.ID, "Recovery", "Operator accepted current cluster state; rollback skipped", "Ok") + } else { + e.updateStatus(run, StatusRecovering) + e.logStep(run.ID, "Recovery", e.recoveryInitiationMessage(recovery), "Ok") + + // Use background context for rollback just in case main ctx was cancelled. + if err := action.Rollback(context.Background(), namespace, run.Target, run.Config); err != nil { + e.logStep(run.ID, "Recovery", fmt.Sprintf("Rollback failed: %v", err), "Error") + run.Verdict = "Partial/Fail" + } else { + e.logStep(run.ID, "Recovery", "Rollback successful", "Ok") + } + } + + // 5. Post Snapshot + e.logStep(run.ID, "PostSnapshot", "Capturing final metrics", "Ok") + if snapshot, err := e.captureSnapshot(context.Background()); err == nil { + run.PostSnapshot = snapshot + } else { + e.logStep(run.ID, "PostSnapshot", fmt.Sprintf("Warning: Snapshot failed: %v", err), "Warn") + } + + // Finalize + endTime := time.Now().UTC().Format(time.RFC3339) + run.EndTime = &endTime + switch run.Verdict { + case "Aborted": + e.updateStatus(run, StatusAborted) + case "Accepted": + e.updateStatus(run, StatusAccepted) + default: + e.updateStatus(run, StatusCompleted) + } + e.logStep(run.ID, "Finalize", "Drill completed", "Ok") +} + +func (e *Engine) awaitRecoveryAuthorization(ctx context.Context, run *storage.DrillRun, session *drillSession) recoveryTrigger { + deadline := time.Now().UTC().Add(recoveryFailsafeTimeout) + session.beginAwaitingRecovery(deadline) + e.updateStatus(run, StatusAwaitingRecovery) + e.logStep(run.ID, "Recovery", fmt.Sprintf("Observation complete; awaiting operator recovery (failsafe in %s)", recoveryFailsafeTimeout), "Warn") + + timer := time.NewTimer(recoveryFailsafeTimeout) + defer timer.Stop() + + select { + case trigger := <-session.recoverCh: + return trigger + case <-timer.C: + return session.beginFailsafeRecovery() + case <-ctx.Done(): + return recoveryTrigger{Source: "abort", MarkAborted: true} + } +} + +func (e *Engine) recoveryInitiationMessage(trigger recoveryTrigger) string { + switch trigger.Source { + case "manual": + return "Initiating operator-approved rollback (source: manual)" + case "abort": + return "Emergency rollback initiated (source: abort)" + case "failsafe": + return "Failsafe timeout reached; initiating rollback (source: failsafe)" + case "accept": + return "Operator accepted current state; rollback skipped (source: accept)" + default: + return "Initiating rollback" + } +} + +func (e *Engine) captureSnapshot(ctx context.Context) (json.RawMessage, error) { + if e.graphClient == nil { + return nil, fmt.Errorf("graph client not initialized") + } + snapshot, err := e.graphClient.GetMetricsSnapshot(ctx) + if err != nil { + return nil, err + } + return json.Marshal(snapshot) +} + +func (e *Engine) AbortDrill(runID string) error { + raw, ok := e.active.Load(runID) + if !ok { + return fmt.Errorf("%w: %s", ErrDrillNotActive, runID) + } + session, ok := raw.(*drillSession) + if !ok || session == nil { + return fmt.Errorf("%w: %s", ErrDrillNotActive, runID) + } + + if queued, err := session.requestRecovery(recoveryTrigger{Source: "abort", MarkAborted: true}); err == nil && queued { + return nil + } else if err != nil && !errors.Is(err, ErrDrillNotRecoverable) { + return err + } + + session.cancel() + return nil +} + +func (e *Engine) AcceptDrill(runID string) error { + raw, ok := e.active.Load(runID) + if !ok { + return fmt.Errorf("%w: %s", ErrDrillNotRecoverable, runID) + } + session, ok := raw.(*drillSession) + if !ok || session == nil { + return fmt.Errorf("%w: %s", ErrDrillNotRecoverable, runID) + } + + _, err := session.requestRecovery(recoveryTrigger{Source: "accept", SkipRollback: true}) + return err +} + +func (e *Engine) RecoverDrill(runID string) error { + raw, ok := e.active.Load(runID) + if !ok { + return fmt.Errorf("%w: %s", ErrDrillNotRecoverable, runID) + } + session, ok := raw.(*drillSession) + if !ok || session == nil { + return fmt.Errorf("%w: %s", ErrDrillNotRecoverable, runID) + } + + _, err := session.requestRecovery(recoveryTrigger{Source: "manual"}) + return err +} + +// K8sHealthResult holds the outcome of a Kubernetes connectivity probe. +type K8sHealthResult struct { + Reachable bool `json:"reachable"` + Host string `json:"host,omitempty"` + Version string `json:"version,omitempty"` + Error string `json:"error,omitempty"` + Hint string `json:"hint,omitempty"` +} + +// CheckK8sConnectivity performs a lightweight probe against the configured +// Kubernetes API server. It returns a structured result rather than an error +// so that callers (HTTP handlers) can always produce a JSON response. +func (e *Engine) CheckK8sConnectivity() K8sHealthResult { + if e.k8sClients == nil { + e.k8sClients = NewK8sClientFactory(K8sClientOptions{}) + } + + clientset, restCfg, err := e.k8sClients.clientsetWithConfig() + if err != nil { + return K8sHealthResult{ + Reachable: false, + Error: err.Error(), + Hint: "Unable to load Kubernetes client configuration. Ensure DRILLS_KUBECONFIG_PATH or KUBECONFIG is set, or that the analysis engine is running inside a cluster.", + } + } + + host := "" + if restCfg != nil { + host = restCfg.Host + } + + info, err := clientset.Discovery().ServerVersion() + if err != nil { + hint := "Kubernetes API server is unreachable." + if isLoopbackAPIHost(host) { + hint = "Detected loopback Kubernetes API endpoint (" + host + "). Start an SSH tunnel to your cluster on this port, or set DRILLS_KUBE_API_SERVER to a reachable cluster endpoint." + } + return K8sHealthResult{ + Reachable: false, + Host: host, + Error: err.Error(), + Hint: hint, + } + } + + return K8sHealthResult{ + Reachable: true, + Host: host, + Version: info.GitVersion, + } +} + +func (e *Engine) RuntimeState(runID string) *DrillRuntimeState { + raw, ok := e.active.Load(runID) + if !ok { + return nil + } + session, ok := raw.(*drillSession) + if !ok || session == nil { + return nil + } + state := session.runtimeState() + return &state +} + +func (e *Engine) updateStatus(run *storage.DrillRun, status string) { + run.Status = status + e.store.UpdateDrillRun(*run) +} + +func (e *Engine) logStep(runID, phase, message, status string) { + e.store.AddDrillStep(storage.DrillStep{ + RunID: runID, + Timestamp: time.Now().UTC().Format(time.RFC3339), + Phase: phase, + Message: message, + Status: status, + }) +} + +func (e *Engine) failRun(run *storage.DrillRun, phase, reason string) { + e.logStep(run.ID, phase, reason, "Error") + run.Verdict = "Failed" + endTime := time.Now().UTC().Format(time.RFC3339) + run.EndTime = &endTime + e.updateStatus(run, StatusFailed) +} diff --git a/pkg/storage/drills_store.go b/pkg/storage/drills_store.go new file mode 100644 index 0000000..f6cfcb6 --- /dev/null +++ b/pkg/storage/drills_store.go @@ -0,0 +1,194 @@ +package storage + +import ( + "database/sql" + "encoding/json" + "fmt" + "time" +) + +// DrillRun represents a saved drill execution sequence. +type DrillRun struct { + ID string `json:"id"` + Type string `json:"type"` + Target string `json:"target"` + Status string `json:"status"` + StartTime string `json:"startTime"` + EndTime *string `json:"endTime,omitempty"` + Config json.RawMessage `json:"config"` + PreSnapshot json.RawMessage `json:"preSnapshot,omitempty"` + PostSnapshot json.RawMessage `json:"postSnapshot,omitempty"` + Verdict string `json:"verdict"` + CreatedAt string `json:"createdAt"` + Timeline []DrillStep `json:"timeline"` +} + +// DrillStep is a single log entry or phase transition for a drill. +type DrillStep struct { + ID int64 `json:"id,omitempty"` + RunID string `json:"runId"` + Timestamp string `json:"timestamp"` + Phase string `json:"phase"` + Message string `json:"message"` + Status string `json:"status"` // "Ok", "Error" +} + +// InsertDrillRun creates a new record for a drill run. +func (s *DecisionStore) InsertDrillRun(run DrillRun) error { + configStr := "{}" + if run.Config != nil { + configStr = string(run.Config) + } + + query := ` + INSERT INTO drill_runs (id, type, target, status, start_time, config, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ` + _, err := s.db.Exec(query, run.ID, run.Type, run.Target, run.Status, run.StartTime, configStr, time.Now().UTC().Format(time.RFC3339)) + if err != nil { + return fmt.Errorf("failed to insert drill run: %w", err) + } + return nil +} + +// UpdateDrillRun updates an existing drill run. +func (s *DecisionStore) UpdateDrillRun(run DrillRun) error { + configStr := "{}" + if run.Config != nil { + configStr = string(run.Config) + } + var preStr, postStr *string + + if run.PreSnapshot != nil { + str := string(run.PreSnapshot) + preStr = &str + } + if run.PostSnapshot != nil { + str := string(run.PostSnapshot) + postStr = &str + } + + query := ` + UPDATE drill_runs + SET status = ?, end_time = ?, config = ?, pre_snapshot = ?, post_snapshot = ?, verdict = ? + WHERE id = ? + ` + _, err := s.db.Exec(query, run.Status, run.EndTime, configStr, preStr, postStr, run.Verdict, run.ID) + if err != nil { + return fmt.Errorf("failed to update drill run: %w", err) + } + return nil +} + +// AddDrillStep logs a step in the drill timeline. +func (s *DecisionStore) AddDrillStep(step DrillStep) error { + query := ` + INSERT INTO drill_steps (run_id, timestamp, phase, message, status) + VALUES (?, ?, ?, ?, ?) + ` + _, err := s.db.Exec(query, step.RunID, step.Timestamp, step.Phase, step.Message, step.Status) + if err != nil { + return fmt.Errorf("failed to insert drill step: %w", err) + } + return nil +} + +// GetDrillRun retrieves a drill run with its timeline. +func (s *DecisionStore) GetDrillRun(id string) (*DrillRun, error) { + query := ` + SELECT id, type, target, status, start_time, end_time, config, pre_snapshot, post_snapshot, verdict, created_at + FROM drill_runs WHERE id = ? + ` + row := s.db.QueryRow(query, id) + + var run DrillRun + var configStr string + var preStr, postStr sql.NullString + var endTime sql.NullString + + var verdictStr sql.NullString + + err := row.Scan(&run.ID, &run.Type, &run.Target, &run.Status, &run.StartTime, &endTime, &configStr, &preStr, &postStr, &verdictStr, &run.CreatedAt) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("failed to scan drill run: %w", err) + } + + if verdictStr.Valid { + run.Verdict = verdictStr.String + } + + if endTime.Valid { + run.EndTime = &endTime.String + } + if configStr != "" { + run.Config = json.RawMessage(configStr) + } + if preStr.Valid && preStr.String != "" { + run.PreSnapshot = json.RawMessage(preStr.String) + } + if postStr.Valid && postStr.String != "" { + run.PostSnapshot = json.RawMessage(postStr.String) + } + + // Fetch timeline + timelineQuery := `SELECT id, run_id, timestamp, phase, message, status FROM drill_steps WHERE run_id = ? ORDER BY timestamp ASC, id ASC` + rows, err := s.db.Query(timelineQuery, id) + if err != nil { + return nil, fmt.Errorf("failed to query drill steps: %w", err) + } + defer rows.Close() + + for rows.Next() { + var step DrillStep + if err := rows.Scan(&step.ID, &step.RunID, &step.Timestamp, &step.Phase, &step.Message, &step.Status); err != nil { + return nil, fmt.Errorf("failed to scan drill step: %w", err) + } + run.Timeline = append(run.Timeline, step) + } + + return &run, nil +} + +// ListDrillRuns retrieves recent drill runs. +func (s *DecisionStore) ListDrillRuns(limit int) ([]DrillRun, error) { + if limit <= 0 { + limit = 50 + } + + query := ` + SELECT id, type, target, status, start_time, end_time, config, verdict, created_at + FROM drill_runs + ORDER BY start_time DESC LIMIT ? + ` + rows, err := s.db.Query(query, limit) + if err != nil { + return nil, fmt.Errorf("failed to list drill runs: %w", err) + } + defer rows.Close() + + var runs []DrillRun + for rows.Next() { + var run DrillRun + var configStr string + var verdictStr sql.NullString + var endTime sql.NullString + + if err := rows.Scan(&run.ID, &run.Type, &run.Target, &run.Status, &run.StartTime, &endTime, &configStr, &verdictStr, &run.CreatedAt); err != nil { + return nil, fmt.Errorf("failed to scan drill run list: %w", err) + } + if verdictStr.Valid { + run.Verdict = verdictStr.String + } + if endTime.Valid { + run.EndTime = &endTime.String + } + if configStr != "" { + run.Config = json.RawMessage(configStr) + } + runs = append(runs, run) + } + return runs, nil +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 11af275..00ec5ac 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -77,6 +77,33 @@ func (s *DecisionStore) initSchema() error { CREATE INDEX IF NOT EXISTS idx_webhook_events_first_seen_at ON webhook_events(first_seen_at); CREATE INDEX IF NOT EXISTS idx_webhook_events_source ON webhook_events(source); + + CREATE TABLE IF NOT EXISTS drill_runs ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + target TEXT NOT NULL, + status TEXT NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT, + config TEXT NOT NULL, + pre_snapshot TEXT, + post_snapshot TEXT, + verdict TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS drill_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id TEXT NOT NULL, + timestamp TEXT NOT NULL, + phase TEXT NOT NULL, + message TEXT NOT NULL, + status TEXT NOT NULL, + FOREIGN KEY (run_id) REFERENCES drill_runs(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_drill_runs_status ON drill_runs(status); + CREATE INDEX IF NOT EXISTS idx_drill_steps_run_id ON drill_steps(run_id); ` _, err := s.db.Exec(schema) if err != nil {