Skip to content

Latest commit

 

History

History
912 lines (682 loc) · 29.9 KB

File metadata and controls

912 lines (682 loc) · 29.9 KB

九、Goroutine——基本特性

在上一章中,您学习了 Unix 信号处理以及添加对管道的支持和在 Go 中创建图形图像。

这一章真正重要的主题是 goroutines。Go 使用 goroutines 和通道以自己的方式编程并发应用,同时支持传统的并发技术。Go 中的所有内容都使用 goroutines 执行;当程序开始执行时,它的单个 goroutine 会自动调用main()函数,以便开始实际执行程序。

在本章中,我们将使用易于理解的代码示例介绍 goroutines 的简单部分。然而,在第 10 章、**Goroutines-高级功能中,我们将讨论与 Goroutines 和频道相关的更重要、更高级的技术;因此,在阅读下一章之前,请确保您完全理解本章。

因此,本章将向您介绍以下内容:

  • 创建 goroutine
  • 同步 goroutines
  • 关于频道和如何使用频道
  • 读写频道
  • 创建和使用管道
  • wc.go实用程序的 Go 代码从第 6 章文件输入和输出更改为在新实现中使用 goroutines
  • 进一步改进wc.go的 goroutine 版本

关于 goroutines

goroutine是可以并发执行的最小 Go 实体。注意,这里使用单词minimum非常重要,因为 goroutine 不是自治实体。goroutine 存在于 Unix 进程中的线程中。简单地说,进程可以是自治的,并且可以独立存在,而 goroutine 和线程都不能。因此,为了创建 goroutine,您需要一个至少有一个线程的进程。好的方面是 goroutine 比线程轻,线程比进程轻。Go 中的所有内容都是使用 goroutines 执行的,这非常有意义,因为 Go 是一种设计上的并发编程语言。正如您刚刚了解到的,当一个 Go 程序开始执行时,它的单个 goroutine 调用main()函数,该函数开始实际的程序执行。

您可以使用go关键字后跟函数名或匿名函数的完整定义来定义新的 goroutine。go关键字在新的 goroutine 中启动函数参数,并允许调用函数自行继续。

但是,正如您将看到的,您无法控制或假设 goroutines 的执行顺序,因为这取决于操作系统的调度程序以及操作系统的负载。

并发与并行

一个非常常见的误解是并发并行指的是同一件事,这是不正确的!并行性是同时执行多个事物,而并发性是一种结构化组件的方法,以便在可能的情况下可以独立执行组件。

只有当您同时构建东西时,您才能安全地并行执行它们:只要您的操作系统和硬件允许。Erlang 编程语言很久以前就这样做了,早在 CPU 有多个内核和计算机有大量 RAM 之前。

在一个有效的并发设计中,添加并发实体可以使整个系统运行得更快,因为更多的东西可以并行运行。因此,期望的并行性来自于问题的更好的并发表达和实现。开发人员负责在系统设计阶段考虑并发性,并从系统组件的潜在并行执行中获益。因此,开发人员不应该考虑并行性,而应该考虑将事物分解成独立的组件,在组合时解决最初的问题。

即使不能在 Unix 计算机上并行运行函数,有效的并发设计仍将改进程序的设计和可维护性。换句话说,并发性比并行性好!

同步 Go 软件包

syncGo 包包含可以帮助您同步 goroutine 的函数;sync最重要的功能是sync.Addsync.Donesync.Wait。对每个程序员来说,同步 goroutine 是一项强制性任务。

请注意,goroutine 的同步与共享变量和共享状态无关。共享变量和共享状态与要用于执行并发交互的方法有关。

一个简单的例子

在本小节中,我们将介绍一个创建两个 goroutine 的简单程序。样本程序的名称为aGoroutine.go,分三部分呈现;第一部分如下:

package main 

import ( 
   "fmt" 
   "time" 
) 

