Skip to content

Commit

Permalink
add ability to stream to kinesis firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
iamatypeofwalrus committed Jan 22, 2019
1 parent 11c1e27 commit cd5954c
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 12 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ module github.com/iamatypeofwalrus/kpr

require (
github.com/aws/aws-sdk-go v1.16.23
github.com/go-ini/ini v1.35.0
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/stretchr/testify v1.3.0 // indirect
github.com/urfave/cli v1.20.0
golang.org/x/net v0.0.0-20190119204137-ed066c81e75e // indirect
golang.org/x/text v0.3.0 // indirect
)
18 changes: 12 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
github.com/aws/aws-sdk-go v1.13.32 h1:AoV2boU+diwKoMaschMtUJim3nmBpM/4y45UqY708F4=
github.com/aws/aws-sdk-go v1.13.32/go.mod h1:ZRmQr0FajVIyZ4ZzBYKG5P3ZqPz9IHG41ZoMu1ADI3k=
github.com/aws/aws-sdk-go v1.16.23 h1:MwBOBeez0XEFVh6DCc888X+nHVBCjUDLnnWXSGGWUgM=
github.com/aws/aws-sdk-go v1.16.23/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/go-ini/ini v1.35.0 h1:D/my3+xOfqZMkJpciRcyqU7XMBUgiZa9qXjZIa8uv2k=
github.com/go-ini/ini v1.35.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
golang.org/x/net v0.0.0-20190119204137-ed066c81e75e h1:MDa3fSUp6MdYHouVmCCNz/zaH2a6CRcxY3VhT/K3C5Q=
golang.org/x/net v0.0.0-20190119204137-ed066c81e75e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
104 changes: 100 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/urfave/cli"
)
Expand All @@ -31,10 +35,19 @@ func main() {
Usage: "Amazon Web Service `REGION`",
Value: "us-east-1",
},
cli.StringFlag{
Name: "delimiter, d",
Usage: "Kinesis Firehose `DELIMITER`",
Value: "\n",
},
cli.BoolFlag{
Name: "help, h",
Usage: "show this help message",
},
cli.BoolFlag{
Name: "verbose",
Usage: "log verbosely",
},
}

app.Action = do
Expand All @@ -51,6 +64,13 @@ func do(c *cli.Context) error {
cli.ShowAppHelp(c)
return nil
}

if !c.Bool("verbose") {
log.SetOutput(ioutil.Discard)
}

delimiter := c.String("delimiter")

streamName := c.String("stream")
if streamName == "" {
return fmt.Errorf("--stream, -s flag is required")
Expand All @@ -63,21 +83,95 @@ func do(c *cli.Context) error {
&aws.Config{Region: aws.String(region)},
),
)
svc := kinesis.New(sess)
return stream(streamName, svc)

kinesisClient := kinesis.New(sess)
firehoseClient := firehose.New(sess)

streamExists, isFirehose := checkStream(streamName, kinesisClient, firehoseClient)

if !streamExists {
return fmt.Errorf("stream %v doesn't exists in kinesis streams or firehose", streamName)
}

if isFirehose {
return streamToFirehose(streamName, os.Stdin, delimiter, firehoseClient)
}

return streamToKinesis(streamName, os.Stdin, kinesisClient)
}

// checkStream reports whether streamName names an existing Kinesis data stream
// or Firehose delivery stream.
//
// It first calls DescribeStream on kinesisClient; if that succeeds it returns
// (true, false) without consulting Firehose. Otherwise it calls
// DescribeDeliveryStream on firehoseClient and returns (true, true) on success.
// If both calls fail it returns (false, false).
//
// Any error from either Describe call is treated as "not found". Because the
// signature has no error result, the underlying errors are written to the
// standard logger so that failures other than a missing stream are not lost.
func checkStream(streamName string, kinesisClient *kinesis.Kinesis, firehoseClient *firehose.Firehose) (exists, useFirehose bool) {
	log.Println("checking if stream", streamName, "exists")

	_, err := kinesisClient.DescribeStream(
		&kinesis.DescribeStreamInput{StreamName: aws.String(streamName)},
	)
	if err == nil {
		log.Println("it exists! stream is a Kinesis stream")
		return true, false
	}
	// Keep the reason visible: the lookup may have failed for a reason other
	// than the stream being absent.
	log.Println("kinesis DescribeStream failed:", err)

	_, err = firehoseClient.DescribeDeliveryStream(
		&firehose.DescribeDeliveryStreamInput{DeliveryStreamName: aws.String(streamName)},
	)
	if err == nil {
		log.Println("it exists! stream is a Firehose")
		return true, true
	}
	log.Println("firehose DescribeDeliveryStream failed:", err)

	log.Println("could not find stream in kinesis or firehose")
	return false, false
}

// streamToFirehose reads input line by line and sends each line, with delimiter
// appended, to the Firehose delivery stream streamName using PutRecord.
//
// It stops and returns the error from the first PutRecord call that fails.
// Once input is exhausted it logs the number of records sent and returns any
// error reported by the scanner, or nil.
func streamToFirehose(streamName string, input io.Reader, delimiter string, svc *firehose.Firehose) error {
	// The Firehose PutRecord API documents a 1,000 KiB limit per record. The
	// default bufio.Scanner limit is 64 KiB, which would reject lines that
	// Firehose itself would accept, so raise the scanner limit to match.
	const maxRecordSize = 1000 * 1024

	log.Print("streaming to firehose")
	scanner := bufio.NewScanner(input)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxRecordSize)

	var count uint
	for scanner.Scan() {
		// scanner.Bytes() is only valid until the next Scan, so copy the line
		// into a fresh slice sized for the line plus the delimiter.
		line := scanner.Bytes()
		data := make([]byte, 0, len(line)+len(delimiter))
		data = append(data, line...)
		data = append(data, delimiter...)

		req := &firehose.PutRecordInput{
			DeliveryStreamName: aws.String(streamName),
			Record:             &firehose.Record{Data: data},
		}

		if _, err := svc.PutRecord(req); err != nil {
			log.Println("stopping after", count, "records:", err)
			return err
		}

		count++
	}

	log.Println("streamed", count, "records")

	return scanner.Err()
}

func stream(streamName string, svc *kinesis.Kinesis) error {
func streamToKinesis(streamName string, input io.Reader, svc *kinesis.Kinesis) error {
// Hey! You're probably wondering what the hell I'm doing here with this variable.
// I want to approximate a round-robin strategy when sending data to Kinesis to make sure
// all shards get an equal amount of data. Unfortunately, doing a real round robin strategy
// is a pain in the ass to implement with the Kinesis API. We're going to do a poor man's
// round robin by letting the hash function that is applied to the partition key do its job
// and just send it a monotonically increasing integer. We _should_ get good coverage across
// all shards this way.
log.Print("streaming to kinesis")
var count uint
ctx := context.Background()
scanner := bufio.NewScanner(os.Stdin)
scanner := bufio.NewScanner(input)
for scanner.Scan() {
req := &kinesis.PutRecordInput{
StreamName: aws.String(streamName),
Expand All @@ -93,6 +187,8 @@ func stream(streamName string, svc *kinesis.Kinesis) error {
count++
}

log.Print("streamed", count, "records")

if err := scanner.Err(); err != nil {
return err
}
Expand Down

0 comments on commit cd5954c

Please sign in to comment.