Skip to content

Commit

Permalink
Merge pull request #7 from gfleury/multiple-queries
Browse files Browse the repository at this point in the history
Adding multiple queries
  • Loading branch information
gfleury committed Sep 21, 2019
2 parents 3fd9bbd + 0f08a3b commit b972e4c
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 60 deletions.
15 changes: 10 additions & 5 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
Run: func(cmd *cobra.Command, args []string) {
for i, mapping := range c.Mappings {
if mapping.Name == args[0] {
err := o.CreateStreamFromConfigurationMapping(&c.Mappings[i], &args[1])
err := o.CreateStreamFromConfigurationMapping(&c.Mappings[i], args[1:])
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down Expand Up @@ -73,11 +73,16 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
err = o.Stream().Query(args[1])
if err != nil {
fmt.Println(err)
os.Exit(1)

queries := args[1:]
for _, query := range queries {
err = o.Stream().Query(query)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

err = o.Configure()
if err != nil {
fmt.Println(err)
Expand Down
21 changes: 14 additions & 7 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Outputer interface {
Configure() error
Stream() *tablestream.Stream
ErrorChan() *chan error
CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries *string) error
CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries []string) error
InputExists() *bool
SetInputExists(func() *bool)
EnableProfile()
Expand All @@ -28,23 +28,30 @@ type StreamOutput struct {
profile bool
}

func (o *StreamOutput) CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries *string) error {
func (o *StreamOutput) CreateStreamFromConfigurationMapping(mapping *conf.Mapping, createNamedQueries []string) error {
o.stream = &tablestream.Stream{}
for _, tableDDL := range mapping.Tables {
err := o.Stream().Query(tableDDL)
if err != nil {
return err
}
}
if createNamedQueries != nil {
for _, createNamedQuerie := range createNamedQueries {
found := false
for _, query := range mapping.Queries {
if query.Name == *createNamedQueries {
err := o.stream.Query(query.Query)
return err
if query.Name == createNamedQuerie {
err := o.Stream().Query(query.Query)
if err != nil {
return err
}
found = true
}
}
return fmt.Errorf("No query named %s found", *createNamedQueries)
if !found {
return fmt.Errorf("No query named %s found", createNamedQuerie)
}
}

return nil
}

Expand Down
33 changes: 33 additions & 0 deletions output/output_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package output

import (
"fmt"
"github.com/gfleury/gstreamtop/conf"
"github.com/gfleury/gstreamtop/tablestream"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -91,3 +94,33 @@ func (s *Suite) TestPrometheus(c *check.C) {
}

}

func (s *Suite) TestOutput(c *check.C) {
conf := &conf.Configuration{}

conf.SetFileURL("../mapping.yaml")
err := conf.ReadFile()
if err != nil {
fmt.Println(err)
os.Exit(1)
}

o := &StreamOutput{}
err = o.CreateStreamFromConfigurationMapping(&conf.Mappings[0], nil)
c.Assert(err, check.IsNil)

a := true

o.SetInputExists(func() *bool {
return &a
})
exists := o.InputExists()
c.Assert(*exists, check.Equals, true)

a = false
exists = o.InputExists()
c.Assert(*exists, check.Equals, false)

b := o.Stream()
c.Assert(b, check.NotNil)
}
5 changes: 4 additions & 1 deletion output/simple_table_output.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package output

import (
"github.com/gfleury/gstreamtop/tablestream"
"fmt"
"os"
"time"

"github.com/gfleury/gstreamtop/tablestream"
)

type SimpleTableOutput struct {
Expand All @@ -15,6 +17,7 @@ func (o *SimpleTableOutput) Loop() {

for *o.InputExists() {
<-pTicker.C
fmt.Println("\033[2J")
for _, view := range o.stream.GetViews() {
tablestream.TableWrite(view, os.Stdout)
}
Expand Down
2 changes: 0 additions & 2 deletions tablestream/output.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tablestream

import (
"fmt"
"io"

"github.com/olekukonko/tablewriter"
Expand All @@ -12,6 +11,5 @@ func TableWrite(v *View, file io.Writer) {
ptable := tablewriter.NewWriter(file)
ptable.SetHeader(data[0])
ptable.AppendBulk(data[1:])
fmt.Println("\033[2J")
ptable.Render()
}
6 changes: 2 additions & 4 deletions tablestream/tableregex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func (s *Suite) TestAddRow(c *check.C) {
defer mmutex.Unlock()
for {
for _, j := range table.typeInstance {
select {
case msg = <-j:
return
}
msg = <-j
return
}
}

Expand Down
82 changes: 41 additions & 41 deletions tablestream/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,56 +87,56 @@ func (v *View) UpdateView() {
table.Lock()
tableChan := table.TypeInstance(v.name)
table.Unlock()
select {
case newData := <-tableChan:
if !v.evaluateWhere(newData) {

newData := <-tableChan
if !v.evaluateWhere(newData) {
continue
}
v.lock.Lock()
groupBy := make([]string, len(v.groupByFields))
idx := 0
for _, groupByField := range v.groupByFields {
var ok bool
if groupByField.Modifier() == "SESSION" {
continue
}
v.lock.Lock()
groupBy := make([]string, len(v.groupByFields))
idx := 0
for _, groupByField := range v.groupByFields {
var ok bool
if groupByField.Modifier() == "SESSION" {
continue
}
groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: []string{""}})
if err != nil {
v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error()))
continue
}
if groupBy[idx], ok = groupByIfc.(string); !ok {
groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int))
}
idx++
groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: []string{""}})
if err != nil {
v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error()))
continue
}
for _, groupByField := range v.groupByFields {
var ok bool
if groupByField.Modifier() != "SESSION" {
continue
}
groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: groupBy})
if err != nil {
v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error()))
continue
}
if groupBy[idx], ok = groupByIfc.(string); !ok {
groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int))
}
idx++
if groupBy[idx], ok = groupByIfc.(string); !ok {
groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int))
}
for key, value := range newData {
for _, viewData := range v.ViewDataByFieldName(key) {
_, err := viewData.CallUpdateValue(AggregatedValue{value: value, groupBy: groupBy})
if err != nil {
v.AddError(fmt.Errorf("failed to update value on %s:%s %s %s", v.name, viewData.Name(), value, err.Error()))
}
idx++
}
for _, groupByField := range v.groupByFields {
var ok bool
if groupByField.Modifier() != "SESSION" {
continue
}
groupByIfc, err := groupByField.CallUpdateValue(AggregatedValue{value: newData[groupByField.Field().name], groupBy: groupBy})
if err != nil {
v.AddError(fmt.Errorf("failed to update value on %s:%s %s", v.name, groupByField.Field().name, err.Error()))
continue
}
if groupBy[idx], ok = groupByIfc.(string); !ok {
groupBy[idx] = fmt.Sprintf("%d", groupByIfc.(int))
}
idx++
}
for key, value := range newData {
for _, viewData := range v.ViewDataByFieldName(key) {
_, err := viewData.CallUpdateValue(AggregatedValue{value: value, groupBy: groupBy})
if err != nil {
v.AddError(fmt.Errorf("failed to update value on %s:%s %s %s", v.name, viewData.Name(), value, err.Error()))
}
}
v.lock.Unlock()
}
v.lock.Unlock()
}
}

}

