Skip to content

Commit

Permalink
Multi-channel TCert pool making pool's communication with TCA parallel.
Browse files Browse the repository at this point in the history
We think client_tcert_pool_mt.go can be further improved, by making
communication with TCA parallel.  The measurement with a prototype shows
better performance,(0.8KTPS -> 1.4KTPS for authorizable counter, query)
by utilizing CA-side concurrency.

This change is enabled by security.multithreading.multichannel=true.

Change-Id: I784efb19ae53eb2fccc2eeb6a9912f997cd5eadd
Signed-off-by: Akihiko Tozawa <atozawa@jp.ibm.com>
  • Loading branch information
akihikot committed Aug 26, 2016
1 parent 457635a commit 25586a5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 23 deletions.
82 changes: 59 additions & 23 deletions core/crypto/client_tcert_pool_mt.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,20 @@ type tCertPoolEntry struct {
client *clientImpl
}

const maxFillerThreads = 5

//NewTCertPoolEntry creates a new tcert pool entry
func newTCertPoolEntry(client *clientImpl, attributes []string) *tCertPoolEntry {
tCertChannel := make(chan *TCertBlock, client.conf.getTCertBatchSize()*2)
tCertChannelFeedback := make(chan struct{}, client.conf.getTCertBatchSize()*2)
var tCertChannel chan *TCertBlock
var tCertChannelFeedback chan struct{}

if client.conf.IsMultiChannelEnabled() {
tCertChannel = make(chan *TCertBlock, client.conf.getTCertBatchSize()*maxFillerThreads)
tCertChannelFeedback = make(chan struct{}, maxFillerThreads)
} else {
tCertChannel = make(chan *TCertBlock, client.conf.getTCertBatchSize()*2)
tCertChannelFeedback = make(chan struct{}, client.conf.getTCertBatchSize()*2)
}
done := make(chan struct{}, 1)
return &tCertPoolEntry{attributes, tCertChannel, tCertChannelFeedback, done, client}
}
Expand Down Expand Up @@ -90,9 +100,11 @@ func (tCertPoolEntry *tCertPoolEntry) GetNextTCert(attributes ...string) (tCertB
tCertPoolEntry.client.Error("Failed getting a new TCert. Buffer is empty!")
}
if tCertBlock != nil {
// Send feedback to the filler
tCertPoolEntry.client.Debug("Send feedback")
tCertPoolEntry.tCertChannelFeedback <- struct{}{}
if !tCertPoolEntry.client.conf.IsMultiChannelEnabled() {
// Send feedback to the filler
tCertPoolEntry.client.Debug("Send feedback")
tCertPoolEntry.tCertChannelFeedback <- struct{}{}
}
break
}
}
Expand Down Expand Up @@ -175,14 +187,24 @@ func (tCertPoolEntry *tCertPoolEntry) filler() {
tCertPoolEntry.client.Debug("Load unused TCerts...done!")

if !stop {
ticker := time.NewTicker(1 * time.Second)
fillerThreads := 0
var ticker *time.Ticker
if tCertPoolEntry.client.conf.IsMultiChannelEnabled() {
ticker = time.NewTicker(200 * time.Millisecond)
} else {
ticker = time.NewTicker(1 * time.Second)
}
for {
select {
case <-tCertPoolEntry.done:
stop = true
tCertPoolEntry.client.Debug("Done signal.")
case <-tCertPoolEntry.tCertChannelFeedback:
tCertPoolEntry.client.Debug("Feedback received. Time to check for tcerts")
if tCertPoolEntry.client.conf.IsMultiChannelEnabled() {
fillerThreads--
} else {
tCertPoolEntry.client.Debug("Feedback received. Time to check for tcerts")
}
case <-ticker.C:
tCertPoolEntry.client.Debug("Time elapsed. Time to check for tcerts")
}
Expand All @@ -192,25 +214,39 @@ func (tCertPoolEntry *tCertPoolEntry) filler() {
break
}

if len(tCertPoolEntry.tCertChannel) < tCertPoolEntry.client.conf.getTCertBatchSize() {
tCertPoolEntry.client.Debugf("Refill TCert Pool. Current size [%d].",
len(tCertPoolEntry.tCertChannel),
)
if tCertPoolEntry.client.conf.IsMultiChannelEnabled() {

var numTCerts = cap(tCertPoolEntry.tCertChannel) - len(tCertPoolEntry.tCertChannel)
if len(tCertPoolEntry.tCertChannel) == 0 {
numTCerts = cap(tCertPoolEntry.tCertChannel) / 10
if numTCerts < 1 {
numTCerts = 1
}
}
for len(tCertPoolEntry.tCertChannel) <= tCertPoolEntry.client.conf.getTCertBatchSize()*(maxFillerThreads-fillerThreads-1) {
tCertPoolEntry.client.Debugf("Refill TCert Pool. Current size [%d].", len(tCertPoolEntry.tCertChannel))
numTCerts := tCertPoolEntry.client.conf.getTCertBatchSize()

tCertPoolEntry.client.Infof("Refilling [%d] TCerts.", numTCerts)
tCertPoolEntry.client.Infof("Refilling [%d] TCerts.", numTCerts)
fillerThreads++

err := tCertPoolEntry.client.getTCertsFromTCA(calculateAttributesHash(tCertPoolEntry.attributes), tCertPoolEntry.attributes, numTCerts)
if err != nil {
tCertPoolEntry.client.Errorf("Failed getting TCerts from the TCA: [%s]", err)
break
go func() {
err := tCertPoolEntry.client.getTCertsFromTCA(calculateAttributesHash(tCertPoolEntry.attributes), tCertPoolEntry.attributes, numTCerts)
if err != nil {
tCertPoolEntry.client.Errorf("Failed getting TCerts from the TCA: [%s]", err)
}
tCertPoolEntry.tCertChannelFeedback <- struct{}{}
}()
}
} else {
if len(tCertPoolEntry.tCertChannel) < tCertPoolEntry.client.conf.getTCertBatchSize() {
tCertPoolEntry.client.Debugf("Refill TCert Pool. Current size [%d].", len(tCertPoolEntry.tCertChannel))
var numTCerts = cap(tCertPoolEntry.tCertChannel) - len(tCertPoolEntry.tCertChannel)
if len(tCertPoolEntry.tCertChannel) == 0 {
numTCerts = cap(tCertPoolEntry.tCertChannel) / 10
if numTCerts < 1 {
numTCerts = 1
}
}
tCertPoolEntry.client.Infof("Refilling [%d] TCerts.", numTCerts)

err := tCertPoolEntry.client.getTCertsFromTCA(calculateAttributesHash(tCertPoolEntry.attributes), tCertPoolEntry.attributes, numTCerts)
if err != nil {
tCertPoolEntry.client.Errorf("Failed getting TCerts from the TCA: [%s]", err)
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions core/crypto/crypto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func TestMain(m *testing.M) {
if ret != 0 {
os.Exit(ret)
}

// Third scenario (repeat the above) now also with 'security.multithreading.multichannel' enabled.
properties["security.multithreading.multichannel"] = "true"
ret = runTestsOnScenario(m, properties, "Using multithread + multichannel enabled")
if ret != 0 {
os.Exit(ret)
}
properties["security.multithreading.enabled"] = "false"

//Fourth scenario with security level = 384
Expand Down
12 changes: 12 additions & 0 deletions core/crypto/node_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type configuration struct {
tlsServerName string

multiThreading bool
multiChannel bool

tCertBatchSize int
}

Expand Down Expand Up @@ -152,6 +154,12 @@ func (conf *configuration) init() error {
conf.multiThreading = viper.GetBool("security.multithreading.enabled")
}

// Set multichannel
conf.multiChannel = false
if viper.IsSet("security.multithreading.multichannel") {
conf.multiChannel = viper.GetBool("security.multithreading.multichannel")
}

return nil
}

Expand Down Expand Up @@ -255,6 +263,10 @@ func (conf *configuration) IsMultithreadingEnabled() bool {
return conf.multiThreading
}

func (conf *configuration) IsMultiChannelEnabled() bool {
return conf.multiChannel
}

func (conf *configuration) getTCAServerName() string {
return conf.tlsServerName
}
Expand Down
4 changes: 4 additions & 0 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,12 @@ security:
# security to be enabled).
attributes:
enabled: false

# TCerts pool configuration. Multi-thread pool can also be configured
# by multichannel option switching concurrency in communication with TCA.
multithreading:
enabled: false
multichannel: false

# Confidentiality protocol versions supported: 1.2
confidentialityProtocolVersion: 1.2
Expand Down

0 comments on commit 25586a5

Please sign in to comment.