Cleanups

serejja committed Mar 17, 2016
1 parent 1722dd9 commit 94fa767
Showing 4 changed files with 25 additions and 66 deletions.
49 changes: 12 additions & 37 deletions kafka_producer.go
@@ -28,8 +28,6 @@ type RecordMetadata struct {
Error error
}

type PartitionInfo struct{}
type Metric struct{}
type ProducerConfig struct {
Partitioner Partitioner
MetadataExpire time.Duration
@@ -94,49 +92,33 @@ type Producer interface {
// Send the given record asynchronously and return a channel which will eventually contain the response information.
Send(*ProducerRecord) <-chan *RecordMetadata

// Flush any accumulated records from the producer. Blocks until all sends are complete.
Flush()

// Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
// over time so this list should not be cached.
PartitionsFor(topic string) []PartitionInfo

// Return a map of metrics maintained by the producer
Metrics() map[string]Metric

// Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
// timeout, fail any pending send requests and force close the producer.
Close(timeout time.Duration)
// Tries to close the producer cleanly.
Close()
}

type KafkaProducer struct {
config *ProducerConfig
time time.Time
keySerializer Serializer
valueSerializer Serializer
metrics map[string]Metric
messagesChan chan *ProducerRecord
accumulatorConfig *RecordAccumulatorConfig
metricTags map[string]string
connector siesta.Connector
metadata *Metadata
RecordsMetadata chan *RecordMetadata
}

func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSerializer Serializer, connector siesta.Connector) *KafkaProducer {
log.Println("Starting the Kafka producer")
producer := &KafkaProducer{}
producer.config = config
producer.time = time.Now()
producer.metrics = make(map[string]Metric)
producer.messagesChan = make(chan *ProducerRecord, config.BatchSize)
producer.keySerializer = keySerializer
producer.valueSerializer = valueSerializer
producer.connector = connector
producer.metadata = NewMetadata(connector, config.MetadataExpire)

networkClientConfig := NetworkClientConfig{}
client := NewNetworkClient(networkClientConfig, connector, config)
client := NewNetworkClient(connector, config)

producer.accumulatorConfig = &RecordAccumulatorConfig{
batchSize: config.BatchSize,
@@ -256,6 +238,10 @@ func (kp *KafkaProducer) messageDispatchLoop() {

accumulator <- message
}

for _, accumulator := range accumulators {
close(accumulator)
}
}

func (kp *KafkaProducer) topicDispatchLoop(topicMessagesChan chan *ProducerRecord) {
@@ -269,23 +255,12 @@ func (kp *KafkaProducer) topicDispatchLoop(topicMessagesChan chan *ProducerRecord) {

accumulator.input <- message
}
}

func (kp *KafkaProducer) Flush() {}

func (kp *KafkaProducer) PartitionsFor(topic string) []PartitionInfo {
return []PartitionInfo{}
}

func (kp *KafkaProducer) Metrics() map[string]Metric {
return make(map[string]Metric)
for _, accumulator := range accumulators {
close(accumulator.closeChan)
}
}

// TODO return channel and remove timeout
func (kp *KafkaProducer) Close(timeout time.Duration) {
//closed := kp.accumulator.close()
//select {
//case <-closed:
//case <-time.After(timeout):
//}
func (kp *KafkaProducer) Close() {
close(kp.messagesChan)
}
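The interface shrinks to Send plus an untimed Close: Flush, PartitionsFor, and Metrics were empty stubs, and the body of the old Close(timeout) was entirely commented out. Shutdown becomes a channel cascade instead: Close closes messagesChan, messageDispatchLoop drains and exits, its new final loop closes every per-topic channel, and topicDispatchLoop in turn closes each accumulator's closeChan. Here is a sketch of how the dispatch loop plausibly looks after this commit; only the loop tails appear in the hunks above, so the range-based loop header is an assumption, while the close fan-out is taken from the diff.

package producer

// Hypothetical reconstruction, assuming messageDispatchLoop ranges over
// kp.messagesChan (which makes close(kp.messagesChan) in Close() the only
// shutdown signal the producer needs).
func (kp *KafkaProducer) messageDispatchLoopSketch() {
    accumulators := make(map[string]chan *ProducerRecord)
    for message := range kp.messagesChan { // drains, then exits once Close() runs
        accumulator, exists := accumulators[message.Topic]
        if !exists {
            accumulator = make(chan *ProducerRecord, kp.config.BatchSize)
            accumulators[message.Topic] = accumulator
            go kp.topicDispatchLoop(accumulator)
        }
        accumulator <- message
    }

    // Taken from the diff: with the intake closed, tell every per-topic
    // loop to shut down as well.
    for _, accumulator := range accumulators {
        close(accumulator)
    }
}

Close itself just closes the intake and returns; the loops unwind asynchronously, which is why both the time.Duration parameter and the TODO about returning a channel could be dropped.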
10 changes: 5 additions & 5 deletions kafka_producer_test.go
@@ -40,7 +40,7 @@ func TestProducerSend1(t *testing.T) {
t.Error("Could not get produce response within 5 seconds")
}

producer.Close(1 * time.Second)
producer.Close()
}

func TestProducerSend1000(t *testing.T) {
@@ -65,7 +65,7 @@ func TestProducerSend1000(t *testing.T) {
}
}

producer.Close(1 * time.Second)
producer.Close()
}

func TestProducerRequiredAcks0(t *testing.T) {
@@ -93,7 +93,7 @@ func TestProducerRequiredAcks0(t *testing.T) {
}
}

producer.Close(1 * time.Second)
producer.Close()
}

func TestProducerFlushTimeout(t *testing.T) {
@@ -120,7 +120,7 @@ func TestProducerFlushTimeout(t *testing.T) {
}
}

producer.Close(1 * time.Second)
producer.Close()
}

func TestProducerWithSeveralTopics(t *testing.T) {
@@ -132,7 +132,7 @@ func TestProducerWithSeveralTopics(t *testing.T) {
producer := NewKafkaProducer(producerConfig, ByteSerializer, StringSerializer, connector)
metadatas := make([]<-chan *RecordMetadata, 1000)
for i := 0; i < 1000; i++ {
topic := fmt.Sprintf("siesta-%d", rand.Intn(10))
topic := fmt.Sprintf("siesta--34-%d", rand.Intn(10))
metadatas[i] = producer.Send(&ProducerRecord{Topic: topic, Value: fmt.Sprintf("%d", i)})
}

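The test changes are mechanical: every Close(1 * time.Second) becomes Close(). Distilled from TestProducerSend1 above, here is a hedged sketch of the resulting calling pattern; sendAndClose is a hypothetical helper, and the connector and config arguments are assumed to come from whatever setup the real tests already use.

package producer

import (
    "testing"
    "time"

    "github.com/elodina/siesta"
)

// Hypothetical flow mirroring the updated tests: send one record, await its
// metadata, then impose a caller-side close deadline, since Close() itself
// no longer takes one.
func sendAndClose(t *testing.T, connector siesta.Connector, config *ProducerConfig) {
    producer := NewKafkaProducer(config, ByteSerializer, StringSerializer, connector)

    metadata := producer.Send(&ProducerRecord{Topic: "siesta", Value: "hello"})
    select {
    case m := <-metadata:
        if m.Error != nil {
            t.Error(m.Error)
        }
    case <-time.After(5 * time.Second):
        t.Error("Could not get produce response within 5 seconds")
    }

    done := make(chan struct{})
    go func() {
        producer.Close()
        close(done)
    }()
    select {
    case <-done:
    case <-time.After(1 * time.Second):
        t.Error("producer did not close within 1 second")
    }
}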
24 changes: 5 additions & 19 deletions network_client.go
@@ -3,36 +3,22 @@ package producer
import (
"fmt"
"github.com/elodina/siesta"
"net"
)

type NetworkClient struct {
connector siesta.Connector
metadata Metadata
socketSendBuffer int
socketReceiveBuffer int
clientId string
nodeIndexOffset int
correlation int
metadataFetchInProgress bool
lastNoNodeAvailableMs int64
selector *Selector
connections map[string]*net.TCPConn
requiredAcks int
ackTimeoutMs int32
connector siesta.Connector
selector *Selector
requiredAcks int
ackTimeoutMs int32
}

type NetworkClientConfig struct {
}

func NewNetworkClient(config NetworkClientConfig, connector siesta.Connector, producerConfig *ProducerConfig) *NetworkClient {
func NewNetworkClient(connector siesta.Connector, producerConfig *ProducerConfig) *NetworkClient {
client := &NetworkClient{}
client.connector = connector
client.requiredAcks = producerConfig.RequiredAcks
client.ackTimeoutMs = producerConfig.AckTimeoutMs
selectorConfig := NewSelectorConfig(producerConfig)
client.selector = NewSelector(selectorConfig)
client.connections = make(map[string]*net.TCPConn, 0)
return client
}

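NetworkClient drops fields nothing used: metadata bookkeeping, socket buffer sizes, a client id, correlation counters, and a raw *net.TCPConn map that duplicated what the Selector and siesta.Connector already manage; the empty NetworkClientConfig struct goes with them, shortening the constructor to two arguments. A wiring sketch for the new signature follows; the connector setup assumes siesta's NewConnectorConfig and NewDefaultConnector helpers, and the broker address is a placeholder.

package producer

import "github.com/elodina/siesta"

// Hypothetical wiring for the slimmed-down constructor. Required acks, the
// ack timeout and the selector configuration now all come from the
// ProducerConfig rather than a separate NetworkClientConfig.
func newClientSketch(producerConfig *ProducerConfig) (*NetworkClient, error) {
    connectorConfig := siesta.NewConnectorConfig()          // assumed siesta helper
    connectorConfig.BrokerList = []string{"localhost:9092"} // placeholder broker

    connector, err := siesta.NewDefaultConnector(connectorConfig) // assumed siesta helper
    if err != nil {
        return nil, err
    }

    return NewNetworkClient(connector, producerConfig), nil
}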
8 changes: 3 additions & 5 deletions record_accumulator.go
@@ -15,8 +15,7 @@ type RecordAccumulator struct {
networkClient *NetworkClient
batchSize int

closing chan bool
closed chan bool
closeChan chan bool
}

func NewRecordAccumulator(config *RecordAccumulatorConfig) *RecordAccumulator {
@@ -25,8 +24,7 @@ func NewRecordAccumulator(config *RecordAccumulatorConfig) *RecordAccumulator {
accumulator.config = config
accumulator.batchSize = config.batchSize
accumulator.networkClient = config.networkClient
accumulator.closing = make(chan bool)
accumulator.closed = make(chan bool)
accumulator.closeChan = make(chan bool)

go accumulator.sender()

@@ -39,7 +37,7 @@ func (ra *RecordAccumulator) sender() {
batchIndex := 0
for {
select {
case <-ra.closing:
case <-ra.closeChan:
return
default:
select {

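The closing/closed channel pair collapses into one closeChan: nothing ever received from closed, since the old Close body that would have waited on it was commented out, so a single channel is enough to request shutdown. Below is a sketch of the select pattern sender() keeps; the record-handling branch is an assumption, because the hunk above is cut off inside the inner select.

package producer

// Minimal sketch of sender()'s shutdown handling after this commit. The
// outer non-blocking check gives closeChan priority; the inner select then
// blocks on either new input or shutdown.
func senderSketch(input <-chan *ProducerRecord, closeChan <-chan bool) {
    for {
        select {
        case <-closeChan:
            return // topicDispatchLoop closed us; stop sending
        default:
            select {
            case record := <-input:
                _ = record // assumed: the real code batches records and flushes via networkClient
            case <-closeChan:
                return
            }
        }
    }
}

The default branch matters: a plain two-case select chooses randomly when both channels are ready, so the extra non-blocking check lets a pending close win even while records keep arriving.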