Skip to content
Permalink
Browse files

refactor: add a semaphore lib

Simple lib for better semaphore semanthics.
  • Loading branch information...
caarlos0 committed Jun 25, 2018
1 parent 6a06f76 commit 58d71a1c957d511903c32f8b1fb948a116543de7
@@ -18,6 +18,7 @@ import (
"github.com/goreleaser/goreleaser/config" "github.com/goreleaser/goreleaser/config"
"github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/context"
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/semaphore"
"github.com/goreleaser/goreleaser/pipeline" "github.com/goreleaser/goreleaser/pipeline"
) )


@@ -157,15 +158,13 @@ func Upload(ctx *context.Context, puts []config.Put, kind string, check Response
} }


func runPipeByFilter(ctx *context.Context, put config.Put, filter artifact.Filter, kind string, check ResponseChecker) error { func runPipeByFilter(ctx *context.Context, put config.Put, filter artifact.Filter, kind string, check ResponseChecker) error {
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
var g errgroup.Group var g errgroup.Group
for _, artifact := range ctx.Artifacts.Filter(filter).List() { for _, artifact := range ctx.Artifacts.Filter(filter).List() {
sem <- true sem.Acquire()
artifact := artifact artifact := artifact
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
return uploadAsset(ctx, put, artifact, kind, check) return uploadAsset(ctx, put, artifact, kind, check)
}) })
} }
@@ -0,0 +1,20 @@
// Package semaphore provides a small and simple semaphore lib for goreleaser.
package semaphore

// Semaphore is the semaphore itself
type Semaphore chan bool

// New returns a new semaphore of a given size.
func New(size int) Semaphore {
return make(Semaphore, size)
}

// Acquire acquires one semaphore permit.
func (s Semaphore) Acquire() {
s <- true
}

// Release releases one semaphore permit
func (s Semaphore) Release() {
<-s
}
@@ -0,0 +1,20 @@
package semaphore

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestSemaphore(t *testing.T) {
var sem = New(1)
var counter = 0
for i := 0; i < 10; i++ {
sem.Acquire()
go func() {
counter++
sem.Release()
}()
}
require.Equal(t, counter, 9)
}
@@ -21,6 +21,7 @@ import (


// langs to init // langs to init
_ "github.com/goreleaser/goreleaser/internal/builders/golang" _ "github.com/goreleaser/goreleaser/internal/builders/golang"
"github.com/goreleaser/goreleaser/internal/semaphore"
) )


// Pipe for build // Pipe for build
@@ -71,16 +72,14 @@ func runPipeOnBuild(ctx *context.Context, build config.Build) error {
if err := runHook(ctx, build.Env, build.Hooks.Pre); err != nil { if err := runHook(ctx, build.Env, build.Hooks.Pre); err != nil {
return errors.Wrap(err, "pre hook failed") return errors.Wrap(err, "pre hook failed")
} }
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
var g errgroup.Group var g errgroup.Group
for _, target := range build.Targets { for _, target := range build.Targets {
sem <- true sem.Acquire()
target := target target := target
build := build build := build
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
return doBuild(ctx, build, target) return doBuild(ctx, build, target)
}) })
} }
@@ -15,6 +15,7 @@ import (
"github.com/goreleaser/goreleaser/checksum" "github.com/goreleaser/goreleaser/checksum"
"github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/context"
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/semaphore"
) )


// Pipe for checksums // Pipe for checksums
@@ -49,20 +50,18 @@ func (Pipe) Run(ctx *context.Context) (err error) {
defer file.Close() // nolint: errcheck defer file.Close() // nolint: errcheck


var g errgroup.Group var g errgroup.Group
var semaphore = make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for _, artifact := range ctx.Artifacts.Filter( for _, artifact := range ctx.Artifacts.Filter(
artifact.Or( artifact.Or(
artifact.ByType(artifact.UploadableArchive), artifact.ByType(artifact.UploadableArchive),
artifact.ByType(artifact.UploadableBinary), artifact.ByType(artifact.UploadableBinary),
artifact.ByType(artifact.LinuxPackage), artifact.ByType(artifact.LinuxPackage),
), ),
).List() { ).List() {
semaphore <- true sem.Acquire()
artifact := artifact artifact := artifact
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-semaphore
}()
return checksums(ctx, file, artifact) return checksums(ctx, file, artifact)
}) })
} }
@@ -19,6 +19,7 @@ import (
"github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/context"
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/deprecate" "github.com/goreleaser/goreleaser/internal/deprecate"
"github.com/goreleaser/goreleaser/internal/semaphore"
"github.com/goreleaser/goreleaser/pipeline" "github.com/goreleaser/goreleaser/pipeline"
) )


