-
Notifications
You must be signed in to change notification settings - Fork 86
/
start.go
147 lines (128 loc) · 4.45 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package cmd
import (
"fmt"
"os"
"os/signal"
"strconv"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/bacalhau-project/bacalhau/apps/job-info-consumer/consumer/pkg"
"github.com/bacalhau-project/bacalhau/pkg/libp2p"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
"github.com/bacalhau-project/bacalhau/pkg/util"
)
type StartOptions struct {
postgres pkg.PostgresDatastoreParams
swarmPort int
peerConnect string
}
func NewStartOptions() *StartOptions {
return &StartOptions{
postgres: pkg.PostgresDatastoreParams{
Host: util.GetEnv("POSTGRES_HOST", "127.0.0.1"), //nolint:gomnd
Port: util.GetEnvAs[int]("POSTGRES_PORT", 5432, strconv.Atoi), //nolint:gomnd
Database: util.GetEnv("POSTGRES_DB", "bacalhau"), //nolint:gomnd
User: util.GetEnv("POSTGRES_USER", "postgres"), //nolint:gomnd
Password: util.GetEnv("POSTGRES_PASSWORD", ""), //nolint:gomnd
SSLMode: util.GetEnv("POSTGRES_SSL_MODE", "disable"), //nolint:gomnd
AutoMigrate: util.GetEnvAs[bool]("POSTGRES_AUTO_MIGRATE", false, strconv.ParseBool), //nolint:gomnd
},
swarmPort: util.GetEnvAs[int]("SWARM_PORT", 1236, strconv.Atoi), //nolint:gomnd
peerConnect: util.GetEnv("BACALHAU_PEER_CONNECT", ""),
}
}
func newStartCmd() *cobra.Command {
opts := NewStartOptions()
cmd := &cobra.Command{
Use: "start",
Short: "Start the bacalhau job info consumer",
RunE: func(cmd *cobra.Command, _ []string) error {
return start(cmd, opts)
},
}
cmd.PersistentFlags().StringVar(
&opts.postgres.Host, "postgres-host", opts.postgres.Host,
`The host for the postgres server.`,
)
cmd.PersistentFlags().IntVar(
&opts.postgres.Port, "postgres-port", opts.postgres.Port,
`The port for the postgres server.`,
)
cmd.PersistentFlags().StringVar(
&opts.postgres.Database, "postgres-database", opts.postgres.Database,
`The database for the postgres server.`,
)
cmd.PersistentFlags().StringVar(
&opts.postgres.User, "postgres-user", opts.postgres.User,
`The user for the postgres server.`,
)
cmd.PersistentFlags().StringVar(
&opts.postgres.Password, "postgres-password", opts.postgres.Password,
`The password for the postgres server.`,
)
cmd.PersistentFlags().StringVar(
&opts.postgres.SSLMode, "postgres-ssl-mode", opts.postgres.Password,
`The ssl mode for the postgres server.`,
)
cmd.PersistentFlags().BoolVar(
&opts.postgres.AutoMigrate, "postgres-auto-migrate", opts.postgres.AutoMigrate,
`Should auto migrate the database schema.`,
)
cmd.PersistentFlags().IntVar(
&opts.swarmPort, "swarm-port", opts.swarmPort,
`The port to listen on for swarm connections and GossipSub messages.`,
)
cmd.PersistentFlags().StringVar(
&opts.peerConnect, "peer", opts.peerConnect,
`The libp2p multiaddress to connect to.`,
)
return cmd
}
const libp2pPrivateKeyNumBits = 2048
func start(cmd *cobra.Command, options *StartOptions) error {
// Cleanup manager ensures that resources are freed before exiting:
cm := system.NewCleanupManager()
cm.RegisterCallback(telemetry.Cleanup)
defer cm.Cleanup(cmd.Context())
ctx := cmd.Context()
// Context ensures main goroutine waits until killed with ctrl+c:
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
defer cancel()
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "bacalhau.consumer")
defer rootSpan.End()
peers, err := getPeers(options.peerConnect)
if err != nil {
return err
}
log.Ctx(ctx).Debug().Msgf("libp2p connecting to: %s", peers)
prvKey, err := libp2p.GeneratePrivateKey(libp2pPrivateKeyNumBits)
if err != nil {
return fmt.Errorf("generating libp2p key pair: %w", err)
}
libp2pHost, err := libp2p.NewHost(options.swarmPort, prvKey)
if err != nil {
return fmt.Errorf("error creating libp2p host: %w", err)
}
application, err := pkg.NewApplication(pkg.ApplicationParams{
PostgresDatastoreParams: options.postgres,
Libp2pHost: libp2pHost,
})
if err != nil {
return err
}
cm.RegisterCallbackWithContext(application.Stop)
// Start transport layer
err = libp2p.ConnectToPeersContinuously(ctx, cm, libp2pHost, peers)
if err != nil {
return err
}
// Start application
err = application.Start(ctx)
log.Info().Msg("Started")
if err != nil {
return err
}
<-ctx.Done() // block until killed
return nil
}