Setup a signal context and use it. #207

Merged: 1 commit, Sep 11, 2023
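This change replaces the per-device and per-miner `quit` channels with a single `context.Context` produced by a new signal listener, so shutdown propagates through one cancellation path. A minimal sketch of the pattern the code moves to, with purely illustrative names (not code from this PR):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker loops until the shared context is canceled, which is the shutdown
// mechanism this change switches the device and miner goroutines to.
func worker(ctx context.Context, id int) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("worker %d: context canceled, exiting\n", id)
			return
		case <-time.After(50 * time.Millisecond):
			// simulate one unit of work
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go worker(ctx, 1)
	time.Sleep(200 * time.Millisecond)
	cancel() // plays the role of the old close(quit)
	time.Sleep(50 * time.Millisecond)
}
```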
11 changes: 5 additions & 6 deletions cldevice.go
@@ -8,6 +8,7 @@ package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@@ -244,8 +245,6 @@ type Device struct {
allDiffOneShares uint64
validShares uint64
invalidShares uint64

quit chan struct{}
}

// If the device order and OpenCL index are ever not the same then we can
@@ -389,7 +388,6 @@ func NewDevice(index int, order int, platformID cl.CL_platform_id, deviceID cl.C
deviceID: deviceID,
deviceName: getDeviceInfo(deviceID, cl.CL_DEVICE_NAME, "CL_DEVICE_NAME"),
deviceType: getDeviceInfo(deviceID, cl.CL_DEVICE_TYPE, "CL_DEVICE_TYPE"),
quit: make(chan struct{}),
newWork: make(chan *work.Work, 5),
workDone: workDone,
fanPercent: 0,
@@ -588,7 +586,7 @@ func NewDevice(index int, order int, platformID cl.CL_platform_id, deviceID cl.C
return d, nil
}

func (d *Device) runDevice() error {
func (d *Device) runDevice(ctx context.Context) error {
minrLog.Infof("Started DEV #%d: %s", d.index, d.deviceName)
outputData := make([]uint32, outputBufferSize)

@@ -600,11 +598,12 @@ func (d *Device) runDevice() error {
}

var status cl.CL_int
ctxDoneCh := ctx.Done()
for {
d.updateCurrentWork()
d.updateCurrentWork(ctx)

select {
case <-d.quit:
case <-ctxDoneCh:
return nil
default:
}
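The hunk above hoists `ctx.Done()` out of the mining loop and performs a non-blocking cancellation check on every pass. A minimal sketch of that loop shape, assuming a hypothetical `doOneIteration` step standing in for the actual kernel dispatch:

```go
package main

import "context"

// runLoop shows the cancellation check adopted in runDevice: ctx.Done() is
// fetched once before the loop, then polled without blocking each iteration.
func runLoop(ctx context.Context, doOneIteration func() error) error {
	ctxDoneCh := ctx.Done()
	for {
		select {
		case <-ctxDoneCh:
			return nil // canceled: exit the mining loop cleanly
		default:
		}
		if err := doOneIteration(); err != nil {
			return err
		}
	}
}
```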
10 changes: 4 additions & 6 deletions cudevice.go
@@ -82,8 +82,6 @@ type Device struct {
allDiffOneShares uint64
validShares uint64
invalidShares uint64

quit chan struct{}
}

func decredBlake3Hash(dimgrid, threads uint32, midstate, lastblock unsafe.Pointer, out cu.DevicePtr) {
@@ -199,7 +197,6 @@ func NewCuDevice(index int, order int, deviceID cu.Device,
deviceType: DeviceTypeGPU,
cuda: true,
kind: DeviceKindNVML,
quit: make(chan struct{}),
newWork: make(chan *work.Work, 5),
workDone: workDone,
fanPercent: 0,
@@ -294,7 +291,7 @@ func NewCuDevice(index int, order int, deviceID cu.Device,
return d, nil
}

func (d *Device) runDevice() error {
func (d *Device) runDevice(ctx context.Context) error {
// Initialize the nonces for the device such that each device in the same
// system is doing different work while also helping prevent collisions
// across multiple processes and systems working on the same template.
@@ -350,11 +347,12 @@ func (d *Device) runDevice() error {
nonceResultsHSlice := *(*[]uint32)(unsafe.Pointer(&nonceResultsHSliceHeader))

// Mining loop.
ctxDoneCh := ctx.Done()
for {
d.updateCurrentWork()
d.updateCurrentWork(ctx)

select {
case <-d.quit:
case <-ctxDoneCh:
return nil
default:
}
20 changes: 8 additions & 12 deletions device.go
@@ -3,6 +3,7 @@
package main

import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
@@ -96,7 +97,7 @@ func (d *Device) initNonces() error {
return nil
}

func (d *Device) updateCurrentWork() {
func (d *Device) updateCurrentWork(ctx context.Context) {
var w *work.Work
if d.hasWork {
// If we already have work, we just need to check if there's new one
@@ -107,11 +108,10 @@ func (d *Device) updateCurrentWork() {
return
}
} else {
// If we don't have work, we block until we do. We need to watch for
// quit events too.
// If we don't have work, we block until we do.
select {
case w = <-d.newWork:
case <-d.quit:
case <-ctx.Done():
return
}
}
@@ -143,8 +143,8 @@
minrLog.Tracef("work data for work update: %x", d.work.Data)
}

func (d *Device) Run() {
err := d.runDevice()
func (d *Device) Run(ctx context.Context) {
err := d.runDevice(ctx)
if err != nil {
minrLog.Errorf("Error on device: %v", err)
}
@@ -354,14 +354,10 @@ func (d *Device) foundCandidate(ts, nonce0, nonce1 uint32) {
}
}

func (d *Device) Stop() {
close(d.quit)
}

func (d *Device) SetWork(w *work.Work) {
func (d *Device) SetWork(ctx context.Context, w *work.Work) {
select {
case d.newWork <- w:
case <-d.quit:
case <-ctx.Done():
}
}

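With the `quit` channel gone, blocking operations such as delivering or waiting for new work select on `ctx.Done()` instead, so they cannot hang once shutdown begins. A small sketch of that send/receive shape, with an `int` standing in for `*work.Work` (illustrative only):

```go
package main

import "context"

// setWork mirrors the new SetWork shape: the send gives up once the context
// is canceled, rather than waiting on a separate quit channel.
func setWork(ctx context.Context, newWork chan<- int, w int) {
	select {
	case newWork <- w:
	case <-ctx.Done():
	}
}

// waitForWork mirrors the blocking branch of updateCurrentWork: it reports
// false when the context is canceled before any work arrives.
func waitForWork(ctx context.Context, newWork <-chan int) (int, bool) {
	select {
	case w := <-newWork:
		return w, true
	case <-ctx.Done():
		return 0, false
	}
}
```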
11 changes: 2 additions & 9 deletions main.go
@@ -4,7 +4,6 @@ import (
"net"
"net/http"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"time"
@@ -86,15 +85,9 @@ func gominerMain() error {
go RunMonitor(m)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
mainLog.Warn("Got Control+C, exiting...")
m.Stop()
}()
ctx := shutdownListener()

m.Run()
m.Run(ctx)

return nil
}
42 changes: 17 additions & 25 deletions miner.go
@@ -3,6 +3,7 @@
package main

import (
"context"
"fmt"
"sync"
"sync/atomic"
@@ -23,7 +24,6 @@ type Miner struct {
started uint32
devices []*Device
workDone chan []byte
quit chan struct{}
needsWorkRefresh chan struct{}
wg sync.WaitGroup
pool *stratum.Stratum
@@ -32,7 +32,6 @@
func NewMiner() (*Miner, error) {
m := &Miner{
workDone: make(chan []byte, 10),
quit: make(chan struct{}),
needsWorkRefresh: make(chan struct{}),
}

@@ -59,12 +58,12 @@ func NewMiner() (*Miner, error) {
return m, nil
}

func (m *Miner) workSubmitThread() {
func (m *Miner) workSubmitThread(ctx context.Context) {
defer m.wg.Done()

for {
select {
case <-m.quit:
case <-ctx.Done():
return
case data := <-m.workDone:
// Only use this if we are not using a pool.
@@ -84,7 +83,7 @@

select {
case m.needsWorkRefresh <- struct{}{}:
case <-m.quit:
case <-ctx.Done():
}
}
} else {
@@ -107,15 +106,15 @@

select {
case m.needsWorkRefresh <- struct{}{}:
case <-m.quit:
case <-ctx.Done():
}
}
}
}
}
}

func (m *Miner) workRefreshThread() {
func (m *Miner) workRefreshThread(ctx context.Context) {
defer m.wg.Done()

t := time.NewTicker(100 * time.Millisecond)
@@ -129,7 +128,7 @@
minrLog.Errorf("Error in getwork: %v", err)
} else {
for _, d := range m.devices {
d.SetWork(work)
d.SetWork(ctx, work)
}
}
} else {
@@ -141,23 +140,23 @@
minrLog.Errorf("Error in getpoolwork: %v", err)
} else {
for _, d := range m.devices {
d.SetWork(work)
d.SetWork(ctx, work)
}
}
} else {
m.pool.Unlock()
}
}
select {
case <-m.quit:
case <-ctx.Done():
return
case <-t.C:
case <-m.needsWorkRefresh:
}
}
}

func (m *Miner) printStatsThread() {
func (m *Miner) printStatsThread(ctx context.Context) {
defer m.wg.Done()

t := time.NewTicker(time.Second * 5)
@@ -196,53 +195,46 @@
}

select {
case <-m.quit:
case <-ctx.Done():
return
case <-t.C:
case <-m.needsWorkRefresh:
}
}
}

func (m *Miner) Run() {
func (m *Miner) Run(ctx context.Context) {
m.wg.Add(len(m.devices))

for _, d := range m.devices {
device := d
go func() {
device.Run()
device.Run(ctx)
device.Release()
m.wg.Done()
}()
}

m.wg.Add(1)
go m.workSubmitThread()
go m.workSubmitThread(ctx)

if cfg.Benchmark {
minrLog.Warn("Running in BENCHMARK mode! No real mining taking place!")
work := &work.Work{}
for _, d := range m.devices {
d.SetWork(work)
d.SetWork(ctx, work)
}
} else {
m.wg.Add(1)
go m.workRefreshThread()
go m.workRefreshThread(ctx)
}

m.wg.Add(1)
go m.printStatsThread()
go m.printStatsThread(ctx)

m.wg.Wait()
}

func (m *Miner) Stop() {
close(m.quit)
for _, d := range m.devices {
d.Stop()
}
}

func (m *Miner) Status() (uint64, uint64, uint64, uint64, float64) {
if cfg.Pool != "" {
valid := atomic.LoadUint64(&m.pool.ValidShares)
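Because every worker goroutine now exits when the shared context is canceled, the explicit `Miner.Stop` method is removed and `Run` simply waits on the `WaitGroup`. A sketch of that structure, assuming a hypothetical `runWorker` standing in for the real device loop:

```go
package main

import (
	"context"
	"sync"
)

// runAll starts n workers bound to one context and blocks until every worker
// has observed cancellation, which is how Run can now return without an
// explicit Stop method.
func runAll(ctx context.Context, n int, runWorker func(context.Context)) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			runWorker(ctx)
		}()
	}
	wg.Wait()
}
```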
58 changes: 58 additions & 0 deletions signal.go
@@ -0,0 +1,58 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
"context"
"os"
"os/signal"
)

// shutdownRequestChannel is used to initiate shutdown from one of the
// subsystems using the same code paths as when an interrupt signal is received.
var shutdownRequestChannel = make(chan struct{})

// interruptSignals defines the default signals to catch in order to do a proper
// shutdown. This may be modified during init depending on the platform.
var interruptSignals = []os.Signal{os.Interrupt}

// shutdownListener listens for OS Signals such as SIGINT (Ctrl+C) and shutdown
// requests from shutdownRequestChannel. It returns a context that is canceled
// when either signal is received.
func shutdownListener() context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, interruptSignals...)

// Listen for initial shutdown signal and cancel the returned context.
select {
case sig := <-interruptChannel:
minrLog.Infof("Received signal (%s). Shutting down...", sig)

case <-shutdownRequestChannel:
minrLog.Infof("Shutdown requested. Shutting down...")
}
cancel()

// Listen for repeated signals and display a message so the user
// knows the shutdown is in progress and the process is not
// hung.
for {
select {
case sig := <-interruptChannel:
minrLog.Infof("Received signal (%s). Already "+
"shutting down...", sig)

case <-shutdownRequestChannel:
minrLog.Info("Shutdown requested. Already " +
"shutting down...")
}
}
}()

return ctx
}
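A short usage sketch of the new listener: the caller obtains the context once and hands it to every long-running component, with cancellation arriving either from an OS signal or from a send on shutdownRequestChannel. Everything here other than shutdownListener itself is illustrative, assuming it sits alongside signal.go in package main:

```go
package main

import "context"

// exampleUse shows one way the context from shutdownListener could be
// consumed; gominerMain hands it to Miner.Run in the same fashion.
func exampleUse(startMiner func(context.Context)) {
	ctx := shutdownListener()

	// Blocks until the miner returns, which happens once ctx is canceled by
	// an interrupt signal or a send on shutdownRequestChannel.
	startMiner(ctx)
}
```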