Skip to content

Latest commit

 

History

History
368 lines (310 loc) · 11.4 KB

go-源码解读-同步sync-mutex.md

File metadata and controls

368 lines (310 loc) · 11.4 KB

Go 源码解读 同步模块 sync mutex

sync 包概览

sync 包提供基本的同步原语,如互斥锁。除了 Once 和 WaitGroup 类型之外,大多数类型都是供比较简单的多协程场景。

更高级别的同步最好通过 channel 和通信来实现。

需要注意的是,sync 包里定义的类型的值不允许被拷贝。

Mutex

// Mutex 是互斥锁。
// 互斥锁的零值是一个未加锁的 mutex。
//
// 第一次使用之后,不能复制互斥锁。
type Mutex struct {
	state int32
	sema  uint32
}

Mutex 的接口是 Locker。

// Locker 表示可以加锁和解锁的对象。
type Locker interface {
	Lock()
	Unlock()
}

下面将详细描述 Mutex 是如何实现加锁/解锁功能。

// Lock 方法给 m 加锁。
// 如果锁已经在使用中,那么调用的协程 goroutine 将会阻塞,直到 mutex 可用。
func (m *Mutex) Lock() {
	// Fast path 快速路径:获取解锁的互斥锁。
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path 慢速路径(简略操作以便快速路径可以内联)。
	m.lockSlow()
}

在Linux中,有两种方法来处理传入TCP数据段:快速路径(Fast Path)和慢速路径(Slow Path)。 使用快速路径只进行最少的处理,如处理数据段、发生ACK、存储时间戳等。 使用慢速路径可以处理乱序数据段、PAWS、socket内存管理和紧急数据等。

在 Lock 方法中,使用了两种方式来确保能够给 m 加锁(只不过一个操作快,一个操作慢而已)。

如果 Fast path 加锁成功了,那么便不会执行 Slow path 的方法。

在这里,为了更快的运行 Fast path,采用了内联的方式,也就是把逻辑直接在 Lock 方法里实现。

而 Slow path 则把实现封装到了m.lockSlow()里,这样在编译的时候会把子方法的代码加载到Lock()方法里,速度也会受到影响。

CompareAndSwapInt32

// CompareAndSwapInt32 对 int32 值执行比较和交换操作。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

CompareAndSwapInt32 原子操作由底层支持,所以执行速度非常快。

如果原子操作失败了,那会执行lockSlow()方法进行自旋抢锁,直到抢到锁为止。

mutex 解决竞争问题

当 go 的 build 和 run 命令使用选项-race也就是启用数据竞争检测时,race.Enabled=true,如果多个协程抢占同一个变量,便会引发竞态报警。

下面编写测试文件 mutex.go 来模拟Lock()方法的竞态检测。

package main

import (
	"fmt"
	"sync"
	"time"
)
var m sync.Mutex

func main() {
	var x int
	go func() {
		for {
			m.Lock()
			x = 1
			fmt.Println(x)
			m.Unlock()
		}
	}()

	go func() {
		for {
			m.Lock()
			x = 2
			fmt.Println(x)
			m.Unlock()
		}
	}()

	go func() {
		for {
			m.Lock()
			x = 3
			fmt.Println(x)
			m.Unlock()
		}
	}()

	time.Sleep(10 * time.Second)
}

当使用命令go run -race mutex.go开启竞态检测之后,发现并发调用不会触发Lock()方法竞争,也不会触发变量x的数据竞争。

当去掉 mutex 加锁,解锁代码后,如下代码运行,则会报 warning 错误。

package main

import (
	"time"
)

func main() {
	var x int
	go func() {
		for {
			x = 1
		}
	}()

	go func() {
		for {
			x = 2
		}
	}()

	go func() {
		for {
			x = 3
		}
	}()

	time.Sleep(10 * time.Second)
}

竞态报错如下所示:

1
==================
WARNING: DATA RACE
Write at 0x00c000136008 by goroutine 8:
  main.main.func2()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:23 +0x3c

Previous write at 0x00c000136008 by goroutine 7:
  main.main.func1()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:15 +0x3c

Goroutine 8 (running) created at:
  main.main()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:21 +0x9c

Goroutine 7 (running) created at:
  main.main()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:13 +0x7a
==================
==================
WARNING: DATA RACE
Write at 0x00c000136008 by goroutine 9:
  main.main.func3()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:31 +0x3c

Previous write at 0x00c000136008 by goroutine 7:
  main.main.func1()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:15 +0x3c

Goroutine 9 (running) created at:
  main.main()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:29 +0xbe

Goroutine 7 (running) created at:
  main.main()
      /Users/benjamin/Documents/golangworkspace/gopath/src/learnGo/src/sdk/sync/mutex.go:13 +0x7a
==================
2
3

从上可以看出,mutex 加锁是去除数据竞争的有效手段。

Mutex fairness 互斥公平性

在看lockSlow()方法之前,需要先了解一下 mutex 里的一些常量以及互斥的两种模式。

