Skip to content

Commit

Permalink
Merge pull request #31 from cam-inc/develop
Browse files Browse the repository at this point in the history
Merge to main branch
  • Loading branch information
KenFujimoto12 committed Jan 19, 2023
2 parents d608b76 + b76d386 commit c5c3a54
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 45 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ on:
push:
branches:
- '**'
pull_request:
types: [opened, reopened]

jobs:
sonarcloud:
name: SonarCloud
Expand All @@ -24,4 +27,4 @@ jobs:
uses: SonarSource/sonarcloud-github-action@master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
12 changes: 7 additions & 5 deletions application/export_change_streams.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package application

import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/pubsub"
"context"
"fmt"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/pubsub"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/cam-inc/mxtransporter/config"
pconfig "github.com/cam-inc/mxtransporter/config/pubsub"
interfaceForBigquery "github.com/cam-inc/mxtransporter/interfaces/bigquery"
iff "github.com/cam-inc/mxtransporter/interfaces/file"
interfaceForKinesisStream "github.com/cam-inc/mxtransporter/interfaces/kinesis-stream"
Expand All @@ -20,8 +24,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"strings"
"time"
)

type agent string
Expand Down Expand Up @@ -164,7 +166,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
return err
}
psClientImpl := &interfaceForPubsub.PubsubClientImpl{psClient, c.Log}
psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log}
psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log, pconfig.PubSubConfig().OrderingBy}
case KinesisStream:
ksClient, err := c.Watcher.newKinesisClient(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion config/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const (
KINESIS_STREAM_NAME = "KINESIS_STREAM_NAME"
KINESIS_STREAM_REGION = "KINESIS_STREAM_REGION"

PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME"
PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME"
PUBSUB_ORDERING_BY = "PUBSUB_ORDERING_BY"

MONGODB_HOST = "MONGODB_HOST"
MONGODB_DATABASE = "MONGODB_DATABASE"
Expand Down
7 changes: 5 additions & 2 deletions config/pubsub/pubsub_config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package pubsub

import (
"github.com/cam-inc/mxtransporter/config/constant"
"os"

"github.com/cam-inc/mxtransporter/config/constant"
)

type PubSub struct {
TopicName string
TopicName string
OrderingBy string
}

func PubSubConfig() PubSub {
var psCfg PubSub
psCfg.TopicName = os.Getenv(constant.PUBSUB_TOPIC_NAME)
psCfg.OrderingBy = os.Getenv(constant.PUBSUB_ORDERING_BY)
return psCfg
}
15 changes: 10 additions & 5 deletions config/pubsub/pubsub_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ import (

func Test_PubSubConfig(t *testing.T) {
t.Run("Check to call the set environment variable.", func(t *testing.T) {
mTopicID := "xxx"
if err := os.Setenv("PUBSUB_TOPIC_NAME", mTopicID); err != nil {
if err := os.Setenv("PUBSUB_TOPIC_NAME", "xxx"); err != nil {
t.Fatalf("Failed to set file PUBSUB_TOPIC_NAME environment variables.")
}

if err := os.Setenv("PUBSUB_ORDERING_BY", "yyy"); err != nil {
t.Fatalf("Failed to set file PUBSUB_ORDERING_BY environment variables.")
}
psCfg := PubSubConfig()
if e, a := psCfg.TopicName, mTopicID; !reflect.DeepEqual(e, a) {
t.Fatal("Environment variable MONGODB_DATABASE is not acquired correctly.")
want := PubSub{
TopicName: "xxx",
OrderingBy: "yyy",
}
if !reflect.DeepEqual(want, psCfg) {
t.Fatalf("Environment variable PUBSUB_* is not acquired correctly. want: %v, got: %v", want, psCfg)
}
})
}
11 changes: 10 additions & 1 deletion docs/gcp/gke/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ or
EXPORT_DESTINATION=bigquery,pubsub
```

### Pubsub Ordering (optional)
If you want to order message in pubsub, set ```PUBSUB_ORDERING_BY``` env.<br>
Specify the field name of one of the Change Streams.<br>
https://cloud.google.com/pubsub/docs/ordering

**NOTICE**
ordering message can cause performance issues.<br>
see https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8

### BigQuery schema (optional)
If you want to export change streams to BigQuery, specify the following table schema.

Expand Down Expand Up @@ -162,4 +171,4 @@ $ make upgrade
![image](https://user-images.githubusercontent.com/37132477/141406547-41edf9eb-5a17-4191-9ee3-3f13ba17ec07.png)

A pod is created for each collection, and a persistent volume is linked to each pod.
Since the StatefulSet is created, even if the pod stops, you can get the change streams by referring to the resume token saved in the persistent volume again.
Since the StatefulSet is created, even if the pod stops, you can get the change streams by referring to the resume token saved in the persistent volume again.
9 changes: 9 additions & 0 deletions docs/gcp/gke/README_JP.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ or
EXPORT_DESTINATION=bigquery,pubsub
```

### Pubsub Ordering (オプション)
メッセージの順序指定を利用したい場合、環境変数```PUBSUB_ORDERING_BY```を設定する必要があります。<br>
Change Streamsのいずれかのフィールド名を指定します。<br>
https://cloud.google.com/pubsub/docs/ordering

**注意**
メッセージの順序指定はパフォーマンスに影響をもたらす可能性があります。<br>
参照: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8

### BigQuery スキーマ (オプション)
Change Streams を BigQuery にエクスポートしたい場合、以下のようなテーブルスキーマを指定する必要があります。

Expand Down
12 changes: 12 additions & 0 deletions docs/gcp/gke/helm/templates/stateless.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ spec:
# secretKeyRef:
# name: {{ $.Values.secrets.name }}
# key: BIGQUERY_TABLE_{{ $collection }}
# # Optional
# - name: PUBSUB_TOPIC_NAME
# valueFrom:
# secretKeyRef:
# name: {{ $.Values.secrets.name }}
# key: PUBSUB_TOPIC_NAME{{ $collection }}
# # Optional
# - name: PUBSUB_ORDERING_BY
# valueFrom:
# secretKeyRef:
# name: {{ $.Values.secrets.name }}
# key: PUBSUB_ORDERING_BY{{ $collection }}
- name: RESUME_TOKEN_VOLUME_DIR
valueFrom:
secretKeyRef:
Expand Down
2 changes: 2 additions & 0 deletions docs/gcp/gke/secrets.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ EXPORT_DESTINATION=
# Optional
## You have to specify this environment variable if you want to export Cloud PubSub.
PUBSUB_TOPIC_NAME=
## You have to specify this environment variable if you want to take advantage of PubSub message ordering.
PUBSUB_ORDERING_BY=

# Require
## Specify the time zone you run this middleware by referring to the following. (e.g. TIME_ZONE=Asia/Tokyo)
Expand Down
52 changes: 39 additions & 13 deletions interfaces/pubsub/export.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"encoding/json"
"fmt"
"strings"
"time"

"cloud.google.com/go/pubsub"
pubsubConfig "github.com/cam-inc/mxtransporter/config/pubsub"
"github.com/cam-inc/mxtransporter/pkg/errors"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"strings"
"time"
)

var results []*pubsub.PublishResult
Expand All @@ -18,12 +20,13 @@ type (
IPubsub interface {
topicExists(ctx context.Context, topicID string) (bool, error)
createTopic(ctx context.Context, topicID string) (*pubsub.Topic, error)
publishMessage(ctx context.Context, topicID string, csArray []string) error
publishMessage(ctx context.Context, topicID string, csArray []string, pmo ...publishMessageOption) error
}

PubsubImpl struct {
Pubsub IPubsub
Log *zap.SugaredLogger
Pubsub IPubsub
Log *zap.SugaredLogger
OrderingBy string
}

PubsubClientImpl struct {
Expand All @@ -32,6 +35,14 @@ type (
}
)

func withOrderingKey(orderingKey string) publishMessageOption {
return func(o *pubsub.Message) {
o.OrderingKey = orderingKey
}
}

type publishMessageOption func(opts *pubsub.Message)

func (p *PubsubClientImpl) topicExists(ctx context.Context, topicID string) (bool, error) {
return p.PubsubClient.Topic(topicID).Exists(ctx)
}
Expand All @@ -40,13 +51,16 @@ func (p *PubsubClientImpl) createTopic(ctx context.Context, topicID string) (*pu
return p.PubsubClient.CreateTopic(ctx, topicID)
}

func (p *PubsubClientImpl) publishMessage(ctx context.Context, topicID string, csArray []string) error {
func (p *PubsubClientImpl) publishMessage(ctx context.Context, topicID string, csArray []string, pmo ...publishMessageOption) error {
topic := p.PubsubClient.Topic(topicID)
defer topic.Stop()

r := topic.Publish(ctx, &pubsub.Message{
message := &pubsub.Message{
Data: []byte(strings.Join(csArray, "|")),
})
}
for _, pmo := range pmo {
pmo(message)
}
r := topic.Publish(ctx, message)

for _, r := range append(results, r) {
id, err := r.Get(ctx)
Expand Down Expand Up @@ -111,9 +125,21 @@ func (p *PubsubImpl) ExportToPubsub(ctx context.Context, cs primitive.M) error {
string(updDesc),
}

if err := p.Pubsub.publishMessage(ctx, topicID, r); err != nil {
return err
if p.OrderingBy != "" {
key, err := p.orderingKey(cs)
if err != nil {
return err
}
return p.Pubsub.publishMessage(ctx, topicID, r, withOrderingKey(key))
}

return nil
return p.Pubsub.publishMessage(ctx, topicID, r)
}

func (p *PubsubImpl) orderingKey(cs primitive.M) (string, error) {
key, ok := cs[p.OrderingBy]
if !ok {
return "", errors.InvalidErrorPubSubOrderingKey.New(fmt.Sprintf("Failed to get orderingKey cs: %v, orderingBy: %s", cs, p.OrderingBy))
}
return fmt.Sprintf("%v", key), nil
}
5 changes: 3 additions & 2 deletions interfaces/pubsub/export_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"reflect"

"cloud.google.com/go/pubsub"
)

type mockPubsubClientImpl struct {
Expand All @@ -23,7 +24,7 @@ func (*mockPubsubClientImpl) createTopic(ctx context.Context, topicID string) (*
return nil, nil
}

func (m *mockPubsubClientImpl) publishMessage(_ context.Context, _ string, csArray []string) error {
func (m *mockPubsubClientImpl) publishMessage(_ context.Context, _ string, csArray []string, pmo ...publishMessageOption) error {
if csArray == nil {
return fmt.Errorf("Expect csItems to not be nil.")
}
Expand Down
Loading

0 comments on commit c5c3a54

Please sign in to comment.