Skip to content

Commit

Permalink
Use transactions in jobs with multiple queries
Browse files Browse the repository at this point in the history
  • Loading branch information
NickyMateev committed Apr 3, 2019
1 parent 9641736 commit 7ce5537
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 91 deletions.
93 changes: 48 additions & 45 deletions job/pull_request_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package job

import (
"context"
"database/sql"
"fmt"
"github.com/NickyMateev/Reviewer/models"
"github.com/NickyMateev/Reviewer/storage"
Expand Down Expand Up @@ -63,65 +64,67 @@ func (prf *PullRequestFetcher) Run() {
log.Printf("(%d) pull request(s) fetched for project %v\n", len(pullRequests), projectName)

wg.Add(1)
go prf.fetchPullRequests(pullRequests, project.ID, projectName, &wg)
go prf.persistPullRequests(pullRequests, project.ID, projectName, &wg)
}
wg.Wait()
}

func (prf *PullRequestFetcher) fetchPullRequests(pullRequests []*github.PullRequest, projectID int64, projectName string, wg *sync.WaitGroup) {
func (prf *PullRequestFetcher) persistPullRequests(pullRequests []*github.PullRequest, projectID int64, projectName string, wg *sync.WaitGroup) {
defer wg.Done()
db := prf.storage.Get()
for _, pullRequest := range pullRequests {
exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context.Background(), db)
if err != nil {
log.Printf("Error retrieving pull requests for %v: %v\n", projectName, err.Error())
continue
}
txErr := prf.storage.Transaction(context.Background(), func(context context.Context, tx *sql.Tx) error {
exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context, tx)
if err != nil {
return fmt.Errorf("error retrieving pull requests for %v: %s", projectName, err)
}

if exists {
continue
}
if exists {
return nil
}

user, err := transformUser(pullRequest.GetUser(), db)
if err != nil {
log.Println("Unable to transform user:", err)
continue
}
user, err := transformUser(context, tx, pullRequest.GetUser())
if err != nil {
return fmt.Errorf("unable to transform user: %s", err)
}

log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName)

pr := models.PullRequest{
Title: pullRequest.GetTitle(),
URL: pullRequest.GetHTMLURL(),
Number: int64(pullRequest.GetNumber()),
GithubID: pullRequest.GetID(),
UserID: user.ID,
ProjectID: projectID,
CreatedAt: time.Now(),
UpdatedAt: time.Now()}

err = pr.Insert(context.Background(), db, boil.Infer())
if err != nil {
log.Printf("Error persisting pull request %q (%v): %v\n", pr.Title, projectName, err.Error())
continue
}
log.Printf("Pull request %q successfully persisted (%v)\n", pr.Title, projectName)
log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName)

reviewers := make([]*models.User, 0)
for _, reviewer := range pullRequest.RequestedReviewers {
rev, err := transformUser(reviewer, db)
pr := models.PullRequest{
Title: pullRequest.GetTitle(),
URL: pullRequest.GetHTMLURL(),
Number: int64(pullRequest.GetNumber()),
GithubID: pullRequest.GetID(),
UserID: user.ID,
ProjectID: projectID,
CreatedAt: time.Now(),
UpdatedAt: time.Now()}

err = pr.Insert(context, tx, boil.Infer())
if err != nil {
log.Println("Unable to transform reviewer user:", err)
continue
return err
}
log.Printf("Pull request %q successfully persisted (%v)\n", pr.Title, projectName)

reviewers = append(reviewers, rev)
}
if len(reviewers) > 0 {
err := pr.AddReviewers(context.Background(), db, false, reviewers...)
if err != nil {
log.Printf("Error persisting pull request reviewers %q (%v): %v\n", pr.Title, projectName, err.Error())
reviewers := make([]*models.User, 0)
for _, reviewer := range pullRequest.RequestedReviewers {
rev, err := transformUser(context, tx, reviewer)
if err != nil {
return fmt.Errorf("unable to transform reviewer user: %s", err)
}

reviewers = append(reviewers, rev)
}
if len(reviewers) > 0 {
err := pr.AddReviewers(context, tx, false, reviewers...)
if err != nil {
return err
}
}
return nil
})

if txErr != nil {
log.Printf("Unable to persist pull request %q: %s\n", pullRequest.Title, txErr)
}
}
}
82 changes: 42 additions & 40 deletions job/review_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package job

import (
"context"
"database/sql"
"fmt"
"github.com/NickyMateev/Reviewer/models"
"github.com/NickyMateev/Reviewer/storage"
"github.com/google/go-github/github"
Expand Down Expand Up @@ -42,68 +44,68 @@ func (rf *ReviewFetcher) Run() {
log.Printf("STARTING %v job", rf.Name())
defer log.Printf("FINISHED %v job", rf.Name())

db := rf.storage.Get()

pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), db)
pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), rf.storage.Get())
if err != nil {
log.Println("Unable to fetch pull requests:", err)
return
}

