-
Notifications
You must be signed in to change notification settings - Fork 5k
/
forward.go
103 lines (90 loc) · 2.74 KB
/
forward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package headless
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
"k8s.io/client-go/tools/clientcmd"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"github.com/argoproj/argo-cd/v2/util/cache"
"github.com/argoproj/argo-cd/v2/util/io"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
)
// forwardCacheClient wraps a cache.CacheClient that is created on first use
// instead of at construction time. Its Set, Get, Delete, OnUpdated and
// NotifyUpdated methods all go through doLazy, which performs the setup.
type forwardCacheClient struct {
	// namespace is passed to kubeutil.PortForward; context becomes the
	// CurrentContext of the kubeconfig overrides used for that call.
	namespace, context string

	// init guards the one-time setup so it runs at most once.
	init sync.Once
	// client is the Redis-backed cache built during setup. It stays nil when
	// setup fails.
	client cache.CacheClient
	// err records a setup failure; doLazy returns it on every call.
	err error
}
// doLazy makes sure the wrapped cache client exists and then runs action
// against it.
//
// On the first call it asks kubeutil.PortForward for port 6379 in c.namespace
// (using c.context as the kubeconfig context), builds a go-redis client
// pointed at the returned port on localhost, and wraps it with
// cache.NewRedisCache. sync.Once ensures this setup is attempted only once:
// if it failed, the recorded error is returned on this and every later call
// and action is never invoked.
func (c *forwardCacheClient) doLazy(action func(client cache.CacheClient) error) error {
	c.init.Do(func() {
		port, err := kubeutil.PortForward(
			6379,
			c.namespace,
			&clientcmd.ConfigOverrides{CurrentContext: c.context},
			"app.kubernetes.io/name=argocd-redis-ha-haproxy",
			"app.kubernetes.io/name=argocd-redis",
		)
		if err != nil {
			c.err = err
			return
		}

		addr := fmt.Sprintf("localhost:%d", port)
		rdb := redis.NewClient(&redis.Options{Addr: addr})
		// NOTE(review): time.Hour is presumably the cache entry expiration;
		// confirm against cache.NewRedisCache.
		c.client = cache.NewRedisCache(rdb, time.Hour)
	})

	if err := c.err; err != nil {
		return err
	}
	return action(c.client)
}
// Set forwards item to the Set method of the lazily created cache client.
// It returns the setup error from doLazy if the client could not be created.
func (c *forwardCacheClient) Set(item *cache.Item) error {
	store := func(rc cache.CacheClient) error {
		return rc.Set(item)
	}
	return c.doLazy(store)
}
// Get forwards key and obj to the Get method of the lazily created cache
// client. It returns the setup error from doLazy if the client could not be
// created.
func (c *forwardCacheClient) Get(key string, obj interface{}) error {
	load := func(rc cache.CacheClient) error {
		return rc.Get(key, obj)
	}
	return c.doLazy(load)
}
// Delete forwards key to the Delete method of the lazily created cache
// client. It returns the setup error from doLazy if the client could not be
// created.
func (c *forwardCacheClient) Delete(key string) error {
	remove := func(rc cache.CacheClient) error {
		return rc.Delete(key)
	}
	return c.doLazy(remove)
}
// OnUpdated forwards ctx, key and callback to the OnUpdated method of the
// lazily created cache client. It returns the setup error from doLazy if the
// client could not be created.
func (c *forwardCacheClient) OnUpdated(ctx context.Context, key string, callback func() error) error {
	subscribe := func(rc cache.CacheClient) error {
		return rc.OnUpdated(ctx, key, callback)
	}
	return c.doLazy(subscribe)
}
// NotifyUpdated forwards key to the NotifyUpdated method of the lazily
// created cache client. It returns the setup error from doLazy if the client
// could not be created.
func (c *forwardCacheClient) NotifyUpdated(key string) error {
	notify := func(rc cache.CacheClient) error {
		return rc.NotifyUpdated(key)
	}
	return c.doLazy(notify)
}
// forwardRepoClientset hands out repo-server clients through a clientset that
// is created on the first call to NewRepoServerClient rather than at
// construction time.
type forwardRepoClientset struct {
	// namespace is passed to kubeutil.PortForward; context becomes the
	// CurrentContext of the kubeconfig overrides used for that call.
	namespace, context string

	// init guards the one-time setup so it runs at most once.
	init sync.Once
	// repoClientset is the clientset built during setup. It stays nil when
	// setup fails.
	repoClientset repoapiclient.Clientset
	// err records a setup failure; NewRepoServerClient returns it on every
	// call.
	err error
}
// NewRepoServerClient returns a repo-server client from the lazily created
// clientset.
//
// On the first call it asks kubeutil.PortForward for port 8081 in c.namespace
// (using c.context as the kubeconfig context) and builds a clientset pointed
// at the returned port on localhost. sync.Once ensures this setup is
// attempted only once: if it failed, the recorded error is returned on this
// and every later call, together with nil values for the closer and client.
func (c *forwardRepoClientset) NewRepoServerClient() (io.Closer, repoapiclient.RepoServerServiceClient, error) {
	c.init.Do(func() {
		port, err := kubeutil.PortForward(
			8081,
			c.namespace,
			&clientcmd.ConfigOverrides{CurrentContext: c.context},
			"app.kubernetes.io/name=argocd-repo-server",
		)
		if err != nil {
			c.err = err
			return
		}

		addr := fmt.Sprintf("localhost:%d", port)
		// TLS stays enabled, but strict validation is switched off.
		tlsConfig := apiclient.TLSConfiguration{
			DisableTLS:       false,
			StrictValidation: false,
		}
		// NOTE(review): 60 is presumably a timeout in seconds; confirm
		// against apiclient.NewRepoServerClientset.
		c.repoClientset = apiclient.NewRepoServerClientset(addr, 60, tlsConfig)
	})

	if err := c.err; err != nil {
		return nil, nil, err
	}
	return c.repoClientset.NewRepoServerClient()
}