Skip to content

Commit

Permalink
kvstore: Replace package calls with .Client.function
Browse files Browse the repository at this point in the history
We need to use different backends at the same time. Currently we assume a
single connected backend instance by using the package level functions.
We now direclty call these functions on the default client instead,
allowing future changes to use non-global instances of the backends.

Signed-off-by: Ray Bejjani <ray@isovalent.com>
  • Loading branch information
raybejjani authored and aanm committed Feb 17, 2020
1 parent f868f97 commit 76c7fca
Show file tree
Hide file tree
Showing 18 changed files with 154 additions and 275 deletions.
4 changes: 2 additions & 2 deletions cilium/cmd/kvstore_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ var kvstoreDeleteCmd = &cobra.Command{
setupKvstore(ctx)

if recursive {
if err := kvstore.DeletePrefix(ctx, args[0]); err != nil {
if err := kvstore.Client().DeletePrefix(ctx, args[0]); err != nil {
Fatalf("Unable to delete keys: %s", err)
}
} else {
if err := kvstore.Delete(ctx, args[0]); err != nil {
if err := kvstore.Client().Delete(ctx, args[0]); err != nil {
Fatalf("Unable to delete key: %s", err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions cilium/cmd/kvstore_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var kvstoreGetCmd = &cobra.Command{
setupKvstore(ctx)

if recursive {
pairs, err := kvstore.ListPrefix(ctx, key)
pairs, err := kvstore.Client().ListPrefix(ctx, key)
if err != nil {
Fatalf("Unable to list keys: %s", err)
}
Expand All @@ -57,17 +57,17 @@ var kvstoreGetCmd = &cobra.Command{
fmt.Printf("%s => %s\n", k, string(v.Data))
}
} else {
val, err := kvstore.Get(ctx, key)
val, err := kvstore.Client().Get(ctx, key)
if err != nil || val == nil {
Fatalf("Unable to retrieve key: %s", err)
}
if command.OutputJSON() {
if err := command.PrintOutput(*val); err != nil {
if err := command.PrintOutput(string(val)); err != nil {
os.Exit(1)
}
return
}
fmt.Printf("%s => %s\n", key, *val)
fmt.Printf("%s => %s\n", key, string(val))
}
},
}
Expand Down
2 changes: 1 addition & 1 deletion cilium/cmd/kvstore_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var kvstoreSetCmd = &cobra.Command{

setupKvstore(ctx)

err := kvstore.Set(ctx, key, value)
err := kvstore.Client().Set(ctx, key, []byte(value))
if err != nil {
Fatalf("Unable to set key: %s", err)
}
Expand Down
8 changes: 4 additions & 4 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func (ds *DaemonSuite) SetUpTest(c *C) {
c.Assert(err, IsNil)
ds.d = d

kvstore.DeletePrefix(context.TODO(), common.OperationalPath)
kvstore.DeletePrefix(context.TODO(), kvstore.BaseKeyPrefix)
kvstore.Client().DeletePrefix(context.TODO(), common.OperationalPath)
kvstore.Client().DeletePrefix(context.TODO(), kvstore.BaseKeyPrefix)

ds.OnGetPolicyRepository = nil
ds.OnQueueEndpointBuild = nil
Expand All @@ -136,8 +136,8 @@ func (ds *DaemonSuite) TearDownTest(c *C) {
}

if ds.kvstoreInit {
kvstore.DeletePrefix(context.TODO(), common.OperationalPath)
kvstore.DeletePrefix(context.TODO(), kvstore.BaseKeyPrefix)
kvstore.Client().DeletePrefix(context.TODO(), common.OperationalPath)
kvstore.Client().DeletePrefix(context.TODO(), kvstore.BaseKeyPrefix)
}

// Restore the policy enforcement mode.
Expand Down
2 changes: 1 addition & 1 deletion pkg/clustermesh/clustermesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (o *testObserver) OnDelete(k store.NamedKey) {

func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
kvstore.SetupDummy("etcd")
defer kvstore.Close()
defer kvstore.Client().Close()

identity.InitWellKnownIdentities()
// The nils are only used by k8s CRD identities. We default to kvstore.
Expand Down
30 changes: 15 additions & 15 deletions pkg/clustermesh/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *ClusterMeshServicesTestSuite) SetUpTest(c *C) {

s.randomName = testutils.RandomRune()

kvstore.DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName)
kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName)
s.svcCache = k8s.NewServiceCache()
identity.InitWellKnownIdentities()

Expand Down Expand Up @@ -109,8 +109,8 @@ func (s *ClusterMeshServicesTestSuite) TearDownTest(c *C) {
}

os.RemoveAll(s.testDir)
kvstore.DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName)
kvstore.Close()
kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName)
kvstore.Client().Close()
}

func (s *ClusterMeshServicesTestSuite) expectEvent(c *C, action k8s.CacheAction, id k8s.ServiceID, fn func(event k8s.ServiceEvent) bool) {
Expand All @@ -137,9 +137,9 @@ func (s *ClusterMeshServicesTestSuite) expectEvent(c *C, action k8s.CacheAction,

func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesGlobal(c *C) {
k, v := s.prepareServiceUpdate("1", "10.0.185.196", "http", "80")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))
k, v = s.prepareServiceUpdate("2", "20.0.185.196", "http2", "90")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))

swgSvcs := lock.NewStoppableWaitGroup()
k8sSvc := &types.Service{
Expand Down Expand Up @@ -197,10 +197,10 @@ func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesGlobal(c *C) {
return event.Endpoints.Backends["30.0.185.196"] == nil
})

kvstore.DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"1")
kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"1")
s.expectEvent(c, k8s.UpdateService, svcID, nil)

kvstore.DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"2")
kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"2")
s.expectEvent(c, k8s.DeleteService, svcID, nil)