for _, pr := range pullRequests {
reviews, resp, err := rf.client.PullRequests.ListReviews(context.Background(), pr.R.Project.RepoOwner, pr.R.Project.RepoName, int(pr.Number), nil)
for _, pullRequest := range pullRequests {
reviews, resp, err := rf.client.PullRequests.ListReviews(context.Background(), pullRequest.R.Project.RepoOwner, pullRequest.R.Project.RepoName, int(pullRequest.Number), nil)
if err != nil || resp.StatusCode != http.StatusOK {
log.Println("Unable to fetch pull request reviews:", err)
continue
}
log.Printf("(%d) review(s) fetched for pull request %q\n", len(reviews), pr.Title)

reviewers := make([]*models.User, 0)
for _, review := range reviews {
user, err := transformUser(review.GetUser(), db)
if err != nil {
log.Println("Unable to transform reviewer user:", err)
continue
}
log.Printf("(%d) review(s) fetched for pull request %q\n", len(reviews), pullRequest.Title)

reviewers = append(reviewers, user)

if review.GetState() == approvedState {
exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), db)
txErr := rf.storage.Transaction(context.Background(), func(context context.Context, tx *sql.Tx) error {
reviewers := make([]*models.User, 0)
for _, review := range reviews {
user, err := transformUser(context, tx, review.GetUser())
if err != nil {
log.Println("Unable to check pull request activity record:", err)
continue
return fmt.Errorf("unable to transform reviewer user: %s", err)
}
if !exists {
err = user.AddApprovedPullRequests(context.Background(), db, false, pr)

reviewers = append(reviewers, user)

if review.GetState() == approvedState {
exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pullRequest.ID)).Exists(context, tx)
if err != nil {
log.Println("Unable to persist user approved pull request")
continue
return err
}
}
} else {
exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), db)
if err != nil {
log.Println("Unable to check pull request activity record:", err)
continue
}
if !exists {
err = user.AddCommentedPullRequests(context.Background(), db, false, pr)
if !exists {
err := user.AddApprovedPullRequests(context, tx, false, pullRequest)
if err != nil {
return err
}
}
} else {
exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pullRequest.ID)).Exists(context, tx)
if err != nil {
log.Println("Unable to persist user commented pull request")
continue
return err
}
if !exists {
err := user.AddCommentedPullRequests(context, tx, false, pullRequest)
if err != nil {
return err
}
}
}
}

idlers := rf.findIdlers(pullRequest.R.Reviewers, reviewers)
err = pullRequest.AddIdlers(context, tx, false, idlers...)
if err != nil {
log.Println("Unable to persist pull request activity record:", err)
return err
}
}

idlers := rf.findIdlers(pr.R.Reviewers, reviewers)
pr.AddIdlers(context.Background(), db, false, idlers...)
_, err = pullRequest.Update(context, tx, boil.Infer()) // updates the 'updated_at' column
return err
})

pr.Update(context.Background(), db, boil.Infer()) // updates the 'updated_at' column
if txErr != nil {
log.Printf("Unable to persist pull request %q activity records: %v\n", pullRequest.Title, txErr)
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions job/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ type SlackConfig struct {
BotToken string
}

func transformUser(githubUser *github.User, db *sql.DB) (*models.User, error) {
exists, err := models.Users(qm.Where("github_id = ?", githubUser.GetID())).Exists(context.Background(), db)
func transformUser(context context.Context, tx *sql.Tx, githubUser *github.User) (*models.User, error) {
exists, err := models.Users(qm.Where("github_id = ?", githubUser.GetID())).Exists(context, tx)
if err != nil {
return nil, fmt.Errorf("Error searching for user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error())
}

var user *models.User
if !exists {
user = &models.User{Username: githubUser.GetLogin(), GithubID: githubUser.GetID()}
err := user.Insert(context.Background(), db, boil.Infer())
err := user.Insert(context, tx, boil.Infer())
if err != nil {
return nil, fmt.Errorf("Error persisting user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error())
}
} else {
user, err = models.Users(qm.Where("github_id = ?", githubUser.GetID())).One(context.Background(), db)
user, err = models.Users(qm.Where("github_id = ?", githubUser.GetID())).One(context, tx)
if err != nil {
return nil, fmt.Errorf("Error retrieving user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ func (ps *postgresStorage) Close() error {
func New(cfg Config) (Storage, error) {
db, err := sql.Open(cfg.Type, cfg.URI)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to open db connection: %s", err)
}

err = updateSchema(db, cfg.Type)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to update db schema: %s", err)
}

log.Println("Database is up-to-date")

return &postgresStorage{
Expand Down

0 comments on commit 7ce5537

Please sign in to comment.