Skip to content

Commit

Permalink
Merge b0c130d into 97b6cac
Browse files Browse the repository at this point in the history
  • Loading branch information
fr33r committed Jul 26, 2020
2 parents 97b6cac + b0c130d commit f986663
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 37 deletions.
45 changes: 29 additions & 16 deletions v3/best_effort_unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,13 @@ func (u *bestEffortUnit) rollback() (err error) {
}

func (u *bestEffortUnit) applyInserts() (err error) {
u.logger.Debug("attempting to insert entities", zap.Int("count", len(u.additions)))
for typeName, additions := range u.additions {
if err = u.mappers[typeName].Insert(additions...); err != nil {
err = multierr.Combine(err, u.rollback())
u.executeActions(UnitActionTypeBeforeRollback)
var errRb error
if errRb = u.rollback(); errRb == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
u.logger.Error(err.Error(), zap.String("typeName", typeName.String()))
return
}
Expand All @@ -169,10 +172,13 @@ func (u *bestEffortUnit) applyInserts() (err error) {
}

func (u *bestEffortUnit) applyUpdates() (err error) {
u.logger.Debug("attempting to update entities", zap.Int("count", len(u.alterations)))
for typeName, alterations := range u.alterations {
if err = u.mappers[typeName].Update(alterations...); err != nil {
err = multierr.Combine(err, u.rollback())
u.executeActions(UnitActionTypeBeforeRollback)
var errRb error
if errRb = u.rollback(); errRb == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
u.logger.Error(err.Error(), zap.String("typeName", typeName.String()))
return
}
Expand All @@ -187,10 +193,14 @@ func (u *bestEffortUnit) applyUpdates() (err error) {
}

func (u *bestEffortUnit) applyDeletes() (err error) {
u.logger.Debug("attempting to remove entities", zap.Int("count", len(u.removals)))
for typeName, removals := range u.removals {
if err = u.mappers[typeName].Delete(removals...); err != nil {
err = multierr.Combine(err, u.rollback())
u.executeActions(UnitActionTypeBeforeRollback)
var errRb error
if errRb = u.rollback(); errRb == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
err = multierr.Combine(err, errRb)
u.logger.Error(err.Error(), zap.String("typeName", typeName.String()))
return
}
Expand Down Expand Up @@ -243,6 +253,7 @@ func (u *bestEffortUnit) Remove(entities ...interface{}) error {
// Save commits the new additions, modifications, and removals
// within the work unit to a persistent store.
func (u *bestEffortUnit) Save() (err error) {
u.executeActions(UnitActionTypeBeforeSave)

//setup timer.
stop := u.scope.Timer(save).Start().Stop
Expand All @@ -251,39 +262,41 @@ func (u *bestEffortUnit) Save() (err error) {
defer func() {
stop()
if r := recover(); r != nil {
u.executeActions(UnitActionTypeBeforeRollback)
if err = u.rollback(); err == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
err = multierr.Combine(
fmt.Errorf("panic: unable to save work unit\n%v", r), u.rollback())
fmt.Errorf("panic: unable to save work unit\n%v", r), err)
u.logger.Error("panic: unable to save work unit",
zap.String("panic", fmt.Sprintf("%v", r)))
panic(r)
}
if err == nil {
u.scope.Counter(saveSuccess).Inc(1)
u.executeActions(UnitActionTypeAfterSave)
}
}()

//insert newly added entities.
u.executeActions(UnitActionTypeBeforeInserts)
if err = u.applyInserts(); err != nil {
return
}
u.executeActions(UnitActionTypeAfterInserts)

//update altered entities.
u.executeActions(UnitActionTypeBeforeUpdates)
if err = u.applyUpdates(); err != nil {
return
}
u.executeActions(UnitActionTypeAfterUpdates)

//delete removed entities.
u.executeActions(UnitActionTypeBeforeDeletes)
if err = u.applyDeletes(); err != nil {
return
}

totalCount :=
u.additionCount + u.alterationCount + u.removalCount + u.registerCount
u.logger.Info("successfully saved unit",
zap.Int("insertCount", u.additionCount),
zap.Int("updateCount", u.alterationCount),
zap.Int("deleteCount", u.removalCount),
zap.Int("registerCount", u.registerCount),
zap.Int("totalCount", totalCount))
u.executeActions(UnitActionTypeAfterDeletes)
return
}
52 changes: 34 additions & 18 deletions v3/sql_unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func NewSQLUnit(

// set defaults.
o := UnitOptions{
Logger: zap.NewNop(),
Scope: tally.NoopScope,
Logger: zap.NewNop(),
Scope: tally.NoopScope,
Actions: make(map[UnitActionType][]UnitAction),
}

// apply options.
Expand Down Expand Up @@ -123,10 +124,14 @@ func (u *sqlUnit) rollback(tx *sql.Tx) (err error) {
}

func (u *sqlUnit) applyInserts(tx *sql.Tx) (err error) {
u.logger.Debug("attempting to insert entities", zap.Int("count", u.additionCount))
for typeName, additions := range u.additions {
if err = u.mappers[typeName].Insert(tx, additions...); err != nil {
err = multierr.Combine(err, u.rollback(tx))
u.executeActions(UnitActionTypeBeforeRollback)
var errRb error
if errRb = u.rollback(tx); errRb == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
err = multierr.Combine(err, errRb)
u.logger.Error(err.Error(), zap.String("typeName", typeName.String()))
return
}
Expand All @@ -135,10 +140,14 @@ func (u *sqlUnit) applyInserts(tx *sql.Tx) (err error) {
}

func (u *sqlUnit) applyUpdates(tx *sql.Tx) (err error) {
u.logger.Debug("attempting to update entities", zap.Int("count", u.alterationCount))
for typeName, alterations := range u.alterations {
if err = u.mappers[typeName].Update(tx, alterations...); err != nil {
err = multierr.Combine(err, u.rollback(tx))
u.executeActions(UnitActionTypeBeforeRollback)
var errRb error
if errRb = u.rollback(tx); errRb == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
err = multierr.Combine(err, errRb)
u.logger.Error(err.Error(), zap.String("typeName", typeName.String()))
return
}
Expand All @@ -147,10 +156,13 @@ func (u *sqlUnit) applyUpdates(tx *sql.Tx) (err error) {
}

func (u *sqlUnit) applyDeletes(tx *sql.Tx) (err error) {
u.logger.Debug("attempting to remove entities", zap.Int("count", u.removalCount))
for typeName, removals := range u.removals {
if err = u.mappers[typeName].Delete(tx, removals...); err != nil {
err = multierr.Combine(err, u.rollback(tx))
u.executeActions(UnitActionTypeBeforeRollback)
var errRb error
if errRb = u.rollback(tx); errRb == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
u.logger.Error(err.Error(), zap.String("typeName", typeName.String()))
return
}
Expand All @@ -161,13 +173,15 @@ func (u *sqlUnit) applyDeletes(tx *sql.Tx) (err error) {
// Save commits the new additions, modifications, and removals
// within the work unit to an SQL store.
func (u *sqlUnit) Save() (err error) {
u.executeActions(UnitActionTypeBeforeSave)

//setup timer.
stop := u.scope.Timer(save).Start().Stop
defer func() {
stop()
if err == nil {
u.scope.Counter(saveSuccess).Inc(1)
u.executeActions(UnitActionTypeAfterSave)
}
}()

Expand All @@ -184,44 +198,46 @@ func (u *sqlUnit) Save() (err error) {
//rollback if there is a panic.
defer func() {
if r := recover(); r != nil {
u.executeActions(UnitActionTypeBeforeRollback)
if err = u.rollback(tx); err == nil {
u.executeActions(UnitActionTypeAfterRollback)
}
msg := "panic: unable to save work unit"
err = multierr.Combine(fmt.Errorf("%s\n%v", msg, r), u.rollback(tx))
err = multierr.Combine(fmt.Errorf("%s\n%v", msg, r), err)
u.logger.Error(msg, zap.String("panic", fmt.Sprintf("%v", r)))
panic(r)
}
}()

//insert newly added entities.
u.executeActions(UnitActionTypeBeforeInserts)
if err = u.applyInserts(tx); err != nil {
return
}
u.executeActions(UnitActionTypeAfterInserts)

//update altered entities.
u.executeActions(UnitActionTypeBeforeUpdates)
if err = u.applyUpdates(tx); err != nil {
return
}
u.executeActions(UnitActionTypeAfterUpdates)

//delete removed entities.
u.executeActions(UnitActionTypeBeforeDeletes)
if err = u.applyDeletes(tx); err != nil {
return
}
u.executeActions(UnitActionTypeAfterDeletes)

if err = tx.Commit(); err != nil {
// consider error during transaction commit as successful rollback,
// since the rollback is implicitly done.
// please see https://golang.org/src/database/sql/sql.go#L1991 for reference.
u.executeActions(UnitActionTypeAfterRollback)
u.scope.Counter(rollbackSuccess).Inc(1)
u.logger.Error(err.Error())
return
}

totalCount :=
u.additionCount + u.alterationCount + u.removalCount + u.registerCount
u.logger.Info("successfully saved unit",
zap.Int("insertCount", u.additionCount),
zap.Int("updateCount", u.alterationCount),
zap.Int("deleteCount", u.removalCount),
zap.Int("registerCount", u.registerCount),
zap.Int("totalCount", totalCount))
return
}
28 changes: 27 additions & 1 deletion v3/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,28 @@ type unit struct {
registerCount int
logger *zap.Logger
scope tally.Scope
actions map[UnitActionType][]UnitAction
}

func newUnit(options UnitOptions) unit {
if !options.DisableDefaultLoggingActions {
UnitDefaultLoggingActions()(&options)
}

u := unit{
additions: make(map[TypeName][]interface{}),
alterations: make(map[TypeName][]interface{}),
removals: make(map[TypeName][]interface{}),
registered: make(map[TypeName][]interface{}),
logger: options.Logger,
scope: options.Scope.SubScope("unit"),
actions: options.Actions,
}
return u
}

func (u *unit) register(checker func(t TypeName) bool, entities ...interface{}) error {
u.executeActions(UnitActionTypeBeforeRegister)
for _, entity := range entities {
tName := TypeNameOf(entity)
if ok := checker(tName); !ok {
Expand All @@ -101,11 +108,12 @@ func (u *unit) register(checker func(t TypeName) bool, entities ...interface{})
u.registered[tName] = append(u.registered[tName], entity)
u.registerCount = u.registerCount + 1
}
u.executeActions(UnitActionTypeAfterRegister)
return nil
}

func (u *unit) add(checker func(t TypeName) bool, entities ...interface{}) error {

u.executeActions(UnitActionTypeBeforeAdd)
for _, entity := range entities {
tName := TypeNameOf(entity)
if ok := checker(tName); !ok {
Expand All @@ -120,10 +128,12 @@ func (u *unit) add(checker func(t TypeName) bool, entities ...interface{}) error
u.additions[tName] = append(u.additions[tName], entity)
u.additionCount = u.additionCount + 1
}
u.executeActions(UnitActionTypeAfterAdd)
return nil
}

func (u *unit) alter(checker func(t TypeName) bool, entities ...interface{}) error {
u.executeActions(UnitActionTypeBeforeAlter)
for _, entity := range entities {
tName := TypeNameOf(entity)
if ok := checker(tName); !ok {
Expand All @@ -138,10 +148,12 @@ func (u *unit) alter(checker func(t TypeName) bool, entities ...interface{}) err
u.alterations[tName] = append(u.alterations[tName], entity)
u.alterationCount = u.alterationCount + 1
}
u.executeActions(UnitActionTypeAfterAlter)
return nil
}

func (u *unit) remove(checker func(t TypeName) bool, entities ...interface{}) error {
u.executeActions(UnitActionTypeBeforeRemove)
for _, entity := range entities {
tName := TypeNameOf(entity)
if ok := checker(tName); !ok {
Expand All @@ -156,5 +168,19 @@ func (u *unit) remove(checker func(t TypeName) bool, entities ...interface{}) er
u.removals[tName] = append(u.removals[tName], entity)
u.removalCount = u.removalCount + 1
}
u.executeActions(UnitActionTypeAfterRemove)
return nil
}

func (u *unit) executeActions(actionType UnitActionType) {
for _, action := range u.actions[actionType] {
action(UnitActionContext{
Logger: u.logger,
Scope: u.scope,
AdditionCount: u.additionCount,
AlterationCount: u.alterationCount,
RemovalCount: u.removalCount,
RegisterCount: u.registerCount,
})
}
}
62 changes: 62 additions & 0 deletions v3/unit_action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* Copyright 2019 Freerware
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package work

// Action represents an operation performed during a paticular lifecycle event of a work unit.
type UnitAction func(UnitActionContext)

// UnitActionType represents the type of work unit action.
type UnitActionType int

// The various types of actions that are executed throughout the lifecycle of a work unit.
const (
// UnitActionTypeAfterRegister indicates an action type that occurs after an entity is registered.
UnitActionTypeAfterRegister = iota
// UnitActionTypeAfterAdd indicates an action type that occurs after an entity is added.
UnitActionTypeAfterAdd
// UnitActionTypeAfterAlter indicates an action type that occurs after an entity is altered.
UnitActionTypeAfterAlter
// UnitActionTypeAfterRemove indicates an action type that occurs after an entity is removed.
UnitActionTypeAfterRemove
// UnitActionTypeAfterInserts indicates an action type that occurs after new entities are inserted in the data store.
UnitActionTypeAfterInserts
// UnitActionTypeAfterUpdates indicates an action type that occurs after existing entities are updated in the data store.
UnitActionTypeAfterUpdates
// UnitActionTypeAfterDeletes indicates an action type that occurs after existing entities are deleted in the data store.
UnitActionTypeAfterDeletes
// UnitActionTypeAfterRollback indicates an action type that occurs after rollback.
UnitActionTypeAfterRollback
// UnitActionTypeAfterSave indicates an action type that occurs after save.
UnitActionTypeAfterSave
// UnitActionTypeBeforeRegister indicates an action type that occurs before an entity is registered.
UnitActionTypeBeforeRegister
// UnitActionTypeBeforeAdd indicates an action type that occurs before an entity is added.
UnitActionTypeBeforeAdd
// UnitActionTypeBeforeAlter indicates an action type that occurs before an entity is altered.
UnitActionTypeBeforeAlter
// UnitActionTypeBeforeRemove indicates an action type that occurs before an entity is removed.
UnitActionTypeBeforeRemove
// UnitActionTypeBeforeInserts indicates an action type that occurs before new entities are inserted in the data store.
UnitActionTypeBeforeInserts
// UnitActionTypeBeforeUpdates indicates an action type that occurs before existing entities are updated in the data store.
UnitActionTypeBeforeUpdates
// UnitActionTypeBeforeDeletes indicates an action type that occurs before existing entities are deleted in the data store.
UnitActionTypeBeforeDeletes
// UnitActionTypeBeforeRollback indicates an action type that occurs before rollback.
UnitActionTypeBeforeRollback
// UnitActionTypeBeforeSave indicates an action type that occurs before save.
UnitActionTypeBeforeSave
)

0 comments on commit f986663

Please sign in to comment.