Skip to content

Commit

Permalink
[feat:#75][storage:runtime]: storage runtime dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
stone1100 authored and CodingCrush committed Jul 12, 2019
1 parent c205ba3 commit 47bb340
Show file tree
Hide file tree
Showing 31 changed files with 748 additions and 349 deletions.
8 changes: 5 additions & 3 deletions Makefile
Expand Up @@ -46,12 +46,14 @@ clean-build:
rm -f bin/lind
cd web/ && make web_clean


clean: ## Clean up useless files.
$(clean-build)
clean-tmp: ## clean up tmp and test out files
find . -type f -name '*.out' -exec rm -f {} +
find . -type f -name '.DS_Store' -exec rm -f {} +
find . -type f -name '*.test' -exec rm -f {} +
find . -type f -name '*.prof' -exec rm -f {} +
find . -type s -name 'localhost:*' -exec rm -f {} +
find . -type s -name '127.0.0.1:*' -exec rm -f {} +

clean: ## Clean up useless files.
$(clean-build)
$(clean-tmp)
4 changes: 2 additions & 2 deletions broker/rpc/broker_client.go
Expand Up @@ -6,8 +6,8 @@ import (

"google.golang.org/grpc"

"github.com/eleme/lindb/rpc/pkg/broker"
"github.com/eleme/lindb/rpc/pkg/common"
"github.com/eleme/lindb/rpc/proto/broker"
"github.com/eleme/lindb/rpc/proto/common"
)

type BrokerClient interface {
Expand Down
4 changes: 2 additions & 2 deletions broker/rpc/broker_server.go
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/eleme/lindb/pkg/logger"
"github.com/eleme/lindb/rpc"
"github.com/eleme/lindb/rpc/pkg/broker"
"github.com/eleme/lindb/rpc/pkg/common"
"github.com/eleme/lindb/rpc/proto/broker"
"github.com/eleme/lindb/rpc/proto/common"
)

type BrokerServer interface {
Expand Down
2 changes: 1 addition & 1 deletion broker/rpc/broker_test.go
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/eleme/lindb/pkg/logger"
"github.com/eleme/lindb/rpc"
"github.com/eleme/lindb/rpc/pkg/common"
"github.com/eleme/lindb/rpc/proto/common"
)

const (
Expand Down
30 changes: 14 additions & 16 deletions broker/runtime.go
Expand Up @@ -34,23 +34,19 @@ type apiHandler struct {
databaseAPI *admin.DatabaseAPI
}

// Runtime represents broker runtime dependency
// runtime represents broker runtime dependency
type runtime struct {
state server.State

state server.State
cfgPath string
config config.Broker
ctx context.Context
cancel context.CancelFunc

// init value when runtime
repo state.Repository

srv srv

repo state.Repository
srv srv
httpServer *http.Server

ctx context.Context
cancel context.CancelFunc

log *zap.Logger
}

Expand Down Expand Up @@ -93,9 +89,7 @@ func (r *runtime) Run() error {
r.buildAPIDependency()

// start http server
go func() {
r.startHTTPServer()
}()
r.startHTTPServer()

r.state = server.Running
return nil
Expand Down Expand Up @@ -125,6 +119,7 @@ func (r *runtime) Stop() error {
}
}

r.log.Info("broker server stop complete")
r.state = server.Terminated
return nil
}
Expand All @@ -143,9 +138,12 @@ func (r *runtime) startHTTPServer() {
IdleTimeout: time.Second * 60,
Handler: router,
}
if err := r.httpServer.ListenAndServe(); err != nil {
panic(fmt.Sprintf("start http server error:%s", err))
}
go func() {
if err := r.httpServer.ListenAndServe(); err != http.ErrServerClosed {
panic(fmt.Sprintf("start http server error:%s", err))
}
r.log.Info("http server stop complete")
}()
}

// startStateRepo starts state repository
Expand Down
6 changes: 5 additions & 1 deletion broker/runtime_test.go
Expand Up @@ -2,6 +2,7 @@ package broker

