Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify race and norace event loop #233

Merged
merged 2 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions poll_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package netpoll

func (p *defaultPoll) Alloc() (operator *FDOperator) {
op := p.opcache.alloc()
op.poll = p
return op
}

func (p *defaultPoll) Free(operator *FDOperator) {
p.opcache.freeable(operator)
}

func (p *defaultPoll) appendHup(operator *FDOperator) {
p.hups = append(p.hups, operator.OnHup)
p.detach(operator)
operator.done()
}

func (p *defaultPoll) detach(operator *FDOperator) {
if err := operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: poller detach operator failed: %v", err)
}
}

func (p *defaultPoll) onhups() {
if len(p.hups) == 0 {
return
}
hups := p.hups
p.hups = nil
go func(onhups []func(p Poll) error) {
for i := range onhups {
if onhups[i] != nil {
onhups[i](p)
}
}
}(hups)
}
48 changes: 10 additions & 38 deletions poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && !race
//go:build darwin || netbsd || freebsd || openbsd || dragonfly
// +build darwin netbsd freebsd openbsd dragonfly
// +build !race

package netpoll

import (
"sync"
"sync/atomic"
"syscall"
"unsafe"
Expand Down Expand Up @@ -50,6 +50,7 @@ func openDefaultPoll() *defaultPoll {
type defaultPoll struct {
fd int
trigger uint32
m sync.Map // only used in go:race
opcache *operatorCache // operator cache
hups []func(p Poll) error
}
Expand All @@ -74,14 +75,15 @@ func (p *defaultPoll) Wait() error {
return err
}
for i := 0; i < n; i++ {
var fd = int(events[i].Ident)
// trigger
if events[i].Ident == 0 {
if fd == 0 {
// clean trigger
atomic.StoreUint32(&p.trigger, 0)
continue
}
var operator = *(**FDOperator)(unsafe.Pointer(&events[i].Udata))
if !operator.do() {
var operator = p.getOperator(fd, unsafe.Pointer(&events[i].Udata))
if operator == nil || !operator.do() {
continue
}

Expand Down Expand Up @@ -132,7 +134,7 @@ func (p *defaultPoll) Wait() error {
operator.done()
}
// hup conns together to avoid blocking the poll.
p.detaches()
p.onhups()
p.opcache.free()
}
}
Expand Down Expand Up @@ -160,7 +162,7 @@ func (p *defaultPoll) Trigger() error {
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
var evs = make([]syscall.Kevent_t, 1)
evs[0].Ident = uint64(operator.FD)
*(**FDOperator)(unsafe.Pointer(&evs[0].Udata)) = operator
p.setOperator(unsafe.Pointer(&evs[0].Udata), operator)
switch event {
case PollReadable, PollModReadable:
operator.inuse()
Expand All @@ -169,6 +171,7 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
operator.inuse()
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE|syscall.EV_ONESHOT
case PollDetach:
p.delOperator(operator)
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT
case PollR2RW:
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
Expand All @@ -178,34 +181,3 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
_, err := syscall.Kevent(p.fd, evs, nil, nil)
return err
}

func (p *defaultPoll) Alloc() (operator *FDOperator) {
op := p.opcache.alloc()
op.poll = p
return op
}

func (p *defaultPoll) Free(operator *FDOperator) {
p.opcache.freeable(operator)
}

func (p *defaultPoll) appendHup(operator *FDOperator) {
p.hups = append(p.hups, operator.OnHup)
operator.Control(PollDetach)
operator.done()
}

func (p *defaultPoll) detaches() {
if len(p.hups) == 0 {
return
}
hups := p.hups
p.hups = nil
go func(onhups []func(p Poll) error) {
for i := range onhups {
if onhups[i] != nil {
onhups[i](p)
}
}
}(hups)
}
33 changes: 33 additions & 0 deletions poll_default_bsd_norace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && !race
// +build darwin netbsd freebsd openbsd dragonfly
// +build !race

package netpoll

import "unsafe"

func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator {
return *(**FDOperator)(ptr)
}

func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) {
*(**FDOperator)(ptr) = operator
}

func (p *defaultPoll) delOperator(operator *FDOperator) {

}
37 changes: 37 additions & 0 deletions poll_default_bsd_race.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && race
// +build darwin netbsd freebsd openbsd dragonfly
// +build race

package netpoll

import "unsafe"

func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator {
tmp, _ := p.m.Load(fd)
if tmp == nil {
return nil
}
return tmp.(*FDOperator)
}

func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) {
p.m.Store(operator.FD, operator)
}

func (p *defaultPoll) delOperator(operator *FDOperator) {
p.m.Delete(operator.FD)
}
47 changes: 7 additions & 40 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !race
// +build !race

package netpoll

import (
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
Expand Down Expand Up @@ -58,6 +56,7 @@ type defaultPoll struct {
wop *FDOperator // eventfd, wake epoll_wait
buf []byte // read wfd trigger msg
trigger uint32 // trigger flag
m sync.Map // only used in go:race
opcache *operatorCache // operator cache
// fns for handle events
Reset func(size, caps int)
Expand Down Expand Up @@ -111,8 +110,8 @@ func (p *defaultPoll) Wait() (err error) {

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
for i := range events {
var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data))
if !operator.do() {
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
if operator == nil || !operator.do() {
continue
}
// trigger or exit gracefully
Expand Down Expand Up @@ -192,7 +191,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) {
operator.done()
}
// hup conns together to avoid blocking the poll.
p.detaches()
p.onhups()
return false
}

Expand All @@ -216,7 +215,7 @@ func (p *defaultPoll) Trigger() error {
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
var op int
var evt epollevent
*(**FDOperator)(unsafe.Pointer(&evt.data)) = operator
p.setOperator(unsafe.Pointer(&evt.data), operator)
switch event {
case PollReadable: // server accept a new connection and wait read
operator.inuse()
Expand All @@ -227,6 +226,7 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
case PollModReadable: // client wait read/write
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollDetach: // deregister
p.delOperator(operator)
op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollR2RW: // connection wait read/write
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
Expand All @@ -235,36 +235,3 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
}
return EpollCtl(p.fd, op, operator.FD, &evt)
}

func (p *defaultPoll) Alloc() (operator *FDOperator) {
op := p.opcache.alloc()
op.poll = p
return op
}

func (p *defaultPoll) Free(operator *FDOperator) {
p.opcache.freeable(operator)
}

func (p *defaultPoll) appendHup(operator *FDOperator) {
p.hups = append(p.hups, operator.OnHup)
if err := operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: poller detach operator failed: %v", err)
}
operator.done()
}

func (p *defaultPoll) detaches() {
if len(p.hups) == 0 {
return
}
hups := p.hups
p.hups = nil
go func(onhups []func(p Poll) error) {
for i := range onhups {
if onhups[i] != nil {
onhups[i](p)
}
}
}(hups)
}
32 changes: 32 additions & 0 deletions poll_default_linux_norace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux && !race
// +build linux,!race

package netpoll

import "unsafe"

func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator {
return *(**FDOperator)(ptr)
}

func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) {
*(**FDOperator)(ptr) = operator
}

func (p *defaultPoll) delOperator(operator *FDOperator) {

}
Loading