@@ -81,15 +82,13 @@ func (Pipe) Run(ctx *context.Context) error {


func doRun(ctx *context.Context) error { func doRun(ctx *context.Context) error {
var g errgroup.Group var g errgroup.Group
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for i, docker := range ctx.Config.Dockers { for i, docker := range ctx.Config.Dockers {
docker := docker docker := docker
seed := i seed := i
sem <- true sem.Acquire()
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
log.WithField("docker", docker).Debug("looking for binaries matching") log.WithField("docker", docker).Debug("looking for binaries matching")
var binaries = ctx.Artifacts.Filter( var binaries = ctx.Artifacts.Filter(
artifact.And( artifact.And(
@@ -22,6 +22,7 @@ import (
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/filenametemplate" "github.com/goreleaser/goreleaser/internal/filenametemplate"
"github.com/goreleaser/goreleaser/internal/linux" "github.com/goreleaser/goreleaser/internal/linux"
"github.com/goreleaser/goreleaser/internal/semaphore"
"github.com/goreleaser/goreleaser/pipeline" "github.com/goreleaser/goreleaser/pipeline"
) )


@@ -63,17 +64,15 @@ func doRun(ctx *context.Context) error {
artifact.ByGoos("linux"), artifact.ByGoos("linux"),
)).GroupByPlatform() )).GroupByPlatform()
var g errgroup.Group var g errgroup.Group
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for _, format := range ctx.Config.NFPM.Formats { for _, format := range ctx.Config.NFPM.Formats {
for platform, artifacts := range linuxBinaries { for platform, artifacts := range linuxBinaries {
sem <- true sem.Acquire()
format := format format := format
arch := linux.Arch(platform) arch := linux.Arch(platform)
artifacts := artifacts artifacts := artifacts
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
return create(ctx, format, arch, artifacts) return create(ctx, format, arch, artifacts)
}) })
} }
@@ -9,6 +9,7 @@ import (
"github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/context"
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/client" "github.com/goreleaser/goreleaser/internal/client"
"github.com/goreleaser/goreleaser/internal/semaphore"
"github.com/goreleaser/goreleaser/pipeline" "github.com/goreleaser/goreleaser/pipeline"
) )


@@ -66,7 +67,7 @@ func doRun(ctx *context.Context, c client.Client) error {
return err return err
} }
var g errgroup.Group var g errgroup.Group
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for _, artifact := range ctx.Artifacts.Filter( for _, artifact := range ctx.Artifacts.Filter(
artifact.Or( artifact.Or(
artifact.ByType(artifact.UploadableArchive), artifact.ByType(artifact.UploadableArchive),
@@ -76,12 +77,10 @@ func doRun(ctx *context.Context, c client.Client) error {
artifact.ByType(artifact.LinuxPackage), artifact.ByType(artifact.LinuxPackage),
), ),
).List() { ).List() {
sem <- true sem.Acquire()
artifact := artifact artifact := artifact
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
return upload(ctx, c, releaseID, artifact) return upload(ctx, c, releaseID, artifact)
}) })
} }
@@ -14,6 +14,7 @@ import (
"github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/context"
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/nametemplate" "github.com/goreleaser/goreleaser/internal/nametemplate"
"github.com/goreleaser/goreleaser/internal/semaphore"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )


@@ -45,14 +46,12 @@ func (Pipe) Default(ctx *context.Context) error {
// Run the pipe // Run the pipe
func (Pipe) Run(ctx *context.Context) error { func (Pipe) Run(ctx *context.Context) error {
var g errgroup.Group var g errgroup.Group
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for _, conf := range ctx.Config.S3 { for _, conf := range ctx.Config.S3 {
conf := conf conf := conf
sem <- true sem.Acquire()
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
return upload(ctx, conf) return upload(ctx, conf)
}) })
} }
@@ -80,7 +79,7 @@ func upload(ctx *context.Context, conf config.S3) error {
} }


var g errgroup.Group var g errgroup.Group
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for _, artifact := range ctx.Artifacts.Filter( for _, artifact := range ctx.Artifacts.Filter(
artifact.Or( artifact.Or(
artifact.ByType(artifact.UploadableArchive), artifact.ByType(artifact.UploadableArchive),
@@ -90,12 +89,10 @@ func upload(ctx *context.Context, conf config.S3) error {
artifact.ByType(artifact.LinuxPackage), artifact.ByType(artifact.LinuxPackage),
), ),
).List() { ).List() {
sem <- true sem.Acquire()
artifact := artifact artifact := artifact
g.Go(func() error { g.Go(func() error {
defer func() { defer sem.Release()
<-sem
}()
f, err := os.Open(artifact.Path) f, err := os.Open(artifact.Path)
if err != nil { if err != nil {
return err return err
@@ -18,6 +18,7 @@ import (
"github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/artifact"
"github.com/goreleaser/goreleaser/internal/filenametemplate" "github.com/goreleaser/goreleaser/internal/filenametemplate"
"github.com/goreleaser/goreleaser/internal/linux" "github.com/goreleaser/goreleaser/internal/linux"
"github.com/goreleaser/goreleaser/internal/semaphore"
"github.com/goreleaser/goreleaser/pipeline" "github.com/goreleaser/goreleaser/pipeline"
) )


@@ -84,24 +85,22 @@ func (Pipe) Run(ctx *context.Context) error {
} }


var g errgroup.Group var g errgroup.Group
sem := make(chan bool, ctx.Parallelism) var sem = semaphore.New(ctx.Parallelism)
for platform, binaries := range ctx.Artifacts.Filter( for platform, binaries := range ctx.Artifacts.Filter(
artifact.And( artifact.And(
artifact.ByGoos("linux"), artifact.ByGoos("linux"),
artifact.ByType(artifact.Binary), artifact.ByType(artifact.Binary),
), ),
).GroupByPlatform() { ).GroupByPlatform() {
sem <- true sem.Acquire()
arch := linux.Arch(platform) arch := linux.Arch(platform)
if arch == "armel" { if arch == "armel" {
log.WithField("arch", arch).Warn("ignored unsupported arch") log.WithField("arch", arch).Warn("ignored unsupported arch")
continue continue
} }
binaries := binaries binaries := binaries
g.Go(func() error { g.Go(func() error {
go func() { defer sem.Release()
<-sem
}()
return create(ctx, arch, binaries) return create(ctx, arch, binaries)
}) })
} }

0 comments on commit 58d71a1

Please sign in to comment.
You can’t perform that action at this time.