Skip to content

Commit

Permalink
feat: Sender service
Browse files Browse the repository at this point in the history
  • Loading branch information
XanderKon committed Dec 16, 2023
1 parent c7d3048 commit e5ac5d7
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 29 deletions.
21 changes: 18 additions & 3 deletions hw12_13_14_15_calendar/Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
BIN := "./bin/calendar"
BIN_CS := "./bin/calendar_scheduler"
BIN_SE := "./bin/calendar_sender"
DOCKER_IMG="calendar:develop"

GIT_HASH := $(shell git log --format="%h" -n 1)
LDFLAGS := -X main.release="develop" -X main.buildDate=$(shell date -u +%Y-%m-%dT%H:%M:%S) -X main.gitHash=$(GIT_HASH)

build:
build-calendar:
go build -v -o $(BIN) -ldflags "$(LDFLAGS)" ./cmd/calendar

run: build
run-calendar: build
$(BIN) -config ./configs/calendar_config.toml

build-scheduler:
Expand All @@ -17,6 +18,14 @@ build-scheduler:
run-scheduler: build-scheduler
$(BIN_CS) -config ./configs/scheduler_config.toml

build-sender:
go build -v -o $(BIN_SE) -ldflags "$(LDFLAGS)" ./cmd/sender

run-sender: build-sender
$(BIN_SE) -config ./configs/sender_config.toml

build: build-calendar build-scheduler build-sender

build-img:
docker build \
--build-arg=LDFLAGS="$(LDFLAGS)" \
Expand All @@ -37,6 +46,12 @@ generate:
version: build
$(BIN) version

scheduler-version: build
$(BIN_CS) version

sender-version: build
$(BIN) version

test:
go test -race ./internal/... ./pkg/...

Expand All @@ -46,4 +61,4 @@ install-lint-deps:
lint: install-lint-deps
golangci-lint run ./...

.PHONY: build run build-scheduler run-scheduler build-img run-img generate version test lint
.PHONY: build build-calendar run-calendar build-scheduler run-scheduler build-sender run-sender build-img run-img generate version scheduler-version sender-version test lint
15 changes: 7 additions & 8 deletions hw12_13_14_15_calendar/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,17 @@ func main() {
wg.Done()
}()

scheduler := scheduler.New(logg, eventStorage, rmqInstance, config.Scheduler.RunFrequencyInterval, config.Scheduler.TimeForRemoveOldEvents)
scheduler := scheduler.New(
logg,
eventStorage,
rmqInstance,
config.Scheduler.RunFrequencyInterval,
config.Scheduler.TimeForRemoveOldEvents,
)

wg.Add(1)
go func() {
scheduler.NotificationSender(ctx)
}()
wg.Wait()
}

// func Worker(ctx context.Context, ch <-chan amqp.Delivery) {
// for msg := range ch {
// fmt.Printf("Received a message: %s\n", string(msg.Body))
// msg.Ack(false)
// }
// }
60 changes: 60 additions & 0 deletions hw12_13_14_15_calendar/cmd/sender/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"os"
"time"

"github.com/spf13/viper"
)

// При желании конфигурацию можно вынести в internal/config.
// Организация конфига в main принуждает нас сужать API компонентов, использовать
// при их конструировании только необходимые параметры, а также уменьшает вероятность циклической зависимости.
type Config struct {
Logger LoggerConf `mapstructure:"logger"`
Sender SenderConf `mapstructure:"sender"`
Rmq RMQConf `mapstructure:"rmq"`
}

type LoggerConf struct {
Level string `mapstructure:"level"`
Path string `mapstructure:"path"`
}

type SenderConf struct {
Threads int `mapstructure:"threads"`
}

type ExchangeConf struct {
Name string `mapstructure:"name"`
Type string `mapstructure:"type"`
QueueName string `mapstructure:"queueName"`
BindingKey string `mapstructure:"bindingKey"`
}

type RMQConf struct {
URI string `mapstructure:"uri"`
ConsumerTag string `mapstructure:"consumerTag"`
MaxElapsedTime string `mapstructure:"maxElapsedTime"`
InitialInterval string `mapstructure:"initialInterval"`
Multiplier int `mapstructure:"multiplier"`
MaxInterval time.Duration `mapstructure:"maxInterval"`
Exchange ExchangeConf
}

func NewConfig() *Config {
v := viper.New()
v.SetConfigFile(configFile)

if err := v.ReadInConfig(); err != nil {
fmt.Printf("couldn't load config: %s", err)
os.Exit(1)
}
var config Config
if err := v.Unmarshal(&config); err != nil {
fmt.Printf("couldn't read config: %s", err)
}

return &config
}
85 changes: 85 additions & 0 deletions hw12_13_14_15_calendar/cmd/sender/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"context"
"errors"
"flag"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/XanderKon/hw-otus/hw12_13_14_15_calendar/internal/app/sender"
"github.com/XanderKon/hw-otus/hw12_13_14_15_calendar/internal/logger"
"github.com/XanderKon/hw-otus/hw12_13_14_15_calendar/pkg/rmq"
)

