diff --git a/kafka_producer.go b/kafka_producer.go
index ba38470..0541ddf 100644
--- a/kafka_producer.go
+++ b/kafka_producer.go
@@ -28,8 +28,6 @@ type RecordMetadata struct {
 	Error error
 }
 
-type PartitionInfo struct{}
-type Metric struct{}
 type ProducerConfig struct {
 	Partitioner    Partitioner
 	MetadataExpire time.Duration
@@ -94,19 +92,8 @@ type Producer interface {
 	// Send the given record asynchronously and return a channel which will eventually contain the response information.
 	Send(*ProducerRecord) <-chan *RecordMetadata
 
-	// Flush any accumulated records from the producer. Blocks until all sends are complete.
-	Flush()
-
-	// Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
-	// over time so this list should not be cached.
-	PartitionsFor(topic string) []PartitionInfo
-
-	// Return a map of metrics maintained by the producer
-	Metrics() map[string]Metric
-
-	// Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
-	// timeout, fail any pending send requests and force close the producer.
-	Close(timeout time.Duration)
+	// Tries to close the producer cleanly.
+	Close()
 }
 
 type KafkaProducer struct {
@@ -114,13 +101,10 @@ type KafkaProducer struct {
 	config            ProducerConfig
 	time              time.Time
 	keySerializer     Serializer
 	valueSerializer   Serializer
-	metrics           map[string]Metric
 	messagesChan      chan *ProducerRecord
 	accumulatorConfig *RecordAccumulatorConfig
-	metricTags        map[string]string
 	connector         siesta.Connector
 	metadata          *Metadata
-	RecordsMetadata   chan *RecordMetadata
 }
 
 func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSerializer Serializer, connector siesta.Connector) *KafkaProducer {
@@ -128,15 +112,13 @@ func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSer
 	producer := &KafkaProducer{}
 	producer.config = config
 	producer.time = time.Now()
-	producer.metrics = make(map[string]Metric)
 	producer.messagesChan = make(chan *ProducerRecord, config.BatchSize)
 	producer.keySerializer = keySerializer
 	producer.valueSerializer = valueSerializer
 	producer.connector = connector
 	producer.metadata = NewMetadata(connector, config.MetadataExpire)
 
-	networkClientConfig := NetworkClientConfig{}
-	client := NewNetworkClient(networkClientConfig, connector, config)
+	client := NewNetworkClient(connector, config)
 
 	producer.accumulatorConfig = &RecordAccumulatorConfig{
 		batchSize: config.BatchSize,
@@ -256,6 +238,10 @@ func (kp *KafkaProducer) messageDispatchLoop() {
 
 		accumulator <- message
 	}
+
+	for _, accumulator := range accumulators {
+		close(accumulator)
+	}
 }
 
 func (kp *KafkaProducer) topicDispatchLoop(topicMessagesChan chan *ProducerRecord) {
@@ -269,23 +255,12 @@ func (kp *KafkaProducer) topicDispatchLoop(topicMessagesChan chan *ProducerRecor
 
 		accumulator.input <- message
 	}
-}
-
-func (kp *KafkaProducer) Flush() {}
-func (kp *KafkaProducer) PartitionsFor(topic string) []PartitionInfo {
-	return []PartitionInfo{}
-}
-
-func (kp *KafkaProducer) Metrics() map[string]Metric {
-	return make(map[string]Metric)
+
+	for _, accumulator := range accumulators {
+		close(accumulator.closeChan)
+	}
 }
 
-// TODO return channel and remove timeout
-func (kp *KafkaProducer) Close(timeout time.Duration) {
-	//closed := kp.accumulator.close()
-	//select {
-	//case <-closed:
-	//case <-time.After(timeout):
-	//}
+func (kp *KafkaProducer) Close() {
+	close(kp.messagesChan)
 }
diff --git a/kafka_producer_test.go b/kafka_producer_test.go
index 682d952..743e592 100644
--- a/kafka_producer_test.go
+++ b/kafka_producer_test.go
@@ -40,7 +40,7 @@ func TestProducerSend1(t *testing.T) {
 		t.Error("Could not get produce response within 5 seconds")
 	}
 
-	producer.Close(1 * time.Second)
+	producer.Close()
 }
 
 func TestProducerSend1000(t *testing.T) {
@@ -65,7 +65,7 @@ func TestProducerSend1000(t *testing.T) {
 		}
 	}
 
-	producer.Close(1 * time.Second)
+	producer.Close()
 }
 
 func TestProducerRequiredAcks0(t *testing.T) {
@@ -93,7 +93,7 @@ func TestProducerRequiredAcks0(t *testing.T) {
 		}
 	}
 
-	producer.Close(1 * time.Second)
+	producer.Close()
 }
 
 func TestProducerFlushTimeout(t *testing.T) {
@@ -120,7 +120,7 @@ func TestProducerFlushTimeout(t *testing.T) {
 		}
 	}
 
-	producer.Close(1 * time.Second)
+	producer.Close()
 }
 
 func TestProducerWithSeveralTopics(t *testing.T) {
@@ -132,7 +132,7 @@ func TestProducerWithSeveralTopics(t *testing.T) {
 	producer := NewKafkaProducer(producerConfig, ByteSerializer, StringSerializer, connector)
 	metadatas := make([]<-chan *RecordMetadata, 1000)
 	for i := 0; i < 1000; i++ {
-		topic := fmt.Sprintf("siesta-%d", rand.Intn(10))
+		topic := fmt.Sprintf("siesta--34-%d", rand.Intn(10))
 		metadatas[i] = producer.Send(&ProducerRecord{Topic: topic, Value: fmt.Sprintf("%d", i)})
 	}
 
diff --git a/network_client.go b/network_client.go
index 7d074a7..6f0dc58 100644
--- a/network_client.go
+++ b/network_client.go
@@ -3,36 +3,22 @@ package producer
 import (
 	"fmt"
 	"github.com/elodina/siesta"
-	"net"
 )
 
 type NetworkClient struct {
-	connector               siesta.Connector
-	metadata                Metadata
-	socketSendBuffer        int
-	socketReceiveBuffer     int
-	clientId                string
-	nodeIndexOffset         int
-	correlation             int
-	metadataFetchInProgress bool
-	lastNoNodeAvailableMs   int64
-	selector                *Selector
-	connections             map[string]*net.TCPConn
-	requiredAcks            int
-	ackTimeoutMs            int32
+	connector    siesta.Connector
+	selector     *Selector
+	requiredAcks int
+	ackTimeoutMs int32
 }
 
-type NetworkClientConfig struct {
-}
-
-func NewNetworkClient(config NetworkClientConfig, connector siesta.Connector, producerConfig *ProducerConfig) *NetworkClient {
+func NewNetworkClient(connector siesta.Connector, producerConfig *ProducerConfig) *NetworkClient {
 	client := &NetworkClient{}
 	client.connector = connector
 	client.requiredAcks = producerConfig.RequiredAcks
 	client.ackTimeoutMs = producerConfig.AckTimeoutMs
 	selectorConfig := NewSelectorConfig(producerConfig)
 	client.selector = NewSelector(selectorConfig)
-	client.connections = make(map[string]*net.TCPConn, 0)
 
 	return client
 }
diff --git a/record_accumulator.go b/record_accumulator.go
index be10cfb..b868071 100644
--- a/record_accumulator.go
+++ b/record_accumulator.go
@@ -15,8 +15,7 @@ type RecordAccumulator struct {
 	networkClient *NetworkClient
 	batchSize     int
 
-	closing chan bool
-	closed  chan bool
+	closeChan chan bool
 }
 
 func NewRecordAccumulator(config *RecordAccumulatorConfig) *RecordAccumulator {
@@ -25,8 +24,7 @@ func NewRecordAccumulator(config *RecordAccumulatorConfig) *RecordAccumulator {
 	accumulator.config = config
 	accumulator.batchSize = config.batchSize
 	accumulator.networkClient = config.networkClient
-	accumulator.closing = make(chan bool)
-	accumulator.closed = make(chan bool)
+	accumulator.closeChan = make(chan bool)
 
 	go accumulator.sender()
 
@@ -39,7 +37,7 @@ func (ra *RecordAccumulator) sender() {
 	batchIndex := 0
 	for {
 		select {
-		case <-ra.closeChan:
+		case <-ra.closeChan:
 			return
 		default:
 			select {
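
Note on the shutdown path this patch introduces: Close() no longer takes a timeout. It just closes kp.messagesChan, and shutdown cascades through the pipeline: messageDispatchLoop's range over that channel ends once it is closed and drained, so the loop closes every per-topic channel; each topicDispatchLoop then closes its accumulators' closeChan, which makes the sender goroutines return. Below is a minimal, self-contained sketch of that cascade; the names are hypothetical stand-ins for KafkaProducer/RecordAccumulator, not the library's actual API.

// Sketch of the cascading-close shutdown pattern (hypothetical,
// simplified types; mirrors the structure of this patch only).
package main

import (
	"fmt"
	"sync"
)

type accumulator struct {
	input     chan string
	closeChan chan bool
}

// sender mirrors RecordAccumulator.sender: loop until closeChan is closed.
func (a *accumulator) sender(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-a.closeChan:
			return
		case msg := <-a.input:
			fmt.Println("sent:", msg)
		}
	}
}

func main() {
	var wg sync.WaitGroup

	acc := &accumulator{input: make(chan string), closeChan: make(chan bool)}
	wg.Add(1)
	go acc.sender(&wg)

	messages := make(chan string) // plays the role of kp.messagesChan

	// Dispatch loop, as in messageDispatchLoop/topicDispatchLoop: the range
	// ends when messages is closed and drained; only then is closeChan closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range messages {
			acc.input <- msg
		}
		close(acc.closeChan)
	}()

	messages <- "record-1"
	messages <- "record-2"

	// Close() in this patch reduces to exactly this single close at the
	// head of the pipeline; everything downstream shuts itself down.
	close(messages)
	wg.Wait()
}

Ranging over a channel terminates only after the channel is closed and emptied, which is what makes the single close() at the head of the pipeline sufficient here. The trade-off against the previously declared Close(timeout time.Duration) is that callers no longer get a bounded force-close; the drain takes as long as it takes.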