Merge pull request #5 from shubhamcoc/main
feat: adding configurable ssl configs and change the format of storing kafk…
huantt committed Sep 12, 2023
2 parents f0e26b2 + 12cbcb9 commit 30efacc
Showing 11 changed files with 236 additions and 53 deletions.
62 changes: 54 additions & 8 deletions README.md
@@ -37,12 +37,18 @@ Flags:
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message int Max waiting seconds for new message, then this process will be marked as finished. Set -1 to wait forever. (default 30)
--queued-max-messages-kbytes int Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
--ssl-ca-location string   Location of the client CA certificate file (PEM)
--ssl-certificate-location string   Location of the client's certificate
--ssl-key-location string   Path to the SSL private key
--ssl-key-password string   Password (passphrase) for the SSL private key
--storage string Storage type: local file (file) or Google cloud storage (gcs) (default "file")
Global Flags:
--log-level string Log level (default "info")
```
#### Sample
- Connect to a Kafka cluster without SSL encryption to export the data.
```shell
kafka-dump export \
--storage=file \
@@ -56,24 +62,49 @@ kafka-dump export \
--kafka-sasl-mechanism=PLAIN
```
- Connect to a Kafka cluster with SSL encryption enabled to export the data.
```shell
kafka-dump export \
--storage=file \
--file=path/to/output/data.parquet \
--kafka-topics=users-activities \
--kafka-group-id=kafka-dump.local \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```
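
These four `--ssl-*` flags correspond to librdkafka's standard SSL properties (`ssl.ca.location`, `ssl.certificate.location`, `ssl.key.location`, `ssl.key.password`). The actual wiring lives in `pkg/kafka_utils`, which is not expanded in this diff; the snippet below is only a minimal sketch of how such flags are typically handed to a confluent-kafka-go consumer, assuming plain SSL without SASL (with SASL enabled, `security.protocol` would be `SASL_SSL` alongside the `sasl.*` properties).

```go
package sketch

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// newSSLConsumer illustrates how the --ssl-* flags are commonly translated
// into librdkafka configuration keys; the real mapping is in pkg/kafka_utils
// and may differ.
func newSSLConsumer(servers, groupID, caLocation, certLocation, keyLocation, keyPassword string) (*kafka.Consumer, error) {
	cfg := &kafka.ConfigMap{
		"bootstrap.servers":        servers,
		"group.id":                 groupID,
		"security.protocol":        "SSL",
		"ssl.ca.location":          caLocation,   // --ssl-ca-location
		"ssl.certificate.location": certLocation, // --ssl-certificate-location
		"ssl.key.location":         keyLocation,  // --ssl-key-location
		"ssl.key.password":         keyPassword,  // --ssl-key-password
	}
	consumer, err := kafka.NewConsumer(cfg)
	if err != nil {
		return nil, fmt.Errorf("create consumer: %w", err)
	}
	return consumer, nil
}
```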
### Import Kafka topics from parquet file
```shell
Usage:
import [flags]
Flags:
-f, --file string   Input file path (required)
-h, --help help for import
--kafka-password string Kafka password
--kafka-sasl-mechanism string   Kafka SASL mechanism
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
-f, --file string   Input file path (required)
-h, --help help for import
-i, --include-partition-and-offset   Store the partition and offset of each Kafka message in the file
--kafka-password string Kafka password
--kafka-sasl-mechanism string   Kafka SASL mechanism
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
--ssl-ca-location string   Location of the client CA certificate file (PEM)
--ssl-certificate-location string   Location of the client's certificate
--ssl-key-location string   Path to the SSL private key
--ssl-key-password string   Password (passphrase) for the SSL private key

Global Flags:
--log-level string Log level (default "info")
```
#### Sample

- Connect to a Kafka cluster without SSL encryption to import the data.
```shell
kafka-dump import \
--file=path/to/input/data.parquet \
@@ -84,6 +115,21 @@ kafka-dump import \
--kafka-sasl-mechanism=PLAIN
```

- Connect to a Kafka cluster with SSL encryption enabled to import the data.
```shell
kafka-dump import \
--file=path/to/input/data.parquet \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```
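
The new `-i, --include-partition-and-offset` flag tells the importer's parquet reader to also read each message's original partition and offset from the file; this is the storage-format change mentioned in the commit message, and it suggests the exporter writes those columns as well. The real schema lives behind `impl.NewParquetReader`, which is collapsed in this diff; the struct below is only a hypothetical sketch of such a row using xitongsys/parquet-go tags.

```go
package sketch

// KafkaMessageRecord is a hypothetical illustration of a parquet row layout
// carrying partition and offset columns. Field names and tags are assumptions;
// the real definition lives in the impl package and is not shown in this diff.
type KafkaMessageRecord struct {
	Topic     string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8"`
	Key       string `parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8"`
	Value     string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8"`
	Partition int32  `parquet:"name=partition, type=INT32"`
	Offset    int64  `parquet:"name=offset, type=INT64"`
}
```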

### Stream messages from topic to topic
```shell
Usage:
@@ -166,4 +212,4 @@ kafka-dump export \

## TODO
- Import topics from multiple files or directory
- Import topics from Google Cloud Storage files or directory
- Import topics from Google Cloud Storage files or directory
32 changes: 24 additions & 8 deletions cmd/exporter.go
@@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
"sync"
"time"

"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/gcs_utils"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -12,8 +15,6 @@ import (
"github.com/xitongsys/parquet-go-source/gcs"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"
"sync"
"time"
)

func CreateExportCommand() (*cobra.Command, error) {
@@ -34,18 +35,28 @@ func CreateExportCommand() (*cobra.Command, error) {
var storageType string
var gcsBucketName string
var gcsProjectID string
var sslCaLocation string
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var enableAutoOffsetStore bool

command := cobra.Command{
Use: "export",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Limit: %d - Concurrent consumers: %d", exportLimitPerFile, concurrentConsumers)
kafkaConsumerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
SSLCALocation: sslCaLocation,
SSLKeyPassword: sslKeyPassword,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
Expand Down Expand Up @@ -113,6 +124,11 @@ func CreateExportCommand() (*cobra.Command, error) {
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of the client CA certificate file (PEM)")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password (passphrase) for the SSL private key")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Location of the client's certificate")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to the SSL private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "Store the message offsets on the Kafka broker")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
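
The exporter also gains an `--enable-auto-offset-store` flag (default `true`). Below is a hypothetical invocation that disables it during an export; the other flag values follow the export samples in the README above, and the flag's exact effect on the commit-on-flush logic is implemented in `pkg/kafka_utils`, which is not expanded here.

```shell
kafka-dump export \
  --storage=file \
  --file=path/to/output/data.parquet \
  --kafka-topics=users-activities \
  --kafka-group-id=kafka-dump.local \
  --kafka-servers=localhost:9092 \
  --enable-auto-offset-store=false
```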
37 changes: 27 additions & 10 deletions cmd/importer.go
@@ -2,6 +2,7 @@ package cmd

import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -17,21 +18,36 @@ func CreateImportCmd() (*cobra.Command, error) {
var kafkaPassword string
var kafkaSecurityProtocol string
var kafkaSASKMechanism string
var sslCaLocation string
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var includePartitionAndOffset bool

command := cobra.Command{
Use: "import",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Input file: %s", filePath)
parquetReader, err := impl.NewParquetReader(filePath)
parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
kafkaProducerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
ReadTimeoutSeconds: 0,
GroupId: "",
QueueBufferingMaxMessages: 0,
QueuedMaxMessagesKbytes: 0,
FetchMessageMaxBytes: 0,
SSLCALocation: sslCaLocation,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
SSLKeyPassword: sslKeyPassword,
EnableAutoOffsetStore: false,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
@@ -54,10 +70,6 @@ }
}
}()
importer := impl.NewImporter(producer, deliveryChan, parquetReader)
if err != nil {
panic(errors.Wrap(err, "Unable to init importer"))
}

err = importer.Run()
if err != nil {
panic(errors.Wrap(err, "Error while running importer"))
@@ -71,5 +83,10 @@ }
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of the client CA certificate file (PEM)")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password (passphrase) for the SSL private key")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Location of the client's certificate")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to the SSL private key")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "Store the partition and offset of each Kafka message in the file")
return &command, nil
}
2 changes: 1 addition & 1 deletion cmd/parquet_row_counter.go
@@ -13,7 +13,7 @@ func CreateCountParquetRowCommand() (*cobra.Command, error) {
command := cobra.Command{
Use: "count-parquet-rows",
Run: func(cmd *cobra.Command, args []string) {
parquetReader, err := impl.NewParquetReader(filePath)
parquetReader, err := impl.NewParquetReader(filePath, false)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
14 changes: 9 additions & 5 deletions impl/exporter.go
@@ -1,13 +1,14 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
)

type Exporter struct {
@@ -92,8 +93,11 @@ func (e *Exporter) flushData() error {
}
_, err = e.consumer.Commit()
if err != nil {
err = errors.Wrap(err, "Failed to commit messages")
return err
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
log.Warnf("No offset, it can happen when there is no message to read, error is: %v", err)
} else {
return errors.Wrap(err, "Failed to commit messages")
}
}
return nil
}
5 changes: 3 additions & 2 deletions impl/importer.go
@@ -1,11 +1,12 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
)

type Importer struct {