Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second stage of topology cleanup #3818

Merged
merged 2 commits into from Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/output.go
Expand Up @@ -25,9 +25,9 @@ type elasticsearchOutput struct {
index outil.Selector
beat common.BeatInfo
pipeline *outil.Selector
clients []mode.ProtocolClient

mode mode.ConnectionMode
topology

template map[string]interface{}
template2x map[string]interface{}
Expand Down
44 changes: 2 additions & 42 deletions libbeat/outputs/elasticsearch/output_test.go
Expand Up @@ -43,46 +43,6 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) *elasticsear
return output
}

func TestTopologyInES(t *testing.T) {

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
}

elasticsearchOutput1 := createElasticsearchConnection(0, 0)
elasticsearchOutput2 := createElasticsearchConnection(0, 0)
elasticsearchOutput3 := createElasticsearchConnection(0, 0)

elasticsearchOutput1.PublishIPs("proxy1", []string{"10.1.0.4"})
elasticsearchOutput2.PublishIPs("proxy2", []string{"10.1.0.9",
"fe80::4e8d:79ff:fef2:de6a"})
elasticsearchOutput3.PublishIPs("proxy3", []string{"10.1.0.10"})

name2 := elasticsearchOutput3.GetNameByIP("10.1.0.9")
if name2 != "proxy2" {
t.Errorf("Failed to update proxy2 in topology: name=%s", name2)
}

elasticsearchOutput1.PublishIPs("proxy1", []string{"10.1.0.4"})
elasticsearchOutput2.PublishIPs("proxy2", []string{"10.1.0.9"})
elasticsearchOutput3.PublishIPs("proxy3", []string{"192.168.1.2"})

name3 := elasticsearchOutput3.GetNameByIP("192.168.1.2")
if name3 != "proxy3" {
t.Errorf("Failed to add a new IP")
}

name3 = elasticsearchOutput3.GetNameByIP("10.1.0.10")
if name3 != "" {
t.Errorf("Failed to delete old IP of proxy3: %s", name3)
}

name2 = elasticsearchOutput3.GetNameByIP("fe80::4e8d:79ff:fef2:de6a")
if name2 != "" {
t.Errorf("Failed to delete old IP of proxy2: %s", name2)
}
}

func TestOneEvent(t *testing.T) {

if testing.Verbose() {
Expand Down Expand Up @@ -158,7 +118,7 @@ func TestOneEvent(t *testing.T) {
func TestEvents(t *testing.T) {

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"})
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"output_elasticsearch"})
}

ts := time.Now()
Expand Down Expand Up @@ -298,7 +258,7 @@ func testBulkWithParams(t *testing.T, output *elasticsearchOutput) {
func TestBulkEvents(t *testing.T) {

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch", "elasticsearch"})
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"output_elasticsearch", "elasticsearch"})
}

testBulkWithParams(t, createElasticsearchConnection(50, 2))
Expand Down
12 changes: 12 additions & 0 deletions libbeat/outputs/elasticsearch/testing.go
@@ -1,6 +1,7 @@
package elasticsearch

import (
"math/rand"
"os"
"time"

Expand Down Expand Up @@ -62,3 +63,14 @@ func newTestClientAuth(url, user, pass string) *Client {
}
return client
}

func (t *elasticsearchOutput) randomClient() *Client {
switch len(t.clients) {
case 0:
return nil
case 1:
return t.clients[0].(*Client).Clone()
default:
return t.clients[rand.Intn(len(t.clients))].(*Client).Clone()
}
}
127 changes: 0 additions & 127 deletions libbeat/outputs/elasticsearch/topology.go

This file was deleted.

28 changes: 10 additions & 18 deletions libbeat/outputs/redis/config.go
Expand Up @@ -20,28 +20,20 @@ type redisConfig struct {
MaxRetries int `config:"max_retries"`
TLS *outputs.TLSConfig `config:"ssl"`
Proxy transport.ProxyConfig `config:",inline"`

Db int `config:"db"`
DataType string `config:"datatype"`

HostTopology string `config:"host_topology"`
PasswordTopology string `config:"password_topology"`
DbTopology int `config:"db_topology"`
Codec outputs.CodecConfig `config:"codec"`
Codec outputs.CodecConfig `config:"codec"`
Db int `config:"db"`
DataType string `config:"datatype"`
}

var (
defaultConfig = redisConfig{
Port: 6379,
LoadBalance: true,
Timeout: 5 * time.Second,
MaxRetries: 3,
TLS: nil,
Db: 0,
DataType: "list",
HostTopology: "",
PasswordTopology: "",
DbTopology: 1,
Port: 6379,
LoadBalance: true,
Timeout: 5 * time.Second,
MaxRetries: 3,
TLS: nil,
Db: 0,
DataType: "list",
}
)

Expand Down
29 changes: 4 additions & 25 deletions libbeat/publisher/publish.go
Expand Up @@ -99,36 +99,15 @@ func init() {
publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
}

func (publisher *BeatPublisher) IsPublisherIP(ip string) bool {
for _, myip := range publisher.IPAddrs {
if myip == ip {
return true
}
}

return false
}

func (publisher *BeatPublisher) GetServerName(ip string) string {
// in case the IP is localhost, return current shipper name
islocal, err := common.IsLoopback(ip)
if err != nil {
logp.Err("Parsing IP %s fails with: %s", ip, err)
return ""
}

if islocal {
return publisher.name
}

return ""
}

func (publisher *BeatPublisher) Connect() Client {
atomic.AddUint32(&publisher.numClients, 1)
return newClient(publisher)
}

func (publisher *BeatPublisher) GetName() string {
return publisher.name
}

// Create new PublisherType
func New(
beat common.BeatInfo,
Expand Down
Binary file removed packetbeat/docs/images/topology_map.png
Binary file not shown.