Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error when try to start my application with go_kafka_client included #1

Closed
vladiacob opened this issue Dec 6, 2014 · 5 comments
Closed

Comments

@vladiacob
Copy link
Contributor

This is what error i got:

github.com/stealthly/go_kafka_client
../../../github.com/stealthly/go_kafka_client/testing_utils.go:33: not enough arguments in call to zk.StartTestCluster
../../../github.com/stealthly/go_kafka_client/testing_utils.go:148: too many arguments in call to producer.NewKafkaProducer
../../../github.com/stealthly/go_kafka_client/testing_utils.go:151: p.Send undefined (type *producer.KafkaProducer has no field or method Send)

I commented content from the testing_utils.go and after this I passed this error but i got another one:

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x70 pc=0x4593c2]

goroutine 1 [running]:
runtime.panic(0x72fbc0, 0xba6108)
    /usr/lib/go/src/pkg/runtime/panic.c:266 +0xb6
github.com/stealthly/go_kafka_client.Infof(0x6985a0, 0xc21007df30, 0x6985a0, 0xc21007df40, 0xc21007df20, ...)
    /home/vagrant/workspace/ideazz-go/src/github.com/stealthly/go_kafka_client/utils.go:55 +0x112
github.com/stealthly/go_kafka_client.NewConsumer(0xc210050580, 0x0)
    /home/vagrant/workspace/ideazz-go/src/github.com/stealthly/go_kafka_client/consumer.go:84 +0x18c
main.main()
    /home/vagrant/workspace/ideazz-go/src/bitbucket.org/viacob/ideazz-indexer/main.go:46 +0x3e5

goroutine 3 [semacquire]:
sync.runtime_Syncsemacquire(0xc21005ff90)
    /usr/lib/go/src/pkg/runtime/sema.goc:257 +0xca
sync.(*Cond).Wait(0xc21005ff80)
    /usr/lib/go/src/pkg/sync/cond.go:62 +0x89
github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc210039780, 0x0)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0x95
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc210039780)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x31
created by github.com/cihub/seelog.newAsyncLoopLogger
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x74

goroutine 4 [semacquire]:
sync.runtime_Syncsemacquire(0xc21005f210)
    /usr/lib/go/src/pkg/runtime/sema.goc:257 +0xca
sync.(*Cond).Wait(0xc21005f200)
    /usr/lib/go/src/pkg/sync/cond.go:62 +0x89
github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc2100398a0, 0x0)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0x95
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc2100398a0)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x31
created by github.com/cihub/seelog.newAsyncLoopLogger
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x74

goroutine 6 [syscall]:
os/signal.loop()
    /usr/lib/go/src/pkg/os/signal/signal_unix.go:21 +0x1e
created by os/signal.init·1
    /usr/lib/go/src/pkg/os/signal/signal_unix.go:27 +0x31
exit status 2

My code is:

    // Zookeeper config
    zkConfig := kafka.NewZookeeperConfig()
    zkConfig.ZookeeperConnect = []string{fmt.Sprintf("localhost:%d", os.Getenv("IDEAZZ_ZOOKEEPER_PORT"))}

    // Initialize kafka config
    config := kafka.DefaultConsumerConfig()
    config.AutoOffsetReset = kafka.SmallestOffset
    config.Groupid = "ideazz-product-group"
    config.Coordinator = kafka.NewZookeeperCoordinator(zkConfig)
    config.Strategy = func(_ *kafka.Worker, message *kafka.Message, id kafka.TaskId) kafka.WorkerResult {
        return kafka.NewSuccessfulResult(id)
    }
    config.WorkerFailureCallback = func(worker_manager *kafka.WorkerManager) kafka.FailedDecision {
        return kafka.DoNotCommitOffsetAndContinue
    }
    config.WorkerFailedAttemptCallback = func(_ *kafka.Task, _ kafka.WorkerResult) kafka.FailedDecision {

        return kafka.DoNotCommitOffsetAndContinue
    }

    consumer := kafka.NewConsumer(config)
    go consumer.StartStatic(map[string]int{"new-product": 3})
@vladiacob
Copy link
Contributor Author

I made a pull request to fix this issues: #2

@vladiacob
Copy link
Contributor Author

How can I read from kafka from 10 seconds to 10 seconds ? Now is reading when I'm opening the server but after this not retry again.

If I publish messages after consumer was started, consumer will get this messages only after is restarted.

@serejja
Copy link
Contributor

serejja commented Dec 8, 2014

@vladiacob I merged you pull request. Thank you for your contribution!

Not sure about your problem but consider trying config.AutoResetOffset = SmallestOffset as the default value is LargestOffset and the problem sounds like you don't get messages when you produce them earlier than you start the consumer.

@vladiacob
Copy link
Contributor Author

If I start consumer and after this I start producer I don't get messages in consumer. After this if I stop consumer and start again I get all messages sent before but if I try to send a new message is the same behaviour.

@serejja
Copy link
Contributor

serejja commented Dec 10, 2014

I created a separate issue #6 for this problem. The fix is on the way.

The current issue looks solved so I close it.

@serejja serejja closed this as completed Dec 10, 2014
serejja added a commit that referenced this issue Feb 16, 2015
serejja pushed a commit that referenced this issue Jul 1, 2015
Getting latest from stealthly
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants