/
db.go
117 lines (107 loc) · 3.07 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main
import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
pb "github.com/aayushrangwala/watermark-service/api/v1/pb/db"
"github.com/aayushrangwala/watermark-service/internal/database"
dbsvc "github.com/aayushrangwala/watermark-service/pkg/database"
"github.com/aayushrangwala/watermark-service/pkg/database/endpoints"
"github.com/aayushrangwala/watermark-service/pkg/database/transport"
"github.com/go-kit/kit/log"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/oklog/oklog/pkg/group"
"google.golang.org/grpc"
)
// Fallback listener ports, used when the matching environment variable
// (HTTP_PORT / GRPC_PORT) is unset or empty.
const (
// defaultHTTPPort is the fallback port for the HTTP listen address.
defaultHTTPPort = "8081"
// defaultGRPCPort is the fallback port for the gRPC listen address.
defaultGRPCPort = "8082"
)
// Package-level state shared by init and main.
var (
// logger is the process-wide go-kit logger; it is assigned in init.
logger log.Logger
// httpAddr is the HTTP listen address: host "localhost" joined with the port
// from the HTTP_PORT environment variable, or defaultHTTPPort when that
// variable is empty.
httpAddr = net.JoinHostPort("localhost", envString("HTTP_PORT", defaultHTTPPort))
// grpcAddr is the gRPC listen address: host "localhost" joined with the port
// from the GRPC_PORT environment variable, or defaultGRPCPort when that
// variable is empty.
grpcAddr = net.JoinHostPort("localhost", envString("GRPC_PORT", defaultGRPCPort))
)
// main builds the database service, wraps it in its endpoint set, and serves
// it over both HTTP and gRPC. The two servers and a signal watcher are
// registered as actors on a single group.Group; the result of running that
// group is logged under the "exit" key.
func main() {
	svc := dbsvc.NewService()
	eps := endpoints.NewEndpointSet(svc)
	httpHandler := transport.NewHTTPHandler(eps)
	grpcServer := transport.NewGRPCServer(eps)

	var g group.Group

	// HTTP actor: serve the Go kit HTTP handler on httpAddr. A failure to
	// bind is logged and ends the process with status 1.
	httpLn, err := net.Listen("tcp", httpAddr)
	if err != nil {
		logger.Log("transport", "HTTP", "during", "Listen", "err", err)
		os.Exit(1)
	}
	g.Add(
		func() error {
			logger.Log("transport", "HTTP", "addr", httpAddr)
			return http.Serve(httpLn, httpHandler)
		},
		func(error) {
			// Closing the listener is how the HTTP actor is interrupted.
			httpLn.Close()
		},
	)

	// gRPC actor: serve the Go kit gRPC server on grpcAddr. A failure to
	// bind is logged and ends the process with status 1.
	grpcLn, err := net.Listen("tcp", grpcAddr)
	if err != nil {
		logger.Log("transport", "gRPC", "during", "Listen", "err", err)
		os.Exit(1)
	}
	g.Add(
		func() error {
			logger.Log("transport", "gRPC", "addr", grpcAddr)
			// The Go kit gRPC interceptor is installed on the base server
			// because the zipkin tracing middleware relies on it.
			srv := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
			pb.RegisterDatabaseServer(srv, grpcServer)
			return srv.Serve(grpcLn)
		},
		func(error) {
			// Closing the listener is how the gRPC actor is interrupted.
			grpcLn.Close()
		},
	)

	// Signal actor: block until SIGINT/SIGTERM arrives or the actor is
	// interrupted by having its stop channel closed.
	stop := make(chan struct{})
	g.Add(
		func() error {
			sigs := make(chan os.Signal, 1)
			signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
			select {
			case <-stop:
				return nil
			case got := <-sigs:
				return fmt.Errorf("received signal %s", got)
			}
		},
		func(error) {
			close(stop)
		},
	)

	logger.Log("exit", g.Run())
}
// envString returns the value of the environment variable named env.
// When that value is empty (the variable is unset or set to ""), fallback
// is returned instead.
func envString(env, fallback string) string {
	if val := os.Getenv(env); val != "" {
		return val
	}
	return fallback
}
// init sets up the package-level logger and initialises the database before
// main runs.
//
// The logger emits logfmt lines to stderr (wrapped with log.NewSyncWriter)
// and attaches log.DefaultTimestampUTC to every entry under the "ts" key.
//
// database.Init is then called with the package's default host, port, user,
// database name and password. If it fails, the error is logged and the
// process exits with status 1, the same way main treats a listener that
// cannot be opened.
func init() {
	logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	logger = log.With(logger, "ts", log.DefaultTimestampUTC)

	db, err := database.Init(database.DefaultHost, database.DefaultPort, database.DefaultDBUser,
		database.DefaultDatabase, database.DefaultPassword)
	if err != nil {
		// The error must be checked before db is used: on failure the
		// returned handle is not meaningful, and calling Close on it could
		// panic. Log with key/value pairs, as the logger expects.
		logger.Log("level", "fatal", "during", "database.Init", "err", err)
		os.Exit(1)
	}

	// NOTE(review): the handle is closed before init returns, exactly as the
	// original deferred close did. If database.Init hands back the shared
	// connection that the service uses at runtime, this close belongs in
	// main's shutdown path instead; confirm against internal/database.
	if err := db.Close(); err != nil {
		logger.Log("level", "error", "during", "database.Close", "err", err)
	}
}