Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

WIP: A new t-store for a new varchar data type (fb-1834) #2305

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
108 changes: 107 additions & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
"github.com/featurebasedb/featurebase/v3/disco"
"github.com/featurebasedb/featurebase/v3/logger"
"github.com/featurebasedb/featurebase/v3/rbf"
"github.com/featurebasedb/featurebase/v3/tstore"
"github.com/featurebasedb/featurebase/v3/wireprotocol"
"github.com/prometheus/client_golang/prometheus"

//"github.com/featurebasedb/featurebase/v3/pg"
"github.com/featurebasedb/featurebase/v3/pql"
"github.com/featurebasedb/featurebase/v3/roaring"
planner_types "github.com/featurebasedb/featurebase/v3/sql3/planner/types"
Expand Down Expand Up @@ -263,9 +264,13 @@ func (api *API) CreateIndex(ctx context.Context, indexName string, options Index
return nil, errors.Wrap(err, "validating api method")
}

// get the next indexID number
indexID := int32(len(api.holder.indexes) + 1)

// Populate the create index message.
ts := timestamp()
cim := &CreateIndexMessage{
IndexID: indexID,
Index: indexName,
CreatedAt: ts,
Owner: requestUserID,
Expand Down Expand Up @@ -1727,13 +1732,22 @@ func (api *API) ImportRoaringShard(ctx context.Context, indexName string, shard
}
}

// handle the tuple data
if len(req.Tuples) > 0 {
err := api.importTuples(ctx, tx, shard, indexName, req.Tuples)
if err != nil {
return err
}
}

if api.isComputeNode && !req.SuppressLog {
partition := disco.ShardToShardPartition(indexName, shard, disco.DefaultPartitionN)
msg := &computer.ImportRoaringShardMessage{
Table: indexName,
Partition: partition,
Shard: shard,
Views: make([]computer.RoaringUpdate, len(req.Views)),
Tuples: req.Tuples,
}
for i, view := range req.Views {
msg.Views[i] = computer.RoaringUpdate{
Expand Down Expand Up @@ -1766,6 +1780,93 @@ func (api *API) ImportRoaringShard(ctx context.Context, indexName string, shard
return nil
}

func cleanupView(fieldType string, viewUpdate *RoaringUpdate) error {
// TODO wouldn't hurt to have consolidated logic somewhere for validating view names.
switch fieldType {
case FieldTypeSet, FieldTypeTime:
if viewUpdate.View == "" {
viewUpdate.View = "standard"
}
// add 'standard_' if we just have a time... this is how IDK works by default
if fieldType == FieldTypeTime && !strings.HasPrefix(viewUpdate.View, viewStandard) {
viewUpdate.View = fmt.Sprintf("%s_%s", viewStandard, viewUpdate.View)
}
case FieldTypeInt, FieldTypeDecimal, FieldTypeTimestamp:
if viewUpdate.View == "" {
viewUpdate.View = "bsig_" + viewUpdate.Field
} else if viewUpdate.View != "bsig_"+viewUpdate.Field {
return NewBadRequestError(errors.Errorf("invalid view name (%s) for field %s of type %s", viewUpdate.View, viewUpdate.Field, fieldType))
}
}
return nil
}

func (api *API) importTuples(ctx context.Context, tx Tx, shard uint64, tableName string, tupleData []byte) error {
// get the table
index, err := api.Index(ctx, tableName)
if err != nil {
return err
}

// if the index id is 0 then we can't do an insert into the t-store
if index.ID == 0 {
return errors.Errorf("cannot insert into table '%s' because it does not have a non-zero object id", tableName)
}

b, err := index.GetTStore(shard)
if err != nil {
return err
}

// start to read the data for the import
rdr := bytes.NewReader(tupleData)
_, err = wireprotocol.ExpectToken(rdr, wireprotocol.TOKEN_SCHEMA_INFO)
if err != nil {
return err
}

// get the row schema from the import data
rowSchema, err := wireprotocol.ReadSchema(rdr)
if err != nil {
return err
}

// read rows until we get to the end
tk, err := wireprotocol.ReadToken(rdr)
if err != nil {
return err
}
for tk == wireprotocol.TOKEN_ROW {
rr, err := wireprotocol.ReadRow(rdr, rowSchema)
if err != nil {
return err
}

// make sure the key is at offset 0
s := rowSchema[0]
if !strings.EqualFold(s.ColumnName, "_id") {
return errors.Errorf("unexpected key column position ")
}

err = b.Insert(&tstore.BTreeTuple{
TupleSchema: rowSchema,
Tuple: rr,
})
if err != nil {
return err
}

tk, err = wireprotocol.ReadToken(rdr)
if err != nil {
return err
}
}
if tk != wireprotocol.TOKEN_DONE {
return errors.Errorf("unexpected token '%d'", tk)
}
return nil
}

// ImportValue is a wrapper around the common code in ImportValueWithTx, which
// currently just translates req.Clear into a clear ImportOption.
func (api *API) ImportValue(ctx context.Context, qcx *Qcx, req *ImportValueRequest, opts ...ImportOption) error {
Expand Down Expand Up @@ -3329,18 +3430,23 @@ func (n *NopSchemaAPI) DropDatabase(context.Context, dax.DatabaseID) error { re
func (n *NopSchemaAPI) DatabaseByName(ctx context.Context, dbname dax.DatabaseName) (*dax.Database, error) {
return nil, nil
}

func (n *NopSchemaAPI) DatabaseByID(ctx context.Context, dbid dax.DatabaseID) (*dax.Database, error) {
return nil, nil
}

func (n *NopSchemaAPI) SetDatabaseOption(ctx context.Context, dbid dax.DatabaseID, option string, value string) error {
return nil
}

func (n *NopSchemaAPI) Databases(context.Context, ...dax.DatabaseID) ([]*dax.Database, error) {
return nil, nil
}

func (n *NopSchemaAPI) TableByName(ctx context.Context, tname dax.TableName) (*dax.Table, error) {
return nil, nil
}

func (n *NopSchemaAPI) TableByID(ctx context.Context, tid dax.TableID) (*dax.Table, error) {
return nil, nil
}
Expand Down