func namedFunction() { 
   time.Sleep(10000 * time.Microsecond) 
   fmt.Println("Printing from namedFunction!") 
} 

除了预期的packageimport语句外,您还可以看到名为namedFunction()的函数的实现,该函数在屏幕上打印消息之前会休眠一段时间。

aGoroutine.go的第二部分包含以下 Go 代码:

func main() { 
   fmt.Println("Chapter 09 - Goroutines.") 
   go namedFunction() 

在这里,您创建了一个执行namedFunction()函数的 goroutine。此 naive 程序的最后一部分如下所示:

   go func() { 
         fmt.Println("An anonymous function!") 
   }() 

   time.Sleep(10000 * time.Microsecond) 
   fmt.Println("Exiting...") 
} 

在这里,您创建了另一个 goroutine,该 goroutine 执行一个匿名函数,该函数包含一个fmt.Println()语句。

如您所见,以这种方式运行的 goroutine 彼此完全隔离,无法交换任何类型的数据,这并不总是理想的操作风格。

如果您忘记调用main()函数中的time.Sleep()函数,或者time.Sleep()睡眠时间很短,则main()会过早完成,两个 goroutine 没有足够的时间启动,因此无法完成各自的工作;因此,您将无法在屏幕上看到所有预期的输出!

执行aGoroutine.go将生成以下输出:

$ go run aGoroutine.go
Chapter 09 - Goroutines.
Printing from namedFunction!
Exiting... 

创建多个 goroutine

本小节将向您展示如何创建许多 goroutine,以及由于必须处理更多 goroutine 而产生的问题。节目名称为moreGoroutines.go,分三部分呈现。

moreGoroutines.go的第一部分如下:

package main 

import ( 
   "fmt" 
   "time" 
) 

程序的第二部分具有以下 Go 代码:

func main() { 
   fmt.Println("Chapter 09 - Goroutines.") 

   for i := 0; i < 10; i++ { 
         go func(x int) { 
               time.Sleep(10) 
               fmt.Printf("%d ", x) 
         }(i) 
   } 

这一次,匿名函数接受一个名为x的参数,该参数的值为i变量。使用i变量的for循环一个接一个地创建十个 goroutine。

计划的最后一部分如下:

   time.Sleep(10000) 
   fmt.Println("Exiting...") 
} 

同样,如果将较小的值作为参数添加到time.Sleep(),则在执行程序时会看到不同的结果。

执行moreGoroutines.go会产生一个奇怪的输出:

$ go run moreGoroutines.go
Chapter 09 - Goroutines.
1 7 Exiting...
2 3

然而,当你多次执行moreGoroutines.go时,最大的惊喜就来了:

$ go run moreGoroutines.go
Chapter 09 - Goroutines.
Exiting...
$ go run moreGoroutines.go
Chapter 09 - Goroutines.
3 1 0 9 2 Exiting...
4 5 6 8 7
$ go run moreGoroutines.go
Chapter 09 - Goroutines.
2 0 1 8 7 3 6 5 Exiting...
4

正如您所看到的,程序之前的所有输出都与第一个不同!因此,不仅输出不协调,而且没有足够的时间执行所有 goroutine;您无法确定 goroutines 的执行顺序。然而,尽管您无法解决后一个问题,因为 goroutines 的执行顺序取决于开发人员无法控制的各种参数,但下一小节将教您如何同步 goroutines,并给它们足够的时间来完成,而无需调用time.Sleep()

等待 goroutines 完成他们的工作

本小节将向您演示创建等待 goroutines 完成其任务的调用函数的正确方法。节目名称为waitGR.go,分四部分呈现;第一部分如下:

package main 

import ( 
   "fmt" 
   "sync" 
) 

除了没有time包和增加sync包之外,这里没有什么特别之处。

第二部分具有以下 Go 代码:

