Skip to content

Commit

Permalink
dynamolock: Adds context to logger interface. (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
mavenraven committed Jun 14, 2021
1 parent 9ce0a0a commit c471462
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ type Logger interface {
Println(v ...interface{})
}

// ContextLogger defines a logger interface that can be used to pass extra information to the implementation.
// For example, if you use zap, you may have extra fields you want to add to the log line. You
// can add those extra fields to the parent context of calls like AcquireLockWithContext, and then retrieve them in
// your implementation of ContextLogger.
type ContextLogger interface {
Println(ctx context.Context, v ...interface{})
}

type contextLoggerAdapter struct {
logger Logger
}

func newContextLogAdapter(l Logger) *contextLoggerAdapter {
return &contextLoggerAdapter{logger: l}
}

func (cla *contextLoggerAdapter) Println(_ context.Context, v ...interface{}) {
cla.logger.Println(v)
}

// Client is a dynamoDB based distributed lock client.
type Client struct {
dynamoDB dynamodbiface.DynamoDBAPI
Expand All @@ -71,7 +91,7 @@ type Client struct {
locks sync.Map
sessionMonitorCancellations sync.Map

logger Logger
logger ContextLogger

stopHeartbeat context.CancelFunc

Expand All @@ -95,8 +115,10 @@ func New(dynamoDB dynamodbiface.DynamoDBAPI, tableName string, opts ...ClientOpt
leaseDuration: defaultLeaseDuration,
heartbeatPeriod: defaultHeartbeatPeriod,
ownerName: randString(32),
logger: log.New(ioutil.Discard, "", 0),
stopHeartbeat: func() {},
logger: &contextLoggerAdapter{
logger: log.New(ioutil.Discard, "", 0),
},
stopHeartbeat: func() {},
}

for _, opt := range opts {
Expand Down Expand Up @@ -152,6 +174,12 @@ func DisableHeartbeat() ClientOption {
// WithLogger injects a logger into the client, so its internals can be
// recorded.
func WithLogger(l Logger) ClientOption {
return func(c *Client) { c.logger = &contextLoggerAdapter{l} }
}

// WithContextLogger injects a logger into the client, so its internals can be
// recorded.
func WithContextLogger(l ContextLogger) ClientOption {
return func(c *Client) { c.logger = l }
}

Expand Down Expand Up @@ -323,7 +351,7 @@ func (c *Client) acquireLock(ctx context.Context, opt *acquireLockOptions) (*Loc
} else if l != nil {
return l, nil
}
c.logger.Println("Sleeping for a refresh period of ", getLockOptions.refreshPeriodDuration)
c.logger.Println(ctx, "Sleeping for a refresh period of ", getLockOptions.refreshPeriodDuration)
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -333,7 +361,7 @@ func (c *Client) acquireLock(ctx context.Context, opt *acquireLockOptions) (*Loc
}

func (c *Client) storeLock(ctx context.Context, getLockOptions *getLockOptions) (*Lock, error) {
c.logger.Println("Call GetItem to see if the lock for ",
c.logger.Println(ctx, "Call GetItem to see if the lock for ",
c.partitionKeyName, " =", getLockOptions.partitionKeyName, " exists in the table")
existingLock, err := c.getLockFromDynamoDB(ctx, *getLockOptions)
if err != nil {
Expand Down Expand Up @@ -475,7 +503,7 @@ func (c *Client) upsertAndMonitorExpiredLock(
ExpressionAttributeValues: putItemExpr.Values(),
}

c.logger.Println("Acquiring an existing lock whose revisionVersionNumber did not change for ",
c.logger.Println(ctx, "Acquiring an existing lock whose revisionVersionNumber did not change for ",
c.partitionKeyName, " partitionKeyName=", key)
return c.putLockItemAndStartSessionMonitor(
ctx, additionalAttributes, key, deleteLockOnRelease, newLockData,
Expand Down Expand Up @@ -513,7 +541,7 @@ func (c *Client) upsertAndMonitorNewOrReleasedLock(
// lock into DynamoDB should err on the side of thinking the lock will
// expire sooner than it actually will, so they start counting towards
// its expiration before the Put succeeds
c.logger.Println("Acquiring a new lock or an existing yet released lock on ", c.partitionKeyName, "=", key)
c.logger.Println(ctx, "Acquiring a new lock or an existing yet released lock on ", c.partitionKeyName, "=", key)
return c.putLockItemAndStartSessionMonitor(ctx, additionalAttributes, key,
deleteLockOnRelease, newLockData,
recordVersionNumber, sessionMonitor, req)
Expand Down Expand Up @@ -646,19 +674,19 @@ func randString(n int) string {
}

func (c *Client) heartbeat(ctx context.Context) {
c.logger.Println("starting heartbeats")
c.logger.Println(ctx, "starting heartbeats")
tick := time.NewTicker(c.heartbeatPeriod)
defer tick.Stop()
for range tick.C {
c.locks.Range(func(_ interface{}, value interface{}) bool {
lockItem := value.(*Lock)
if err := c.SendHeartbeat(lockItem); err != nil {
c.logger.Println("error sending heartbeat to", lockItem.partitionKey, ":", err)
c.logger.Println(ctx, "error sending heartbeat to", lockItem.partitionKey, ":", err)
}
return true
})
if ctx.Err() != nil {
c.logger.Println("client closed, stopping heartbeat")
c.logger.Println(ctx, "client closed, stopping heartbeat")
return
}
}
Expand Down Expand Up @@ -1019,7 +1047,7 @@ func (c *Client) lockSessionMonitorChecker(ctx context.Context,
default:
timeUntilDangerZone, err := lock.timeUntilDangerZoneEntered()
if err != nil {
c.logger.Println("cannot run session monitor because", err)
c.logger.Println(ctx, "cannot run session monitor because", err)
return
}
if timeUntilDangerZone <= 0 {
Expand Down

0 comments on commit c471462

Please sign in to comment.