Skip to content

Commit

Permalink
lesson 56: Send async tasks to Redis with DB Transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
blessedmadukoma committed Sep 29, 2023
1 parent 5d68e66 commit 15d7ff1
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 109 deletions.
15 changes: 15 additions & 0 deletions db/mock/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 1 addition & 82 deletions db/sqlc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Store interface {
Querier
TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error)
CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error)
}

// SQLStore provides all functions to execute SQL queries and transactions
Expand Down Expand Up @@ -44,85 +45,3 @@ func (store *SQLStore) execTx(ctx context.Context, fn func(*Queries) error) erro

return tx.Commit()
}

// TransferTxParams contains the input parameters of the transfer transaction
type TransferTxParams struct {
FromAccountID int64 `json:"from_account_id"`
ToAccountID int64 `json:"to_account_id"`
Amount int64 `json:"amount"`
}

// TransferTxResult is the result of the transfer transaction
type TransferTxResult struct {
Transfer Transfer `json:"transfer"`
FromAccount Account `json:"from_account"`
ToAccount Account `json:"to_account"`
FromEntry Entry `json:"from_entry"`
ToEntry Entry `json:"to_entry"`
}

// TransferTx performs a money transfer from one account to the other
// create a transfer record, add account entries, update accounts' balance within a single datase transaction
func (s *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult

err := s.execTx(ctx, func(q *Queries) error {
var err error

result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}

result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

// update accounts' balance
if arg.FromAccountID < arg.ToAccountID {
result.FromAccount, result.ToAccount, _ = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount)
} else {
result.ToAccount, result.FromAccount, _ = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount)
}

return nil
})

return result, err
}

func addMoney(ctx context.Context, q *Queries, accountID1, amount1, accountID2, amount2 int64) (account1, account2 Account, err error) {
account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID1,
Amount: amount1,
})
if err != nil {
return
}

account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID2,
Amount: amount2,
})
if err != nil {
return
}

return
}
32 changes: 32 additions & 0 deletions db/sqlc/tx_create_user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package db

import "context"

// CreateUserTxParams contains the input parameters of the create user transaction
type CreateUserTxParams struct {
CreateUserParams
AfterCreate func(user User) error
}

// CreateUserTxResult is the result of the create user transaction
type CreateUserTxResult struct {
User User
}

// CreateUserTx performs a
func (s *SQLStore) CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error) {
var result CreateUserTxResult

err := s.execTx(ctx, func(q *Queries) error {
var err error

result.User, err = q.CreateUser(ctx, arg.CreateUserParams)
if err != nil {
return err
}

return arg.AfterCreate(result.User) // call the callback function to allow for retry if there is an error
})

return result, err
}
86 changes: 86 additions & 0 deletions db/sqlc/tx_transfer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package db

import "context"

// TransferTxParams contains the input parameters of the transfer transaction
type TransferTxParams struct {
FromAccountID int64 `json:"from_account_id"`
ToAccountID int64 `json:"to_account_id"`
Amount int64 `json:"amount"`
}

// TransferTxResult is the result of the transfer transaction
type TransferTxResult struct {
Transfer Transfer `json:"transfer"`
FromAccount Account `json:"from_account"`
ToAccount Account `json:"to_account"`
FromEntry Entry `json:"from_entry"`
ToEntry Entry `json:"to_entry"`
}

// TransferTx performs a money transfer from one account to the other
// create a transfer record, add account entries, update accounts' balance within a single datase transaction
func (s *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult

err := s.execTx(ctx, func(q *Queries) error {
var err error

result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}

result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

// update accounts' balance
if arg.FromAccountID < arg.ToAccountID {
result.FromAccount, result.ToAccount, _ = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount)
} else {
result.ToAccount, result.FromAccount, _ = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount)
}

// return nil
return err
})

return result, err
}

func addMoney(ctx context.Context, q *Queries, accountID1, amount1, accountID2, amount2 int64) (account1, account2 Account, err error) {
account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID1,
Amount: amount1,
})
if err != nil {
return
}

account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID2,
Amount: amount2,
})
if err != nil {
return
}

return
}
10 changes: 9 additions & 1 deletion details.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,4 +459,12 @@
3. initialized redis client in `main.go`, added background task worker to `gapi/server.go` to be available to all its gRPC handler functions and methods.
4. sent email to user in `rpc_create_user.go` by calling the worker and task distributor.
5. added `runTaskProcessor` to run/start the task processor
6. set processor to take tasks from the critical queue, not only the default queue
6. set processor to take tasks from the critical queue, not only the default queue

56. Send async tasks to Redis with DB Transaction
1. refactored `db/sqlc/store.go` by moving the transactions to `db/sqlc/tx_transfer.go`.
2. craeted `db/sqlc/tx_create_user.go` and implemented transaction queries
3. updated `db/sqlc/store.go` to include `CreateUserTx` function
4. updated `rpc_create_user.go` to include the newly created create user transaction methods and added the redis asynq queue to run in the same db transaction to create the user in which if there's an error, user is not added and the db retries the transaction.
5. ran `make mock` to update the mockgen.

50 changes: 24 additions & 26 deletions gapi/rpc_create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,31 @@ func (srv *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*
return nil, status.Errorf(codes.Internal, "failed to hash password: %s", err)
}

arg := db.CreateUserParams{
Username: req.Username,
HashedPassword: hashedPassword,
FullName: req.FullName,
Email: req.Email,
arg := db.CreateUserTxParams{
CreateUserParams: db.CreateUserParams{
Username: req.Username,
HashedPassword: hashedPassword,
FullName: req.FullName,
Email: req.Email,
},
AfterCreate: func(user db.User) error {
// send verify email to user
taskPayload := &worker.PayloadSendVerifyEmail{
Username: user.Username,
}

// set task to be processed or retried
opts := []asynq.Option{
asynq.MaxRetry(10), // to be retried at most 10 times
asynq.ProcessIn(10 * time.Second), // to be processed in 10 seconds
asynq.Queue(worker.QueueCritical), // to be processed in the "critical" queue
}

return srv.taskDistributor.DistributeTaskSendVerifyEmail(ctx, *taskPayload, opts...)
},
}

user, err := srv.store.CreateUser(ctx, arg)
txResult, err := srv.store.CreateUserTx(ctx, arg)
if err != nil {
if pqErr, ok := err.(*pq.Error); ok {
switch pqErr.Code.Name() {
Expand All @@ -45,27 +62,8 @@ func (srv *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*
return nil, status.Errorf(codes.Unimplemented, "failed to create user: %s", err)
}

// TODO: use db transaction

// send verify email to user
taskPayload := &worker.PayloadSendVerifyEmail{
Username: user.Username,
}

// set task to be processed or retried
opts := []asynq.Option{
asynq.MaxRetry(10), // to be retried at most 10 times
asynq.ProcessIn(10 * time.Second), // to be processed in 10 seconds
asynq.Queue(worker.QueueCritical), // to be processed in the "critical" queue
}

err = srv.taskDistributor.DistributeTaskSendVerifyEmail(ctx, *taskPayload, opts...)
if err != nil {
return nil, status.Errorf(codes.Unimplemented, "failed to distribute task to send verify email: %s", err)
}

rsp := &pb.CreateUserResponse{
User: convertUser(user),
User: convertUser(txResult.User),
}

return rsp, nil
Expand Down

0 comments on commit 15d7ff1

Please sign in to comment.