Skip to content

forkkit/gostream-core

 
 

Folders and files

Name
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

gostream-core

Build Status Go Report Card

The Stream Processing API for Go

TODO

  • Window
    • LengthWindow
    • LengthBatchWindow
    • TimeWindow
    • TimeBatchWindow
  • Selector
    • EqualsType, NotEqualsType
    • Equals, NotEquals
    • LargerThan, LessThan
  • Function
    • Max, Min, Median
    • Count, Sum, Average
    • Cast
    • As
    • SelectAll, Select
    • GroupBy
    • Having
  • View
    • OrderBy, Limit
    • First, Last
  • Tool
    • Builder
    • Lexer
    • Parser

Install

go get github.com/itsubaki/gostream-core

Example

type LogEvent struct {
  Time    time.Time
  Level   int
  Message string
}

// select count(*) from LogEvent.time(10sec) where Level > 2
w := window.NewTime(LogEvent{}, 10*time.Second)
defer w.Close()

w.SetSelector(
  selector.LargerThanInt{
    Name: "Level",
    Value: 2,
  },
)
w.SetFunction(
  function.Count{
    As: "count",
  },
)

go func() {
  for {
    newest := event.Newest(<-w.Output())
    if newest.Int("count") > 10 {
      // notification
    }
  }
}()

w.Input() <- LogEvent{
  Time:    time.Now(),
  Level:   1,
  Message: "this is text log.",
}
type MyEvent struct {
  Name  string
  Value int
}

// select Name as n, Value as v
//  from MyEvent.time(10msec)
//  where Value > 97
//  orderby Value DESC
//  limit 10 offset 5

w := window.NewTime(MyEvent{}, 10 * time.Millisecond)
defer w.Close()

w.SetSelector(
  selector.LargerThanInt{
    Name: "Value",
    Value: 97,
  },
)
w.SetFunction(
  function.SelectString{
    Name: "Name",
    As: "n",
  },
  function.SelectInt{
    Name: "Value",
    As: "v",
  },
)
w.SetView(
  view.OrderByInt{
    Name: "Value",
    Reverse: true,
  },
  view.Limit{
    Limit: 10,
    Offset: 5,
  },
)

go func() {
  for {
    fmt.Println(<-w.Output())
  }
}()

for i := 0; i < 100; i++ {
  w.Input() <-MyEvent{
    Name:  "name",
    Value: i,
  }
}
// select avg(Value), sum(Value) from MyEvent.length(10)
w := window.NewLength(MyEvent{}, 10)
defer w.Close()

w.SetFunction(
  function.AverageInt{
    Name: "Value",
    As:   "avg(Value)",
  },
  function.SumInt{
    Name: "Value",
    As:   "sum(Value)",
  },
)

RuntimeEventBuilder

// type RuntimeEvent struct {
//  Name string
//  Value int
// }
b := builder.New()
b.SetField("Name", reflect.TypeOf(""))
b.SetField("Value", reflect.TypeOf(0))
s := b.Build()


// i.Value()
// -> RuntimeEvent{Name: "foobar", Value: 123}
// i.Pointer()
// -> &RuntimeEvent{Name: "foobar", Value: 123}
i := s.NewInstance()
i.SetString("Name", "foobar")
i.SetInt("Value", 123)

w.Input() <-i.Value()

(WIP) Query

p := parser.New()
p.Register("MapEvent", MapEvent{})

query := "select * from MapEvent.length(10)"
statement, err := p.Parse(query)
if err != nil {
  log.Println("failed.")
  return
}

window := statement.New()
defer window.Close()

window.Input() <-MapEvent{map}
fmt.Println(<-window.Output())

About

The Stream Processing API for Go

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 100.0%