Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Central processor registration #229

Merged
merged 2 commits into from
Oct 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ collect: imports fields go-generate create-docs notice
imports:
@mkdir -p include
@mkdir -p processor
@python ${GOPATH}/src/${BEAT_PATH}/script/generate_imports.py ${BEAT_PATH} > include/list.go

.PHONY: fields
fields:
Expand Down
3 changes: 0 additions & 3 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"net/http/httptest"
"testing"

// make sure processors are loaded
_ "github.com/elastic/apm-server/include"

"github.com/elastic/apm-server/tests"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/monitoring"
Expand Down
57 changes: 39 additions & 18 deletions beater/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,28 @@ import (

"crypto/subtle"

err "github.com/elastic/apm-server/processor/error"
"github.com/elastic/apm-server/processor/healthcheck"
"github.com/elastic/apm-server/processor/transaction"
"github.com/elastic/beats/libbeat/monitoring"
)

type processorHandler func(processor.Processor, Config, reporter) http.Handler
const (
BackendTransactionsURL = "/v1/transactions"
FrontendTransactionsURL = "/v1/client-side/transactions"
BackendErrorsURL = "/v1/errors"
FrontendErrorsURL = "/v1/client-side/errors"
HealthCheckURL = "/healthcheck"
)

type ProcessorFactory func() processor.Processor

type ProcessorHandler func(ProcessorFactory, Config, reporter) http.Handler

type routeMapping struct {
ProcessorHandler
ProcessorFactory
}

var (
serverMetrics = monitoring.Default.NewRegistry("apm-server.server")
Expand All @@ -32,38 +50,39 @@ var (
errForbidden = errors.New("forbidden request")
errPOSTRequestOnly = errors.New("only POST requests are supported")

handlerMap = map[int]processorHandler{
processor.Backend: backendHandler,
processor.Frontend: frontendHandler,
processor.HealthCheck: healthCheckHandler,
Routes = map[string]routeMapping{
BackendTransactionsURL: {backendHandler, transaction.NewProcessor},
FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor},
BackendErrorsURL: {backendHandler, err.NewProcessor},
FrontendErrorsURL: {frontendHandler, err.NewProcessor},
HealthCheckURL: {healthCheckHandler, healthcheck.NewProcessor},
}
)

func newMuxer(config Config, report reporter) *http.ServeMux {
mux := http.NewServeMux()

for path, p := range processor.Registry.Processors() {
handler := handlerMap[p.Type()]
for path, mapping := range Routes {
logp.Info("Path %s added to request handler", path)
mux.Handle(path, handler(p, config, report))
mux.Handle(path, mapping.ProcessorHandler(mapping.ProcessorFactory, config, report))
}

return mux
}

func backendHandler(p processor.Processor, config Config, report reporter) http.Handler {
func backendHandler(pf ProcessorFactory, config Config, report reporter) http.Handler {
return logHandler(
authHandler(config.SecretToken,
processRequestHandler(p, config, report)))
processRequestHandler(pf, config, report)))
}

func frontendHandler(p processor.Processor, config Config, report reporter) http.Handler {
func frontendHandler(pf ProcessorFactory, config Config, report reporter) http.Handler {
return logHandler(
frontendSwitchHandler(config.EnableFrontend,
processRequestHandler(p, config, report)))
processRequestHandler(pf, config, report)))
}

