From 57dd732339c97a1a3cc36d915176494f6905815a Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 14:08:19 -0700 Subject: [PATCH 1/8] Add CI --- .env | 3 +- .github/workflows/ci.yml | 31 ++++++++++++ .github/workflows/codeql.yml | 62 ++++++++++++++++++++++++ .github/workflows/dependency-review.yml | 20 ++++++++ examples/pool/scheduler/main.go | 5 +- examples/pool/worker/main.go | 31 +++++++++--- examples/streaming/multi-readers/main.go | 6 ++- scripts/run-examples | 40 ++++++++++----- 8 files changed, 176 insertions(+), 22 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/codeql.yml create mode 100644 .github/workflows/dependency-review.yml diff --git a/.env b/.env index 7ecadfb..889ced1 100644 --- a/.env +++ b/.env @@ -1 +1,2 @@ -export REDIS_PASSWORD=redispassword \ No newline at end of file +export REDIS_PASSWORD=redispassword +export TIMEOUT=1s \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..2b1f7ce --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,31 @@ +name: Run Static Checks and Tests + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + ci: + strategy: + fail-fast: true + matrix: + go: ['1.22'] + os: ['ubuntu-latest', 'windows-latest'] + runs-on: ${{ matrix.os }} + + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Set up Go ${{ matrix.go }} + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go }} + id: go + + - name: Build + run: scripts/cibuild \ No newline at end of file diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 0000000..441076e --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,62 @@ +name: "CodeQL" + +on: + push: + branches: [ "main" ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ "main" ] + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ] + # Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + + # Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs + # queries: security-extended,security-and-quality + + + # Autobuild attempts to build any compiled languages (C/C++, C#, Go, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + #- name: Autobuild + # uses: github/codeql-action/autobuild@v2 + + # â„šī¸ Command-line programs to run using the OS shell. + # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + + # If the Autobuild fails above, remove it and uncomment the following three lines. + # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 + with: + category: "/language:${{matrix.language}}" diff --git a/.github/workflows/dependency-review.yml b/.github/workflows/dependency-review.yml new file mode 100644 index 0000000..0d4a013 --- /dev/null +++ b/.github/workflows/dependency-review.yml @@ -0,0 +1,20 @@ +# Dependency Review Action +# +# This Action will scan dependency manifest files that change as part of a Pull Request, surfacing known-vulnerable versions of the packages declared or updated in the PR. Once installed, if the workflow run is marked as required, PRs introducing known-vulnerable packages will be blocked from merging. +# +# Source repository: https://github.com/actions/dependency-review-action +# Public documentation: https://docs.github.com/en/code-security/supply-chain-security/understanding-your-software-supply-chain/about-dependency-review#dependency-review-enforcement +name: 'Dependency Review' +on: [pull_request] + +permissions: + contents: read + +jobs: + dependency-review: + runs-on: ubuntu-latest + steps: + - name: 'Checkout Repository' + uses: actions/checkout@v4 + - name: 'Dependency Review' + uses: actions/dependency-review-action@v4 diff --git a/examples/pool/scheduler/main.go b/examples/pool/scheduler/main.go index be7fe5d..70e9e77 100644 --- a/examples/pool/scheduler/main.go +++ b/examples/pool/scheduler/main.go @@ -28,7 +28,10 @@ func main() { } // Create node for pool "example". - node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger)) + node, err := pool.AddNode(ctx, "example", rdb, + pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster + pool.WithLogger(logger), + ) if err != nil { panic(err) } diff --git a/examples/pool/worker/main.go b/examples/pool/worker/main.go index 4213fbc..3611fa9 100644 --- a/examples/pool/worker/main.go +++ b/examples/pool/worker/main.go @@ -51,7 +51,10 @@ func main() { } // Create node for pool "example". - node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger)) + node, err := pool.AddNode(ctx, "example", rdb, + pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster + pool.WithLogger(logger), + ) if err != nil { panic(err) } @@ -62,15 +65,31 @@ func main() { panic(err) } - // Wait for jobs to complete. - log.Infof(ctx, "Waiting for jobs... CTRL+C to stop.") + // Check if a timeout is set + var timeout time.Duration + if t := os.Getenv("TIMEOUT"); t != "" { + timeout, err = time.ParseDuration(t) + if err != nil { + panic(err) + } + } else { + timeout = 10 * time.Minute + } + log.Infof(ctx, "timeout set to %s", timeout) - // Close done channel on CTRL-C. + // Wait for CTRL-C or timeout. + log.Infof(ctx, "Waiting for jobs... CTRL+C to stop.") sigc := make(chan os.Signal, 1) + defer close(sigc) signal.Notify(sigc, os.Interrupt) - <-sigc - close(sigc) + select { + case <-sigc: + log.Infof(ctx, "interrupted") + case <-time.After(timeout): + log.Infof(ctx, "timeout") + } + // Shutdown node. if err := node.Shutdown(ctx); err != nil { panic(err) } diff --git a/examples/streaming/multi-readers/main.go b/examples/streaming/multi-readers/main.go index e0e386a..b8baefd 100644 --- a/examples/streaming/multi-readers/main.go +++ b/examples/streaming/multi-readers/main.go @@ -28,7 +28,11 @@ func main() { } // Don't forget to destroy the stream when done - defer stream.Destroy(ctx) + defer func() { + if err := stream.Destroy(ctx); err != nil { + panic(err) + } + }() // Write 2 events to the stream id1, err := stream.Add(ctx, "event 1", []byte("payload 1")) diff --git a/scripts/run-examples b/scripts/run-examples index d1e8355..371bfba 100755 --- a/scripts/run-examples +++ b/scripts/run-examples @@ -1,17 +1,31 @@ #!/bin/bash +# Find the Git root directory +git_root=$(git rev-parse --show-toplevel 2>/dev/null) + +if [ -z "$git_root" ]; then + echo "Error: Not in a Git repository" >&2 + exit 1 +fi + +# Load environment variables from .env file in the Git root +if [ -f "$git_root/.env" ]; then + export $(grep -v '^#' "$git_root/.env" | xargs) +fi + +# Update file paths to use absolute paths from Git root files=( - "examples/pool/worker/main.go" - "examples/pool/producer/main.go" - "examples/rmap/basics/main.go" - "examples/rmap/multi-nodes/main.go" - "examples/rmap/multi-nodes/main.go" - "examples/streaming/single-reader/main.go" - "examples/streaming/single-sink/main.go" - "examples/streaming/multi-readers/main.go" - "examples/streaming/multi-sinks/main.go" - "examples/streaming/multi-streams/main.go" - "examples/streaming/pub-sub/main.go" + "$git_root/examples/pool/worker/main.go" + "$git_root/examples/pool/producer/main.go" + "$git_root/examples/rmap/basics/main.go" + "$git_root/examples/rmap/multi-nodes/main.go" + "$git_root/examples/rmap/multi-nodes/main.go" + "$git_root/examples/streaming/single-reader/main.go" + "$git_root/examples/streaming/single-sink/main.go" + "$git_root/examples/streaming/multi-readers/main.go" + "$git_root/examples/streaming/multi-sinks/main.go" + "$git_root/examples/streaming/multi-streams/main.go" + "$git_root/examples/streaming/pub-sub/main.go" ) args=( @@ -30,7 +44,7 @@ run_example() { shift local args=("$@") echo "Running: $file ${args[*]}" - output=$(timeout "$timeout_sec" go run -v "$file" "${args[@]}" 2>&1) + output=$(cd "$git_root" && timeout "$timeout_sec" go run -v "$file" "${args[@]}" 2>&1) local exit_status=$? if [[ $exit_status -ne 0 ]]; then echo "Example '${file}' exited with an error or exceeded the timeout:" @@ -55,4 +69,4 @@ for pid in "${pids[@]}"; do fi done -exit "$status" \ No newline at end of file +exit "$status" From 3c108d340c3924540aba975146f08064276db220 Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 14:10:38 -0700 Subject: [PATCH 2/8] go mod tidy --- go.mod | 4 +--- go.sum | 60 +--------------------------------------------------------- 2 files changed, 2 insertions(+), 62 deletions(-) diff --git a/go.mod b/go.mod index 172420c..7003185 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,6 @@ go 1.22.7 toolchain go1.23.1 require ( - github.com/crossnokaye/internal-services v0.0.0-20241002164525-1737842b8588 - github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/oklog/ulid/v2 v2.1.0 github.com/redis/go-redis/v9 v9.6.1 @@ -33,10 +31,10 @@ require ( github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect diff --git a/go.sum b/go.sum index 9e411fc..84fbd4b 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,13 @@ -github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= -github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/crossnokaye/internal-services v0.0.0-20241002164525-1737842b8588 h1:+76syaOYT0/TkXvhaJQgJpQvmADLuJ9W9T1CM3VfSbA= -github.com/crossnokaye/internal-services v0.0.0-20241002164525-1737842b8588/go.mod h1:wawMB8oi5SKfji4DeHvokHxEE7rVPdLrDvsfkGAibOI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -36,8 +31,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -55,65 +48,38 @@ github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNs github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= -github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.53.0 h1:IVtyPth4Rs5P8wIf0mP2KVKFNTJ4paX9qQ4Hkh5gFdc= -go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.53.0/go.mod h1:ImRBLMJv177/pwiLZ7tU7HDGNdBv7rS0HQ99eN/zBl8= go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.55.0 h1:sqmsIQ75l6lfZjjpnXXT9DFVtYEDg6CH0/Cn4/3A1Wg= go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.55.0/go.mod h1:rsg1EO8LXSs2po50PB5CeY/MSVlhghuKBgXlKnqm6ks= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 h1:U2guen0GhqH8o/G2un8f/aG/y++OuW6MyCo6hT9prXk= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0/go.mod h1:yeGZANgEcpdx/WK0IvvRFC+2oLiMS2u4L/0Rj2M2Qr0= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6ei8GFW7kyPYdxJaV2rgI6M+4tvZzhYsQ2wgyVC08= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 h1:aLmmtjRke7LPDQ3lvpFz+kNEH43faFhzW7v8BFIEydg= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0/go.mod h1:TC1pyCt6G9Sjb4bQpShH+P5R53pO6ZuGnHuuln9xMeE= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6ZXmNPRR8ul6i3WgFURCHzaXjHdm0karRG/+dj3s= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 h1:lUsI2TYsQw2r1IASwoROaCnjdj2cvC2+Jbxvk6nHnWU= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0/go.mod h1:2HpZxxQurfGxJlJDblybejHB6RX6pmExPNe517hREw4= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 h1:BJee2iLkfRfl9lc7aFmBwkWxY/RI1RDdXepSF6y8TPE= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0/go.mod h1:DIzlHs3DRscCIBU3Y9YSzPfScwnYnzfnCd4g8zA7bZc= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 h1:EVSnY9JbEEW92bEkIYOVMw4q1WJxIAGoFTrtYOzWuRQ= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0/go.mod h1:Ea1N1QQryNXpCD0I1fdLibBAIpQuBkznMmkdKrapk1Y= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= -go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= -go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= @@ -122,54 +88,30 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= goa.design/clue v1.0.6 h1:K9YXPUXtFfdKk0qi4yjtTR1v5ZO+M22FVdCp3maL9dk= goa.design/clue v1.0.6/go.mod h1:0YB3ihZJ6Sc0IRmF6TTWIvIcIPzc+JDETQFjel1AL/o= -goa.design/goa/v3 v3.18.2 h1:2t3RHgc2tTYEExEwRlnkrSsFD82rsZ5mBkv5JUBvelY= -goa.design/goa/v3 v3.18.2/go.mod h1:jS66/I2PCh9MPwXtFY8vWb4pGQDYz3qVK3TjjvRLEUM= goa.design/goa/v3 v3.19.1 h1:jpV3LEy7YANzPMwm++Lu17RoThRJgXrPxdEM0A1nlOE= goa.design/goa/v3 v3.19.1/go.mod h1:astNE9ube0YCxqq7DQkt1MtLxB/b3kRPEFkEZovcO2I= goa.design/model v1.9.8 h1:SGf+q+hYO1rh/Jvq7T0ZbfBcANzi3Lc3RHKWBDZCWCE= goa.design/model v1.9.8/go.mod h1:RqPSTbZV49gD3+IBsT9/zf+EPxt4zuDPuT/6r857H3w= -golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= -golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= -golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= -google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5 h1:a/Z0jgw03aJ2rQnp5PlPpznJqJft0HyvyrcUcxgzPwY= -google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0= google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 h1:T6rh4haD3GVYsgEfWExoCZA2o2FmbNyKpTuAxbEFPTg= google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:wp2WsuBYj6j8wUdo3ToZsdxxixbvQNAHqVJrTgi5E5M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240723171418-e6d459c13d2a h1:hqK4+jJZXCU4pW7jsAdGOVFIfLHQeV7LaizZKnZ84HI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240723171418-e6d459c13d2a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= -google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From e3c227298b38dbeeb08b628065be2165691ce8fd Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 14:15:18 -0700 Subject: [PATCH 3/8] Bump to Go 1.23 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b1f7ce..71c0888 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: true matrix: - go: ['1.22'] + go: ['1.23'] os: ['ubuntu-latest', 'windows-latest'] runs-on: ${{ matrix.os }} From 18a23f8dc1f32e6f980cd66688f864a0c8537186 Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 14:20:21 -0700 Subject: [PATCH 4/8] Remove custom CodeQL settings --- .github/workflows/codeql.yml | 32 +++++--------------------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 441076e..542d1b4 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -4,8 +4,9 @@ on: push: branches: [ "main" ] pull_request: - # The branches below must be a subset of the branches above branches: [ "main" ] + schedule: + - cron: '31 7 * * 3' jobs: analyze: @@ -20,43 +21,20 @@ jobs: fail-fast: false matrix: language: [ 'go' ] - # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ] - # Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support steps: - name: Checkout repository uses: actions/checkout@v4 - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version-file: 'go.mod' - - # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} - # If you wish to specify custom queries, you can do so here or in a config file. - # By default, queries listed here will override any specified in a config file. - # Prefix the list here with "+" to use these queries and those in the config file. - - # Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs - # queries: security-extended,security-and-quality - - - # Autobuild attempts to build any compiled languages (C/C++, C#, Go, or Java). - # If this step fails, then you should remove it and run the build manually (see below) - #- name: Autobuild - # uses: github/codeql-action/autobuild@v2 - - # â„šī¸ Command-line programs to run using the OS shell. - # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun - # If the Autobuild fails above, remove it and uncomment the following three lines. - # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. + - name: Autobuild + uses: github/codeql-action/autobuild@v3 - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v3 with: - category: "/language:${{matrix.language}}" + category: "/language:${{matrix.language}}" \ No newline at end of file From 860afa779789ef6c955ed18a1abc65acaae4bdd9 Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 14:22:44 -0700 Subject: [PATCH 5/8] Start Redis --- scripts/cibuild | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/cibuild b/scripts/cibuild index 85da796..2e5b170 100755 --- a/scripts/cibuild +++ b/scripts/cibuild @@ -6,6 +6,7 @@ GIT_ROOT=$(git rev-parse --show-toplevel) pushd ${GIT_ROOT} ./scripts/setup +./scripts/start-redis ./scripts/test ./scripts/run-examples From bd8c1fb3da99acac6e878da57cfc8b2420407384 Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 21:02:44 -0700 Subject: [PATCH 6/8] Properly requeue jobs of dead workers --- pool/errors.go | 8 ++-- pool/node.go | 110 +++++++++++++++++++++++++++++++++++--------- pool/node_test.go | 8 +--- pool/worker.go | 46 +++++++++++++++--- pool/worker_test.go | 28 +++++++---- streaming/sink.go | 2 +- 6 files changed, 152 insertions(+), 50 deletions(-) diff --git a/pool/errors.go b/pool/errors.go index d140532..332a921 100644 --- a/pool/errors.go +++ b/pool/errors.go @@ -1,7 +1,7 @@ package pool -import "fmt" +import "errors" -// errRequeue is the error returned by a worker when starting a job it wants to -// requeue. -var errRequeue = fmt.Errorf("requeue") +// ErrRequeue indicates that a worker failed to process a job's start or stop operation +// and requests the job to be requeued for another attempt. +var ErrRequeue = errors.New("requeue") diff --git a/pool/node.go b/pool/node.go index b0f28fd..7781ae6 100644 --- a/pool/node.go +++ b/pool/node.go @@ -32,6 +32,8 @@ type ( nodeStream *streaming.Stream // node event stream for receiving worker events nodeReader *streaming.Reader // node event reader workerMap *rmap.Map // worker creation times by ID + jobsMap *rmap.Map // jobs by worker ID + jobPayloadsMap *rmap.Map // job payloads by job key keepAliveMap *rmap.Map // worker keep-alive timestamps indexed by ID shutdownMap *rmap.Map // key is node ID that requested shutdown tickerMap *rmap.Map // ticker next tick time indexed by name @@ -110,6 +112,8 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp nodeID := ulid.Make().String() var ( wm *rmap.Map + jm *rmap.Map + jpm *rmap.Map km *rmap.Map tm *rmap.Map poolSink *streaming.Sink @@ -121,6 +125,14 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp if err != nil { return nil, fmt.Errorf("failed to join pool workers replicated map %q: %w", workerMapName(name), err) } + jm, err = rmap.Join(ctx, jobsMapName(name), rdb, rmap.WithLogger(logger)) + if err != nil { + return nil, fmt.Errorf("failed to join pool jobs replicated map %q: %w", jobsMapName(name), err) + } + jpm, err = rmap.Join(ctx, jobPayloadsMapName(name), rdb, rmap.WithLogger(logger)) + if err != nil { + return nil, fmt.Errorf("failed to join pool job payloads replicated map %q: %w", jobPayloadsMapName(name), err) + } km, err = rmap.Join(ctx, keepAliveMapName(name), rdb, rmap.WithLogger(logger)) if err != nil { return nil, fmt.Errorf("failed to join pool keep-alive replicated map %q: %w", keepAliveMapName(name), err) @@ -150,6 +162,8 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp NodeID: nodeID, keepAliveMap: km, workerMap: wm, + jobsMap: jm, + jobPayloadsMap: jpm, shutdownMap: wsm, tickerMap: tm, workerStreams: make(map[string]*streaming.Stream), @@ -205,6 +219,7 @@ func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, e } node.localWorkers = append(node.localWorkers, w) node.workerStreams[w.ID] = w.stream + node.logger.Info("added worker", "worker", w.ID) return w, nil } @@ -222,6 +237,7 @@ func (node *Node) RemoveWorker(ctx context.Context, w *Worker) error { break } } + node.logger.Info("removed worker", "worker", w.ID) return nil } @@ -236,6 +252,7 @@ func (node *Node) Workers() []*Worker { // DispatchJob dispatches a job to the proper worker in the pool. // It returns the error returned by the worker's start handler if any. +// If the context is done before the job is dispatched, the context error is returned. func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error { // Send job to pool stream. node.lock.Lock() @@ -254,9 +271,16 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e node.lock.Unlock() // Wait for return status. - err = <-cherr + select { + case err = <-cherr: + case <-ctx.Done(): + err = ctx.Err() + } close(cherr) + if err == nil { + node.logger.Info("dispatched", "key", key) + } return err } @@ -270,6 +294,7 @@ func (node *Node) StopJob(ctx context.Context, key string) error { if _, err := node.poolStream.Add(ctx, evStopJob, marshalJobKey(key)); err != nil { return fmt.Errorf("failed to add stop job to stream %q: %w", node.poolStream.Name, err) } + node.logger.Info("stop requested", "key", key) return nil } @@ -283,6 +308,7 @@ func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) if _, err := node.poolStream.Add(ctx, evNotify, marshalNotification(key, payload)); err != nil { return fmt.Errorf("failed to add notification to stream %q: %w", node.poolStream.Name, err) } + node.logger.Info("notification sent", "key", key) return nil } @@ -418,10 +444,11 @@ func (node *Node) routeWorkerEvent(ctx context.Context, ev *streaming.Event) err // Compute the worker ID that will handle the job. key := unmarshalJobKey(ev.Payload) - wid, err := node.jobWorker(ctx, key) - if err != nil { - return fmt.Errorf("failed to route job %q to worker: %w", key, err) + activeWorkers := node.activeWorkers(ctx) + if len(activeWorkers) == 0 { + return fmt.Errorf("no active worker in pool %q", node.Name) } + wid := activeWorkers[node.h.Hash(key, int64(len(activeWorkers)))] // Stream the event to the worker corresponding to the key hash. stream, err := node.workerStream(ctx, wid) @@ -652,16 +679,6 @@ func (node *Node) handleShutdownMapUpdate(ctx context.Context) { node.logger.Info("shutdown") } -// jobWorker returns the ID of the worker that handles the job with the given key. -// It is the caller's responsibility to lock the node. -func (node *Node) jobWorker(ctx context.Context, key string) (string, error) { - activeWorkers := node.activeWorkers(ctx) - if len(activeWorkers) == 0 { - return "", fmt.Errorf("no active worker in pool %q", node.Name) - } - return activeWorkers[node.h.Hash(key, int64(len(activeWorkers)))], nil -} - // activeWorkers returns the IDs of the active workers in the pool. // It is the caller's responsibility to lock the node. func (node *Node) activeWorkers(ctx context.Context) []string { @@ -683,24 +700,59 @@ func (node *Node) activeWorkers(ctx context.Context) []string { // Then filter out workers that have not been seen for more than workerTTL. alive := node.keepAliveMap.Map() var activeIDs []string + now := time.Now() for _, id := range sortedIDs { - a, ok := alive[id] + ls, ok := alive[id] if !ok { // This could happen if a worker is removed from the // pool and the last seen map deletion replicates before // the workers map deletion. continue } - nanos, _ := strconv.ParseInt(a, 10, 64) - t := time.Unix(0, nanos) - horizon := t.Add(node.workerTTL) - if horizon.After(time.Now()) { + lsi, err := strconv.ParseInt(ls, 10, 64) + if err != nil { + node.logger.Error(fmt.Errorf("failed to parse last seen timestamp for worker %q: %w", id, err)) + continue + } + lastSeen := now.Sub(time.Unix(0, lsi)) + if lastSeen <= node.workerTTL { activeIDs = append(activeIDs, id) - } else { - node.logger.Info("deleting", "worker", id, "last seen", t, "TTL", node.workerTTL) - if err := node.deleteWorker(ctx, id); err != nil { - node.logger.Error(fmt.Errorf("failed to delete worker %q: %w", id, err)) + continue + } + node.logger.Info("deleting", "worker", id, "last seen", lastSeen, "TTL", node.workerTTL) + + // requeue the worker's jobs first + jobs, _ := node.jobsMap.GetValues(id) + var requeued int + for _, key := range jobs { + payload, ok := node.jobPayloadsMap.Get(key) + if !ok { + node.logger.Error(fmt.Errorf("failed to get requeue inactive job payload for job %q: %w", key, err)) + continue + } + job := &Job{ + Key: key, + Payload: []byte(payload), + CreatedAt: time.Now(), + NodeID: node.NodeID, + } + eventID, err := node.poolStream.Add(ctx, evStartJob, marshalJob(job)) + if err != nil { + node.logger.Error(fmt.Errorf("failed to requeue inactive job %q: %w", job.Key, err)) + continue } + node.pendingJobs[eventID] = nil + node.logger.Info("requeued inactive", "job", job.Key, "event-id", eventID) + requeued++ + } + if requeued != len(jobs) { + node.logger.Error(fmt.Errorf("failed to requeue all inactive jobs for worker %q, will retry later: %d/%d", id, requeued, len(jobs))) + continue + } + + // then delete the worker + if err := node.deleteWorker(ctx, id); err != nil { + node.logger.Error(fmt.Errorf("failed to delete worker %q: %w", id, err)) } } return activeIDs @@ -762,6 +814,18 @@ func workerMapName(pool string) string { return fmt.Sprintf("%s:workers", pool) } +// jobsMapName returns the name of the replicated map used to store the +// jobs by worker ID. +func jobsMapName(pool string) string { + return fmt.Sprintf("%s:jobs", pool) +} + +// jobPayloadsMapName returns the name of the replicated map used to store the +// job payloads by job key. +func jobPayloadsMapName(pool string) string { + return fmt.Sprintf("%s:job-payloads", pool) +} + // keepAliveMapName returns the name of the replicated map used to store the // worker keep-alive timestamps. func keepAliveMapName(pool string) string { diff --git a/pool/node_test.go b/pool/node_test.go index be3c487..1aa5c08 100644 --- a/pool/node_test.go +++ b/pool/node_test.go @@ -156,13 +156,9 @@ func TestTwoNodeJobDispatchAndAck(t *testing.T) { t.Errorf("Unexpected job received by worker1: %+v", job) return nil } - originalStartFunc := worker2.handler.(*mockHandler).startFunc worker2.handler.(*mockHandler).startFunc = func(job *Job) error { - err := originalStartFunc(job) - if err == nil { - close(jobDone) - } - return err + close(jobDone) + return nil } // Test job dispatch and execution diff --git a/pool/worker.go b/pool/worker.go index 572192e..9cc952a 100644 --- a/pool/worker.go +++ b/pool/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "sort" "strconv" + "strings" "sync" "time" @@ -31,6 +32,8 @@ type ( reader *streaming.Reader done chan struct{} workersMap *rmap.Map + jobsMap *rmap.Map + jobPayloadsMap *rmap.Map keepAliveMap *rmap.Map shutdownMap *rmap.Map workerTTL time.Duration @@ -110,6 +113,8 @@ func newWorker(ctx context.Context, p *Node, h JobHandler) (*Worker, error) { reader: reader, done: make(chan struct{}), workersMap: p.workerMap, + jobsMap: p.jobsMap, + jobPayloadsMap: p.jobPayloadsMap, keepAliveMap: p.keepAliveMap, shutdownMap: p.shutdownMap, workerTTL: p.workerTTL, @@ -169,8 +174,8 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) { err = w.notify(ctx, key, payload) } if err != nil { - if errors.Is(err, errRequeue) { - w.logger.Info("requeue", ev.EventName, "after", w.pendingJobTTL, "error", err) + if errors.Is(err, ErrRequeue) { + w.logger.Info("requeue", ev.EventName, "after", w.pendingJobTTL) continue } w.ackPoolEvent(ctx, nodeID, ev.ID, err) @@ -188,7 +193,6 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) { // stop stops the reader, the worker goroutines and removes the worker from the // workers and keep-alive maps. -// TBD: what to do if requeue fails? func (w *Worker) stop(ctx context.Context) { w.lock.Lock() defer w.lock.Unlock() @@ -203,6 +207,17 @@ func (w *Worker) stop(ctx context.Context) { if _, er := w.keepAliveMap.Delete(ctx, w.ID); er != nil { err = fmt.Errorf("failed to remove worker %q from keep alive map: %w", w.ID, er) } + keys, er := w.jobsMap.Delete(ctx, w.ID) + if er != nil { + err = fmt.Errorf("failed to remove worker %q from jobs map: %w", w.ID, er) + } + if keys != "" { + for _, key := range strings.Split(keys, ",") { + if _, er := w.jobPayloadsMap.Delete(ctx, key); er != nil { + err = fmt.Errorf("worker stop: failed to remove job payload %q from job payloads map: %w", key, er) + } + } + } w.reader.Close() if er := w.stream.Destroy(ctx); er != nil { err = fmt.Errorf("failed to destroy stream for worker %q: %w", w.ID, er) @@ -231,31 +246,50 @@ func (w *Worker) stopAndWait(ctx context.Context) { } // startJob starts a job. -func (w *Worker) startJob(_ context.Context, job *Job) error { +func (w *Worker) startJob(ctx context.Context, job *Job) error { w.lock.Lock() defer w.lock.Unlock() if w.stopped { return fmt.Errorf("worker %q stopped", w.ID) } + if _, err := w.jobsMap.AppendUniqueValues(ctx, w.ID, job.Key); err != nil { + w.logger.Error(fmt.Errorf("failed to add job %q to jobs map: %w, requeueing", job.Key, err)) + return ErrRequeue + } + if _, err := w.jobPayloadsMap.Set(ctx, job.Key, string(job.Payload)); err != nil { + w.logger.Error(fmt.Errorf("failed to add job payload %q to job payloads map: %w, requeueing", job.Key, err)) + return ErrRequeue + } job.Worker = w if err := w.handler.Start(job); err != nil { + if _, err := w.jobsMap.RemoveValues(ctx, w.ID, job.Key); err != nil { + w.logger.Error(fmt.Errorf("start failure handling: failed to remove job %q from jobs map: %w", job.Key, err)) + } + if _, err := w.jobPayloadsMap.Delete(ctx, job.Key); err != nil { + w.logger.Error(fmt.Errorf("start failure handling: failed to remove job payload %q from job payloads map: %w", job.Key, err)) + } return err } w.logger.Info("started job", "job", job.Key) - job.Worker = w w.jobs[job.Key] = job return nil } // stopJob stops a job. // worker.lock must be held when calling this method. -func (w *Worker) stopJob(_ context.Context, key string) error { +func (w *Worker) stopJob(ctx context.Context, key string) error { if _, ok := w.jobs[key]; !ok { return fmt.Errorf("job %s not found", key) } if err := w.handler.Stop(key); err != nil { return fmt.Errorf("failed to stop job %q: %w", key, err) } + if _, err := w.jobsMap.RemoveValues(ctx, w.ID, key); err != nil { + w.logger.Error(fmt.Errorf("stop job: failed to remove job %q from jobs map: %w", key, err)) + } + if _, err := w.jobPayloadsMap.Delete(ctx, key); err != nil { + w.logger.Error(fmt.Errorf("stop job: failed to remove job payload %q from job payloads map: %w", key, err)) + } w.logger.Info("stopped job", "job", key) delete(w.jobs, key) return nil diff --git a/pool/worker_test.go b/pool/worker_test.go index 9b0428c..8ea4cc1 100644 --- a/pool/worker_test.go +++ b/pool/worker_test.go @@ -1,6 +1,7 @@ package pool import ( + "context" "strings" "testing" "time" @@ -18,7 +19,9 @@ func TestWorkerRequeueJobs(t *testing.T) { rdb = ptesting.NewRedisClient(t) node = newTestNode(t, ctx, rdb, testName) ) - defer ptesting.CleanupRedis(t, rdb, true, testName) + defer ptesting.CleanupRedis(t, rdb, false, testName) + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() // Create a worker and dispatch a job worker := newTestWorker(t, ctx, node) @@ -27,19 +30,24 @@ func TestWorkerRequeueJobs(t *testing.T) { // Wait for the job to start require.Eventually(t, func() bool { return len(worker.Jobs()) == 1 }, max, delay) - // stop refreshing keep-alive and remove the worker from the node's keep-alive map - worker.workerTTL = time.Hour - _, err := node.keepAliveMap.Delete(ctx, worker.ID) - require.NoError(t, err) - require.Eventually(t, func() bool { return len(node.keepAliveMap.Map()) == 0 }, time.Second, delay) + // Emulate the worker failing by preventing it from refreshing its keepalive + // This means we can't cleanup cleanly, hence "false" in CleanupRedis + worker.lock.Lock() + worker.stopped = true + worker.lock.Unlock() // Create a new worker to pick up the requeued job newWorker := newTestWorker(t, ctx, node) + // Wait at least workerTTL + time.Sleep(2 * node.workerTTL) + + // Route an event to trigger worker cleanup + assert.NoError(t, node.DispatchJob(ctx, testName+"2", []byte("payload2"))) + // Wait for the job to be requeued and started by the new worker // Increase 'max' to cover the time until requeue happens - require.Eventually(t, func() bool { return len(newWorker.Jobs()) == 1 }, time.Second, delay) - - // Clean up - assert.NoError(t, node.Shutdown(ctx)) + require.Eventually(t, func() bool { + return len(newWorker.Jobs()) == 2 + }, time.Second, delay, "job was not requeued") } diff --git a/streaming/sink.go b/streaming/sink.go index 9852715..aacc75c 100644 --- a/streaming/sink.go +++ b/streaming/sink.go @@ -495,7 +495,7 @@ func (s *Sink) deleteStaleConsumers(ctx context.Context) { } if time.Since(time.Unix(0, ts)) > 2*s.ackGracePeriod { staleConsumers = append(staleConsumers, consumer) - s.logger.Debug("stale consumer", "consumer", consumer) + s.logger.Debug("stale consumer", "consumer", consumer, "since", time.Since(time.Unix(0, ts)), "grace", 2*s.ackGracePeriod) } } for _, stream := range s.streams { From db528afe8acfb32b03060e2b8c8c761169b4efca Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 21:06:40 -0700 Subject: [PATCH 7/8] Pulse isn't meant to be used on Windows --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71c0888..4bc26af 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,7 +14,7 @@ jobs: fail-fast: true matrix: go: ['1.23'] - os: ['ubuntu-latest', 'windows-latest'] + os: ['ubuntu-latest'] runs-on: ${{ matrix.os }} steps: From fb5b7a89f4333df7cd4c8fc848af7296cb538edb Mon Sep 17 00:00:00 2001 From: Raphael Simon Date: Sun, 13 Oct 2024 21:07:53 -0700 Subject: [PATCH 8/8] Remove CodeQL custom settings, not needed --- .github/workflows/codeql.yml | 40 ------------------------------------ 1 file changed, 40 deletions(-) delete mode 100644 .github/workflows/codeql.yml diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml deleted file mode 100644 index 542d1b4..0000000 --- a/.github/workflows/codeql.yml +++ /dev/null @@ -1,40 +0,0 @@ -name: "CodeQL" - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - schedule: - - cron: '31 7 * * 3' - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - strategy: - fail-fast: false - matrix: - language: [ 'go' ] - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Initialize CodeQL - uses: github/codeql-action/init@v3 - with: - languages: ${{ matrix.language }} - - - name: Autobuild - uses: github/codeql-action/autobuild@v3 - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v3 - with: - category: "/language:${{matrix.language}}" \ No newline at end of file