From 82a231f997cf404c6a7afa5bd6f47521ac1eedeb Mon Sep 17 00:00:00 2001 From: gfleury Date: Fri, 20 Sep 2019 15:38:08 +0200 Subject: [PATCH 1/2] Adding multiple queries --- cmd.go | 17 +++-- output/output.go | 21 ++++-- output/simple_table_output.go | 5 +- tablestream/output.go | 2 - tablestream/tableregex_test.go | 6 +- tablestream/view.go | 82 +++++++++++----------- tablestream/viewdata_aggregated_test.go | 90 +++++++++++++++++++++++++ 7 files changed, 163 insertions(+), 60 deletions(-) diff --git a/cmd.go b/cmd.go index ece1971..66237f1 100644 --- a/cmd.go +++ b/cmd.go @@ -31,7 +31,7 @@ func main() { Run: func(cmd *cobra.Command, args []string) { for i, mapping := range c.Mappings { if mapping.Name == args[0] { - err := o.CreateStreamFromConfigurationMapping(&c.Mappings[i], &args[1]) + err := o.CreateStreamFromConfigurationMapping(&c.Mappings[i], args[1:]) if err != nil { fmt.Println(err) os.Exit(1) @@ -73,17 +73,24 @@ func main() { fmt.Println(err) os.Exit(1) } - err = o.Stream().Query(args[1]) - if err != nil { - fmt.Println(err) - os.Exit(1) + + queries := args[1:] + for _, query := range queries { + err = o.Stream().Query(query) + if err != nil { + fmt.Println(err) + os.Exit(1) + } } + err = o.Configure() if err != nil { fmt.Println(err) os.Exit(1) } + break + } } if o == nil { diff --git a/output/output.go b/output/output.go index 4e6bbba..bcea6e8 100644 --- a/output/output.go +++ b/output/output.go @@ -13,7 +13,7 @@ type Outputer interface { Configure() error Stream() *tablestream.Stream ErrorChan() *chan error - CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries *string) error + CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries []string) error InputExists() *bool SetInputExists(func() *bool) EnableProfile() @@ -28,7 +28,7 @@ type StreamOutput struct { profile bool } -func (o *StreamOutput) CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries *string) error { +func (o *StreamOutput) CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries []string) error { o.stream = &tablestream.Stream{} for _, tableDDL := range mapping.Tables { err := o.Stream().Query(tableDDL) @@ -36,15 +36,22 @@ func (o *StreamOutput) CreateStreamFromConfigurationMapping(mapping *conf.Mappin return err } } - if createNamedQueries != nil { + for _, createNamedQuerie := range createNamedQueries { + found := false for _, query := range mapping.Queries { - if query.Name == *createNamedQueries { - err := o.stream.Query(query.Query) - return err + if query.Name == createNamedQuerie { + err := o.Stream().Query(query.Query) + if err != nil { + return err + } + found = true } } - return fmt.Errorf("No query named %s found", *createNamedQueries) + if !found { + return fmt.Errorf("No query named %s found", createNamedQuerie) + } } + return nil } diff --git a/output/simple_table_output.go b/output/simple_table_output.go index 64bd5d3..788bec3 100644 --- a/output/simple_table_output.go +++ b/output/simple_table_output.go @@ -1,9 +1,11 @@ package output import ( - "github.com/gfleury/gstreamtop/tablestream" + "fmt" "os" "time" + + "github.com/gfleury/gstreamtop/tablestream" ) type SimpleTableOutput struct { @@ -15,6 +17,7 @@ func (o *SimpleTableOutput) Loop() { for *o.InputExists() { <-pTicker.C + fmt.Println("\033[2J") for _, view := range o.stream.GetViews() { tablestream.TableWrite(view, os.Stdout) } diff --git a/tablestream/output.go b/tablestream/output.go index db13a60..023b004 100644 --- a/tablestream/output.go +++ b/tablestream/output.go @@ -1,7 +1,6 @@ package tablestream import ( - "fmt" "io" "github.com/olekukonko/tablewriter" @@ -12,6 +11,5 @@ func TableWrite(v *View, file io.Writer) { ptable := tablewriter.NewWriter(file) ptable.SetHeader(data[0]) ptable.AppendBulk(data[1:]) - fmt.Println("\033[2J") ptable.Render() } diff --git a/tablestream/tableregex_test.go b/tablestream/tableregex_test.go index fa9ea09..2290b4d 100644 --- a/tablestream/tableregex_test.go +++ b/tablestream/tableregex_test.go @@ -43,10 +43,8 @@ func (s *Suite) TestAddRow(c *check.C) { defer mmutex.Unlock() for { for _, j := range table.typeInstance { - select { - case msg = <-j: - return - } + msg = <-j + return } } diff --git a/tablestream/view.go b/tablestream/view.go index bfa8971..ceef1ab 100644 --- a/tablestream/view.go +++ b/tablestream/view.go @@ -87,56 +87,56 @@ func (v *View) UpdateView() { table.Lock() tableChan := table.TypeInstance(v.name) table.Unlock() - select { - case newData := <-tableChan: - if !v.evaluateWhere(newData) { + + newData := <-tableChan + if !v.evaluateWhere(newData) { + continue + } + v.lock.Lock() + groupBy := make([]string, len(v.groupByFields)) + idx := 0 + for _, groupByField := range v.groupByFields { + var ok bool + if groupByField.Modifier() == "SESSION" { continue } - v.lock.Lock() - groupBy := make([]string, len(v.groupByFields)) - idx := 0 - for _, groupByField := range v.groupByFields { - var ok bool - if groupByField.Modifier() == "SESSION" { - continue - } - groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: []string{""}}) - if err != nil { - v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error())) - continue - } - if groupBy[idx], ok = groupByIfc.(string); !ok { - groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int)) - } - idx++ + groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: []string{""}}) + if err != nil { + v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error())) + continue } - for _, groupByField := range v.groupByFields { - var ok bool - if groupByField.Modifier() != "SESSION" { - continue - } - groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: groupBy}) - if err != nil { - v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error())) - continue - } - if groupBy[idx], ok = groupByIfc.(string); !ok { - groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int)) - } - idx++ + if groupBy[idx], ok = groupByIfc.(string); !ok { + groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int)) } - for key, value := range newData { - for _, viewData := range v.ViewDataByFieldName(key) { - _, err := viewData.CallUpdateValue(AggregatedValue{value: value, groupBy: groupBy}) - if err != nil { - v.AddError(fmt.Errorf("failed to update value on %s:%s %s %s", v.name, viewData.Name(), value, err.Error())) - } + idx++ + } + for _, groupByField := range v.groupByFields { + var ok bool + if groupByField.Modifier() != "SESSION" { + continue + } + groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: groupBy}) + if err != nil { + v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error())) + continue + } + if groupBy[idx], ok = groupByIfc.(string); !ok { + groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int)) + } + idx++ + } + for key, value := range newData { + for _, viewData := range v.ViewDataByFieldName(key) { + _, err := viewData.CallUpdateValue(AggregatedValue{value: value, groupBy: groupBy}) + if err != nil { + v.AddError(fmt.Errorf("failed to update value on %s:%s %s %s", v.name, viewData.Name(), value, err.Error())) } } - v.lock.Unlock() } + v.lock.Unlock() } } + } func (v *View) IntViewData(idx int, keys []string) []int { diff --git a/tablestream/viewdata_aggregated_test.go b/tablestream/viewdata_aggregated_test.go index 4f9cf3b..48fa779 100644 --- a/tablestream/viewdata_aggregated_test.go +++ b/tablestream/viewdata_aggregated_test.go @@ -366,3 +366,93 @@ func (s *Suite) TestSingleQuotesRegex(c *check.C) { c.Assert(len(allRows), check.Equals, 5) } + +func (s *Suite) TestTwoRunningQueriesWithGroupBYExecution(c *check.C) { + query := `CREATE TABLE log(ip VARCHAR, col2 VARCHAR, col3 VARCHAR, + dt VARCHAR, method VARCHAR, url VARCHAR, version VARCHAR, + response INTEGER, size INTEGER, col10 VARCHAR, useragent VARCHAR) + WITH FIELDS IDENTIFIED BY '^(?P\\S+)\\s(?P\\S+)\\s(?P\\S+)\\s\\[(?P
[\\w:\\/]+\\s[+\\-]\\d{4})\\]\\s"(?P\\S+)\\s?(?P\\S+)?\\s?(?P\\S+)?"\\s(?P\\d{3}|-)\\s(?P\\d+|-)\\s?"?(?P[^"]*)"?\\s?"?(?P[^"]*)?"?$' + LINES TERMINATED BY '\n';` + + stream := &Stream{} + + err := stream.Query(query) + c.Assert(err, check.IsNil) + + query = `SELECT URLIFY(url) as url, COUNT(*) as count, + SUM(size) as sum, size, MAX(response) FROM log + GROUP BY url, size ORDER BY count LIMIT 20;` + + err = stream.Query(query) + c.Assert(err, check.IsNil) + + query = `SELECT url, COUNT(*) as count, SUM(size) as sum, AVG(size), MAX(response) + FROM log WHERE url LIKE '/robots%' AND response > 100 GROUP BY url ORDER BY + count LIMIT 5;` + + err = stream.Query(query) + c.Assert(err, check.IsNil) + + table, err := stream.Table("log") + c.Assert(err, check.IsNil) + + err = table.AddRow(`92.115.179.247 - - [20/May/2015:21:05:35 +0000] "GET /favicon.ico HTTP/1.1" 200 366638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/20100101 Firefox/20.0"`) + c.Assert(err, check.IsNil) + err = table.AddRow(`92.115.179.247 - - [20/May/2015:21:05:35 +0000] "GET /favicon.ico HTTP/1.1" 200 366638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/20100101 Firefox/20.0"`) + c.Assert(err, check.IsNil) + err = table.AddRow(`92.115.179.247 - - [20/May/2015:21:05:35 +0000] "GET /favicon.ico HTTP/1.1" 200 366638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/20100101 Firefox/20.0"`) + c.Assert(err, check.IsNil) + + err = table.AddRow(`66.169.220.99 - - [20/May/2015:21:05:03 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"`) + c.Assert(err, check.IsNil) + err = table.AddRow(`66.169.220.99 - - [20/May/2015:21:05:03 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"`) + c.Assert(err, check.IsNil) + err = table.AddRow(`66.169.220.99 - - [20/May/2015:21:05:03 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"`) + c.Assert(err, check.IsNil) + + err = table.AddRow(`180.76.6.130 - - [20/May/2015:20:05:43 +0000] "GET /robots.txt HTTP/1.1" 200 102 "-" "Mozilla/5.0 (Windows NT 5.1; rv:6.0.2) Gecko/20100101 Firefox/6.0.2"`) + c.Assert(err, check.IsNil) + err = table.AddRow(`180.76.6.130 - - [20/May/2015:20:05:43 +0000] "GET /robots.txt HTTP/1.1" 200 102 "-" "Mozilla/5.0 (Windows NT 5.1; rv:6.0.2) Gecko/20100101 Firefox/6.0.2"`) + c.Assert(err, check.IsNil) + + // Time to flush the channels + time.Sleep(1000 * time.Millisecond) + + allRows := stream.views[0].FetchAllRows() + // +--------------+-------+---------+--------+---------------+ + // | URL | COUNT | SUM | SIZE | MAX(RESPONSE) | + // +--------------+-------+---------+--------+---------------+ + // | /favicon.ico | 3 | 1099914 | 366638 | 200 | + // | /favicon.ico | 3 | 10914 | 3638 | 200 | + // ..... + // +--------------+-------+---------+--------+---------------+ + c.Assert(len(allRows), check.Equals, 4) + c.Assert(allRows[1][0], check.Equals, "/favicon.ico") + c.Assert(allRows[1][1], check.Equals, "3") + c.Assert(allRows[1][2], check.Equals, "1099914") + c.Assert(allRows[1][3], check.Equals, "366638") + c.Assert(allRows[1][4], check.Equals, "200") + + c.Assert(allRows[2][0], check.Equals, "/favicon.ico") + c.Assert(allRows[2][1], check.Equals, "3") + c.Assert(allRows[2][2], check.Equals, "10914") + c.Assert(allRows[2][3], check.Equals, "3638") + c.Assert(allRows[2][4], check.Equals, "200") + + // SECOND QUERY + + allRows = stream.views[1].FetchAllRows() + // +--------------+-------+---------+--------+---------------+ + // | URL | COUNT | SUM | SIZE | MAX(RESPONSE) | + // +--------------+-------+---------+--------+---------------+ + // | /favicon.ico | 3 | 1099914 | 366638 | 200 | + // | /favicon.ico | 3 | 10914 | 3638 | 200 | + // ..... + // +--------------+-------+---------+--------+---------------+ + c.Assert(len(allRows), check.Equals, 3) + c.Assert(allRows[1][0], check.Equals, "/robots.txt") + c.Assert(allRows[1][1], check.Equals, "2") + c.Assert(allRows[1][2], check.Equals, "204") + c.Assert(allRows[1][3], check.Equals, "102") + c.Assert(allRows[1][4], check.Equals, "200") +} From 0f08a3bb792afe73a32ca0acb037834ecb55f4f9 Mon Sep 17 00:00:00 2001 From: gfleury Date: Fri, 20 Sep 2019 17:17:01 +0200 Subject: [PATCH 2/2] Adding some test coverage --- cmd.go | 2 -- output/output_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/cmd.go b/cmd.go index 66237f1..e7b0780 100644 --- a/cmd.go +++ b/cmd.go @@ -88,9 +88,7 @@ func main() { fmt.Println(err) os.Exit(1) } - break - } } if o == nil { diff --git a/output/output_test.go b/output/output_test.go index 98a88dc..7c0d402 100644 --- a/output/output_test.go +++ b/output/output_test.go @@ -1,11 +1,14 @@ package output import ( + "fmt" + "github.com/gfleury/gstreamtop/conf" "github.com/gfleury/gstreamtop/tablestream" "github.com/prometheus/client_golang/prometheus/promhttp" "io/ioutil" "net/http" "net/http/httptest" + "os" "strings" "testing" "time" @@ -91,3 +94,33 @@ func (s *Suite) TestPrometheus(c *check.C) { } } + +func (s *Suite) TestOutput(c *check.C) { + conf := &conf.Configuration{} + + conf.SetFileURL("../mapping.yaml") + err := conf.ReadFile() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + o := &StreamOutput{} + err = o.CreateStreamFromConfigurationMapping(&conf.Mappings[0], nil) + c.Assert(err, check.IsNil) + + a := true + + o.SetInputExists(func() *bool { + return &a + }) + exists := o.InputExists() + c.Assert(*exists, check.Equals, true) + + a = false + exists = o.InputExists() + c.Assert(*exists, check.Equals, false) + + b := o.Stream() + c.Assert(b, check.NotNil) +}