Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add api to get mau summaries #554

Merged
merged 5 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 61 additions & 0 deletions manifests/bucketeer/charts/batch/templates/cronjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,64 @@ spec:
exit 1
fi
restartPolicy: Never

---
apiVersion: batch/v1
kind: CronJob
metadata:
name: {{ template "batch-server.fullname" . }}-mau-summarizer
namespace: {{ .Values.namespace }}
labels:
app: {{ template "batch-server.name" . }}
chart: {{ template "batch-server.chart" . }}
release: {{ template "batch-server.fullname" . }}
heritage: {{ .Release.Service }}
spec:
concurrencyPolicy: Forbid
timeZone: {{ .Values.env.timezone }}
schedule: "{{ .Values.cronjob.mauSummarizerSchedule }}"
successfulJobsHistoryLimit: {{ .Values.cronjob.successfulJobsHistoryLimit }}
failedJobsHistoryLimit: {{ .Values.cronjob.failedJobsHistoryLimit }}
jobTemplate:
spec:
backoffLimit: 0
template:
spec:
volumes:
- name: service-cert-secret
secret:
secretName: {{ template "service-cert-secret" . }}
- name: service-token-secret
secret:
secretName: {{ template "service-token-secret" . }}
containers:
- name: experiment-calculator
image: curlimages/curl:8.1.2
imagePullPolicy: IfNotPresent
volumeMounts:
- name: service-cert-secret
mountPath: /usr/local/certs/service
readOnly: true
- name: service-token-secret
mountPath: /usr/local/service-token
readOnly: true
env:
- name: WEB_GATEWAY_ADDRESS
value: "{{ .Values.cronjob.webGatewayAddress }}"
command:
- /bin/sh
args:
- -c
- |
echo "Start mau-summarizer job."
ENDPOINT="${WEB_GATEWAY_ADDRESS}/bucketeer.batch.BatchService/ExecuteBatchJob"
TOKEN=`cat /usr/local/service-token/token`
RES=`curl -X POST --cacert /usr/local/certs/service/tls.crt -d '{"job": "MauSummarizer"}' -H "authorization: bearer ${TOKEN}" -H "Content-Type: application/json" -s -o /dev/null -w '%{http_code}\\n' ${ENDPOINT}`
echo "mau-summarizer job result: ${RES}"
if [ "$RES" == 200 ]
then
exit 0
else
exit 1
fi
restartPolicy: Never
1 change: 1 addition & 0 deletions manifests/bucketeer/charts/batch/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ cronjob:
opsProgressiveRolloutWatcherSchedule: "* * * * *"
redisCounterDeleterSchedule: "0 0 * * *"
experimentCalculatorSchedule: "*/30 * * * *"
mauSummarizerSchedule: "30 0 * * *"
4 changes: 2 additions & 2 deletions manifests/bucketeer/charts/web-gateway/values.yaml

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/batch/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type batchService struct {
progressiveRolloutWatcher jobs.Job
redisCounterDeleter jobs.Job
experimentCalculator jobs.Job
mauSummarizer jobs.Job
environmentClient envclient.Client
domainEventPuller puller.Puller
notificationSender notificationsender.Sender
Expand All @@ -61,6 +62,7 @@ func NewBatchService(
notificationSender notificationsender.Sender,
redisCounterDeleter jobs.Job,
experimentCalculator jobs.Job,
mauSummarizer jobs.Job,
logger *zap.Logger,
options ...notification.Option,
) *batchService {
Expand All @@ -74,6 +76,7 @@ func NewBatchService(
progressiveRolloutWatcher: progressiveRolloutWatcher,
redisCounterDeleter: redisCounterDeleter,
experimentCalculator: experimentCalculator,
mauSummarizer: mauSummarizer,
environmentClient: environmentClient,
domainEventPuller: domainEventPuller,
notificationSender: notificationSender,
Expand Down Expand Up @@ -112,6 +115,8 @@ func (s *batchService) ExecuteBatchJob(
err = s.progressiveRolloutWatcher.Run(ctx)
case batch.BatchJob_ExperimentCalculator:
err = s.experimentCalculator.Run(ctx)
case batch.BatchJob_MauSummarizer:
err = s.mauSummarizer.Run(ctx)
default:
s.logger.Error("Unknown job",
log.FieldsFromImcomingContext(ctx).AddFields(
Expand Down
8 changes: 8 additions & 0 deletions pkg/batch/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/calculator"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/experiment"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/mau"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/opsevent"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/rediscounter"
Expand Down Expand Up @@ -627,6 +628,13 @@ func newBatchService(t *testing.T,
jobs.WithTimeout(5*time.Minute),
jobs.WithLogger(logger),
),
mau.NewMAUSummarizer(
mysqlMockClient,
eventCounterMockClient,
jpLocation,
jobs.WithTimeout(30*time.Minute),
jobs.WithLogger(logger),
),
logger,
notification.WithRunningDurationPerBatch(pullerRunningDuration),
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/batch/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/calculator"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/experiment"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/mau"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/opsevent"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/rediscounter"
Expand Down Expand Up @@ -387,6 +388,13 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
jobs.WithTimeout(5*time.Minute),
jobs.WithLogger(logger),
),
mau.NewMAUSummarizer(
mysqlClient,
eventCounterClient,
location,
jobs.WithTimeout(30*time.Minute),
jobs.WithLogger(logger),
),
logger,
notification.WithRunningDurationPerBatch(*s.runningDurationPerBatch),
notification.WithLogger(logger),
Expand Down
79 changes: 79 additions & 0 deletions pkg/batch/jobs/mau/mau_summarizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package mau

import (
"context"
"fmt"
"time"

"go.uber.org/zap"

"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
ecclient "github.com/bucketeer-io/bucketeer/pkg/eventcounter/client"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql"
"github.com/bucketeer-io/bucketeer/proto/eventcounter"
)

type mauSummarizer struct {
mysqlClient mysql.Client
eventCounterClient ecclient.Client
location *time.Location
opts *jobs.Options
logger *zap.Logger
}

func NewMAUSummarizer(
mysqlClient mysql.Client,
eventCounterClient ecclient.Client,
location *time.Location,
opts ...jobs.Option) jobs.Job {
dopts := &jobs.Options{
Timeout: 5 * time.Minute,
Logger: zap.NewNop(),
}
for _, opt := range opts {
opt(dopts)
}
return &mauSummarizer{
mysqlClient: mysqlClient,
eventCounterClient: eventCounterClient,
location: location,
opts: dopts,
logger: dopts.Logger.Named("mau-count-watcher"),
}
}

func (s *mauSummarizer) Run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, s.opts.Timeout)
defer cancel()
now := time.Now().In(s.location)
yesterday := s.getYesterday(now)
_, err := s.eventCounterClient.SummarizeMAUCounts(
ctx,
&eventcounter.SummarizeMAUCountsRequest{
YearMonth: s.newYearMonth(int32(yesterday.Year()), int32(yesterday.Month())),
IsFinished: s.isEndDateOfMonth(yesterday),
},
)
if err != nil {
s.logger.Error("Failed to summarize MAU counts",
zap.Error(err),
zap.Int32("year", int32(yesterday.Year())),
zap.Int32("month", int32(yesterday.Month())),
)
return err
}
return nil
}

func (s *mauSummarizer) getYesterday(now time.Time) time.Time {
return now.AddDate(0, 0, -1)
}

func (s *mauSummarizer) newYearMonth(year, month int32) string {
return fmt.Sprintf("%d%02d", year, month)
}

func (s *mauSummarizer) isEndDateOfMonth(target time.Time) bool {
next := target.AddDate(0, 0, 1)
return next.Day() == 1
}
166 changes: 166 additions & 0 deletions pkg/batch/jobs/mau/mau_summarizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2023 The Bucketeer Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mau

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"go.uber.org/zap"

"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
ecclientmock "github.com/bucketeer-io/bucketeer/pkg/eventcounter/client/mock"
mysqlmock "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql/mock"
ecproto "github.com/bucketeer-io/bucketeer/proto/eventcounter"
)

func TestRun(t *testing.T) {
t.Parallel()
mockController := gomock.NewController(t)
defer mockController.Finish()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

patterns := []struct {
desc string
setup func(r *mauSummarizer)
expected error
}{
{
desc: "fail",
setup: func(r *mauSummarizer) {
r.eventCounterClient.(*ecclientmock.MockClient).EXPECT().SummarizeMAUCounts(gomock.Any(), gomock.Any()).Return(
nil, errors.New("test"))
},
expected: errors.New("test"),
},
{
desc: "success",
setup: func(r *mauSummarizer) {
r.eventCounterClient.(*ecclientmock.MockClient).EXPECT().SummarizeMAUCounts(gomock.Any(), gomock.Any()).Return(
&ecproto.SummarizeMAUCountsResponse{}, nil)
},
expected: nil,
},
}

for _, p := range patterns {
t.Run(p.desc, func(t *testing.T) {
summarizer := newMockMAUSummarizer(t, mockController)
p.setup(summarizer)
err := summarizer.Run(ctx)
assert.Equal(t, p.expected, err)
})
}
}

func TestGetYesterday(t *testing.T) {
t.Parallel()
jst := time.FixedZone("Asia/Tokyo", 9*60*60)
patterns := []struct {
desc string
input time.Time
expected time.Time
}{
{
desc: "success",
input: time.Date(2023, 1, 31, 1, 00, 00, 0, time.UTC),
expected: time.Date(2023, 1, 30, 1, 00, 00, 0, time.UTC),
},
{
desc: "over month",
input: time.Date(2023, 2, 1, 1, 00, 00, 0, time.UTC),
expected: time.Date(2023, 1, 31, 1, 00, 00, 0, time.UTC),
},
{
desc: "over year",
input: time.Date(2023, 1, 1, 1, 00, 00, 0, time.UTC),
expected: time.Date(2022, 12, 31, 1, 00, 00, 0, time.UTC),
},
{
desc: "success JST",
input: time.Date(2023, 1, 31, 1, 00, 00, 0, jst),
expected: time.Date(2023, 1, 30, 1, 00, 00, 0, jst),
},
{
desc: "over month JST",
input: time.Date(2023, 2, 1, 1, 00, 00, 0, jst),
expected: time.Date(2023, 1, 31, 1, 00, 00, 0, jst),
},
{
desc: "over year JST",
input: time.Date(2023, 1, 1, 1, 00, 00, 0, jst),
expected: time.Date(2022, 12, 31, 1, 00, 00, 0, jst),
},
}
for _, p := range patterns {
t.Run(p.desc, func(t *testing.T) {
summarizer := &mauSummarizer{}
actual := summarizer.getYesterday(p.input)
assert.Equal(t, p.expected, actual)
})
}
}

func TestIsEndDateOfMonth(t *testing.T) {
t.Parallel()
jst := time.FixedZone("Asia/Tokyo", 9*60*60)
patterns := []struct {
desc string
input time.Time
expected bool
}{
{
desc: "not end of month",
input: time.Date(2023, 1, 30, 1, 00, 00, 0, time.UTC),
expected: false,
},
{
desc: "success1",
input: time.Date(2023, 1, 31, 1, 00, 00, 0, time.UTC),
expected: true,
},
{
desc: "success2",
input: time.Date(2023, 2, 1, 1, 00, 00, 0, jst),
expected: false,
},
}
for _, p := range patterns {
t.Run(p.desc, func(t *testing.T) {
summarizer := &mauSummarizer{}
actual := summarizer.isEndDateOfMonth(p.input)
assert.Equal(t, p.expected, actual)
})
}
}

func newMockMAUSummarizer(t *testing.T, c *gomock.Controller) *mauSummarizer {
t.Helper()
return &mauSummarizer{
mysqlClient: mysqlmock.NewMockClient(c),
eventCounterClient: ecclientmock.NewMockClient(c),
location: time.UTC,
opts: &jobs.Options{
Timeout: 30 * time.Second,
},
logger: zap.NewNop().Named("test-mau-summarizer"),
}
}