本章中的方法包括工作池、异步操作的等待组以及context
包的使用。并行性和并发性是 Go 语言最受宣传和推广的特性之一。本章将提供一些有用的模式,帮助您入门并了解这些特性。
Go 提供了使并行应用成为可能的原语。Goroutines 允许任何函数变成异步和并发的。通道允许应用设置与 Goroutines 的通信。围棋中的一句名言是,“不是通过分享记忆来交流,而是通过交流来分享记忆”,来自https://blog.golang.org/share-memory-by-communicating 。
在本章中,我们将介绍以下配方:
- 使用通道和 select 语句
- 使用 sync.WaitGroup 执行异步操作
- 使用原子操作和互斥
- 使用上下文包
- 执行通道的状态管理
- 使用工作池设计模式
- 使用工人创建管道
为了继续本章中的所有配方,请按照以下步骤配置您的环境:
- 从下载 Go 1.12.6 或更高版本并安装到您的操作系统上 https://golang.org/doc/install 。
- 打开终端或控制台应用,创建并导航到项目目录,如
~/projects/go-programming-cookbook
,所有代码将从此目录运行和修改。 - 将最新代码克隆到
~/projects/go-programming-cookbook-original
并(可选)从该目录中工作,而不是手动键入示例:
$ git clone git@github.com:PacktPublishing/Go-Programming-Cookbook-Second-Edition.git go-programming-cookbook-original
Go 通道与 Goroutines 相结合,是异步通信的一流公民。当我们使用 select 语句时,通道变得特别强大。这些语句允许 Goroutine 一次智能地处理来自多个通道的请求。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/channels
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/channels
您应该会看到一个名为go.mod
的文件,其中包含以下代码:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/channels
- 从
~/projects/go-programming-cookbook-original/chapter10/channels
复制测试,或者以此为机会编写一些自己的代码! - 创建一个名为
sender.go
的文件,其内容如下:
package channels
import "time"
// Sender sends "tick"" on ch until done is
// written to, then it sends "sender done."
// and exits
func Sender(ch chan string, done chan bool) {
t := time.Tick(100 * time.Millisecond)
for {
select {
case <-done:
ch <- "sender done."
return
case <-t:
ch <- "tick"
}
}
}
- 创建一个名为
printer.go
的文件,其内容如下:
package channels
import (
"context"
"fmt"
"time"
)
// Printer will print anything sent on the ch chan
// and will print tock every 200 milliseconds
// this will repeat forever until a context is
// Done, i.e. timed out or cancelled
func Printer(ctx context.Context, ch chan string) {
t := time.Tick(200 * time.Millisecond)
for {
select {
case <-ctx.Done():
fmt.Println("printer done.")
return
case res := <-ch:
fmt.Println(res)
case <-t:
fmt.Println("tock")
}
}
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import (
"context"
"time"
"github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/channels"
)
func main() {
ch := make(chan string)
done := make(chan bool)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go channels.Printer(ctx, ch)
go channels.Sender(ch, done)
time.Sleep(2 * time.Second)
done <- true
cancel()
//sleep a bit extra so channels can clean up
time.Sleep(3 * time.Second)
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您现在应该看到以下输出,但打印顺序可能有所不同:
$ go run main.go
tick
tock
tick
tick
tock
tick
tick
tock
tick
.
.
.
sender done.
printer done.
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
此配方演示了两种启动工作进程的方法,该工作进程可以读取通道,也可以写入通道,并且可能同时执行这两种操作。当写入done
通道时,或通过调用取消功能或超时取消context
时,工作者将终止。使用上下文包配方的将更详细地涵盖context
包。
main
包用于将单独的功能连接在一起;因此,只要通道不共享,就可以设置多个对。除此之外,还可以让多个 goroutine 监听同一个频道,我们将在中使用工作池设计模式配方进行探讨。
最后,由于 goroutine 的异步性质,建立清除和终止条件可能会很困难;例如,常见的错误是执行以下操作:
select{
case <-time.Tick(200 * time.Millisecond):
//this resets whenever any other 'lane' is chosen
}
通过将Tick
放在select
语句中,就有可能防止这种情况发生。在select
语句中,也没有简单的方法对流量进行优先级排序。
有时,异步执行一些操作,然后等到它们完成后再继续,这是很有用的。例如,如果一个操作需要从多个 API 中提取信息并聚合这些信息,那么异步发出这些客户机请求可能会有所帮助。本食谱将探索使用sync.WaitGroup
来协调并行的非依赖性任务。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/waitgroup
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/waitgroup
您应该会看到一个名为go.mod
的文件,其中包含以下内容:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/waitgroup
-
从
~/projects/go-programming-cookbook-original/chapter10/waitgroup
复制测试,或者以此为机会编写一些自己的代码! -
创建一个名为
tasks.go
的文件,其内容如下:
package waitgroup
import (
"fmt"
"log"
"net/http"
"strings"
"time"
)
// GetURL gets a url, and logs the time it took
func GetURL(url string) (*http.Response, error) {
start := time.Now()
log.Printf("getting %s", url)
resp, err := http.Get(url)
log.Printf("completed getting %s in %s", url,
time.Since(start))
return resp, err
}
// CrawlError is our custom error type
// for aggregating errors
type CrawlError struct {
Errors []string
}
// Add adds another error
func (c *CrawlError) Add(err error) {
c.Errors = append(c.Errors, err.Error())
}
// Error implements the error interface
func (c *CrawlError) Error() string {
return fmt.Sprintf("All Errors: %s", strings.Join(c.Errors,
","))
}
// Present can be used to determine if
// we should return this
func (c *CrawlError) Present() bool {
return len(c.Errors) != 0
}
- 创建一个名为
process.go
的文件,其内容如下:
package waitgroup
import (
"log"
"sync"
"time"
)
// Crawl collects responses from a list of urls
// that are passed in. It waits for all requests
// to complete before returning.
func Crawl(sites []string) ([]int, error) {
start := time.Now()
log.Printf("starting crawling")
wg := &sync.WaitGroup{}
var resps []int
cerr := &CrawlError{}
for _, v := range sites {
wg.Add(1)
go func(v string) {
defer wg.Done()
resp, err := GetURL(v)
if err != nil {
cerr.Add(err)
return
}
resps = append(resps, resp.StatusCode)
}(v)
}
wg.Wait()
// we encountered a crawl error
if cerr.Present() {
return resps, cerr
}
log.Printf("completed crawling in %s", time.Since(start))
return resps, nil
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import (
"fmt"
"github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/waitgroup"
)
func main() {
sites := []string{
"https://golang.org",
"https://godoc.org",
"https://www.google.com/search?q=golang",
}
resps, err := waitgroup.Crawl(sites)
if err != nil {
panic(err)
}
fmt.Println("Resps received:", resps)
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您应该看到以下输出:
$ go run main.go
2017/04/05 19:45:07 starting crawling
2017/04/05 19:45:07 getting https://www.google.com/search?
q=golang
2017/04/05 19:45:07 getting https://golang.org
2017/04/05 19:45:07 getting https://godoc.org
2017/04/05 19:45:07 completed getting https://golang.org in
178.22407ms
2017/04/05 19:45:07 completed getting https://godoc.org in
181.400873ms
2017/04/05 19:45:07 completed getting
https://www.google.com/search?q=golang in 238.019327ms
2017/04/05 19:45:07 completed crawling in 238.191791ms
Resps received: [200 200 200]
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
此配方向您展示了如何在等待工作时使用waitgroups
作为同步机制。本质上,waitgroup.Wait()
将等待其内部计数器达到0
为止。waitgroup.Add(int)
方法将按输入的金额递增计数器,waitgroup.Done()
将按1
递减计数器。因此,当各种 goroutine 将waitgroup
标记为Done()
时,有必要异步Wait()
。
在这个方法中,我们在发送每个 HTTP 请求之前递增,然后调用一个 deferwg.Done()
方法,这样我们就可以在 Goroutine 终止时递减。然后,我们等待所有 goroutine 完成,然后返回聚合结果。
实际上,最好使用通道来传递错误和响应。
当像这样异步执行操作时,您应该考虑线程安全性,例如修改共享映射。如果您记住这一点,waitgroups
是等待任何类型的异步操作的有用特性。
在诸如 GO 之类的语言中,可以在异步操作和并行性中构建,重要的是考虑诸如线程安全之类的事情。例如,同时从多个 goroutine 访问地图是危险的。Go 在sync
和sync/atomic
包中提供了许多帮助程序,以确保某些事件只发生一次,或者 Goroutines 可以在操作上序列化。
此配方将演示如何使用这些包安全地修改具有各种 goroutine 的映射,并保留可由多个 goroutine 安全访问的全局序号值。它还将展示Once.Do
方法,该方法可用于确保 Go 应用只执行一次操作,例如读取配置文件或初始化变量。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/atomic
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/atomic
您应该会看到一个名为go.mod
的文件,其中包含以下内容:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/atomic
- 从
~/projects/go-programming-cookbook-original/chapter10/atomic
复制测试,或者以此为机会编写一些自己的代码! - 创建一个名为
map.go
的文件,其内容如下:
package atomic
import (
"errors"
"sync"
)
// SafeMap uses a mutex to allow
// getting and setting in a thread-safe way
type SafeMap struct {
m map[string]string
mu *sync.RWMutex
}
// NewSafeMap creates a SafeMap
func NewSafeMap() SafeMap {
return SafeMap{m: make(map[string]string), mu:
&sync.RWMutex{}}
}
// Set uses a write lock and sets the value given
// a key
func (t *SafeMap) Set(key, value string) {
t.mu.Lock()
defer t.mu.Unlock()
t.m[key] = value
}
// Get uses a RW lock and gets the value if it exists,
// otherwise an error is returned
func (t *SafeMap) Get(key string) (string, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if v, ok := t.m[key]; ok {
return v, nil
}
return "", errors.New("key not found")
}
- 创建一个名为
ordinal.go
的文件,其内容如下:
package atomic
import (
"sync"
"sync/atomic"
)
// Ordinal holds a global a value
// and can only be initialized once
type Ordinal struct {
ordinal uint64
once *sync.Once
}
// NewOrdinal returns ordinal with once
// setup
func NewOrdinal() *Ordinal {
return &Ordinal{once: &sync.Once{}}
}
// Init sets the ordinal value
// can only be done once
func (o *Ordinal) Init(val uint64) {
o.once.Do(func() {
atomic.StoreUint64(&o.ordinal, val)
})
}
// GetOrdinal will return the current
// ordinal
func (o *Ordinal) GetOrdinal() uint64 {
return atomic.LoadUint64(&o.ordinal)
}
// Increment will increment the current
// ordinal
func (o *Ordinal) Increment() {
atomic.AddUint64(&o.ordinal, 1)
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import (
"fmt"
"sync"
"github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/atomic"
)
func main() {
o := atomic.NewOrdinal()
m := atomic.NewSafeMap()
o.Init(1123)
fmt.Println("initial ordinal is:", o.GetOrdinal())
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
m.Set(fmt.Sprint(i), "success")
o.Increment()
}(i)
}
wg.Wait()
for i := 0; i < 10; i++ {
v, err := m.Get(fmt.Sprint(i))
if err != nil || v != "success" {
panic(err)
}
}
fmt.Println("final ordinal is:", o.GetOrdinal())
fmt.Println("all keys found and marked as: 'success'")
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您现在应该看到以下输出:
$ go run main.go
initial ordinal is: 1123
final ordinal is: 1133
all keys found and marked as: 'success'
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
对于我们的 map 配方,我们使用了一个ReadWrite
互斥。这个互斥锁背后的思想是,任意数量的读卡器都可以获得读锁,但只有一个写卡器可以获得写锁。此外,当其他任何人(读卡器或写卡器)拥有锁时,写卡器无法获得锁。这非常有用,因为与标准互斥相比,读取速度非常快且无阻塞。当我们想要设置数据时,我们使用Lock()
对象,当我们想要读取数据时,我们使用RLock()
。最终使用Unlock()
或RUnlock()
是非常重要的,这样才能避免应用死锁。延迟的Unlock()
对象可能有用,但可能比手动调用Unlock()
慢。
当您希望使用锁定值对其他操作进行分组时,此模式可能不够灵活。例如,在某些情况下,您可能需要锁定、执行一些附加处理,并且只有在完成此操作后才能解锁。考虑到这一点对你的设计很重要。
Ordinal
使用sync/atmoic
包获取和设置值。还有原子比较操作,例如atomic.CompareAndSwapUInt64()
,非常有价值。此配方只允许对Ordinal
对象调用Init
一次;否则,它只能以原子方式递增。
我们循环并创建 10 个 goroutine(与sync.Waitgroup
同步),并显示序号正确地递增了 10 倍,并且映射中的每个键都已正确设置。
本书中有几个食谱使用了context
包装。本食谱将探索创建和管理上下文的基础知识。理解上下文的一个很好的参考是https://blog.golang.org/context 。自从写了这篇博文后,上下文从net/context
移动到了一个名为context
的包。在与第三方库(如 GRPC)交互时,这仍然偶尔会导致问题。
本食谱将探索设置和获取上下文、取消和超时的值。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/context
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/context
您应该会看到一个名为go.mod
的文件,其中包含以下内容:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/context
- 从
~/projects/go-programming-cookbook-original/chapter10/context
复制测试,或者将其作为练习来编写您自己的代码! - 创建一个名为
values.go
的文件,其内容如下:
package context
import "context"
type key string
const (
timeoutKey key = "TimeoutKey"
deadlineKey key = "DeadlineKey"
)
// Setup sets some values
func Setup(ctx context.Context) context.Context {
ctx = context.WithValue(ctx, timeoutKey,
"timeout exceeded")
ctx = context.WithValue(ctx, deadlineKey,
"deadline exceeded")
return ctx
}
// GetValue grabs a value given a key and
// returns a string representation of the
// value
func GetValue(ctx context.Context, k key) string {
if val, ok := ctx.Value(k).(string); ok {
return val
}
return ""
}
- 创建一个名为
exec.go
的文件,其内容如下:
package context
import (
"context"
"fmt"
"math/rand"
"time"
)
// Exec sets two random timers and prints
// a different context value for whichever
// fires first
func Exec() {
// a base context
ctx := context.Background()
ctx = Setup(ctx)
rand.Seed(time.Now().UnixNano())
timeoutCtx, cancel := context.WithTimeout(ctx,
(time.Duration(rand.Intn(2)) * time.Millisecond))
defer cancel()
deadlineCtx, cancel := context.WithDeadline(ctx,
time.Now().Add(time.Duration(rand.Intn(2))
*time.Millisecond))
defer cancel()
for {
select {
case <-timeoutCtx.Done():
fmt.Println(GetValue(ctx, timeoutKey))
return
case <-deadlineCtx.Done():
fmt.Println(GetValue(ctx, deadlineKey))
return
}
}
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import "github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/context"
func main() {
context.Exec()
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您现在应该看到以下输出:
$ go run main.go
timeout exceeded
OR
$ go run main.go
deadline exceeded
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
使用上下文值时,最好创建一个新类型来表示键。在本例中,我们创建了一个key
类型,然后声明了一些相应的const
值来表示所有可能的键。
在这种情况下,我们使用Setup()
函数同时初始化所有键/值对。修改上下文时,函数通常采用一个context
参数并返回一个context
值。因此,签名通常如下所示:
func ModifyContext(ctx context.Context) context.Context
有时,这些方法也会返回错误或cancel()
函数,例如在context.WithCancel
、context.WithTimeout
和context.WithDeadline
的情况下。所有子上下文都继承父上下文的属性。
在这个配方中,我们创建了两个子上下文,一个有截止日期,一个有超时。我们将这些超时设置为随机范围,然后在接收到其中一个时终止。最后,我们提取一个给定 set 键的值并打印它。
频道可以是 Go 中的任何类型。结构通道允许您通过单个消息传递大量状态。本食谱将探讨如何使用通道传递复杂的请求结构,并在复杂的响应结构中返回结果。
在下一个配方中,使用工作者池设计模式,其价值变得更加明显,因为您可以创建能够执行各种任务的通用工作者。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/state
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/state
您应该会看到一个名为go.mod
的文件,其中包含以下内容:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/state
- 从
~/projects/go-programming-cookbook-original/chapter10/state
复制测试,或者以此为机会编写一些自己的代码! - 创建一个名为
state.go
的文件,其内容如下:
package state
type op string
const (
// Add values
Add op = "add"
// Subtract values
Subtract = "sub"
// Multiply values
Multiply = "mult"
// Divide values
Divide = "div"
)
// WorkRequest perform an op
// on two values
type WorkRequest struct {
Operation op
Value1 int64
Value2 int64
}
// WorkResponse returns the result
// and any errors
type WorkResponse struct {
Wr *WorkRequest
Result int64
Err error
}
- 创建一个名为
processor.go
的文件,其内容如下:
package state
import "context"
// Processor routes work to Process
func Processor(ctx context.Context, in chan *WorkRequest, out
chan *WorkResponse) {
for {
select {
case <-ctx.Done():
return
case wr := <-in:
out <- Process(wr)
}
}
}
- 创建一个名为
process.go
的文件,其内容如下:
package state
import "errors"
// Process switches on operation type
// Then does work
func Process(wr *WorkRequest) *WorkResponse {
resp := WorkResponse{Wr: wr}
switch wr.Operation {
case Add:
resp.Result = wr.Value1 + wr.Value2
case Subtract:
resp.Result = wr.Value1 - wr.Value2
case Multiply:
resp.Result = wr.Value1 * wr.Value2
case Divide:
if wr.Value2 == 0 {
resp.Err = errors.New("divide by 0")
break
}
resp.Result = wr.Value1 / wr.Value2
default:
resp.Err = errors.New("unsupported operation")
}
return &resp
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import (
"context"
"fmt"
"github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/state"
)
func main() {
in := make(chan *state.WorkRequest, 10)
out := make(chan *state.WorkResponse, 10)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go state.Processor(ctx, in, out)
req := state.WorkRequest{state.Add, 3, 4}
in <- &req
req2 := state.WorkRequest{state.Subtract, 5, 2}
in <- &req2
req3 := state.WorkRequest{state.Multiply, 9, 9}
in <- &req3
req4 := state.WorkRequest{state.Divide, 8, 2}
in <- &req4
req5 := state.WorkRequest{state.Divide, 8, 0}
in <- &req5
for i := 0; i < 5; i++ {
resp := <-out
fmt.Printf("Request: %v; Result: %v, Error: %vn",
resp.Wr, resp.Result, resp.Err)
}
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您现在应该看到以下输出:
$ go run main.go
Request: &{add 3 4}; Result: 7, Error: <nil>
Request: &{sub 5 2}; Result: 3, Error: <nil>
Request: &{mult 9 9}; Result: 81, Error: <nil>
Request: &{div 8 2}; Result: 4, Error: <nil>
Request: &{div 8 0}; Result: 0, Error: divide by 0
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
此配方中的Processor()
函数是一个永远循环的函数,通过显式调用 cancel 或超时,直到上下文被取消。它将所有工作分派给Process()
,当给定各种操作时,它可以处理不同的功能。还可以让这些情况中的每一个为更多的模块化代码分派另一个函数。
最终,响应返回到一个响应通道,我们循环并在最后打印所有结果。我们还在divide by 0
示例中演示了一个错误案例。
worker pool 设计模式是一种将长期运行的 Goroutine 作为 worker 进行调度的模式。这些工作人员可以使用多个通道或使用指定类型的有状态请求结构来处理各种工作,如前一个配方中所述。此配方将创建有状态的工作线程,并演示如何协调和启动在同一通道上同时处理请求的多个工作线程。这些工作者将是crypto
工作者,就像在 web 身份验证应用中一样。它们的目的是使用bcrypt
包对明文字符串进行散列,并将文本密码与散列进行比较。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/pool
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/pool
您应该会看到一个名为go.mod
的文件,其中包含以下内容:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/pool
- 从
~/projects/go-programming-cookbook-original/chapter10/pool
复制测试,或者以此为机会编写一些自己的代码! - 创建一个名为
worker.go
的文件,其内容如下:
package pool
import (
"context"
"fmt"
)
// Dispatch creates numWorker workers, returns a cancel
// function channels for adding work and responses,
// cancel must be called
func Dispatch(numWorker int) (context.CancelFunc, chan
WorkRequest, chan WorkResponse) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
in := make(chan WorkRequest, 10)
out := make(chan WorkResponse, 10)
for i := 0; i < numWorker; i++ {
go Worker(ctx, i, in, out)
}
return cancel, in, out
}
// Worker loops forever and is part of the worker pool
func Worker(ctx context.Context, id int, in chan WorkRequest,
out chan WorkResponse) {
for {
select {
case <-ctx.Done():
return
case wr := <-in:
fmt.Printf("worker id: %d, performing %s
workn", id, wr.Op)
out <- Process(wr)
}
}
}
- 创建一个名为
work.go
的文件,其内容如下:
package pool
import "errors"
type op string
const (
// Hash is the bcrypt work type
Hash op = "encrypt"
// Compare is bcrypt compare work
Compare = "decrypt"
)
// WorkRequest is a worker req
type WorkRequest struct {
Op op
Text []byte
Compare []byte // optional
}
// WorkResponse is a worker resp
type WorkResponse struct {
Wr WorkRequest
Result []byte
Matched bool
Err error
}
// Process dispatches work to the worker pool channel
func Process(wr WorkRequest) WorkResponse {
switch wr.Op {
case Hash:
return hashWork(wr)
case Compare:
return compareWork(wr)
default:
return WorkResponse{Err: errors.New("unsupported
operation")}
}
}
- 创建一个名为
crypto.go
的文件,其内容如下:
package pool
import "golang.org/x/crypto/bcrypt"
func hashWork(wr WorkRequest) WorkResponse {
val, err := bcrypt.GenerateFromPassword(wr.Text,
bcrypt.DefaultCost)
return WorkResponse{
Result: val,
Err: err,
Wr: wr,
}
}
func compareWork(wr WorkRequest) WorkResponse {
var matched bool
err := bcrypt.CompareHashAndPassword(wr.Compare, wr.Text)
if err == nil {
matched = true
}
return WorkResponse{
Matched: matched,
Err: err,
Wr: wr,
}
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import (
"fmt"
"github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/pool"
)
func main() {
cancel, in, out := pool.Dispatch(10)
defer cancel()
for i := 0; i < 10; i++ {
in <- pool.WorkRequest{Op: pool.Hash, Text:
[]byte(fmt.Sprintf("messages %d", i))}
}
for i := 0; i < 10; i++ {
res := <-out
if res.Err != nil {
panic(res.Err)
}
in <- pool.WorkRequest{Op: pool.Compare, Text:
res.Wr.Text, Compare: res.Result}
}
for i := 0; i < 10; i++ {
res := <-out
if res.Err != nil {
panic(res.Err)
}
fmt.Printf("string: "%s"; matched: %vn",
string(res.Wr.Text), res.Matched)
}
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您现在应该看到以下输出:
$ go run main.go
worker id: 9, performing encrypt work
worker id: 5, performing encrypt work
worker id: 2, performing encrypt work
worker id: 8, performing encrypt work
worker id: 6, performing encrypt work
worker id: 1, performing encrypt work
worker id: 0, performing encrypt work
worker id: 4, performing encrypt work
worker id: 3, performing encrypt work
worker id: 7, performing encrypt work
worker id: 2, performing decrypt work
worker id: 6, performing decrypt work
worker id: 8, performing decrypt work
worker id: 1, performing decrypt work
worker id: 0, performing decrypt work
worker id: 9, performing decrypt work
worker id: 3, performing decrypt work
worker id: 4, performing decrypt work
worker id: 7, performing decrypt work
worker id: 5, performing decrypt work
string: "messages 9"; matched: true
string: "messages 3"; matched: true
string: "messages 4"; matched: true
string: "messages 0"; matched: true
string: "messages 1"; matched: true
string: "messages 8"; matched: true
string: "messages 5"; matched: true
string: "messages 7"; matched: true
string: "messages 2"; matched: true
string: "messages 6"; matched: true
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
此配方使用Dispatch()
方法在单个输入通道、输出通道和连接到单个cancel()
函数的通道上创建多个工作线程。如果您想为不同的目的创建不同的池,可以使用此选项。例如,您可以使用单独的池创建 10 个crypto
和 20 个compare
工作者。对于这个配方,我们使用一个池,将哈希请求发送给工作者,检索响应,然后将compare
请求发送到同一个池。因此,执行工作的工人每次都会有所不同,但他们都能够执行任何一种类型的工作。
这种方法的优点是两个请求都允许并行性,并且还可以控制最大并发性。限制 goroutine 的最大数量对于限制内存也很重要。我选择了crypto
作为这个配方,因为crypto
是一个很好的例子,如果你为每一个新的请求启动一个新的 Goroutine,那么它会让你的 CPU 或内存崩溃;例如,在 web 服务中。
此配方演示如何创建工作池组并将它们连接在一起以形成管道。对于这个方法,我们将两个池链接在一起,但该模式可以用于更复杂的操作,类似于中间件。
工作线程池对于保持工作线程相对简单以及进一步控制并发性非常有用。例如,在并行化其他操作的同时序列化日志记录可能很有用。为更昂贵的操作提供一个更小的池也很有用,这样您就不会使机器资源过载。
这些步骤包括编写和运行应用:
- 从终端或控制台应用中,创建一个名为
~/projects/go-programming-cookbook/chapter10/pipeline
的新目录并导航到它。 - 运行以下命令:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/pipeline
您应该会看到一个名为go.mod
的文件,其中包含以下内容:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter10/pipeline
- 从
~/projects/go-programming-cookbook-original/chapter10/pipeline
复制测试,或者以此为机会编写一些自己的代码! - 创建一个名为
worker.go
的文件,其内容如下:
package pipeline
import "context"
// Worker have one role
// that is determined when
// Work is called
type Worker struct {
in chan string
out chan string
}
// Job is a job a worker can do
type Job string
const (
// Print echo's all input to
// stdout
Print Job = "print"
// Encode base64 encodes input
Encode Job = "encode"
)
// Work is how to dispatch a worker, they are assigned
// a job here
func (w *Worker) Work(ctx context.Context, j Job) {
switch j {
case Print:
w.Print(ctx)
case Encode:
w.Encode(ctx)
default:
return
}
}
- 创建一个名为
print.go
的文件,其内容如下:
package pipeline
import (
"context"
"fmt"
)
// Print prints w.in and repalys it
// on w.out
func (w *Worker) Print(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case val := <-w.in:
fmt.Println(val)
w.out <- val
}
}
}
- 创建一个名为
encode.go
的文件,其内容如下:
package pipeline
import (
"context"
"encoding/base64"
"fmt"
)
// Encode takes plain text as int
// and returns "string => <base64 string encoding>
// as out
func (w *Worker) Encode(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case val := <-w.in:
w.out <- fmt.Sprintf("%s => %s", val,
base64.StdEncoding.EncodeToString([]byte(val)))
}
}
}
- 创建一个名为
pipeline.go
的文件,其内容如下:
package pipeline
import "context"
// NewPipeline initializes the workers and
// connects them, it returns the input of the pipeline
// and the final output
func NewPipeline(ctx context.Context, numEncoders, numPrinters
int) (chan string, chan string) {
inEncode := make(chan string, numEncoders)
inPrint := make(chan string, numPrinters)
outPrint := make(chan string, numPrinters)
for i := 0; i < numEncoders; i++ {
w := Worker{
in: inEncode,
out: inPrint,
}
go w.Work(ctx, Encode)
}
for i := 0; i < numPrinters; i++ {
w := Worker{
in: inPrint,
out: outPrint,
}
go w.Work(ctx, Print)
}
return inEncode, outPrint
}
- 创建一个名为
example
的新目录并导航到它。 - 创建一个名为
main.go
的文件,其内容如下:
package main
import (
"context"
"fmt"
"github.com/PacktPublishing/
Go-Programming-Cookbook-Second-Edition/
chapter10/pipeline"
)
func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
in, out := pipeline.NewPipeline(ctx, 10, 2)
go func() {
for i := 0; i < 20; i++ {
in <- fmt.Sprint("Message", i)
}
}()
for i := 0; i < 20; i++ {
<-out
}
}
- 运行
go run main.go
。 - 您还可以运行以下命令:
$ go build $ ./example
您现在应该看到以下输出:
$ go run main.go
Message3 => TWVzc2FnZTM=
Message7 => TWVzc2FnZTc=
Message8 => TWVzc2FnZTg=
Message9 => TWVzc2FnZTk=
Message5 => TWVzc2FnZTU=
Message11 => TWVzc2FnZTEx
Message10 => TWVzc2FnZTEw
Message4 => TWVzc2FnZTQ=
Message12 => TWVzc2FnZTEy
Message6 => TWVzc2FnZTY=
Message14 => TWVzc2FnZTE0
Message13 => TWVzc2FnZTEz
Message0 => TWVzc2FnZTA=
Message15 => TWVzc2FnZTE1
Message1 => TWVzc2FnZTE=
Message17 => TWVzc2FnZTE3
Message16 => TWVzc2FnZTE2
Message19 => TWVzc2FnZTE5
Message18 => TWVzc2FnZTE4
Message2 => TWVzc2FnZTI=
go.mod
文件可能会被更新,go.sum
文件现在应该存在于顶级配方目录中。- 如果您复制或编写了自己的测试,请转到一个目录并运行
go test
。确保所有测试都通过。
main
包创建了一个由 10 个编码器和 2 个打印机组成的管道。它将 20 个字符串排入 in 通道,并在 out 通道上等待 20 个响应。如果消息到达 out 通道,则表明它们已成功通过整个管道。
NewPipeline
功能用于连接游泳池。它确保使用适当的缓冲大小创建通道,并确保某些池的输出通道连接到其他池的适当输入通道。还可以通过在每个辅助进程上使用一个 in 通道数组和一个 out 通道数组、多个命名通道或通道映射来扇出管道。这将允许在每个步骤向记录器发送消息。