Skip to content

Commit

Permalink
feat: export dead branch session api (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Sep 16, 2022
1 parent 2c20554 commit fd87df8
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 1 deletion.
1 change: 1 addition & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ var (
}

if dbpackConf.DistributedTransaction != nil {
dbpackHttp.AppendApplicationID(dbpackConf.AppID)
dt.RegisterTransactionManager(dbpackConf.DistributedTransaction)
}
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/dt/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const (
LockKeyFormat = "lk/%s/%s"
// BranchKeyFormat bs/${XID}/${BranchSessionID}
BranchKeyFormat = "bs/%s/%d"
// DeadBranchKeyFormat dead/${BranchID}
DeadBranchKeyFormat = "dead/%s"
// DeadBranchKeyPrefix dead/bs/${ApplicationID}
DeadBranchKeyPrefix = "dead/bs/%s"
)

type store struct {
Expand Down Expand Up @@ -524,6 +528,51 @@ func (s *store) ReleaseLockKeys(ctx context.Context, resourceID string, lockKeys
return true, nil
}

func (s *store) SetBranchSessionDead(ctx context.Context, branchSession *api.BranchSession) error {
data, err := branchSession.Marshal()
if err != nil {
return err
}
ops := make([]clientv3.Op, 0)
deadBranchKey := fmt.Sprintf(DeadBranchKeyFormat, branchSession.BranchID)
ops = append(ops, clientv3.OpPut(deadBranchKey, string(data)))
ops = append(ops, clientv3.OpDelete(branchSession.BranchID))

txn := s.client.Txn(ctx)
txn.Then(ops...)
txnResp, err := txn.Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return errors.New("failed to set branch session dead")
}
return nil
}

func (s *store) ListDeadBranchSession(ctx context.Context, applicationID string) ([]*api.BranchSession, error) {
prefix := fmt.Sprintf(DeadBranchKeyPrefix, applicationID)
resp, err := s.client.Get(ctx, prefix, clientv3.WithSerializable(), clientv3.WithPrefix())
if err != nil {
return nil, err
}
s.initBranchSessionRevision = resp.Header.Revision
if len(resp.Kvs) == 0 {
return nil, nil
}
var result []*api.BranchSession
for i := len(resp.Kvs) - 1; i >= 0; i-- {
kv := resp.Kvs[i]
branchSession := &api.BranchSession{}
err = branchSession.Unmarshal(kv.Value)
if err != nil {
return nil, err
}
result = append(result, branchSession)
}
return result, nil
}

func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/dt/storage/storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Driver interface {
IsLockable(ctx context.Context, resourceID string, lockKey string) (bool, error)
IsLockableWithXID(ctx context.Context, resourceID string, lockKey string, xid string) (bool, error)
ReleaseLockKeys(ctx context.Context, resourceID string, lockKeys []string) (bool, error)
SetBranchSessionDead(ctx context.Context, branchSession *api.BranchSession) error
ListDeadBranchSession(ctx context.Context, applicationID string) ([]*api.BranchSession, error)
WatchGlobalSessions(ctx context.Context, applicationID string) Watcher
WatchBranchSessions(ctx context.Context, applicationID string) Watcher
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/dt/transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func (manager *DistributedTransactionManager) IsLockableWithXID(ctx context.Cont
return manager.storageDriver.IsLockableWithXID(ctx, resourceID, lockKey, xid)
}

func (manager *DistributedTransactionManager) ListDeadBranchSessions(ctx context.Context) ([]*api.BranchSession, error) {
return manager.storageDriver.ListDeadBranchSession(ctx, manager.applicationID)
}

func (manager *DistributedTransactionManager) IsMaster() bool {
return manager.isMaster
}
Expand Down Expand Up @@ -391,6 +395,9 @@ func (manager *DistributedTransactionManager) processBranchSessions() error {
return err
}
}
if err := manager.storageDriver.SetBranchSessionDead(context.Background(), bs); err != nil {
return err
}
} else {
manager.branchSessionQueue.Add(bs)
}
Expand Down Expand Up @@ -445,6 +452,9 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
log.Error(err)
}
}
if err := manager.storageDriver.SetBranchSessionDead(context.Background(), bs); err != nil {
log.Error(err)
}
} else {
status, err = manager.branchRollback(bs)
if err != nil {
Expand All @@ -463,7 +473,6 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, bs.ResourceID, metrics.TransactionStatusActive).Desc()
metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, bs.ResourceID, transactionStatus).Inc()
}

return true
}

Expand Down
61 changes: 61 additions & 0 deletions pkg/http/branchsession.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2022 CECTC, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package http

import (
"context"
"encoding/json"
"net/http"

"github.com/gorilla/mux"

"github.com/cectc/dbpack/pkg/dt"
"github.com/cectc/dbpack/pkg/dt/api"
"github.com/cectc/dbpack/pkg/log"
)

const (
deadBranchSessionsPath = "/deadBranchSessions"
)

func registerBranchSessionsRouter(router *mux.Router) {
router.Methods(http.MethodGet).Path(deadBranchSessionsPath).HandlerFunc(deadBranchSessionHandler)
}

func deadBranchSessionHandler(w http.ResponseWriter, r *http.Request) {
result := make(map[string][]*api.BranchSession)
for _, applicationID := range applicationIDs {
transactionManager := dt.GetTransactionManager(applicationID)
if transactionManager != nil {
deadBranchSessions, err := transactionManager.ListDeadBranchSessions(context.Background())
if err != nil {
log.Error(err)
}
if len(deadBranchSessions) != 0 {
result[applicationID] = deadBranchSessions
}
}
}
b, err := json.Marshal(result)
if err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(b)
w.WriteHeader(http.StatusOK)
}
3 changes: 3 additions & 0 deletions pkg/http/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func RegisterRoutes() (http.Handler, error) {
// Add status router
registerStatusRouter(router)

// Add branch session router
registerBranchSessionsRouter(router)

return router, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/proto/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type (
ReleaseLockKeys(ctx context.Context, resourceID string, lockKeys []string) (bool, error)
IsLockable(ctx context.Context, resourceID, lockKey string) (bool, error)
IsLockableWithXID(ctx context.Context, resourceID, lockKey, xid string) (bool, error)
ListDeadBranchSessions(ctx context.Context) ([]*api.BranchSession, error)
IsMaster() bool
}

Expand Down

0 comments on commit fd87df8

Please sign in to comment.