Skip to content

Commit

Permalink
Setup a signal context and use it.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajohi committed Sep 11, 2023
1 parent 718d9ff commit f37ea01
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 58 deletions.
11 changes: 5 additions & 6 deletions cldevice.go
Expand Up @@ -8,6 +8,7 @@ package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -244,8 +245,6 @@ type Device struct {
allDiffOneShares uint64
validShares uint64
invalidShares uint64

quit chan struct{}
}

// If the device order and OpenCL index are ever not the same then we can
Expand Down Expand Up @@ -389,7 +388,6 @@ func NewDevice(index int, order int, platformID cl.CL_platform_id, deviceID cl.C
deviceID: deviceID,
deviceName: getDeviceInfo(deviceID, cl.CL_DEVICE_NAME, "CL_DEVICE_NAME"),
deviceType: getDeviceInfo(deviceID, cl.CL_DEVICE_TYPE, "CL_DEVICE_TYPE"),
quit: make(chan struct{}),
newWork: make(chan *work.Work, 5),
workDone: workDone,
fanPercent: 0,
Expand Down Expand Up @@ -588,7 +586,7 @@ func NewDevice(index int, order int, platformID cl.CL_platform_id, deviceID cl.C
return d, nil
}

func (d *Device) runDevice() error {
func (d *Device) runDevice(ctx context.Context) error {
minrLog.Infof("Started DEV #%d: %s", d.index, d.deviceName)
outputData := make([]uint32, outputBufferSize)

Expand All @@ -600,11 +598,12 @@ func (d *Device) runDevice() error {
}

var status cl.CL_int
ctxDoneCh := ctx.Done()
for {
d.updateCurrentWork()
d.updateCurrentWork(ctx)

select {
case <-d.quit:
case <-ctxDoneCh:
return nil
default:
}
Expand Down
10 changes: 4 additions & 6 deletions cudevice.go
Expand Up @@ -82,8 +82,6 @@ type Device struct {
allDiffOneShares uint64
validShares uint64
invalidShares uint64

quit chan struct{}
}

func decredBlake3Hash(dimgrid, threads uint32, midstate, lastblock unsafe.Pointer, out cu.DevicePtr) {
Expand Down Expand Up @@ -199,7 +197,6 @@ func NewCuDevice(index int, order int, deviceID cu.Device,
deviceType: DeviceTypeGPU,
cuda: true,
kind: DeviceKindNVML,
quit: make(chan struct{}),
newWork: make(chan *work.Work, 5),
workDone: workDone,
fanPercent: 0,
Expand Down Expand Up @@ -294,7 +291,7 @@ func NewCuDevice(index int, order int, deviceID cu.Device,
return d, nil
}

func (d *Device) runDevice() error {
func (d *Device) runDevice(ctx context.Context) error {
// Initialize the nonces for the device such that each device in the same
// system is doing different work while also helping prevent collisions
// across multiple processes and systems working on the same template.
Expand Down Expand Up @@ -350,11 +347,12 @@ func (d *Device) runDevice() error {
nonceResultsHSlice := *(*[]uint32)(unsafe.Pointer(&nonceResultsHSliceHeader))

// Mining loop.
ctxDoneCh := ctx.Done()
for {
d.updateCurrentWork()
d.updateCurrentWork(ctx)

select {
case <-d.quit:
case <-ctxDoneCh():
return nil
default:
}
Expand Down
20 changes: 8 additions & 12 deletions device.go
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -96,7 +97,7 @@ func (d *Device) initNonces() error {
return nil
}

func (d *Device) updateCurrentWork() {
func (d *Device) updateCurrentWork(ctx context.Context) {
var w *work.Work
if d.hasWork {
// If we already have work, we just need to check if there's new one
Expand All @@ -107,11 +108,10 @@ func (d *Device) updateCurrentWork() {
return
}
} else {
// If we don't have work, we block until we do. We need to watch for
// quit events too.
// If we don't have work, we block until we do.
select {
case w = <-d.newWork:
case <-d.quit:
case <-ctx.Done():
return
}
}
Expand Down Expand Up @@ -143,8 +143,8 @@ func (d *Device) updateCurrentWork() {
minrLog.Tracef("work data for work update: %x", d.work.Data)
}

func (d *Device) Run() {
err := d.runDevice()
func (d *Device) Run(ctx context.Context) {
err := d.runDevice(ctx)
if err != nil {
minrLog.Errorf("Error on device: %v", err)
}
Expand Down Expand Up @@ -354,14 +354,10 @@ func (d *Device) foundCandidate(ts, nonce0, nonce1 uint32) {
}
}

func (d *Device) Stop() {
close(d.quit)
}

func (d *Device) SetWork(w *work.Work) {
func (d *Device) SetWork(ctx context.Context, w *work.Work) {
select {
case d.newWork <- w:
case <-d.quit:
case <-ctx.Done():
}
}

Expand Down
11 changes: 2 additions & 9 deletions main.go
Expand Up @@ -4,7 +4,6 @@ import (
"net"
"net/http"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"time"
Expand Down Expand Up @@ -86,15 +85,9 @@ func gominerMain() error {
go RunMonitor(m)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
mainLog.Warn("Got Control+C, exiting...")
m.Stop()
}()
ctx := shutdownListener()

m.Run()
m.Run(ctx)

return nil
}
Expand Down
42 changes: 17 additions & 25 deletions miner.go
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -23,7 +24,6 @@ type Miner struct {
started uint32
devices []*Device
workDone chan []byte
quit chan struct{}
needsWorkRefresh chan struct{}
wg sync.WaitGroup
pool *stratum.Stratum
Expand All @@ -32,7 +32,6 @@ type Miner struct {
func NewMiner() (*Miner, error) {
m := &Miner{
workDone: make(chan []byte, 10),
quit: make(chan struct{}),
needsWorkRefresh: make(chan struct{}),
}

Expand All @@ -59,12 +58,12 @@ func NewMiner() (*Miner, error) {
return m, nil
}

func (m *Miner) workSubmitThread() {
func (m *Miner) workSubmitThread(ctx context.Context) {
defer m.wg.Done()

for {
select {
case <-m.quit:
case <-ctx.Done():
return
case data := <-m.workDone:
// Only use that is we are not using a pool.
Expand All @@ -84,7 +83,7 @@ func (m *Miner) workSubmitThread() {

select {
case m.needsWorkRefresh <- struct{}{}:
case <-m.quit:
case <-ctx.Done():
}
}
} else {
Expand All @@ -107,15 +106,15 @@ func (m *Miner) workSubmitThread() {

select {
case m.needsWorkRefresh <- struct{}{}:
case <-m.quit:
case <-ctx.Done():
}
}
}
}
}
}

func (m *Miner) workRefreshThread() {
func (m *Miner) workRefreshThread(ctx context.Context) {
defer m.wg.Done()

t := time.NewTicker(100 * time.Millisecond)
Expand All @@ -129,7 +128,7 @@ func (m *Miner) workRefreshThread() {
minrLog.Errorf("Error in getwork: %v", err)
} else {
for _, d := range m.devices {
d.SetWork(work)
d.SetWork(ctx, work)
}
}
} else {
Expand All @@ -141,23 +140,23 @@ func (m *Miner) workRefreshThread() {
minrLog.Errorf("Error in getpoolwork: %v", err)
} else {
for _, d := range m.devices {
d.SetWork(work)
d.SetWork(ctx, work)
}
}
} else {
m.pool.Unlock()
}
}
select {
case <-m.quit:
case <-ctx.Done():
return
case <-t.C:
case <-m.needsWorkRefresh:
}
}
}

func (m *Miner) printStatsThread() {
func (m *Miner) printStatsThread(ctx context.Context) {
defer m.wg.Done()

t := time.NewTicker(time.Second * 5)
Expand Down Expand Up @@ -196,53 +195,46 @@ func (m *Miner) printStatsThread() {
}

select {
case <-m.quit:
case <-ctx.Done():
return
case <-t.C:
case <-m.needsWorkRefresh:
}
}
}

func (m *Miner) Run() {
func (m *Miner) Run(ctx context.Context) {
m.wg.Add(len(m.devices))

for _, d := range m.devices {
device := d
go func() {
device.Run()
device.Run(ctx)
device.Release()
m.wg.Done()
}()
}

m.wg.Add(1)
go m.workSubmitThread()
go m.workSubmitThread(ctx)

if cfg.Benchmark {
minrLog.Warn("Running in BENCHMARK mode! No real mining taking place!")
work := &work.Work{}
for _, d := range m.devices {
d.SetWork(work)
d.SetWork(ctx, work)
}
} else {
m.wg.Add(1)
go m.workRefreshThread()
go m.workRefreshThread(ctx)
}

m.wg.Add(1)
go m.printStatsThread()
go m.printStatsThread(ctx)

m.wg.Wait()
}

func (m *Miner) Stop() {
close(m.quit)
for _, d := range m.devices {
d.Stop()
}
}

func (m *Miner) Status() (uint64, uint64, uint64, uint64, float64) {
if cfg.Pool != "" {
valid := atomic.LoadUint64(&m.pool.ValidShares)
Expand Down
58 changes: 58 additions & 0 deletions signal.go
@@ -0,0 +1,58 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
"context"
"os"
"os/signal"
)

// shutdownRequestChannel is used to initiate shutdown from one of the
// subsystems using the same code paths as when an interrupt signal is received.
var shutdownRequestChannel = make(chan struct{})

// interruptSignals defines the default signals to catch in order to do a proper
// shutdown. This may be modified during init depending on the platform.
var interruptSignals = []os.Signal{os.Interrupt}

// shutdownListener listens for OS Signals such as SIGINT (Ctrl+C) and shutdown
// requests from shutdownRequestChannel. It returns a context that is canceled
// when either signal is received.
func shutdownListener() context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, interruptSignals...)

// Listen for initial shutdown signal and cancel the returned context.
select {
case sig := <-interruptChannel:
minrLog.Infof("Received signal (%s). Shutting down...", sig)

case <-shutdownRequestChannel:
minrLog.Infof("Shutdown requested. Shutting down...")
}
cancel()

// Listen for repeated signals and display a message so the user
// knows the shutdown is in progress and the process is not
// hung.
for {
select {
case sig := <-interruptChannel:
minrLog.Infof("Received signal (%s). Already "+
"shutting down...", sig)

case <-shutdownRequestChannel:
minrLog.Info("Shutdown requested. Already " +
"shutting down...")
}
}
}()

return ctx
}

0 comments on commit f37ea01

Please sign in to comment.