Skip to content

Commit

Permalink
Use stdin for example data reader instead of file path
Browse files Browse the repository at this point in the history
  • Loading branch information
harlow committed Sep 2, 2019
1 parent e3ee95b commit b451fc4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
10 changes: 9 additions & 1 deletion README.md
Expand Up @@ -349,7 +349,15 @@ There are example Produder and Consumer code in `/cmd` directory. These should h

The examples run locally against [Kinesis Lite](https://github.com/mhart/kinesalite).

$ kinesalite
$ kinesalite &

Produce data to the stream:

$ cat cmd/producer/users.txt | go run cmd/producer/main.go --stream myStream

Consume data from the stream:

$ go run cmd/consumer/main.go --stream myStream

## Contributing

Expand Down
2 changes: 1 addition & 1 deletion cmd/producer/README.md
Expand Up @@ -4,4 +4,4 @@ A prepopulated file with JSON users is available on S3 for seeing the stream.

## Running the code

$ go run main.go --stream streamName
$ cat users.txt | go run main.go --stream streamName
19 changes: 10 additions & 9 deletions cmd/producer/main.go
Expand Up @@ -21,13 +21,6 @@ func main() {
)
flag.Parse()

// open dummy user data
f, err := os.Open("users.txt")
if err != nil {
log.Fatal("Cannot open users.txt file")
}
defer f.Close()

var records []*kinesis.PutRecordsRequestEntry

var client = kinesis.New(session.Must(session.NewSession(
Expand All @@ -43,7 +36,8 @@ func main() {
}

// loop over file data
b := bufio.NewScanner(f)
b := bufio.NewScanner(os.Stdin)

for b.Scan() {
records = append(records, &kinesis.PutRecordsRequestEntry{
Data: b.Bytes(),
Expand Down Expand Up @@ -79,8 +73,15 @@ func createStream(client *kinesis.Kinesis, streamName *string) error {
ShardCount: aws.Int64(2),
},
)
if err != nil {
return err
}

return err
return client.WaitUntilStreamExists(
&kinesis.DescribeStreamInput{
StreamName: streamName,
},
)
}

func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis.PutRecordsRequestEntry) {
Expand Down

0 comments on commit b451fc4

Please sign in to comment.