From 94c442a6322a07e5fb3f51b3ba862c8b57d7cd9e Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Thu, 9 Dec 2021 16:13:58 +0000 Subject: [PATCH 01/11] Decouple HookTask from Repository. --- models/fixtures/hook_task.yml | 1 - models/migrations/migrations.go | 2 + models/migrations/v204.go | 21 ++++++++ models/repo.go | 1 - models/webhook/hooktask.go | 10 ---- models/webhook/webhook_test.go | 7 --- modules/notification/webhook/webhook.go | 72 ++++++++++++------------- routers/api/v1/repo/hook.go | 2 +- routers/api/v1/repo/hook_test.go | 1 - routers/web/repo/webhook.go | 2 +- services/webhook/deliver.go | 19 +++---- services/webhook/webhook.go | 56 +++++++++++-------- services/webhook/webhook_test.go | 12 ++--- 13 files changed, 108 insertions(+), 98 deletions(-) create mode 100644 models/migrations/v204.go diff --git a/models/fixtures/hook_task.yml b/models/fixtures/hook_task.yml index bb662345cdff..6dbb10151abf 100644 --- a/models/fixtures/hook_task.yml +++ b/models/fixtures/hook_task.yml @@ -1,6 +1,5 @@ - id: 1 - repo_id: 1 hook_id: 1 uuid: uuid1 is_delivered: true diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index a5bacd0d92bd..3003ec1c0bf3 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -361,6 +361,8 @@ var migrations = []Migration{ NewMigration("Create key/value table for user settings", createUserSettingsTable), // v203 -> v204 NewMigration("Add Sorting to ProjectIssue table", addProjectIssueSorting), + // v204 -> v205 + NewMigration("Drop column repo_id from HookTask table", dropColumnRepoIDOnHookTask), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v204.go b/models/migrations/v204.go new file mode 100644 index 000000000000..ce01a4dc53a2 --- /dev/null +++ b/models/migrations/v204.go @@ -0,0 +1,21 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package migrations + +import ( + "xorm.io/xorm" +) + +func dropColumnRepoIDOnHookTask(x *xorm.Engine) error { + sess := x.NewSession() + defer sess.Close() + if err := sess.Begin(); err != nil { + return err + } + if err := dropTableColumns(sess, "hook_task", "repo_id"); err != nil { + return err + } + return sess.Commit() +} diff --git a/models/repo.go b/models/repo.go index 4f6b1c346467..d25da4545328 100644 --- a/models/repo.go +++ b/models/repo.go @@ -1515,7 +1515,6 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { &Comment{RefRepoID: repoID}, &CommitStatus{RepoID: repoID}, &DeletedBranch{RepoID: repoID}, - &webhook.HookTask{RepoID: repoID}, &LFSLock{RepoID: repoID}, &LanguageStat{RepoID: repoID}, &Milestone{RepoID: repoID}, diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 1967ded298da..6926dcc991d0 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -100,7 +100,6 @@ type HookResponse struct { // HookTask represents a hook task. type HookTask struct { ID int64 `xorm:"pk autoincr"` - RepoID int64 `xorm:"INDEX"` HookID int64 UUID string api.Payloader `xorm:"-"` @@ -204,15 +203,6 @@ func FindUndeliveredHookTasks() ([]*HookTask, error) { return tasks, nil } -// FindRepoUndeliveredHookTasks represents find the undelivered hook tasks of one repository -func FindRepoUndeliveredHookTasks(repoID int64) ([]*HookTask, error) { - tasks := make([]*HookTask, 0, 5) - if err := db.GetEngine(db.DefaultContext).Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil { - return nil, err - } - return tasks, nil -} - // CleanupHookTaskTable deletes rows from hook_task as needed. func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error { log.Trace("Doing: CleanupHookTaskTable") diff --git a/models/webhook/webhook_test.go b/models/webhook/webhook_test.go index d1a76795fdbe..a01b0ee4e7e5 100644 --- a/models/webhook/webhook_test.go +++ b/models/webhook/webhook_test.go @@ -207,7 +207,6 @@ func TestHookTasks(t *testing.T) { func TestCreateHookTask(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 3, HookID: 3, Payloader: &api.PushPayload{}, } @@ -231,7 +230,6 @@ func TestUpdateHookTask(t *testing.T) { func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 3, HookID: 3, Payloader: &api.PushPayload{}, IsDelivered: true, @@ -248,7 +246,6 @@ func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) { func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 2, HookID: 4, Payloader: &api.PushPayload{}, IsDelivered: false, @@ -264,7 +261,6 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) { func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 2, HookID: 4, Payloader: &api.PushPayload{}, IsDelivered: true, @@ -281,7 +277,6 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) { func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 3, HookID: 3, Payloader: &api.PushPayload{}, IsDelivered: true, @@ -298,7 +293,6 @@ func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) { func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 2, HookID: 4, Payloader: &api.PushPayload{}, IsDelivered: false, @@ -314,7 +308,6 @@ func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) { func TestCleanupHookTaskTable_OlderThan_LeavesTaskEarlierThanAgeToDelete(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) hookTask := &HookTask{ - RepoID: 2, HookID: 4, Payloader: &api.PushPayload{}, IsDelivered: true, diff --git a/modules/notification/webhook/webhook.go b/modules/notification/webhook/webhook.go index 378e7fd20264..25fc39e66330 100644 --- a/modules/notification/webhook/webhook.go +++ b/modules/notification/webhook/webhook.go @@ -52,7 +52,7 @@ func (m *webhookNotifier) NotifyIssueClearLabels(doer *user_model.User, issue *m return } - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ Action: api.HookIssueLabelCleared, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(issue.PullRequest, nil), @@ -60,7 +60,7 @@ func (m *webhookNotifier) NotifyIssueClearLabels(doer *user_model.User, issue *m Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssueLabel, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ Action: api.HookIssueLabelCleared, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -78,7 +78,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r mode, _ := models.AccessLevel(doer, repo) // forked webhook - if err := webhook_services.PrepareWebhooks(oldRepo, webhook.HookEventFork, &api.ForkPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: oldRepo}, webhook.HookEventFork, &api.ForkPayload{ Forkee: convert.ToRepo(oldRepo, oldMode), Repo: convert.ToRepo(repo, mode), Sender: convert.ToUser(doer, nil), @@ -90,7 +90,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r // Add to hook queue for created repo after session commit. if u.IsOrganization() { - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -103,7 +103,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r func (m *webhookNotifier) NotifyCreateRepository(doer *user_model.User, u *user_model.User, repo *models.Repository) { // Add to hook queue for created repo after session commit. - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -116,7 +116,7 @@ func (m *webhookNotifier) NotifyCreateRepository(doer *user_model.User, u *user_ func (m *webhookNotifier) NotifyDeleteRepository(doer *user_model.User, repo *models.Repository) { u := repo.MustOwner() - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoDeleted, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -128,7 +128,7 @@ func (m *webhookNotifier) NotifyDeleteRepository(doer *user_model.User, repo *mo func (m *webhookNotifier) NotifyMigrateRepository(doer *user_model.User, u *user_model.User, repo *models.Repository) { // Add to hook queue for created repo after session commit. - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -159,7 +159,7 @@ func (m *webhookNotifier) NotifyIssueChangeAssignee(doer *user_model.User, issue apiPullRequest.Action = api.HookIssueAssigned } // Assignee comment triggers a webhook - if err := webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequestAssign, apiPullRequest); err != nil { + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestAssign, apiPullRequest); err != nil { log.Error("PrepareWebhooks [is_pull: %v, remove_assignee: %v]: %v", issue.IsPull, removed, err) return } @@ -177,7 +177,7 @@ func (m *webhookNotifier) NotifyIssueChangeAssignee(doer *user_model.User, issue apiIssue.Action = api.HookIssueAssigned } // Assignee comment triggers a webhook - if err := webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssueAssign, apiIssue); err != nil { + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueAssign, apiIssue); err != nil { log.Error("PrepareWebhooks [is_pull: %v, remove_assignee: %v]: %v", issue.IsPull, removed, err) return } @@ -193,7 +193,7 @@ func (m *webhookNotifier) NotifyIssueChangeTitle(doer *user_model.User, issue *m return } issue.PullRequest.Issue = issue - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -206,7 +206,7 @@ func (m *webhookNotifier) NotifyIssueChangeTitle(doer *user_model.User, issue *m Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssues, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -245,7 +245,7 @@ func (m *webhookNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue * } else { apiPullRequest.Action = api.HookIssueReOpened } - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequest, apiPullRequest) + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) } else { apiIssue := &api.IssuePayload{ Index: issue.Index, @@ -258,7 +258,7 @@ func (m *webhookNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue * } else { apiIssue.Action = api.HookIssueReOpened } - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssues, apiIssue) + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, apiIssue) } if err != nil { log.Error("PrepareWebhooks [is_pull: %v, is_closed: %v]: %v", issue.IsPull, isClosed, err) @@ -276,7 +276,7 @@ func (m *webhookNotifier) NotifyNewIssue(issue *models.Issue, mentions []*user_m } mode, _ := models.AccessLevel(issue.Poster, issue.Repo) - if err := webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssues, &api.IssuePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueOpened, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -302,7 +302,7 @@ func (m *webhookNotifier) NotifyNewPullRequest(pull *models.PullRequest, mention } mode, _ := models.AccessLevel(pull.Issue.Poster, pull.Issue.Repo) - if err := webhook_services.PrepareWebhooks(pull.Issue.Repo, webhook.HookEventPullRequest, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: pull.Issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueOpened, Index: pull.Issue.Index, PullRequest: convert.ToAPIPullRequest(pull, nil), @@ -318,7 +318,7 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue var err error if issue.IsPull { issue.PullRequest.Issue = issue - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -331,7 +331,7 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssues, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -368,7 +368,7 @@ func (m *webhookNotifier) NotifyUpdateComment(doer *user_model.User, c *models.C mode, _ := models.AccessLevel(doer, c.Issue.Repo) if c.Issue.IsPull { - err = webhook_services.PrepareWebhooks(c.Issue.Repo, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: c.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentEdited, Issue: convert.ToAPIIssue(c.Issue), Comment: convert.ToComment(c), @@ -382,7 +382,7 @@ func (m *webhookNotifier) NotifyUpdateComment(doer *user_model.User, c *models.C IsPull: true, }) } else { - err = webhook_services.PrepareWebhooks(c.Issue.Repo, webhook.HookEventIssueComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: c.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentEdited, Issue: convert.ToAPIIssue(c.Issue), Comment: convert.ToComment(c), @@ -408,7 +408,7 @@ func (m *webhookNotifier) NotifyCreateIssueComment(doer *user_model.User, repo * var err error if issue.IsPull { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentCreated, Issue: convert.ToAPIIssue(issue), Comment: convert.ToComment(comment), @@ -417,7 +417,7 @@ func (m *webhookNotifier) NotifyCreateIssueComment(doer *user_model.User, repo * IsPull: true, }) } else { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssueComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentCreated, Issue: convert.ToAPIIssue(issue), Comment: convert.ToComment(comment), @@ -452,7 +452,7 @@ func (m *webhookNotifier) NotifyDeleteComment(doer *user_model.User, comment *mo mode, _ := models.AccessLevel(doer, comment.Issue.Repo) if comment.Issue.IsPull { - err = webhook_services.PrepareWebhooks(comment.Issue.Repo, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: comment.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentDeleted, Issue: convert.ToAPIIssue(comment.Issue), Comment: convert.ToComment(comment), @@ -461,7 +461,7 @@ func (m *webhookNotifier) NotifyDeleteComment(doer *user_model.User, comment *mo IsPull: true, }) } else { - err = webhook_services.PrepareWebhooks(comment.Issue.Repo, webhook.HookEventIssueComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: comment.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentDeleted, Issue: convert.ToAPIIssue(comment.Issue), Comment: convert.ToComment(comment), @@ -501,7 +501,7 @@ func (m *webhookNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue * log.Error("LoadIssue: %v", err) return } - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ Action: api.HookIssueLabelUpdated, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(issue.PullRequest, nil), @@ -509,7 +509,7 @@ func (m *webhookNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue * Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssueLabel, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ Action: api.HookIssueLabelUpdated, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -543,7 +543,7 @@ func (m *webhookNotifier) NotifyIssueChangeMilestone(doer *user_model.User, issu log.Error("LoadIssue: %v", err) return } - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequestMilestone, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestMilestone, &api.PullRequestPayload{ Action: hookAction, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(issue.PullRequest, nil), @@ -551,7 +551,7 @@ func (m *webhookNotifier) NotifyIssueChangeMilestone(doer *user_model.User, issu Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventIssueMilestone, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueMilestone, &api.IssuePayload{ Action: hookAction, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -572,7 +572,7 @@ func (m *webhookNotifier) NotifyPushCommits(pusher *user_model.User, repo *model return } - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ Ref: opts.RefFullName, Before: opts.OldCommitID, After: opts.NewCommitID, @@ -619,7 +619,7 @@ func (*webhookNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *use Action: api.HookIssueClosed, } - err = webhook_services.PrepareWebhooks(pr.Issue.Repo, webhook.HookEventPullRequest, apiPullRequest) + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: pr.Issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) if err != nil { log.Error("PrepareWebhooks: %v", err) } @@ -638,7 +638,7 @@ func (m *webhookNotifier) NotifyPullRequestChangeTargetBranch(doer *user_model.U } issue.PullRequest.Issue = issue mode, _ := models.AccessLevel(issue.Poster, issue.Repo) - err = webhook_services.PrepareWebhooks(issue.Repo, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -682,7 +682,7 @@ func (m *webhookNotifier) NotifyPullRequestReview(pr *models.PullRequest, review log.Error("models.AccessLevel: %v", err) return } - if err := webhook_services.PrepareWebhooks(review.Issue.Repo, reviewHookType, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: review.Issue.Repo}, reviewHookType, &api.PullRequestPayload{ Action: api.HookIssueReviewed, Index: review.Issue.Index, PullRequest: convert.ToAPIPullRequest(pr, nil), @@ -716,7 +716,7 @@ func (m *webhookNotifier) NotifyCreateRef(pusher *user_model.User, repo *models. } gitRepo.Close() - if err = webhook_services.PrepareWebhooks(repo, webhook.HookEventCreate, &api.CreatePayload{ + if err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventCreate, &api.CreatePayload{ Ref: refName, Sha: shaSum, RefType: refType, @@ -737,7 +737,7 @@ func (m *webhookNotifier) NotifyPullRequestSynchronized(doer *user_model.User, p return } - if err := webhook_services.PrepareWebhooks(pr.Issue.Repo, webhook.HookEventPullRequestSync, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: pr.Issue.Repo}, webhook.HookEventPullRequestSync, &api.PullRequestPayload{ Action: api.HookIssueSynchronized, Index: pr.Issue.Index, PullRequest: convert.ToAPIPullRequest(pr, nil), @@ -753,7 +753,7 @@ func (m *webhookNotifier) NotifyDeleteRef(pusher *user_model.User, repo *models. apiRepo := convert.ToRepo(repo, perm.AccessModeNone) refName := git.RefEndName(refFullName) - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventDelete, &api.DeletePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventDelete, &api.DeletePayload{ Ref: refName, RefType: refType, PusherType: api.PusherTypeUser, @@ -771,7 +771,7 @@ func sendReleaseHook(doer *user_model.User, rel *models.Release, action api.Hook } mode, _ := models.AccessLevel(doer, rel.Repo) - if err := webhook_services.PrepareWebhooks(rel.Repo, webhook.HookEventRelease, &api.ReleasePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: rel.Repo}, webhook.HookEventRelease, &api.ReleasePayload{ Action: action, Release: convert.ToRelease(rel), Repository: convert.ToRepo(rel.Repo, mode), @@ -801,7 +801,7 @@ func (m *webhookNotifier) NotifySyncPushCommits(pusher *user_model.User, repo *m return } - if err := webhook_services.PrepareWebhooks(repo, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ Ref: opts.RefFullName, Before: opts.OldCommitID, After: opts.NewCommitID, diff --git a/routers/api/v1/repo/hook.go b/routers/api/v1/repo/hook.go index fdcaf5e389a3..e5f5c029dd39 100644 --- a/routers/api/v1/repo/hook.go +++ b/routers/api/v1/repo/hook.go @@ -156,7 +156,7 @@ func TestHook(ctx *context.APIContext) { commit := convert.ToPayloadCommit(ctx.Repo.Repository, ctx.Repo.Commit) - if err := webhook_service.PrepareWebhook(hook, ctx.Repo.Repository, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_service.PrepareWebhook(hook, webhook.HookEventPush, &api.PushPayload{ Ref: git.BranchPrefix + ctx.Repo.Repository.DefaultBranch, Before: ctx.Repo.Commit.ID.String(), After: ctx.Repo.Commit.ID.String(), diff --git a/routers/api/v1/repo/hook_test.go b/routers/api/v1/repo/hook_test.go index 07f1532f82d6..fd9a165bf358 100644 --- a/routers/api/v1/repo/hook_test.go +++ b/routers/api/v1/repo/hook_test.go @@ -28,7 +28,6 @@ func TestTestHook(t *testing.T) { assert.EqualValues(t, http.StatusNoContent, ctx.Resp.Status()) unittest.AssertExistsAndLoadBean(t, &webhook.HookTask{ - RepoID: 1, HookID: 1, }, unittest.Cond("is_delivered=?", false)) } diff --git a/routers/web/repo/webhook.go b/routers/web/repo/webhook.go index 47d84136718d..5dfa2b727164 100644 --- a/routers/web/repo/webhook.go +++ b/routers/web/repo/webhook.go @@ -1185,7 +1185,7 @@ func TestWebhook(ctx *context.Context) { Pusher: apiUser, Sender: apiUser, } - if err := webhook_service.PrepareWebhook(w, ctx.Repo.Repository, webhook.HookEventPush, p); err != nil { + if err := webhook_service.PrepareWebhook(w, webhook.HookEventPush, p); err != nil { ctx.Flash.Error("PrepareWebhook: " + err.Error()) ctx.Status(500) } else { diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 36169baad469..8913f135ab81 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -15,7 +15,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" @@ -43,7 +42,7 @@ func Deliver(t *webhook_model.HookTask) error { return } // There was a panic whilst delivering a hook... - log.Error("PANIC whilst trying to deliver webhook[%d] for repo[%d] to %s Panic: %v\nStacktrace: %s", t.ID, t.RepoID, w.URL, err, log.Stack(2)) + log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) }() t.IsDelivered = true @@ -228,19 +227,13 @@ func DeliverHooks(ctx context.Context) { case <-ctx.Done(): hookQueue.Close() return - case repoIDStr := <-hookQueue.Queue(): - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) + case dummy := <-hookQueue.Queue(): + log.Trace("DeliverHooks") + hookQueue.Remove(dummy) - repoID, err := strconv.ParseInt(repoIDStr, 10, 64) + tasks, err := webhook_model.FindUndeliveredHookTasks() if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } - - tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) + log.Error("Get hook tasks: %v", err) continue } for _, t := range tasks { diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index d4fe4e3bcca3..a773dcafe757 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -9,6 +9,7 @@ import ( "strings" "code.gitea.io/gitea/models" + user_model "code.gitea.io/gitea/models/user" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" @@ -99,12 +100,12 @@ func getPayloadBranch(p api.Payloader) string { } // PrepareWebhook adds special webhook to task queue for given payload. -func PrepareWebhook(w *webhook_model.Webhook, repo *models.Repository, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhook(w, repo, event, p); err != nil { +func PrepareWebhook(w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { + if err := prepareWebhook(w, event, p); err != nil { return err } - go hookQueue.Add(repo.ID) + go hookQueue.Add("dummy") return nil } @@ -123,7 +124,7 @@ func checkBranch(w *webhook_model.Webhook, branch string) bool { return g.Match(branch) } -func prepareWebhook(w *webhook_model.Webhook, repo *models.Repository, event webhook_model.HookEventType, p api.Payloader) error { +func prepareWebhook(w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { // Skip sending if webhooks are disabled. if setting.DisableWebhooks { return nil @@ -169,7 +170,6 @@ func prepareWebhook(w *webhook_model.Webhook, repo *models.Repository, event web } if err = webhook_model.CreateHookTask(&webhook_model.HookTask{ - RepoID: repo.ID, HookID: w.ID, Payloader: payloader, EventType: event, @@ -179,34 +179,48 @@ func prepareWebhook(w *webhook_model.Webhook, repo *models.Repository, event web return nil } +// SourceContext represents the source of a webhook action. Repository and/or Owner must be set. +type SourceContext struct { + Repository *models.Repository + Owner *user_model.User +} + // PrepareWebhooks adds new webhooks to task queue for given payload. -func PrepareWebhooks(repo *models.Repository, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhooks(repo, event, p); err != nil { +func PrepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api.Payloader) error { + if err := prepareWebhooks(ctx, event, p); err != nil { return err } - go hookQueue.Add(repo.ID) + go hookQueue.Add("dummy") return nil } -func prepareWebhooks(repo *models.Repository, event webhook_model.HookEventType, p api.Payloader) error { - ws, err := webhook_model.ListWebhooksByOpts(&webhook_model.ListWebhookOptions{ - RepoID: repo.ID, - IsActive: util.OptionalBoolTrue, - }) - if err != nil { - return fmt.Errorf("GetActiveWebhooksByRepoID: %v", err) +func prepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api.Payloader) error { + owner := ctx.Owner + + var ws []*webhook_model.Webhook + + if ctx.Repository != nil { + repoHooks, err := webhook_model.ListWebhooksByOpts(&webhook_model.ListWebhookOptions{ + RepoID: ctx.Repository.ID, + IsActive: util.OptionalBoolTrue, + }) + if err != nil { + return fmt.Errorf("ListWebhooksByOpts: %v", err) + } + ws = append(ws, repoHooks...) + + owner = ctx.Repository.MustOwner() } - // check if repo belongs to org and append additional webhooks - if repo.MustOwner().IsOrganization() { - // get hooks for org + // check if owner is an org and append additional webhooks + if owner.IsOrganization() { orgHooks, err := webhook_model.ListWebhooksByOpts(&webhook_model.ListWebhookOptions{ - OrgID: repo.OwnerID, + OrgID: owner.ID, IsActive: util.OptionalBoolTrue, }) if err != nil { - return fmt.Errorf("GetActiveWebhooksByOrgID: %v", err) + return fmt.Errorf("ListWebhooksByOpts: %v", err) } ws = append(ws, orgHooks...) } @@ -223,7 +237,7 @@ func prepareWebhooks(repo *models.Repository, event webhook_model.HookEventType, } for _, w := range ws { - if err = prepareWebhook(w, repo, event, p); err != nil { + if err = prepareWebhook(w, event, p); err != nil { return err } } diff --git a/services/webhook/webhook_test.go b/services/webhook/webhook_test.go index c3323327870f..5f90f40e7bff 100644 --- a/services/webhook/webhook_test.go +++ b/services/webhook/webhook_test.go @@ -32,12 +32,12 @@ func TestPrepareWebhooks(t *testing.T) { repo := unittest.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository) hookTasks := []*webhook_model.HookTask{ - {RepoID: repo.ID, HookID: 1, EventType: webhook_model.HookEventPush}, + {HookID: 1, EventType: webhook_model.HookEventPush}, } for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } - assert.NoError(t, PrepareWebhooks(repo, webhook_model.HookEventPush, &api.PushPayload{Commits: []*api.PayloadCommit{{}}})) + assert.NoError(t, PrepareWebhooks(SourceContext{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Commits: []*api.PayloadCommit{{}}})) for _, hookTask := range hookTasks { unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -48,13 +48,13 @@ func TestPrepareWebhooksBranchFilterMatch(t *testing.T) { repo := unittest.AssertExistsAndLoadBean(t, &models.Repository{ID: 2}).(*models.Repository) hookTasks := []*webhook_model.HookTask{ - {RepoID: repo.ID, HookID: 4, EventType: webhook_model.HookEventPush}, + {HookID: 4, EventType: webhook_model.HookEventPush}, } for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } // this test also ensures that * doesn't handle / in any special way (like shell would) - assert.NoError(t, PrepareWebhooks(repo, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/feature/7791", Commits: []*api.PayloadCommit{{}}})) + assert.NoError(t, PrepareWebhooks(SourceContext{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/feature/7791", Commits: []*api.PayloadCommit{{}}})) for _, hookTask := range hookTasks { unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -65,12 +65,12 @@ func TestPrepareWebhooksBranchFilterNoMatch(t *testing.T) { repo := unittest.AssertExistsAndLoadBean(t, &models.Repository{ID: 2}).(*models.Repository) hookTasks := []*webhook_model.HookTask{ - {RepoID: repo.ID, HookID: 4, EventType: webhook_model.HookEventPush}, + {HookID: 4, EventType: webhook_model.HookEventPush}, } for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } - assert.NoError(t, PrepareWebhooks(repo, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/fix_weird_bug"})) + assert.NoError(t, PrepareWebhooks(SourceContext{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/fix_weird_bug"})) for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) From 09a677ceb989187ef52d86a502fb08ac87b0a835 Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Fri, 10 Dec 2021 11:50:05 +0000 Subject: [PATCH 02/11] Remove hook tasks when deleting repo. --- models/repo.go | 10 ++++++++++ models/webhook/webhook.go | 8 +++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/models/repo.go b/models/repo.go index d25da4545328..6dadee284274 100644 --- a/models/repo.go +++ b/models/repo.go @@ -1508,6 +1508,16 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { return err } + hooks, err := webhook.ListWebhooksByOptsCtx(ctx, &webhook.ListWebhookOptions{RepoID: repoID}) + if err != nil { + return err + } + for _, hook := range hooks { + if err := deleteBeans(sess, &webhook.HookTask{HookID: hook.ID}); err != nil { + return err + } + } + if err := deleteBeans(sess, &Access{RepoID: repo.ID}, &Action{RepoID: repo.ID}, diff --git a/models/webhook/webhook.go b/models/webhook/webhook.go index d01f548eed41..59784ca6a7ce 100644 --- a/models/webhook/webhook.go +++ b/models/webhook/webhook.go @@ -429,8 +429,10 @@ func (opts *ListWebhookOptions) toCond() builder.Cond { return cond } -func listWebhooksByOpts(e db.Engine, opts *ListWebhookOptions) ([]*Webhook, error) { - sess := e.Where(opts.toCond()) +// ListWebhooksByOptsCtx return webhooks based on options +func ListWebhooksByOptsCtx(ctx context.Context, opts *ListWebhookOptions) ([]*Webhook, error) { + sess := db.GetEngine(ctx). + Where(opts.toCond()) if opts.Page != 0 { sess = db.SetSessionPagination(sess, opts) @@ -446,7 +448,7 @@ func listWebhooksByOpts(e db.Engine, opts *ListWebhookOptions) ([]*Webhook, erro // ListWebhooksByOpts return webhooks based on options func ListWebhooksByOpts(opts *ListWebhookOptions) ([]*Webhook, error) { - return listWebhooksByOpts(db.GetEngine(db.DefaultContext), opts) + return ListWebhooksByOptsCtx(db.DefaultContext, opts) } // CountWebhooksByOpts count webhooks based on options and ignore pagination From 252d1b22095706ba80740dc3343ed2fa10f0ba5f Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Sun, 12 Dec 2021 13:47:28 +0000 Subject: [PATCH 03/11] Revert removed column and mark as unused. --- models/migrations/migrations.go | 2 -- models/migrations/v204.go | 21 --------------------- models/webhook/hooktask.go | 1 + routers/api/v1/repo/hook.go | 2 +- routers/web/repo/webhook.go | 2 +- services/webhook/webhook.go | 26 ++++++++++++++++---------- 6 files changed, 19 insertions(+), 35 deletions(-) delete mode 100644 models/migrations/v204.go diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 3003ec1c0bf3..a5bacd0d92bd 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -361,8 +361,6 @@ var migrations = []Migration{ NewMigration("Create key/value table for user settings", createUserSettingsTable), // v203 -> v204 NewMigration("Add Sorting to ProjectIssue table", addProjectIssueSorting), - // v204 -> v205 - NewMigration("Drop column repo_id from HookTask table", dropColumnRepoIDOnHookTask), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v204.go b/models/migrations/v204.go deleted file mode 100644 index ce01a4dc53a2..000000000000 --- a/models/migrations/v204.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package migrations - -import ( - "xorm.io/xorm" -) - -func dropColumnRepoIDOnHookTask(x *xorm.Engine) error { - sess := x.NewSession() - defer sess.Close() - if err := sess.Begin(); err != nil { - return err - } - if err := dropTableColumns(sess, "hook_task", "repo_id"); err != nil { - return err - } - return sess.Commit() -} diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 6926dcc991d0..79b752c15ebc 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -100,6 +100,7 @@ type HookResponse struct { // HookTask represents a hook task. type HookTask struct { ID int64 `xorm:"pk autoincr"` + RepoID int64 `xorm:"INDEX"` // unused HookID int64 UUID string api.Payloader `xorm:"-"` diff --git a/routers/api/v1/repo/hook.go b/routers/api/v1/repo/hook.go index e5f5c029dd39..946d678730f3 100644 --- a/routers/api/v1/repo/hook.go +++ b/routers/api/v1/repo/hook.go @@ -156,7 +156,7 @@ func TestHook(ctx *context.APIContext) { commit := convert.ToPayloadCommit(ctx.Repo.Repository, ctx.Repo.Commit) - if err := webhook_service.PrepareWebhook(hook, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_service.PrepareWebhook(webhook_service.SourceContext{Repository: ctx.Repo.Repository}, hook, webhook.HookEventPush, &api.PushPayload{ Ref: git.BranchPrefix + ctx.Repo.Repository.DefaultBranch, Before: ctx.Repo.Commit.ID.String(), After: ctx.Repo.Commit.ID.String(), diff --git a/routers/web/repo/webhook.go b/routers/web/repo/webhook.go index 5dfa2b727164..dc981cbb9a29 100644 --- a/routers/web/repo/webhook.go +++ b/routers/web/repo/webhook.go @@ -1185,7 +1185,7 @@ func TestWebhook(ctx *context.Context) { Pusher: apiUser, Sender: apiUser, } - if err := webhook_service.PrepareWebhook(w, webhook.HookEventPush, p); err != nil { + if err := webhook_service.PrepareWebhook(webhook_service.SourceContext{Repository: ctx.Repo.Repository}, w, webhook.HookEventPush, p); err != nil { ctx.Flash.Error("PrepareWebhook: " + err.Error()) ctx.Status(500) } else { diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index ae735f1e1f77..c33199b4ee84 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -99,9 +99,15 @@ func getPayloadBranch(p api.Payloader) string { return "" } +// SourceContext represents the source of a webhook action. Repository and/or Owner must be set. +type SourceContext struct { + Repository *repo_model.Repository + Owner *user_model.User +} + // PrepareWebhook adds special webhook to task queue for given payload. -func PrepareWebhook(w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhook(w, event, p); err != nil { +func PrepareWebhook(ctx SourceContext, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { + if err := prepareWebhook(ctx, w, event, p); err != nil { return err } @@ -124,7 +130,7 @@ func checkBranch(w *webhook_model.Webhook, branch string) bool { return g.Match(branch) } -func prepareWebhook(w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { +func prepareWebhook(ctx SourceContext, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { // Skip sending if webhooks are disabled. if setting.DisableWebhooks { return nil @@ -169,7 +175,13 @@ func prepareWebhook(w *webhook_model.Webhook, event webhook_model.HookEventType, payloader = p } + repoID := int64(0) + if ctx.Repository != nil { + repoID = ctx.Repository.ID + } + if err = webhook_model.CreateHookTask(&webhook_model.HookTask{ + RepoID: repoID, HookID: w.ID, Payloader: payloader, EventType: event, @@ -179,12 +191,6 @@ func prepareWebhook(w *webhook_model.Webhook, event webhook_model.HookEventType, return nil } -// SourceContext represents the source of a webhook action. Repository and/or Owner must be set. -type SourceContext struct { - Repository *repo_model.Repository - Owner *user_model.User -} - // PrepareWebhooks adds new webhooks to task queue for given payload. func PrepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api.Payloader) error { if err := prepareWebhooks(ctx, event, p); err != nil { @@ -237,7 +243,7 @@ func prepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api } for _, w := range ws { - if err = prepareWebhook(w, event, p); err != nil { + if err = prepareWebhook(ctx, w, event, p); err != nil { return err } } From 33e6b5840462f7a5bda6dd8a1577a15c9437c77e Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Thu, 6 Jan 2022 16:12:47 +0000 Subject: [PATCH 04/11] Removed RepoID from ReplayHookTask. --- models/webhook/hooktask.go | 11 +++-------- services/webhook/webhook.go | 5 ++--- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 82f1cc0dd731..aee1e7f8df1c 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -191,10 +191,8 @@ func UpdateHookTask(t *HookTask) error { } // ReplayHookTask copies a hook task to get re-delivered -func ReplayHookTask(hookID int64, uuid string) (*HookTask, error) { - var newTask *HookTask - - err := db.WithTx(func(ctx context.Context) error { +func ReplayHookTask(hookID int64, uuid string) error { + return db.WithTx(func(ctx context.Context) error { task := &HookTask{ HookID: hookID, UUID: uuid, @@ -209,17 +207,14 @@ func ReplayHookTask(hookID int64, uuid string) (*HookTask, error) { } } - newTask = &HookTask{ + newTask := &HookTask{ UUID: gouuid.New().String(), - RepoID: task.RepoID, HookID: task.HookID, PayloadContent: task.PayloadContent, EventType: task.EventType, } return db.Insert(ctx, newTask) }) - - return newTask, err } // FindUndeliveredHookTasks represents find the undelivered hook tasks diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index 4f0d38460f0f..207a3eaf93a7 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -252,12 +252,11 @@ func prepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api // ReplayHookTask replays a webhook task func ReplayHookTask(w *webhook_model.Webhook, uuid string) error { - t, err := webhook_model.ReplayHookTask(w.ID, uuid) - if err != nil { + if err := webhook_model.ReplayHookTask(w.ID, uuid); err != nil { return err } - go hookQueue.Add(t.RepoID) + go hookQueue.Add("dummy") return nil } From b05d84f5d63234e804fb00bcacb99bb399772f95 Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Thu, 31 Mar 2022 21:38:35 +0000 Subject: [PATCH 05/11] Renamed struct. --- modules/notification/webhook/webhook.go | 80 ++++++++++++------------- routers/api/v1/repo/hook.go | 2 +- routers/web/repo/webhook.go | 2 +- services/webhook/webhook.go | 30 +++++----- services/webhook/webhook_test.go | 6 +- 5 files changed, 60 insertions(+), 60 deletions(-) diff --git a/modules/notification/webhook/webhook.go b/modules/notification/webhook/webhook.go index c456da64ed73..9487d0770f66 100644 --- a/modules/notification/webhook/webhook.go +++ b/modules/notification/webhook/webhook.go @@ -59,7 +59,7 @@ func (m *webhookNotifier) NotifyIssueClearLabels(doer *user_model.User, issue *m return } - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ Action: api.HookIssueLabelCleared, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, issue.PullRequest, nil), @@ -67,7 +67,7 @@ func (m *webhookNotifier) NotifyIssueClearLabels(doer *user_model.User, issue *m Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ Action: api.HookIssueLabelCleared, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -85,7 +85,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r mode, _ := models.AccessLevel(doer, repo) // forked webhook - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: oldRepo}, webhook.HookEventFork, &api.ForkPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: oldRepo}, webhook.HookEventFork, &api.ForkPayload{ Forkee: convert.ToRepo(oldRepo, oldMode), Repo: convert.ToRepo(repo, mode), Sender: convert.ToUser(doer, nil), @@ -97,7 +97,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r // Add to hook queue for created repo after session commit. if u.IsOrganization() { - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -110,7 +110,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r func (m *webhookNotifier) NotifyCreateRepository(doer, u *user_model.User, repo *repo_model.Repository) { // Add to hook queue for created repo after session commit. - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -123,7 +123,7 @@ func (m *webhookNotifier) NotifyCreateRepository(doer, u *user_model.User, repo func (m *webhookNotifier) NotifyDeleteRepository(doer *user_model.User, repo *repo_model.Repository) { u := repo.MustOwner() - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoDeleted, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -135,7 +135,7 @@ func (m *webhookNotifier) NotifyDeleteRepository(doer *user_model.User, repo *re func (m *webhookNotifier) NotifyMigrateRepository(doer, u *user_model.User, repo *repo_model.Repository) { // Add to hook queue for created repo after session commit. - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -169,7 +169,7 @@ func (m *webhookNotifier) NotifyIssueChangeAssignee(doer *user_model.User, issue apiPullRequest.Action = api.HookIssueAssigned } // Assignee comment triggers a webhook - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestAssign, apiPullRequest); err != nil { + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestAssign, apiPullRequest); err != nil { log.Error("PrepareWebhooks [is_pull: %v, remove_assignee: %v]: %v", issue.IsPull, removed, err) return } @@ -187,7 +187,7 @@ func (m *webhookNotifier) NotifyIssueChangeAssignee(doer *user_model.User, issue apiIssue.Action = api.HookIssueAssigned } // Assignee comment triggers a webhook - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueAssign, apiIssue); err != nil { + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueAssign, apiIssue); err != nil { log.Error("PrepareWebhooks [is_pull: %v, remove_assignee: %v]: %v", issue.IsPull, removed, err) return } @@ -206,7 +206,7 @@ func (m *webhookNotifier) NotifyIssueChangeTitle(doer *user_model.User, issue *m return } issue.PullRequest.Issue = issue - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -219,7 +219,7 @@ func (m *webhookNotifier) NotifyIssueChangeTitle(doer *user_model.User, issue *m Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -261,7 +261,7 @@ func (m *webhookNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue * } else { apiPullRequest.Action = api.HookIssueReOpened } - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) } else { apiIssue := &api.IssuePayload{ Index: issue.Index, @@ -274,7 +274,7 @@ func (m *webhookNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue * } else { apiIssue.Action = api.HookIssueReOpened } - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, apiIssue) + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, apiIssue) } if err != nil { log.Error("PrepareWebhooks [is_pull: %v, is_closed: %v]: %v", issue.IsPull, isClosed, err) @@ -292,7 +292,7 @@ func (m *webhookNotifier) NotifyNewIssue(issue *models.Issue, mentions []*user_m } mode, _ := models.AccessLevel(issue.Poster, issue.Repo) - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueOpened, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -321,7 +321,7 @@ func (m *webhookNotifier) NotifyNewPullRequest(pull *models.PullRequest, mention } mode, _ := models.AccessLevel(pull.Issue.Poster, pull.Issue.Repo) - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: pull.Issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: pull.Issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueOpened, Index: pull.Issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, pull, nil), @@ -340,7 +340,7 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue var err error if issue.IsPull { issue.PullRequest.Issue = issue - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -353,7 +353,7 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -390,7 +390,7 @@ func (m *webhookNotifier) NotifyUpdateComment(doer *user_model.User, c *models.C mode, _ := models.AccessLevel(doer, c.Issue.Repo) if c.Issue.IsPull { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: c.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: c.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentEdited, Issue: convert.ToAPIIssue(c.Issue), Comment: convert.ToComment(c), @@ -404,7 +404,7 @@ func (m *webhookNotifier) NotifyUpdateComment(doer *user_model.User, c *models.C IsPull: true, }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: c.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: c.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentEdited, Issue: convert.ToAPIIssue(c.Issue), Comment: convert.ToComment(c), @@ -431,7 +431,7 @@ func (m *webhookNotifier) NotifyCreateIssueComment(doer *user_model.User, repo * var err error if issue.IsPull { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentCreated, Issue: convert.ToAPIIssue(issue), Comment: convert.ToComment(comment), @@ -440,7 +440,7 @@ func (m *webhookNotifier) NotifyCreateIssueComment(doer *user_model.User, repo * IsPull: true, }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentCreated, Issue: convert.ToAPIIssue(issue), Comment: convert.ToComment(comment), @@ -475,7 +475,7 @@ func (m *webhookNotifier) NotifyDeleteComment(doer *user_model.User, comment *mo mode, _ := models.AccessLevel(doer, comment.Issue.Repo) if comment.Issue.IsPull { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: comment.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: comment.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentDeleted, Issue: convert.ToAPIIssue(comment.Issue), Comment: convert.ToComment(comment), @@ -484,7 +484,7 @@ func (m *webhookNotifier) NotifyDeleteComment(doer *user_model.User, comment *mo IsPull: true, }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: comment.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: comment.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ Action: api.HookIssueCommentDeleted, Issue: convert.ToAPIIssue(comment.Issue), Comment: convert.ToComment(comment), @@ -527,7 +527,7 @@ func (m *webhookNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue * log.Error("LoadIssue: %v", err) return } - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ Action: api.HookIssueLabelUpdated, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, issue.PullRequest, nil), @@ -535,7 +535,7 @@ func (m *webhookNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue * Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ Action: api.HookIssueLabelUpdated, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -572,7 +572,7 @@ func (m *webhookNotifier) NotifyIssueChangeMilestone(doer *user_model.User, issu log.Error("LoadIssue: %v", err) return } - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequestMilestone, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestMilestone, &api.PullRequestPayload{ Action: hookAction, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, issue.PullRequest, nil), @@ -580,7 +580,7 @@ func (m *webhookNotifier) NotifyIssueChangeMilestone(doer *user_model.User, issu Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventIssueMilestone, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueMilestone, &api.IssuePayload{ Action: hookAction, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -604,7 +604,7 @@ func (m *webhookNotifier) NotifyPushCommits(pusher *user_model.User, repo *repo_ return } - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ Ref: opts.RefFullName, Before: opts.OldCommitID, After: opts.NewCommitID, @@ -654,7 +654,7 @@ func (*webhookNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *use Action: api.HookIssueClosed, } - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: pr.Issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: pr.Issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) if err != nil { log.Error("PrepareWebhooks: %v", err) } @@ -676,7 +676,7 @@ func (m *webhookNotifier) NotifyPullRequestChangeTargetBranch(doer *user_model.U } issue.PullRequest.Issue = issue mode, _ := models.AccessLevel(issue.Poster, issue.Repo) - err = webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -723,7 +723,7 @@ func (m *webhookNotifier) NotifyPullRequestReview(pr *models.PullRequest, review log.Error("models.AccessLevel: %v", err) return } - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: review.Issue.Repo}, reviewHookType, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: review.Issue.Repo}, reviewHookType, &api.PullRequestPayload{ Action: api.HookIssueReviewed, Index: review.Issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, pr, nil), @@ -743,7 +743,7 @@ func (m *webhookNotifier) NotifyCreateRef(pusher *user_model.User, repo *repo_mo apiRepo := convert.ToRepo(repo, perm.AccessModeNone) refName := git.RefEndName(refFullName) - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventCreate, &api.CreatePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventCreate, &api.CreatePayload{ Ref: refName, Sha: refID, RefType: refType, @@ -767,7 +767,7 @@ func (m *webhookNotifier) NotifyPullRequestSynchronized(doer *user_model.User, p return } - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: pr.Issue.Repo}, webhook.HookEventPullRequestSync, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: pr.Issue.Repo}, webhook.HookEventPullRequestSync, &api.PullRequestPayload{ Action: api.HookIssueSynchronized, Index: pr.Issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, pr, nil), @@ -783,7 +783,7 @@ func (m *webhookNotifier) NotifyDeleteRef(pusher *user_model.User, repo *repo_mo apiRepo := convert.ToRepo(repo, perm.AccessModeNone) refName := git.RefEndName(refFullName) - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventDelete, &api.DeletePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventDelete, &api.DeletePayload{ Ref: refName, RefType: refType, PusherType: api.PusherTypeUser, @@ -801,7 +801,7 @@ func sendReleaseHook(doer *user_model.User, rel *models.Release, action api.Hook } mode, _ := models.AccessLevel(doer, rel.Repo) - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: rel.Repo}, webhook.HookEventRelease, &api.ReleasePayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: rel.Repo}, webhook.HookEventRelease, &api.ReleasePayload{ Action: action, Release: convert.ToRelease(rel), Repository: convert.ToRepo(rel.Repo, mode), @@ -834,7 +834,7 @@ func (m *webhookNotifier) NotifySyncPushCommits(pusher *user_model.User, repo *r return } - if err := webhook_services.PrepareWebhooks(webhook_services.SourceContext{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ Ref: opts.RefFullName, Before: opts.OldCommitID, After: opts.NewCommitID, @@ -866,9 +866,9 @@ func (m *webhookNotifier) NotifyPackageDelete(doer *user_model.User, pd *package } func notifyPackage(sender *user_model.User, pd *packages_model.PackageDescriptor, action api.HookPackageAction) { - if pd.Repository == nil { - // TODO https://github.com/go-gitea/gitea/pull/17940 - return + source := webhook_services.EventSource{ + Repository: pd.Repository, + Owner: pd.Owner, } org := pd.Owner @@ -876,7 +876,7 @@ func notifyPackage(sender *user_model.User, pd *packages_model.PackageDescriptor org = nil } - if err := webhook_services.PrepareWebhooks(pd.Repository, webhook.HookEventPackage, &api.PackagePayload{ + if err := webhook_services.PrepareWebhooks(source, webhook.HookEventPackage, &api.PackagePayload{ Action: action, Repository: convert.ToRepo(pd.Repository, perm.AccessModeNone), Package: convert.ToPackage(pd), diff --git a/routers/api/v1/repo/hook.go b/routers/api/v1/repo/hook.go index 578f05995934..7b31b0f2a12d 100644 --- a/routers/api/v1/repo/hook.go +++ b/routers/api/v1/repo/hook.go @@ -156,7 +156,7 @@ func TestHook(ctx *context.APIContext) { commit := convert.ToPayloadCommit(ctx.Repo.Repository, ctx.Repo.Commit) - if err := webhook_service.PrepareWebhook(webhook_service.SourceContext{Repository: ctx.Repo.Repository}, hook, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_service.PrepareWebhook(webhook_service.EventSource{Repository: ctx.Repo.Repository}, hook, webhook.HookEventPush, &api.PushPayload{ Ref: git.BranchPrefix + ctx.Repo.Repository.DefaultBranch, Before: ctx.Repo.Commit.ID.String(), After: ctx.Repo.Commit.ID.String(), diff --git a/routers/web/repo/webhook.go b/routers/web/repo/webhook.go index bce4fa8c80d8..0e6c59cd4e05 100644 --- a/routers/web/repo/webhook.go +++ b/routers/web/repo/webhook.go @@ -1284,7 +1284,7 @@ func TestWebhook(ctx *context.Context) { Pusher: apiUser, Sender: apiUser, } - if err := webhook_service.PrepareWebhook(webhook_service.SourceContext{Repository: ctx.Repo.Repository}, w, webhook.HookEventPush, p); err != nil { + if err := webhook_service.PrepareWebhook(webhook_service.EventSource{Repository: ctx.Repo.Repository}, w, webhook.HookEventPush, p); err != nil { ctx.Flash.Error("PrepareWebhook: " + err.Error()) ctx.Status(http.StatusInternalServerError) } else { diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index fdbab8b83fcb..aa76b7ecd67e 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -101,15 +101,15 @@ func getPayloadBranch(p api.Payloader) string { return "" } -// SourceContext represents the source of a webhook action. Repository and/or Owner must be set. -type SourceContext struct { +// EventSource represents the source of a webhook action. Repository and/or Owner must be set. +type EventSource struct { Repository *repo_model.Repository Owner *user_model.User } // PrepareWebhook adds special webhook to task queue for given payload. -func PrepareWebhook(ctx SourceContext, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhook(ctx, w, event, p); err != nil { +func PrepareWebhook(source EventSource, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { + if err := prepareWebhook(source, w, event, p); err != nil { return err } @@ -132,7 +132,7 @@ func checkBranch(w *webhook_model.Webhook, branch string) bool { return g.Match(branch) } -func prepareWebhook(ctx SourceContext, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { +func prepareWebhook(source EventSource, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { // Skip sending if webhooks are disabled. if setting.DisableWebhooks { return nil @@ -178,8 +178,8 @@ func prepareWebhook(ctx SourceContext, w *webhook_model.Webhook, event webhook_m } repoID := int64(0) - if ctx.Repository != nil { - repoID = ctx.Repository.ID + if source.Repository != nil { + repoID = source.Repository.ID } if err = webhook_model.CreateHookTask(&webhook_model.HookTask{ @@ -194,8 +194,8 @@ func prepareWebhook(ctx SourceContext, w *webhook_model.Webhook, event webhook_m } // PrepareWebhooks adds new webhooks to task queue for given payload. -func PrepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhooks(ctx, event, p); err != nil { +func PrepareWebhooks(source EventSource, event webhook_model.HookEventType, p api.Payloader) error { + if err := prepareWebhooks(source, event, p); err != nil { return err } @@ -203,14 +203,14 @@ func PrepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api return nil } -func prepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api.Payloader) error { - owner := ctx.Owner +func prepareWebhooks(source EventSource, event webhook_model.HookEventType, p api.Payloader) error { + owner := source.Owner var ws []*webhook_model.Webhook - if ctx.Repository != nil { + if source.Repository != nil { repoHooks, err := webhook_model.ListWebhooksByOpts(&webhook_model.ListWebhookOptions{ - RepoID: ctx.Repository.ID, + RepoID: source.Repository.ID, IsActive: util.OptionalBoolTrue, }) if err != nil { @@ -218,7 +218,7 @@ func prepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api } ws = append(ws, repoHooks...) - owner = ctx.Repository.MustOwner() + owner = source.Repository.MustOwner() } // check if owner is an org and append additional webhooks @@ -245,7 +245,7 @@ func prepareWebhooks(ctx SourceContext, event webhook_model.HookEventType, p api } for _, w := range ws { - if err = prepareWebhook(ctx, w, event, p); err != nil { + if err = prepareWebhook(source, w, event, p); err != nil { return err } } diff --git a/services/webhook/webhook_test.go b/services/webhook/webhook_test.go index 823b93fa2129..20450257a34d 100644 --- a/services/webhook/webhook_test.go +++ b/services/webhook/webhook_test.go @@ -37,7 +37,7 @@ func TestPrepareWebhooks(t *testing.T) { for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } - assert.NoError(t, PrepareWebhooks(SourceContext{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Commits: []*api.PayloadCommit{{}}})) + assert.NoError(t, PrepareWebhooks(EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Commits: []*api.PayloadCommit{{}}})) for _, hookTask := range hookTasks { unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -54,7 +54,7 @@ func TestPrepareWebhooksBranchFilterMatch(t *testing.T) { unittest.AssertNotExistsBean(t, hookTask) } // this test also ensures that * doesn't handle / in any special way (like shell would) - assert.NoError(t, PrepareWebhooks(SourceContext{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/feature/7791", Commits: []*api.PayloadCommit{{}}})) + assert.NoError(t, PrepareWebhooks(EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/feature/7791", Commits: []*api.PayloadCommit{{}}})) for _, hookTask := range hookTasks { unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -70,7 +70,7 @@ func TestPrepareWebhooksBranchFilterNoMatch(t *testing.T) { for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } - assert.NoError(t, PrepareWebhooks(SourceContext{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/fix_weird_bug"})) + assert.NoError(t, PrepareWebhooks(EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/fix_weird_bug"})) for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) From 3f8c5498892507bc1360ef2717da2b858fffa47d Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 12 May 2022 18:26:20 +0200 Subject: [PATCH 06/11] use builder and add to beens too --- models/repo.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/models/repo.go b/models/repo.go index 395c6a036833..dd29a35089e4 100644 --- a/models/repo.go +++ b/models/repo.go @@ -651,15 +651,10 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { return err } - hooks, err := webhook.ListWebhooksByOptsCtx(ctx, &webhook.ListWebhookOptions{RepoID: repoID}) - if err != nil { + if _, err := db.GetEngine(ctx).In("hook_id", builder.Select("id").From("webhook").Where(builder.Eq{"webhook.repo_id": repo.ID})). + Delete(&webhook.HookTask{}); err != nil { return err } - for _, hook := range hooks { - if err := db.DeleteBeans(ctx, &webhook.HookTask{HookID: hook.ID}); err != nil { - return err - } - } if err := db.DeleteBeans(ctx, &access_model.Access{RepoID: repo.ID}, @@ -684,6 +679,7 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { &Task{RepoID: repoID}, &repo_model.Watch{RepoID: repoID}, &webhook.Webhook{RepoID: repoID}, + &webhook.HookTask{RepoID: repoID}, ); err != nil { return fmt.Errorf("deleteBeans: %v", err) } From 8d5198b028665c86c055f35be3030c1bfef426cc Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 12 May 2022 18:28:19 +0200 Subject: [PATCH 07/11] jup --- models/repo.go | 1 - 1 file changed, 1 deletion(-) diff --git a/models/repo.go b/models/repo.go index dd29a35089e4..b6e8d6159215 100644 --- a/models/repo.go +++ b/models/repo.go @@ -679,7 +679,6 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { &Task{RepoID: repoID}, &repo_model.Watch{RepoID: repoID}, &webhook.Webhook{RepoID: repoID}, - &webhook.HookTask{RepoID: repoID}, ); err != nil { return fmt.Errorf("deleteBeans: %v", err) } From cd71a9d725aa4037a8bd1712dd44c29538e99463 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 12 May 2022 18:33:49 +0200 Subject: [PATCH 08/11] Update models/webhook/hooktask.go --- models/webhook/hooktask.go | 1 - 1 file changed, 1 deletion(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 1cb89b001f3d..f24e2ac8d855 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -101,7 +101,6 @@ type HookResponse struct { // HookTask represents a hook task. type HookTask struct { ID int64 `xorm:"pk autoincr"` - RepoID int64 `xorm:"INDEX"` // unused HookID int64 UUID string api.Payloader `xorm:"-"` From 4011cbc1c755a83fb043ced1f1b9a544294aefa1 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 12 May 2022 18:37:27 +0200 Subject: [PATCH 09/11] fix --- services/webhook/webhook.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index 0fe065734800..65e368d7d7e3 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -200,13 +200,7 @@ func prepareWebhook(source EventSource, w *webhook_model.Webhook, event webhook_ payloader = p } - repoID := int64(0) - if source.Repository != nil { - repoID = source.Repository.ID - } - if err = webhook_model.CreateHookTask(&webhook_model.HookTask{ - RepoID: repoID, HookID: w.ID, Payloader: payloader, EventType: event, From fb453ec3460b339fefca4f12386d10fc14981fc9 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 12 May 2022 18:43:08 +0200 Subject: [PATCH 10/11] fix lint --- services/webhook/deliver.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 27c9365d822d..7db6769646ca 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -257,7 +257,5 @@ func Init() error { } go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) - triggerTaskProcessing() - - return nil + return triggerTaskProcessing() } From f454853948f7a97f1cc6ac816040e95e06b96108 Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Thu, 20 Oct 2022 16:24:23 +0000 Subject: [PATCH 11/11] Put individual hook tasks in queue. --- models/webhook/hooktask.go | 73 ++++++---- models/webhook/webhook.go | 10 +- models/webhook/webhook_test.go | 21 ++- modules/notification/webhook/webhook.go | 184 ++++++++++-------------- routers/api/v1/repo/hook.go | 2 +- routers/web/repo/webhook.go | 6 +- services/webhook/deliver.go | 16 ++- services/webhook/webhook.go | 67 ++++----- services/webhook/webhook_test.go | 7 +- 9 files changed, 187 insertions(+), 199 deletions(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index cfb6d9e142b0..2b9b63c09bf5 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -177,14 +177,29 @@ func HookTasks(hookID int64, page int) ([]*HookTask, error) { // CreateHookTask creates a new hook task, // it handles conversion from Payload to PayloadContent. -func CreateHookTask(t *HookTask) error { +func CreateHookTask(ctx context.Context, t *HookTask) (*HookTask, error) { data, err := t.Payloader.JSONPayload() if err != nil { - return err + return nil, err } t.UUID = gouuid.New().String() t.PayloadContent = string(data) - return db.Insert(db.DefaultContext, t) + return t, db.Insert(ctx, t) +} + +func GetHookTaskByID(ctx context.Context, id int64) (*HookTask, error) { + t := &HookTask{} + + has, err := db.GetEngine(ctx).ID(id).Get(t) + if err != nil { + return nil, err + } + if !has { + return nil, ErrHookTaskNotExist{ + TaskID: id, + } + } + return t, nil } // UpdateHookTask updates information of hook task. @@ -194,39 +209,36 @@ func UpdateHookTask(t *HookTask) error { } // ReplayHookTask copies a hook task to get re-delivered -func ReplayHookTask(hookID int64, uuid string) error { - return db.WithTx(func(ctx context.Context) error { - task := &HookTask{ +func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, error) { + task := &HookTask{ + HookID: hookID, + UUID: uuid, + } + has, err := db.GetByBean(ctx, task) + if err != nil { + return nil, err + } else if !has { + return nil, ErrHookTaskNotExist{ HookID: hookID, UUID: uuid, } - has, err := db.GetByBean(ctx, task) - if err != nil { - return err - } else if !has { - return ErrHookTaskNotExist{ - HookID: hookID, - UUID: uuid, - } - } + } - newTask := &HookTask{ - UUID: gouuid.New().String(), - HookID: task.HookID, - PayloadContent: task.PayloadContent, - EventType: task.EventType, - } - return db.Insert(ctx, newTask) - }) + newTask := &HookTask{ + UUID: gouuid.New().String(), + HookID: task.HookID, + PayloadContent: task.PayloadContent, + EventType: task.EventType, + } + return newTask, db.Insert(ctx, newTask) } // FindUndeliveredHookTasks represents find the undelivered hook tasks -func FindUndeliveredHookTasks() ([]*HookTask, error) { +func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) { tasks := make([]*HookTask, 0, 10) - if err := db.GetEngine(db.DefaultContext).Where("is_delivered=?", false).Find(&tasks); err != nil { - return nil, err - } - return tasks, nil + return tasks, db.GetEngine(ctx). + Where("is_delivered=?", false). + Find(&tasks) } // CleanupHookTaskTable deletes rows from hook_task as needed. @@ -235,7 +247,7 @@ func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, if cleanupType == OlderThan { deleteOlderThan := time.Now().Add(-olderThan).UnixNano() - deletes, err := db.GetEngine(db.DefaultContext). + deletes, err := db.GetEngine(ctx). Where("is_delivered = ? and delivered < ?", true, deleteOlderThan). Delete(new(HookTask)) if err != nil { @@ -244,7 +256,8 @@ func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, log.Trace("Deleted %d rows from hook_task", deletes) } else if cleanupType == PerWebhook { hookIDs := make([]int64, 0, 10) - err := db.GetEngine(db.DefaultContext).Table("webhook"). + err := db.GetEngine(ctx). + Table("webhook"). Where("id > 0"). Cols("id"). Find(&hookIDs) diff --git a/models/webhook/webhook.go b/models/webhook/webhook.go index 83200a3d1c21..4551dcff5fb1 100644 --- a/models/webhook/webhook.go +++ b/models/webhook/webhook.go @@ -19,13 +19,6 @@ import ( "xorm.io/builder" ) -// __ __ ___. .__ __ -// / \ / \ ____\_ |__ | |__ ____ ____ | | __ -// \ \/\/ // __ \| __ \| | \ / _ \ / _ \| |/ / -// \ /\ ___/| \_\ \ Y ( <_> | <_> ) < -// \__/\ / \___ >___ /___| /\____/ \____/|__|_ \ -// \/ \/ \/ \/ \/ - // ErrWebhookNotExist represents a "WebhookNotExist" kind of error. type ErrWebhookNotExist struct { ID int64 @@ -47,6 +40,7 @@ func (err ErrWebhookNotExist) Unwrap() error { // ErrHookTaskNotExist represents a "HookTaskNotExist" kind of error. type ErrHookTaskNotExist struct { + TaskID int64 HookID int64 UUID string } @@ -58,7 +52,7 @@ func IsErrHookTaskNotExist(err error) bool { } func (err ErrHookTaskNotExist) Error() string { - return fmt.Sprintf("hook task does not exist [hook: %d, uuid: %s]", err.HookID, err.UUID) + return fmt.Sprintf("hook task does not exist [task: %d, hook: %d, uuid: %s]", err.TaskID, err.HookID, err.UUID) } func (err ErrHookTaskNotExist) Unwrap() error { diff --git a/models/webhook/webhook_test.go b/models/webhook/webhook_test.go index 695f11f0aee4..8c4838ebdc05 100644 --- a/models/webhook/webhook_test.go +++ b/models/webhook/webhook_test.go @@ -212,7 +212,8 @@ func TestCreateHookTask(t *testing.T) { Payloader: &api.PushPayload{}, } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -237,7 +238,8 @@ func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) { Delivered: time.Now().UnixNano(), } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 0)) @@ -252,7 +254,8 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) { IsDelivered: false, } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 0)) @@ -268,7 +271,8 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) { Delivered: time.Now().UnixNano(), } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 1)) @@ -284,7 +288,8 @@ func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) { Delivered: time.Now().AddDate(0, 0, -8).UnixNano(), } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0)) @@ -299,7 +304,8 @@ func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) { IsDelivered: false, } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0)) @@ -315,7 +321,8 @@ func TestCleanupHookTaskTable_OlderThan_LeavesTaskEarlierThanAgeToDelete(t *test Delivered: time.Now().AddDate(0, 0, -6).UnixNano(), } unittest.AssertNotExistsBean(t, hookTask) - assert.NoError(t, CreateHookTask(hookTask)) + _, err := CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) unittest.AssertExistsAndLoadBean(t, hookTask) assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0)) diff --git a/modules/notification/webhook/webhook.go b/modules/notification/webhook/webhook.go index 265c37c7fba0..630b56598464 100644 --- a/modules/notification/webhook/webhook.go +++ b/modules/notification/webhook/webhook.go @@ -61,7 +61,7 @@ func (m *webhookNotifier) NotifyIssueClearLabels(doer *user_model.User, issue *i return } - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ Action: api.HookIssueLabelCleared, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, issue.PullRequest, nil), @@ -69,7 +69,7 @@ func (m *webhookNotifier) NotifyIssueClearLabels(doer *user_model.User, issue *i Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ Action: api.HookIssueLabelCleared, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -87,7 +87,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r mode, _ := access_model.AccessLevel(doer, repo) // forked webhook - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: oldRepo}, webhook.HookEventFork, &api.ForkPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: oldRepo}, webhook.HookEventFork, &api.ForkPayload{ Forkee: convert.ToRepo(oldRepo, oldMode), Repo: convert.ToRepo(repo, mode), Sender: convert.ToUser(doer, nil), @@ -99,7 +99,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r // Add to hook queue for created repo after session commit. if u.IsOrganization() { - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -112,7 +112,7 @@ func (m *webhookNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, r func (m *webhookNotifier) NotifyCreateRepository(doer, u *user_model.User, repo *repo_model.Repository) { // Add to hook queue for created repo after session commit. - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -125,7 +125,7 @@ func (m *webhookNotifier) NotifyCreateRepository(doer, u *user_model.User, repo func (m *webhookNotifier) NotifyDeleteRepository(doer *user_model.User, repo *repo_model.Repository) { u := repo.MustOwner() - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoDeleted, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -137,7 +137,7 @@ func (m *webhookNotifier) NotifyDeleteRepository(doer *user_model.User, repo *re func (m *webhookNotifier) NotifyMigrateRepository(doer, u *user_model.User, repo *repo_model.Repository) { // Add to hook queue for created repo after session commit. - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventRepository, &api.RepositoryPayload{ Action: api.HookRepoCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Organization: convert.ToUser(u, nil), @@ -171,7 +171,7 @@ func (m *webhookNotifier) NotifyIssueChangeAssignee(doer *user_model.User, issue apiPullRequest.Action = api.HookIssueAssigned } // Assignee comment triggers a webhook - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestAssign, apiPullRequest); err != nil { + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestAssign, apiPullRequest); err != nil { log.Error("PrepareWebhooks [is_pull: %v, remove_assignee: %v]: %v", issue.IsPull, removed, err) return } @@ -189,7 +189,7 @@ func (m *webhookNotifier) NotifyIssueChangeAssignee(doer *user_model.User, issue apiIssue.Action = api.HookIssueAssigned } // Assignee comment triggers a webhook - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueAssign, apiIssue); err != nil { + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueAssign, apiIssue); err != nil { log.Error("PrepareWebhooks [is_pull: %v, remove_assignee: %v]: %v", issue.IsPull, removed, err) return } @@ -208,7 +208,7 @@ func (m *webhookNotifier) NotifyIssueChangeTitle(doer *user_model.User, issue *i return } issue.PullRequest.Issue = issue - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -221,7 +221,7 @@ func (m *webhookNotifier) NotifyIssueChangeTitle(doer *user_model.User, issue *i Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -263,7 +263,7 @@ func (m *webhookNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue * } else { apiPullRequest.Action = api.HookIssueReOpened } - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) } else { apiIssue := &api.IssuePayload{ Index: issue.Index, @@ -276,7 +276,7 @@ func (m *webhookNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue * } else { apiIssue.Action = api.HookIssueReOpened } - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, apiIssue) + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, apiIssue) } if err != nil { log.Error("PrepareWebhooks [is_pull: %v, is_closed: %v]: %v", issue.IsPull, isClosed, err) @@ -294,7 +294,7 @@ func (m *webhookNotifier) NotifyNewIssue(issue *issues_model.Issue, mentions []* } mode, _ := access_model.AccessLevel(issue.Poster, issue.Repo) - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueOpened, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -323,7 +323,7 @@ func (m *webhookNotifier) NotifyNewPullRequest(pull *issues_model.PullRequest, m } mode, _ := access_model.AccessLevel(pull.Issue.Poster, pull.Issue.Repo) - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: pull.Issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: pull.Issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueOpened, Index: pull.Issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, pull, nil), @@ -342,7 +342,7 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue var err error if issue.IsPull { issue.PullRequest.Issue = issue - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -355,7 +355,7 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssues, &api.IssuePayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -374,54 +374,41 @@ func (m *webhookNotifier) NotifyIssueChangeContent(doer *user_model.User, issue } func (m *webhookNotifier) NotifyUpdateComment(doer *user_model.User, c *issues_model.Comment, oldContent string) { - var err error - - if err = c.LoadPoster(); err != nil { + if err := c.LoadPoster(); err != nil { log.Error("LoadPoster: %v", err) return } - if err = c.LoadIssue(); err != nil { + if err := c.LoadIssue(); err != nil { log.Error("LoadIssue: %v", err) return } - if err = c.Issue.LoadAttributes(db.DefaultContext); err != nil { + if err := c.Issue.LoadAttributes(db.DefaultContext); err != nil { log.Error("LoadAttributes: %v", err) return } - mode, _ := access_model.AccessLevel(doer, c.Issue.Repo) + var eventType webhook.HookEventType if c.Issue.IsPull { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: c.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ - Action: api.HookIssueCommentEdited, - Issue: convert.ToAPIIssue(c.Issue), - Comment: convert.ToComment(c), - Changes: &api.ChangesPayload{ - Body: &api.ChangesFromPayload{ - From: oldContent, - }, - }, - Repository: convert.ToRepo(c.Issue.Repo, mode), - Sender: convert.ToUser(doer, nil), - IsPull: true, - }) + eventType = webhook.HookEventPullRequestComment } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: c.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ - Action: api.HookIssueCommentEdited, - Issue: convert.ToAPIIssue(c.Issue), - Comment: convert.ToComment(c), - Changes: &api.ChangesPayload{ - Body: &api.ChangesFromPayload{ - From: oldContent, - }, - }, - Repository: convert.ToRepo(c.Issue.Repo, mode), - Sender: convert.ToUser(doer, nil), - IsPull: false, - }) + eventType = webhook.HookEventIssueComment } - if err != nil { + mode, _ := access_model.AccessLevel(doer, c.Issue.Repo) + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: c.Issue.Repo}, eventType, &api.IssueCommentPayload{ + Action: api.HookIssueCommentEdited, + Issue: convert.ToAPIIssue(c.Issue), + Comment: convert.ToComment(c), + Changes: &api.ChangesPayload{ + Body: &api.ChangesFromPayload{ + From: oldContent, + }, + }, + Repository: convert.ToRepo(c.Issue.Repo, mode), + Sender: convert.ToUser(doer, nil), + IsPull: c.Issue.IsPull, + }); err != nil { log.Error("PrepareWebhooks [comment_id: %d]: %v", c.ID, err) } } @@ -429,30 +416,22 @@ func (m *webhookNotifier) NotifyUpdateComment(doer *user_model.User, c *issues_m func (m *webhookNotifier) NotifyCreateIssueComment(doer *user_model.User, repo *repo_model.Repository, issue *issues_model.Issue, comment *issues_model.Comment, mentions []*user_model.User, ) { - mode, _ := access_model.AccessLevel(doer, repo) - - var err error + var eventType webhook.HookEventType if issue.IsPull { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ - Action: api.HookIssueCommentCreated, - Issue: convert.ToAPIIssue(issue), - Comment: convert.ToComment(comment), - Repository: convert.ToRepo(repo, mode), - Sender: convert.ToUser(doer, nil), - IsPull: true, - }) + eventType = webhook.HookEventPullRequestComment } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ - Action: api.HookIssueCommentCreated, - Issue: convert.ToAPIIssue(issue), - Comment: convert.ToComment(comment), - Repository: convert.ToRepo(repo, mode), - Sender: convert.ToUser(doer, nil), - IsPull: false, - }) + eventType = webhook.HookEventIssueComment } - if err != nil { + mode, _ := access_model.AccessLevel(doer, repo) + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: issue.Repo}, eventType, &api.IssueCommentPayload{ + Action: api.HookIssueCommentCreated, + Issue: convert.ToAPIIssue(issue), + Comment: convert.ToComment(comment), + Repository: convert.ToRepo(repo, mode), + Sender: convert.ToUser(doer, nil), + IsPull: issue.IsPull, + }); err != nil { log.Error("PrepareWebhooks [comment_id: %d]: %v", comment.ID, err) } } @@ -474,36 +453,29 @@ func (m *webhookNotifier) NotifyDeleteComment(doer *user_model.User, comment *is return } - mode, _ := access_model.AccessLevel(doer, comment.Issue.Repo) - + var eventType webhook.HookEventType if comment.Issue.IsPull { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: comment.Issue.Repo}, webhook.HookEventPullRequestComment, &api.IssueCommentPayload{ - Action: api.HookIssueCommentDeleted, - Issue: convert.ToAPIIssue(comment.Issue), - Comment: convert.ToComment(comment), - Repository: convert.ToRepo(comment.Issue.Repo, mode), - Sender: convert.ToUser(doer, nil), - IsPull: true, - }) + eventType = webhook.HookEventPullRequestComment } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: comment.Issue.Repo}, webhook.HookEventIssueComment, &api.IssueCommentPayload{ - Action: api.HookIssueCommentDeleted, - Issue: convert.ToAPIIssue(comment.Issue), - Comment: convert.ToComment(comment), - Repository: convert.ToRepo(comment.Issue.Repo, mode), - Sender: convert.ToUser(doer, nil), - IsPull: false, - }) + eventType = webhook.HookEventIssueComment } - if err != nil { + mode, _ := access_model.AccessLevel(doer, comment.Issue.Repo) + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: comment.Issue.Repo}, eventType, &api.IssueCommentPayload{ + Action: api.HookIssueCommentDeleted, + Issue: convert.ToAPIIssue(comment.Issue), + Comment: convert.ToComment(comment), + Repository: convert.ToRepo(comment.Issue.Repo, mode), + Sender: convert.ToUser(doer, nil), + IsPull: comment.Issue.IsPull, + }); err != nil { log.Error("PrepareWebhooks [comment_id: %d]: %v", comment.ID, err) } } func (m *webhookNotifier) NotifyNewWikiPage(doer *user_model.User, repo *repo_model.Repository, page, comment string) { // Add to hook queue for created wiki page. - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventWiki, &api.WikiPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventWiki, &api.WikiPayload{ Action: api.HookWikiCreated, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Sender: convert.ToUser(doer, nil), @@ -516,7 +488,7 @@ func (m *webhookNotifier) NotifyNewWikiPage(doer *user_model.User, repo *repo_mo func (m *webhookNotifier) NotifyEditWikiPage(doer *user_model.User, repo *repo_model.Repository, page, comment string) { // Add to hook queue for edit wiki page. - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventWiki, &api.WikiPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventWiki, &api.WikiPayload{ Action: api.HookWikiEdited, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Sender: convert.ToUser(doer, nil), @@ -529,7 +501,7 @@ func (m *webhookNotifier) NotifyEditWikiPage(doer *user_model.User, repo *repo_m func (m *webhookNotifier) NotifyDeleteWikiPage(doer *user_model.User, repo *repo_model.Repository, page string) { // Add to hook queue for edit wiki page. - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventWiki, &api.WikiPayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventWiki, &api.WikiPayload{ Action: api.HookWikiDeleted, Repository: convert.ToRepo(repo, perm.AccessModeOwner), Sender: convert.ToUser(doer, nil), @@ -567,7 +539,7 @@ func (m *webhookNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue * log.Error("LoadIssue: %v", err) return } - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestLabel, &api.PullRequestPayload{ Action: api.HookIssueLabelUpdated, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, issue.PullRequest, nil), @@ -575,7 +547,7 @@ func (m *webhookNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue * Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueLabel, &api.IssuePayload{ Action: api.HookIssueLabelUpdated, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -612,7 +584,7 @@ func (m *webhookNotifier) NotifyIssueChangeMilestone(doer *user_model.User, issu log.Error("LoadIssue: %v", err) return } - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestMilestone, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequestMilestone, &api.PullRequestPayload{ Action: hookAction, Index: issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, issue.PullRequest, nil), @@ -620,7 +592,7 @@ func (m *webhookNotifier) NotifyIssueChangeMilestone(doer *user_model.User, issu Sender: convert.ToUser(doer, nil), }) } else { - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueMilestone, &api.IssuePayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventIssueMilestone, &api.IssuePayload{ Action: hookAction, Index: issue.Index, Issue: convert.ToAPIIssue(issue), @@ -644,7 +616,7 @@ func (m *webhookNotifier) NotifyPushCommits(pusher *user_model.User, repo *repo_ return } - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ Ref: opts.RefFullName, Before: opts.OldCommitID, After: opts.NewCommitID, @@ -695,7 +667,7 @@ func (*webhookNotifier) NotifyMergePullRequest(pr *issues_model.PullRequest, doe Action: api.HookIssueClosed, } - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: pr.Issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: pr.Issue.Repo}, webhook.HookEventPullRequest, apiPullRequest) if err != nil { log.Error("PrepareWebhooks: %v", err) } @@ -717,7 +689,7 @@ func (m *webhookNotifier) NotifyPullRequestChangeTargetBranch(doer *user_model.U } issue.PullRequest.Issue = issue mode, _ := access_model.AccessLevel(issue.Poster, issue.Repo) - err = webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ + err = webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: issue.Repo}, webhook.HookEventPullRequest, &api.PullRequestPayload{ Action: api.HookIssueEdited, Index: issue.Index, Changes: &api.ChangesPayload{ @@ -764,7 +736,7 @@ func (m *webhookNotifier) NotifyPullRequestReview(pr *issues_model.PullRequest, log.Error("models.AccessLevel: %v", err) return } - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: review.Issue.Repo}, reviewHookType, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: review.Issue.Repo}, reviewHookType, &api.PullRequestPayload{ Action: api.HookIssueReviewed, Index: review.Issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, pr, nil), @@ -784,7 +756,7 @@ func (m *webhookNotifier) NotifyCreateRef(pusher *user_model.User, repo *repo_mo apiRepo := convert.ToRepo(repo, perm.AccessModeNone) refName := git.RefEndName(refFullName) - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventCreate, &api.CreatePayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventCreate, &api.CreatePayload{ Ref: refName, Sha: refID, RefType: refType, @@ -808,7 +780,7 @@ func (m *webhookNotifier) NotifyPullRequestSynchronized(doer *user_model.User, p return } - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: pr.Issue.Repo}, webhook.HookEventPullRequestSync, &api.PullRequestPayload{ + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: pr.Issue.Repo}, webhook.HookEventPullRequestSync, &api.PullRequestPayload{ Action: api.HookIssueSynchronized, Index: pr.Issue.Index, PullRequest: convert.ToAPIPullRequest(ctx, pr, nil), @@ -824,7 +796,7 @@ func (m *webhookNotifier) NotifyDeleteRef(pusher *user_model.User, repo *repo_mo apiRepo := convert.ToRepo(repo, perm.AccessModeNone) refName := git.RefEndName(refFullName) - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventDelete, &api.DeletePayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: repo}, webhook.HookEventDelete, &api.DeletePayload{ Ref: refName, RefType: refType, PusherType: api.PusherTypeUser, @@ -842,7 +814,7 @@ func sendReleaseHook(doer *user_model.User, rel *repo_model.Release, action api. } mode, _ := access_model.AccessLevel(doer, rel.Repo) - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: rel.Repo}, webhook.HookEventRelease, &api.ReleasePayload{ + if err := webhook_services.PrepareWebhooks(db.DefaultContext, webhook_services.EventSource{Repository: rel.Repo}, webhook.HookEventRelease, &api.ReleasePayload{ Action: action, Release: convert.ToRelease(rel), Repository: convert.ToRepo(rel.Repo, mode), @@ -875,7 +847,7 @@ func (m *webhookNotifier) NotifySyncPushCommits(pusher *user_model.User, repo *r return } - if err := webhook_services.PrepareWebhooks(webhook_services.EventSource{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_services.PrepareWebhooks(ctx, webhook_services.EventSource{Repository: repo}, webhook.HookEventPush, &api.PushPayload{ Ref: opts.RefFullName, Before: opts.OldCommitID, After: opts.NewCommitID, @@ -922,7 +894,7 @@ func notifyPackage(sender *user_model.User, pd *packages_model.PackageDescriptor return } - if err := webhook_services.PrepareWebhooks(source, webhook.HookEventPackage, &api.PackagePayload{ + if err := webhook_services.PrepareWebhooks(ctx, source, webhook.HookEventPackage, &api.PackagePayload{ Action: action, Package: apiPackage, Sender: convert.ToUser(sender, nil), diff --git a/routers/api/v1/repo/hook.go b/routers/api/v1/repo/hook.go index 8cfecb31ab44..5956fe9da92f 100644 --- a/routers/api/v1/repo/hook.go +++ b/routers/api/v1/repo/hook.go @@ -168,7 +168,7 @@ func TestHook(ctx *context.APIContext) { commit := convert.ToPayloadCommit(ctx.Repo.Repository, ctx.Repo.Commit) commitID := ctx.Repo.Commit.ID.String() - if err := webhook_service.PrepareWebhook(webhook_service.EventSource{Repository: ctx.Repo.Repository}, hook, webhook.HookEventPush, &api.PushPayload{ + if err := webhook_service.PrepareWebhook(ctx, hook, webhook.HookEventPush, &api.PushPayload{ Ref: ref, Before: commitID, After: commitID, diff --git a/routers/web/repo/webhook.go b/routers/web/repo/webhook.go index 29e22ba9e26b..ee980333b72f 100644 --- a/routers/web/repo/webhook.go +++ b/routers/web/repo/webhook.go @@ -633,7 +633,7 @@ func TestWebhook(ctx *context.Context) { hookID := ctx.ParamsInt64(":id") w, err := webhook.GetWebhookByRepoID(ctx.Repo.Repository.ID, hookID) if err != nil { - ctx.Flash.Error("GetWebhookByID: " + err.Error()) + ctx.Flash.Error("GetWebhookByRepoID: " + err.Error()) ctx.Status(http.StatusInternalServerError) return } @@ -679,7 +679,7 @@ func TestWebhook(ctx *context.Context) { Pusher: apiUser, Sender: apiUser, } - if err := webhook_service.PrepareWebhook(webhook_service.EventSource{Repository: ctx.Repo.Repository}, w, webhook.HookEventPush, p); err != nil { + if err := webhook_service.PrepareWebhook(ctx, w, webhook.HookEventPush, p); err != nil { ctx.Flash.Error("PrepareWebhook: " + err.Error()) ctx.Status(http.StatusInternalServerError) } else { @@ -697,7 +697,7 @@ func ReplayWebhook(ctx *context.Context) { return } - if err := webhook_service.ReplayHookTask(w, hookTaskUUID); err != nil { + if err := webhook_service.ReplayHookTask(ctx, w, hookTaskUUID); err != nil { if webhook.IsErrHookTaskNotExist(err) { ctx.NotFound("ReplayHookTask", nil) } else { diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 7db6769646ca..74a69c297ca3 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -251,11 +251,23 @@ func Init() error { }, } - hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "") + hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, int64(0)) if hookQueue == nil { return fmt.Errorf("Unable to create webhook_sender Queue") } go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) - return triggerTaskProcessing() + tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) + if err != nil { + log.Error("FindUndeliveredHookTasks failed: %v", err) + return err + } + + for _, task := range tasks { + if err := enqueueHookTask(task); err != nil { + log.Error("enqueueHookTask failed: %v", err) + } + } + + return nil } diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index 50b910f3c557..e877e16edaa3 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -9,7 +9,6 @@ import ( "fmt" "strings" - "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" user_model "code.gitea.io/gitea/models/user" webhook_model "code.gitea.io/gitea/models/webhook" @@ -110,38 +109,32 @@ type EventSource struct { Owner *user_model.User } -// handle delivers all undelivered tasks +// handle delivers hook tasks func handle(data ...queue.Data) []queue.Data { - tasks, err := webhook_model.FindUndeliveredHookTasks() - if err != nil { - log.Error("Get undelivered hook tasks: %v", err) - return nil - } - for _, t := range tasks { - if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil { - log.Error("deliver: %v", err) + ctx := graceful.GetManager().HammerContext() + + for _, taskID := range data { + task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64)) + if err != nil { + log.Error("GetHookTaskByID failed: %v", err) + } else { + if err := Deliver(ctx, task); err != nil { + log.Error("webhook.Deliver failed: %v", err) + } } } + return nil } -func triggerTaskProcessing() error { - err := hookQueue.PushFunc("dummy", nil) +func enqueueHookTask(task *webhook_model.HookTask) error { + err := hookQueue.PushFunc(task.ID, nil) if err != nil && err != queue.ErrAlreadyInQueue { return err } return nil } -// PrepareWebhook adds special webhook to task queue for given payload. -func PrepareWebhook(source EventSource, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhook(source, w, event, p); err != nil { - return err - } - - return triggerTaskProcessing() -} - func checkBranch(w *webhook_model.Webhook, branch string) bool { if w.BranchFilter == "" || w.BranchFilter == "*" { return true @@ -157,7 +150,8 @@ func checkBranch(w *webhook_model.Webhook, branch string) bool { return g.Match(branch) } -func prepareWebhook(source EventSource, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { +// PrepareWebhook creates a hook task and enqueues it for processing +func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook_model.HookEventType, p api.Payloader) error { // Skip sending if webhooks are disabled. if setting.DisableWebhooks { return nil @@ -202,26 +196,20 @@ func prepareWebhook(source EventSource, w *webhook_model.Webhook, event webhook_ payloader = p } - if err = webhook_model.CreateHookTask(&webhook_model.HookTask{ + task, err := webhook_model.CreateHookTask(ctx, &webhook_model.HookTask{ HookID: w.ID, Payloader: payloader, EventType: event, - }); err != nil { + }) + if err != nil { return fmt.Errorf("CreateHookTask: %v", err) } - return nil -} - -// PrepareWebhooks adds new webhooks to task queue for given payload. -func PrepareWebhooks(source EventSource, event webhook_model.HookEventType, p api.Payloader) error { - if err := prepareWebhooks(db.DefaultContext, source, event, p); err != nil { - return err - } - return triggerTaskProcessing() + return enqueueHookTask(task) } -func prepareWebhooks(ctx context.Context, source EventSource, event webhook_model.HookEventType, p api.Payloader) error { +// PrepareWebhooks adds new webhooks to task queue for given payload. +func PrepareWebhooks(ctx context.Context, source EventSource, event webhook_model.HookEventType, p api.Payloader) error { owner := source.Owner var ws []*webhook_model.Webhook @@ -240,7 +228,7 @@ func prepareWebhooks(ctx context.Context, source EventSource, event webhook_mode } // check if owner is an org and append additional webhooks - if owner.IsOrganization() { + if owner != nil && owner.IsOrganization() { orgHooks, err := webhook_model.ListWebhooksByOpts(ctx, &webhook_model.ListWebhookOptions{ OrgID: owner.ID, IsActive: util.OptionalBoolTrue, @@ -263,7 +251,7 @@ func prepareWebhooks(ctx context.Context, source EventSource, event webhook_mode } for _, w := range ws { - if err = prepareWebhook(source, w, event, p); err != nil { + if err := PrepareWebhook(ctx, w, event, p); err != nil { return err } } @@ -271,10 +259,11 @@ func prepareWebhooks(ctx context.Context, source EventSource, event webhook_mode } // ReplayHookTask replays a webhook task -func ReplayHookTask(w *webhook_model.Webhook, uuid string) error { - if err := webhook_model.ReplayHookTask(w.ID, uuid); err != nil { +func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string) error { + task, err := webhook_model.ReplayHookTask(ctx, w.ID, uuid) + if err != nil { return err } - return triggerTaskProcessing() + return enqueueHookTask(task) } diff --git a/services/webhook/webhook_test.go b/services/webhook/webhook_test.go index d64654a0697c..8d44aa504ae2 100644 --- a/services/webhook/webhook_test.go +++ b/services/webhook/webhook_test.go @@ -7,6 +7,7 @@ package webhook import ( "testing" + "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/models/unittest" webhook_model "code.gitea.io/gitea/models/webhook" @@ -37,7 +38,7 @@ func TestPrepareWebhooks(t *testing.T) { for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } - assert.NoError(t, PrepareWebhooks(EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Commits: []*api.PayloadCommit{{}}})) + assert.NoError(t, PrepareWebhooks(db.DefaultContext, EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Commits: []*api.PayloadCommit{{}}})) for _, hookTask := range hookTasks { unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -54,7 +55,7 @@ func TestPrepareWebhooksBranchFilterMatch(t *testing.T) { unittest.AssertNotExistsBean(t, hookTask) } // this test also ensures that * doesn't handle / in any special way (like shell would) - assert.NoError(t, PrepareWebhooks(EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/feature/7791", Commits: []*api.PayloadCommit{{}}})) + assert.NoError(t, PrepareWebhooks(db.DefaultContext, EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/feature/7791", Commits: []*api.PayloadCommit{{}}})) for _, hookTask := range hookTasks { unittest.AssertExistsAndLoadBean(t, hookTask) } @@ -70,7 +71,7 @@ func TestPrepareWebhooksBranchFilterNoMatch(t *testing.T) { for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask) } - assert.NoError(t, PrepareWebhooks(EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/fix_weird_bug"})) + assert.NoError(t, PrepareWebhooks(db.DefaultContext, EventSource{Repository: repo}, webhook_model.HookEventPush, &api.PushPayload{Ref: "refs/heads/fix_weird_bug"})) for _, hookTask := range hookTasks { unittest.AssertNotExistsBean(t, hookTask)