-
Notifications
You must be signed in to change notification settings - Fork 88
/
server.go
168 lines (146 loc) · 4.31 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package search
import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/carlmjohnson/versioninfo"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
es "github.com/opensearch-project/opensearch-go/v2"
"github.com/prometheus/client_golang/prometheus/promhttp"
slogecho "github.com/samber/slog-echo"
"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
_ "net/http/pprof" // For pprof in the metrics server
)
type LastSeq struct {
ID uint `gorm:"primarykey"`
Seq int64
}
type ServerConfig struct {
Logger *slog.Logger
ProfileIndex string
PostIndex string
AtlantisAddresses []string
}
type Server struct {
escli *es.Client
postIndex string
profileIndex string
dir identity.Directory
echo *echo.Echo
logger *slog.Logger
Indexer *Indexer
}
func NewServer(escli *es.Client, dir identity.Directory, config ServerConfig) (*Server, error) {
logger := config.Logger
if logger == nil {
logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
}
serv := Server{
escli: escli,
postIndex: config.PostIndex,
profileIndex: config.ProfileIndex,
dir: dir,
logger: logger,
}
return &serv, nil
}
func (s *Server) EnsureIndices(ctx context.Context) error {
indices := []struct {
Name string
SchemaJSON string
}{
{Name: s.postIndex, SchemaJSON: palomarPostSchemaJSON},
{Name: s.profileIndex, SchemaJSON: palomarProfileSchemaJSON},
}
for _, idx := range indices {
resp, err := s.escli.Indices.Exists([]string{idx.Name})
if err != nil {
return err
}
defer resp.Body.Close()
io.ReadAll(resp.Body)
if resp.IsError() && resp.StatusCode != 404 {
return fmt.Errorf("failed to check index existence")
}
if resp.StatusCode == 404 {
s.logger.Warn("creating opensearch index", "index", idx.Name)
if len(idx.SchemaJSON) < 2 {
return fmt.Errorf("empty schema file (go:embed failed)")
}
buf := strings.NewReader(idx.SchemaJSON)
resp, err := s.escli.Indices.Create(
idx.Name,
s.escli.Indices.Create.WithBody(buf))
if err != nil {
return err
}
defer resp.Body.Close()
errBytes, err := io.ReadAll(resp.Body)
if resp.IsError() {
s.logger.Error("failed to create index", "index", idx.Name, "response", string(errBytes))
return fmt.Errorf("failed to create index")
}
if err != nil {
return err
}
}
}
return nil
}
type HealthStatus struct {
Service string `json:"service,const=palomar"`
Status string `json:"status"`
Version string `json:"version"`
Message string `json:"msg,omitempty"`
}
func (a *Server) handleHealthCheck(c echo.Context) error {
if a.Indexer != nil {
if err := a.Indexer.db.Exec("SELECT 1").Error; err != nil {
a.logger.Error("healthcheck can't connect to database", "err", err)
return c.JSON(500, HealthStatus{Status: "error", Version: versioninfo.Short(), Message: "can't connect to database"})
}
}
return c.JSON(200, HealthStatus{Status: "ok", Version: versioninfo.Short()})
}
func (s *Server) RunAPI(listen string) error {
s.logger.Info("Configuring HTTP server")
e := echo.New()
e.HideBanner = true
e.Use(slogecho.New(s.logger))
e.Use(middleware.Recover())
e.Use(MetricsMiddleware)
e.Use(middleware.BodyLimit("64M"))
e.Use(otelecho.Middleware("palomar"))
e.HTTPErrorHandler = func(err error, ctx echo.Context) {
code := 500
if he, ok := err.(*echo.HTTPError); ok {
code = he.Code
}
s.logger.Warn("HTTP request error", "statusCode", code, "path", ctx.Path(), "err", err)
ctx.Response().WriteHeader(code)
}
e.Use(middleware.CORS())
e.GET("/", s.handleHealthCheck)
e.GET("/_health", s.handleHealthCheck)
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton)
e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton)
s.echo = e
s.logger.Info("starting search API daemon", "bind", listen)
return s.echo.Start(listen)
}
func (s *Server) RunMetrics(listen string) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(listen, nil)
}
func (s *Server) Shutdown(ctx context.Context) error {
return s.echo.Shutdown(ctx)
}