Skip to content

Commit

Permalink
feat: add mau summarizer cronjob
Browse files Browse the repository at this point in the history
  • Loading branch information
masaaania committed Oct 26, 2023
1 parent 78f10e0 commit 6094e9d
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 22 deletions.
61 changes: 61 additions & 0 deletions manifests/bucketeer/charts/batch/templates/cronjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,64 @@ spec:
exit 1
fi
restartPolicy: Never

---
apiVersion: batch/v1
kind: CronJob
metadata:
name: {{ template "batch-server.fullname" . }}-mau-summarizer
namespace: {{ .Values.namespace }}
labels:
app: {{ template "batch-server.name" . }}
chart: {{ template "batch-server.chart" . }}
release: {{ template "batch-server.fullname" . }}
heritage: {{ .Release.Service }}
spec:
concurrencyPolicy: Forbid
timeZone: {{ .Values.env.timezone }}
schedule: "{{ .Values.cronjob.mauSummarizerSchedule }}"
successfulJobsHistoryLimit: {{ .Values.cronjob.successfulJobsHistoryLimit }}
failedJobsHistoryLimit: {{ .Values.cronjob.failedJobsHistoryLimit }}
jobTemplate:
spec:
backoffLimit: 0
template:
spec:
volumes:
- name: service-cert-secret
secret:
secretName: {{ template "service-cert-secret" . }}
- name: service-token-secret
secret:
secretName: {{ template "service-token-secret" . }}
containers:
- name: experiment-calculator
image: curlimages/curl:8.1.2
imagePullPolicy: IfNotPresent
volumeMounts:
- name: service-cert-secret
mountPath: /usr/local/certs/service
readOnly: true
- name: service-token-secret
mountPath: /usr/local/service-token
readOnly: true
env:
- name: WEB_GATEWAY_ADDRESS
value: "{{ .Values.cronjob.webGatewayAddress }}"
command:
- /bin/sh
args:
- -c
- |
echo "Start mau-summarizer job."
ENDPOINT="${WEB_GATEWAY_ADDRESS}/bucketeer.batch.BatchService/ExecuteBatchJob"
TOKEN=`cat /usr/local/service-token/token`
RES=`curl -X POST --cacert /usr/local/certs/service/tls.crt -d '{"job": "MauSummarizer"}' -H "authorization: bearer ${TOKEN}" -H "Content-Type: application/json" -s -o /dev/null -w '%{http_code}\\n' ${ENDPOINT}`
echo "mau-summarizer job result: ${RES}"
if [ "$RES" == 200 ]
then
exit 0
else
exit 1
fi
restartPolicy: Never
1 change: 1 addition & 0 deletions manifests/bucketeer/charts/batch/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ cronjob:
opsProgressiveRolloutWatcherSchedule: "* * * * *"
redisCounterDeleterSchedule: "0 0 * * *"
experimentCalculatorSchedule: "*/30 * * * *"
mauSummarizerSchedule: "30 0 * * *"
5 changes: 5 additions & 0 deletions pkg/batch/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type batchService struct {
progressiveRolloutWatcher jobs.Job
redisCounterDeleter jobs.Job
experimentCalculator jobs.Job
mauSummarizer jobs.Job
environmentClient envclient.Client
domainEventPuller puller.Puller
notificationSender notificationsender.Sender
Expand All @@ -61,6 +62,7 @@ func NewBatchService(
notificationSender notificationsender.Sender,
redisCounterDeleter jobs.Job,
experimentCalculator jobs.Job,
mauSummarizer jobs.Job,
logger *zap.Logger,
options ...notification.Option,
) *batchService {
Expand All @@ -74,6 +76,7 @@ func NewBatchService(
progressiveRolloutWatcher: progressiveRolloutWatcher,
redisCounterDeleter: redisCounterDeleter,
experimentCalculator: experimentCalculator,
mauSummarizer: mauSummarizer,
environmentClient: environmentClient,
domainEventPuller: domainEventPuller,
notificationSender: notificationSender,
Expand Down Expand Up @@ -112,6 +115,8 @@ func (s *batchService) ExecuteBatchJob(
err = s.progressiveRolloutWatcher.Run(ctx)
case batch.BatchJob_ExperimentCalculator:
err = s.experimentCalculator.Run(ctx)
case batch.BatchJob_MauSummarizer:
err = s.mauSummarizer.Run(ctx)
default:
s.logger.Error("Unknown job",
log.FieldsFromImcomingContext(ctx).AddFields(
Expand Down
8 changes: 8 additions & 0 deletions pkg/batch/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/calculator"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/experiment"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/mau"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/opsevent"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/rediscounter"
Expand Down Expand Up @@ -627,6 +628,13 @@ func newBatchService(t *testing.T,
jobs.WithTimeout(5*time.Minute),
jobs.WithLogger(logger),
),
mau.NewMAUSummarizer(
mysqlMockClient,
eventCounterMockClient,
jpLocation,
jobs.WithTimeout(30*time.Minute),
jobs.WithLogger(logger),
),
logger,
notification.WithRunningDurationPerBatch(pullerRunningDuration),
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/batch/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/calculator"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/experiment"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/mau"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/opsevent"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/rediscounter"
Expand Down Expand Up @@ -387,6 +388,13 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
jobs.WithTimeout(5*time.Minute),
jobs.WithLogger(logger),
),
mau.NewMAUSummarizer(
mysqlClient,
eventCounterClient,
location,
jobs.WithTimeout(30*time.Minute),
jobs.WithLogger(logger),
),
logger,
notification.WithRunningDurationPerBatch(*s.runningDurationPerBatch),
notification.WithLogger(logger),
Expand Down
79 changes: 79 additions & 0 deletions pkg/batch/jobs/mau/mau_summarizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package mau

import (
"context"
"fmt"
"time"

"go.uber.org/zap"

"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
ecclient "github.com/bucketeer-io/bucketeer/pkg/eventcounter/client"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql"
"github.com/bucketeer-io/bucketeer/proto/eventcounter"
)

type mauSummarizer struct {
mysqlClient mysql.Client
eventCounterClient ecclient.Client
location *time.Location
opts *jobs.Options
logger *zap.Logger
}

func NewMAUSummarizer(
mysqlClient mysql.Client,
eventCounterClient ecclient.Client,
location *time.Location,
opts ...jobs.Option) jobs.Job {
dopts := &jobs.Options{
Timeout: 5 * time.Minute,
Logger: zap.NewNop(),
}
for _, opt := range opts {
opt(dopts)
}
return &mauSummarizer{
mysqlClient: mysqlClient,
eventCounterClient: eventCounterClient,
location: location,
opts: dopts,
logger: dopts.Logger.Named("mau-count-watcher"),
}
}

func (s *mauSummarizer) Run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, s.opts.Timeout)
defer cancel()
now := time.Now().In(s.location)
yesterday := s.getYesterday(now)
_, err := s.eventCounterClient.SummarizeMAUCounts(
ctx,
&eventcounter.SummarizeMAUCountsRequest{
YearMonth: s.newYearMonth(int32(yesterday.Year()), int32(yesterday.Month())),
IsFinishDay: s.isEndDateOfMonth(yesterday),
},
)
if err != nil {
s.logger.Error("Failed to summarize MAU counts",
zap.Error(err),
zap.Int32("year", int32(yesterday.Year())),
zap.Int32("month", int32(yesterday.Month())),
)
return err
}
return nil
}

func (s *mauSummarizer) getYesterday(now time.Time) time.Time {
return now.AddDate(0, 0, -1)
}

func (s *mauSummarizer) newYearMonth(year, month int32) string {
return fmt.Sprintf("%d%02d", year, month)
}

func (s *mauSummarizer) isEndDateOfMonth(target time.Time) bool {
next := target.AddDate(0, 0, 1)
return next.Day() == 1
}
166 changes: 166 additions & 0 deletions pkg/batch/jobs/mau/mau_summarizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2023 The Bucketeer Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mau

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"go.uber.org/zap"

"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
ecclientmock "github.com/bucketeer-io/bucketeer/pkg/eventcounter/client/mock"
mysqlmock "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql/mock"
ecproto "github.com/bucketeer-io/bucketeer/proto/eventcounter"
)

func TestRun(t *testing.T) {
t.Parallel()
mockController := gomock.NewController(t)
defer mockController.Finish()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

patterns := []struct {
desc string
setup func(r *mauSummarizer)
expected error
}{
{
desc: "fail",
setup: func(r *mauSummarizer) {
r.eventCounterClient.(*ecclientmock.MockClient).EXPECT().SummarizeMAUCounts(gomock.Any(), gomock.Any()).Return(
nil, errors.New("test"))
},
expected: errors.New("test"),
},
{
desc: "success",
setup: func(r *mauSummarizer) {
r.eventCounterClient.(*ecclientmock.MockClient).EXPECT().SummarizeMAUCounts(gomock.Any(), gomock.Any()).Return(
&ecproto.SummarizeMAUCountsResponse{}, nil)
},
expected: nil,
},
}

for _, p := range patterns {
t.Run(p.desc, func(t *testing.T) {
summarizer := newMockMAUSummarizer(t, mockController)
p.setup(summarizer)
err := summarizer.Run(ctx)
assert.Equal(t, p.expected, err)
})
}
}

func TestGetYesterday(t *testing.T) {
t.Parallel()
jst := time.FixedZone("Asia/Tokyo", 9*60*60)
patterns := []struct {
desc string
input time.Time
expected time.Time
}{
{
desc: "success",
input: time.Date(2023, 1, 31, 1, 00, 00, 0, time.UTC),
expected: time.Date(2023, 1, 30, 1, 00, 00, 0, time.UTC),
},
{
desc: "over month",
input: time.Date(2023, 2, 1, 1, 00, 00, 0, time.UTC),
expected: time.Date(2023, 1, 31, 1, 00, 00, 0, time.UTC),
},
{
desc: "over year",
input: time.Date(2023, 1, 1, 1, 00, 00, 0, time.UTC),
expected: time.Date(2022, 12, 31, 1, 00, 00, 0, time.UTC),
},
{
desc: "success JST",
input: time.Date(2023, 1, 31, 1, 00, 00, 0, jst),
expected: time.Date(2023, 1, 30, 1, 00, 00, 0, jst),
},
{
desc: "over month JST",
input: time.Date(2023, 2, 1, 1, 00, 00, 0, jst),
expected: time.Date(2023, 1, 31, 1, 00, 00, 0, jst),
},
{
desc: "over year JST",
input: time.Date(2023, 1, 1, 1, 00, 00, 0, jst),
expected: time.Date(2022, 12, 31, 1, 00, 00, 0, jst),
},
}
for _, p := range patterns {
t.Run(p.desc, func(t *testing.T) {
summarizer := &mauSummarizer{}
actual := summarizer.getYesterday(p.input)
assert.Equal(t, p.expected, actual)
})
}
}

func TestIsEndDateOfMonth(t *testing.T) {
t.Parallel()
jst := time.FixedZone("Asia/Tokyo", 9*60*60)
patterns := []struct {
desc string
input time.Time
expected bool
}{
{
desc: "not end of month",
input: time.Date(2023, 1, 30, 1, 00, 00, 0, time.UTC),
expected: false,
},
{
desc: "success1",
input: time.Date(2023, 1, 31, 1, 00, 00, 0, time.UTC),
expected: true,
},
{
desc: "success2",
input: time.Date(2023, 2, 1, 1, 00, 00, 0, jst),
expected: false,
},
}
for _, p := range patterns {
t.Run(p.desc, func(t *testing.T) {
summarizer := &mauSummarizer{}
actual := summarizer.isEndDateOfMonth(p.input)
assert.Equal(t, p.expected, actual)
})
}
}

func newMockMAUSummarizer(t *testing.T, c *gomock.Controller) *mauSummarizer {
t.Helper()
return &mauSummarizer{
mysqlClient: mysqlmock.NewMockClient(c),
eventCounterClient: ecclientmock.NewMockClient(c),
location: time.UTC,
opts: &jobs.Options{
Timeout: 30 * time.Second,
},
logger: zap.NewNop().Named("test-mau-summarizer"),
}
}

0 comments on commit 6094e9d

Please sign in to comment.