Kafka producers and consumers for quakes.
gclitheroe committed Jul 4, 2023
1 parent 8acd48c commit 4283f1b
Showing 205 changed files with 3,152 additions and 232 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,2 +1,4 @@
.idea/
cmd/sc3ml2quake/sc3ml2quake
cmd/quake-producer-kafka/quake-producer-kafka
cmd/quake-consumer-kafka/quake-consumer-kafka
30 changes: 29 additions & 1 deletion README.md
@@ -1,6 +1,18 @@
# exp

Experiments, learning, and testing. Also stuff and ting.
Experiments and learning. Also stuff and ting.

## Protobufs

Generate Go libs from protobuf files:

```
protoc --proto_path=protobuf --go_out=internal --go_opt=paths=source_relative quake/quake.proto quake/key.proto
```
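
If `protoc-gen-go` is not on the path, it can be installed with the standard Go plugin first (an assumption about the local toolchain rather than something this repo pins):

```
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
```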

## sc3ml2quake

Converts earthquake information in SeisCompML format to Quake protobufs. See also [Protobufs With Go](https://blog.geoffc.nz/protobufs-go/).
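
A typical invocation, sketched from the flag defaults this commit adds in `cmd/sc3ml2quake/sc3ml2quake.go`:

```
./sc3ml2quake -input-dir /work/sc3ml -output-dir /work/quake
```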

### Unmarshal Performance

@@ -16,3 +28,19 @@ BenchmarkUnmarshalQuakeJSON-4 1000 1800593 ns/op - Quake JSON
BenchmarkUnmarshalQuakeProtobuf-4 10000 163473 ns/op - Quake protobuf
```

## Kafka

Use the Confluent Kafka platform in Docker Compose: https://ksqldb.io/quickstart-platform.html#quickstart-content
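
The comments at the top of `quake-producer-kafka.go` sketch the same setup:

```
wget https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.4.0-post/cp-all-in-one-kraft/docker-compose.yml
docker-compose up -d
```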

### quake-producer-kafka

Sends Quake protobufs to a Kafka topic, using the schema registry with the key and quake protobuf schemas from `protobuf/quake`.
Protobufs for two quakes are included in `cmd/quake-producer-kafka/demo-data`; see sc3ml2quake for creating more data.
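
A sketch of one invocation; the flag names and defaults come from `quake-producer-kafka.go`, and pointing `-input-dir` at `demo-data` is just one option:

```
./quake-producer-kafka -bootstrap localhost -schema-registry http://localhost:8081 -topic quake -input-dir cmd/quake-producer-kafka/demo-data
```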

### quake-consumer-kafka

Reads Quake protobufs from a Kafka topic.
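
And a matching sketch for the consumer, again using the flag defaults from `quake-consumer-kafka.go`:

```
./quake-consumer-kafka -bootstrap localhost -schema-registry http://localhost:8081 -topic quake -group quakeConsumer
```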

## Acknowledgement

The New Zealand GeoNet programme and its sponsors EQC, GNS Science, LINZ, NEMA and MBIE are acknowledged for providing data used in this repo.
103 changes: 103 additions & 0 deletions cmd/quake-consumer-kafka/quake-consumer-kafka.go
@@ -0,0 +1,103 @@
// See the instructions in quake-producer-kafka for starting the broker and creating a topic.
package main

import (
"flag"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf"
"github.com/gclitheroe/exp/internal/quake"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
var bootstrap, topic, group, schemaRegistry string

flag.StringVar(&bootstrap, "bootstrap", "localhost", "the Kafka bootstrap server")
flag.StringVar(&schemaRegistry, "schema-registry", "http://localhost:8081", "url for the schema registry")
flag.StringVar(&topic, "topic", "quake", "the topic to consume from")
flag.StringVar(&group, "group", "quakeConsumer", "the group")

flag.Parse()

sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrap,
"group.id": group,
// Start reading from the first message of each assigned
// partition if there are no previously committed offsets
// for this group.
"auto.offset.reset": "earliest",
// Do not automatically store offsets, to enable at-least-once processing.
"enable.auto.offset.store": false,
})
if err != nil {
log.Fatal(err)
}

defer c.Close()

client, err := schemaregistry.NewClient(schemaregistry.NewConfig(schemaRegistry))
if err != nil {
log.Fatal(err)
}

ser, err := protobuf.NewDeserializer(client, serde.ValueSerde, protobuf.NewDeserializerConfig())
if err != nil {
log.Fatal(err)
}

err = c.SubscribeTopics([]string{topic}, nil)
if err != nil {
log.Fatal(err)
}

run := true
var q quake.Quake
var msg *kafka.Message

for run {
select {
case <-sigchan:
log.Println("shutting down.")
run = false
default:
msg, err = c.ReadMessage(time.Second)
if err != nil {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
if !err.(kafka.Error).IsTimeout() {
log.Printf("Consumer error: %v (%v)\n", err, msg)
}
continue
}

err = ser.DeserializeInto(topic, msg.Value, &q)
if err != nil {
log.Println(err)
continue
}

// Any processing happens here e.g., store in a DB
log.Printf("Received message for %s", q.PublicID)

// Once processing is complete store the offsets.
// This ensures at least once processing.
_, err = c.StoreMessage(msg)
if err != nil {
log.Println(err)
}
}
}
}
Binary files not shown.
136 changes: 136 additions & 0 deletions cmd/quake-producer-kafka/quake-producer-kafka.go
@@ -0,0 +1,136 @@
// Use the Confluent Kafka platform https://docs.confluent.io/platform/current/platform-quickstart.html#quickstart
// wget https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.4.0-post/cp-all-in-one-kraft/docker-compose.yml
// docker-compose up -d
// Visit http://localhost:9021/clusters
// Select the Control Center cluster.
// Create a topic called quake using a protobuf schema (quake.proto from the protobuf dir in this repo).
// Build and run this application:
// go build
// ./quake-producer-kafka
package main

import (
"flag"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf"
"github.com/gclitheroe/exp/internal/quake"
"log"
"os"
)

type info struct {
publicID string
file string
}

func main() {
var bootstrap, topic, inDir, schemaRegistry string

flag.StringVar(&bootstrap, "bootstrap", "localhost", "the Kafka bootstrap server")
flag.StringVar(&schemaRegistry, "schema-registry", "http://localhost:8081", "url for the schema registry")
flag.StringVar(&topic, "topic", "quake", "the topic to produce to")
flag.StringVar(&inDir, "input-dir", "/work/quake", "directory with input quake protobuf files")

flag.Parse()

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrap})
if err != nil {
log.Fatal(err)
}

defer p.Close()

client, err := schemaregistry.NewClient(schemaregistry.NewConfig(schemaRegistry))
if err != nil {
log.Fatal(err)
}

serValue, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
if err != nil {
log.Fatal(err)
}

serKey, err := protobuf.NewSerializer(client, serde.KeySerde, protobuf.NewSerializerConfig())
if err != nil {
log.Fatal(err)
}

// Delivery report handler for produced messages.
go func() {
var i info
var ok bool

for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v.", ev.TopicPartition)
} else {
i, ok = ev.Opaque.(info)
if ok {
log.Printf("Delivered message for %s", i.publicID)
// the input file for the quake could be deleted here.
}
}
}
}
}()

// Produce messages to topic (asynchronously)

files, err := os.ReadDir(inDir)
if err != nil {
log.Fatal(err)
}

var file string
var k []byte
var v []byte
var q quake.Quake
var key quake.Key

for _, f := range files {
file = inDir + string(os.PathSeparator) + f.Name()

q, err = quake.Read(file)
if err != nil {
log.Println(err)
// errored files could be moved or deleted here in a real application.
continue
}

v, err = serValue.Serialize(topic, &q)
if err != nil {
log.Println(err)
// errored files could be moved or deleted here in a real application.
continue
}

key.QuakeID = q.PublicID

k, err = serKey.Serialize(topic, &key)
if err != nil {
log.Println(err)
// errored files could be moved or deleted here in a real application.
continue
}

err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: k,
Value: v,
Opaque: info{file: file, publicID: q.PublicID},
}, nil)
if err != nil {
log.Println(err)
}
}

// Wait for message deliveries before shutting down
for p.Flush(10000) > 0 {
log.Println("waiting to flush outstanding messages.")
}

}
53 changes: 32 additions & 21 deletions cmd/sc3ml2quake/sc3ml2quake.go
@@ -1,7 +1,11 @@
// Converts SeisCompML (XML) to quake protobufs.
// Usage: sc3ml2quake input-dir output-dir
// Usage: sc3ml2quake input-dir output-dir
// Source data s3://seiscompml07
package main

import (
"flag"
"github.com/gclitheroe/exp/internal/quake"
"github.com/gclitheroe/exp/internal/sc3ml"
"io"
@@ -19,7 +23,14 @@ type files struct {
}

func main() {
d, err := os.ReadDir(os.Args[1])
var inDir, outDir string

flag.StringVar(&inDir, "input-dir", "/work/sc3ml", "directory with input sc3ml files")
flag.StringVar(&outDir, "output-dir", "/work/quake", "directory for quake protobuf file output")

flag.Parse()

d, err := os.ReadDir(inDir)
if err != nil {
log.Fatal(err)
}
@@ -40,7 +51,7 @@ func main() {
continue
}

sc3ml <- files{in: os.Args[1] + string(os.PathSeparator) + f.Name(), out: os.Args[2] + string(os.PathSeparator) + o + ".pb"}
sc3ml <- files{in: inDir + string(os.PathSeparator) + f.Name(), out: outDir + string(os.PathSeparator) + o + ".pb"}
}
}
}()
@@ -115,16 +126,16 @@ func fromSC3ML(file string) (quake.Quake, error) {
mt := e.ModificationTime()

q = quake.Quake{
PublicID: e.PublicID,
Type: e.Type,
Agency: e.CreationInfo.AgencyID,
PublicID: e.PublicID,
QuakeType: e.Type,
Agency: e.CreationInfo.AgencyID,
ModificationTime: &quake.Timestamp{
Seconds: mt.Unix(),
Nanos: int64(mt.Nanosecond()),
Secs: mt.Unix(),
Nanos: int64(mt.Nanosecond()),
},
Time: &quake.Timestamp{
Seconds: e.PreferredOrigin.Time.Value.Unix(),
Nanos: int64(e.PreferredOrigin.Time.Value.Nanosecond()),
Secs: e.PreferredOrigin.Time.Value.Unix(),
Nanos: int64(e.PreferredOrigin.Time.Value.Nanosecond()),
},
Latitude: e.PreferredOrigin.Latitude.Value,
LatitudeUncertainty: e.PreferredOrigin.Latitude.Uncertainty,
@@ -151,8 +162,8 @@ func fromSC3ML(file string) (quake.Quake, error) {
if v.Weight > 0.0 {
p := &quake.Phase{
Time: &quake.Timestamp{
Seconds: v.Pick.Time.Value.Unix(),
Nanos: int64(v.Pick.Time.Value.Nanosecond()),
Secs: v.Pick.Time.Value.Unix(),
Nanos: int64(v.Pick.Time.Value.Nanosecond()),
},
Phase: v.Phase,
Residual: v.TimeResidual,
@@ -175,22 +186,22 @@ func fromSC3ML(file string) (quake.Quake, error) {
mag := &quake.Magnitude{
Magnitude: m.Magnitude.Value,
MagnitudeUncertainty: m.Magnitude.Uncertainty,
Type: m.Type,
Method: m.MethodID,
MagnitudeType: m.Type,
MagnitudeMethod: m.MethodID,
StationCount: m.StationCount,
}

for _, v := range m.StationMagnitudeContributions {
if v.Weight > 0.0 {
s := &quake.StationMagnitude{
Weight: v.Weight,
NetworkCode: v.StationMagnitude.WaveformID.NetworkCode,
StationCode: v.StationMagnitude.WaveformID.StationCode,
LocationCode: v.StationMagnitude.WaveformID.LocationCode,
ChannelCode: v.StationMagnitude.WaveformID.ChannelCode,
Magnitude: v.StationMagnitude.Magnitude.Value,
Type: v.StationMagnitude.Type,
Amplitude: v.StationMagnitude.Amplitude.Amplitude.Value,
Weight: v.Weight,
NetworkCode: v.StationMagnitude.WaveformID.NetworkCode,
StationCode: v.StationMagnitude.WaveformID.StationCode,
LocationCode: v.StationMagnitude.WaveformID.LocationCode,
ChannelCode: v.StationMagnitude.WaveformID.ChannelCode,
Magnitude: v.StationMagnitude.Magnitude.Value,
MagnitudeType: v.StationMagnitude.Type,
Amplitude: v.StationMagnitude.Amplitude.Amplitude.Value,
}

mag.StationMagnitude = append(mag.StationMagnitude, s)