Skip to content

Commit

Permalink
Fix potential deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Apr 26, 2020
1 parent 3156a1a commit f68da4c
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 28 deletions.
15 changes: 15 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ When you send a PR, just make sure that:
* You make the PR on the same branch you based your changes on. If you see commits
that you did not make in your PR, you're doing it wrong.

### Finding deadlocks

To debug potential deadlocks:

1. install `go-deadlock`:

go get github.com/sasha-s/go-deadlock/...@master
find . -name "*.go" | xargs -n 1 sed -i '' 's#"sync"#"sync"\
"github.com/sasha-s/go-deadlock"#'
find . -name "*.go" | xargs -n 1 sed -i '' 's#sync.RWMutex#deadlock.RWMutex#'
find . -name "*.go" | xargs -n 1 sed -i '' 's#sync.Mutex#deadlock.Mutex#'

2. Run the load test (`LoadTest.scala`)
3. Be sure to remove go-deadlock before committing

## Protocol

The protocol is written in Markdown, compatible with [Mmark](https://mmark.miek.nl/).
Expand Down
48 changes: 36 additions & 12 deletions LoadTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
* Available environment variables (all optional):
* - HUB_URL: the URL of the hub to test
* - JWT: the JWT to use for authenticating the publisher
* - SUBSCRIBERS: the number of concurrent subscribers
* - PUBLISHERS: the number of concurrent publishers
*/
* - INITIAL_SUBSCRIBERS: the number of concurrent subscribers initially connected
* - SUBSCRIBERS_RATE_FROM: minimum rate (per second) of additional subscribers to connect
* - SUBSCRIBERS_RATE_TO: maximum rate (per second) of additional subscribers to connect
* - PUBLISHERS_RATE_FROM: minimum rate (per second) of publications
* - PUBLISHERS_RATE_TO: maximum rate (per second) of publications
* - INJECTION_DURATION: duration of the publishers injection
* - CONNECTION_DURATION: duration of subscribers' connection
* - RANDOM_CONNECTION_DURATION: to randomize the connection duration (will longs CONNECTION_DURATION at max)
*/

package mercure

Expand All @@ -19,19 +25,31 @@ import scala.util.Properties

class LoadTest extends Simulation {
/** The hub URL */
val HubUrl = Properties.envOrElse("HUB_URL", "http://localhost:3001/.well-known/mercure" )
val HubUrl = Properties.envOrElse("HUB_URL", "http://localhost:3001/.well-known/mercure")
/** JWT to use to publish */
val Jwt = Properties.envOrElse("JWT", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8")
/** Number of concurrent subscribers to connect */
val ConcurrentSubscribers = Properties.envOrElse("SUBSCRIBERS", "10000").toInt
/** Number of concurent publishers */
val ConcurrentPublishers = Properties.envOrElse("PUBLISHERS", "2").toInt
/** Number of concurrent subscribers initially connected */
val InitialSubscribers = Properties.envOrElse("INITIAL_SUBSCRIBERS", "100").toInt
/** Additional subscribers rate (per second) */
val SubscribersRateFrom = Properties.envOrElse("SUBSCRIBERS_RATE_FROM", "2").toInt
val SubscribersRateTo = Properties.envOrElse("SUBSCRIBERS_RATE_TO", "10").toInt
/** Publishers rate (per second) */
val PublishersRateFrom = Properties.envOrElse("PUBLISHERS_RATE_FROM", "2").toInt
val PublishersRateTo = Properties.envOrElse("PUBLISHERS_RATE_TO", "20").toInt
/** Duration of injection (in seconds) */
val InjectionDuration = Properties.envOrElse("INJECTION_DURATION", "3600").toInt
/** How long a subscriber can stay connected at max (in seconds) */
val ConnectionDuration = Properties.envOrElse("CONNECTION_DURATION", "300").toInt
/** Randomize the connection duration? */
val RandomConnectionDuration = Properties.envOrElse("RANDOM_CONNECTION_DURATION", "true").toBoolean

val rnd = new scala.util.Random

val httpProtocol = http
.baseUrl(HubUrl)

val startTime = System.nanoTime
val scenarioPublish = scenario("Publish")
.pause(2) // Wait for subscribers
.exec(
http("Publish")
.post("")
Expand All @@ -47,11 +65,17 @@ class LoadTest extends Simulation {
sse.checkMessage("Check content").check(regex("""(.*)Hi(.*)"""))
)
)
.pause(15)
.pause(if (RandomConnectionDuration) rnd.nextInt(ConnectionDuration) else ConnectionDuration)
.exec(sse("Close").close())

setUp(
scenarioSubscribe.inject(atOnceUsers(ConcurrentSubscribers)).protocols(httpProtocol),
scenarioPublish.inject(atOnceUsers(ConcurrentPublishers)).protocols(httpProtocol)
scenarioSubscribe.inject(
atOnceUsers(InitialSubscribers),
rampUsersPerSec(SubscribersRateFrom) to SubscribersRateTo during (InjectionDuration seconds) randomized
).protocols(httpProtocol),
scenarioPublish.inject(
nothingFor(4 seconds), // Wait for subscribers
rampUsersPerSec(PublishersRateFrom) to PublishersRateTo during (InjectionDuration + ConnectionDuration seconds) randomized
).protocols(httpProtocol)
)
}
14 changes: 10 additions & 4 deletions docs/hub/load-test.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ To test your own infrastructure, we provide a [Gatling](https://gatling.io)-base

Available environment variables (all are optional):

* `HUB_URL`: the URL of the hub to test
* `JWT`: the JWT to use for authenticating the publisher
* `SUBSCRIBERS`: the number of concurrent subscribers
* `PUBLISHERS`: the number of concurrent publishers
* `HUB_URL`: the URL of the hub to test
* `JWT`: the JWT to use for authenticating the publisher
* `INITIAL_SUBSCRIBERS`: the number of concurrent subscribers initially connected
* `SUBSCRIBERS_RATE_FROM`: minimum rate (per second) of additional subscribers to connect
* `SUBSCRIBERS_RATE_TO`: maximum rate (per second) of additional subscribers to connect
* `PUBLISHERS_RATE_FROM`: minimum rate (per second) of publications
* `PUBLISHERS_RATE_TO`: maximum rate (per second) of publications
* `INJECTION_DURATION`: duration of the publishers injection
* `CONNECTION_DURATION`: duration of subscribers' connection
* `RANDOM_CONNECTION_DURATION`: to randomize the connection duration (will longs `CONNECTION_DURATION` at max)
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/mitchellh/mapstructure v1.2.2 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/procfs v0.0.11 // indirect
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa
github.com/sirupsen/logrus v1.5.0
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -146,6 +148,10 @@ github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa h1:0U2s5loxrTy6/VgfVoLuVLFJcURKLH49ie0zSch7gh4=
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down Expand Up @@ -264,8 +270,6 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200402223321-bcf690261a44 h1:bMm0eoDiGkM5VfIyKjxDvoflW5GLp7+VCo+60n8F+TE=
golang.org/x/tools v0.0.0-20200402223321-bcf690261a44/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b h1:zSzQJAznWxAh9fZxiPy2FZo+ZZEYoYFYYDYdOrU7AaM=
golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
5 changes: 3 additions & 2 deletions hub/bolt_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"math/rand"
"net/url"
"strconv"
"sync"

"github.com/sasha-s/go-deadlock"

bolt "go.etcd.io/bbolt"
"go.uber.org/atomic"
Expand All @@ -20,7 +21,7 @@ const defaultBoltBucketName = "updates"

// BoltTransport implements the TransportInterface using the Bolt database.
type BoltTransport struct {
sync.RWMutex
deadlock.RWMutex
db *bolt.DB
bucketName string
size uint64
Expand Down
6 changes: 4 additions & 2 deletions hub/bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/sasha-s/go-deadlock"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -150,7 +152,7 @@ func TestBoltTransportWriteIsNotDispatchedUntilListen(t *testing.T) {
var (
readUpdate *Update
readError error
m sync.Mutex
m deadlock.Mutex
wg sync.WaitGroup
)
wg.Add(1)
Expand Down Expand Up @@ -188,7 +190,7 @@ func TestBoltTransportWriteIsDispatched(t *testing.T) {
var (
readUpdate *Update
readError error
m sync.Mutex
m deadlock.Mutex
wg sync.WaitGroup
)
wg.Add(1)
Expand Down
5 changes: 3 additions & 2 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package hub
import (
"log"
"net/http"
"sync"

"github.com/sasha-s/go-deadlock"

"github.com/spf13/viper"
"github.com/yosida95/uritemplate"
)

// uriTemplates caches uritemplate.Template to improve memory and CPU usage.
type uriTemplates struct {
sync.RWMutex
deadlock.RWMutex
m map[string]*templateCache
}

Expand Down
5 changes: 3 additions & 2 deletions hub/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"errors"
"fmt"
"net/url"
"sync"

"github.com/sasha-s/go-deadlock"

"github.com/spf13/viper"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewTransport(config *viper.Viper) (Transport, error) {

// LocalTransport implements the TransportInterface without database and simply broadcast the live Updates.
type LocalTransport struct {
sync.RWMutex
deadlock.RWMutex
pipes map[*Pipe]struct{}
done chan struct{}
}
Expand Down
6 changes: 4 additions & 2 deletions hub/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/sasha-s/go-deadlock"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -27,7 +29,7 @@ func TestLocalTransportWriteIsNotDispatchedUntilListen(t *testing.T) {
var (
readUpdate *Update
readError error
m sync.Mutex
m deadlock.Mutex
wg sync.WaitGroup
)
wg.Add(1)
Expand Down Expand Up @@ -63,7 +65,7 @@ func TestLocalTransportWriteIsDispatched(t *testing.T) {
var (
readUpdate *Update
readError error
m sync.Mutex
m deadlock.Mutex
wg sync.WaitGroup
)
wg.Add(1)
Expand Down

0 comments on commit f68da4c

Please sign in to comment.