import (
"testing"
"time"

"gopkg.in/check.v1"

Expand All @@ -19,7 +20,7 @@ type testBrokerRuntimeSuite struct {
mock.RepoTestSuite
}

func TestDatabaseAPI(t *testing.T) {
func TestBrokerRuntime(t *testing.T) {
check.Suite(&testBrokerRuntimeSuite{})
test = t
check.TestingT(t)
Expand Down Expand Up @@ -51,6 +52,9 @@ func (ts *testBrokerRuntimeSuite) TestBrokerRun(c *check.C) {
if err != nil {
c.Fatal(err)
}
// wait run finish
time.Sleep(500 * time.Millisecond)

c.Assert(server.Running, check.Equals, broker.State())

broker.Stop()
Expand Down
4 changes: 2 additions & 2 deletions cmd/lind/broker.go
Expand Up @@ -54,15 +54,15 @@ func serveBroker(cmd *cobra.Command, args []string) error {
// start broker server
broker := broker.NewBrokerRuntime(brokerCfgPath)
if err := broker.Run(); err != nil {
return fmt.Errorf("run broker error:%s", err)
return fmt.Errorf("run broker server error:%s", err)
}

// waiting system exit signal
<-ctx.Done()

// stop broker server
if err := broker.Stop(); err != nil {
return fmt.Errorf("stop broker error:%s", err)
return fmt.Errorf("stop broker server error:%s", err)
}

return nil
Expand Down
3 changes: 0 additions & 3 deletions cmd/lind/root.go
Expand Up @@ -5,9 +5,6 @@ import (
)

const (
// default config file location of LinDB
cfgFilePath = "/etc/lindb"

linDBText = `
__ _ ____ ____
/ / (_) ____ / __ \ / __ )
Expand Down
50 changes: 11 additions & 39 deletions cmd/lind/storage.go
Expand Up @@ -3,22 +3,10 @@ package lind
import (
"fmt"
_ "net/http/pprof" // for profiling
"os"

"github.com/eleme/lindb/config"
"github.com/eleme/lindb/pkg/logger"
"github.com/eleme/lindb/query"
"github.com/eleme/lindb/storage"
"github.com/eleme/lindb/tsdb"

"github.com/BurntSushi/toml"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

const (
storageCfgName = "storage.toml"
defaultStorageCfgFile = cfgFilePath + "/" + storageCfgName
)

var (
Expand All @@ -34,7 +22,7 @@ func newStorageCmd() *cobra.Command {
Short: "The storage layer of LinDB",
}
runStorageCmd.PersistentFlags().StringVar(&storageCfgPath, "config", "",
fmt.Sprintf("storage config file path, default is %s", defaultStorageCfgFile))
fmt.Sprintf("storage config file path, default is %s", storage.DefaultStorageCfgFile))
runStorageCmd.PersistentFlags().BoolVar(&storageDebug, "debug", false,
"profiling Go programs with pprof")

Expand All @@ -61,38 +49,22 @@ var initializeStorageConfigCmd = &cobra.Command{
}

func serveStorage(cmd *cobra.Command, args []string) error {
log := logger.GetLogger()
ctx := newCtxWithSignals()

if storageCfgPath == "" {
storageCfgPath = defaultStorageCfgFile
// start storage server
storage := storage.NewStorageRuntime(storageCfgPath)
if err := storage.Run(); err != nil {
return fmt.Errorf("run storage server error:%s", err)
}
if _, err := os.Stat(storageCfgPath); err != nil {
return fmt.Errorf("config file doesn't exist, see how to initialize the config by `lind storage -h`")
}
fmt.Printf("load config file: %v successfully\n", storageCfgPath)

storageConfig := config.StorageConfig{}
if _, err := toml.DecodeFile(storageCfgPath, &storageConfig); err != nil {
return err
}
// start the repository server
storageServer := storage.New(ctx, &storageConfig)
if err := storageServer.Start(); err != nil {
log.Error("storage start failed", zap.Error(err))
return err
}
// waiting system exit signal
<-ctx.Done()

// create a new engine
engine, err := tsdb.NewEngine(storageConfig.Name, storageConfig.Path)
if err != nil {
return err
// stop storage server
if err := storage.Stop(); err != nil {
return fmt.Errorf("stop storage server error:%s", err)
}
// todo: fix this
_ = query.NewTSDBExecutor(engine, nil, nil)

<-ctx.Done()
return engine.Close()
return nil
}

// databaseCmd provides the ability to control the database of storage
Expand Down
24 changes: 12 additions & 12 deletions config/storage.go
Expand Up @@ -2,22 +2,22 @@ package config

import "github.com/eleme/lindb/pkg/state"

// StorageConfig represents a storage configuration
type StorageConfig struct {
StorageRepositoryConfig `toml:"StorageRepositoryConfig"`
Engine `toml:"engine"`
StoragePort uint16 `toml:"Port"`
// Storage represents a storage configuration
type Storage struct {
Coordinator state.Config `toml:"coordinator"`
Server Server `toml:"server"`

Engine `toml:"engine"`
}

// Server represents tcp server config
type Server struct {
Port uint16 `toml:"port"`
TTL int64 `toml:"ttl"`
}

// Engine represents an engine level configuration
type Engine struct {
Path string `toml:"path"`
Name string `toml:"name"`
}

// RepositoryConfig represents the repository config
type StorageRepositoryConfig struct {
state.Config `toml:"RepositoryConfig"`
HeartBeatTTL int64 `toml:"HeartBeatTTL"`
HeartBeatPrefix string `toml:"HeartBeatPrefix"`
}
4 changes: 4 additions & 0 deletions coordinator/discovery/constants.go
@@ -0,0 +1,4 @@
package discovery

// ActiveNodesPath represents active nodes prefix path for node register
const ActiveNodesPath = "/active/nodes"
64 changes: 0 additions & 64 deletions coordinator/discovery/register.go

This file was deleted.

0 comments on commit 47bb340

Please sign in to comment.