Skip to content

Commit

Permalink
integrate redis worker, update import binary package
Browse files Browse the repository at this point in the history
  • Loading branch information
hafizxd committed Feb 24, 2024
1 parent 1380145 commit 320d146
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 124 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ proto:
evans:
evans --host localhost --port 9090 -r repl

redis:
docker run --name redis -p 6379:6379 -d redis:7.2-alpine

.PHONY: postgres postgresstart createdb dropdb migrateup migratedown sqlc test server mock db_docs db_schema proto evans

.PHONY: postgres postgresstart createdb dropdb migrateup migratedown sqlc test server mock db_docs db_schema proto evans redis
3 changes: 2 additions & 1 deletion app.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ HTTP_SERVER_ADDRESS=0.0.0.0:8080
GRPC_SERVER_ADDRESS=0.0.0.0:9090
TOKEN_SYMMETRIC_KEY=12345678901234567890123456789012
TOKEN_DURATION=15m
REFRESH_TOKEN_DURATION=24h
REFRESH_TOKEN_DURATION=24h
REDIS_ADDRESS=0.0.0.0:6379
15 changes: 15 additions & 0 deletions db/mock/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 1 addition & 85 deletions db/sqlc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type Store interface {
Querier
TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error)
CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error)
}

type SQLStore struct {
Expand Down Expand Up @@ -41,88 +42,3 @@ func (store *SQLStore) ExecTx(ctx context.Context, fn func(*Queries) error) erro

return tx.Commit()
}

type TransferTxParams struct {
FromAccountId int64 `json:"from_account_id"`
ToAccountId int64 `json:"to_account_id"`
Amount int64 `json:"amount"`
}

type TransferTxResult struct {
Transfer Transfer `json:"transfer"`
FromAccount Account `json:"from_account"`
ToAccount Account `json:"to_account"`
FromEntry Entry `json:"from_entry"`
ToEntry Entry `json:"to_entry"`
}

func (store *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult

err := store.ExecTx(ctx, func(q *Queries) error {
var err error

result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountId,
ToAccountID: arg.ToAccountId,
Amount: arg.Amount,
})
if err != nil {
return err
}

result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountId,
Amount: -arg.Amount,
})
if err != nil {
return err
}

result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountId,
Amount: arg.Amount,
})
if err != nil {
return err
}

if arg.FromAccountId < arg.ToAccountId {
err = addMoney(q, context.Background(), &result, arg.FromAccountId, arg.ToAccountId, -arg.Amount, arg.Amount)
} else {
err = addMoney(q, context.Background(), &result, arg.ToAccountId, arg.FromAccountId, arg.Amount, -arg.Amount)
}
if err != nil {
return err
}

return nil
})
if err != nil {
return TransferTxResult{}, err
}

return result, nil
}

func addMoney(q *Queries, ctx context.Context, result *TransferTxResult, accountId1, accountId2, amount1, amount2 int64) error {
var err error

result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountId1,
Amount: amount1,
})
if err != nil {
return err
}

result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountId2,
Amount: amount2,
})
if err != nil {
return err
}

return nil
}
32 changes: 32 additions & 0 deletions db/sqlc/tx_create_user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package db

import "context"

type CreateUserTxParams struct {
CreateUserParams
AfterCreate func(user User) error
}

type CreateUserTxResult struct {
User User
}

func (store *SQLStore) CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error) {
var result CreateUserTxResult

err := store.ExecTx(ctx, func(q *Queries) error {
var err error

result.User, err = q.CreateUser(ctx, arg.CreateUserParams)
if err != nil {
return err
}

return arg.AfterCreate(result.User)
})
if err != nil {
return CreateUserTxResult{}, err
}

return result, nil
}
88 changes: 88 additions & 0 deletions db/sqlc/tx_transfer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package db

import "context"

type TransferTxParams struct {
FromAccountId int64 `json:"from_account_id"`
ToAccountId int64 `json:"to_account_id"`
Amount int64 `json:"amount"`
}

type TransferTxResult struct {
Transfer Transfer `json:"transfer"`
FromAccount Account `json:"from_account"`
ToAccount Account `json:"to_account"`
FromEntry Entry `json:"from_entry"`
ToEntry Entry `json:"to_entry"`
}