func healthCheckHandler(_ processor.Processor, _ Config, _ reporter) http.Handler {
func healthCheckHandler(_ ProcessorFactory, _ Config, _ reporter) http.Handler {
return logHandler(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sendStatus(w, r, 200, nil)
Expand Down Expand Up @@ -114,14 +133,16 @@ func isAuthorized(req *http.Request, secretToken string) bool {
return subtle.ConstantTimeCompare([]byte(parts[1]), []byte(secretToken)) == 1
}

func processRequestHandler(p processor.Processor, config Config, report reporter) http.Handler {
func processRequestHandler(pf ProcessorFactory, config Config, report reporter) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
code, err := processRequest(r, p, config.MaxUnzippedSize, report)
code, err := processRequest(r, pf, config.MaxUnzippedSize, report)
sendStatus(w, r, code, err)
})
}

func processRequest(r *http.Request, p processor.Processor, maxSize int64, report reporter) (int, error) {
func processRequest(r *http.Request, pf ProcessorFactory, maxSize int64, report reporter) (int, error) {

processor := pf()

if r.Method != "POST" {
return 405, errPOSTRequestOnly
Expand All @@ -142,11 +163,11 @@ func processRequest(r *http.Request, p processor.Processor, maxSize int64, repor

}

if err = p.Validate(buf); err != nil {
if err = processor.Validate(buf); err != nil {
return 400, err
}

list, err := p.Transform(buf)
list, err := processor.Transform(buf)
if err != nil {
return 400, err
}
Expand Down
10 changes: 4 additions & 6 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/kabukky/httpscerts"
"github.com/stretchr/testify/assert"

"github.com/elastic/apm-server/processor/healthcheck"
"github.com/elastic/apm-server/processor/transaction"
"github.com/elastic/apm-server/tests"
"github.com/elastic/beats/libbeat/beat"
)
Expand Down Expand Up @@ -56,7 +54,7 @@ func TestServerHealth(t *testing.T) {
apm, teardown := setupServer(t, noSSL)
defer teardown()

req, err := http.NewRequest("GET", healthcheck.Endpoint, nil)
req, err := http.NewRequest("GET", HealthCheckURL, nil)
if err != nil {
t.Fatalf("Failed to create test request object: %v", err)
}
Expand All @@ -70,7 +68,7 @@ func TestServerFrontendSwitch(t *testing.T) {
apm, teardown := setupServer(t, noSSL)
defer teardown()

req, _ := http.NewRequest("POST", transaction.FrontendEndpoint, bytes.NewReader(testData))
req, _ := http.NewRequest("POST", FrontendTransactionsURL, bytes.NewReader(testData))

rec := httptest.NewRecorder()
apm.Handler.ServeHTTP(rec, req)
Expand Down Expand Up @@ -182,7 +180,7 @@ func withSSL(t *testing.T, domain string) *SSLConfig {
}

func makeTestRequest(t *testing.T) *http.Request {
req, err := http.NewRequest("POST", transaction.BackendEndpoint, bytes.NewReader(testData))
req, err := http.NewRequest("POST", BackendTransactionsURL, bytes.NewReader(testData))
if err != nil {
t.Fatalf("Failed to create test request object: %v", err)
}
Expand All @@ -195,7 +193,7 @@ func postTestRequest(t *testing.T, apm *http.Server, client *http.Client, schema
client = http.DefaultClient
}

addr := fmt.Sprintf("%s://%s%s", schema, apm.Addr, transaction.BackendEndpoint)
addr := fmt.Sprintf("%s://%s%s", schema, apm.Addr, BackendTransactionsURL)
return client.Post(addr, "application/json", bytes.NewReader(testData))
}

Expand Down
13 changes: 0 additions & 13 deletions include/list.go

This file was deleted.

2 changes: 0 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"

"github.com/elastic/apm-server/cmd"

_ "github.com/elastic/apm-server/include"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions processor/error/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func BenchmarkEventWithFileLoading(b *testing.B) {
processor := NewBackendProcessor()
processor := NewProcessor()
for i := 0; i < b.N; i++ {
data, _ := tests.LoadValidData("error")
err := processor.Validate(data)
Expand All @@ -20,7 +20,7 @@ func BenchmarkEventWithFileLoading(b *testing.B) {
}

func BenchmarkEventFileLoadingOnce(b *testing.B) {
processor := NewBackendProcessor()
processor := NewProcessor()
data, _ := tests.LoadValidData("error")
for i := 0; i < b.N; i++ {
err := processor.Validate(data)
Expand Down
4 changes: 2 additions & 2 deletions processor/error/package_tests/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestFields(t *testing.T) {
"./../../../_meta/fields.common.yml",
"./../_meta/fields.yml",
}
tests.TestEventAttrsDocumentedInFields(t, fieldsPaths, er.NewBackendProcessor)
tests.TestEventAttrsDocumentedInFields(t, fieldsPaths, er.NewProcessor)

notInEvent := set.New(
"context.db.instance",
Expand All @@ -26,5 +26,5 @@ func TestFields(t *testing.T) {
"error id icon",
"view errors",
)
tests.TestDocumentedFieldsInEvent(t, fieldsPaths, er.NewBackendProcessor, notInEvent)
tests.TestDocumentedFieldsInEvent(t, fieldsPaths, er.NewProcessor, notInEvent)
}
4 changes: 2 additions & 2 deletions processor/error/package_tests/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ func TestProcessorOK(t *testing.T) {
{Name: "TestProcessErrorMininmalPayloadLog", Path: "tests/data/valid/error/minimal_payload_log.json"},
{Name: "TestProcessErrorFull", Path: "tests/data/valid/error/payload.json"},
}
tests.TestProcessRequests(t, er.NewBackendProcessor(), requestInfo)
tests.TestProcessRequests(t, er.NewProcessor(), requestInfo)
}

// ensure invalid documents fail the json schema validation already
func TestProcessorFailedValidation(t *testing.T) {
data, err := tests.LoadInvalidData("error")
assert.Nil(t, err)
err = er.NewBackendProcessor().Validate(data)
err = er.NewProcessor().Validate(data)
assert.NotNil(t, err)
}
24 changes: 4 additions & 20 deletions processor/error/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (
"github.com/elastic/beats/libbeat/monitoring"
)

func init() {
pr.Registry.AddProcessor(BackendEndpoint, NewBackendProcessor())
pr.Registry.AddProcessor(FrontendEndpoint, NewFrontendProcessor())
}

var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
validationCount = monitoring.NewInt(errorMetrics, "validation.count")
Expand All @@ -23,24 +18,17 @@ var (
)

const (
BackendEndpoint = "/v1/errors"
FrontendEndpoint = "/v1/client-side/errors"
processorName = "error"
processorName = "error"
)

func NewBackendProcessor() pr.Processor {
schema := pr.CreateSchema(errorSchema, processorName)
return &processor{schema, pr.Backend}
}
var schema = pr.CreateSchema(errorSchema, processorName)

func NewFrontendProcessor() pr.Processor {
schema := pr.CreateSchema(errorSchema, processorName)
return &processor{schema, pr.Frontend}
func NewProcessor() pr.Processor {
return &processor{schema}
}

type processor struct {
schema *jsonschema.Schema
typ int
}

func (p *processor) Validate(buf []byte) error {
Expand All @@ -66,7 +54,3 @@ func (p *processor) Transform(buf []byte) ([]beat.Event, error) {
func (p *processor) Name() string {
return processorName
}

func (p *processor) Type() int {
return p.typ
}
21 changes: 4 additions & 17 deletions processor/error/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,9 @@ import (
)

func TestImplementProcessorInterface(t *testing.T) {
constructors := []func() pr.Processor{NewFrontendProcessor, NewBackendProcessor}
for _, constructor := range constructors {
p := constructor()
assert.NotNil(t, p)
_, ok := p.(pr.Processor)
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
}
}

func TestAddProcessorToRegistryOnInit(t *testing.T) {
p := pr.Registry.Processor(BackendEndpoint)
p := NewProcessor()
assert.NotNil(t, p)
assert.Equal(t, pr.Backend, p.Type())

p2 := pr.Registry.Processor(FrontendEndpoint)
assert.NotNil(t, p2)
assert.Equal(t, pr.Frontend, p2.Type())
_, ok := p.(pr.Processor)
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
}
9 changes: 0 additions & 9 deletions processor/healthcheck/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
)

func init() {
pr.Registry.AddProcessor(Endpoint, NewProcessor())
}

const (
Endpoint = "/healthcheck"
processorName = "healthcheck"
)

Expand All @@ -31,7 +26,3 @@ func (p *processor) Transform(buf []byte) ([]beat.Event, error) {
func (p *processor) Name() string {
return processorName
}

func (p *processor) Type() int {
return pr.HealthCheck
}
6 changes: 0 additions & 6 deletions processor/healthcheck/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,3 @@ func TestImplementProcessorInterface(t *testing.T) {
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
}

func TestAddProcessorToRegistryOnInit(t *testing.T) {
p := pr.Registry.Processor(Endpoint)
assert.NotNil(t, p)
assert.Equal(t, pr.HealthCheck, p.Type())
}
1 change: 0 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Processor interface {
Validate([]byte) error
Transform([]byte) ([]beat.Event, error)
Name() string
Type() int
}

func CreateDoc(timestamp time.Time, docMappings []m.DocMapping) beat.Event {
Expand Down
Loading