Add Kinesis support#151
Conversation
Codecov Report
@@ Coverage Diff @@
## master #151 +/- ##
==========================================
- Coverage 93.54% 93.16% -0.38%
==========================================
Files 23 24 +1
Lines 1193 1244 +51
==========================================
+ Hits 1116 1159 +43
- Misses 54 58 +4
- Partials 23 27 +4
Continue to review full report at Codecov.
|
151c539 to
34fdfc8
Compare
| func TestGetDataRecorderWhenKinesis(t *testing.T) { | ||
| singletonDataRecorderOnce = sync.Once{} | ||
| defer gostub.StubFunc(&NewKinesisRecorder, nil).Reset() | ||
| config.Config.RecorderType = "kinesis" |
There was a problem hiding this comment.
Let's also set the RecorderType back to default after this test
| *models.EvalResult | ||
| } | ||
|
|
||
| // Unmalshalls the context |
There was a problem hiding this comment.
nit, comment looks wrong. Do you mean Payload marshals the EvalResult?
| logrus.WithField("kinesis_error", err).Error("error marshaling") | ||
| } | ||
|
|
||
| err = k.producer.Put(payload, kr.Key()) |
There was a problem hiding this comment.
Currently the kafka record schema looks like
{
"payload": string(EvalResult.MarshalBinary()) OR encrypted string,
"encrypted": false <---- optional, you can ignore this and implement encryption later if needed
}
Can we make the same abstraction/schema in kinesis? So that it's seamless to switch data recorders.
Another idea is to move https://github.com/checkr/flagr/blob/master/pkg/handler/data_recorder_kafka.go#L152 as the shared "message frame" for both Kafka and Kinesis
There was a problem hiding this comment.
I saw a lot of similarities, thought of doing that right away, but it could get messy thinking of versioning and commit history. What about starting with the duplication, making everything isolated and working, and then I can open a second PR with the abstractions, so first I can implement and figure what will be duplicated. WDYT?
There was a problem hiding this comment.
Sounds good for deferring the abstractions later. Let's at least make sure the message schema is the same, otherwise, there will be breaking changes for Kinesis in the future.
|
|
||
| go func() { | ||
| for err := range p.NotifyFailures() { | ||
| logrus.WithField("kinesis_error", err).Error("error pushing") |
There was a problem hiding this comment.
nit, the error message can be more specific, like logrus.WithField("kinesis_error", err).Error("error pushing to kinesis"), same for the error messages below
|
|
||
| "github.com/checkr/flagr/pkg/util" | ||
| "github.com/checkr/flagr/swagger_gen/models" | ||
| //"github.com/prashantv/gostub" |
| RecorderKafkaEncryptionKey string `env:"FLAGR_RECORDER_KAFKA_ENCRYPTION_KEY" envDefault:""` | ||
|
|
||
| // Kinesis related configurations for data records logging (Flagr Metrics) | ||
| RecorderKinesisStreamName string `env:"FLAGR_RECORDER_KINESIS_STREAM_NAME" envDefault:"flagr-stream"` |
There was a problem hiding this comment.
can be the same default as kafka, i.e. flagr-records
| RecorderKinesisAggregateBatchCount int `env:"FLAGR_RECORDER_KINESIS_AGGREGATE_BATCH_COUNT" envDefault:"4294967295"` | ||
| RecorderKinesisAggregateBatchSize int `env:"FLAGR_RECORDER_KINESIS_AGGREGATE_BATCH_SIZE" envDefault:"51200"` | ||
| RecorderKinesisVerbose bool `env:"FLAGR_RECORDER_KINESIS_VERBOSE" envDefault:"false"` | ||
|
|
There was a problem hiding this comment.
Also, what about kinesis connection credentials, not very familiar with kinesis, I would image you may need AWS keys.
There was a problem hiding this comment.
Yes, you need, but they are loaded implicitly by default from AWS packages, we can document but I wouldn't change the that behaviour.
There was a problem hiding this comment.
👍 it would be great if we can have some docs that point to the AWS behavior.
8615334 to
70d2ef1
Compare
zhouzhuojie
left a comment
There was a problem hiding this comment.
Thanks for the PR! LGTM. It would be great if we can cover more lines.
| } | ||
|
|
||
| // NewKinesisRecorder creates a new Kinesis recorder | ||
| var NewKinesisRecorder = func() DataRecorder { |
There was a problem hiding this comment.
it would be great if we can cover more lines
There was a problem hiding this comment.
Yes, it's still WIP, I have to test more things. I just didn't have time yet.
740d49a to
babad0c
Compare
|
@zhouzhuojie I think the code its pretty much done, please take a look. I'll be conducting some performance/delivery tests now to make sure like if I call 1M times, I have 1M records on the stream... Once I have that ready I'll mention here so we move forward. In the meantime, please feel free to look at the code and I can work on your comments right away. |
Delivery TestFor the delivery test I've created a small script to trigger 10k requests, to the evaluation endpoint, in parallel (batches of 8/10). That should e.g.: In the meantime I was reading from the stream and storing the content into a file. Later on I've grep the content of the file, looking for the IDs of the entities sent, filtering that out, sorting, checking if its uniq and counting the uniq IDs.
That returned 10k. |
openflagr#150 Environment defaults: https://github.com/a8m/kinesis-producer/blob/master/config.go#L12 - Add Kinesis data recorder (without encryption) - Add documentation for AWS Authentication with Kinesis - Increase test coverage of the data recorder Delivery test The delivery tests ran with a small script to trigger 10k requests, to the evaluation endpoint, in parallel (batches of 8/10). That should `put` at least 10k records into the stream. e.g.: ```script seq 1 10000 | parallel -I {} http --ignore-stdin post 'http://127.0.0.1:18000/api/v1/evaluation' entityID=neu-{} entityType=report flagID:=3 > neu.txt ``` While reading from the stream and storing the content into a file. By greping the content of the file, looking for the IDs of the entities sent, filtering that out, sorting, checking if its uniq and counting the uniq IDs. `grep "neu-[0-9]*" -ao kinesis.log | sort -V | uniq -c | sort` That returned 10k.
babad0c to
0a2f262
Compare
From: #150
Description
Add Kinesis support for the data recorder.
Motivation and Context
That makes it more flexible since Kafka is not easy to self-host and Kinesis works well-enough for small throughput rates.
How Has This Been Tested?
Still figuring how we should test this since Kinesis is not open source, mocking? or just best effort (unit + offline integration?)
Types of changes
Checklist:
Delivery Test
For the delivery test I've created a small script to trigger 10k requests, to the evaluation endpoint, in parallel (batches of 8/10). That should
putat least 10k records into the stream.e.g.:
In the meantime I was reading from the stream and storing the content into a file. Later on I've grep the content of the file, looking for the IDs of the entities sent, filtering that out, sorting, checking if its uniq and counting the uniq IDs.
grep "neu-[0-9]*" -ao kinesis.log | sort -V | uniq -c | sortThat returned 10k.