Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move redis key to pkg/redis package #2378

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 0 additions & 58 deletions manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package cache

import (
"fmt"
"time"

"github.com/go-redis/cache/v8"
Expand All @@ -27,23 +26,6 @@ import (
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
)

const (
// Seed Peer prefix of cache key.
SeedPeerNamespace = "seed-peers"

// Peer prefix of cache key.
PeerNamespace = "peers"

// Scheduler prefix of cache key.
SchedulerNamespace = "schedulers"

// Applications prefix of cache key.
ApplicationsNamespace = "applications"

// Buckets prefix of cache key.
BucketsNamespace = "buckets"
)

// Cache is cache client.
type Cache struct {
*cache.Cache
Expand Down Expand Up @@ -76,43 +58,3 @@ func New(cfg *config.Config) (*Cache, error) {
TTL: cfg.Cache.Redis.TTL,
}, nil
}

// Make namespace cache key.
func MakeNamespaceCacheKey(namespace string) string {
return fmt.Sprintf("manager:%s", namespace)
}

// Make cache key.
func MakeCacheKey(namespace string, id string) string {
return fmt.Sprintf("%s:%s", MakeNamespaceCacheKey(namespace), id)
}

// Make cache key for seed peer.
func MakeSeedPeerCacheKey(clusterID uint, hostname, ip string) string {
return MakeCacheKey(SeedPeerNamespace, fmt.Sprintf("%d-%s-%s", clusterID, hostname, ip))
}

// Make cache key for scheduler.
func MakeSchedulerCacheKey(clusterID uint, hostname, ip string) string {
return MakeCacheKey(SchedulerNamespace, fmt.Sprintf("%d-%s-%s", clusterID, hostname, ip))
}

// Make cache key for peer.
func MakePeerCacheKey(hostname, ip string) string {
return MakeCacheKey(PeerNamespace, fmt.Sprintf("%s-%s", hostname, ip))
}

// Make schedulers cache key for peer.
func MakeSchedulersCacheKeyForPeer(hostname, ip string) string {
return MakeCacheKey(PeerNamespace, fmt.Sprintf("%s-%s:schedulers", hostname, ip))
}

// Make applications cache key.
func MakeApplicationsCacheKey() string {
return MakeNamespaceCacheKey(ApplicationsNamespace)
}

// Make cache key for bucket.
func MakeBucketCacheKey(name string) string {
return MakeCacheKey(BucketsNamespace, name)
}
23 changes: 12 additions & 11 deletions manager/rpcserver/manager_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"d7y.io/dragonfly/v2/manager/types"
pkgcache "d7y.io/dragonfly/v2/pkg/cache"
"d7y.io/dragonfly/v2/pkg/objectstorage"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/pkg/slices"
)

