Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API && Plugin framework registry #5

Merged
merged 25 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@ module github.com/apache/skywalking-satellite

go 1.14

require (
github.com/sirupsen/logrus v1.7.0
)
require github.com/sirupsen/logrus v1.7.0
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
86 changes: 86 additions & 0 deletions internal/pkg/api/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

import (
"fmt"
"time"
)

// Event that implement this interface would be allowed to transmit in the Satellite.
type Event interface {
// Name is a identify to distinguish different events.
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
Name() string
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved

// Meta is a pair of key and value to record meta data, such as labels.
Meta() map[string]string

// Data returns the wrappered data.
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
Data() interface{}

// Time returns the event time.
Time() time.Time

// IsOutput indicates that the event can exist in the output context when the return value is true.
IsOutput() bool
}

// InputEvent is used in Gatherer to bridge Queue.
type InputEvent interface {
Event

// ToBytes serialize the event to a byte array.
ToBytes() []byte

// FromBytes deserialize the byte array to an event.
FromBytes(bytes []byte) InputEvent
}

// BatchOutputEvents is batch events to output.
type BatchOutputEvents struct {
// BatchEvents grouped by event name.
BatchEvents map[string][]Event

// The start offset of the batch.
StartOffset int64

// The end offset of the batch.
EndOffset int64
}

// OutputEventContext is a container to store the output context.
type OutputEventContext struct {
context map[string]Event
}

// Put puts the incoming event into the context when the event allows to output.
func (c *OutputEventContext) Put(event Event) {
if event.IsOutput() {
c.context[event.Name()] = event
}
}

// Get returns a event in the context. When the eventName does not exist, a error would be returned.
func (c *OutputEventContext) Get(eventName string) (Event, error) {
e, ok := c.context[eventName]
if !ok {
err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
return nil, err
}
return e, nil
}
26 changes: 26 additions & 0 deletions internal/pkg/api/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

// Forwarder is a plugin interface, that defines new forwarders.
type Forwarder interface {
ComponentPlugin

// Output the batch events to the external output services, such as Kafka MQ and SkyWalking OAP cluster.
Output(batch BatchOutputEvents)
}
26 changes: 26 additions & 0 deletions internal/pkg/api/gatherer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

// Gatherer is a plugin interface, that defines new gatherers.
type Gatherer interface {
ComponentPlugin

// Gather gathers the data from the input.
Gather() (*InputEvent, error)
}
28 changes: 28 additions & 0 deletions internal/pkg/api/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

// Parser is a plugin interface, that defines new Parsers for Gatherer plugin.
type Parser interface {

// ParseBytes parse the byte buffer into events.
ParseBytes(bytes []byte) ([]InputEvent, error)

// ParseStr parse the string into events.
ParseStr(str string) ([]InputEvent, error)
}
74 changes: 74 additions & 0 deletions internal/pkg/api/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

import "io"

// The following comments is to illustrate the relationship between different plugin interface in api package.
wu-sheng marked this conversation as resolved.
Show resolved Hide resolved
//
//
// Processors
// -----------------------------------------
// --------- --------- ----------- -----------
// | Gatherer | ==> | Queue | ==> | Processor | ==> ... ==> | Processor |
// | (Parser) | | Mem/File | ----------- -----------
// ---------- --------- || ||
// \/ \/
// ---------------------------------------
// | OutputEventContext |
// ---------------------------------------
// ||
// \/
// ------------------- -------------
// | BatchOutputEvents | <== | BatchBuffer |
// ------------------- -------------
// ||
// \/
// -------------------
// | Forwarder | ==> Kakfa/OAP
// -------------------
// 1. The Gatherer plugin would fetch or receive the input data.
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
// 2. The Parser plugin would parse the input data to InputEvent.
// If the event needs output, please tag it by the IsOutput
wu-sheng marked this conversation as resolved.
Show resolved Hide resolved
// method.
// 3. The Queue plugin would store the InputEvent. But different
wu-sheng marked this conversation as resolved.
Show resolved Hide resolved
// Queue would use different ways to store data, such as store
// bytes by serialization or keep original.
// 4. The Processor plugin would pull the event from the Queue and
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
// process the event to create a new event. Next, the event is
// passed to the next processor to do the same things until the
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
// whole processors are performed. Similar to above, if any
// events need output, please mark. The events would be stored
// in the OutputEventContext. When the processing is finished,
// the OutputEventContext would be passed to the BatchBuffer.
// 5. When BatchBuffer is full, the OutputEventContexts would be
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
// partitioned by event name and convert to BatchOutputEvents.
// 6. The Follower would be ordered to send each partition in
// BatchOutputEvents in different ways, such as different gRPC
// endpoints.

// ComponentPlugin is an interface to initialize the specific plugin.
type ComponentPlugin interface {
io.Closer

// Init initialize the specific plugin and would return error when the configuration is error.
Init() error

// Run triggers the specific plugin to work.
Run()
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
}
26 changes: 26 additions & 0 deletions internal/pkg/api/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

// Processor is a plugin interface, that defines new pipeline processors.
type Processor interface {
ComponentPlugin

// Process produces a new event by processing incoming event.
Process(in Event) Event
}
45 changes: 45 additions & 0 deletions internal/pkg/api/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

// Queue is a plugin interface, that defines new queues.
type Queue interface {
ComponentPlugin

// Publisher get the only publisher for the current queue.
Publisher() QueuePublisher

// Consumer get the only consumer for the current queue.
Consumer() QueueConsumer
}

// QueuePublisher is a plugin interface, that defines new queue publishers.
type QueuePublisher interface {
ComponentPlugin

// Enqueue push a inputEvent into the queue.
Enqueue(event *InputEvent) error
}

// QueueConsumer is a plugin interface, that defines new queue consumers.
type QueueConsumer interface {
ComponentPlugin

// Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
Dequeue() (*InputEvent, error)
}
3 changes: 3 additions & 0 deletions internal/pkg/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func Init(opts ...Option) {
if f.timePattern == "" {
f.timePattern = defaultTimePattern
}
if !strings.Contains(f.logPattern, "\n") {
f.logPattern += "\n"
}
Log.SetFormatter(f)
})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/logger/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestFormatter_Format(t *testing.T) {
}{
{
name: "logWithEmptyFields",
want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1"),
want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
args: args{
entry: func() *logrus.Entry {
entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
Expand All @@ -50,7 +50,7 @@ func TestFormatter_Format(t *testing.T) {
},
{
name: "logWithFields",
want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2"),
want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
args: args{
entry: func() *logrus.Entry {
entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
Expand Down