func (store *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult

err := store.ExecTx(ctx, func(q *Queries) error {
var err error

result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountId,
ToAccountID: arg.ToAccountId,
Amount: arg.Amount,
})
if err != nil {
return err
}

result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountId,
Amount: -arg.Amount,
})
if err != nil {
return err
}

result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountId,
Amount: arg.Amount,
})
if err != nil {
return err
}

if arg.FromAccountId < arg.ToAccountId {
err = addMoney(q, context.Background(), &result, arg.FromAccountId, arg.ToAccountId, -arg.Amount, arg.Amount)
} else {
err = addMoney(q, context.Background(), &result, arg.ToAccountId, arg.FromAccountId, arg.Amount, -arg.Amount)
}
if err != nil {
return err
}

return nil
})
if err != nil {
return TransferTxResult{}, err
}

return result, nil
}

func addMoney(q *Queries, ctx context.Context, result *TransferTxResult, accountId1, accountId2, amount1, amount2 int64) error {
var err error

result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountId1,
Amount: amount1,
})
if err != nil {
return err
}

result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountId2,
Amount: amount2,
})
if err != nil {
return err
}

return nil
}
2 changes: 1 addition & 1 deletion doc/statik/statik.go

Large diffs are not rendered by default.

32 changes: 23 additions & 9 deletions gapi/rpc_create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"github.com/hafizxd/micro-bank/pb"
"github.com/hafizxd/micro-bank/util"
"github.com/hafizxd/micro-bank/val"
"github.com/hafizxd/micro-bank/worker"
"github.com/hibiken/asynq"
"github.com/lib/pq"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
)

func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
Expand All @@ -23,14 +26,27 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
return nil, status.Errorf(codes.Internal, "failed to hash password: %s", err)
}

arg := db.CreateUserParams{
Username: req.GetUsername(),
HashedPassword: hashedPassword,
FullName: req.GetFullName(),
Email: req.GetEmail(),
arg := db.CreateUserTxParams{
CreateUserParams: db.CreateUserParams{
Username: req.GetUsername(),
HashedPassword: hashedPassword,
FullName: req.GetFullName(),
Email: req.GetEmail(),
},
AfterCreate: func(user db.User) error {
taskPayload := &worker.PayloadSendVerifyEmail{
Username: user.Username,
}
opts := []asynq.Option{
asynq.MaxRetry(10),
asynq.ProcessIn(10 * time.Second),
asynq.Queue(worker.QueueCritical),
}
return server.taskDistributor.DistributeTaskSendVerifyEmail(ctx, taskPayload, opts...)
},
}

user, err := server.store.CreateUser(ctx, arg)
txResult, err := server.store.CreateUserTx(ctx, arg)
if err != nil {
if pqErr, ok := err.(*pq.Error); ok {
switch pqErr.Code.Name() {
Expand All @@ -42,10 +58,8 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
return nil, status.Errorf(codes.Internal, "failed to create user: %s", err)
}

//verify email

rsp := &pb.CreateUserResponse{
User: convertUser(user),
User: convertUser(txResult.User),
}
return rsp, nil
}
Expand Down
17 changes: 10 additions & 7 deletions gapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@ import (
"github.com/hafizxd/micro-bank/pb"
"github.com/hafizxd/micro-bank/token"
"github.com/hafizxd/micro-bank/util"
"github.com/hafizxd/micro-bank/worker"
)

type Server struct {
pb.UnimplementedMicroBankServer
config util.Config
store db.Store
tokenMaker token.Maker
config util.Config
store db.Store
tokenMaker token.Maker
taskDistributor worker.TaskDistributor
}

func NewServer(config util.Config, store db.Store) (*Server, error) {
func NewServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) {
tokenMaker, err := token.NewPasetoMaker(config.TokenSymmetricKey)
if err != nil {
return nil, fmt.Errorf("cannot create token marker: %v", err)
}

server := &Server{
config: config,
store: store,
tokenMaker: tokenMaker,
config: config,
store: store,
tokenMaker: tokenMaker,
taskDistributor: taskDistributor,
}

return server, nil
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/hibiken/asynq v0.24.1
github.com/lib/pq v1.10.9
github.com/o1egl/paseto v1.0.0
github.com/rakyll/statik v0.1.7
Expand All @@ -21,7 +22,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe
google.golang.org/grpc v1.61.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
google.golang.org/protobuf v1.32.0
)

Expand All @@ -44,7 +44,6 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hibiken/asynq v0.24.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down
Loading

0 comments on commit 320d146

Please sign in to comment.