Skip to content

Commit

Permalink
Decouple HookTask from Repository (#17940)
Browse files Browse the repository at this point in the history
At the moment a repository reference is needed for webhooks. With the
upcoming package PR we need to send webhooks without a repository
reference. For example a package is uploaded to an organization. In
theory this enables the usage of webhooks for future user actions.

This PR removes the repository id from `HookTask` and changes how the
hooks are processed (see `services/webhook/deliver.go`). In a follow up
PR I want to remove the usage of the `UniqueQueue´ and replace it with a
normal queue because there is no reason to be unique.

Co-authored-by: 6543 <6543@obermui.de>
  • Loading branch information
KN4CK3R and 6543 committed Oct 21, 2022
1 parent e828564 commit 1887c95
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 279 deletions.
1 change: 0 additions & 1 deletion models/fixtures/hook_task.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
-
id: 1
repo_id: 1
hook_id: 1
uuid: uuid1
is_delivered: true
6 changes: 5 additions & 1 deletion models/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,18 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error {
return err
}

if _, err := db.GetEngine(ctx).In("hook_id", builder.Select("id").From("webhook").Where(builder.Eq{"webhook.repo_id": repo.ID})).
Delete(&webhook.HookTask{}); err != nil {
return err
}

if err := db.DeleteBeans(ctx,
&access_model.Access{RepoID: repo.ID},
&activities_model.Action{RepoID: repo.ID},
&repo_model.Collaboration{RepoID: repoID},
&issues_model.Comment{RefRepoID: repoID},
&git_model.CommitStatus{RepoID: repoID},
&git_model.DeletedBranch{RepoID: repoID},
&webhook.HookTask{RepoID: repoID},
&git_model.LFSLock{RepoID: repoID},
&repo_model.LanguageStat{RepoID: repoID},
&issues_model.Milestone{RepoID: repoID},
Expand Down
88 changes: 43 additions & 45 deletions models/webhook/hooktask.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ type HookResponse struct {
// HookTask represents a hook task.
type HookTask struct {
ID int64 `xorm:"pk autoincr"`
RepoID int64 `xorm:"INDEX"`
HookID int64
UUID string
api.Payloader `xorm:"-"`
Expand Down Expand Up @@ -178,14 +177,29 @@ func HookTasks(hookID int64, page int) ([]*HookTask, error) {

// CreateHookTask creates a new hook task,
// it handles conversion from Payload to PayloadContent.
func CreateHookTask(t *HookTask) error {
func CreateHookTask(ctx context.Context, t *HookTask) (*HookTask, error) {
data, err := t.Payloader.JSONPayload()
if err != nil {
return err
return nil, err
}
t.UUID = gouuid.New().String()
t.PayloadContent = string(data)
return db.Insert(db.DefaultContext, t)
return t, db.Insert(ctx, t)
}

func GetHookTaskByID(ctx context.Context, id int64) (*HookTask, error) {
t := &HookTask{}

has, err := db.GetEngine(ctx).ID(id).Get(t)
if err != nil {
return nil, err
}
if !has {
return nil, ErrHookTaskNotExist{
TaskID: id,
}
}
return t, nil
}

// UpdateHookTask updates information of hook task.
Expand All @@ -195,53 +209,36 @@ func UpdateHookTask(t *HookTask) error {
}

// ReplayHookTask copies a hook task to get re-delivered
func ReplayHookTask(hookID int64, uuid string) (*HookTask, error) {
var newTask *HookTask

err := db.WithTx(func(ctx context.Context) error {
task := &HookTask{
func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, error) {
task := &HookTask{
HookID: hookID,
UUID: uuid,
}
has, err := db.GetByBean(ctx, task)
if err != nil {
return nil, err
} else if !has {
return nil, ErrHookTaskNotExist{
HookID: hookID,
UUID: uuid,
}
has, err := db.GetByBean(ctx, task)
if err != nil {
return err
} else if !has {
return ErrHookTaskNotExist{
HookID: hookID,
UUID: uuid,
}
}

newTask = &HookTask{
UUID: gouuid.New().String(),
RepoID: task.RepoID,
HookID: task.HookID,
PayloadContent: task.PayloadContent,
EventType: task.EventType,
}
return db.Insert(ctx, newTask)
})
}

return newTask, err
newTask := &HookTask{
UUID: gouuid.New().String(),
HookID: task.HookID,
PayloadContent: task.PayloadContent,
EventType: task.EventType,
}
return newTask, db.Insert(ctx, newTask)
}

// FindUndeliveredHookTasks represents find the undelivered hook tasks
func FindUndeliveredHookTasks() ([]*HookTask, error) {
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
tasks := make([]*HookTask, 0, 10)
if err := db.GetEngine(db.DefaultContext).Where("is_delivered=?", false).Find(&tasks); err != nil {
return nil, err
}
return tasks, nil
}

// FindRepoUndeliveredHookTasks represents find the undelivered hook tasks of one repository
func FindRepoUndeliveredHookTasks(repoID int64) ([]*HookTask, error) {
tasks := make([]*HookTask, 0, 5)
if err := db.GetEngine(db.DefaultContext).Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil {
return nil, err
}
return tasks, nil
return tasks, db.GetEngine(ctx).
Where("is_delivered=?", false).
Find(&tasks)
}

// CleanupHookTaskTable deletes rows from hook_task as needed.
Expand All @@ -250,7 +247,7 @@ func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType,

if cleanupType == OlderThan {
deleteOlderThan := time.Now().Add(-olderThan).UnixNano()
deletes, err := db.GetEngine(db.DefaultContext).
deletes, err := db.GetEngine(ctx).
Where("is_delivered = ? and delivered < ?", true, deleteOlderThan).
Delete(new(HookTask))
if err != nil {
Expand All @@ -259,7 +256,8 @@ func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType,
log.Trace("Deleted %d rows from hook_task", deletes)
} else if cleanupType == PerWebhook {
hookIDs := make([]int64, 0, 10)
err := db.GetEngine(db.DefaultContext).Table("webhook").
err := db.GetEngine(ctx).
Table("webhook").
Where("id > 0").
Cols("id").
Find(&hookIDs)
Expand Down
10 changes: 2 additions & 8 deletions models/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ import (
"xorm.io/builder"
)

// __ __ ___. .__ __
// / \ / \ ____\_ |__ | |__ ____ ____ | | __
// \ \/\/ // __ \| __ \| | \ / _ \ / _ \| |/ /
// \ /\ ___/| \_\ \ Y ( <_> | <_> ) <
// \__/\ / \___ >___ /___| /\____/ \____/|__|_ \
// \/ \/ \/ \/ \/

// ErrWebhookNotExist represents a "WebhookNotExist" kind of error.
type ErrWebhookNotExist struct {
ID int64
Expand All @@ -47,6 +40,7 @@ func (err ErrWebhookNotExist) Unwrap() error {

// ErrHookTaskNotExist represents a "HookTaskNotExist" kind of error.
type ErrHookTaskNotExist struct {
TaskID int64
HookID int64
UUID string
}
Expand All @@ -58,7 +52,7 @@ func IsErrHookTaskNotExist(err error) bool {
}

func (err ErrHookTaskNotExist) Error() string {
return fmt.Sprintf("hook task does not exist [hook: %d, uuid: %s]", err.HookID, err.UUID)
return fmt.Sprintf("hook task does not exist [task: %d, hook: %d, uuid: %s]", err.TaskID, err.HookID, err.UUID)
}

func (err ErrHookTaskNotExist) Unwrap() error {
Expand Down
28 changes: 14 additions & 14 deletions models/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ func TestHookTasks(t *testing.T) {
func TestCreateHookTask(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 3,
HookID: 3,
Payloader: &api.PushPayload{},
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)
}

Expand All @@ -232,14 +232,14 @@ func TestUpdateHookTask(t *testing.T) {
func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 3,
HookID: 3,
Payloader: &api.PushPayload{},
IsDelivered: true,
Delivered: time.Now().UnixNano(),
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)

assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 0))
Expand All @@ -249,13 +249,13 @@ func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) {
func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 2,
HookID: 4,
Payloader: &api.PushPayload{},
IsDelivered: false,
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)

assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 0))
Expand All @@ -265,14 +265,14 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) {
func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 2,
HookID: 4,
Payloader: &api.PushPayload{},
IsDelivered: true,
Delivered: time.Now().UnixNano(),
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)

assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 1))
Expand All @@ -282,14 +282,14 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) {
func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 3,
HookID: 3,
Payloader: &api.PushPayload{},
IsDelivered: true,
Delivered: time.Now().AddDate(0, 0, -8).UnixNano(),
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)

assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0))
Expand All @@ -299,13 +299,13 @@ func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) {
func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 2,
HookID: 4,
Payloader: &api.PushPayload{},
IsDelivered: false,
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)

assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0))
Expand All @@ -315,14 +315,14 @@ func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) {
func TestCleanupHookTaskTable_OlderThan_LeavesTaskEarlierThanAgeToDelete(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
hookTask := &HookTask{
RepoID: 2,
HookID: 4,
Payloader: &api.PushPayload{},
IsDelivered: true,
Delivered: time.Now().AddDate(0, 0, -6).UnixNano(),
}
unittest.AssertNotExistsBean(t, hookTask)
assert.NoError(t, CreateHookTask(hookTask))
_, err := CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
unittest.AssertExistsAndLoadBean(t, hookTask)

assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0))
Expand Down
Loading

0 comments on commit 1887c95

Please sign in to comment.