func main() { 
   fmt.Println("Waiting for Goroutines!") 

   var waitGroup sync.WaitGroup 
   waitGroup.Add(10) 

在这里,您创建了一个类型为sync.WaitGroup的新变量,它等待一组 goroutine 完成。属于该组的 goroutine 的数量由对sync.Add()函数的一个或多个调用定义。

在 Go 语句之前调用sync.Add()以防止出现竞争状况非常重要。

此外,sync.Add(10)调用告诉我们的程序,我们将等待 10 个 goroutine 完成。

该计划的第三部分如下:

   var i int64 
   for i = 0; i < 10; i++ { 

         go func(x int64) { 
               defer waitGroup.Done() 
               fmt.Printf("%d ", x) 
         }(i) 
   } 

在这里,您可以使用for循环创建所需数量的 goroutine,但也可以使用多个顺序 Go 语句。当每个 goroutine 完成其任务时,sync.Done()函数被执行:在函数定义之后立即使用defer关键字,告诉匿名函数在完成之前自动调用sync.Done()

waitGR.go的最后一部分如下:

   waitGroup.Wait() 
   fmt.Println("\nExiting...") 
} 

这里的好处是没有必要打电话给time.Sleep(),因为sync.Wait()在等我们。

这里再次需要注意的是,您不应该对 goroutine 的执行顺序进行任何假设,在该顺序中,也可以通过以下输出进行验证:

$ go run waitGR.go
Waiting for Goroutines!
9 0 5 6 7 8 2 1 3 4
Exiting...
$ go run waitGR.go
Waiting for Goroutines!
9 0 5 6 7 8 3 1 2 4
Exiting...
$ go run waitGR.go
Waiting for Goroutines!
9 5 6 7 8 1 0 2 3 4
Exiting...

如果您调用waitGroup.Add()的次数超过需要,执行waitGR.go时会出现如下错误消息:

Waiting for Goroutines!
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42000e28c)
      /usr/local/Cellar/go/1.8.3/libexec/src/runtime/sema.go:47 +0x34
sync.(*WaitGroup).Wait(0xc42000e280)
      /usr/local/Cellar/go/1.8.3/libexec/src/sync/waitgroup.go:131 +0x7a
main.main()
      /Users/mtsouk/ch/ch9/code/waitGR.go:22 +0x13c
exit status 2
9 0 1 2 6 7 8 3 4 5

这是因为当您通过调用sync.Add(1)n+1 次告诉您的程序等待 n+1 个 goroutine 时,您的程序不能只有 n 个 goroutine(或更少)!简单地说,这将使sync.Wait()无限期地等待一个或多个 goroutine 调用sync.Done(),而没有任何运气,这显然是一种死锁情况,会阻止程序完成。

创建动态数量的 goroutine

这一次,将要创建的 goroutine 的数量将作为命令行参数给出:程序的名称将是dynamicGR.go,并将分四部分显示。

dynamicGR.go的第一部分如下:

package main 

import ( 
   "fmt" 
   "os" 
   "path/filepath" 
   "strconv" 
   "sync" 
) 

dynamicGR.go的第二部分包含以下 Go 代码:

func main() { 
   if len(os.Args) != 2 { 
         fmt.Printf("usage: %s integer\n",filepath.Base(os.Args[0])) 
         os.Exit(1) 
   } 

   numGR, _ := strconv.ParseInt(os.Args[1], 10, 64) 
   fmt.Printf("Going to create %d goroutines.\n", numGR) 
   var waitGroup sync.WaitGroup 

   var i int64 
   for i = 0; i < numGR; i++ { 
         waitGroup.Add(1) 

如您所见,waitGroup.Add(1)语句是在创建新的 goroutine 之前调用的。

dynamicGR.go的 Go 代码第三部分如下:

         go func(x int64) { 
               defer waitGroup.Done() 
               fmt.Printf(" %d ", x) 
         }(i) 
   } 

在前面的部分中,将创建每个简单化的 goroutine。

计划的最后一部分如下:

   waitGroup.Wait() 
   fmt.Println("\nExiting...") 
} 