var configFile string

func init() {
flag.StringVar(&configFile, "config", "/etc/sender_config.toml", "Path to configuration file")
}

func main() {
flag.Parse()

if flag.Arg(0) == "version" {
printVersion()
return
}

// init context
ctx, cancel := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGTSTP)
defer cancel()

config := NewConfig()

logg := logger.New(config.Logger.Level, os.Stdout)

rmqInstance := rmq.NewRmq(
config.Rmq.ConsumerTag,
config.Rmq.URI,
config.Rmq.Exchange.Name,
config.Rmq.Exchange.Type,
config.Rmq.Exchange.QueueName,
config.Rmq.Exchange.BindingKey,
config.Rmq.MaxInterval,
)

err := rmqInstance.Connect()
if err != nil {
logg.Error("cannot connect to AMQP server: " + err.Error())
return
}

var wg sync.WaitGroup

go func() {
<-ctx.Done()

_, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

if err := rmqInstance.Shutdown(); err != nil && !errors.Is(err, rmq.ErrChannelClosed) {
logg.Error("failed to shutdown RMQ server: " + err.Error())
}

logg.Info("RMQ server successfully terminated!")

wg.Done()
}()

sender := sender.New(logg, rmqInstance, config.Sender.Threads)

wg.Add(1)
go func() {
err := sender.Consume(ctx)
if err != nil {
wg.Done()
logg.Error("cannot init conumer for AMQP server: %s", err.Error())
}
}()
wg.Wait()
}
27 changes: 27 additions & 0 deletions hw12_13_14_15_calendar/cmd/sender/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"encoding/json"
"fmt"
"os"
)

var (
release = "UNKNOWN"
buildDate = "UNKNOWN"
gitHash = "UNKNOWN"
)

func printVersion() {
if err := json.NewEncoder(os.Stdout).Encode(struct {
Release string
BuildDate string
GitHash string
}{
Release: release,
BuildDate: buildDate,
GitHash: gitHash,
}); err != nil {
fmt.Printf("error while decode version info: %v\n", err)
}
}
33 changes: 16 additions & 17 deletions hw12_13_14_15_calendar/configs/sender_config.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
[logger]
level = "INFO"
path = "./logs/calendar.log"
level = "DEBUG"
path = "./logs/sender.log"

[storage]
driver = "postgres" #[memory|postgres]
[sender]
threads = 2 # How many workers for reading from Queue

[db]
host = "localhost"
port = 5432
name = "otus-db"
username = "postgres"
password = "postgres"
[rmq]
uri = "amqp://guest:guest@localhost:5672/"
consumerTag = "test"
maxElapsedTime = "1m"
initialInterval = "1s"
multiplier = 2
maxInterval = "15s"

[http]
host = "localhost"
port = 8080

[grpc]
host = "localhost"
port = 8081
[rmq.exchange]
name = "events"
type = "fanout"
queueName = "notifications"
bindingKey = ""
49 changes: 49 additions & 0 deletions hw12_13_14_15_calendar/internal/app/sender/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package sender

import (
"context"

"github.com/XanderKon/hw-otus/hw12_13_14_15_calendar/pkg/rmq"
"github.com/streadway/amqp"
)

type Sender struct {
logger Logger
rmq *rmq.Rmq
threads int
}

type Logger interface {
Debug(msg string, a ...any)
Info(msg string, a ...any)
Warning(msg string, a ...any)
Error(msg string, a ...any)
}

func New(
logger Logger,
rmq *rmq.Rmq,
threads int,
) *Sender {
return &Sender{
logger: logger,
rmq: rmq,
threads: threads,
}
}

func (s *Sender) Consume(ctx context.Context) error {
return s.rmq.Handle(ctx, s.worker, s.threads)
}

func (s *Sender) worker(ctx context.Context, ch <-chan amqp.Delivery) {
for {
select {
case msg := <-ch:
s.logger.Info("successfully receive from queue: %s", msg.Body)
msg.Ack(false)
case <-ctx.Done():
return
}
}
}
5 changes: 4 additions & 1 deletion hw12_13_14_15_calendar/pkg/rmq/rmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func (r *Rmq) Handle(ctx context.Context, fn Worker, threads int) error {
go fn(ctx, msgs)
}

if <-r.done != nil {
select {
case <-ctx.Done():
return nil
case <-r.done:
err = r.reConnect(ctx)
if err != nil {
return errors.Join(ErrReconnection, err)
Expand Down

0 comments on commit e5ac5d7

Please sign in to comment.