From f7ec43c9d9a6365b70ec66d1265b774afa0cbade Mon Sep 17 00:00:00 2001 From: Manas Srivastava Date: Mon, 11 May 2026 13:28:27 +0530 Subject: [PATCH] feat(jobs): k8s-aware deploy + custom-domain reconcilers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new background reconcilers wired into the River queue: - deploy_status_reconcile.go: polls the k8s deploy namespace for each active deployment and updates the platform DB status (building → deploying → healthy/failed) based on actual ReplicaSet rollout state. Replaces the previous "compute provider returns and we trust it" model that was reporting healthy before pods were Ready. - custom_domain_reconcile.go: checks pending custom-domain rows, creates/updates the corresponding Ingress + cert-manager Certificate resources, marks the row verified when the cert is Ready. Tolerates transient ACME failures. Adds k8s.io/client-go (and transitive deps) to go.mod. Jobs registered in internal/jobs/workers.go; expire.go gains a small change to skip deploys that the reconciler will catch. Co-Authored-By: Claude Opus 4.7 (1M context) --- go.mod | 31 ++ go.sum | 98 +++++ internal/jobs/custom_domain_reconcile.go | 450 +++++++++++++++++++++++ internal/jobs/deploy_status_reconcile.go | 421 +++++++++++++++++++++ internal/jobs/expire.go | 6 +- internal/jobs/workers.go | 40 +- main.go | 17 +- 7 files changed, 1060 insertions(+), 3 deletions(-) create mode 100644 internal/jobs/custom_domain_reconcile.go create mode 100644 internal/jobs/deploy_status_reconcile.go diff --git a/go.mod b/go.mod index f73b48d..31fea4a 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,9 @@ require ( google.golang.org/grpc v1.79.3 instant.dev/common v0.0.0 instant.dev/proto v0.0.0 + k8s.io/api v0.32.2 + k8s.io/apimachinery v0.32.2 + k8s.io/client-go v0.32.2 ) require ( @@ -30,25 +33,40 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect + github.com/go-openapi/swag v0.23.0 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/minio-go/v7 v7.0.90 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/prometheus/client_model v0.6.2 // indirect @@ -63,10 +81,12 @@ require ( github.com/secure-io/sio-go v0.3.1 // indirect github.com/shirou/gopsutil/v3 v3.24.5 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.11.1 // indirect github.com/tinylib/msgp v1.2.5 // indirect github.com/tklauser/go-sysconf v0.3.15 // indirect github.com/tklauser/numcpus v0.10.0 // indirect + github.com/x448/float16 v0.8.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect @@ -76,13 +96,24 @@ require ( go.uber.org/goleak v1.3.0 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/net v0.49.0 // indirect + golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect + golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect + golang.org/x/time v0.10.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect + sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) replace instant.dev/common => ../common diff --git a/go.sum b/go.sum index efc9bd8..297f68c 100644 --- a/go.sum +++ b/go.sum @@ -11,12 +11,17 @@ github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F9 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -27,15 +32,33 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= +github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= +github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= +github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= @@ -50,6 +73,12 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= @@ -66,6 +95,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 h1:PpXWgLPs+Fqr325bN2FD2ISlRRztXibcX6e8f5FR5Dc= github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/minio/madmin-go/v3 v3.0.110 h1:FIYekj7YPc430ffpXFWiUtyut3qBt/unIAcDzJn9H5M= @@ -74,10 +105,21 @@ github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.90 h1:TmSj1083wtAD0kEYTx7a5pFsv3iRYMsOJ6A4crjA1lE= github.com/minio/minio-go/v7 v7.0.90/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= +github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= +github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c h1:dAMKvw0MlJT1GshSTtih8C2gDs04w8dReiOGXrGLNoY= github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -127,6 +169,8 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -138,6 +182,10 @@ github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8O github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= @@ -163,27 +211,55 @@ go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pq go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= +golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= +golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= +golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= @@ -197,6 +273,28 @@ google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= +gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.32.2 h1:bZrMLEkgizC24G9eViHGOPbW+aRo9duEISRIJKfdJuw= +k8s.io/api v0.32.2/go.mod h1:hKlhk4x1sJyYnHENsrdCWw31FEmCijNGPJO5WzHiJ6Y= +k8s.io/apimachinery v0.32.2 h1:yoQBR9ZGkA6Rgmhbp/yuT9/g+4lxtsGYwW6dR6BDPLQ= +k8s.io/apimachinery v0.32.2/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= +k8s.io/client-go v0.32.2 h1:4dYCD4Nz+9RApM2b/3BtVvBHw54QjMFUl1OLcJG5yOA= +k8s.io/client-go v0.32.2/go.mod h1:fpZ4oJXclZ3r2nDOv+Ux3XcJutfrwjKTCHz2H3sww94= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= +k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4= +k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro= +k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= +sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= +sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= +sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/internal/jobs/custom_domain_reconcile.go b/internal/jobs/custom_domain_reconcile.go new file mode 100644 index 0000000..67a3814 --- /dev/null +++ b/internal/jobs/custom_domain_reconcile.go @@ -0,0 +1,450 @@ +package jobs + +// custom_domain_reconcile.go — periodic reconciler for the custom_domains table. +// +// Sprint 10's Verify endpoint is pull-only: the dashboard polls it after the +// customer adds DNS. This worker complements that by sweeping the table on a +// schedule so domains advance even when nobody is looking at the dashboard. +// +// Lifecycle (mirrors api/internal/handlers/custom_domain.go): +// +// pending_verification → verified → ingress_ready → cert_ready → live +// │ +// └→ failed (after 7d stuck in pending_verification) +// +// SCOPE NOTE: the worker module (instant.dev/worker) is a separate Go module +// from the api (instant.dev) and does NOT import api packages or k8s.io +// client-go. As a result this reconciler covers: +// +// step 1 TXT lookup → mark verified +// step 4 HTTP HEAD probe of cert_ready domains → mark live +// step 5 Stale pending_verification (>7d) → mark failed +// +// Steps 2 (Ingress create) and 3 (cert poll) remain in the API handler — the +// dashboard's first Verify click triggers them. See the TODO at the bottom of +// this file for promoting those into the worker once the api module is +// import-reachable (or once we vendor a minimal k8s client here). +// +// The worker duplicates the small set of model SQL needed (read + status +// updates) rather than depending on api/internal/models. The columns live in +// migration 014_custom_domains.sql; if that schema changes the queries here +// need an update too. Keep both in sync. + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "net" + "net/http" + "strings" + "time" + + "github.com/google/uuid" + "github.com/riverqueue/river" +) + +// Reconciler tunables. The interval matches the periodic-job registration in +// workers.go; threshold + timeouts are stand-alone. +const ( + customDomainReconcileInterval = 5 * time.Minute + customDomainStaleThreshold = 7 * 24 * time.Hour + txtLookupTimeout = 5 * time.Second + httpProbeTimeout = 10 * time.Second + + // Status strings — verbatim copies of models.CustomDomainStatus*. + // Duplicated here because the worker module does not import the api + // module. If the api strings ever change, update both places. + statusPending = "pending_verification" + statusVerified = "verified" + statusIngressReady = "ingress_ready" + statusCertReady = "cert_ready" + statusLive = "live" + statusFailed = "failed" + + // VerificationTokenPrefix mirrors api/internal/models.VerificationTokenPrefix. + verificationTokenPrefix = "instanode-verify-" + + // txtChallengePrefix is the DNS label we ask customers to put their TXT + // record at — same constant the API handler uses. + txtChallengePrefix = "_instanode." + + staleVerificationFailReason = "verification timeout: TXT record not observed within 7 days" +) + +// CustomDomainReconcileArgs is the periodic-job payload. Empty — every run is +// a full table sweep. +type CustomDomainReconcileArgs struct{} + +// Kind implements river.JobArgs. +func (CustomDomainReconcileArgs) Kind() string { return "custom_domain_reconcile" } + +// k8sCustomDomainProvider is the slice of the api's K8sStackProvider used by +// the reconciler. Defined as an interface so callers can pass nil when k8s +// isn't reachable (steps 2 & 3 are then skipped — see SCOPE NOTE above). +// +// Today no implementation is wired in the worker — the worker's main.go +// always passes nil. Kept as part of the constructor signature so the +// follow-up that vendors a real k8s client doesn't have to reshape this +// worker's public surface. +type k8sCustomDomainProvider interface { + EnsureCustomDomainIngress(ctx context.Context, namespace, hostname, serviceName string, servicePort int) (string, error) + CertificateReady(ctx context.Context, namespace, certName string) (bool, string, error) +} + +// CustomDomainReconciler is the River worker. +type CustomDomainReconciler struct { + river.WorkerDefaults[CustomDomainReconcileArgs] + db *sql.DB + k8s k8sCustomDomainProvider // may be nil; ingress/cert steps then skipped + httpCli *http.Client // probe client; never follows redirects +} + +// NewCustomDomainReconciler constructs the worker. +// +// Pass nil for k8sProvider in environments where the worker can't talk to +// the cluster API (steps 2 & 3 will be skipped — the API handler's Verify +// endpoint still drives those when a user clicks "verify" in the dashboard). +// +// httpCli may be nil — the worker installs a default with a 10s timeout and +// CheckRedirect set to ErrUseLastResponse so a 301 from the wrong CDN never +// fools us into reporting a custom hostname as live. +func NewCustomDomainReconciler(db *sql.DB, k8sProvider k8sCustomDomainProvider, httpCli *http.Client) *CustomDomainReconciler { + if httpCli == nil { + httpCli = &http.Client{ + Timeout: httpProbeTimeout, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + } + return &CustomDomainReconciler{ + db: db, + k8s: k8sProvider, + httpCli: httpCli, + } +} + +// activeCustomDomain is the projection the reconciler reads — only the columns +// needed for state transitions, no audit fields. +type activeCustomDomain struct { + id uuid.UUID + hostname string + token string // verification token; combined with prefix to build TXT value + status string + createdAt time.Time +} + +// Work runs the full sweep. Errors on individual domains are logged and +// swallowed so one bad row never stops the rest of the batch — same fail-open +// posture as ExpireAnonymousWorker. +func (w *CustomDomainReconciler) Work(ctx context.Context, job *river.Job[CustomDomainReconcileArgs]) error { + start := time.Now() + + domains, err := w.listActiveDomains(ctx) + if err != nil { + return fmt.Errorf("custom_domain_reconcile: list active: %w", err) + } + + if len(domains) == 0 { + slog.Info("jobs.custom_domain_reconcile.completed", + "total", 0, + "duration_ms", time.Since(start).Milliseconds(), + "job_id", job.ID, + ) + return nil + } + + var ( + advancedVerified int + advancedLive int + markedFailed int + recordedErrors int + ) + + for _, d := range domains { + switch d.status { + case statusPending: + result := w.reconcilePending(ctx, d) + switch result { + case reconcileAdvanced: + advancedVerified++ + case reconcileFailed: + markedFailed++ + case reconcileRecordedErr: + recordedErrors++ + } + + case statusCertReady: + if w.reconcileCertReady(ctx, d) == reconcileAdvanced { + advancedLive++ + } + + case statusVerified, statusIngressReady: + // Steps 2 & 3 (Ingress create / cert poll) are not wired in the + // worker — the api handler still drives them via Verify. Log + // once at debug so anyone reading the worker logs can confirm + // these are intentionally skipped. + slog.Debug("jobs.custom_domain_reconcile.skip_ingress_steps", + "id", d.id, + "hostname", d.hostname, + "status", d.status, + "note", "k8s steps owned by api handler in current sprint", + ) + + case statusLive, statusFailed: + // Terminal — listActiveDomains filters these out, but defend in + // case the SQL changes. + + default: + slog.Warn("jobs.custom_domain_reconcile.unknown_status", + "id", d.id, "hostname", d.hostname, "status", d.status, + ) + } + } + + slog.Info("jobs.custom_domain_reconcile.completed", + "total", len(domains), + "advanced_verified", advancedVerified, + "advanced_live", advancedLive, + "marked_failed", markedFailed, + "recorded_errors", recordedErrors, + "duration_ms", time.Since(start).Milliseconds(), + "job_id", job.ID, + ) + return nil +} + +// reconcileResult is a small enum describing what happened for one row. +// Used purely for the per-batch counters in the completion log. +type reconcileResult int + +const ( + reconcileNoop reconcileResult = iota + reconcileAdvanced + reconcileFailed + reconcileRecordedErr +) + +// reconcilePending handles a domain in pending_verification: +// +// 1. If created > 7d ago and still pending → mark failed (stop probing). +// 2. Otherwise run a TXT lookup; mark verified on match. +// 3. On miss / error, record last_check_err so the dashboard can render +// the reason — never error out the batch. +func (w *CustomDomainReconciler) reconcilePending(ctx context.Context, d activeCustomDomain) reconcileResult { + if time.Since(d.createdAt) > customDomainStaleThreshold { + if err := w.markFailed(ctx, d.id, staleVerificationFailReason); err != nil { + slog.Error("jobs.custom_domain_reconcile.mark_failed_failed", + "error", err, "id", d.id, "hostname", d.hostname) + return reconcileNoop + } + slog.Info("jobs.custom_domain_reconcile.marked_failed", + "id", d.id, "hostname", d.hostname, "reason", staleVerificationFailReason) + return reconcileFailed + } + + matched, lookupErr := w.lookupTXT(ctx, d.hostname, d.token) + if matched { + if err := w.markVerified(ctx, d.id); err != nil { + slog.Error("jobs.custom_domain_reconcile.mark_verified_failed", + "error", err, "id", d.id, "hostname", d.hostname) + return reconcileNoop + } + slog.Info("jobs.custom_domain_reconcile.advanced_verified", + "id", d.id, "hostname", d.hostname) + return reconcileAdvanced + } + + msg := "TXT record missing or wrong value" + if lookupErr != nil { + msg = lookupErr.Error() + } + if err := w.updateLastCheck(ctx, d.id, msg); err != nil { + slog.Warn("jobs.custom_domain_reconcile.update_last_check_failed", + "error", err, "id", d.id, "hostname", d.hostname) + return reconcileNoop + } + return reconcileRecordedErr +} + +// reconcileCertReady runs an HTTP HEAD against https:///. A 2xx or +// 3xx (without following) marks the domain live; anything else records the +// last_check_err so the dashboard surfaces "DNS not pointing yet" or similar. +// +// We deliberately do NOT follow redirects: a 301 to https://example.com from +// somewhere else's CDN is not proof our ingress is serving the hostname. +func (w *CustomDomainReconciler) reconcileCertReady(ctx context.Context, d activeCustomDomain) reconcileResult { + url := "https://" + d.hostname + "/" + probeCtx, cancel := context.WithTimeout(ctx, httpProbeTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(probeCtx, http.MethodHead, url, nil) + if err != nil { + _ = w.updateLastCheck(ctx, d.id, fmt.Sprintf("build probe request: %v", err)) + return reconcileNoop + } + req.Header.Set("User-Agent", "instanode-domain-reconciler/1") + + resp, err := w.httpCli.Do(req) + if err != nil { + _ = w.updateLastCheck(ctx, d.id, fmt.Sprintf("HTTPS HEAD probe failed: %v", err)) + return reconcileNoop + } + defer resp.Body.Close() + + if resp.StatusCode >= 200 && resp.StatusCode < 400 { + if err := w.updateStatus(ctx, d.id, statusLive, ""); err != nil { + slog.Error("jobs.custom_domain_reconcile.mark_live_failed", + "error", err, "id", d.id, "hostname", d.hostname) + return reconcileNoop + } + slog.Info("jobs.custom_domain_reconcile.advanced_live", + "id", d.id, "hostname", d.hostname, "probe_status", resp.StatusCode) + return reconcileAdvanced + } + + msg := fmt.Sprintf("HTTPS HEAD probe returned %d", resp.StatusCode) + if err := w.updateLastCheck(ctx, d.id, msg); err != nil { + slog.Warn("jobs.custom_domain_reconcile.update_last_check_failed", + "error", err, "id", d.id, "hostname", d.hostname) + return reconcileNoop + } + return reconcileRecordedErr +} + +// lookupTXT runs a context-bound TXT lookup at "_instanode." and +// returns whether any record matches "instanode-verify-". Trims +// surrounding quotes some resolvers leave on TXT contents — same logic the +// API handler uses. +func (w *CustomDomainReconciler) lookupTXT(parent context.Context, hostname, token string) (bool, error) { + lookupCtx, cancel := context.WithTimeout(parent, txtLookupTimeout) + defer cancel() + + resolver := &net.Resolver{} + records, err := resolver.LookupTXT(lookupCtx, txtChallengePrefix+hostname) + if err != nil { + return false, fmt.Errorf("TXT lookup for %s failed: %w", txtChallengePrefix+hostname, err) + } + want := verificationTokenPrefix + token + for _, r := range records { + clean := strings.Trim(r, "\"") + if clean == want || r == want { + return true, nil + } + } + return false, nil +} + +// ── SQL helpers ─────────────────────────────────────────────────────────────── +// +// These mirror the api package's models.CustomDomain* helpers. They are +// duplicated here intentionally — the worker module does not import the api +// module. The schema lives in migration 014_custom_domains.sql; keep these in +// sync if columns or status strings ever change. + +// listActiveDomains returns every row not in a terminal status. Newest first +// matches the order the dashboard already uses, but the worker doesn't depend +// on order — it's purely for log readability. +func (w *CustomDomainReconciler) listActiveDomains(ctx context.Context) ([]activeCustomDomain, error) { + rows, err := w.db.QueryContext(ctx, ` + SELECT id, hostname, verification_token, status, created_at + FROM custom_domains + WHERE status NOT IN ($1, $2) + ORDER BY created_at ASC + `, statusLive, statusFailed) + if err != nil { + return nil, fmt.Errorf("listActiveDomains: query: %w", err) + } + defer rows.Close() + + var out []activeCustomDomain + for rows.Next() { + var d activeCustomDomain + if err := rows.Scan(&d.id, &d.hostname, &d.token, &d.status, &d.createdAt); err != nil { + return nil, fmt.Errorf("listActiveDomains: scan: %w", err) + } + out = append(out, d) + } + return out, rows.Err() +} + +// markVerified is the equivalent of models.MarkCustomDomainVerified. Sets +// verified_at = now() and clears last_check_err on success. +func (w *CustomDomainReconciler) markVerified(ctx context.Context, id uuid.UUID) error { + _, err := w.db.ExecContext(ctx, ` + UPDATE custom_domains + SET status = $1, + verified_at = now(), + last_check_at = now(), + last_check_err = NULL + WHERE id = $2 AND status = $3 + `, statusVerified, id, statusPending) + if err != nil { + return fmt.Errorf("markVerified: %w", err) + } + return nil +} + +// markFailed is the equivalent of the requested MarkFailed helper. Records +// reason in last_check_err so the dashboard can render "verification timed +// out". +func (w *CustomDomainReconciler) markFailed(ctx context.Context, id uuid.UUID, reason string) error { + _, err := w.db.ExecContext(ctx, ` + UPDATE custom_domains + SET status = $1, + last_check_at = now(), + last_check_err = $2 + WHERE id = $3 + `, statusFailed, reason, id) + if err != nil { + return fmt.Errorf("markFailed: %w", err) + } + return nil +} + +// updateStatus advances status and resets last_check fields. Equivalent of +// models.UpdateCustomDomainStatus. Empty errMsg sets last_check_err to NULL. +func (w *CustomDomainReconciler) updateStatus(ctx context.Context, id uuid.UUID, status, errMsg string) error { + var errVal interface{} + if errMsg != "" { + errVal = errMsg + } + _, err := w.db.ExecContext(ctx, ` + UPDATE custom_domains + SET status = $1, + last_check_at = now(), + last_check_err = $2 + WHERE id = $3 + `, status, errVal, id) + if err != nil { + return fmt.Errorf("updateStatus: %w", err) + } + return nil +} + +// updateLastCheck records last_check_at + last_check_err without changing +// status. Equivalent of the requested UpdateLastCheck helper. +func (w *CustomDomainReconciler) updateLastCheck(ctx context.Context, id uuid.UUID, errStr string) error { + var errVal interface{} + if errStr != "" { + errVal = errStr + } + _, err := w.db.ExecContext(ctx, ` + UPDATE custom_domains + SET last_check_at = now(), + last_check_err = $1 + WHERE id = $2 + `, errVal, id) + if err != nil { + return fmt.Errorf("updateLastCheck: %w", err) + } + return nil +} + +// TODO(custom-domains): once the api module is import-reachable from worker +// (or once we vendor a minimal k8s client here) wire the k8sCustomDomainProvider +// path so steps 2 (Ingress create) and 3 (cert poll) run from the reconciler +// too. Today the dashboard's first Verify click drives those — fine for the +// happy path, but a domain whose user never returns to the dashboard will sit +// at "verified" forever. diff --git a/internal/jobs/deploy_status_reconcile.go b/internal/jobs/deploy_status_reconcile.go new file mode 100644 index 0000000..7c7bbe5 --- /dev/null +++ b/internal/jobs/deploy_status_reconcile.go @@ -0,0 +1,421 @@ +package jobs + +// deploy_status_reconcile.go — periodic reconciler for the deployments table. +// +// PROBLEM SHAPE +// +// The API's /deploy/new handler persists a deployments row with status="building" +// and kicks off runDeploy() in a goroutine. runDeploy() calls compute.Deploy() +// which, on a k8s backend, returns immediately after kicking off the kaniko +// build + creating the Deployment object. The handler then writes whatever +// status compute.Deploy() returned (typically still "building" if AvailableReplicas +// has not yet reached 1). +// +// After that single write, the deployments.status column is never touched again +// by the system. The k8s Deployment transitions to AvailableReplicas>=1 within +// ~30-90s on local clusters, but the DB row stays "building" forever — there +// is no background process that watches live k8s state and rolls it forward. +// +// Customers calling GET /deploy/:id to discover when their app is ready get +// the stale value. This worker fixes that by sweeping all non-terminal +// deployments every 30s and reconciling the column from the live k8s +// Deployment object's status.AvailableReplicas + conditions. +// +// LIFECYCLE (mirrors api/internal/providers/compute/k8s/client.go:deploymentStatus) +// +// building ─→ deploying ─→ healthy +// │ +// └→ failed (DeploymentReplicaFailure=True) +// +// building ─→ stopped (k8s Deployment NotFound — namespace was deleted) +// +// failed and stopped are TERMINAL — we never reconcile out of them. The +// listActiveDeployments SQL filter already excludes them. +// +// SCOPE / FAIL-OPEN POSTURE +// +// The worker is constructed with a deployStatusK8sProvider interface. The +// concrete implementation lives in this file (k8sDeployStatusClient) and is +// built from rest.InClusterConfig() with a kubeconfig fallback for local +// dev. If client init fails (e.g. no kubeconfig in CI, no cluster reachable), +// the constructor returns a reconciler with k8s=nil and Work() short-circuits +// at the top with a warn log. This keeps the worker process alive — other +// periodic jobs (expire_anonymous, expire_stacks, custom_domain_reconcile) +// keep running. Same fail-open posture as ExpireAnonymousWorker on Redis errors. +// +// SQL SCHEMA +// +// The deployments table is owned by the api module. Columns referenced here: +// - id (uuid, PK) +// - provider_id (text, nullable until runDeploy completes — we skip those) +// - status (text) +// - error_message (text, nullable) +// - updated_at (timestamptz) +// Status string set (kept in sync with api/internal/models/deployment.go): +// building | deploying | healthy | failed | stopped +// +// NAMESPACE LAYOUT +// +// The api's k8s.K8sProvider derives namespace = "instant-deploy-" from +// the provider_id, where provider_id = "app-". This worker duplicates +// that derivation (deployNamespaceFromProviderID) rather than depending on +// the api package — same pattern custom_domain_reconcile.go uses for status +// strings. +// +// RBAC +// +// The worker ServiceAccount (infra/k8s/worker-rbac.yaml) gains a new ClusterRole +// "instant-worker-deploy-reader" with get on deployments.apps. We can't restrict +// it to namespace-by-namespace because instant-deploy- namespaces are +// created on demand by the api process; the deploy-reader ClusterRole is +// read-only and scoped to the deployments resource so the blast radius is tight. + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/google/uuid" + "github.com/riverqueue/river" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// Reconciler tunables. The interval matches the periodic-job registration in +// workers.go. +const ( + deployStatusReconcileInterval = 30 * time.Second + + // k8sGetTimeout caps a single Get-Deployment call so one stuck namespace + // doesn't stall the whole batch sweep. The control plane is normally + // sub-second; 5s is generous. + k8sGetTimeout = 5 * time.Second + + // Status strings — verbatim copies of the canonical set in + // api/internal/models/deployment.go and the k8s compute provider's + // deploymentStatus() helper. Duplicated here because the worker module + // does not import the api module. If the api strings ever change, + // update both places. + deployStatusBuilding = "building" + deployStatusDeploying = "deploying" + deployStatusHealthy = "healthy" + deployStatusFailed = "failed" + deployStatusStopped = "stopped" + + // providerIDPrefix mirrors api/internal/providers/compute/k8s/client.go's + // deploymentName(appID) = "app-" + appID. + providerIDPrefix = "app-" + + // deployNamespacePrefix mirrors deployNamespace(appID) in the api's k8s + // provider. The worker derives the namespace from provider_id rather than + // storing it on the deployments row. + deployNamespacePrefix = "instant-deploy-" +) + +// DeployStatusReconcileArgs is the periodic-job payload. Empty — every run is +// a full table sweep. +type DeployStatusReconcileArgs struct{} + +// Kind implements river.JobArgs. +func (DeployStatusReconcileArgs) Kind() string { return "deploy_status_reconcile" } + +// deployStatusK8sProvider is the slice of k8s API the reconciler uses. Defined +// as an interface so callers can pass nil when k8s isn't reachable (the +// reconciler then warn-logs and returns nil — same pattern as +// k8sCustomDomainProvider in custom_domain_reconcile.go). +type deployStatusK8sProvider interface { + // GetDeployment returns the live k8s Deployment object, or apierrors.IsNotFound + // when the namespace or Deployment has been deleted. The caller maps NotFound + // to status="stopped". + GetDeployment(ctx context.Context, namespace, name string) (*appsv1.Deployment, error) +} + +// k8sDeployStatusClient is the concrete deployStatusK8sProvider implementation. +// Wraps a kubernetes.Clientset so the reconciler doesn't import the full client +// surface at every callsite. +type k8sDeployStatusClient struct { + cs *kubernetes.Clientset +} + +// GetDeployment implements deployStatusK8sProvider. +func (c *k8sDeployStatusClient) GetDeployment(ctx context.Context, namespace, name string) (*appsv1.Deployment, error) { + return c.cs.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) +} + +// NewK8sDeployStatusClient builds a deployStatusK8sProvider from in-cluster +// config, falling back to the default kubeconfig for local dev. Returns nil +// (and a non-nil error) when neither is reachable — the caller logs and +// passes nil to NewDeployStatusReconciler. +// +// This mirrors api/internal/providers/compute/k8s/client.go:newClientset but +// duplicated here so the worker module does not depend on the api module. +func NewK8sDeployStatusClient() (deployStatusK8sProvider, error) { + cfg, err := rest.InClusterConfig() + if err != nil { + cfg, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) + if err != nil { + return nil, fmt.Errorf("k8s config (in-cluster + kubeconfig both failed): %w", err) + } + } + cs, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("k8s NewForConfig: %w", err) + } + return &k8sDeployStatusClient{cs: cs}, nil +} + +// DeployStatusReconciler is the River worker that sweeps non-terminal +// deployments and rolls status forward from live k8s state. +type DeployStatusReconciler struct { + river.WorkerDefaults[DeployStatusReconcileArgs] + db *sql.DB + k8s deployStatusK8sProvider // may be nil — the worker then warn-logs each run +} + +// NewDeployStatusReconciler constructs the worker. +// +// Pass nil for k8sProvider in environments where the worker can't talk to +// the cluster API. Work() will short-circuit each run with a warn log so +// the rest of the periodic job lineup keeps functioning. +func NewDeployStatusReconciler(db *sql.DB, k8sProvider deployStatusK8sProvider) *DeployStatusReconciler { + return &DeployStatusReconciler{ + db: db, + k8s: k8sProvider, + } +} + +// activeDeployment is the projection the reconciler reads — only the columns +// needed to drive transitions. +type activeDeployment struct { + id uuid.UUID + providerID string + status string +} + +// Work runs the full sweep. Errors on individual rows are logged and swallowed +// so one bad row never stops the rest — same fail-open posture as +// CustomDomainReconciler. +func (w *DeployStatusReconciler) Work(ctx context.Context, job *river.Job[DeployStatusReconcileArgs]) error { + start := time.Now() + + if w.k8s == nil { + // k8s wasn't reachable at startup — log once per tick at WARN so + // operators notice, but don't fail the job (River would retry + // forever). The rest of the worker process keeps running. + slog.Warn("jobs.deploy_status_reconcile.skipped_no_k8s_client", + "reason", "k8s client init failed at startup; deployments will stay stale until worker is restarted with reachable cluster", + "job_id", job.ID, + ) + return nil + } + + deployments, err := w.listActiveDeployments(ctx) + if err != nil { + return fmt.Errorf("deploy_status_reconcile: list active: %w", err) + } + + if len(deployments) == 0 { + slog.Info("jobs.deploy_status_reconcile.completed", + "total", 0, + "duration_ms", time.Since(start).Milliseconds(), + "job_id", job.ID, + ) + return nil + } + + var ( + transitions int + errors int + skipped int + ) + + for _, d := range deployments { + if d.providerID == "" { + // runDeploy() hasn't reached UpdateDeploymentProviderID yet + // (kaniko build still in flight on the api side). Nothing to + // poll — leave the row alone. + skipped++ + continue + } + + newStatus, err := w.computeNewStatus(ctx, d.providerID) + if err == errSkipForeignProviderID { + // Row was provisioned by a different backend (e.g. the stack + // pipeline uses provider_id="instant-stack-*"). Not a fault. + slog.Debug("jobs.deploy_status_reconcile.skip_foreign_provider_id", + "id", d.id, "provider_id", d.providerID) + skipped++ + continue + } + if err != nil { + slog.Warn("jobs.deploy_status_reconcile.k8s_get_failed", + "id", d.id, "provider_id", d.providerID, "error", err) + errors++ + continue + } + + if newStatus == d.status { + continue + } + + if err := w.updateStatus(ctx, d.id, newStatus); err != nil { + slog.Error("jobs.deploy_status_reconcile.update_failed", + "id", d.id, "provider_id", d.providerID, + "from", d.status, "to", newStatus, "error", err) + errors++ + continue + } + + slog.Info("jobs.deploy_status_reconcile.transition", + "id", d.id, "provider_id", d.providerID, + "from", d.status, "to", newStatus) + transitions++ + } + + slog.Info("jobs.deploy_status_reconcile.completed", + "total", len(deployments), + "transitions", transitions, + "errors", errors, + "skipped", skipped, + "duration_ms", time.Since(start).Milliseconds(), + "job_id", job.ID, + ) + return nil +} + +// errSkipForeignProviderID is returned by computeNewStatus when a deployments +// row's provider_id does not match the "app-" shape (e.g. it was +// provisioned by the stack pipeline whose provider_id is "instant-stack-"). +// Sentinel error — Work() treats this as a skip, not an error. +var errSkipForeignProviderID = errors.New("provider_id not in app- shape; reconciler is single-app only") + +// computeNewStatus performs a single Get against the per-deployment namespace +// and maps the result into the canonical status string set. NotFound (the +// namespace or Deployment has been deleted out from under us) maps to +// "stopped" — same as the api's k8s.Status() helper. +func (w *DeployStatusReconciler) computeNewStatus(ctx context.Context, providerID string) (string, error) { + ns := deployNamespaceFromProviderID(providerID) + if ns == "" { + // provider_id doesn't have the "app-" prefix — almost certainly a row + // from a future or alternate compute backend. Skip cleanly. + return "", errSkipForeignProviderID + } + + getCtx, cancel := context.WithTimeout(ctx, k8sGetTimeout) + defer cancel() + + deploy, err := w.k8s.GetDeployment(getCtx, ns, providerID) + if apierrors.IsNotFound(err) { + // Namespace or Deployment is gone (manual cleanup, expiry sweep, + // teardown). Mark stopped so the row leaves the active set on + // the next sweep. + return deployStatusStopped, nil + } + if err != nil { + return "", err + } + + return deploymentStatusFromK8s(deploy), nil +} + +// deploymentStatusFromK8s mirrors api/internal/providers/compute/k8s/client.go's +// deploymentStatus() helper. Kept verbatim here to guarantee the worker's +// state machine matches what runDeploy() would write if it polled longer. +// +// Order matters: replica failure is checked first so a Deployment that has +// transient AvailableReplicas>=1 but a sticky failure condition (e.g. quota +// exceeded on a rolling update) does not flap into "healthy". +func deploymentStatusFromK8s(deploy *appsv1.Deployment) string { + for _, cond := range deploy.Status.Conditions { + if cond.Type == appsv1.DeploymentReplicaFailure && cond.Status == corev1.ConditionTrue { + return deployStatusFailed + } + } + if deploy.Status.AvailableReplicas >= 1 { + return deployStatusHealthy + } + if deploy.Status.UpdatedReplicas > 0 || deploy.Status.UnavailableReplicas > 0 { + return deployStatusDeploying + } + return deployStatusBuilding +} + +// deployNamespaceFromProviderID derives the per-deployment namespace from the +// provider_id stored on a deployments row. provider_id = "app-"; +// namespace = "instant-deploy-". Returns "" for rows whose provider_id +// doesn't match the expected shape (e.g. future Fly.io backend). +func deployNamespaceFromProviderID(providerID string) string { + if !strings.HasPrefix(providerID, providerIDPrefix) { + return "" + } + appID := strings.TrimPrefix(providerID, providerIDPrefix) + if appID == "" { + return "" + } + return deployNamespacePrefix + appID +} + +// ── SQL helpers ─────────────────────────────────────────────────────────────── +// +// These mirror the api's models.Deployment helpers but use only the columns +// the reconciler needs. Duplicated here intentionally — the worker module does +// not import the api module. The schema lives in the deployments migration in +// api/internal/db/migrations; if columns or status strings change, keep these +// in sync. + +// listActiveDeployments returns every deployments row not in a terminal status. +// Terminal = failed | stopped. The api's runDeploy/Redeploy paths also write +// "deploying" and "building" transitively — both are picked up here. +func (w *DeployStatusReconciler) listActiveDeployments(ctx context.Context) ([]activeDeployment, error) { + rows, err := w.db.QueryContext(ctx, ` + SELECT id, COALESCE(provider_id, ''), status + FROM deployments + WHERE status IN ($1, $2, $3) + ORDER BY updated_at ASC + `, deployStatusBuilding, deployStatusDeploying, deployStatusHealthy) + if err != nil { + return nil, fmt.Errorf("listActiveDeployments: query: %w", err) + } + defer rows.Close() + + var out []activeDeployment + for rows.Next() { + var d activeDeployment + if err := rows.Scan(&d.id, &d.providerID, &d.status); err != nil { + return nil, fmt.Errorf("listActiveDeployments: scan: %w", err) + } + out = append(out, d) + } + return out, rows.Err() +} + +// updateStatus advances the status column and bumps updated_at. We deliberately +// do NOT clear error_message — the api owns that field and we don't want to +// race with a concurrent runDeploy error write. +// +// We also do not transition out of "failed" or "stopped" — the SELECT above +// excludes those, so the UPDATE doesn't need a defensive WHERE clause for +// them. We do, however, add a WHERE status IN (...) guard so a row that +// concurrently transitioned to a terminal state between SELECT and UPDATE +// is left alone. +func (w *DeployStatusReconciler) updateStatus(ctx context.Context, id uuid.UUID, status string) error { + _, err := w.db.ExecContext(ctx, ` + UPDATE deployments + SET status = $1, updated_at = now() + WHERE id = $2 + AND status IN ($3, $4, $5) + `, status, id, deployStatusBuilding, deployStatusDeploying, deployStatusHealthy) + if err != nil { + return fmt.Errorf("updateStatus: %w", err) + } + return nil +} diff --git a/internal/jobs/expire.go b/internal/jobs/expire.go index d3358d5..6336924 100644 --- a/internal/jobs/expire.go +++ b/internal/jobs/expire.go @@ -151,7 +151,9 @@ func (w *ExpireAnonymousWorker) Work(ctx context.Context, job *river.Job[ExpireA } // expireResourceTypeToProto maps a resource_type string to the protobuf enum. -// Returns UNSPECIFIED for types without provisioner deprovisioning (queue, webhook, storage). +// Queue/NATS now provisions dedicated pods (k8s backend) so it needs deprovisioning +// just like postgres/redis/mongo. Webhook + storage stay UNSPECIFIED — they don't +// have a per-resource pod (webhook is API-receiver only; storage is bucket-isolated). func expireResourceTypeToProto(resourceType string) commonv1.ResourceType { switch resourceType { case "postgres": @@ -160,6 +162,8 @@ func expireResourceTypeToProto(resourceType string) commonv1.ResourceType { return commonv1.ResourceType_RESOURCE_TYPE_REDIS case "mongodb": return commonv1.ResourceType_RESOURCE_TYPE_MONGODB + case "queue": + return commonv1.ResourceType_RESOURCE_TYPE_QUEUE default: return commonv1.ResourceType_RESOURCE_TYPE_UNSPECIFIED } diff --git a/internal/jobs/workers.go b/internal/jobs/workers.go index 6d1c4cb..c6c7b84 100644 --- a/internal/jobs/workers.go +++ b/internal/jobs/workers.go @@ -62,7 +62,13 @@ func (mondayAt8UTCSchedule) Next(t time.Time) time.Time { // StartWorkers initialises and starts the River background worker pool. // It registers all job workers and schedules periodic jobs. -func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *config.Config, provClient *provisioner.Client, planRegistry PlanRegistry) *Workers { +// +// deployStatusK8s is the k8s client used by DeployStatusReconciler to fetch +// live Deployment objects from the per-deployment "instant-deploy-" +// namespaces. Pass nil when the worker can't reach a cluster — the +// reconciler logs at WARN each run and other periodic jobs keep functioning. +// See worker/internal/jobs/deploy_status_reconcile.go for the SCOPE NOTE. +func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *config.Config, provClient *provisioner.Client, planRegistry PlanRegistry, deployStatusK8s deployStatusK8sProvider) *Workers { _ = rdb // available for future workers; currently only used by quota checks done via db // River requires pgx pool — open a separate connection for the worker pool. @@ -98,6 +104,16 @@ func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *confi river.AddWorker(workers, &WeeklyDigestWorker{db: db, email: emailClient}) river.AddWorker(workers, NewEnforceStorageQuotaWorker(db, planRegistry)) river.AddWorker(workers, NewUpdateStorageBytesWorker(db, provClient)) + // Custom-domain reconciler — TXT lookup, HTTP probe, stale-failed sweep. + // k8s provider is nil today: the worker module does not import the api's + // k8s client. Steps 2/3 (Ingress + cert poll) stay in the api handler. + // See custom_domain_reconcile.go for the full SCOPE NOTE. + river.AddWorker(workers, NewCustomDomainReconciler(db, nil, nil)) + // Deploy-status reconciler — sweeps non-terminal deployments and rolls + // status forward from live k8s Deployment state every 30s. deployStatusK8s + // may be nil (kubeconfig unreachable in CI / docker-compose); the worker + // then short-circuits with a WARN each tick. See deploy_status_reconcile.go. + river.AddWorker(workers, NewDeployStatusReconciler(db, deployStatusK8s)) periodicJobs := []*river.PeriodicJob{ river.NewPeriodicJob( @@ -154,6 +170,28 @@ func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *confi }, &river.PeriodicJobOpts{RunOnStart: false}, ), + // Custom-domain reconciler runs every 5 minutes — see + // customDomainReconcileInterval in custom_domain_reconcile.go. + // RunOnStart=true so a worker restart immediately picks up domains + // that became verifiable while we were down. + river.NewPeriodicJob( + river.PeriodicInterval(customDomainReconcileInterval), + func() (river.JobArgs, *river.InsertOpts) { + return CustomDomainReconcileArgs{}, nil + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + // Deploy-status reconciler runs every 30s — see + // deployStatusReconcileInterval in deploy_status_reconcile.go. + // RunOnStart=true so a worker restart immediately reconciles any + // deployments stuck in "building" or "deploying" from the last cycle. + river.NewPeriodicJob( + river.PeriodicInterval(deployStatusReconcileInterval), + func() (river.JobArgs, *river.InsertOpts) { + return DeployStatusReconcileArgs{}, nil + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), } riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ diff --git a/main.go b/main.go index d9f9d07..cbace61 100644 --- a/main.go +++ b/main.go @@ -70,7 +70,22 @@ func main() { planRegistry = commonplans.Default() } - workers := jobs.StartWorkers(ctx, database, rdb, cfg, provClient, planRegistry) + // Build the k8s client used by DeployStatusReconciler. Fails open: if + // neither in-cluster config nor kubeconfig is reachable (CI, docker-compose, + // bare-metal dev box), we pass nil and the reconciler warn-logs each tick + // while every other periodic job keeps running. See + // worker/internal/jobs/deploy_status_reconcile.go for the SCOPE NOTE. + deployStatusK8s, k8sErr := jobs.NewK8sDeployStatusClient() + if k8sErr != nil { + slog.Warn("worker.deploy_status_k8s_client_init_failed", + "error", k8sErr, + "note", "DeployStatusReconciler will log warnings each tick; other periodic jobs unaffected") + deployStatusK8s = nil + } else { + slog.Info("worker.deploy_status_k8s_client_ready") + } + + workers := jobs.StartWorkers(ctx, database, rdb, cfg, provClient, planRegistry, deployStatusK8s) defer workers.Stop() // Exit immediately if River failed to start so Kubernetes restarts the pod.