在这里,您只需告诉程序等待所有 goroutine 完成使用waitGroup.Wait()语句。

dynamicGR.go的执行需要一个整数参数,即您要创建的 goroutine 数:

$ go run dynamicGR.go 15
Going to create 15 goroutines.
 0  2  4  1  3  5  14  10  8  9  12  11  6  13  7
Exiting...
$ go run dynamicGR.go 15
Going to create 15 goroutines.
 5  3  14  4  10  6  7  11  8  9  12  2  13  1  0
Exiting...
$ go run dynamicGR.go 15
Going to create 15 goroutines.
 4  2  3  6  5  10  9  7  0  12  11  1  14  13  8
Exiting...

正如您所想象的,您想要创建的 goroutine 越多,您的输出就越多样化,因为无法控制程序 goroutine 的执行顺序。

关于频道

简单地说,通道是一种允许 goroutine 交换数据的通信机制。然而,这里存在一些规则。首先,每个通道允许交换特定的数据类型,也称为通道的元素类型,其次,为了使通道正常运行,您需要使用一些 Go 代码来接收通过通道发送的内容。

您应该使用chan关键字声明一个新频道,您可以使用close()功能关闭一个频道。此外,由于每个通道都有自己的类型,开发人员应该定义它。

最后,一个非常重要的细节:当您使用通道作为函数参数时,可以指定其方向,即它是用于写入还是读取。在我看来,如果您事先知道通道的用途,请使用此功能,因为它将使您的程序更加健壮和安全:否则,只需不定义通道功能参数的用途即可。因此,如果您声明通道函数参数将仅用于读取,并尝试对其进行写入,则会收到一条错误消息,很可能会使您免于讨厌的 bug。

尝试从写入通道读取时将收到的错误消息类似于以下内容:

# command-line-arguments
./writeChannel.go:13: invalid operation: <-c (receive from send-only type chan<- int)

向频道写信

在本小节中,您将学习如何写入通道。所呈现的程序将被称为writeChannel.go,您将看到它分为三个部分。

第一部分有预期的序言:

package main 

import ( 
   "fmt" 
   "time" 
) 

正如您所理解的,频道的使用不需要任何额外的Go包。

writeChannel.go的第二部分如下:

func writeChannel(c chan<- int, x int) { 
   fmt.Println(x) 
   c <- x 
   close(c) 
   fmt.Println(x) 
} 

尽管writeChannel()函数写入通道,但数据将丢失,因为目前没有人读取程序中的通道。

程序的最后一部分包含以下 Go 代码:

func main() { 
   c := make(chan int) 
   go writeChannel(c, 10) 
   time.Sleep(2 * time.Second) 
} 

在这里,您可以通过用于int数据的chan关键字来查看名为c的通道变量的定义。

执行writeChannel.go将创建以下输出:

 $ go run writeChannel.go
 10

这不是你期望看到的!这个不可预测的输出的原因是第二个fmt.Println(x)语句没有执行。原因很简单:c <- x语句正在阻止writeChannel()函数其余部分的执行,因为没有人在读取c通道。

从频道读

本小节将改进writeChannel.go的 Go 代码,允许您从频道读取。提交的程序将被称为readChannel.go,分为四个部分。

第一部分如下:

package main 

import ( 
   "fmt" 
   "time" 
) 

readChannel.go的第二部分有以下 Go 代码:

func writeChannel(c chan<- int, x int) { 
   fmt.Println(x) 
   c <- x 
   close(c) 
   fmt.Println(x) 
} 

再次注意,如果没有人收集写入通道的数据,则发送该数据的函数将在等待有人读取其数据时暂停。然而,在第 10 章、**Goroutines-高级功能中,您将看到一个非常漂亮的解决方案。

第三部分具有以下 Go 代码:

