Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a MockCluster within Go for testing #729

Merged
merged 20 commits into from Apr 8, 2022

Conversation

SourceFellows
Copy link
Contributor

Hi,

this PR exposes the MockCluster from librdkafka to Golang.
You are able to start a MockCluster with:

kafka.NewMockCluster(1)

and read the corresponding bootstrap.servers property via:

broker := mockCluster.Bootstrapservers()

Remote communication is also possible.

An example is also added.

Thanks,
Kristian

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great stuff!

examples/mockserver_example/mockserver.go Outdated Show resolved Hide resolved
examples/mockserver_example/mockserver.go Outdated Show resolved Hide resolved
kafka/mockcluster.go Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
kafka/librdkafka_vendor/rdkafka_mock.h Show resolved Hide resolved
kafka/mockcluster.go Outdated Show resolved Hide resolved
@kkoehler
Copy link
Contributor

Hi @edenhill,

still something missing?
I can't figure out what does "changes requested" mean.... is there any work needed on my side? ;-)
Thanks,
Kristian

@gstilwellDO
Copy link

Hi @edenhill,

still something missing? I can't figure out what does "changes requested" mean.... is there any work needed on my side? ;-) Thanks, Kristian

Indeed, it would be great to get this merged! I've got a project itching to use it :)

@jliunyu
Copy link
Contributor

jliunyu commented Mar 5, 2022

Hi @edenhill,

still something missing? I can't figure out what does "changes requested" mean.... is there any work needed on my side? ;-) Thanks, Kristian

Let's wait for a CI test before the merge.


mc.cConf = C.rd_kafka_conf_new()

mc.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, mc.cConf, cErrstr, 256)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an issue here when Close() is called which causes a crash.

In the documentation for rd_kafka_new it says:

conf is an optional struct created with rd_kafka_conf_new() that will be used instead of the default configuration. The conf object is freed by this function on success and must not be used or destroyed by the application sub-sequently. See rd_kafka_conf_set() et.al for more information.

This means that on success there should be no calls to C.rd_kafka_conf_destroy(mc.cConf) which is done in Close()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I removed the call to the conf_destroy function to prevent problems ;-) Thanks

@@ -0,0 +1,100 @@
package main

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to mockcluster_example instead, to match the API naming?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
// Avoid connecting to IPv6 brokers:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this comment block

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* limitations under the License.
*/

package kafka
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put the package as the first line of the file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


package kafka

import "C"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate import, remove this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok done


package kafka

import "C"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate import, remove this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

// @remark This is an experimental public API that is NOT covered by the
// librdkafka API or ABI stability guarantees.
//
// @warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// @warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
// Warning: THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

cErrstr := (*C.char)(C.malloc(C.size_t(256)))
defer C.free(unsafe.Pointer(cErrstr))

mc.cConf = C.rd_kafka_conf_new()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to store this on the MockCluster object, it is only used in this function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the config reference from MockCluster


mc := &MockCluster{}

cErrstr := (*C.char)(C.malloc(C.size_t(256)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this 512 to avoid truncation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok done

mc.mcluster = C.rd_kafka_mock_cluster_new(mc.rk, C.int(brokerCount))
if mc.mcluster == nil {
C.rd_kafka_destroy(mc.rk)
C.rd_kafka_conf_destroy(mc.cConf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line, the cConf is already freed by the successful rd_kafka_new call

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. removed the call

}

// Bootstrapservers returns the bootstrap.servers property for this MockCluster
func (mc *MockCluster) Bootstrapservers() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to BootstrapServers()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@kkoehler
Copy link
Contributor

Ok. Sorry for the single commits. @edenhill changed the impl according to your suggestion

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@edenhill
Copy link
Contributor

I can't merge this just yet because we need you to sign the CLA, and we're in the midst of the process of switching CLA signing solutions.
Will ping you when that is in place, thanks for your work on this and your patience!

@CLAassistant
Copy link

CLAassistant commented Apr 1, 2022

CLA assistant check
All committers have signed the CLA.

@jeffwidman
Copy link
Contributor

jeffwidman commented Apr 7, 2022

@edenhill it looks like this is ready to go including CLA... any chance of a merge with it? We're itching to use it...

Also, many thanks for all the hard work @SourceFellows!

Update: I was thinking even if it's on master but not yet tagged, that'd be helpful as we can pin to the specific commit. But it looks like there's no support for go modules yet?? #553

@edenhill edenhill merged commit f9fce7a into confluentinc:master Apr 8, 2022
1 check passed
@edenhill
Copy link
Contributor

edenhill commented Apr 8, 2022

Thank you for this!

@jeffwidman
Copy link
Contributor

Thanks for the quick merge!

@edenhill
Copy link
Contributor

edenhill commented Apr 8, 2022

I think a blog post how to use this as part of application testing would be fantastic.. just saying..saying

@kkoehler
Copy link
Contributor

kkoehler commented Apr 8, 2022

hearing that ;-) i started but not finished yet... going soon

eran-levy pushed a commit to eran-levy/confluent-kafka-go that referenced this pull request Apr 9, 2022
…confluentinc#729)

* Added MockCluster implementation which can be used for testing

* adjust documentation for mock cluster

* Added example for MockCluster

* Fixed naming

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* fixed typo

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* removed

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* fixed comment

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* adjust documentation

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* adjustments regarding comments from Magnus

* fixed cleanup code for kafka configuration

* fixed destroy for C references

* fixed spaces

* removed call to `C.rd_kafka_conf_destroy(mc.cConf)` in close

* added docu for MockCluster type

* moved package statement into first line

* adjustments according to @edenhill comments

* Update kafka/mockcluster.go

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* removed duplicate import

* removed the remark as suggested from @edelhill

* changed typedef from struct_rd_kafka_mock_cluster_s to rd_kafka_mock_cluster_t

Co-authored-by: Kristian Köhler <github@kkoehler.com>
Co-authored-by: Magnus Edenhill <magnus@edenhill.se>
Co-authored-by: Kristian Köhler <kkoehler@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants