Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3070,9 +3070,9 @@ func (api *API) Directive(ctx context.Context, d *dax.Directive) error {
}

// DirectiveApplied returns true if the computer's current Directive has been
// applied and is ready to be queried. This it temporary (primarily for tests)
// and needs to be refactored as we improve the logic around mds-to-computer
// communication.
// applied and is ready to be queried. This is temporary (primarily for tests)
// and needs to be refactored as we improve the logic around
// controller-to-computer communication.
func (api *API) DirectiveApplied(ctx context.Context) (bool, error) {
return api.holder.DirectiveApplied(), nil
}
Expand All @@ -3085,7 +3085,7 @@ func (api *API) SnapshotShardData(ctx context.Context, req *dax.SnapshotShardDat
}
// TODO(jaffee) confirm this node is actually responsible for the given
// shard? Not sure we need to given that this request comes from
// MDS, but might be a belt&suspenders situation.
// the Controller, but might be a belt&suspenders situation.

qtid := req.TableKey.QualifiedTableID()

Expand Down
12 changes: 6 additions & 6 deletions ctl/dax.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ func BuildDAXFlags(cmd *cobra.Command, srv *server.Command) {
flags.BoolVar(&srv.Config.Verbose, "verbose", srv.Config.Verbose, "Enable verbose logging")
flags.StringVar(&srv.Config.LogPath, "log-path", srv.Config.LogPath, "Log path")

// MDS
flags.BoolVar(&srv.Config.MDS.Run, "mds.run", srv.Config.MDS.Run, "Run the MDS service in process.")
flags.DurationVar(&srv.Config.MDS.Config.RegistrationBatchTimeout, "mds.config.registration-batch-timeout", srv.Config.MDS.Config.RegistrationBatchTimeout, "Timeout for node registration batches.")
flags.StringVar(&srv.Config.MDS.Config.DataDir, "mds.config.data-dir", srv.Config.MDS.Config.DataDir, "MDS directory to use in process.")
flags.DurationVar(&srv.Config.MDS.Config.SnappingTurtleTimeout, "mds.config.snapping-turtle-timeout", srv.Config.MDS.Config.SnappingTurtleTimeout, "Period for running automatic snapshotting routine.")
// Controller
flags.BoolVar(&srv.Config.Controller.Run, "controller.run", srv.Config.Controller.Run, "Run the Controller service in process.")
flags.DurationVar(&srv.Config.Controller.Config.RegistrationBatchTimeout, "controller.config.registration-batch-timeout", srv.Config.Controller.Config.RegistrationBatchTimeout, "Timeout for node registration batches.")
flags.StringVar(&srv.Config.Controller.Config.DataDir, "controller.config.data-dir", srv.Config.Controller.Config.DataDir, "Controller directory to use in process.")
flags.DurationVar(&srv.Config.Controller.Config.SnappingTurtleTimeout, "controller.config.snapping-turtle-timeout", srv.Config.Controller.Config.SnappingTurtleTimeout, "Period for running automatic snapshotting routine.")

// Queryer
flags.BoolVar(&srv.Config.Queryer.Run, "queryer.run", srv.Config.Queryer.Run, "Run the Queryer service in process.")
flags.StringVar(&srv.Config.Queryer.Config.MDSAddress, "queryer.config.mds-address", srv.Config.Queryer.Config.MDSAddress, "Address of remote MDS process.")
flags.StringVar(&srv.Config.Queryer.Config.ControllerAddress, "queryer.config.controller-address", srv.Config.Queryer.Config.ControllerAddress, "Address of remote Controller process.")

// Computer
flags.BoolVar(&srv.Config.Computer.Run, "computer.run", srv.Config.Computer.Run, "Run the Computer service in process.")
Expand Down
4 changes: 2 additions & 2 deletions ctl/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func serverFlagSet(srv *server.Config, prefix string) *pflag.FlagSet {

flags := pflag.NewFlagSet("featurebase", pflag.ExitOnError)
flags.StringVar(&srv.Name, pre("name"), srv.Name, "Name of the node in the cluster.")
flags.StringVar(&srv.MDSAddress, pre("mds-address"), srv.MDSAddress, "MDS service to register with.")
flags.StringVar(&srv.ControllerAddress, pre("controller-address"), srv.ControllerAddress, "Controller service to register with.")
flags.StringVar(&srv.WriteloggerDir, pre("writelogger-dir"), srv.WriteloggerDir, "Writelogger directory to read/write append logs.")
flags.StringVar(&srv.SnapshotterDir, pre("snapshotter-dir"), srv.SnapshotterDir, "Snapshotter directory to read/write snapshots.")
flags.StringVarP(&srv.DataDir, pre("data-dir"), short("d"), srv.DataDir, "Directory to store FeatureBase data files.")
Expand Down Expand Up @@ -112,7 +112,7 @@ func serverFlagSet(srv *server.Config, prefix string) *pflag.FlagSet {

flags.BoolVar(&srv.SQL.EndpointEnabled, pre("sql.endpoint-enabled"), srv.SQL.EndpointEnabled, "Enable FeatureBase SQL /sql endpoint (default false)")

flags.DurationVar(&srv.CheckInInterval, pre("check-in-interval"), srv.CheckInInterval, "Interval between check-ins to MDS")
flags.DurationVar(&srv.CheckInInterval, pre("check-in-interval"), srv.CheckInInterval, "Interval between check-ins to the Controller")

// Future flags.
flags.BoolVar(&srv.Future.Rename, pre("future.rename"), false, "Present application name as FeatureBase. Defaults to false, will default to true in an upcoming release.")
Expand Down
4 changes: 2 additions & 2 deletions dax/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ testv:

test-integration:
mkdir -p ../coverage-from-docker
$(GO) test ./test/dax -count 1 -run TestDAXIntegration/$(RUN)
$(GO) test ./test/dax -count 1 -timeout 20m -run TestDAXIntegration/$(RUN)

testv-integration:
$(GO) test -v ./test/dax -count 1 -run TestDAXIntegration/$(RUN)
$(GO) test -v ./test/dax -count 1 -timeout 20m -run TestDAXIntegration/$(RUN)


############################### AWS STUFF ###############################
Expand Down
2 changes: 1 addition & 1 deletion dax/computer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// Registrar represents the methods which Computer uses to register itself with
// MDS.
// the Controller.
type Registrar interface {
RegisterNode(ctx context.Context, node *dax.Node) error
CheckInNode(ctx context.Context, node *dax.Node) error
Expand Down
16 changes: 8 additions & 8 deletions dax/computer/service/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
featurebase "github.com/featurebasedb/featurebase/v3"
"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/dax/computer"
mdsclient "github.com/featurebasedb/featurebase/v3/dax/mds/client"
controllerclient "github.com/featurebasedb/featurebase/v3/dax/controller/client"
"github.com/featurebasedb/featurebase/v3/dax/snapshotter"
"github.com/featurebasedb/featurebase/v3/dax/writelogger"
"github.com/featurebasedb/featurebase/v3/errors"
Expand Down Expand Up @@ -54,11 +54,11 @@ func (c *computerService) Start() error {
c.computer = cmd
}

if c.cfg.ComputerConfig.MDSAddress != "" {
mdsAddr := dax.Address(c.cfg.ComputerConfig.MDSAddress)
// Set mds (registrar) on computer.
if err := c.SetMDS(mdsAddr); err != nil {
return errors.Wrapf(err, "setting mds service on computer: %s", c.cfg.Name)
if c.cfg.ComputerConfig.ControllerAddress != "" {
controllerAddr := dax.Address(c.cfg.ComputerConfig.ControllerAddress)
// Set Controller (registrar) on computer.
if err := c.SetController(controllerAddr); err != nil {
return errors.Wrapf(err, "setting controller service on computer: %s", c.cfg.Name)
}
}
}
Expand Down Expand Up @@ -103,8 +103,8 @@ func (c *computerService) HTTPHandler() http.Handler {
return c.computer.HTTPHandler()
}

func (c *computerService) SetMDS(addr dax.Address) error {
c.computer.Registrar = mdsclient.New(addr, c.logger)
func (c *computerService) SetController(addr dax.Address) error {
c.computer.Registrar = controllerclient.New(addr, c.logger)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package balancer is an implementation of the controller's Balancer interface.
// Package balancer is an implementation of the controller.Balancer interface.
package balancer

import (
Expand All @@ -9,16 +9,16 @@ import (
"time"

"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller"
"github.com/featurebasedb/featurebase/v3/dax/mds/schemar"
"github.com/featurebasedb/featurebase/v3/dax/controller"
"github.com/featurebasedb/featurebase/v3/dax/controller/schemar"
"github.com/featurebasedb/featurebase/v3/errors"
"github.com/featurebasedb/featurebase/v3/logger"
)

// Ensure type implements interface.
var _ controller.Balancer = (*Balancer)(nil)

// Balancer is an implementation of the controller.Balancer interface which
// Balancer is an implementation of the balancer.Balancer interface which
// isolates workers and jobs by database. It helps manage the relationships
// between workers and jobs. The logic it uses to balance jobs across workers is
// very simple; it bases everything off the number of workers and number of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

"github.com/featurebasedb/featurebase/v3/dax"
daxbolt "github.com/featurebasedb/featurebase/v3/dax/boltdb"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller/balancer/boltdb"
schemardb "github.com/featurebasedb/featurebase/v3/dax/mds/schemar/boltdb"
"github.com/featurebasedb/featurebase/v3/dax/controller"
"github.com/featurebasedb/featurebase/v3/dax/controller/balancer/boltdb"
schemardb "github.com/featurebasedb/featurebase/v3/dax/controller/schemar/boltdb"
daxtest "github.com/featurebasedb/featurebase/v3/dax/test"
testbolt "github.com/featurebasedb/featurebase/v3/dax/test/boltdb"
"github.com/featurebasedb/featurebase/v3/logger"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (

"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/dax/boltdb"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller/balancer"
"github.com/featurebasedb/featurebase/v3/dax/mds/schemar"
balancer "github.com/featurebasedb/featurebase/v3/dax/controller/balancer"
"github.com/featurebasedb/featurebase/v3/dax/controller/schemar"
"github.com/featurebasedb/featurebase/v3/errors"
"github.com/featurebasedb/featurebase/v3/logger"
)
Expand All @@ -27,7 +26,7 @@ var BalancerBuckets []boltdb.Bucket = []boltdb.Bucket{
}

// NewBalancer returns a new instance of controller.Balancer.
func NewBalancer(db *boltdb.DB, schemar schemar.Schemar, logger logger.Logger) controller.Balancer {
func NewBalancer(db *boltdb.DB, schemar schemar.Schemar, logger logger.Logger) *balancer.Balancer {
fjs := newFreeJobService(db)
wjs := newWorkerJobService(db, logger)
fws := newFreeWorkerService(db)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/dax/boltdb"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller"
"github.com/featurebasedb/featurebase/v3/dax/controller"
"github.com/featurebasedb/featurebase/v3/errors"
"github.com/featurebasedb/featurebase/v3/logger"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/dax/mds/controller/balancer/boltdb"
"github.com/featurebasedb/featurebase/v3/dax/controller/balancer/boltdb"
testbolt "github.com/featurebasedb/featurebase/v3/dax/test/boltdb"
"github.com/featurebasedb/featurebase/v3/errors"
"github.com/featurebasedb/featurebase/v3/logger"
Expand Down
38 changes: 19 additions & 19 deletions dax/mds/client/client.go → dax/controller/client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package client is an HTTP client for MDS.
// Package client is an HTTP client for Controller.
package client

import (
Expand All @@ -11,7 +11,7 @@ import (

"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/dax/computer"
mdshttp "github.com/featurebasedb/featurebase/v3/dax/mds/http"
controllerhttp "github.com/featurebasedb/featurebase/v3/dax/controller/http"
"github.com/featurebasedb/featurebase/v3/errors"
"github.com/featurebasedb/featurebase/v3/logger"
)
Expand All @@ -24,8 +24,8 @@ const (
var _ computer.Registrar = (*Client)(nil)
var _ dax.Schemar = (*Client)(nil)

// Client is an HTTP client that operates on the MDS endpoints exposed by the
// main MDS service.
// Client is an HTTP client that operates on the Controller endpoints exposed by
// the main Controller service.
type Client struct {
address dax.Address
logger logger.Logger
Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *Client) DatabaseByID(ctx context.Context, qdbid dax.QualifiedDatabaseID
func (c *Client) DatabaseByName(ctx context.Context, orgID dax.OrganizationID, name dax.DatabaseName) (*dax.QualifiedDatabase, error) {
url := fmt.Sprintf("%s/database-by-name", c.address.WithScheme(defaultScheme))

req := &mdshttp.DatabaseByNameRequest{
req := &controllerhttp.DatabaseByNameRequest{
OrganizationID: orgID,
Name: name,
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func (c *Client) DatabaseByName(ctx context.Context, orgID dax.OrganizationID, n
func (c *Client) Databases(ctx context.Context, orgID dax.OrganizationID, ids ...dax.DatabaseID) ([]*dax.QualifiedDatabase, error) {
url := fmt.Sprintf("%s/databases", c.address.WithScheme(defaultScheme))

req := &mdshttp.DatabasesRequest{
req := &controllerhttp.DatabasesRequest{
OrganizationID: orgID,
DatabaseIDs: ids,
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (c *Client) TableID(ctx context.Context, qdbid dax.QualifiedDatabaseID, nam
func (c *Client) Tables(ctx context.Context, qdbid dax.QualifiedDatabaseID, ids ...dax.TableID) ([]*dax.QualifiedTable, error) {
url := fmt.Sprintf("%s/tables", c.address.WithScheme(defaultScheme))

req := mdshttp.TablesRequest{
req := controllerhttp.TablesRequest{
OrganizationID: qdbid.OrganizationID,
DatabaseID: qdbid.DatabaseID,
TableIDs: ids,
Expand Down Expand Up @@ -377,7 +377,7 @@ func (c *Client) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error
func (c *Client) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error {
url := fmt.Sprintf("%s/create-field", c.address.WithScheme(defaultScheme))

req := mdshttp.CreateFieldRequest{
req := controllerhttp.CreateFieldRequest{
TableKey: qtid.Key(),
Field: fld,
}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (c *Client) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldNa
url := fmt.Sprintf("%s/drop-field", c.address.WithScheme(defaultScheme))

// Encode the request.
req := mdshttp.DropFieldRequest{
req := controllerhttp.DropFieldRequest{
Table: qtid,
Field: fldName,
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (c *Client) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, sha

var host dax.Address

req := &mdshttp.IngestShardRequest{
req := &controllerhttp.IngestShardRequest{
Table: qtid,
Shard: shard,
}
Expand All @@ -462,7 +462,7 @@ func (c *Client) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, sha
return host, errors.Errorf("status code: %d: %s", resp.StatusCode, b)
}

var isr *mdshttp.IngestShardResponse
var isr *controllerhttp.IngestShardResponse
if err := json.NewDecoder(resp.Body).Decode(&isr); err != nil {
return host, errors.Wrap(err, "reading response body")
}
Expand All @@ -475,7 +475,7 @@ func (c *Client) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID,

var host dax.Address

req := &mdshttp.IngestPartitionRequest{
req := &controllerhttp.IngestPartitionRequest{
Table: qtid,
Partition: partition,
}
Expand All @@ -499,7 +499,7 @@ func (c *Client) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID,
return host, errors.Errorf("status code: %d: %s", resp.StatusCode, b)
}

var isr *mdshttp.IngestPartitionResponse
var isr *controllerhttp.IngestPartitionResponse
if err := json.NewDecoder(resp.Body).Decode(&isr); err != nil {
return host, errors.Wrap(err, "reading response body")
}
Expand All @@ -513,7 +513,7 @@ func (c *Client) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, sh

var nodes []dax.ComputeNode

req := &mdshttp.ComputeNodesRequest{
req := &controllerhttp.ComputeNodesRequest{
Table: qtid,
Shards: shards,
}
Expand All @@ -537,7 +537,7 @@ func (c *Client) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, sh
return nodes, errors.Errorf("status code: %d: %s", resp.StatusCode, b)
}

var cnr *mdshttp.ComputeNodesResponse
var cnr *controllerhttp.ComputeNodesResponse
if err := json.NewDecoder(resp.Body).Decode(&cnr); err != nil {
return nodes, errors.Wrap(err, "reading response body")
}
Expand All @@ -551,7 +551,7 @@ func (c *Client) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID,

var nodes []dax.TranslateNode

req := &mdshttp.TranslateNodesRequest{
req := &controllerhttp.TranslateNodesRequest{
Table: qtid,
Partitions: partitions,
}
Expand All @@ -575,7 +575,7 @@ func (c *Client) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID,
return nodes, errors.Errorf("status code: %d: %s", resp.StatusCode, b)
}

var cnr *mdshttp.TranslateNodesResponse
var cnr *controllerhttp.TranslateNodesResponse
if err := json.NewDecoder(resp.Body).Decode(&cnr); err != nil {
return nodes, errors.Wrap(err, "reading response body")
}
Expand All @@ -587,7 +587,7 @@ func (c *Client) RegisterNode(ctx context.Context, node *dax.Node) error {
url := fmt.Sprintf("%s/register-node", c.address.WithScheme(defaultScheme))
c.logger.Debugf("RegisterNode: %s, url: %s", node.Address, url)

req := &mdshttp.RegisterNodeRequest{
req := &controllerhttp.RegisterNodeRequest{
Address: node.Address,
RoleTypes: node.RoleTypes,
}
Expand Down Expand Up @@ -618,7 +618,7 @@ func (c *Client) CheckInNode(ctx context.Context, node *dax.Node) error {
url := fmt.Sprintf("%s/check-in-node", c.address.WithScheme(defaultScheme))
c.logger.Debugf("CheckInNode url: %s", url)

req := &mdshttp.CheckInNodeRequest{
req := &controllerhttp.CheckInNodeRequest{
Address: node.Address,
RoleTypes: node.RoleTypes,
}
Expand Down
17 changes: 8 additions & 9 deletions dax/mds/controller/config.go → dax/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@ package controller
import (
"time"

"github.com/featurebasedb/featurebase/v3/dax/boltdb"
"github.com/featurebasedb/featurebase/v3/dax/mds/schemar"
"github.com/featurebasedb/featurebase/v3/logger"
)

type NewBalancerFn func(string, logger.Logger) Balancer

type Config struct {
Director Director
Schemar schemar.Schemar

Balancer Balancer
// Poller
PollInterval time.Duration `toml:"poll-interval"`

StorageMethod string
BoltDB *boltdb.DB
// Storage
StorageMethod string `toml:"-"`
DataDir string `toml:"-"`

SnapshotterDir string
WriteloggerDir string
SnapshotterDir string `toml:"snapshotter-dir"`
WriteloggerDir string `toml:"writelogger-dir"`

// RegistrationBatchTimeout is the time that the controller will
// wait after a node registers itself to see if any more nodes
Expand All @@ -35,5 +34,5 @@ type Config struct {
// until the timeout expires to start another round of snapshots.
SnappingTurtleTimeout time.Duration

Logger logger.Logger
Logger logger.Logger `toml:"-"`
}
Loading