diff --git a/.github/workflows/api.yml b/.github/workflows/api.yml index e8044d5c..849f5ec8 100644 --- a/.github/workflows/api.yml +++ b/.github/workflows/api.yml @@ -37,18 +37,18 @@ jobs: - name: Wait for services to be healthy working-directory: ./tests run: | - echo "Waiting for sqld to be healthy..." + echo "Waiting for MongoDB to be healthy..." for i in $(seq 1 20); do - if curl -sf http://localhost:8090/health >/dev/null 2>&1; then - echo "sqld is healthy!" + if docker compose exec mongodb mongosh --eval "db.runCommand('ping').ok" --quiet >/dev/null 2>&1; then + echo "MongoDB is healthy!" break fi if [ $i -eq 20 ]; then - echo "sqld failed to become healthy" - docker compose logs sqld + echo "MongoDB failed to become healthy" + docker compose logs mongodb exit 1 fi - echo "sqld attempt $i/20 - waiting 3s..." + echo "MongoDB attempt $i/20 - waiting 3s..." sleep 3 done diff --git a/api/go.mod b/api/go.mod index d77c0bf6..1fe7dca1 100644 --- a/api/go.mod +++ b/api/go.mod @@ -44,9 +44,10 @@ require ( github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 github.com/thedevsaddam/govalidator v1.9.10 - github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 github.com/uptrace/uptrace-go v1.43.0 github.com/xuri/excelize/v2 v2.10.1 + go.mongodb.org/mongo-driver/v2 v2.6.0 + go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk v1.43.0 @@ -94,13 +95,11 @@ require ( github.com/PuerkitoBio/goquery v1.12.0 // indirect github.com/andybalholm/brotli v1.2.1 // indirect github.com/andybalholm/cascadia v1.3.3 // indirect - github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/displaywidth v0.11.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect - github.com/coder/websocket v1.8.12 // indirect github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect github.com/fatih/color v1.19.0 // indirect @@ -164,8 +163,12 @@ require ( github.com/valyala/fasthttp v1.71.0 // indirect github.com/vanng822/css v1.0.1 // indirect github.com/vanng822/go-premailer v1.33.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.2.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xuri/efp v0.0.1 // indirect github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib v1.43.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.43.0 // indirect @@ -185,13 +188,12 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.28.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/crypto v0.50.0 // indirect - golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect + golang.org/x/crypto v0.51.0 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/text v0.36.0 // indirect + golang.org/x/sys v0.44.0 // indirect + golang.org/x/text v0.37.0 // indirect golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.44.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/api/go.sum b/api/go.sum index 0710b094..fa1163a2 100644 --- a/api/go.sum +++ b/api/go.sum @@ -66,8 +66,6 @@ github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eT github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= -github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= -github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k= github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -90,8 +88,6 @@ github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4= github.com/cockroachdb/cockroach-go/v2 v2.4.3 h1:LJO3K3jC5WXvMePRQSJE1NsIGoFGcEx1LW83W6RAlhw= github.com/cockroachdb/cockroach-go/v2 v2.4.3/go.mod h1:9U179XbCx4qFWtNhc7BiWLPfuyMVQ7qdAhfrwLz1vH0= -github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= -github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -324,8 +320,6 @@ github.com/thedevsaddam/govalidator v1.9.10 h1:m3dLRbSZ5Hts3VUWYe+vxLMG+FdyQuWOj github.com/thedevsaddam/govalidator v1.9.10/go.mod h1:Ilx8u7cg5g3LXbSS943cx5kczyNuUn7LH/cK5MYuE90= github.com/tiendc/go-deepcopy v1.7.2 h1:Ut2yYR7W9tWjTQitganoIue4UGxZwCcJy3orjrrIj44= github.com/tiendc/go-deepcopy v1.7.2/go.mod h1:4bKjNC2r7boYOkD2IOuZpYjmlDdzjbpTRyCx+goBCJQ= -github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 h1:YssVXwM/9nUAjGNmUWdgvb05JVcsaBrDn5yr+MaJTn0= -github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885/go.mod h1:08inkKyguB6CGGssc/JzhmQWwBgFQBgjlYFjxjRh7nU= github.com/uptrace/uptrace-go v1.43.0 h1:5QuCdyFJdWUEXx6Fr6sYfezdgO6n6lnkOvUTLlyQO7U= github.com/uptrace/uptrace-go v1.43.0/go.mod h1:ehDTIdtBSolg4Z0CCvg1C8yR6VX1YFDqBcg2KmsXWn0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -336,6 +330,12 @@ github.com/vanng822/css v1.0.1 h1:10yiXc4e8NI8ldU6mSrWmSWMuyWgPr9DZ63RSlsgDw8= github.com/vanng822/css v1.0.1/go.mod h1:tcnB1voG49QhCrwq1W0w5hhGasvOg+VQp9i9H1rCM1w= github.com/vanng822/go-premailer v1.33.0 h1:nglIpKn/7e3kIAwYByiH5xpauFur7RwAucqyZ59hcic= github.com/vanng822/go-premailer v1.33.0/go.mod h1:LGYI7ym6FQ7KcHN16LiQRF+tlan7qwhP1KEhpTINFpo= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs= +github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xuri/efp v0.0.1 h1:fws5Rv3myXyYni8uwj2qKjVaRP30PdjeYe2Y6FDsCL8= github.com/xuri/efp v0.0.1/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI= github.com/xuri/excelize/v2 v2.10.1 h1:V62UlqopMqha3kOpnlHy2CcRVw1V8E63jFoWUmMzxN0= @@ -344,17 +344,23 @@ github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 h1:+C0TIdyyYmzadGaL/HBL github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE= github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg= github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.mongodb.org/mongo-driver/v2 v2.6.0 h1:b9sJOYrkmt4l8bY43ZenFBcPlhYIjaOfYHLtbB/5qi8= +go.mongodb.org/mongo-driver/v2 v2.6.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib v1.43.0 h1:rv+pngknCr4qpZDxSpEvEoRioutgfbkk82x6MChJQ3U= go.opentelemetry.io/contrib v1.43.0/go.mod h1:JYdNU7Pl/2ckKMGp8/G7zeyhEbtRmy9Q8bcrtv75Znk= go.opentelemetry.io/contrib/detectors/gcp v1.43.0 h1:62yY3dT7/ShwOxzA0RsKRgshBmfElKI4d/Myu2OxDFU= go.opentelemetry.io/contrib/detectors/gcp v1.43.0/go.mod h1:RyaZMFY7yi1kAs45S6mbFGz8O8rqB0dTY14uzvG4LCs= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e h1:OX282aWfZNOrSVUPF59HlRhyA+MDcyi4kI8WWXt6A8I= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e/go.mod h1:lw7VQzmNsmkZBRQqOQiREGxO3GtzG/pOVEmKufablmA= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0 h1:0Qx7VGBacMm9ZENQ7TnNObTYI4ShC+lHI16seduaxZo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0/go.mod h1:Sje3i3MjSPKTSPvVWCaL8ugBzJwik3u4smCjUeuupqg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY= @@ -414,10 +420,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= -golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ= golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -462,8 +466,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= -golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -483,8 +487,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= -golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index ce6a82e2..e9ca3718 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -3,7 +3,6 @@ package di import ( "context" "crypto/tls" - "database/sql" "fmt" "net/http" "os" @@ -74,6 +73,7 @@ import ( "github.com/NdoleStudio/httpsms/pkg/handlers" "github.com/NdoleStudio/httpsms/pkg/telemetry" "github.com/NdoleStudio/httpsms/pkg/validators" + mongoDriver "go.mongodb.org/mongo-driver/v2/mongo" "gorm.io/driver/postgres" gormLogger "gorm.io/gorm/logger" ) @@ -83,7 +83,7 @@ type Container struct { projectID string db *gorm.DB dedicatedDB *gorm.DB - tursoDB *sql.DB + mongoDB *mongoDriver.Database version string app *fiber.App eventDispatcher *services.EventDispatcher @@ -293,21 +293,21 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { return container.dedicatedDB } -// TursoDB creates a *sql.DB connection to a Turso/libSQL database -func (container *Container) TursoDB() *sql.DB { - if container.tursoDB != nil { - return container.tursoDB +// MongoDB creates a *mongo.Database connection to MongoDB Atlas +func (container *Container) MongoDB() *mongoDriver.Database { + if container.mongoDB != nil { + return container.mongoDB } - container.logger.Debug("creating Turso *sql.DB connection") + container.logger.Debug("creating MongoDB *mongo.Database connection") - db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN")) + db, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI")) if err != nil { container.logger.Fatal(err) } - container.tursoDB = db - return container.tursoDB + container.mongoDB = db + return container.mongoDB } // HedgingFailureCounter creates an OTel counter for hedging secondary write failures @@ -922,12 +922,12 @@ func (container *Container) MessageThreadRepository() (repository repositories.M // HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) { switch os.Getenv("HEARTBEAT_DB_BACKEND") { - case "turso": - container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository") - return repositories.NewLibsqlHeartbeatMonitorRepository( + case "mongodb": + container.logger.Debug("creating MongoDB repositories.HeartbeatMonitorRepository") + return repositories.NewMongoHeartbeatMonitorRepository( container.Logger(), container.Tracer(), - container.TursoDB(), + container.MongoDB(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") @@ -935,7 +935,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie container.Logger(), container.Tracer(), repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()), + repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()), container.HedgingFailureCounter(), ) default: @@ -1760,12 +1760,12 @@ func (container *Container) RegisterSwaggerRoutes() { // HeartbeatRepository registers a new instance of repositories.HeartbeatRepository func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { switch os.Getenv("HEARTBEAT_DB_BACKEND") { - case "turso": - container.logger.Debug("creating libSQL repositories.HeartbeatRepository") - return repositories.NewLibsqlHeartbeatRepository( + case "mongodb": + container.logger.Debug("creating MongoDB repositories.HeartbeatRepository") + return repositories.NewMongoHeartbeatRepository( container.Logger(), container.Tracer(), - container.TursoDB(), + container.MongoDB(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatRepository") @@ -1773,7 +1773,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito container.Logger(), container.Tracer(), repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()), container.HedgingFailureCounter(), ) default: diff --git a/api/pkg/entities/heartbeat.go b/api/pkg/entities/heartbeat.go index 629efd29..abc070e9 100644 --- a/api/pkg/entities/heartbeat.go +++ b/api/pkg/entities/heartbeat.go @@ -8,10 +8,10 @@ import ( // Heartbeat represents is a pulse from an active phone type Heartbeat struct { - ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` - Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" example:"+18005550199"` - Version string `json:"version" example:"344c10f"` - Charging bool `json:"charging" example:"true"` - UserID UserID `json:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` - Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" example:"2022-06-05T14:26:01.520828+03:00"` + ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" bson:"_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` + Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" bson:"owner" example:"+18005550199"` + Version string `json:"version" bson:"version" example:"344c10f"` + Charging bool `json:"charging" bson:"charging" example:"true"` + UserID UserID `json:"user_id" bson:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` + Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" bson:"timestamp" example:"2022-06-05T14:26:01.520828+03:00"` } diff --git a/api/pkg/entities/heartbeat_monitor.go b/api/pkg/entities/heartbeat_monitor.go index 6b41a31a..7151f195 100644 --- a/api/pkg/entities/heartbeat_monitor.go +++ b/api/pkg/entities/heartbeat_monitor.go @@ -8,14 +8,14 @@ import ( // HeartbeatMonitor is used to monitor heartbeats of a phone type HeartbeatMonitor struct { - ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` - PhoneID uuid.UUID `json:"phone_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` - UserID UserID `json:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` - QueueID string `json:"queue_id" example:"0360259236613675274"` - Owner string `json:"owner" example:"+18005550199"` - PhoneOnline bool `json:"phone_online" example:"true" default:"true"` - CreatedAt time.Time `json:"created_at" example:"2022-06-05T14:26:02.302718+03:00"` - UpdatedAt time.Time `json:"updated_at" example:"2022-06-05T14:26:10.303278+03:00"` + ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" bson:"_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` + PhoneID uuid.UUID `json:"phone_id" bson:"phone_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` + UserID UserID `json:"user_id" bson:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` + QueueID string `json:"queue_id" bson:"queue_id" example:"0360259236613675274"` + Owner string `json:"owner" bson:"owner" example:"+18005550199"` + PhoneOnline bool `json:"phone_online" bson:"phone_online" example:"true" default:"true"` + CreatedAt time.Time `json:"created_at" bson:"created_at" example:"2022-06-05T14:26:02.302718+03:00"` + UpdatedAt time.Time `json:"updated_at" bson:"updated_at" example:"2022-06-05T14:26:10.303278+03:00"` } // RequiresCheck returns true if the heartbeat monitor requires a check diff --git a/api/pkg/repositories/libsql.go b/api/pkg/repositories/libsql.go deleted file mode 100644 index 66ee8c0e..00000000 --- a/api/pkg/repositories/libsql.go +++ /dev/null @@ -1,67 +0,0 @@ -package repositories - -import ( - "database/sql" - "fmt" - - _ "github.com/tursodatabase/libsql-client-go/libsql" // libSQL database driver - - "github.com/palantir/stacktrace" -) - -const ( - tableHeartbeats = "heartbeats" - tableHeartbeatMonitors = "heartbeat_monitors" -) - -// NewTursoDB creates a new *sql.DB connection to a Turso database and auto-creates tables -func NewTursoDB(dsn string) (*sql.DB, error) { - db, err := sql.Open("libsql", dsn) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database with DSN [%s]", dsn)) - } - - if err = db.Ping(); err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database with DSN [%s]", dsn)) - } - - if err = createTursoTables(db); err != nil { - return nil, stacktrace.Propagate(err, "cannot create turso tables") - } - - return db, nil -} - -func createTursoTables(db *sql.DB) error { - statements := []string{ - `CREATE TABLE IF NOT EXISTS ` + tableHeartbeats + ` ( - id TEXT PRIMARY KEY, - owner TEXT NOT NULL, - version TEXT NOT NULL, - charging INTEGER NOT NULL DEFAULT 0, - user_id TEXT NOT NULL, - timestamp DATETIME NOT NULL - )`, - `CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON ` + tableHeartbeats + `(owner, timestamp)`, - `CREATE INDEX IF NOT EXISTS idx_heartbeats_user_id ON ` + tableHeartbeats + `(user_id)`, - `CREATE TABLE IF NOT EXISTS ` + tableHeartbeatMonitors + ` ( - id TEXT PRIMARY KEY, - phone_id TEXT NOT NULL, - user_id TEXT NOT NULL, - queue_id TEXT NOT NULL DEFAULT '', - owner TEXT NOT NULL, - phone_online INTEGER NOT NULL DEFAULT 1, - created_at DATETIME NOT NULL, - updated_at DATETIME NOT NULL - )`, - `CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON ` + tableHeartbeatMonitors + `(user_id, owner)`, - } - - for _, stmt := range statements { - if _, err := db.Exec(stmt); err != nil { - return stacktrace.Propagate(err, fmt.Sprintf("cannot execute statement: %s", stmt)) - } - } - - return nil -} diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go deleted file mode 100644 index 0cb594af..00000000 --- a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go +++ /dev/null @@ -1,199 +0,0 @@ -package repositories - -import ( - "context" - "database/sql" - "fmt" - "time" - - "github.com/google/uuid" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/palantir/stacktrace" -) - -// libsqlHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in Turso/libSQL -type libsqlHeartbeatMonitorRepository struct { - logger telemetry.Logger - tracer telemetry.Tracer - db *sql.DB -} - -// NewLibsqlHeartbeatMonitorRepository creates the libSQL version of the HeartbeatMonitorRepository -func NewLibsqlHeartbeatMonitorRepository( - logger telemetry.Logger, - tracer telemetry.Tracer, - db *sql.DB, -) HeartbeatMonitorRepository { - return &libsqlHeartbeatMonitorRepository{ - logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatMonitorRepository{})), - tracer: tracer, - db: db, - } -} - -func (repository *libsqlHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "INSERT INTO "+tableHeartbeatMonitors+" (id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - monitor.ID.String(), - monitor.PhoneID.String(), - string(monitor.UserID), - monitor.QueueID, - monitor.Owner, - boolToInt(monitor.PhoneOnline), - monitor.CreatedAt.UTC(), - monitor.UpdatedAt.UTC(), - ) - if err != nil { - msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - row := repository.db.QueryRowContext(ctx, - "SELECT id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ? LIMIT 1", - string(userID), phoneNumber, - ) - - monitor, err := repository.scanHeartbeatMonitorRow(row) - if err == sql.ErrNoRows { - msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) - } - if err != nil { - msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return monitor, nil -} - -func (repository *libsqlHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - var count int - err := repository.db.QueryRowContext(ctx, - "SELECT COUNT(*) FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND id = ?", - string(userID), monitorID.String(), - ).Scan(&count) - if err != nil { - msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID) - return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return count > 0, nil -} - -func (repository *libsqlHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "UPDATE "+tableHeartbeatMonitors+" SET queue_id = ?, updated_at = ? WHERE id = ?", - queueID, time.Now().UTC(), monitorID.String(), - ) - if err != nil { - msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ?", - string(userID), phoneNumber, - ) - if err != nil { - msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "UPDATE "+tableHeartbeatMonitors+" SET phone_online = ?, updated_at = ? WHERE id = ? AND user_id = ?", - boolToInt(online), time.Now().UTC(), monitorID.String(), string(userID), - ) - if err != nil { - msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ?", string(userID)) - if err != nil { - msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { - monitor := new(entities.HeartbeatMonitor) - var id, phoneID, userID string - var phoneOnline int - err := row.Scan(&id, &phoneID, &userID, &monitor.QueueID, &monitor.Owner, &phoneOnline, &monitor.CreatedAt, &monitor.UpdatedAt) - if err != nil { - return nil, err - } - monitor.ID, err = uuid.Parse(id) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", id)) - } - monitor.PhoneID, err = uuid.Parse(phoneID) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", phoneID)) - } - monitor.UserID = entities.UserID(userID) - monitor.PhoneOnline = phoneOnline != 0 - return monitor, nil -} diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go deleted file mode 100644 index 42fdf911..00000000 --- a/api/pkg/repositories/libsql_heartbeat_repository.go +++ /dev/null @@ -1,186 +0,0 @@ -package repositories - -import ( - "context" - "database/sql" - "fmt" - - "github.com/google/uuid" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/palantir/stacktrace" -) - -// libsqlHeartbeatRepository is responsible for persisting entities.Heartbeat in Turso/libSQL -type libsqlHeartbeatRepository struct { - logger telemetry.Logger - tracer telemetry.Tracer - db *sql.DB -} - -// NewLibsqlHeartbeatRepository creates the libSQL version of the HeartbeatRepository -func NewLibsqlHeartbeatRepository( - logger telemetry.Logger, - tracer telemetry.Tracer, - db *sql.DB, -) HeartbeatRepository { - return &libsqlHeartbeatRepository{ - logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatRepository{})), - tracer: tracer, - db: db, - } -} - -func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { - ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "INSERT INTO "+tableHeartbeats+" (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)", - heartbeat.ID.String(), - heartbeat.Owner, - heartbeat.Version, - boolToInt(heartbeat.Charging), - string(heartbeat.UserID), - heartbeat.Timestamp.UTC(), - ) - if err != nil { - msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - var rows *sql.Rows - var err error - - if len(params.Query) > 0 { - queryPattern := "%" + params.Query + "%" - rows, err = repository.db.QueryContext(ctx, - "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? AND version LIKE ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", - string(userID), owner, queryPattern, params.Limit, params.Skip, - ) - } else { - rows, err = repository.db.QueryContext(ctx, - "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", - string(userID), owner, params.Limit, params.Skip, - ) - } - if err != nil { - msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - defer rows.Close() - - heartbeats := make([]entities.Heartbeat, 0) - for rows.Next() { - heartbeat, scanErr := repository.scanHeartbeat(rows) - if scanErr != nil { - msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) - } - heartbeats = append(heartbeats, *heartbeat) - } - if rowsErr := rows.Err(); rowsErr != nil { - msg := fmt.Sprintf("error iterating heartbeat rows for owner [%s]", owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(rowsErr, msg)) - } - - return &heartbeats, nil -} - -func (repository *libsqlHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - row := repository.db.QueryRowContext(ctx, - "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT 1", - string(userID), owner, - ) - - heartbeat, err := repository.scanHeartbeatRow(row) - if err == sql.ErrNoRows { - msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) - } - if err != nil { - msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return heartbeat, nil -} - -func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeats+" WHERE user_id = ?", string(userID)) - if err != nil { - msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatRepository) scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { - heartbeat := new(entities.Heartbeat) - var id string - var charging int - var userID string - err := rows.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) - if err != nil { - return nil, err - } - heartbeat.ID, err = uuid.Parse(id) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) - } - heartbeat.Charging = charging != 0 - heartbeat.UserID = entities.UserID(userID) - return heartbeat, nil -} - -func (repository *libsqlHeartbeatRepository) scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { - heartbeat := new(entities.Heartbeat) - var id string - var charging int - var userID string - err := row.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) - if err != nil { - return nil, err - } - heartbeat.ID, err = uuid.Parse(id) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) - } - heartbeat.Charging = charging != 0 - heartbeat.UserID = entities.UserID(userID) - return heartbeat, nil -} - -func boolToInt(b bool) int { - if b { - return 1 - } - return 0 -} diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go new file mode 100644 index 00000000..13200c07 --- /dev/null +++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go @@ -0,0 +1,182 @@ +package repositories + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// mongoHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in MongoDB +type mongoHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + collection *mongo.Collection +} + +// NewMongoHeartbeatMonitorRepository creates the MongoDB version of the HeartbeatMonitorRepository +func NewMongoHeartbeatMonitorRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + db *mongo.Database, +) HeartbeatMonitorRepository { + return &mongoHeartbeatMonitorRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})), + tracer: tracer, + collection: db.Collection(collectionHeartbeatMonitors), + } +} + +func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.collection.InsertOne(ctx, monitor) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", phoneNumber}, + } + + var monitor entities.HeartbeatMonitor + err := repository.collection.FindOne(ctx, filter).Decode(&monitor) + if err == mongo.ErrNoDocuments { + msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return &monitor, nil +} + +func (repository *mongoHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"_id", monitorID.String()}, + } + + count, err := repository.collection.CountDocuments(ctx, filter) + if err != nil { + msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID) + return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return count > 0, nil +} + +func (repository *mongoHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{{"_id", monitorID.String()}} + update := bson.D{{"$set", bson.D{ + {"queue_id", queueID}, + {"updated_at", time.Now().UTC()}, + }}} + + _, err := repository.collection.UpdateOne(ctx, filter, update) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", phoneNumber}, + } + + _, err := repository.collection.DeleteMany(ctx, filter) + if err != nil { + msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"_id", monitorID.String()}, + {"user_id", string(userID)}, + } + update := bson.D{{"$set", bson.D{ + {"phone_online", online}, + {"updated_at", time.Now().UTC()}, + }}} + + _, err := repository.collection.UpdateOne(ctx, filter, update) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}}) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go new file mode 100644 index 00000000..d8a7839c --- /dev/null +++ b/api/pkg/repositories/mongo_heartbeat_repository.go @@ -0,0 +1,135 @@ +package repositories + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// mongoHeartbeatRepository is responsible for persisting entities.Heartbeat in MongoDB +type mongoHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + collection *mongo.Collection +} + +// NewMongoHeartbeatRepository creates the MongoDB version of the HeartbeatRepository +func NewMongoHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + db *mongo.Database, +) HeartbeatRepository { + return &mongoHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})), + tracer: tracer, + collection: db.Collection(collectionHeartbeats), + } +} + +func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.collection.InsertOne(ctx, heartbeat) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", owner}, + } + + if len(params.Query) > 0 { + filter = append(filter, bson.E{"version", bson.D{{"$regex", params.Query}, {"$options", "i"}}}) + } + + opts := options.Find(). + SetSort(bson.D{{"timestamp", -1}}). + SetSkip(int64(params.Skip)). + SetLimit(int64(params.Limit)) + + cursor, err := repository.collection.Find(ctx, filter, opts) + if err != nil { + msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + defer cursor.Close(ctx) + + var heartbeats []entities.Heartbeat + if err = cursor.All(ctx, &heartbeats); err != nil { + msg := fmt.Sprintf("cannot decode heartbeats for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + if heartbeats == nil { + heartbeats = make([]entities.Heartbeat, 0) + } + + return &heartbeats, nil +} + +func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", owner}, + } + + opts := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) + + var heartbeat entities.Heartbeat + err := repository.collection.FindOne(ctx, filter, opts).Decode(&heartbeat) + if err == mongo.ErrNoDocuments { + msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return &heartbeat, nil +} + +func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}}) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go new file mode 100644 index 00000000..f2f65ea9 --- /dev/null +++ b/api/pkg/repositories/mongodb.go @@ -0,0 +1,127 @@ +package repositories + +import ( + "context" + "fmt" + "net/url" + "reflect" + "time" + + "github.com/google/uuid" + "github.com/palantir/stacktrace" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo" +) + +const ( + collectionHeartbeats = "heartbeats" + collectionHeartbeatMonitors = "heartbeat_monitors" +) + +// uuidEncodeValue encodes uuid.UUID as a BSON string +func uuidEncodeValue(_ bson.EncodeContext, vw bson.ValueWriter, val reflect.Value) error { + u := val.Interface().(uuid.UUID) + return vw.WriteString(u.String()) +} + +// uuidDecodeValue decodes a BSON string into uuid.UUID +func uuidDecodeValue(_ bson.DecodeContext, vr bson.ValueReader, val reflect.Value) error { + str, err := vr.ReadString() + if err != nil { + return err + } + parsed, err := uuid.Parse(str) + if err != nil { + return err + } + val.Set(reflect.ValueOf(parsed)) + return nil +} + +// newMongoRegistry creates a BSON registry that encodes uuid.UUID as strings +func newMongoRegistry() *bson.Registry { + rb := bson.NewRegistry() + rb.RegisterTypeEncoder(reflect.TypeOf(uuid.UUID{}), bson.ValueEncoderFunc(uuidEncodeValue)) + rb.RegisterTypeDecoder(reflect.TypeOf(uuid.UUID{}), bson.ValueDecoderFunc(uuidDecodeValue)) + return rb +} + +// NewMongoDB creates a new *mongo.Database connection to MongoDB Atlas and ensures indexes. +// The database name is derived from the appName query parameter in the URI. +func NewMongoDB(uri string) (*mongo.Database, error) { + dbName, err := parseMongoDBName(uri) + if err != nil { + return nil, stacktrace.Propagate(err, "cannot parse database name from MongoDB URI") + } + + pingCtx, pingCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer pingCancel() + + serverAPI := options.ServerAPI(options.ServerAPIVersion1) + registry := newMongoRegistry() + opts := options.Client(). + ApplyURI(uri). + SetServerAPIOptions(serverAPI). + SetRegistry(registry). + SetMonitor(otelmongo.NewMonitor()) + + client, err := mongo.Connect(opts) + if err != nil { + return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") + } + + if err = client.Ping(pingCtx, nil); err != nil { + return nil, stacktrace.Propagate(err, "cannot ping MongoDB Atlas") + } + + db := client.Database(dbName) + + indexCtx, indexCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer indexCancel() + + if err = createMongoIndexes(indexCtx, db); err != nil { + return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes") + } + + return db, nil +} + +// parseMongoDBName extracts the appName query parameter from the MongoDB URI to use as the database name +func parseMongoDBName(uri string) (string, error) { + parsed, err := url.Parse(uri) + if err != nil { + return "", stacktrace.Propagate(err, fmt.Sprintf("cannot parse MongoDB URI [%s]", uri)) + } + + appName := parsed.Query().Get("appName") + if appName == "" { + return "", stacktrace.NewError("MongoDB URI is missing the 'appName' query parameter which is used as the database name") + } + + return appName, nil +} + +func createMongoIndexes(ctx context.Context, db *mongo.Database) error { + // Heartbeats indexes + heartbeatsCol := db.Collection(collectionHeartbeats) + _, err := heartbeatsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{ + {Keys: bson.D{{"owner", 1}, {"timestamp", -1}}}, + {Keys: bson.D{{"user_id", 1}}}, + }) + if err != nil { + return stacktrace.Propagate(err, "cannot create indexes on heartbeats collection") + } + + // Heartbeat monitors indexes + monitorsCol := db.Collection(collectionHeartbeatMonitors) + _, err = monitorsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{ + {Keys: bson.D{{"user_id", 1}, {"owner", 1}}}, + }) + if err != nil { + return stacktrace.Propagate(err, "cannot create indexes on heartbeat_monitors collection") + } + + return nil +} diff --git a/tests/.env.test b/tests/.env.test index 909902ae..ac18af92 100644 --- a/tests/.env.test +++ b/tests/.env.test @@ -29,4 +29,4 @@ GCS_BUCKET_NAME= UPTRACE_DSN= CLOUDFLARE_TURNSTILE_SECRET_KEY= HEARTBEAT_DB_BACKEND=hedging -TURSO_DATABASE_DSN=http://sqld:8080 +MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin&appName=httpsms diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 1e111b78..515b6298 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -25,10 +25,19 @@ services: timeout: 5s retries: 10 - sqld: - image: ghcr.io/tursodatabase/libsql-server:latest + mongodb: + image: mongo:7 ports: - - "8090:8080" + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: httpsms + MONGO_INITDB_ROOT_PASSWORD: testpassword + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 5s wiremock: image: wiremock/wiremock:3x @@ -58,8 +67,8 @@ services: condition: service_healthy wiremock: condition: service_healthy - sqld: - condition: service_started + mongodb: + condition: service_healthy env_file: - .env.test environment: diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue index ebe95562..96aa83f3 100644 --- a/web/pages/settings/index.vue +++ b/web/pages/settings/index.vue @@ -385,7 +385,7 @@ delivered when the schedule opens according to your configured send rate.
@@ -435,10 +435,18 @@ -