From 15d7ff1566321ac52f1201d44350a39336eff0b2 Mon Sep 17 00:00:00 2001 From: blessedmadukoma Date: Fri, 29 Sep 2023 17:45:00 +0100 Subject: [PATCH] lesson 56: Send async tasks to Redis with DB Transaction --- db/mock/store.go | 15 +++++++ db/sqlc/store.go | 83 +------------------------------------ db/sqlc/tx_create_user.go | 32 +++++++++++++++ db/sqlc/tx_transfer.go | 86 +++++++++++++++++++++++++++++++++++++++ details.md | 10 ++++- gapi/rpc_create_user.go | 50 +++++++++++------------ 6 files changed, 167 insertions(+), 109 deletions(-) create mode 100644 db/sqlc/tx_create_user.go create mode 100644 db/sqlc/tx_transfer.go diff --git a/db/mock/store.go b/db/mock/store.go index fb307ea..b0fee8d 100644 --- a/db/mock/store.go +++ b/db/mock/store.go @@ -126,6 +126,21 @@ func (mr *MockStoreMockRecorder) CreateUser(arg0, arg1 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateUser", reflect.TypeOf((*MockStore)(nil).CreateUser), arg0, arg1) } +// CreateUserTx mocks base method. +func (m *MockStore) CreateUserTx(arg0 context.Context, arg1 db.CreateUserTxParams) (db.CreateUserTxResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateUserTx", arg0, arg1) + ret0, _ := ret[0].(db.CreateUserTxResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateUserTx indicates an expected call of CreateUserTx. +func (mr *MockStoreMockRecorder) CreateUserTx(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateUserTx", reflect.TypeOf((*MockStore)(nil).CreateUserTx), arg0, arg1) +} + // DeleteAccount mocks base method. func (m *MockStore) DeleteAccount(arg0 context.Context, arg1 int64) error { m.ctrl.T.Helper() diff --git a/db/sqlc/store.go b/db/sqlc/store.go index 606135c..c36e089 100644 --- a/db/sqlc/store.go +++ b/db/sqlc/store.go @@ -10,6 +10,7 @@ import ( type Store interface { Querier TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) + CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error) } // SQLStore provides all functions to execute SQL queries and transactions @@ -44,85 +45,3 @@ func (store *SQLStore) execTx(ctx context.Context, fn func(*Queries) error) erro return tx.Commit() } - -// TransferTxParams contains the input parameters of the transfer transaction -type TransferTxParams struct { - FromAccountID int64 `json:"from_account_id"` - ToAccountID int64 `json:"to_account_id"` - Amount int64 `json:"amount"` -} - -// TransferTxResult is the result of the transfer transaction -type TransferTxResult struct { - Transfer Transfer `json:"transfer"` - FromAccount Account `json:"from_account"` - ToAccount Account `json:"to_account"` - FromEntry Entry `json:"from_entry"` - ToEntry Entry `json:"to_entry"` -} - -// TransferTx performs a money transfer from one account to the other -// create a transfer record, add account entries, update accounts' balance within a single datase transaction -func (s *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) { - var result TransferTxResult - - err := s.execTx(ctx, func(q *Queries) error { - var err error - - result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{ - FromAccountID: arg.FromAccountID, - ToAccountID: arg.ToAccountID, - Amount: arg.Amount, - }) - if err != nil { - return err - } - - result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{ - AccountID: arg.FromAccountID, - Amount: -arg.Amount, - }) - if err != nil { - return err - } - - result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{ - AccountID: arg.ToAccountID, - Amount: arg.Amount, - }) - if err != nil { - return err - } - - // update accounts' balance - if arg.FromAccountID < arg.ToAccountID { - result.FromAccount, result.ToAccount, _ = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount) - } else { - result.ToAccount, result.FromAccount, _ = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount) - } - - return nil - }) - - return result, err -} - -func addMoney(ctx context.Context, q *Queries, accountID1, amount1, accountID2, amount2 int64) (account1, account2 Account, err error) { - account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{ - ID: accountID1, - Amount: amount1, - }) - if err != nil { - return - } - - account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{ - ID: accountID2, - Amount: amount2, - }) - if err != nil { - return - } - - return -} diff --git a/db/sqlc/tx_create_user.go b/db/sqlc/tx_create_user.go new file mode 100644 index 0000000..4efaef7 --- /dev/null +++ b/db/sqlc/tx_create_user.go @@ -0,0 +1,32 @@ +package db + +import "context" + +// CreateUserTxParams contains the input parameters of the create user transaction +type CreateUserTxParams struct { + CreateUserParams + AfterCreate func(user User) error +} + +// CreateUserTxResult is the result of the create user transaction +type CreateUserTxResult struct { + User User +} + +// CreateUserTx performs a +func (s *SQLStore) CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error) { + var result CreateUserTxResult + + err := s.execTx(ctx, func(q *Queries) error { + var err error + + result.User, err = q.CreateUser(ctx, arg.CreateUserParams) + if err != nil { + return err + } + + return arg.AfterCreate(result.User) // call the callback function to allow for retry if there is an error + }) + + return result, err +} diff --git a/db/sqlc/tx_transfer.go b/db/sqlc/tx_transfer.go new file mode 100644 index 0000000..22a4867 --- /dev/null +++ b/db/sqlc/tx_transfer.go @@ -0,0 +1,86 @@ +package db + +import "context" + +// TransferTxParams contains the input parameters of the transfer transaction +type TransferTxParams struct { + FromAccountID int64 `json:"from_account_id"` + ToAccountID int64 `json:"to_account_id"` + Amount int64 `json:"amount"` +} + +// TransferTxResult is the result of the transfer transaction +type TransferTxResult struct { + Transfer Transfer `json:"transfer"` + FromAccount Account `json:"from_account"` + ToAccount Account `json:"to_account"` + FromEntry Entry `json:"from_entry"` + ToEntry Entry `json:"to_entry"` +} + +// TransferTx performs a money transfer from one account to the other +// create a transfer record, add account entries, update accounts' balance within a single datase transaction +func (s *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) { + var result TransferTxResult + + err := s.execTx(ctx, func(q *Queries) error { + var err error + + result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{ + FromAccountID: arg.FromAccountID, + ToAccountID: arg.ToAccountID, + Amount: arg.Amount, + }) + if err != nil { + return err + } + + result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{ + AccountID: arg.FromAccountID, + Amount: -arg.Amount, + }) + if err != nil { + return err + } + + result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{ + AccountID: arg.ToAccountID, + Amount: arg.Amount, + }) + if err != nil { + return err + } + + // update accounts' balance + if arg.FromAccountID < arg.ToAccountID { + result.FromAccount, result.ToAccount, _ = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount) + } else { + result.ToAccount, result.FromAccount, _ = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount) + } + + // return nil + return err + }) + + return result, err +} + +func addMoney(ctx context.Context, q *Queries, accountID1, amount1, accountID2, amount2 int64) (account1, account2 Account, err error) { + account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{ + ID: accountID1, + Amount: amount1, + }) + if err != nil { + return + } + + account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{ + ID: accountID2, + Amount: amount2, + }) + if err != nil { + return + } + + return +} diff --git a/details.md b/details.md index c8a5d4f..d6a8358 100644 --- a/details.md +++ b/details.md @@ -459,4 +459,12 @@ 3. initialized redis client in `main.go`, added background task worker to `gapi/server.go` to be available to all its gRPC handler functions and methods. 4. sent email to user in `rpc_create_user.go` by calling the worker and task distributor. 5. added `runTaskProcessor` to run/start the task processor - 6. set processor to take tasks from the critical queue, not only the default queue \ No newline at end of file + 6. set processor to take tasks from the critical queue, not only the default queue + +56. Send async tasks to Redis with DB Transaction + 1. refactored `db/sqlc/store.go` by moving the transactions to `db/sqlc/tx_transfer.go`. + 2. craeted `db/sqlc/tx_create_user.go` and implemented transaction queries + 3. updated `db/sqlc/store.go` to include `CreateUserTx` function + 4. updated `rpc_create_user.go` to include the newly created create user transaction methods and added the redis asynq queue to run in the same db transaction to create the user in which if there's an error, user is not added and the db retries the transaction. + 5. ran `make mock` to update the mockgen. + diff --git a/gapi/rpc_create_user.go b/gapi/rpc_create_user.go index a4243b9..43e7a4a 100644 --- a/gapi/rpc_create_user.go +++ b/gapi/rpc_create_user.go @@ -27,14 +27,31 @@ func (srv *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (* return nil, status.Errorf(codes.Internal, "failed to hash password: %s", err) } - arg := db.CreateUserParams{ - Username: req.Username, - HashedPassword: hashedPassword, - FullName: req.FullName, - Email: req.Email, + arg := db.CreateUserTxParams{ + CreateUserParams: db.CreateUserParams{ + Username: req.Username, + HashedPassword: hashedPassword, + FullName: req.FullName, + Email: req.Email, + }, + AfterCreate: func(user db.User) error { + // send verify email to user + taskPayload := &worker.PayloadSendVerifyEmail{ + Username: user.Username, + } + + // set task to be processed or retried + opts := []asynq.Option{ + asynq.MaxRetry(10), // to be retried at most 10 times + asynq.ProcessIn(10 * time.Second), // to be processed in 10 seconds + asynq.Queue(worker.QueueCritical), // to be processed in the "critical" queue + } + + return srv.taskDistributor.DistributeTaskSendVerifyEmail(ctx, *taskPayload, opts...) + }, } - user, err := srv.store.CreateUser(ctx, arg) + txResult, err := srv.store.CreateUserTx(ctx, arg) if err != nil { if pqErr, ok := err.(*pq.Error); ok { switch pqErr.Code.Name() { @@ -45,27 +62,8 @@ func (srv *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (* return nil, status.Errorf(codes.Unimplemented, "failed to create user: %s", err) } - // TODO: use db transaction - - // send verify email to user - taskPayload := &worker.PayloadSendVerifyEmail{ - Username: user.Username, - } - - // set task to be processed or retried - opts := []asynq.Option{ - asynq.MaxRetry(10), // to be retried at most 10 times - asynq.ProcessIn(10 * time.Second), // to be processed in 10 seconds - asynq.Queue(worker.QueueCritical), // to be processed in the "critical" queue - } - - err = srv.taskDistributor.DistributeTaskSendVerifyEmail(ctx, *taskPayload, opts...) - if err != nil { - return nil, status.Errorf(codes.Unimplemented, "failed to distribute task to send verify email: %s", err) - } - rsp := &pb.CreateUserResponse{ - User: convertUser(user), + User: convertUser(txResult.User), } return rsp, nil