func (v *View) IntViewData(idx int, keys []string) []int {
Expand Down
90 changes: 90 additions & 0 deletions tablestream/viewdata_aggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,93 @@ func (s *Suite) TestSingleQuotesRegex(c *check.C) {
c.Assert(len(allRows), check.Equals, 5)

}

func (s *Suite) TestTwoRunningQueriesWithGroupBYExecution(c *check.C) {
query := `CREATE TABLE log(ip VARCHAR, col2 VARCHAR, col3 VARCHAR,
dt VARCHAR, method VARCHAR, url VARCHAR, version VARCHAR,
response INTEGER, size INTEGER, col10 VARCHAR, useragent VARCHAR)
WITH FIELDS IDENTIFIED BY '^(?P<ip>\\S+)\\s(?P<col2>\\S+)\\s(?P<col3>\\S+)\\s\\[(?P<dt>[\\w:\\/]+\\s[+\\-]\\d{4})\\]\\s"(?P<method>\\S+)\\s?(?P<url>\\S+)?\\s?(?P<version>\\S+)?"\\s(?P<response>\\d{3}|-)\\s(?P<size>\\d+|-)\\s?"?(?P<col10>[^"]*)"?\\s?"?(?P<useragent>[^"]*)?"?$'
LINES TERMINATED BY '\n';`

stream := &Stream{}

err := stream.Query(query)
c.Assert(err, check.IsNil)

query = `SELECT URLIFY(url) as url, COUNT(*) as count,
SUM(size) as sum, size, MAX(response) FROM log
GROUP BY url, size ORDER BY count LIMIT 20;`

err = stream.Query(query)
c.Assert(err, check.IsNil)

query = `SELECT url, COUNT(*) as count, SUM(size) as sum, AVG(size), MAX(response)
FROM log WHERE url LIKE '/robots%' AND response > 100 GROUP BY url ORDER BY
count LIMIT 5;`

err = stream.Query(query)
c.Assert(err, check.IsNil)

table, err := stream.Table("log")
c.Assert(err, check.IsNil)

err = table.AddRow(`92.115.179.247 - - [20/May/2015:21:05:35 +0000] "GET /favicon.ico HTTP/1.1" 200 366638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/20100101 Firefox/20.0"`)
c.Assert(err, check.IsNil)
err = table.AddRow(`92.115.179.247 - - [20/May/2015:21:05:35 +0000] "GET /favicon.ico HTTP/1.1" 200 366638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/20100101 Firefox/20.0"`)
c.Assert(err, check.IsNil)
err = table.AddRow(`92.115.179.247 - - [20/May/2015:21:05:35 +0000] "GET /favicon.ico HTTP/1.1" 200 366638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/20100101 Firefox/20.0"`)
c.Assert(err, check.IsNil)

err = table.AddRow(`66.169.220.99 - - [20/May/2015:21:05:03 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"`)
c.Assert(err, check.IsNil)
err = table.AddRow(`66.169.220.99 - - [20/May/2015:21:05:03 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"`)
c.Assert(err, check.IsNil)
err = table.AddRow(`66.169.220.99 - - [20/May/2015:21:05:03 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"`)
c.Assert(err, check.IsNil)

err = table.AddRow(`180.76.6.130 - - [20/May/2015:20:05:43 +0000] "GET /robots.txt HTTP/1.1" 200 102 "-" "Mozilla/5.0 (Windows NT 5.1; rv:6.0.2) Gecko/20100101 Firefox/6.0.2"`)
c.Assert(err, check.IsNil)
err = table.AddRow(`180.76.6.130 - - [20/May/2015:20:05:43 +0000] "GET /robots.txt HTTP/1.1" 200 102 "-" "Mozilla/5.0 (Windows NT 5.1; rv:6.0.2) Gecko/20100101 Firefox/6.0.2"`)
c.Assert(err, check.IsNil)

// Time to flush the channels
time.Sleep(1000 * time.Millisecond)

allRows := stream.views[0].FetchAllRows()
// +--------------+-------+---------+--------+---------------+
// | URL | COUNT | SUM | SIZE | MAX(RESPONSE) |
// +--------------+-------+---------+--------+---------------+
// | /favicon.ico | 3 | 1099914 | 366638 | 200 |
// | /favicon.ico | 3 | 10914 | 3638 | 200 |
// .....
// +--------------+-------+---------+--------+---------------+
c.Assert(len(allRows), check.Equals, 4)
c.Assert(allRows[1][0], check.Equals, "/favicon.ico")
c.Assert(allRows[1][1], check.Equals, "3")
c.Assert(allRows[1][2], check.Equals, "1099914")
c.Assert(allRows[1][3], check.Equals, "366638")
c.Assert(allRows[1][4], check.Equals, "200")

c.Assert(allRows[2][0], check.Equals, "/favicon.ico")
c.Assert(allRows[2][1], check.Equals, "3")
c.Assert(allRows[2][2], check.Equals, "10914")
c.Assert(allRows[2][3], check.Equals, "3638")
c.Assert(allRows[2][4], check.Equals, "200")

// SECOND QUERY

allRows = stream.views[1].FetchAllRows()
// +--------------+-------+---------+--------+---------------+
// | URL | COUNT | SUM | SIZE | MAX(RESPONSE) |
// +--------------+-------+---------+--------+---------------+
// | /favicon.ico | 3 | 1099914 | 366638 | 200 |
// | /favicon.ico | 3 | 10914 | 3638 | 200 |
// .....
// +--------------+-------+---------+--------+---------------+
c.Assert(len(allRows), check.Equals, 3)
c.Assert(allRows[1][0], check.Equals, "/robots.txt")
c.Assert(allRows[1][1], check.Equals, "2")
c.Assert(allRows[1][2], check.Equals, "204")
c.Assert(allRows[1][3], check.Equals, "102")
c.Assert(allRows[1][4], check.Equals, "200")
}

0 comments on commit b972e4c

Please sign in to comment.