Skip to content

Commit

Permalink
feat(stream): add initial support for stream processing and live updates
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Smith <drsmith.phys@gmail.com>
  • Loading branch information
clok committed Jun 14, 2022
1 parent 8397e6f commit a39c9cf
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 23 deletions.
124 changes: 104 additions & 20 deletions commands/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package commands

import (
"fmt"
"github.com/clok/kemba"
"github.com/montanaflynn/stats"
"golang.org/x/text/language"
"golang.org/x/text/message"
"io"
"os"
"os/exec"
"runtime"
"strconv"
"strings"

"github.com/clok/kemba"
"github.com/montanaflynn/stats"
)

var (
Expand All @@ -20,6 +24,28 @@ var (
kfpld = kc.Extend("processLine:debug")
)

func clearTerminal() error {
var cmd *exec.Cmd
goos := runtime.GOOS
switch goos {
case "windows":
cmd = exec.Command("cmd", "/c", "cls")
case "linux", "darwin":
cmd = exec.Command("clear")
}

if cmd == nil {
return fmt.Errorf("%s is not supported", runtime.GOOS)
}

cmd.Stdout = os.Stdout
err := cmd.Run()
if err != nil {
return err
}
return nil
}

func processReader(opts *processReaderInput) ([]float64, error) {
var data []rune
var lines int64
Expand All @@ -44,23 +70,81 @@ func processReader(opts *processReaderInput) ([]float64, error) {
}
lines++

// TODO: Add support for refresh rate
// if lines%100 == 0 {
// res, err := processSample(sample)
// if err != nil {
// return nil, err
// }
// var row string
// for _, field := range res.ListFields() {
// if row == "" {
// row = fmt.Sprintf(res.GetFormat(field), res.Get(field))
// } else {
// format := "%s\t" + res.GetFormat(field)
// row = fmt.Sprintf(format, row, res.Get(field))
// }
// }
// fmt.Printf("%s\r", row)
// }
// reset
data = []rune{}
kfpd.Println("-- RESET OUTPUT --")
}
}

if len(data) > 0 {
value, err := processLine(&processLineInput{
line: data,
})
if err != nil && value == 0 {
fails++
}
lines++
sample = append(sample, value)
}
kfpl.Printf("%d / %d lines processed failed to parse", fails, lines)
return sample, nil
}

func processReaderStream(opts *processReaderStreamInput) ([]float64, error) {
var data []rune
var lines int64
var fails int64
var sample []float64

p := message.NewPrinter(language.English)

// Clear terminal screen
err := clearTerminal()
if err != nil {
return nil, err
}

var steps int64
steps = opts.refresh
iteration := 0
for {
input, _, err := opts.reader.ReadRune()
if err != nil && err == io.EOF {
break
}
kfpd.Printf("%c", input)
data = append(data, input)
if input == '\n' {
value, err := processLine(&processLineInput{
line: data,
})
if err != nil && value == 0 {
fails++
} else {
sample = append(sample, value)
}
lines++

if lines%steps == 0 {
res, err := processSample(sample)
if err != nil {
return nil, err
}
fmt.Printf("\033[0;0H")
for _, field := range res.ListFields() {
format := "%s\t" + res.GetFormat(field) + "\n"
fmt.Printf(format, res.GetHeader(field), res.Get(field))
}
steps = steps * opts.factor
if steps > opts.cap {
steps = opts.cap
}
iteration = iteration + 1
_, err = p.Printf("\n[%d] next refresh at N modulo %d == 0", iteration, steps)
if err != nil {
return nil, err
}
}

// reset
data = []rune{}
Expand Down
5 changes: 2 additions & 3 deletions commands/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ func printTranspose(res *ResultSet) {

var (
CommandSimple = &cli.Command{
Name: "simple",
Aliases: []string{"s"},
Usage: "simple statistics",
Name: "simple",
Usage: "simple statistics",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "path",
Expand Down
126 changes: 126 additions & 0 deletions commands/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package commands

import (
"bufio"
"fmt"
"github.com/urfave/cli/v2"
"github.com/yargevad/filepathx"
"golang.org/x/text/language"
"golang.org/x/text/message"
"os"
)

var (
CommandStream = &cli.Command{
Name: "stream",
Usage: "stream process ",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "path",
Aliases: []string{"p"},
Usage: "File path to files to stream, can be a glob. If not set, a pipe is assumed.",
},
&cli.Int64Flag{
Name: "refresh",
Aliases: []string{"r"},
Usage: "how many rows of data between updates",
Value: 100,
},
&cli.Int64Flag{
Name: "factor",
Aliases: []string{"f"},
Usage: "rate of growth of refresh value",
Value: 10,
},
&cli.Int64Flag{
Name: "cap",
Aliases: []string{"c"},
Usage: "max value of refresh rate for updates",
Value: 100_000_000,
},
},
Action: func(c *cli.Context) error {
var sample []float64
if c.String("path") != "" {
kf.Printf("globbing files with pattern: %s", c.String("path"))
files, err := filepathx.Glob(c.String("path"))
if err != nil {
return err
}
kf.Printf("found %d files", len(files))
kf.Log(files)

// filter files
// For each file, create reader, pass in reader
for _, fPath := range files {
kf.Printf("processing file: %s", fPath)
file, err := os.Open(fPath)
if err != nil {
return err
}

reader := bufio.NewReader(file)
inner, err := processReaderStream(&processReaderStreamInput{
reader: reader,
refresh: c.Int64("refresh"),
factor: c.Int64("factor"),
cap: c.Int64("cap"),
})
if err != nil {
return err
}
sample = append(sample, inner...)
}
} else {
// run in pipe pass blocks
info, err := os.Stdin.Stat()
if err != nil {
return err
}

noNamedPipe := info.Mode()&os.ModeNamedPipe == 0
noUnixPipe := info.Mode()&os.ModeCharDevice != 0 || info.Size() <= 0
k.Printf("noNamedPipe: %t noUnixPipe: %t\n", noNamedPipe, noUnixPipe)

if noNamedPipe && noUnixPipe {
// if neither, throw error
_ = cli.ShowSubcommandHelp(c)
return fmt.Errorf("please use this command with a pipe or the --path flag set")
}

reader := bufio.NewReader(os.Stdin)
sample, err = processReaderStream(&processReaderStreamInput{
reader: reader,
refresh: c.Int64("refresh"),
factor: c.Int64("factor"),
cap: c.Int64("cap"),
})
if err != nil {
return err
}
}

res, err := processSample(sample)
if err != nil {
return err
}

// Clear terminal screen
err = clearTerminal()
if err != nil {
return err
}

printTranspose(&res)
fmt.Println("")

p := message.NewPrinter(language.English)
_, err = p.Printf("Done. Processed %d rows\n", res.n)
if err != nil {
return err
}

return nil
},
}
)
7 changes: 7 additions & 0 deletions commands/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ type processReaderInput struct {
reader *bufio.Reader
}

type processReaderStreamInput struct {
reader *bufio.Reader
refresh int64
factor int64
cap int64
}

type ResultSet struct {
n int
min float64
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/stretchr/testify v1.7.2
github.com/urfave/cli/v2 v2.8.1
github.com/yargevad/filepathx v1.0.0
golang.org/x/text v0.3.7
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5
github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func main() {
app.Usage = "Statistics in Go - CLI tool for quick statistical analysis of data streams"
app.Commands = []*cli.Command{
commands.CommandSimple,
commands.CommandStream,
im,
{
Name: "version",
Expand Down

0 comments on commit a39c9cf

Please sign in to comment.