Skip to content

Commit

Permalink
Client: Use workers for local jobs (#128). Basic functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
indyjo committed Jan 29, 2017
1 parent 85a3625 commit e88eb3a
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 73 deletions.
80 changes: 80 additions & 0 deletions src/github.com/indyjo/bitwrk-client/activity.go
Expand Up @@ -17,6 +17,7 @@
package client

import (
"crypto/rand"
"errors"
"github.com/indyjo/bitwrk-common/bitcoin"
"github.com/indyjo/bitwrk-common/bitwrk"
Expand Down Expand Up @@ -72,6 +73,7 @@ type ActivityState struct {
}

type ActivityManager struct {
logger bitwrk.Logger
mutex *sync.Mutex
activities map[ActivityKey]Activity
mandates map[ActivityKey]*Mandate
Expand All @@ -82,6 +84,7 @@ type ActivityManager struct {
}

var activityManager = ActivityManager{
bitwrk.Root().New("ActivityManager"),
new(sync.Mutex),
make(map[ActivityKey]Activity),
make(map[ActivityKey]*Mandate),
Expand All @@ -95,6 +98,51 @@ func GetActivityManager() *ActivityManager {
return &activityManager
}

func (m *ActivityManager) NewBuy(article bitwrk.ArticleId) (*BuyActivity, error) {
now := time.Now()
result := &BuyActivity{
Trade: Trade{
condition: sync.NewCond(new(sync.Mutex)),
manager: m,
key: m.NewKey(),
started: now,
lastUpdate: now,
bidType: bitwrk.Buy,
article: article,
alive: true,
},
}
m.register(result.key, result)
return result, nil
}

func (m *ActivityManager) NewSell(worker Worker) (*SellActivity, error) {
now := time.Now()

result := &SellActivity{
Trade: Trade{
condition: sync.NewCond(new(sync.Mutex)),
manager: m,
key: m.NewKey(),
started: now,
lastUpdate: now,
bidType: bitwrk.Sell,
article: worker.GetWorkerState().Info.Article,
encResultKey: new(bitwrk.Tkey),
alive: true,
},
worker: worker,
}

// Get a random key for encrypting the result
if _, err := rand.Reader.Read(result.encResultKey[:]); err != nil {
return nil, err
}

m.register(result.key, result)
return result, nil
}

func (m *ActivityManager) GetStorage() cafs.FileStorage {
return m.storage
}
Expand Down Expand Up @@ -175,6 +223,38 @@ func (m *ActivityManager) register(key ActivityKey, activity Activity) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.activities[key] = activity

trade := activity.GetTrade()
if trade == nil {
return // Only trades are activities we care about at the moment
}

// First try to find a local match
for key2, activity2 := range m.activities {
if key2 == key {
continue // can't match activity with itself
}
// can only match with activities of same article and opposite type (buy/sell)
trade2 := activity2.GetTrade()
if trade2 == nil || trade2.article != trade.article || trade2.bidType == trade.bidType {
continue
}
// other activity could be a valid local match
matched := false
trade2.execSync(func() {
if trade2.alive && !trade2.accepted && !trade2.rejected {
trade.localMatch = trade2
trade2.localMatch = trade
matched = true
m.logger.Printf("Local match: #%v (new) - #%v (old)", key, key2)
}
})

if matched {
return // Early exit when a match was found
}
}

// Try to apply all known mandates to the new activity, until a matching mandate
// was applied successfully.
for mandateKey, mandate := range m.mandates {
Expand Down
68 changes: 46 additions & 22 deletions src/github.com/indyjo/bitwrk-client/buy.go
Expand Up @@ -31,35 +31,17 @@ import (
"mime/multipart"
"net/http"
"net/url"
"sync"
"time"
)

type BuyActivity struct {
Trade
}

func (m *ActivityManager) NewBuy(article bitwrk.ArticleId) (*BuyActivity, error) {
now := time.Now()
result := &BuyActivity{
Trade: Trade{
condition: sync.NewCond(new(sync.Mutex)),
manager: m,
key: m.NewKey(),
started: now,
lastUpdate: now,
bidType: bitwrk.Buy,
article: article,
alive: true,
},
}
m.register(result.key, result)
return result, nil
}

// Manages the complete lifecycle of a buy.
// Manages the complete lifecycle of a buy, which can either be local or remote.
// When a bool can be read from interrupt, the buy is aborted.
// On success, returns a cafs.File to the result data.
func (a *BuyActivity) PerformBuy(log bitwrk.Logger, interrupt <-chan bool, workFile cafs.File) (cafs.File, error) {
log.Printf("Buy started")
a.execSync(func() { a.workFile = workFile.Duplicate() })
defer a.execSync(func() {
a.alive = false
Expand All @@ -74,8 +56,50 @@ func (a *BuyActivity) PerformBuy(log bitwrk.Logger, interrupt <-chan bool, workF
return file, err
}

// Waits for clearance and then performs either a local or a remote buy, depending on the decision taken.
func (a *BuyActivity) doPerformBuy(log bitwrk.Logger, interrupt <-chan bool) (cafs.File, error) {
if err := a.beginTrade(log, interrupt); err != nil {
if err := a.awaitClearance(log, interrupt); err != nil {
return nil, err
}

if a.localMatch != nil {
return a.doLocalBuy(log, interrupt)
} else {
return a.doRemoteBuy(log, interrupt)
}
}

// Performs a local buy.
func (a *BuyActivity) doLocalBuy(log bitwrk.Logger, interrupt <-chan bool) (cafs.File, error) {
sell := a.localMatch
var resultFile cafs.File

// Wait for sell to either die or produce a result
if err := sell.interruptibleWaitWhile(interrupt, func() bool {
if sell.alive && sell.resultFile == nil {
return true
} else {
if sell.resultFile != nil {
resultFile = sell.resultFile.Duplicate()
}
return false
}
}); err != nil {
return nil, fmt.Errorf("Error waiting for local sell to complete: %v", err)
}

if resultFile == nil {
return nil, fmt.Errorf("Sell didn't produce a result: #%v", sell.GetKey())
} else {
// Save result file
a.execSync(func() { a.resultFile = resultFile })
return resultFile, nil
}
}

// Performs a remote buy once it has been cleared.
func (a *BuyActivity) doRemoteBuy(log bitwrk.Logger, interrupt <-chan bool) (cafs.File, error) {
if err := a.beginRemoteTrade(log, interrupt); err != nil {
return nil, err
}

Expand Down
101 changes: 70 additions & 31 deletions src/github.com/indyjo/bitwrk-client/sell.go
Expand Up @@ -17,15 +17,12 @@
package client

import (
"crypto/rand"
"fmt"
"github.com/indyjo/bitwrk-common/bitcoin"
"github.com/indyjo/bitwrk-common/bitwrk"
. "github.com/indyjo/bitwrk-common/protocol"
"github.com/indyjo/cafs"
"io"
"sync"
"time"
)

type SellActivity struct {
Expand All @@ -34,35 +31,9 @@ type SellActivity struct {
worker Worker
}

func (m *ActivityManager) NewSell(worker Worker) (*SellActivity, error) {
now := time.Now()

result := &SellActivity{
Trade: Trade{
condition: sync.NewCond(new(sync.Mutex)),
manager: m,
key: m.NewKey(),
started: now,
lastUpdate: now,
bidType: bitwrk.Sell,
article: worker.GetWorkerState().Info.Article,
encResultKey: new(bitwrk.Tkey),
alive: true,
},
worker: worker,
}

// Get a random key for encrypting the result
if _, err := rand.Reader.Read(result.encResultKey[:]); err != nil {
return nil, err
}

m.register(result.key, result)
return result, nil
}

// Manages the complete lifecycle of a sell
func (a *SellActivity) PerformSell(log bitwrk.Logger, receiveManager *ReceiveManager, interrupt <-chan bool) error {
log.Printf("Sell started")
defer log.Println("Sell finished")
err := a.doPerformSell(log, receiveManager, interrupt)
if err != nil {
Expand All @@ -72,8 +43,76 @@ func (a *SellActivity) PerformSell(log bitwrk.Logger, receiveManager *ReceiveMan
return err
}

// Waits for clearance and then performs either a local or a remote sell, depending on the decision taken.
func (a *SellActivity) doPerformSell(log bitwrk.Logger, receiveManager *ReceiveManager, interrupt <-chan bool) error {
if err := a.beginTrade(log, interrupt); err != nil {
if err := a.awaitClearance(log, interrupt); err != nil {
return err
}

if a.localMatch != nil {
return a.doLocalSell(log, interrupt)
} else {
return a.doRemoteSell(log, receiveManager, interrupt)
}
}

// Performs a local sell.
func (a *SellActivity) doLocalSell(log bitwrk.Logger, interrupt <-chan bool) error {
// Directly get the work file from the local buy
buy := a.localMatch
var workFile cafs.File
buy.interruptibleWaitWhile(interrupt, func() bool {
// Initially, the work file is not set
if buy.workFile == nil {
return buy.alive
} else {
workFile = buy.workFile.Duplicate()
return false
}
})

if workFile == nil {
return fmt.Errorf("Buy was no longer alive on start of local sell")
}

a.execSync(func() {
a.workFile = workFile
})

reader := workFile.Open()
defer reader.Close()

st := NewScopedTransport()
defer st.Close()
if r, err := a.worker.DoWork(reader, NewClient(&st.Transport)); err != nil {
return fmt.Errorf("Worker finished with error: %v", err)
} else {
info := fmt.Sprintf("Sell #%v", a.GetKey())
temp := a.manager.GetStorage().Create(info)
defer temp.Dispose()
if _, err := io.Copy(temp, r); err != nil {
temp.Close()
return fmt.Errorf("Error reading work result: %v", err)
}
if err := temp.Close(); err != nil {
return fmt.Errorf("Error closing temporary of result data: %v", err)
}

// Put result file into sell so that the buy can see it
a.execSync(func() { a.resultFile = temp.File() })

// Wait for the buy to accept it
buy.interruptibleWaitWhile(interrupt, func() bool {
return buy.alive && buy.resultFile == nil
})

return nil
}
}

// Performs a remote sell once it has been cleared.
func (a *SellActivity) doRemoteSell(log bitwrk.Logger, receiveManager *ReceiveManager, interrupt <-chan bool) error {
if err := a.beginRemoteTrade(log, interrupt); err != nil {
return err
}

Expand Down

0 comments on commit e88eb3a

Please sign in to comment.