Skip to content

Commit

Permalink
Use pkg/handler in indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
Masayoshi Mizutani committed Sep 25, 2020
1 parent 1c73c90 commit 9f7ce5c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 43 deletions.
4 changes: 4 additions & 0 deletions pkg/handler/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/m-mizutani/minerva/internal/adaptor"
"github.com/m-mizutani/minerva/internal/repository"
"github.com/m-mizutani/minerva/internal/service"
"github.com/m-mizutani/rlogs"
"github.com/pkg/errors"
)

Expand All @@ -20,6 +21,9 @@ type Arguments struct {
ChunkRepo repository.ChunkRepository `json:"-"`
NewEncoder adaptor.EncoderFactory `json:"-"`
NewDecoder adaptor.DecoderFactory `json:"-"`

// Only required for indexer
Reader *rlogs.Reader
}

// EventRecord is decapslated event data (e.g. Body of SQS event)
Expand Down
7 changes: 6 additions & 1 deletion pkg/handler/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/aws/aws-lambda-go/lambda"
"github.com/m-mizutani/minerva/internal"
"github.com/m-mizutani/rlogs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand All @@ -16,7 +17,7 @@ var Logger = internal.Logger
type Handler func(Arguments) error

// StartLambda initialize AWS Lambda and invokes handler
func StartLambda(handler Handler) {
func StartLambda(handler Handler, reader ...*rlogs.Reader) {
Logger.SetLevel(logrus.InfoLevel)
Logger.SetFormatter(&logrus.JSONFormatter{})

Expand All @@ -34,6 +35,10 @@ func StartLambda(handler Handler) {
Logger.WithFields(logrus.Fields{"args": args, "event": event}).Debug("Start handler")
args.Event = event

if len(reader) > 0 {
args.Reader = reader[0]
}

if err := handler(args); err != nil {
Logger.WithFields(logrus.Fields{"args": args, "event": event}).Error("Failed Handler")
err = errors.Wrap(err, "Failed Handler")
Expand Down
72 changes: 30 additions & 42 deletions pkg/indexer/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package indexer

import (
"context"
"encoding/json"
"strings"

"github.com/aws/aws-lambda-go/events"
"github.com/m-mizutani/minerva/internal"
"github.com/m-mizutani/minerva/internal/adaptor"
"github.com/m-mizutani/minerva/internal/repository"
"github.com/m-mizutani/minerva/internal/service"
"github.com/m-mizutani/minerva/pkg/handler"
"github.com/m-mizutani/minerva/pkg/models"
"github.com/m-mizutani/rlogs"
Expand All @@ -20,22 +17,8 @@ var logger = handler.Logger

// RunIndexer is main handler of indexer. It requires log reader based on rlogs.
// Main procedures are in handleEvent() to reduce number of internal.HandleError().
func RunIndexer(ctx context.Context, sqsEvent events.SQSEvent, reader *rlogs.Reader) error {
defer internal.FlushError()

args := Arguments{
Event: sqsEvent,
Reader: reader,
NewS3: adaptor.NewS3Client,
NewSQS: adaptor.NewSQSClient,
NewEncoder: adaptor.NewMsgpackEncoder,
}
if err := handleEvent(args); err != nil {
internal.HandleError(err)
return err
}

return nil
func RunIndexer(reader *rlogs.Reader) {
handler.StartLambda(handleEvent, reader)
}

type Arguments struct {
Expand All @@ -49,48 +32,52 @@ type Arguments struct {
NewDecoder adaptor.DecoderFactory `json:"-"`
}

func handleEvent(args Arguments) error {
if err := args.BindEnvVars(); err != nil {
func handleEvent(args handler.Arguments) error {
records, err := args.DecapSQSEvent()
if err != nil {
return err
}

internal.SetupLogger(args.LogLevel)
logger.WithField("event", args.Event).Debug("Start handler")

for _, sqsRecord := range args.Event.Records {
for _, record := range records {
var snsEntity events.SNSEntity
if err := json.Unmarshal([]byte(sqsRecord.Body), &snsEntity); err != nil {
return errors.Wrapf(err, "Fail to unmarshal SNS event in SQS message: %s", sqsRecord.Body)
if err := record.Bind(&snsEntity); err != nil {
return err
}
logger.WithField("snsEntity", snsEntity).Debug("Received SNS Event")

var s3Event events.S3Event
if err := json.Unmarshal([]byte(snsEntity.Message), &s3Event); err != nil {
return errors.Wrapf(err, "Fail to unmarshal S3 event in SNS message: %s", snsEntity.Message)
}
logger.WithField("s3Event", s3Event).Debug("Received S3 Event")

for _, s3record := range s3Event.Records {
s3Key := s3record.S3.Object.Key
if s3Key == "" || strings.HasSuffix(s3Key, "/") {
logger.WithField("s3", s3record).Warn("No key of S3 object OR invalid object key")
continue
}

logger.WithField("args", args).Info("Start indexer")
if err := MakeIndex(args, s3record); err != nil {
return errors.Wrap(err, "Fail to create inverted index")
}
if err := handleS3Event(args, s3Event); err != nil {
return err
}
}

return nil
}

func handleS3Event(args handler.Arguments, s3Event events.S3Event) error {
for _, s3record := range s3Event.Records {
s3Key := s3record.S3.Object.Key
if s3Key == "" || strings.HasSuffix(s3Key, "/") {
logger.WithField("s3", s3record).Warn("No key of S3 object OR invalid object key")
continue
}

logger.WithField("args", args).Info("Start indexer")
if err := MakeIndex(args, s3record); err != nil {
return errors.Wrap(err, "Fail to create inverted index")
}
}
return nil
}

// MakeIndex is a process for one S3 object to make index file.
func MakeIndex(args Arguments, record events.S3EventRecord) error {
func MakeIndex(args handler.Arguments, record events.S3EventRecord) error {
srcObject := models.NewS3ObjectFromRecord(record)
sqsService := service.NewSQSService(args.NewSQS)
sqsService := args.SQSService()

meta := repository.NewMetaDynamoDB(args.AwsRegion, args.MetaTableName)
objectID, err := meta.GetObjecID(srcObject.Bucket, srcObject.Key)
Expand All @@ -99,7 +86,8 @@ func MakeIndex(args Arguments, record events.S3EventRecord) error {
}

dstBase := models.NewS3Object(args.S3Region, args.S3Bucket, args.S3Prefix)
recordService := service.NewRecordService(args.NewS3, args.NewEncoder, args.NewDecoder)
recordService := args.RecordService()

for q := range makeLogChannel(srcObject, args.Reader) {
if q.Err != nil {
return q.Err
Expand Down

0 comments on commit 9f7ce5c

Please sign in to comment.