const ( 
	// 表示 mutex 是上锁状态
	mutexLocked = 1 << iota 
	// 表示 mutex 是唤醒状态
	mutexWoken 
	// 表示 mutex 处于饥饿模式
	mutexStarving
	// 表示 mutex.state 右移 3 位后即为等待的 goroutine 的数量。 
	mutexWaiterShift = iota
	
	// 互斥公平性。 
	// 
	// Mutex 可以有 2 种操作模式:正常模式,饥饿模式。 
	// 在正常模式,waiters 是按先进先出的顺序排队,但是一个唤醒状态的 waiters 不拥有互斥锁, 
	// 并与新到达的 goroutines 竞争所有权。 
	// 新到达的 goroutines 有一个优势 -- 他们已经在 CPU 上运行,而且可能有很多这样的 goroutine, 
	// 所以一个唤醒状态的 waiter 很有可能会输。 
	// 在这种情况下,它排在等待队列的前面。 
	// 如果 waiter 在超过 1ms 的时间内无法获取互斥,它会将互斥切换到饥饿模式。 
	// 
	// 在饥饿模式下,mutex 的所有权直接从解锁的 goroutine 传递给队列前面的 waiter。 
	// 新到达的 goroutines 不会尝试获取互斥锁,即使它看起来是解锁的,也不会尝试自旋·。 
	// 相反,他们在等待队列的尾部排队。 
	// 
	// 如果 waiter 接收到 mutex 的所有权,并看到 
	//(1)它是队列中的最后一个 waiter, 
	// 或者(2)它等待的时间不到 1ms, 
	// 它将会互斥对象切换回到正常操作模式。 
	// 
	// 正常模式具有相当好的性能,因为 goroutine 可以连续多次获取 mutex,即使有阻塞的 waiter。 
	// 饥饿模式是预防高并发系统中尾延迟(tail latency)的重要方法。 
	// 
	// starvationThresholdNs 值为 1000000 纳秒,即 1ms,表示将 mutex 切换到饥饿模式的等待时间阈值。
	starvationThresholdNs = 1e6
)

lockSlow

接下来,来看下lockSlow()的实现。

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// 不要在饥饿模式下自旋,所有权交给 waiters。
		// 这样我们无论如何都无法获得 mutex。
		// old & 0101 == 0001 等于 1 说明已经加过锁,这是未处于饥饿模式,系统也支持自旋,则开始自旋。
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 活跃的自旋是有效的。
			// 尝试设置 mutexWoken 标志以通知 Unlock 不要唤醒其他被阻塞的 goroutines。
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			// 将当前的协程标记为唤醒状态后,执行自旋操作,计数器 +1,当前状态更新到 old。
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// old & 0100 == 0,说明新到的协程是正常模式,需要排除。
		if old & mutexStarving == 0 {
			// new |= 0001
			new |= mutexLocked
		}
		// old & 0101 !=0,说明新到的协程是饥饿模式下已加锁,需要去排队。
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		
		// 当前 goroutine 将 mutex 切换到饥饿模式。
		// 但是如果 mutex 当前处于解锁状态,则不要进行切换。
		// Unlock 期望饥饿的 mutex 拥有 waiters,这在这种情形下不能实现。
		// old & 0001 !=0,说明协程已经加锁,需要切换到饥饿模式,协程解锁状态下不切换。
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		// 唤醒
		if awoke {
			// goroutine 已经从睡眠中唤醒,所以我们需要在任何情况下重置标志。
			// awoke 标志为唤醒,但是 m.state 不是唤醒,互斥状态不相同就 panic
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			// 把唤醒状态位去掉,重置标志
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// old 的第一位和第三位不是1,说明协程目前是未锁定且饥饿模式,获取锁成功。
			if old&(mutexLocked|mutexStarving) == 0 {
				// 使用 CAS 原子操作给 mutex 加锁
				break 
			}
			// 如果我们之前已经在等了,就在队伍前面排队。
			// 被唤醒的协程抢锁失败,重新放到队列首部。
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 进入休眠状态,等待信号唤醒
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 确认当前锁的状态
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// 如果这个 goroutine 被唤醒并且 mutex 处于饥饿模式,
				// 那么所有权会被移交给我们,但 mutex 处于某种不一致的状态:
				// mutexLocked 未设置,并且我们仍被视为 waiter。得修复好它。
				// 饥饿模式不会出现 mutex 被锁住或唤醒状态,等待队列不能为0。
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 拿到锁,等待数 -1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// 退出饥饿模式。
					// 这个操作很关键,并考虑了等待时间。
					// 饥饿模式是如此的低效,以至于两个 goroutines 一旦将 mutex 切换到饥饿模式,
					// 就可以无限地进入锁步(lock-step)。
					// 非饥饿模式,等待者只有一个时,退出饥饿模式。
					delta -= mutexStarving
				}
				// 更新状态,高位原子计数直接添加。
				atomic.AddInt32(&m.state, delta)
				break
			}
			// awoke = true,不处于饥饿模式,新到达的协程先获得锁
			awoke = true
			iter = 0
		} else {
			// 自旋没有成功,更新 new,记录当前状态
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

for 循环中runtime_doSpin()这段代码的目的是,尝试通过自旋方式获取锁资源,自旋是可以避免 goroutine 切换,但是消耗的资源更多。

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

runtime_doSpin()sync_runtime_doSpin()链接,该方法会在 CPU 上执行若干次 PAUSE 指令,什么功能也没有,但是会占用 CPU 资源。