Skip to content

Commit

Permalink
[CHORE] Go Memberlist debug (chroma-core#1913)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Add some debugs for memberlist in go
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
HammadB authored and atroyn committed Apr 3, 2024
1 parent ac66934 commit 4d07e42
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go/pkg/coordinator/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
}

func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManager, error) {
log.Info("Starting memberlist manager")
log.Info("Creating memberlist manager")
memberlist_name := config.WorkerMemberlistName
namespace := config.KubernetesNamespace
clientset, err := utils.GetKubernetesInterface()
Expand Down
1 change: 1 addition & 0 deletions go/pkg/memberlist_manager/memberlist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewMemberlistManager(nodeWatcher IWatcher, memberlistStore IMemberlistStore
}

func (m *MemberlistManager) Start() error {
log.Info("Starting memberlist manager")
m.nodeWatcher.RegisterCallback(func(nodeIp string) {
m.workqueue.Add(nodeIp)
})
Expand Down
3 changes: 3 additions & 0 deletions go/pkg/memberlist_manager/memberlist_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package memberlist_manager
import (
"context"

"github.com/pingcap/log"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -53,6 +55,7 @@ func (s *CRMemberlistStore) GetMemberlist(ctx context.Context) (return_memberlis

func (s *CRMemberlistStore) UpdateMemberlist(ctx context.Context, memberlist *Memberlist, resourceVersion string) error {
gvr := getGvr()
log.Info("Updating memberlist store", zap.Any("memberlist", memberlist))
unstructured := memberlistToCr(memberlist, s.coordinatorNamespace, s.memberlistCustomResource, resourceVersion)
_, err := s.dynamicClient.Resource(gvr).Namespace("chroma").Update(context.TODO(), unstructured, metav1.UpdateOptions{})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/memberlist_manager/node_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type KubernetesWatcher struct {
}

func NewKubernetesWatcher(clientset kubernetes.Interface, coordinator_namespace string, pod_label string, resyncPeriod time.Duration) *KubernetesWatcher {
log.Info("Creating new kubernetes watcher", zap.String("namespace", coordinator_namespace), zap.String("pod label", pod_label), zap.Duration("resync period", resyncPeriod))
labelSelector := labels.SelectorFromSet(map[string]string{MemberLabel: pod_label})
factory := informers.NewSharedInformerFactoryWithOptions(clientset, resyncPeriod, informers.WithNamespace(coordinator_namespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector.String() }))
podInformer := factory.Core().V1().Pods().Informer()
Expand Down Expand Up @@ -75,6 +76,7 @@ func (w *KubernetesWatcher) Start() error {
log.Error("Error while asserting object to pod")
}
if err == nil {
log.Info("Kubernetes Pod Added", zap.String("key", key), zap.String("ip", objPod.Status.PodIP))
ip := objPod.Status.PodIP
w.mu.Lock()
w.ipToKey[ip] = key
Expand All @@ -91,6 +93,7 @@ func (w *KubernetesWatcher) Start() error {
log.Error("Error while asserting object to pod")
}
if err == nil {
log.Info("Kubernetes Pod Updated", zap.String("key", key), zap.String("ip", objPod.Status.PodIP))
ip := objPod.Status.PodIP
w.ipToKey[ip] = key
w.notify(ip)
Expand All @@ -105,6 +108,7 @@ func (w *KubernetesWatcher) Start() error {
log.Error("Error while asserting object to pod")
}
if err == nil {
log.Info("Kubernetes Pod Deleted", zap.String("ip", objPod.Status.PodIP))
ip := objPod.Status.PodIP
// The contract for GetStatus is that if the ip is not in this map, then it returns NotReady
delete(w.ipToKey, ip)
Expand Down

0 comments on commit 4d07e42

Please sign in to comment.