func main() { 
   c := make(chan int) 
   go writeChannel(c, 10) 
   time.Sleep(2 * time.Second) 
   fmt.Println("Read:", <-c) 
   time.Sleep(2 * time.Second) 

这里,fmt.Println()函数中的<-c语句用于从通道中读取单个值:同一语句可用于将通道的值存储到变量中。但是,如果不存储从通道读取的值,它将丢失。

readChannel.go的最后一部分如下:

   _, ok := <-c 
   if ok { 
         fmt.Println("Channel is open!") 
   } else { 
         fmt.Println("Channel is closed!") 
   } 
} 

在这里,您可以看到一种技术,该技术允许您确定要读取的通道是否已关闭。但是,如果通道打开,由于分配中使用了_字符,因此显示的 Go 代码将丢弃通道的读取值。

执行readChannel.go将创建以下输出:

$ go run readChannel.go
10
Read: 10
10
Channel is closed!
$ go run readChannel.go
10
10
Read: 10
Channel is closed!

加油

第 8 章、**处理和信号中,您通过h1s.go等许多示例了解了 Go 如何处理 Unix 信号。然而,现在您对 Goroutine 和 Channel 有了更多的了解,是时候对h1s.go的 Go 代码进行更多的解释了。

正如您已经知道的,h1s.go使用通道和 goroutine,现在应该很清楚,作为 goroutine 执行的匿名函数使用无限for循环从sigs通道读取。这意味着,每当有我们感兴趣的信号时,goroutine 就会从sigs频道读取并处理它。

管道

Go节目很少使用单一频道。使用多个通道的一种非常常见的技术称为管道。因此,管道是一种连接 goroutine 的方法,这样一个 goroutine 的输出在通道的帮助下成为另一个 goroutine 的输入。使用管道的好处如下:

  • 使用管道的一个好处是,程序中有一个恒定的流,因为没有人会等待一切都完成后才开始执行程序的 goroutine 和 channels
  • 此外,您使用的变量更少,因此内存空间也更少,因为您不必保存所有内容
  • 最后,管道的使用简化了程序的设计,提高了程序的可维护性

pipelines.go的代码与整数管道一起工作,将分五部分介绍;第一部分如下:

package main 

import ( 
   "fmt" 
   "os" 
   "path/filepath" 
   "strconv" 
) 

第二部分包含以下 Go 代码:

func genNumbers(min, max int64, out chan<- int64) { 

   var i int64 
   for i = min; i <= max; i++ { 
         out <- i 
   } 
   close(out) 
} 

这里,您定义了一个接受三个参数的函数:两个整数和一个输出通道。输出通道将用于写入将在另一个函数中读取的数据:这就是创建管道的方式。

该计划的第三部分如下:

func findSquares(out chan<- int64, in <-chan int64) { 
   for x := range in { 
         out <- x * x 
   } 
   close(out) 
} 

这一次,函数接受两个参数,这两个参数都是通道。然而,out是一个输出通道,而in是一个用于读取数据的输入通道。

第四部分包含另一个功能的定义:

func calcSum(in <-chan int64) { 
   var sum int64 
   sum = 0 
   for x2 := range in { 
         sum = sum + x2 
   } 
   fmt.Printf("The sum of squares is %d\n", sum) 
} 

pipelines.go的最后一个函数只接受一个参数,这是用于读取数据的通道。

pipelines.go的最后一部分是main()功能的实现:

func main() { 
   if len(os.Args) != 3 { 
         fmt.Printf("usage: %s n1 n2\n", filepath.Base(os.Args[0])) 
         os.Exit(1) 
   } 
   n1, _ := strconv.ParseInt(os.Args[1], 10, 64) 
   n2, _ := strconv.ParseInt(os.Args[2], 10, 64) 

   if n1 > n2 { 
         fmt.Printf("%d should be smaller than %d\n", n1, n2) 
         os.Exit(10) 
   } 

   naturals := make(chan int64) 
   squares := make(chan int64) 
   go genNumbers(n1, n2, naturals) 
   go findSquares(squares, naturals) 
   calcSum(squares) 
} 

