Skip to content

Commit

Permalink
chore: consumer data structures (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers committed May 25, 2021
1 parent d6d3796 commit 1dd5087
Show file tree
Hide file tree
Showing 12 changed files with 591 additions and 6 deletions.
20 changes: 15 additions & 5 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Cluster interface {
}

// Route is the specific client interface to take over the create, update,
// list and delete for APISIX's Route resource.
// list and delete for APISIX Route resource.
type Route interface {
Get(context.Context, string) (*v1.Route, error)
List(context.Context) ([]*v1.Route, error)
Expand All @@ -63,7 +63,7 @@ type Route interface {
}

// SSL is the specific client interface to take over the create, update,
// list and delete for APISIX's SSL resource.
// list and delete for APISIX SSL resource.
type SSL interface {
Get(context.Context, string) (*v1.Ssl, error)
List(context.Context) ([]*v1.Ssl, error)
Expand All @@ -73,7 +73,7 @@ type SSL interface {
}

// Upstream is the specific client interface to take over the create, update,
// list and delete for APISIX's Upstream resource.
// list and delete for APISIX Upstream resource.
type Upstream interface {
Get(context.Context, string) (*v1.Upstream, error)
List(context.Context) ([]*v1.Upstream, error)
Expand All @@ -83,7 +83,7 @@ type Upstream interface {
}

// StreamRoute is the specific client interface to take over the create, update,
// list and delete for APISIX's Stream Route resource.
// list and delete for APISIX Stream Route resource.
type StreamRoute interface {
Get(context.Context, string) (*v1.StreamRoute, error)
List(context.Context) ([]*v1.StreamRoute, error)
Expand All @@ -93,7 +93,7 @@ type StreamRoute interface {
}

// GlobalRule is the specific client interface to take over the create, update,
// list and delete for APISIX's Global Rule resource.
// list and delete for APISIX Global Rule resource.
type GlobalRule interface {
Get(context.Context, string) (*v1.GlobalRule, error)
List(context.Context) ([]*v1.GlobalRule, error)
Expand All @@ -102,6 +102,16 @@ type GlobalRule interface {
Update(context.Context, *v1.GlobalRule) (*v1.GlobalRule, error)
}

// Consumer it the specific client interface to take over the create, update,
// list and delete for APISIX Consumer resource.
type Consumer interface {
Get(context.Context, string) (*v1.Consumer, error)
List(context.Context) ([]*v1.Consumer, error)
Create(context.Context, *v1.Consumer) (*v1.Consumer, error)
Delete(context.Context, *v1.Consumer) error
Update(context.Context, *v1.Consumer) (*v1.Consumer, error)
}

type apisix struct {
mu sync.RWMutex
nonExistentCluster Cluster
Expand Down
8 changes: 8 additions & 0 deletions pkg/apisix/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Cache interface {
InsertStreamRoute(*v1.StreamRoute) error
// InsertGlobalRule adds or updates global_rule to cache.
InsertGlobalRule(*v1.GlobalRule) error
// InsertConsumer adds or updates consumer to cache.
InsertConsumer(*v1.Consumer) error

// GetRoute finds the route from cache according to the primary index (id).
GetRoute(string) (*v1.Route, error)
Expand All @@ -44,6 +46,8 @@ type Cache interface {
GetStreamRoute(string) (*v1.StreamRoute, error)
// GetGlobalRule finds the global_rule from cache according to the primary index (id).
GetGlobalRule(string) (*v1.GlobalRule, error)
// GetConsumer finds the consumer from cache according to the primary index (id).
GetConsumer(string) (*v1.Consumer, error)

// ListRoutes lists all routes in cache.
ListRoutes() ([]*v1.Route, error)
Expand All @@ -55,6 +59,8 @@ type Cache interface {
ListStreamRoutes() ([]*v1.StreamRoute, error)
// ListGlobalRules lists all global_rule objects in cache.
ListGlobalRules() ([]*v1.GlobalRule, error)
// ListConsumers lists all consumer objects in cache.
ListConsumers() ([]*v1.Consumer, error)

// DeleteRoute deletes the specified route in cache.
DeleteRoute(*v1.Route) error
Expand All @@ -66,4 +72,6 @@ type Cache interface {
DeleteStreamRoute(*v1.StreamRoute) error
// DeleteGlobalRule deletes the specified stream_route in cache.
DeleteGlobalRule(*v1.GlobalRule) error
// DeleteConsumer deletes the specified consumer in cache.
DeleteConsumer(*v1.Consumer) error
}
28 changes: 28 additions & 0 deletions pkg/apisix/cache/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (c *dbCache) InsertGlobalRule(gr *v1.GlobalRule) error {
return c.insert("global_rule", gr.DeepCopy())
}

func (c *dbCache) InsertConsumer(consumer *v1.Consumer) error {
return c.insert("consumer", consumer.DeepCopy())
}

func (c *dbCache) insert(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
Expand Down Expand Up @@ -116,6 +120,14 @@ func (c *dbCache) GetGlobalRule(id string) (*v1.GlobalRule, error) {
return obj.(*v1.GlobalRule).DeepCopy(), nil
}

func (c *dbCache) GetConsumer(username string) (*v1.Consumer, error) {
obj, err := c.get("consumer", username)
if err != nil {
return nil, err
}
return obj.(*v1.Consumer).DeepCopy(), nil
}

func (c *dbCache) get(table, id string) (interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -192,6 +204,18 @@ func (c *dbCache) ListGlobalRules() ([]*v1.GlobalRule, error) {
return globalRules, nil
}

func (c *dbCache) ListConsumers() ([]*v1.Consumer, error) {
raws, err := c.list("consumer")
if err != nil {
return nil, err
}
consumers := make([]*v1.Consumer, 0, len(raws))
for _, raw := range raws {
consumers = append(consumers, raw.(*v1.Consumer).DeepCopy())
}
return consumers, nil
}

func (c *dbCache) list(table string) ([]interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -229,6 +253,10 @@ func (c *dbCache) DeleteGlobalRule(gr *v1.GlobalRule) error {
return c.delete("global_rule", gr)
}

func (c *dbCache) DeleteConsumer(consumer *v1.Consumer) error {
return c.delete("consumer", consumer)
}

func (c *dbCache) delete(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
Expand Down
45 changes: 44 additions & 1 deletion pkg/apisix/cache/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestMemDBCacheGlobalRule(t *testing.T) {

gr, err = c.GetGlobalRule("3")
assert.Nil(t, err)
assert.Equal(t, gr, gr)
assert.Equal(t, gr, gr3)

assert.Nil(t, c.DeleteGlobalRule(gr), "delete global_rule r3")

Expand All @@ -302,3 +302,46 @@ func TestMemDBCacheGlobalRule(t *testing.T) {
}
assert.Error(t, ErrNotFound, c.DeleteGlobalRule(gr4))
}

func TestMemDBCacheConsumer(t *testing.T) {
c, err := NewMemDBCache()
assert.Nil(t, err, "NewMemDBCache")

c1 := &v1.Consumer{
Username: "jack",
}
assert.Nil(t, c.InsertConsumer(c1), "inserting consumer c1")

c11, err := c.GetConsumer("jack")
assert.Nil(t, err)
assert.Equal(t, c1, c11)

c2 := &v1.Consumer{
Username: "tom",
}
c3 := &v1.Consumer{
Username: "jerry",
}
assert.Nil(t, c.InsertConsumer(c2), "inserting consumer c2")
assert.Nil(t, c.InsertConsumer(c3), "inserting consumer c3")

c22, err := c.GetConsumer("tom")
assert.Nil(t, err)
assert.Equal(t, c2, c22)

assert.Nil(t, c.DeleteConsumer(c3), "delete consumer c3")

consumers, err := c.ListConsumers()
assert.Nil(t, err, "listing consumers")

if consumers[0].Username > consumers[1].Username {
consumers[0], consumers[1] = consumers[1], consumers[0]
}
assert.Equal(t, consumers[0], c1)
assert.Equal(t, consumers[1], c2)

c4 := &v1.Consumer{
Username: "chandler",
}
assert.Error(t, ErrNotFound, c.DeleteConsumer(c4))
}
10 changes: 10 additions & 0 deletions pkg/apisix/cache/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ var (
},
},
},
"consumer": {
Name: "consumer",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "Username"},
},
},
},
},
}
)
16 changes: 16 additions & 0 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type cluster struct {
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
}

func newCluster(o *ClusterOptions) (Cluster, error) {
Expand Down Expand Up @@ -105,6 +106,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.ssl = newSSLClient(c)
c.streamRoute = newStreamRouteClient(c)
c.globalRules = newGlobalRuleClient(c)
c.consumer = newConsumerClient(c)

go c.syncCache()

Expand Down Expand Up @@ -179,6 +181,11 @@ func (c *cluster) syncCacheOnce() (bool, error) {
log.Errorf("failed to list global_rules in APISIX: %s", err)
return false, err
}
consumers, err := c.consumer.List(context.TODO())
if err != nil {
log.Errorf("failed to list consumers in APISIX: %s", err)
return false, err
}

for _, r := range routes {
if err := c.cache.InsertRoute(r); err != nil {
Expand Down Expand Up @@ -230,6 +237,15 @@ func (c *cluster) syncCacheOnce() (bool, error) {
return false, err
}
}
for _, consumer := range consumers {
if err := c.cache.InsertConsumer(consumer); err != nil {
log.Errorw("failed to insert consumer to cache",
zap.Any("consumer", consumer),
zap.String("cluster", c.name),
zap.String("error", err.Error()),
)
}
}
return true, nil
}

Expand Down
Loading

0 comments on commit 1dd5087

Please sign in to comment.