From 62147e980f68b19984f9e3ca367e68bba41f74a2 Mon Sep 17 00:00:00 2001 From: NickyMateev Date: Tue, 2 Apr 2019 11:45:36 +0300 Subject: [PATCH 1/6] Refactor server bootstrap logic --- main.go | 7 +++---- server/server.go | 29 ++++++++++++++--------------- web/{utils.go => util.go} | 0 3 files changed, 17 insertions(+), 19 deletions(-) rename web/{utils.go => util.go} (100%) diff --git a/main.go b/main.go index b286e3b..188a43c 100644 --- a/main.go +++ b/main.go @@ -17,11 +17,10 @@ func main() { if err != nil { panic(err) } + defer db.Close() - srv, err := server.New(config.Server, db, github.NewClient(nil), config.Slack) - if err != nil { + srv := server.New(config.Server, db, github.NewClient(nil), config.Slack) + if err := srv.Run(); err != nil { panic(err) } - - srv.Run() } diff --git a/server/server.go b/server/server.go index b3c372f..9311c63 100644 --- a/server/server.go +++ b/server/server.go @@ -18,7 +18,7 @@ import ( type Server struct { Config Config DB *sql.DB - API web.API + Router *mux.Router JobContainer job.Container } @@ -29,41 +29,40 @@ type Config struct { } // New creates a new Server instance -func New(cfg Config, db *sql.DB, client *github.Client, slackConfig job.SlackConfig) (*Server, error) { +func New(cfg Config, db *sql.DB, client *github.Client, slackConfig job.SlackConfig) *Server { + defaultAPI := api.Default(db) + router := buildRouter(defaultAPI) + return &Server{ Config: cfg, DB: db, - API: api.Default(db), + Router: router, JobContainer: job.DefaultContainer(db, client, slackConfig), - }, nil + } } // Run runs the application server -func (s Server) Run() { - defer s.DB.Close() - - r := s.buildRouter() - +func (s *Server) Run() error { err := s.startJobs() if err != nil { - panic(err) + return err } server := http.Server{ - Handler: r, + Handler: s.Router, Addr: ":" + strconv.Itoa(s.Config.Port), ReadTimeout: s.Config.RequestTimeout, WriteTimeout: s.Config.RequestTimeout, } log.Println("Server listening on port:", s.Config.Port) - log.Fatal(server.ListenAndServe()) + return server.ListenAndServe() } -func (s Server) buildRouter() *mux.Router { +func buildRouter(api web.API) *mux.Router { router := mux.NewRouter().StrictSlash(true) - controllers := s.API.Controllers() + controllers := api.Controllers() for _, controller := range controllers { routes := controller.Routes() for _, route := range routes { @@ -74,7 +73,7 @@ func (s Server) buildRouter() *mux.Router { return router } -func (s Server) startJobs() error { +func (s *Server) startJobs() error { jobs := s.JobContainer.Jobs() scheduler := cron.New() diff --git a/web/utils.go b/web/util.go similarity index 100% rename from web/utils.go rename to web/util.go From d642b92bca527b3d95364676c689a2e3b29d7d06 Mon Sep 17 00:00:00 2001 From: NickyMateev Date: Tue, 2 Apr 2019 16:52:50 +0300 Subject: [PATCH 2/6] Introduce Storage interface --- api/api.go | 14 ++++++------ api/project/project.go | 6 ++--- api/project/project_controller.go | 11 +++++---- api/pullrequest/pullrequest.go | 6 ++--- api/pullrequest/pullrequest_controller.go | 7 +++--- api/user/user.go | 6 ++--- api/user/user_controllers.go | 13 ++++++----- job/idlers_reminder.go | 18 +++++++-------- job/job.go | 14 ++++++------ job/pull_request_fetcher.go | 24 +++++++++---------- job/review_fetcher.go | 28 +++++++++++------------ main.go | 6 ++--- server/server.go | 12 +++++----- storage/storage.go | 23 +++++++++++++++++-- 14 files changed, 105 insertions(+), 83 deletions(-) diff --git a/api/api.go b/api/api.go index 0b400ee..e075204 100644 --- a/api/api.go +++ b/api/api.go @@ -1,29 +1,29 @@ package api import ( - "database/sql" "github.com/NickyMateev/Reviewer/api/project" "github.com/NickyMateev/Reviewer/api/pullrequest" "github.com/NickyMateev/Reviewer/api/user" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" ) type defaultAPI struct { - db *sql.DB + storage storage.Storage } // Default returns an instance of the default API -func Default(db *sql.DB) web.API { +func Default(storage storage.Storage) web.API { return defaultAPI{ - db: db, + storage: storage, } } // Controllers returns the default API's controllers func (api defaultAPI) Controllers() []web.Controller { return []web.Controller{ - user.Controller(api.db), - project.Controller(api.db), - pullrequest.Controller(api.db), + user.Controller(api.storage), + project.Controller(api.storage), + pullrequest.Controller(api.storage), } } diff --git a/api/project/project.go b/api/project/project.go index 4e499ce..24d78e1 100644 --- a/api/project/project.go +++ b/api/project/project.go @@ -1,15 +1,15 @@ package project import ( - "database/sql" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "net/http" ) // Controller returns an instance of the Project controller -func Controller(db *sql.DB) *controller { +func Controller(storage storage.Storage) *controller { return &controller{ - db: db, + storage: storage, } } diff --git a/api/project/project_controller.go b/api/project/project_controller.go index b2651a2..2322fa7 100644 --- a/api/project/project_controller.go +++ b/api/project/project_controller.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "github.com/NickyMateev/Reviewer/models" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -14,7 +15,7 @@ import ( ) type controller struct { - db *sql.DB + storage storage.Storage } func (c *controller) createProject(w http.ResponseWriter, r *http.Request) { @@ -36,7 +37,7 @@ func (c *controller) createProject(w http.ResponseWriter, r *http.Request) { return } - err = project.Insert(r.Context(), c.db, boil.Infer()) + err = project.Insert(r.Context(), c.storage.Get(), boil.Infer()) if err != nil { log.Println("Error creating new project:", err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) @@ -51,7 +52,7 @@ func (c *controller) getProject(w http.ResponseWriter, r *http.Request) { projectID := vars["id"] log.Println("Getting project with id", projectID) - project, err := models.Projects(qm.Where("id = ?", projectID)).One(r.Context(), c.db) + project, err := models.Projects(qm.Where("id = ?", projectID)).One(r.Context(), c.storage.Get()) if err != nil { if err == sql.ErrNoRows { log.Println("Missing project:", err) @@ -67,7 +68,7 @@ func (c *controller) getProject(w http.ResponseWriter, r *http.Request) { func (c *controller) listProject(w http.ResponseWriter, r *http.Request) { log.Println("Getting all projects") - projects, err := models.Projects().All(r.Context(), c.db) + projects, err := models.Projects().All(r.Context(), c.storage.Get()) if err != nil { log.Println("Error getting projects:", err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) @@ -86,7 +87,7 @@ func (c *controller) deleteProject(w http.ResponseWriter, r *http.Request) { projectID := vars["id"] log.Println("Deleting project with id", projectID) - rows, err := models.Projects(qm.Where("id = ?", projectID)).DeleteAll(r.Context(), c.db) + rows, err := models.Projects(qm.Where("id = ?", projectID)).DeleteAll(r.Context(), c.storage.Get()) if err != nil { log.Printf("Error deleting project with id %v: %v", projectID, err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) diff --git a/api/pullrequest/pullrequest.go b/api/pullrequest/pullrequest.go index aa5b5c7..fd3f263 100644 --- a/api/pullrequest/pullrequest.go +++ b/api/pullrequest/pullrequest.go @@ -1,15 +1,15 @@ package pullrequest import ( - "database/sql" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "net/http" ) // Controller returns an instance of the Pull Request controller -func Controller(db *sql.DB) *controller { +func Controller(storage storage.Storage) *controller { return &controller{ - db: db, + storage: storage, } } diff --git a/api/pullrequest/pullrequest_controller.go b/api/pullrequest/pullrequest_controller.go index e22ecbb..0196625 100644 --- a/api/pullrequest/pullrequest_controller.go +++ b/api/pullrequest/pullrequest_controller.go @@ -3,6 +3,7 @@ package pullrequest import ( "database/sql" "github.com/NickyMateev/Reviewer/models" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "github.com/gorilla/mux" "github.com/volatiletech/sqlboiler/queries/qm" @@ -11,7 +12,7 @@ import ( ) type controller struct { - db *sql.DB + storage storage.Storage } func (c *controller) getPullRequest(w http.ResponseWriter, r *http.Request) { @@ -24,7 +25,7 @@ func (c *controller) getPullRequest(w http.ResponseWriter, r *http.Request) { qm.Load("Author"), qm.Load("Approvers"), qm.Load("Commenters"), - qm.Load("Idlers")).One(r.Context(), c.db) + qm.Load("Idlers")).One(r.Context(), c.storage.Get()) if err != nil { if err == sql.ErrNoRows { @@ -62,7 +63,7 @@ func (c *controller) getPullRequest(w http.ResponseWriter, r *http.Request) { func (c *controller) listPullRequest(w http.ResponseWriter, r *http.Request) { log.Println("Getting all pull requests") - pullRequests, err := models.PullRequests().All(r.Context(), c.db) + pullRequests, err := models.PullRequests().All(r.Context(), c.storage.Get()) if err != nil { log.Println("Error getting pull requests:", err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) diff --git a/api/user/user.go b/api/user/user.go index 7770125..62133b9 100644 --- a/api/user/user.go +++ b/api/user/user.go @@ -1,15 +1,15 @@ package user import ( - "database/sql" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "net/http" ) // Controller returns an instance of the User controller -func Controller(db *sql.DB) *controller { +func Controller(storage storage.Storage) *controller { return &controller{ - db: db, + storage: storage, } } diff --git a/api/user/user_controllers.go b/api/user/user_controllers.go index aa5e5f9..73dfed5 100644 --- a/api/user/user_controllers.go +++ b/api/user/user_controllers.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "github.com/NickyMateev/Reviewer/models" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "github.com/gorilla/mux" "github.com/volatiletech/sqlboiler/boil" @@ -15,7 +16,7 @@ import ( ) type controller struct { - db *sql.DB + storage storage.Storage } func (c *controller) getUser(w http.ResponseWriter, r *http.Request) { @@ -27,7 +28,7 @@ func (c *controller) getUser(w http.ResponseWriter, r *http.Request) { qm.Where("id = ?", userID), qm.Load("ApprovedPullRequests"), qm.Load("CommentedPullRequests"), - qm.Load("IdledPullRequests")).One(r.Context(), c.db) + qm.Load("IdledPullRequests")).One(r.Context(), c.storage.Get()) if err != nil { if err == sql.ErrNoRows { log.Println("Missing user:", err) @@ -62,7 +63,7 @@ func (c *controller) getUser(w http.ResponseWriter, r *http.Request) { func (c *controller) listUsers(w http.ResponseWriter, r *http.Request) { log.Println("Getting all users") - users, err := models.Users().All(r.Context(), c.db) + users, err := models.Users().All(r.Context(), c.storage.Get()) if err != nil { log.Println("Error getting users:", err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) @@ -97,7 +98,7 @@ func (c *controller) patchUser(w http.ResponseWriter, r *http.Request) { return } - usr, err := models.Users(qm.Where("id = ?", userID)).One(r.Context(), c.db) + usr, err := models.Users(qm.Where("id = ?", userID)).One(r.Context(), c.storage.Get()) if err != nil { if err == sql.ErrNoRows { log.Println("Missing user:", err) @@ -110,7 +111,7 @@ func (c *controller) patchUser(w http.ResponseWriter, r *http.Request) { } usr.Metadata = user.Metadata - _, err = usr.Update(context.Background(), c.db, boil.Infer()) + _, err = usr.Update(context.Background(), c.storage.Get(), boil.Infer()) if err != nil { log.Println("Error updating user with id", usr.ID) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) @@ -125,7 +126,7 @@ func (c *controller) deleteUser(w http.ResponseWriter, r *http.Request) { userID := vars["id"] log.Println("Deleting user with id", userID) - rows, err := models.Users(qm.Where("id = ?", userID)).DeleteAll(r.Context(), c.db) + rows, err := models.Users(qm.Where("id = ?", userID)).DeleteAll(r.Context(), c.storage.Get()) if err != nil { log.Printf("Error deleting user with id %v: %v", userID, err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) diff --git a/job/idlers_reminder.go b/job/idlers_reminder.go index 63b768b..d16b6bb 100644 --- a/job/idlers_reminder.go +++ b/job/idlers_reminder.go @@ -2,9 +2,9 @@ package job import ( "context" - "database/sql" "fmt" "github.com/NickyMateev/Reviewer/models" + "github.com/NickyMateev/Reviewer/storage" "github.com/nlopes/slack" "github.com/volatiletech/sqlboiler/queries/qm" "log" @@ -14,17 +14,17 @@ const githubURL = "https://github.com/%v/%v" // IdlersReminder is a regular job which sends a notification to all users who have not reviewed assigned pull requests type IdlersReminder struct { - db *sql.DB - client *slack.Client - config SlackConfig + storage storage.Storage + client *slack.Client + config SlackConfig } // NewIdlersReminder creates an instance of IdlersReminder -func NewIdlersReminder(db *sql.DB, config SlackConfig) *IdlersReminder { +func NewIdlersReminder(storage storage.Storage, config SlackConfig) *IdlersReminder { return &IdlersReminder{ - db: db, - client: slack.New(config.BotToken), - config: config, + storage: storage, + client: slack.New(config.BotToken), + config: config, } } @@ -43,7 +43,7 @@ func (ir *IdlersReminder) Run() { log.Printf("STARTING %v job", ir.Name()) defer log.Printf("FINISHED %v job", ir.Name()) - pullRequests, err := models.PullRequests(qm.Load("Idlers"), qm.Load("Project")).All(context.Background(), ir.db) + pullRequests, err := models.PullRequests(qm.Load("Idlers"), qm.Load("Project")).All(context.Background(), ir.storage.Get()) if err != nil { log.Panic("Error retrieving pull requests:", err) } diff --git a/job/job.go b/job/job.go index 38d4401..e76993a 100644 --- a/job/job.go +++ b/job/job.go @@ -1,7 +1,7 @@ package job import ( - "database/sql" + "github.com/NickyMateev/Reviewer/storage" "github.com/google/go-github/github" "github.com/robfig/cron" ) @@ -25,15 +25,15 @@ type Container interface { } type defaultContainer struct { - db *sql.DB + storage storage.Storage client *github.Client slackConfig SlackConfig } // DefaultContainer creates an instance of the default job container -func DefaultContainer(db *sql.DB, client *github.Client, config SlackConfig) Container { +func DefaultContainer(storage storage.Storage, client *github.Client, config SlackConfig) Container { return defaultContainer{ - db: db, + storage: storage, client: client, slackConfig: config, } @@ -42,8 +42,8 @@ func DefaultContainer(db *sql.DB, client *github.Client, config SlackConfig) Con // Jobs returns the defined jobs for the default job container func (jc defaultContainer) Jobs() []Job { return []Job{ - NewPullRequestFetcher(jc.db, jc.client), - NewReviewFetcher(jc.db, jc.client), - NewIdlersReminder(jc.db, jc.slackConfig), + NewPullRequestFetcher(jc.storage, jc.client), + NewReviewFetcher(jc.storage, jc.client), + NewIdlersReminder(jc.storage, jc.slackConfig), } } diff --git a/job/pull_request_fetcher.go b/job/pull_request_fetcher.go index 9e90ca4..59dc693 100644 --- a/job/pull_request_fetcher.go +++ b/job/pull_request_fetcher.go @@ -2,9 +2,9 @@ package job import ( "context" - "database/sql" "fmt" "github.com/NickyMateev/Reviewer/models" + "github.com/NickyMateev/Reviewer/storage" "github.com/google/go-github/github" "github.com/volatiletech/sqlboiler/boil" "github.com/volatiletech/sqlboiler/queries/qm" @@ -16,15 +16,15 @@ import ( // PullRequestFetcher is a regular job which fetches the pull requests for all registered projects type PullRequestFetcher struct { - db *sql.DB - client *github.Client + storage storage.Storage + client *github.Client } // NewPullRequestFetcher creates an instance of PullRequestFetcher -func NewPullRequestFetcher(db *sql.DB, client *github.Client) *PullRequestFetcher { +func NewPullRequestFetcher(storage storage.Storage, client *github.Client) *PullRequestFetcher { return &PullRequestFetcher{ - db: db, - client: client, + storage: storage, + client: client, } } @@ -43,7 +43,7 @@ func (prf *PullRequestFetcher) Run() { log.Printf("STARTING %v job", prf.Name()) defer log.Printf("FINISHED %v job", prf.Name()) - projects, err := models.Projects().All(context.Background(), prf.db) + projects, err := models.Projects().All(context.Background(), prf.storage.Get()) if err != nil { log.Panic("Unable to fetch projects:", err) } @@ -69,13 +69,13 @@ func (prf *PullRequestFetcher) Run() { func (prf *PullRequestFetcher) fetchPullRequests(pullRequests []*github.PullRequest, projectID int64, projectName string, wg *sync.WaitGroup) { defer wg.Done() for _, pullRequest := range pullRequests { - exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context.Background(), prf.db) + exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context.Background(), prf.storage.Get()) if err != nil { log.Panicf("Error retrieving pull requests for %v: %v\n", projectName, err.Error()) } if !exists { - user := transformUser(pullRequest.GetUser(), prf.db) + user := transformUser(pullRequest.GetUser(), prf.storage.Get()) log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName) @@ -89,7 +89,7 @@ func (prf *PullRequestFetcher) fetchPullRequests(pullRequests []*github.PullRequ CreatedAt: time.Now(), UpdatedAt: time.Now()} - err = pr.Insert(context.Background(), prf.db, boil.Infer()) + err = pr.Insert(context.Background(), prf.storage.Get(), boil.Infer()) if err != nil { log.Panicf("Error persisting pull request %q (%v): %v\n", pr.Title, projectName, err.Error()) } @@ -97,10 +97,10 @@ func (prf *PullRequestFetcher) fetchPullRequests(pullRequests []*github.PullRequ reviewers := make([]*models.User, 0) for _, reviewer := range pullRequest.RequestedReviewers { - reviewers = append(reviewers, transformUser(reviewer, prf.db)) + reviewers = append(reviewers, transformUser(reviewer, prf.storage.Get())) } if len(reviewers) > 0 { - err := pr.AddReviewers(context.Background(), prf.db, false, reviewers...) + err := pr.AddReviewers(context.Background(), prf.storage.Get(), false, reviewers...) if err != nil { log.Panicf("Error persisting pull request reviewers %q (%v): %v\n", pr.Title, projectName, err.Error()) } diff --git a/job/review_fetcher.go b/job/review_fetcher.go index 9cda8e6..423eb66 100644 --- a/job/review_fetcher.go +++ b/job/review_fetcher.go @@ -2,8 +2,8 @@ package job import ( "context" - "database/sql" "github.com/NickyMateev/Reviewer/models" + "github.com/NickyMateev/Reviewer/storage" "github.com/google/go-github/github" "github.com/volatiletech/sqlboiler/boil" "github.com/volatiletech/sqlboiler/queries/qm" @@ -15,15 +15,15 @@ const approvedState = "APPROVED" // ReviewFetcher is a regular job which fetches reviews for tracked pull requests type ReviewFetcher struct { - db *sql.DB - client *github.Client + storage storage.Storage + client *github.Client } // NewReviewFetcher creates an instance of ReviewFetcheer -func NewReviewFetcher(db *sql.DB, client *github.Client) *ReviewFetcher { +func NewReviewFetcher(storage storage.Storage, client *github.Client) *ReviewFetcher { return &ReviewFetcher{ - db: db, - client: client, + storage: storage, + client: client, } } @@ -42,7 +42,7 @@ func (rf *ReviewFetcher) Run() { log.Printf("STARTING %v job", rf.Name()) defer log.Printf("FINISHED %v job", rf.Name()) - pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), rf.db) + pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), rf.storage.Get()) if err != nil { log.Panic("Unable to fetch pull requests:", err) } @@ -56,27 +56,27 @@ func (rf *ReviewFetcher) Run() { reviewers := make([]*models.User, 0) for _, review := range reviews { - user := transformUser(review.GetUser(), rf.db) + user := transformUser(review.GetUser(), rf.storage.Get()) reviewers = append(reviewers, user) if review.GetState() == approvedState { - exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), rf.db) + exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), rf.storage.Get()) if err != nil { log.Panic("Unable to check pull request activity record:", err) } if !exists { - err = user.AddApprovedPullRequests(context.Background(), rf.db, false, pr) + err = user.AddApprovedPullRequests(context.Background(), rf.storage.Get(), false, pr) if err != nil { log.Panic("Unable to persist user approved pull request") } } } else { - exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), rf.db) + exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), rf.storage.Get()) if err != nil { log.Panic("Unable to check pull request activity record:", err) } if !exists { - err = user.AddCommentedPullRequests(context.Background(), rf.db, false, pr) + err = user.AddCommentedPullRequests(context.Background(), rf.storage.Get(), false, pr) if err != nil { log.Panic("Unable to persist user commented pull request") } @@ -88,9 +88,9 @@ func (rf *ReviewFetcher) Run() { } idlers := rf.findIdlers(pr.R.Reviewers, reviewers) - pr.AddIdlers(context.Background(), rf.db, false, idlers...) + pr.AddIdlers(context.Background(), rf.storage.Get(), false, idlers...) - pr.Update(context.Background(), rf.db, boil.Infer()) // updates the 'updated_at' column + pr.Update(context.Background(), rf.storage.Get(), boil.Infer()) // updates the 'updated_at' column } } diff --git a/main.go b/main.go index 188a43c..5eb0b59 100644 --- a/main.go +++ b/main.go @@ -13,13 +13,13 @@ func main() { panic(err) } - db, err := storage.New(config.Storage) + storage, err := storage.New(config.Storage) if err != nil { panic(err) } - defer db.Close() + defer storage.Close() - srv := server.New(config.Server, db, github.NewClient(nil), config.Slack) + srv := server.New(config.Server, storage, github.NewClient(nil), config.Slack) if err := srv.Run(); err != nil { panic(err) } diff --git a/server/server.go b/server/server.go index 9311c63..5216492 100644 --- a/server/server.go +++ b/server/server.go @@ -1,9 +1,9 @@ package server import ( - "database/sql" "github.com/NickyMateev/Reviewer/api" "github.com/NickyMateev/Reviewer/job" + "github.com/NickyMateev/Reviewer/storage" "github.com/NickyMateev/Reviewer/web" "github.com/google/go-github/github" "github.com/gorilla/mux" @@ -17,7 +17,7 @@ import ( // Server represents the application's server type Server struct { Config Config - DB *sql.DB + Storage storage.Storage Router *mux.Router JobContainer job.Container } @@ -29,15 +29,15 @@ type Config struct { } // New creates a new Server instance -func New(cfg Config, db *sql.DB, client *github.Client, slackConfig job.SlackConfig) *Server { - defaultAPI := api.Default(db) +func New(cfg Config, storage storage.Storage, client *github.Client, slackConfig job.SlackConfig) *Server { + defaultAPI := api.Default(storage) router := buildRouter(defaultAPI) return &Server{ Config: cfg, - DB: db, + Storage: storage, Router: router, - JobContainer: job.DefaultContainer(db, client, slackConfig), + JobContainer: job.DefaultContainer(storage, client, slackConfig), } } diff --git a/storage/storage.go b/storage/storage.go index 8807f33..5d1f506 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -23,8 +23,25 @@ type Config struct { URI string } +type Storage interface { + Get() *sql.DB + Close() error +} + +type postgresStorage struct { + db *sql.DB +} + +func (ps *postgresStorage) Get() *sql.DB { + return ps.db +} + +func (ps *postgresStorage) Close() error { + return ps.db.Close() +} + // New creates an *sql.DB object and updates the database with the latest migrations -func New(cfg Config) (*sql.DB, error) { +func New(cfg Config) (Storage, error) { db, err := sql.Open(cfg.Type, cfg.URI) if err != nil { return nil, err @@ -36,7 +53,9 @@ func New(cfg Config) (*sql.DB, error) { } log.Println("Database is up-to-date") - return db, nil + return &postgresStorage{ + db: db, + }, nil } func updateSchema(db *sql.DB, dbType string) error { From afd12a7eb5980796cd5cb31490478edf7cbfe4d1 Mon Sep 17 00:00:00 2001 From: NickyMateev Date: Tue, 2 Apr 2019 17:46:48 +0300 Subject: [PATCH 3/6] Extend Storage interface with Transaction --- api/project/project_controller.go | 8 ++-- api/pullrequest/pullrequest_controller.go | 8 ++-- api/user/user_controllers.go | 46 ++++++++++++----------- storage/storage.go | 17 +++++++++ 4 files changed, 51 insertions(+), 28 deletions(-) diff --git a/api/project/project_controller.go b/api/project/project_controller.go index 2322fa7..bb7fd82 100644 --- a/api/project/project_controller.go +++ b/api/project/project_controller.go @@ -68,6 +68,8 @@ func (c *controller) getProject(w http.ResponseWriter, r *http.Request) { func (c *controller) listProject(w http.ResponseWriter, r *http.Request) { log.Println("Getting all projects") + + result := make([]*models.Project, 0) projects, err := models.Projects().All(r.Context(), c.storage.Get()) if err != nil { log.Println("Error getting projects:", err) @@ -75,11 +77,11 @@ func (c *controller) listProject(w http.ResponseWriter, r *http.Request) { return } - if len(projects) == 0 { - projects = []*models.Project{} + if len(projects) > 0 { + result = projects } - web.WriteResponse(w, http.StatusOK, projects) + web.WriteResponse(w, http.StatusOK, result) } func (c *controller) deleteProject(w http.ResponseWriter, r *http.Request) { diff --git a/api/pullrequest/pullrequest_controller.go b/api/pullrequest/pullrequest_controller.go index 0196625..1023753 100644 --- a/api/pullrequest/pullrequest_controller.go +++ b/api/pullrequest/pullrequest_controller.go @@ -63,6 +63,8 @@ func (c *controller) getPullRequest(w http.ResponseWriter, r *http.Request) { func (c *controller) listPullRequest(w http.ResponseWriter, r *http.Request) { log.Println("Getting all pull requests") + + result := make([]*models.PullRequest, 0) pullRequests, err := models.PullRequests().All(r.Context(), c.storage.Get()) if err != nil { log.Println("Error getting pull requests:", err) @@ -70,11 +72,11 @@ func (c *controller) listPullRequest(w http.ResponseWriter, r *http.Request) { return } - if len(pullRequests) == 0 { - pullRequests = []*models.PullRequest{} + if len(pullRequests) > 0 { + result = pullRequests } - web.WriteResponse(w, http.StatusOK, pullRequests) + web.WriteResponse(w, http.StatusOK, result) } func getUserStrings(users models.UserSlice) []string { diff --git a/api/user/user_controllers.go b/api/user/user_controllers.go index 73dfed5..359bedd 100644 --- a/api/user/user_controllers.go +++ b/api/user/user_controllers.go @@ -63,6 +63,8 @@ func (c *controller) getUser(w http.ResponseWriter, r *http.Request) { func (c *controller) listUsers(w http.ResponseWriter, r *http.Request) { log.Println("Getting all users") + + result := make([]*models.User, 0) users, err := models.Users().All(r.Context(), c.storage.Get()) if err != nil { log.Println("Error getting users:", err) @@ -70,11 +72,11 @@ func (c *controller) listUsers(w http.ResponseWriter, r *http.Request) { return } - if len(users) == 0 { - users = []*models.User{} + if len(users) > 0 { + result = users } - web.WriteResponse(w, http.StatusOK, users) + web.WriteResponse(w, http.StatusOK, result) } func (c *controller) patchUser(w http.ResponseWriter, r *http.Request) { @@ -83,42 +85,42 @@ func (c *controller) patchUser(w http.ResponseWriter, r *http.Request) { log.Println("Updating user with id", userID) decoder := json.NewDecoder(r.Body) - user := models.User{} - err := decoder.Decode(&user) + reqUser := models.User{} + err := decoder.Decode(&reqUser) if err != nil { log.Println("Error decoding user payload:", err) web.WriteResponse(w, http.StatusBadRequest, web.ErrorResponse{Error: "decoding error"}) return } - err = validateUser(&user) + err = validateUser(&reqUser) if err != nil { log.Println("Validation error:", err) web.WriteResponse(w, http.StatusBadRequest, web.ErrorResponse{Error: err.Error()}) return } - usr, err := models.Users(qm.Where("id = ?", userID)).One(r.Context(), c.storage.Get()) - if err != nil { - if err == sql.ErrNoRows { - log.Println("Missing user:", err) - web.WriteResponse(w, http.StatusNotFound, web.ErrorResponse{Error: "missing user"}) - } else { - log.Printf("Error getting user with id %v: %v\n", userID, err) - web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) + var user *models.User + txErr := c.storage.Transaction(r.Context(), func(context context.Context, tx *sql.Tx) error { + var err error + user, err = models.Users(qm.Where("id = ?", userID)).One(context, tx) + if err != nil { + return err } - return - } + user.Metadata = reqUser.Metadata + _, err = user.Update(context, tx, boil.Infer()) + return err + }) - usr.Metadata = user.Metadata - _, err = usr.Update(context.Background(), c.storage.Get(), boil.Infer()) - if err != nil { - log.Println("Error updating user with id", usr.ID) + if txErr == sql.ErrNoRows { + log.Println("Missing user:", err) + web.WriteResponse(w, http.StatusNotFound, web.ErrorResponse{Error: "missing user"}) + } else if txErr != nil { + log.Printf("Error updating user with id %v: %v\n", userID, err) web.WriteResponse(w, http.StatusInternalServerError, struct{}{}) - return } - web.WriteResponse(w, http.StatusNoContent, usr) + web.WriteResponse(w, http.StatusOK, user) } func (c *controller) deleteUser(w http.ResponseWriter, r *http.Request) { diff --git a/storage/storage.go b/storage/storage.go index 5d1f506..90ffa72 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "context" "database/sql" "fmt" "github.com/golang-migrate/migrate" @@ -25,6 +26,7 @@ type Config struct { type Storage interface { Get() *sql.DB + Transaction(context context.Context, operation func(context context.Context, tx *sql.Tx) error) error Close() error } @@ -36,6 +38,21 @@ func (ps *postgresStorage) Get() *sql.DB { return ps.db } +func (ps *postgresStorage) Transaction(context context.Context, operation func(context context.Context, tx *sql.Tx) error) error { + tx, err := ps.db.BeginTx(context, nil) + if err != nil { + return err + } + + if opErr := operation(context, tx); opErr != nil { + if err := tx.Rollback(); err != nil { + return err + } + return opErr + } + return tx.Commit() +} + func (ps *postgresStorage) Close() error { return ps.db.Close() } From 964173685e277d27463abafd205707e53ac3d790 Mon Sep 17 00:00:00 2001 From: NickyMateev Date: Wed, 3 Apr 2019 19:36:28 +0300 Subject: [PATCH 4/6] Stop panicing in jobs --- job/idlers_reminder.go | 3 +- job/pull_request_fetcher.go | 73 +++++++++++++++++++++++-------------- job/review_fetcher.go | 43 ++++++++++++++-------- job/util.go | 12 +++--- storage/storage.go | 2 +- 5 files changed, 82 insertions(+), 51 deletions(-) diff --git a/job/idlers_reminder.go b/job/idlers_reminder.go index d16b6bb..7c707c9 100644 --- a/job/idlers_reminder.go +++ b/job/idlers_reminder.go @@ -45,7 +45,8 @@ func (ir *IdlersReminder) Run() { pullRequests, err := models.PullRequests(qm.Load("Idlers"), qm.Load("Project")).All(context.Background(), ir.storage.Get()) if err != nil { - log.Panic("Error retrieving pull requests:", err) + log.Println("Error retrieving pull requests:", err) + return } attachment := new(slack.Attachment) diff --git a/job/pull_request_fetcher.go b/job/pull_request_fetcher.go index 59dc693..aeb1dcb 100644 --- a/job/pull_request_fetcher.go +++ b/job/pull_request_fetcher.go @@ -45,7 +45,8 @@ func (prf *PullRequestFetcher) Run() { projects, err := models.Projects().All(context.Background(), prf.storage.Get()) if err != nil { - log.Panic("Unable to fetch projects:", err) + log.Println("Unable to fetch projects:", err) + return } log.Printf("%d project(s) are about to be reconciled\n", len(projects)) @@ -56,7 +57,8 @@ func (prf *PullRequestFetcher) Run() { pullRequests, resp, err := prf.client.PullRequests.List(context.Background(), project.RepoOwner, project.RepoName, nil) if err != nil || resp.StatusCode != http.StatusOK { - log.Panic("Unable to fetch pull requests:", err) + log.Println("Unable to fetch pull requests:", err) + continue } log.Printf("(%d) pull request(s) fetched for project %v\n", len(pullRequests), projectName) @@ -68,42 +70,57 @@ func (prf *PullRequestFetcher) Run() { func (prf *PullRequestFetcher) fetchPullRequests(pullRequests []*github.PullRequest, projectID int64, projectName string, wg *sync.WaitGroup) { defer wg.Done() + db := prf.storage.Get() for _, pullRequest := range pullRequests { - exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context.Background(), prf.storage.Get()) + exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context.Background(), db) if err != nil { - log.Panicf("Error retrieving pull requests for %v: %v\n", projectName, err.Error()) + log.Printf("Error retrieving pull requests for %v: %v\n", projectName, err.Error()) + continue } - if !exists { - user := transformUser(pullRequest.GetUser(), prf.storage.Get()) + if exists { + continue + } + + user, err := transformUser(pullRequest.GetUser(), db) + if err != nil { + log.Println("Unable to transform user:", err) + continue + } + + log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName) - log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName) + pr := models.PullRequest{ + Title: pullRequest.GetTitle(), + URL: pullRequest.GetHTMLURL(), + Number: int64(pullRequest.GetNumber()), + GithubID: pullRequest.GetID(), + UserID: user.ID, + ProjectID: projectID, + CreatedAt: time.Now(), + UpdatedAt: time.Now()} - pr := models.PullRequest{ - Title: pullRequest.GetTitle(), - URL: pullRequest.GetHTMLURL(), - Number: int64(pullRequest.GetNumber()), - GithubID: pullRequest.GetID(), - UserID: user.ID, - ProjectID: projectID, - CreatedAt: time.Now(), - UpdatedAt: time.Now()} + err = pr.Insert(context.Background(), db, boil.Infer()) + if err != nil { + log.Printf("Error persisting pull request %q (%v): %v\n", pr.Title, projectName, err.Error()) + continue + } + log.Printf("Pull request %q successfully persisted (%v)\n", pr.Title, projectName) - err = pr.Insert(context.Background(), prf.storage.Get(), boil.Infer()) + reviewers := make([]*models.User, 0) + for _, reviewer := range pullRequest.RequestedReviewers { + rev, err := transformUser(reviewer, db) if err != nil { - log.Panicf("Error persisting pull request %q (%v): %v\n", pr.Title, projectName, err.Error()) + log.Println("Unable to transform reviewer user:", err) + continue } - log.Printf("Pull request %q successfully persisted (%v)\n", pr.Title, projectName) - reviewers := make([]*models.User, 0) - for _, reviewer := range pullRequest.RequestedReviewers { - reviewers = append(reviewers, transformUser(reviewer, prf.storage.Get())) - } - if len(reviewers) > 0 { - err := pr.AddReviewers(context.Background(), prf.storage.Get(), false, reviewers...) - if err != nil { - log.Panicf("Error persisting pull request reviewers %q (%v): %v\n", pr.Title, projectName, err.Error()) - } + reviewers = append(reviewers, rev) + } + if len(reviewers) > 0 { + err := pr.AddReviewers(context.Background(), db, false, reviewers...) + if err != nil { + log.Printf("Error persisting pull request reviewers %q (%v): %v\n", pr.Title, projectName, err.Error()) } } } diff --git a/job/review_fetcher.go b/job/review_fetcher.go index 423eb66..d7b8aaa 100644 --- a/job/review_fetcher.go +++ b/job/review_fetcher.go @@ -42,55 +42,68 @@ func (rf *ReviewFetcher) Run() { log.Printf("STARTING %v job", rf.Name()) defer log.Printf("FINISHED %v job", rf.Name()) - pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), rf.storage.Get()) + db := rf.storage.Get() + + pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), db) if err != nil { - log.Panic("Unable to fetch pull requests:", err) + log.Println("Unable to fetch pull requests:", err) + return } for _, pr := range pullRequests { reviews, resp, err := rf.client.PullRequests.ListReviews(context.Background(), pr.R.Project.RepoOwner, pr.R.Project.RepoName, int(pr.Number), nil) if err != nil || resp.StatusCode != http.StatusOK { - log.Panic("Unable to fetch pull request reviews:", err) + log.Println("Unable to fetch pull request reviews:", err) + continue } log.Printf("(%d) review(s) fetched for pull request %q\n", len(reviews), pr.Title) reviewers := make([]*models.User, 0) for _, review := range reviews { - user := transformUser(review.GetUser(), rf.storage.Get()) + user, err := transformUser(review.GetUser(), db) + if err != nil { + log.Println("Unable to transform reviewer user:", err) + continue + } + reviewers = append(reviewers, user) if review.GetState() == approvedState { - exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), rf.storage.Get()) + exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), db) if err != nil { - log.Panic("Unable to check pull request activity record:", err) + log.Println("Unable to check pull request activity record:", err) + continue } if !exists { - err = user.AddApprovedPullRequests(context.Background(), rf.storage.Get(), false, pr) + err = user.AddApprovedPullRequests(context.Background(), db, false, pr) if err != nil { - log.Panic("Unable to persist user approved pull request") + log.Println("Unable to persist user approved pull request") + continue } } } else { - exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), rf.storage.Get()) + exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), db) if err != nil { - log.Panic("Unable to check pull request activity record:", err) + log.Println("Unable to check pull request activity record:", err) + continue } if !exists { - err = user.AddCommentedPullRequests(context.Background(), rf.storage.Get(), false, pr) + err = user.AddCommentedPullRequests(context.Background(), db, false, pr) if err != nil { - log.Panic("Unable to persist user commented pull request") + log.Println("Unable to persist user commented pull request") + continue } } } if err != nil { - log.Panic("Unable to persist pull request activity record:", err) + log.Println("Unable to persist pull request activity record:", err) } } idlers := rf.findIdlers(pr.R.Reviewers, reviewers) - pr.AddIdlers(context.Background(), rf.storage.Get(), false, idlers...) + pr.AddIdlers(context.Background(), db, false, idlers...) - pr.Update(context.Background(), rf.storage.Get(), boil.Infer()) // updates the 'updated_at' column + pr.Update(context.Background(), db, boil.Infer()) // updates the 'updated_at' column } } diff --git a/job/util.go b/job/util.go index ce3429f..c19758e 100644 --- a/job/util.go +++ b/job/util.go @@ -3,11 +3,11 @@ package job import ( "context" "database/sql" + "fmt" "github.com/NickyMateev/Reviewer/models" "github.com/google/go-github/github" "github.com/volatiletech/sqlboiler/boil" "github.com/volatiletech/sqlboiler/queries/qm" - "log" ) type SlackConfig struct { @@ -15,10 +15,10 @@ type SlackConfig struct { BotToken string } -func transformUser(githubUser *github.User, db *sql.DB) *models.User { +func transformUser(githubUser *github.User, db *sql.DB) (*models.User, error) { exists, err := models.Users(qm.Where("github_id = ?", githubUser.GetID())).Exists(context.Background(), db) if err != nil { - log.Panicf("Error searching for user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) + return nil, fmt.Errorf("Error searching for user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) } var user *models.User @@ -26,14 +26,14 @@ func transformUser(githubUser *github.User, db *sql.DB) *models.User { user = &models.User{Username: githubUser.GetLogin(), GithubID: githubUser.GetID()} err := user.Insert(context.Background(), db, boil.Infer()) if err != nil { - log.Panicf("Error persisting user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) + return nil, fmt.Errorf("Error persisting user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) } } else { user, err = models.Users(qm.Where("github_id = ?", githubUser.GetID())).One(context.Background(), db) if err != nil { - log.Panicf("Error retrieving user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) + return nil, fmt.Errorf("Error retrieving user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) } } - return user + return user, nil } diff --git a/storage/storage.go b/storage/storage.go index 90ffa72..2c1df15 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -57,7 +57,7 @@ func (ps *postgresStorage) Close() error { return ps.db.Close() } -// New creates an *sql.DB object and updates the database with the latest migrations +// New creates a Storage object and updates the database with the latest migrations func New(cfg Config) (Storage, error) { db, err := sql.Open(cfg.Type, cfg.URI) if err != nil { From 7ce55378261458dc0ee6bd9545fb58fa31fffa58 Mon Sep 17 00:00:00 2001 From: NickyMateev Date: Wed, 3 Apr 2019 20:34:15 +0300 Subject: [PATCH 5/6] Use transactions in jobs with multiple queries --- job/pull_request_fetcher.go | 93 +++++++++++++++++++------------------ job/review_fetcher.go | 82 ++++++++++++++++---------------- job/util.go | 8 ++-- storage/storage.go | 5 +- 4 files changed, 97 insertions(+), 91 deletions(-) diff --git a/job/pull_request_fetcher.go b/job/pull_request_fetcher.go index aeb1dcb..17ca26d 100644 --- a/job/pull_request_fetcher.go +++ b/job/pull_request_fetcher.go @@ -2,6 +2,7 @@ package job import ( "context" + "database/sql" "fmt" "github.com/NickyMateev/Reviewer/models" "github.com/NickyMateev/Reviewer/storage" @@ -63,65 +64,67 @@ func (prf *PullRequestFetcher) Run() { log.Printf("(%d) pull request(s) fetched for project %v\n", len(pullRequests), projectName) wg.Add(1) - go prf.fetchPullRequests(pullRequests, project.ID, projectName, &wg) + go prf.persistPullRequests(pullRequests, project.ID, projectName, &wg) } wg.Wait() } -func (prf *PullRequestFetcher) fetchPullRequests(pullRequests []*github.PullRequest, projectID int64, projectName string, wg *sync.WaitGroup) { +func (prf *PullRequestFetcher) persistPullRequests(pullRequests []*github.PullRequest, projectID int64, projectName string, wg *sync.WaitGroup) { defer wg.Done() - db := prf.storage.Get() for _, pullRequest := range pullRequests { - exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context.Background(), db) - if err != nil { - log.Printf("Error retrieving pull requests for %v: %v\n", projectName, err.Error()) - continue - } + txErr := prf.storage.Transaction(context.Background(), func(context context.Context, tx *sql.Tx) error { + exists, err := models.PullRequests(qm.Where("github_id = ?", pullRequest.GetID())).Exists(context, tx) + if err != nil { + return fmt.Errorf("error retrieving pull requests for %v: %s", projectName, err) + } - if exists { - continue - } + if exists { + return nil + } - user, err := transformUser(pullRequest.GetUser(), db) - if err != nil { - log.Println("Unable to transform user:", err) - continue - } + user, err := transformUser(context, tx, pullRequest.GetUser()) + if err != nil { + return fmt.Errorf("unable to transform user: %s", err) + } - log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName) - - pr := models.PullRequest{ - Title: pullRequest.GetTitle(), - URL: pullRequest.GetHTMLURL(), - Number: int64(pullRequest.GetNumber()), - GithubID: pullRequest.GetID(), - UserID: user.ID, - ProjectID: projectID, - CreatedAt: time.Now(), - UpdatedAt: time.Now()} - - err = pr.Insert(context.Background(), db, boil.Infer()) - if err != nil { - log.Printf("Error persisting pull request %q (%v): %v\n", pr.Title, projectName, err.Error()) - continue - } - log.Printf("Pull request %q successfully persisted (%v)\n", pr.Title, projectName) + log.Printf("Persisting new pull request: %q (%v)\n", pullRequest.GetTitle(), projectName) - reviewers := make([]*models.User, 0) - for _, reviewer := range pullRequest.RequestedReviewers { - rev, err := transformUser(reviewer, db) + pr := models.PullRequest{ + Title: pullRequest.GetTitle(), + URL: pullRequest.GetHTMLURL(), + Number: int64(pullRequest.GetNumber()), + GithubID: pullRequest.GetID(), + UserID: user.ID, + ProjectID: projectID, + CreatedAt: time.Now(), + UpdatedAt: time.Now()} + + err = pr.Insert(context, tx, boil.Infer()) if err != nil { - log.Println("Unable to transform reviewer user:", err) - continue + return err } + log.Printf("Pull request %q successfully persisted (%v)\n", pr.Title, projectName) - reviewers = append(reviewers, rev) - } - if len(reviewers) > 0 { - err := pr.AddReviewers(context.Background(), db, false, reviewers...) - if err != nil { - log.Printf("Error persisting pull request reviewers %q (%v): %v\n", pr.Title, projectName, err.Error()) + reviewers := make([]*models.User, 0) + for _, reviewer := range pullRequest.RequestedReviewers { + rev, err := transformUser(context, tx, reviewer) + if err != nil { + return fmt.Errorf("unable to transform reviewer user: %s", err) + } + + reviewers = append(reviewers, rev) } + if len(reviewers) > 0 { + err := pr.AddReviewers(context, tx, false, reviewers...) + if err != nil { + return err + } + } + return nil + }) + + if txErr != nil { + log.Printf("Unable to persist pull request %q: %s\n", pullRequest.Title, txErr) } } } diff --git a/job/review_fetcher.go b/job/review_fetcher.go index d7b8aaa..760398c 100644 --- a/job/review_fetcher.go +++ b/job/review_fetcher.go @@ -2,6 +2,8 @@ package job import ( "context" + "database/sql" + "fmt" "github.com/NickyMateev/Reviewer/models" "github.com/NickyMateev/Reviewer/storage" "github.com/google/go-github/github" @@ -42,68 +44,68 @@ func (rf *ReviewFetcher) Run() { log.Printf("STARTING %v job", rf.Name()) defer log.Printf("FINISHED %v job", rf.Name()) - db := rf.storage.Get() - - pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), db) + pullRequests, err := models.PullRequests(qm.Load("Project"), qm.Load("Reviewers")).All(context.Background(), rf.storage.Get()) if err != nil { log.Println("Unable to fetch pull requests:", err) return } - for _, pr := range pullRequests { - reviews, resp, err := rf.client.PullRequests.ListReviews(context.Background(), pr.R.Project.RepoOwner, pr.R.Project.RepoName, int(pr.Number), nil) + for _, pullRequest := range pullRequests { + reviews, resp, err := rf.client.PullRequests.ListReviews(context.Background(), pullRequest.R.Project.RepoOwner, pullRequest.R.Project.RepoName, int(pullRequest.Number), nil) if err != nil || resp.StatusCode != http.StatusOK { log.Println("Unable to fetch pull request reviews:", err) continue } - log.Printf("(%d) review(s) fetched for pull request %q\n", len(reviews), pr.Title) - - reviewers := make([]*models.User, 0) - for _, review := range reviews { - user, err := transformUser(review.GetUser(), db) - if err != nil { - log.Println("Unable to transform reviewer user:", err) - continue - } + log.Printf("(%d) review(s) fetched for pull request %q\n", len(reviews), pullRequest.Title) - reviewers = append(reviewers, user) - - if review.GetState() == approvedState { - exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), db) + txErr := rf.storage.Transaction(context.Background(), func(context context.Context, tx *sql.Tx) error { + reviewers := make([]*models.User, 0) + for _, review := range reviews { + user, err := transformUser(context, tx, review.GetUser()) if err != nil { - log.Println("Unable to check pull request activity record:", err) - continue + return fmt.Errorf("unable to transform reviewer user: %s", err) } - if !exists { - err = user.AddApprovedPullRequests(context.Background(), db, false, pr) + + reviewers = append(reviewers, user) + + if review.GetState() == approvedState { + exists, err := user.ApprovedPullRequests(qm.Where("pull_request_id = ?", pullRequest.ID)).Exists(context, tx) if err != nil { - log.Println("Unable to persist user approved pull request") - continue + return err } - } - } else { - exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pr.ID)).Exists(context.Background(), db) - if err != nil { - log.Println("Unable to check pull request activity record:", err) - continue - } - if !exists { - err = user.AddCommentedPullRequests(context.Background(), db, false, pr) + if !exists { + err := user.AddApprovedPullRequests(context, tx, false, pullRequest) + if err != nil { + return err + } + } + } else { + exists, err := user.CommentedPullRequests(qm.Where("pull_request_id = ?", pullRequest.ID)).Exists(context, tx) if err != nil { - log.Println("Unable to persist user commented pull request") - continue + return err + } + if !exists { + err := user.AddCommentedPullRequests(context, tx, false, pullRequest) + if err != nil { + return err + } } } } + + idlers := rf.findIdlers(pullRequest.R.Reviewers, reviewers) + err = pullRequest.AddIdlers(context, tx, false, idlers...) if err != nil { - log.Println("Unable to persist pull request activity record:", err) + return err } - } - idlers := rf.findIdlers(pr.R.Reviewers, reviewers) - pr.AddIdlers(context.Background(), db, false, idlers...) + _, err = pullRequest.Update(context, tx, boil.Infer()) // updates the 'updated_at' column + return err + }) - pr.Update(context.Background(), db, boil.Infer()) // updates the 'updated_at' column + if txErr != nil { + log.Printf("Unable to persist pull request %q activity records: %v\n", pullRequest.Title, txErr) + } } } diff --git a/job/util.go b/job/util.go index c19758e..ab1489f 100644 --- a/job/util.go +++ b/job/util.go @@ -15,8 +15,8 @@ type SlackConfig struct { BotToken string } -func transformUser(githubUser *github.User, db *sql.DB) (*models.User, error) { - exists, err := models.Users(qm.Where("github_id = ?", githubUser.GetID())).Exists(context.Background(), db) +func transformUser(context context.Context, tx *sql.Tx, githubUser *github.User) (*models.User, error) { + exists, err := models.Users(qm.Where("github_id = ?", githubUser.GetID())).Exists(context, tx) if err != nil { return nil, fmt.Errorf("Error searching for user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) } @@ -24,12 +24,12 @@ func transformUser(githubUser *github.User, db *sql.DB) (*models.User, error) { var user *models.User if !exists { user = &models.User{Username: githubUser.GetLogin(), GithubID: githubUser.GetID()} - err := user.Insert(context.Background(), db, boil.Infer()) + err := user.Insert(context, tx, boil.Infer()) if err != nil { return nil, fmt.Errorf("Error persisting user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) } } else { - user, err = models.Users(qm.Where("github_id = ?", githubUser.GetID())).One(context.Background(), db) + user, err = models.Users(qm.Where("github_id = ?", githubUser.GetID())).One(context, tx) if err != nil { return nil, fmt.Errorf("Error retrieving user %q [%v]: %v\n", githubUser.GetLogin(), githubUser.GetID(), err.Error()) } diff --git a/storage/storage.go b/storage/storage.go index 2c1df15..9e6b00b 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -61,13 +61,14 @@ func (ps *postgresStorage) Close() error { func New(cfg Config) (Storage, error) { db, err := sql.Open(cfg.Type, cfg.URI) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to open db connection: %s", err) } err = updateSchema(db, cfg.Type) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to update db schema: %s", err) } + log.Println("Database is up-to-date") return &postgresStorage{ From af947a28c8b5d00d02dcac6506070d159e42429c Mon Sep 17 00:00:00 2001 From: NickyMateev Date: Wed, 3 Apr 2019 22:24:13 +0300 Subject: [PATCH 6/6] Add postgres storage implementation tests --- Gopkg.lock | 9 ++ Gopkg.toml | 4 + job/pull_request_fetcher.go | 2 +- storage/storage_test.go | 174 ++++++++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 storage/storage_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 2aea459..3f7e670 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,6 +1,14 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + digest = "1:c84a587136cb69cecc11f3dbe9f9001444044c0dba74997b07f7e4c150b07cda" + name = "github.com/DATA-DOG/go-sqlmock" + packages = ["."] + pruneopts = "UT" + revision = "3f9954f6f6697845b082ca57995849ddf614f450" + version = "v1.3.3" + [[projects]] digest = "1:1b487fd946176530c6c3669e55108e4ee45e1bd1ea0a630e28e3a173352d93e8" name = "github.com/ericlagergren/decimal" @@ -281,6 +289,7 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ + "github.com/DATA-DOG/go-sqlmock", "github.com/golang-migrate/migrate", "github.com/golang-migrate/migrate/database/postgres", "github.com/golang-migrate/migrate/source/file", diff --git a/Gopkg.toml b/Gopkg.toml index d359906..d89c483 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -59,3 +59,7 @@ [[constraint]] name = "github.com/nlopes/slack" version = "0.5.0" + +[[constraint]] + name = "github.com/DATA-DOG/go-sqlmock" + version = "1.3.3" diff --git a/job/pull_request_fetcher.go b/job/pull_request_fetcher.go index 17ca26d..86477f3 100644 --- a/job/pull_request_fetcher.go +++ b/job/pull_request_fetcher.go @@ -124,7 +124,7 @@ func (prf *PullRequestFetcher) persistPullRequests(pullRequests []*github.PullRe }) if txErr != nil { - log.Printf("Unable to persist pull request %q: %s\n", pullRequest.Title, txErr) + log.Printf("Unable to persist pull request %q: %s\n", *pullRequest.Title, txErr) } } } diff --git a/storage/storage_test.go b/storage/storage_test.go new file mode 100644 index 0000000..67628b3 --- /dev/null +++ b/storage/storage_test.go @@ -0,0 +1,174 @@ +package storage + +import ( + "context" + "database/sql" + "fmt" + "github.com/DATA-DOG/go-sqlmock" + "log" + "testing" +) + +const ( + insertQueryShort = "INSERT INTO \"table\"" +) + +func TestGetStorage(t *testing.T) { + db, _, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + storage := postgresStorage{db: db} + + if storage.Get() != db { + t.Fail() + } +} + +func TestTransactionSuccessful(t *testing.T) { + db, sqlMock, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + storage := postgresStorage{db: db} + + sqlMock.ExpectBegin() + + var expectedLastInsertID int64 = 1 + var expectedLastRowsAffected int64 = 1 + expectedResult := sqlmock.NewResult(expectedLastInsertID, expectedLastRowsAffected) + sqlMock.ExpectExec(insertQueryShort).WithArgs("arg1", "arg2", "arg3").WillReturnResult(expectedResult) + + sqlMock.ExpectCommit() + + txErr := storage.Transaction(context.TODO(), func(context context.Context, tx *sql.Tx) error { + actualResult, err := tx.Exec(insertQueryShort, "arg1", "arg2", "arg3") + actualLastInsertID, _ := actualResult.LastInsertId() + actualRowsAffected, _ := actualResult.RowsAffected() + + if actualLastInsertID != expectedLastInsertID || actualRowsAffected != expectedLastRowsAffected { + t.Fail() + } + return err + }) + + if txErr != nil { + t.Fail() + } + + if err := sqlMock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestTransactionFailToBeginTransaction(t *testing.T) { + db, sqlMock, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + sqlMock.ExpectBegin() + + storage := postgresStorage{db: db} + txErr := storage.Transaction(context.TODO(), func(context context.Context, tx *sql.Tx) error { + return nil + }) + + if txErr == nil { + t.Fail() + } + + if err := sqlMock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestTransactionSuccessfulRollback(t *testing.T) { + db, sqlMock, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + sqlMock.ExpectBegin() + sqlMock.ExpectRollback() + + storage := postgresStorage{db: db} + expectedErr := fmt.Errorf("unexpected error") + txErr := storage.Transaction(context.TODO(), func(context context.Context, tx *sql.Tx) error { + return expectedErr + }) + + if txErr != expectedErr { + t.Fail() + } + + if err := sqlMock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestTransactionFailRollback(t *testing.T) { + db, sqlMock, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + sqlMock.ExpectBegin() + + storage := postgresStorage{db: db} + expectedErr := fmt.Errorf("unexpected error") + txErr := storage.Transaction(context.TODO(), func(context context.Context, tx *sql.Tx) error { + return expectedErr + }) + + if txErr == nil || txErr == expectedErr { + t.Fail() + } + + if err := sqlMock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestTransactionFailToCommitTransaction(t *testing.T) { + db, sqlMock, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + storage := postgresStorage{db: db} + txErr := storage.Transaction(context.TODO(), func(context context.Context, tx *sql.Tx) error { + return nil + }) + + if txErr == nil { + t.Fail() + } + + if err := sqlMock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestCloseStorage(t *testing.T) { + db, _, err := sqlmock.New() + if err != nil { + log.Panicf("an error '%s' was not expected when opening a stub database connection", err) + } + + storage := postgresStorage{db: db} + storage.Close() + + err = db.Ping() + if err == nil { + t.Fail() + } +}