swgSvcs.Stop()
Expand All @@ -218,9 +218,9 @@ func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesGlobal(c *C) {

func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesUpdate(c *C) {
k, v := s.prepareServiceUpdate("1", "10.0.185.196", "http", "80")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))
k, v = s.prepareServiceUpdate("2", "20.0.185.196", "http2", "90")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))

k8sSvc := &types.Service{
Service: &v1.Service{
Expand Down Expand Up @@ -251,26 +251,26 @@ func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesUpdate(c *C) {
})

k, v = s.prepareServiceUpdate("1", "80.0.185.196", "http", "8080")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))
s.expectEvent(c, k8s.UpdateService, svcID, func(event k8s.ServiceEvent) bool {
return event.Endpoints.Backends["80.0.185.196"] != nil &&
event.Endpoints.Backends["20.0.185.196"] != nil
})

k, v = s.prepareServiceUpdate("2", "90.0.185.196", "http", "8080")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))
s.expectEvent(c, k8s.UpdateService, svcID, func(event k8s.ServiceEvent) bool {
return event.Endpoints.Backends["80.0.185.196"] != nil &&
event.Endpoints.Backends["90.0.185.196"] != nil
})

kvstore.DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"1")
kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"1")
// The observer will have a defaults.NodeDeleteDelay time before it receives
// the event. For this reason we will trigger the delete events sequentially
// and only do the assertion in the end. This way we wait 30seconds for the
// test to complete instead of 30+30 seconds.
time.Sleep(2 * time.Second)
kvstore.DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"2")
kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"2")

s.expectEvent(c, k8s.UpdateService, svcID, nil)
s.expectEvent(c, k8s.DeleteService, svcID, nil)
Expand All @@ -284,9 +284,9 @@ func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesUpdate(c *C) {

func (s *ClusterMeshServicesTestSuite) TestClusterMeshServicesNonGlobal(c *C) {
k, v := s.prepareServiceUpdate("1", "10.0.185.196", "http", "80")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))
k, v = s.prepareServiceUpdate("2", "20.0.185.196", "http2", "90")
kvstore.Set(context.TODO(), k, v)
kvstore.Client().Set(context.TODO(), k, []byte(v))

k8sSvc := &types.Service{
Service: &v1.Service{
Expand Down
2 changes: 1 addition & 1 deletion pkg/endpoint/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *EndpointSuite) SetUpTest(c *C) {

func (s *EndpointSuite) TearDownTest(c *C) {
s.mgr.Close()
kvstore.Close()
kvstore.Client().Close()
}

func (s *EndpointSuite) TestEndpointStatus(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/endpoint/redirect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (d *DummyOwner) UpdateIdentities(added, deleted cache.IdentityCache) {}
func (s *RedirectSuite) TestAddVisibilityRedirects(c *check.C) {
// Setup dependencies for endpoint.
kvstore.SetupDummy("etcd")
defer kvstore.Close()
defer kvstore.Client().Close()

identity.InitWellKnownIdentities()
idAllocatorOwner := &DummyIdentityAllocatorOwner{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ipcache/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ type kvstoreImplementation struct{}
// upsert places the mapping of {key, value} into the kvstore, optionally with
// a lease.
func (k kvstoreImplementation) upsert(ctx context.Context, key string, value string, lease bool) error {
_, err := kvstore.UpdateIfDifferent(ctx, key, value, lease)
_, err := kvstore.Client().UpdateIfDifferent(ctx, key, []byte(value), lease)
return err
}

// release removes the specified key from the kvstore.
func (k kvstoreImplementation) release(ctx context.Context, key string) error {
return kvstore.Delete(ctx, key)
return kvstore.Client().Delete(ctx, key)
}

// kvReferenceCounter provides a thin wrapper around the kvstore which adds
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/cnpstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *CNPStatusEventHandler) watchForCNPStatusEvents(watcher *kvstore.Watcher
func (c *CNPStatusEventHandler) StopStatusHandler(cnp *types.SlimCNP) {
cnpKey := getKeyFromObject(cnp.GetObjectMeta())
prefix := formatKeyForKvstore(cnp.GetObjectMeta())
err := kvstore.DeletePrefix(context.TODO(), prefix)
err := kvstore.Client().DeletePrefix(context.TODO(), prefix)
if err != nil {
log.WithError(err).WithField("prefix", prefix).Warning("error deleting prefix from kvstore")
}
Expand Down
Loading

0 comments on commit 76c7fca

Please sign in to comment.