Skip to content

Commit

Permalink
Merge pull request #327 from CovenantSQL/feature/miner_startup
Browse files Browse the repository at this point in the history
Parallelize chain synchronization in startup
  • Loading branch information
auxten committed May 13, 2019
2 parents 6af1b83 + 89b19a0 commit 67d13a6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
3 changes: 1 addition & 2 deletions sqlchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,12 +792,11 @@ func (c *Chain) processBlocks(ctx context.Context) {
}

// Start starts the main process of the sql-chain.
func (c *Chain) Start() (err error) {
func (c *Chain) Start() {
c.rt.goFunc(c.processBlocks)
c.sync()
c.rt.goFunc(c.mainCycle)
c.rt.startService(c)
return
}

// Stop stops the main process of the sql-chain.
Expand Down
5 changes: 1 addition & 4 deletions sqlchain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,7 @@ func TestMultiChain(t *testing.T) {

// Start all chain instances
for _, v := range chains {
if err = v.chain.Start(); err != nil {
t.Fatalf("error occurred: %v", err)
}

v.chain.Start()
defer func(c *Chain) {
// Stop chain main process before exit
_ = c.Stop()
Expand Down
3 changes: 1 addition & 2 deletions worker/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ func NewDatabase(cfg *DBConfig, peers *proto.Peers,
}
if db.chain, err = sqlchain.NewChain(chainCfg); err != nil {
return
} else if err = db.chain.Start(); err != nil {
return
}
db.chain.Start()

// init kayak config
kayakWalPath := filepath.Join(cfg.DataDir, KayakWalFileName)
Expand Down
20 changes: 17 additions & 3 deletions worker/dbms.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,16 +367,30 @@ func (dbms *DBMS) initDatabases(
meta *DBMSMeta, profiles map[proto.DatabaseID]*types.SQLChainProfile) (err error,
) {
currentInstance := make(map[proto.DatabaseID]bool)
wg := &sync.WaitGroup{}
errCh := make(chan error, len(profiles))

for id, profile := range profiles {
currentInstance[id] = true
var instance *types.ServiceInstance
if instance, err = dbms.buildSQLChainServiceInstance(profile); err != nil {
return
}
if err = dbms.Create(instance, false); err != nil {
return
}
wg.Add(1)
go func() {
defer wg.Done()
if err := dbms.Create(instance, false); err != nil {
log.WithFields(log.Fields{
"id": instance.DatabaseID,
}).WithError(err).Error("failed to create database instance")
errCh <- errors.Wrapf(err, "failed to create database %s", instance.DatabaseID)
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
return err // omit any other error after this instance
}

// calculate to drop databases
Expand Down

0 comments on commit 67d13a6

Please sign in to comment.