这里,main()函数首先读取它的两个命令行参数,并创建必要的通道变量(naturalssquares。然后,它调用管道的函数:注意,通道的最后一个函数不是作为 goroutine 执行的。

下图显示了pipelines.go中使用的管道的图形表示,以说明该特定管道的工作方式:

pipelines.go 中使用的管道结构的图形表示

运行pipelines.go生成以下输出:

$ go run pipelines.go
usage: pipelines n1 n2
exit status 1
$ go run pipelines.go 3 2
3 should be smaller than 2
exit status 10
$ go run pipelines.go 3 20
The sum of squares is 2865
$ go run pipelines.go 1 20
The sum of squares is 2870
$ go run pipelines.go 20 20
The sum of squares is 400

wc.go 的更好版本

正如我们在第 6 章**文件输入和输出中所述,在本章中,您将学习如何创建使用 goroutines 的wc.go版本。新公用事业公司的名称将为dWC.go,分为四个部分。请注意,dWC.go的当前版本将每个命令行参数视为一个文件。

*该实用程序的第一部分如下所示:

package main 

import ( 
   "bufio" 
   "fmt" 
   "io" 
   "os" 
   "path/filepath" 
   "regexp" 
   "sync" 
) 

第二部分具有以下 Go 代码:

func count(filename string) { 
   var err error 
   var numberOfLines int = 0 
   var numberOfCharacters int = 0 
   var numberOfWords int = 0 

   f, err := os.Open(filename) 
   if err != nil { 
         fmt.Printf("%s\n", err) 
         return 
   } 
   defer f.Close() 

   r := bufio.NewReader(f) 
   for { 
         line, err := r.ReadString('\n') 

         if err == io.EOF { 
               break 
         } else if err != nil { 
               fmt.Printf("error reading file %s\n", err) 
         } 
         numberOfLines++ 
         r := regexp.MustCompile("[^\\s]+") 
         for range r.FindAllString(line, -1) { 
               numberOfWords++ 
         } 
         numberOfCharacters += len(line) 
   } 

   fmt.Printf("\t%d\t", numberOfLines) 
   fmt.Printf("%d\t", numberOfWords) 
   fmt.Printf("%d\t", numberOfCharacters) 
   fmt.Printf("%s\n", filename) 
} 

count()函数完成所有处理,不向main()函数返回任何信息:它只打印输入文件的行、字和字符,然后退出。尽管count()功能的当前实现完成了所需的工作,但它不是设计程序的正确方法,因为无法控制程序的输出。

该实用程序的第三部分如下所示:

func main() { 
   if len(os.Args) == 1 { 
         fmt.Printf("usage: %s <file1> [<file2> [... <fileN]]\n", 
               filepath.Base(os.Args[0])) 
         os.Exit(1) 
   } 

dWC.go的最后一部分如下:

   var waitGroup sync.WaitGroup 
   for _, filename := range os.Args[1:] { 
         waitGroup.Add(1) 
         go func(filename string) { 
               count(filename) 
               defer waitGroup.Done() 
         }(filename) 
   } 
   waitGroup.Wait() 
} 

如您所见,每个输入文件都由不同的 goroutine 处理。正如预期的那样,您不能对输入文件的处理顺序做出任何假设。

执行dWC.go将生成以下输出:

$ go run dWC.go /tmp/swtag.log /tmp/swtag.log doesnotExist
open doesnotExist: no such file or directory
          48    275   3571  /tmp/swtag.log
          48    275   3571  /tmp/swtag.log

在这里,您可以看到,尽管doesnotExist文件名是最后一个命令行参数,但它是dWC.go输出中的第一个参数!

尽管dWC.go使用 goroutines,但它并不聪明,因为 goroutines 运行时不相互通信,也不执行任何其他任务。此外,由于无法保证count()函数的fmt.Printf()语句不会中断,因此输出可能会被置乱。

因此,接下来的章节以及将在第 10 章、**Goroutines-高级功能中介绍的一些技术将改进dWC.go

计算总数

当前版本的dWC.go无法计算总计,将dWC.go的输出与awk进行处理即可轻松解决:

$ go run dWC.go /tmp/swtag.log /tmp/swtag.log | awk '{sum1+=$1; sum2+=$2; sum3+=$3} END {print "\t", sum1, "\t", sum2, "\t", sum3}'
       96    550   7142

不过,这还远远不够完美和优雅!

当前版本的dWC.go无法计算总数的主要原因是其 goroutines 之间没有通信方式。这可以通过渠道和管道轻松解决。新版本的dWC.go将被称为dWCtotal.go,将分五部分介绍。

dWCtotal.go的第一部分如下:

package main 

import ( 
   "bufio" 
   "fmt" 
   "io" 
   "os" 
   "path/filepath" 
   "regexp" 
) 

type File struct { 
   Filename   string 
   Lines      int 
   Words      int 
   Characters int 
   Error      error 
} 

这里定义了一个新的struct类型。新结构称为File,它有四个字段和一个用于保存错误消息的附加字段。这是管道循环多个值的正确方法。有人可能会说,File结构的更好名称应该是CountsResultsFileCountsFileResults

该计划的第二部分如下:

func process(files []string, out chan<- File) { 
   for _, filename := range files { 
         var fileToProcess File 
         fileToProcess.Filename = filename 
         fileToProcess.Lines = 0 
         fileToProcess.Words = 0 
         fileToProcess.Characters = 0 
         out <- fileToProcess 
   } 
   close(out) 
} 

process()函数的更好名称应该是beginProcess()processResults()。在整个dWCtotal.go计划中,你可以尝试自己做出改变。

dWCtotal.go的第三部分有以下 Go 代码:

func count(in <-chan File, out chan<- File) { 
   for y := range in { 
         filename := y.Filename 
         f, err := os.Open(filename) 
         if err != nil { 
               y.Error = err 
               out <- y 
               continue 
         } 
         defer f.Close() 
         r := bufio.NewReader(f) 
         for { 
               line, err := r.ReadString('\n') 
               if err == io.EOF { 
                     break 
               } else if err != nil { 
                     fmt.Printf("error reading file %s", err) 
                     y.Error = err 
                     out <- y 
                     continue 
               } 
               y.Lines = y.Lines + 1 
               r := regexp.MustCompile("[^\\s]+") 
               for range r.FindAllString(line, -1) { 
                     y.Words = y.Words + 1 
               } 
               y.Characters = y.Characters + len(line) 
         } 
         out <- y 
   } 
   close(out) 
} 

尽管count()函数仍然计算计数,但它不会打印计数。它只是使用File类型的struct变量将行数、字数、字符数以及文件名发送到另一个通道。

这里有一个非常重要的细节,这是count()函数的最后一个语句:为了正确结束管道,应该从第一个通道开始关闭所有相关通道。否则,程序执行将失败,并显示类似以下错误消息:

fatal error: all goroutines are asleep - deadlock!

但是,就关闭管道的通道而言,还应注意过早关闭通道,尤其是当管道中存在拆分时。

程序的第四部分包含以下 Go 代码:

func calculate(in <-chan File) { 
   var totalWords int = 0 
   var totalLines int = 0 
   var totalChars int = 0 
   for x := range in { 
         totalWords = totalWords + x.Words 
         totalLines = totalLines + x.Lines 
         totalChars = totalChars + x.Characters 
         if x.Error == nil { 
               fmt.Printf("\t%d\t", x.Lines) 
               fmt.Printf("%d\t", x.Words) 
               fmt.Printf("%d\t", x.Characters) 
               fmt.Printf("%s\n", x.Filename) 
         } 
   } 

   fmt.Printf("\t%d\t", totalLines) 
   fmt.Printf("%d\t", totalWords) 
   fmt.Printf("%d\ttotal\n", totalChars) 
} 

这里没有什么特别之处:calculate()函数完成了打印程序输出的肮脏工作。

dWCtotal.go的最后一部分如下:

func main() { 
   if len(os.Args) == 1 { 
         fmt.Printf("usage: %s <file1> [<file2> [... <fileN]]\n", 
               filepath.Base(os.Args[0])) 
         os.Exit(1) 
   } 

   files := make(chan File)
   values := make(chan File) 

   go process(os.Args[1:], files) 
   go count(files, values) 
   calculate(values) 
} 

由于files通道仅用于传递文件名,因此它可能是string通道而不是File通道。但是,这样代码更一致。

现在dWCtotal.go自动生成总计,即使它只需要处理一个文件:

$ go run dWCtotal.go /tmp/swtag.log
      48    275   3571  /tmp/swtag.log
      48    275   3571  total
$ go run dWCtotal.go /tmp/swtag.log /tmp/swtag.log doesNotExist
      48    275   3571  /tmp/swtag.log
      48    275   3571  /tmp/swtag.log
      96    550   7142  total

请注意,dWCtotal.godWC.go实现了相同的核心功能,即计算文件的字、字符和行数:处理信息的方式不同,因为dWCtotal.go使用的是管道,而不是孤立的 goroutine。

第 10 章、**Goroutines-高级功能将使用其他技术实现dWCtotal.go的功能。

做一些基准测试

在本节中,我们将比较第 6 章*、文件输入和输出、*与wc(1)dWC.godWCtotal.go的性能。为了使结果更准确,所有三个实用程序都将处理相对较大的文件:

$ wc /tmp/*.data
  712804 3564024 9979897 /tmp/connections.data
  285316  855948 4400685 /tmp/diskSpace.data
  712523 1425046 8916670 /tmp/memory.data
 1425500 2851000 5702000 /tmp/pageFaults.data
  285658  840622 4313833 /tmp/uptime.data
 3421801 9536640 33313085 total

因此,time(1)实用程序将测量以下命令:

$ time wc /tmp/*.data /tmp/*.data
$ time wc /tmp/uptime.data /tmp/pageFaults.data
$ time ./dWC /tmp/*.data /tmp/*.data
$ time ./dWC /tmp/uptime.data /tmp/pageFaults.data
$ time ./dWCtotal /tmp/*.data /tmp/*.data
$ time ./dWCtotal /tmp/uptime.data /tmp/pageFaults.data
$ time ./wc /tmp/uptime.data /tmp/pageFaults.data
$ time ./wc /tmp/*.data /tmp/*.data

下图显示了用于测量上述命令时,time(1)实用程序输出的实字段的图形表示:

绘制时间(1)实用程序的真实字段

最早的wc(1)实用程序是迄今为止速度最快的。另外,dWC.godWCtotal.gowc.go都快。除dWC.go外,其余两个 Go 版本的性能相同。

练习

  1. 创建一个管道,用于读取文本文件,查找给定单词的出现次数,并计算该单词在所有文件中出现的总数。
  2. 试着加快速度。
  3. 创建一个简单的Go程序,使用频道播放乒乓球。您应该使用命令行参数定义乒乓球和乒乓球的总数。

总结

在本章中,我们讨论了创建和同步 goroutine,以及创建和使用管道和通道以允许 goroutine 相互通信。此外,我们还开发了两个版本的wc(1)实用程序,它们使用 goroutines 来处理输入文件。

在继续下一章之前,请确保您完全理解本章的概念,因为在下一章中,我们将讨论与 goroutine 和通道相关的更高级功能,包括共享内存、缓冲通道、select关键字、GOMAXPROCS环境变量和信号通道。*