Expand Down Expand Up @@ -93,7 +94,7 @@ func newManagerServerV1(
// Get SeedPeer and SeedPeer cluster configuration.
func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSeedPeerRequest) (*managerv1.SeedPeer, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
cacheKey := cache.MakeSeedPeerCacheKey(uint(req.SeedPeerClusterId), req.Hostname, req.Ip)
cacheKey := pkgredis.MakeSeedPeerKeyInManager(uint(req.SeedPeerClusterId), req.Hostname, req.Ip)

// Cache hit.
var pbSeedPeer managerv1.SeedPeer
Expand Down Expand Up @@ -209,7 +210,7 @@ func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.Upd

if err := s.cache.Delete(
ctx,
cache.MakeSeedPeerCacheKey(seedPeer.SeedPeerClusterID, seedPeer.Hostname, seedPeer.IP),
pkgredis.MakeSeedPeerKeyInManager(seedPeer.SeedPeerClusterID, seedPeer.Hostname, seedPeer.IP),
); err != nil {
log.Warn(err)
}
Expand Down Expand Up @@ -265,7 +266,7 @@ func (s *managerServerV1) createSeedPeer(ctx context.Context, req *managerv1.Upd
// Get Scheduler and Scheduler cluster configuration.
func (s *managerServerV1) GetScheduler(ctx context.Context, req *managerv1.GetSchedulerRequest) (*managerv1.Scheduler, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
cacheKey := cache.MakeSchedulerCacheKey(uint(req.SchedulerClusterId), req.Hostname, req.Ip)
cacheKey := pkgredis.MakeSchedulerKeyInManager(uint(req.SchedulerClusterId), req.Hostname, req.Ip)

// Cache hit.
var pbScheduler managerv1.Scheduler
Expand Down Expand Up @@ -404,7 +405,7 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up

if err := s.cache.Delete(
ctx,
cache.MakeSchedulerCacheKey(scheduler.SchedulerClusterID, scheduler.Hostname, scheduler.IP),
pkgredis.MakeSchedulerKeyInManager(scheduler.SchedulerClusterID, scheduler.Hostname, scheduler.IP),
); err != nil {
log.Warn(err)
}
Expand Down Expand Up @@ -484,7 +485,7 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis

// Cache hit.
var pbListSchedulersResponse managerv1.ListSchedulersResponse
cacheKey := cache.MakeSchedulersCacheKeyForPeer(req.Hostname, req.Ip)
cacheKey := pkgredis.MakeSchedulersKeyForPeerInManager(req.Hostname, req.Ip)

if err := s.cache.Get(ctx, cacheKey, &pbListSchedulersResponse); err != nil {
log.Warnf("%s cache miss because of %s", cacheKey, err.Error())
Expand Down Expand Up @@ -626,7 +627,7 @@ func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBu

log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
var pbListBucketsResponse managerv1.ListBucketsResponse
cacheKey := cache.MakeBucketCacheKey(s.objectStorageConfig.Name)
cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name)

// Cache hit.
if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil {
Expand Down Expand Up @@ -668,7 +669,7 @@ func (s *managerServerV1) ListApplications(ctx context.Context, req *managerv1.L

// Cache hit.
var pbListApplicationsResponse managerv1.ListApplicationsResponse
cacheKey := cache.MakeApplicationsCacheKey()
cacheKey := pkgredis.MakeApplicationsKeyInManager()
if err := s.cache.Get(ctx, cacheKey, &pbListApplicationsResponse); err != nil {
log.Warnf("%s cache miss because of %s", cacheKey, err.Error())
} else {
Expand Down Expand Up @@ -771,7 +772,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSchedulerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSchedulerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand All @@ -791,7 +792,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSeedPeerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSeedPeerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand All @@ -814,7 +815,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSchedulerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSchedulerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand All @@ -834,7 +835,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSeedPeerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSeedPeerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand Down
23 changes: 12 additions & 11 deletions manager/rpcserver/manager_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"d7y.io/dragonfly/v2/manager/types"
pkgcache "d7y.io/dragonfly/v2/pkg/cache"
"d7y.io/dragonfly/v2/pkg/objectstorage"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/pkg/slices"
)

Expand Down Expand Up @@ -93,7 +94,7 @@ func newManagerServerV2(
// Get SeedPeer and SeedPeer cluster configuration.
func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSeedPeerRequest) (*managerv2.SeedPeer, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
cacheKey := cache.MakeSeedPeerCacheKey(uint(req.SeedPeerClusterId), req.Hostname, req.Ip)
cacheKey := pkgredis.MakeSeedPeerKeyInManager(uint(req.SeedPeerClusterId), req.Hostname, req.Ip)

// Cache hit.
var pbSeedPeer managerv2.SeedPeer
Expand Down Expand Up @@ -209,7 +210,7 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd

if err := s.cache.Delete(
ctx,
cache.MakeSeedPeerCacheKey(seedPeer.SeedPeerClusterID, seedPeer.Hostname, seedPeer.IP),
pkgredis.MakeSeedPeerKeyInManager(seedPeer.SeedPeerClusterID, seedPeer.Hostname, seedPeer.IP),
); err != nil {
log.Warn(err)
}
Expand Down Expand Up @@ -265,7 +266,7 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd
// Get Scheduler and Scheduler cluster configuration.
func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSchedulerRequest) (*managerv2.Scheduler, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
cacheKey := cache.MakeSchedulerCacheKey(uint(req.SchedulerClusterId), req.Hostname, req.Ip)
cacheKey := pkgredis.MakeSchedulerKeyInManager(uint(req.SchedulerClusterId), req.Hostname, req.Ip)

// Cache hit.
var pbScheduler managerv2.Scheduler
Expand Down Expand Up @@ -404,7 +405,7 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up

if err := s.cache.Delete(
ctx,
cache.MakeSchedulerCacheKey(scheduler.SchedulerClusterID, scheduler.Hostname, scheduler.IP),
pkgredis.MakeSchedulerKeyInManager(scheduler.SchedulerClusterID, scheduler.Hostname, scheduler.IP),
); err != nil {
log.Warn(err)
}
Expand Down Expand Up @@ -484,7 +485,7 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis

// Cache hit.
var pbListSchedulersResponse managerv2.ListSchedulersResponse
cacheKey := cache.MakeSchedulersCacheKeyForPeer(req.Hostname, req.Ip)
cacheKey := pkgredis.MakeSchedulersKeyForPeerInManager(req.Hostname, req.Ip)

if err := s.cache.Get(ctx, cacheKey, &pbListSchedulersResponse); err != nil {
log.Warnf("%s cache miss because of %s", cacheKey, err.Error())
Expand Down Expand Up @@ -625,7 +626,7 @@ func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBu

log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
var pbListBucketsResponse managerv2.ListBucketsResponse
cacheKey := cache.MakeBucketCacheKey(s.objectStorageConfig.Name)
cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name)

// Cache hit.
if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil {
Expand Down Expand Up @@ -667,7 +668,7 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L

// Cache hit.
var pbListApplicationsResponse managerv2.ListApplicationsResponse
cacheKey := cache.MakeApplicationsCacheKey()
cacheKey := pkgredis.MakeApplicationsKeyInManager()
if err := s.cache.Get(ctx, cacheKey, &pbListApplicationsResponse); err != nil {
log.Warnf("%s cache miss because of %s", cacheKey, err.Error())
} else {
Expand Down Expand Up @@ -770,7 +771,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSchedulerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSchedulerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand All @@ -790,7 +791,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSeedPeerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSeedPeerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand All @@ -813,7 +814,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSchedulerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSchedulerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand All @@ -833,7 +834,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er

if err := s.cache.Delete(
context.TODO(),
cache.MakeSeedPeerCacheKey(clusterID, hostname, ip),
pkgredis.MakeSeedPeerKeyInManager(clusterID, hostname, ip),
); err != nil {
log.Warnf("refresh keepalive status failed: %s", err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions manager/service/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"context"
"strings"

"d7y.io/dragonfly/v2/manager/cache"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
)

func (s *service) GetPeers(ctx context.Context) ([]string, error) {
rawKeys, err := s.rdb.Keys(ctx, cache.MakeCacheKey(cache.PeerNamespace, "*")).Result()
rawKeys, err := s.rdb.Keys(ctx, pkgredis.MakeKeyInManager(pkgredis.PeersNamespace, "*")).Result()
if err != nil {
return nil, err
}
Expand Down