This repository has been archived by the owner on Dec 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
run.go
132 lines (115 loc) · 3.38 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// SPDX-License-Identifier: GPL-3.0
package main
import (
"errors"
"fmt"
"strings"
"time"
tpipfs "github.com/comrade-coop/trusted-pods/pkg/ipfs"
tpk8s "github.com/comrade-coop/trusted-pods/pkg/kubernetes"
"github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Flag-backed settings for the run command; both are bound to their
// command-line flags in init.
var (
	// kubeConfig is the value of --kubeconfig: the path to a kubeconfig file.
	// It is passed as-is to tpk8s.GetConfig.
	kubeConfig string

	// ipfsApi is the value of --ipfs: the multiaddr of the ipfs/kubo API.
	// It is passed as-is to tpipfs.GetIpfsClient.
	ipfsApi string
)
// runCmd implements the "run" subcommand.
//
// It first waits until the IPFS node reachable through ipfsApi answers a
// Key().Self() request, then opens a watch on all Services carrying the
// tpk8s.LabelIpfsP2P label and passes every Service event to handleEvent.
// When the watch reports an error or its result channel is closed, the watch
// is re-established. The command returns nil once the command context is
// cancelled while watching, and returns the context error if cancellation
// happens while still waiting for IPFS.
var runCmd = &cobra.Command{
	Use:   "run",
	Short: "Start observing Kubernetes services and registering them in ipfs",
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := cmd.Context()

		ipfs, ipfsMultiaddr, err := tpipfs.GetIpfsClient(ipfsApi)
		if err != nil {
			return err
		}

		// Poll the daemon once per second until it responds. The wait is
		// interruptible so that cancelling the command does not leave this
		// loop spinning forever on "context canceled" errors.
		for {
			_, err := ipfs.Key().Self(ctx)
			if err == nil {
				break
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
			fmt.Printf("IPFS is not started yet, %s\n", err)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}

		ipfsp2p := tpipfs.NewP2pApi(ipfs, ipfsMultiaddr)

		scheme, err := tpk8s.GetScheme()
		if err != nil {
			return err
		}
		config, err := tpk8s.GetConfig(kubeConfig)
		if err != nil {
			return err
		}
		cl, err := client.NewWithWatch(config, client.Options{
			Scheme: scheme,
		})
		if err != nil {
			return err
		}

		services := &corev1.ServiceList{}
		selector := client.HasLabels{tpk8s.LabelIpfsP2P}
		sub, err := cl.Watch(ctx, services, selector)
		if err != nil {
			return err
		}
		// sub is reassigned whenever the watch is re-established, so the
		// deferred call goes through a closure to stop whichever watch is
		// current. The nil guard covers a failed re-watch, after which sub
		// may be nil.
		defer func() {
			if sub != nil {
				sub.Stop()
			}
		}()

		fmt.Print("Now watching the list of services.\n")

		for {
			select {
			case <-ctx.Done():
				return nil

			case e, ok := <-sub.ResultChan():
				if !ok || e.Type == watch.Error {
					// A cancelled context also closes the watch; treat that
					// as a normal shutdown instead of trying to re-watch.
					if ctx.Err() != nil {
						return nil
					}
					fmt.Printf("Watch error: %v\n", e)
					sub.Stop()
					sub, err = cl.Watch(ctx, services, selector)
					if err != nil {
						return err
					}
					continue
				}

				service, isService := e.Object.(*corev1.Service)
				if !isService {
					continue
				}
				// Failures are only reported for newly added services;
				// errors on other event types are intentionally not printed.
				if err := handleEvent(ipfsp2p, e.Type, service); err != nil && e.Type == watch.Added {
					fmt.Fprintf(cmd.ErrOrStderr(), "Service %s: %s\n", service.Name, err)
				}
			}
		}
	},
}
// handleEvent applies one Service watch event to the IPFS p2p forwards.
//
// The Service must expose exactly one port, otherwise an error is returned
// before anything else happens. The protocol name is read from the Service
// annotation keyed by tpk8s.LabelIpfsP2P, and the forward target is the
// multiaddr /dns4/<name>.<namespace>.svc.cluster.local/<port protocol>/<port>.
//
// For watch.Deleted the endpoint is obtained through ExposeEndpoint and then
// closed. For every other event type the endpoint is exposed, and a
// "Forwarding" line is printed only for watch.Added.
//
// NOTE(review): runCmd selects Services by the *label* tpk8s.LabelIpfsP2P,
// while the protocol here comes from the *annotation* with the same key —
// confirm this is intended.
func handleEvent(ipfsp2p *tpipfs.P2pApi, eType watch.EventType, service *corev1.Service) error {
	ports := service.Spec.Ports
	if len(ports) != 1 {
		return errors.New("Expected exactly one exposed port")
	}
	servicePort := ports[0]

	protocol := service.ObjectMeta.Annotations[tpk8s.LabelIpfsP2P]
	transport := strings.ToLower(string(servicePort.Protocol))
	host := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)

	endpoint, err := multiaddr.NewMultiaddr(fmt.Sprintf("/dns4/%s/%s/%d", host, transport, servicePort.Port))
	if err != nil {
		return err
	}

	if eType == watch.Deleted {
		fmt.Printf("Dropping forward for %s to %s\n", protocol, endpoint)
		// Look up the endpoint for this protocol so it can be closed.
		exposed, err := ipfsp2p.ExposeEndpoint(protocol, endpoint, tpipfs.ReturnExistingEndpoint)
		if err != nil {
			return err
		}
		return exposed.Close()
	}

	if eType == watch.Added {
		fmt.Printf("Forwarding %s to %s\n", protocol, endpoint)
	}
	_, err = ipfsp2p.ExposeEndpoint(protocol, endpoint, tpipfs.ReturnExistingEndpoint)
	return err
}
// init registers the run command's flags and binds them to the package-level
// kubeConfig and ipfsApi variables. Both flags default to the empty string.
func init() {
	flags := runCmd.Flags()
	flags.StringVar(&kubeConfig, "kubeconfig", "", "absolute path to the kubeconfig file (leave blank to use in-cluster config)")
	flags.StringVar(&ipfsApi, "ipfs", "", "multiaddr where the ipfs/kubo api can be accessed (leave blank to use the daemon running in IPFS_PATH)")
}