From 25586a519afac0dd9bcd4160e2690fecee475f7d Mon Sep 17 00:00:00 2001 From: Akihiko Tozawa Date: Tue, 16 Aug 2016 02:23:43 +0900 Subject: [PATCH] Multi-channel TCert pool making pool's communication with TCA parallel. We think client_tcert_pool_mt.go can be further improved, by making communication with TCA parallel. The measurement with a prototype shows better performance,(0.8KTPS -> 1.4KTPS for authorizable counter, query) by utilizing CA-side concurrency. This change is enabled by security.multithreading.multichannel=true. Change-Id: I784efb19ae53eb2fccc2eeb6a9912f997cd5eadd Signed-off-by: Akihiko Tozawa --- core/crypto/client_tcert_pool_mt.go | 82 +++++++++++++++++++++-------- core/crypto/crypto_test.go | 7 +++ core/crypto/node_conf.go | 12 +++++ peer/core.yaml | 4 ++ 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/core/crypto/client_tcert_pool_mt.go b/core/crypto/client_tcert_pool_mt.go index f3fb92c0613..6e60b62d49d 100644 --- a/core/crypto/client_tcert_pool_mt.go +++ b/core/crypto/client_tcert_pool_mt.go @@ -32,10 +32,20 @@ type tCertPoolEntry struct { client *clientImpl } +const maxFillerThreads = 5 + //NewTCertPoolEntry creates a new tcert pool entry func newTCertPoolEntry(client *clientImpl, attributes []string) *tCertPoolEntry { - tCertChannel := make(chan *TCertBlock, client.conf.getTCertBatchSize()*2) - tCertChannelFeedback := make(chan struct{}, client.conf.getTCertBatchSize()*2) + var tCertChannel chan *TCertBlock + var tCertChannelFeedback chan struct{} + + if client.conf.IsMultiChannelEnabled() { + tCertChannel = make(chan *TCertBlock, client.conf.getTCertBatchSize()*maxFillerThreads) + tCertChannelFeedback = make(chan struct{}, maxFillerThreads) + } else { + tCertChannel = make(chan *TCertBlock, client.conf.getTCertBatchSize()*2) + tCertChannelFeedback = make(chan struct{}, client.conf.getTCertBatchSize()*2) + } done := make(chan struct{}, 1) return &tCertPoolEntry{attributes, tCertChannel, tCertChannelFeedback, done, client} } @@ -90,9 +100,11 @@ func (tCertPoolEntry *tCertPoolEntry) GetNextTCert(attributes ...string) (tCertB tCertPoolEntry.client.Error("Failed getting a new TCert. Buffer is empty!") } if tCertBlock != nil { - // Send feedback to the filler - tCertPoolEntry.client.Debug("Send feedback") - tCertPoolEntry.tCertChannelFeedback <- struct{}{} + if !tCertPoolEntry.client.conf.IsMultiChannelEnabled() { + // Send feedback to the filler + tCertPoolEntry.client.Debug("Send feedback") + tCertPoolEntry.tCertChannelFeedback <- struct{}{} + } break } } @@ -175,14 +187,24 @@ func (tCertPoolEntry *tCertPoolEntry) filler() { tCertPoolEntry.client.Debug("Load unused TCerts...done!") if !stop { - ticker := time.NewTicker(1 * time.Second) + fillerThreads := 0 + var ticker *time.Ticker + if tCertPoolEntry.client.conf.IsMultiChannelEnabled() { + ticker = time.NewTicker(200 * time.Millisecond) + } else { + ticker = time.NewTicker(1 * time.Second) + } for { select { case <-tCertPoolEntry.done: stop = true tCertPoolEntry.client.Debug("Done signal.") case <-tCertPoolEntry.tCertChannelFeedback: - tCertPoolEntry.client.Debug("Feedback received. Time to check for tcerts") + if tCertPoolEntry.client.conf.IsMultiChannelEnabled() { + fillerThreads-- + } else { + tCertPoolEntry.client.Debug("Feedback received. Time to check for tcerts") + } case <-ticker.C: tCertPoolEntry.client.Debug("Time elapsed. Time to check for tcerts") } @@ -192,25 +214,39 @@ func (tCertPoolEntry *tCertPoolEntry) filler() { break } - if len(tCertPoolEntry.tCertChannel) < tCertPoolEntry.client.conf.getTCertBatchSize() { - tCertPoolEntry.client.Debugf("Refill TCert Pool. Current size [%d].", - len(tCertPoolEntry.tCertChannel), - ) + if tCertPoolEntry.client.conf.IsMultiChannelEnabled() { - var numTCerts = cap(tCertPoolEntry.tCertChannel) - len(tCertPoolEntry.tCertChannel) - if len(tCertPoolEntry.tCertChannel) == 0 { - numTCerts = cap(tCertPoolEntry.tCertChannel) / 10 - if numTCerts < 1 { - numTCerts = 1 - } - } + for len(tCertPoolEntry.tCertChannel) <= tCertPoolEntry.client.conf.getTCertBatchSize()*(maxFillerThreads-fillerThreads-1) { + tCertPoolEntry.client.Debugf("Refill TCert Pool. Current size [%d].", len(tCertPoolEntry.tCertChannel)) + numTCerts := tCertPoolEntry.client.conf.getTCertBatchSize() - tCertPoolEntry.client.Infof("Refilling [%d] TCerts.", numTCerts) + tCertPoolEntry.client.Infof("Refilling [%d] TCerts.", numTCerts) + fillerThreads++ - err := tCertPoolEntry.client.getTCertsFromTCA(calculateAttributesHash(tCertPoolEntry.attributes), tCertPoolEntry.attributes, numTCerts) - if err != nil { - tCertPoolEntry.client.Errorf("Failed getting TCerts from the TCA: [%s]", err) - break + go func() { + err := tCertPoolEntry.client.getTCertsFromTCA(calculateAttributesHash(tCertPoolEntry.attributes), tCertPoolEntry.attributes, numTCerts) + if err != nil { + tCertPoolEntry.client.Errorf("Failed getting TCerts from the TCA: [%s]", err) + } + tCertPoolEntry.tCertChannelFeedback <- struct{}{} + }() + } + } else { + if len(tCertPoolEntry.tCertChannel) < tCertPoolEntry.client.conf.getTCertBatchSize() { + tCertPoolEntry.client.Debugf("Refill TCert Pool. Current size [%d].", len(tCertPoolEntry.tCertChannel)) + var numTCerts = cap(tCertPoolEntry.tCertChannel) - len(tCertPoolEntry.tCertChannel) + if len(tCertPoolEntry.tCertChannel) == 0 { + numTCerts = cap(tCertPoolEntry.tCertChannel) / 10 + if numTCerts < 1 { + numTCerts = 1 + } + } + tCertPoolEntry.client.Infof("Refilling [%d] TCerts.", numTCerts) + + err := tCertPoolEntry.client.getTCertsFromTCA(calculateAttributesHash(tCertPoolEntry.attributes), tCertPoolEntry.attributes, numTCerts) + if err != nil { + tCertPoolEntry.client.Errorf("Failed getting TCerts from the TCA: [%s]", err) + } } } } diff --git a/core/crypto/crypto_test.go b/core/crypto/crypto_test.go index 4b83fd143d5..ec2efe6eb1d 100644 --- a/core/crypto/crypto_test.go +++ b/core/crypto/crypto_test.go @@ -88,6 +88,13 @@ func TestMain(m *testing.M) { if ret != 0 { os.Exit(ret) } + + // Third scenario (repeat the above) now also with 'security.multithreading.multichannel' enabled. + properties["security.multithreading.multichannel"] = "true" + ret = runTestsOnScenario(m, properties, "Using multithread + multichannel enabled") + if ret != 0 { + os.Exit(ret) + } properties["security.multithreading.enabled"] = "false" //Fourth scenario with security level = 384 diff --git a/core/crypto/node_conf.go b/core/crypto/node_conf.go index ccb5dca5e95..a9382282ef0 100644 --- a/core/crypto/node_conf.go +++ b/core/crypto/node_conf.go @@ -62,6 +62,8 @@ type configuration struct { tlsServerName string multiThreading bool + multiChannel bool + tCertBatchSize int } @@ -152,6 +154,12 @@ func (conf *configuration) init() error { conf.multiThreading = viper.GetBool("security.multithreading.enabled") } + // Set multichannel + conf.multiChannel = false + if viper.IsSet("security.multithreading.multichannel") { + conf.multiChannel = viper.GetBool("security.multithreading.multichannel") + } + return nil } @@ -255,6 +263,10 @@ func (conf *configuration) IsMultithreadingEnabled() bool { return conf.multiThreading } +func (conf *configuration) IsMultiChannelEnabled() bool { + return conf.multiChannel +} + func (conf *configuration) getTCAServerName() string { return conf.tlsServerName } diff --git a/peer/core.yaml b/peer/core.yaml index 9dcac2814e1..4b6de70badb 100644 --- a/peer/core.yaml +++ b/peer/core.yaml @@ -411,8 +411,12 @@ security: # security to be enabled). attributes: enabled: false + + # TCerts pool configuration. Multi-thread pool can also be configured + # by multichannel option switching concurrency in communication with TCA. multithreading: enabled: false + multichannel: false # Confidentiality protocol versions supported: 1.2 confidentialityProtocolVersion: 1.2