diff --git a/lib/mq/imq.go b/lib/mq/imq.go index c1acb35..15ef5de 100644 --- a/lib/mq/imq.go +++ b/lib/mq/imq.go @@ -7,6 +7,8 @@ package mq type MessageQueue interface { DeclareQueue(string) error Publish(string, []byte) error + // Consume should block until the underlying connection to is closed + // or the Receiver function send a bool to the channel it's passed Consume(string, Receiver) error } @@ -17,7 +19,7 @@ type MessageQueue interface { // acknowledge or non acknowledge the message for it to be // requeued or not. // The boolean channel can make the message queue stop consuming the queue. -// If so, it will be needed to call Consume again. +// If so, it will be needed to call Consume again, because the function will return. type Receiver func(Delivery, chan bool) // Delivery interface provide a wrapper for the message and acknowledgment diff --git a/service/creator/creator.go b/service/creator/creator.go index 3aa91c6..b85b4aa 100644 --- a/service/creator/creator.go +++ b/service/creator/creator.go @@ -28,7 +28,6 @@ type Creator struct { // NewCreator creates a new creator func NewCreator(db store.Store, queue mq.MessageQueue) Creator { - // connect to the AMQP server return Creator{ t: service.CreatorName, db: db, @@ -52,7 +51,8 @@ func (c Creator) Run() error { } func (c Creator) receiveMessage(d mq.Delivery, forever chan bool) { - log.Printf("Received a message: %s", d.Body) + body := d.Body() + log.Printf("Received a message: %s", body) err := c.creatorWork(d.Body()) if err == store.ErrAlreadyExist { @@ -77,13 +77,17 @@ func (c Creator) creatorWork(body []byte) error { } repoInfo := github.RepoInfo{ - WorkedOn: true, + ID: apiJob.RepoInfo.ID, + Name: apiJob.RepoInfo.Name, + Count: apiJob.RepoInfo.Count, + CreationDate: apiJob.RepoInfo.CreationDate, + WorkedOn: true, } // Create the repository on the store, claim the work key, err := c.db.AddRepo(repoInfo) if err != nil { - return fmt.Errorf("Adding to store error with %s: %v", body, err) + return err } timestamps, err := GetAllTimestamps(c.jobQueue, 100, apiJob.Token, repoInfo) diff --git a/service/creator/creator_test.go b/service/creator/creator_test.go index 14a7f2f..e0ff2d6 100644 --- a/service/creator/creator_test.go +++ b/service/creator/creator_test.go @@ -9,6 +9,8 @@ import ( "testing" "github.com/evermax/stargraph/github" + "github.com/evermax/stargraph/lib/mq" + "github.com/evermax/stargraph/lib/store" "github.com/evermax/stargraph/service" ) @@ -25,6 +27,54 @@ func (info mockRepoInfo) URL() string { return info.url } +func TestCreatorWorkNonJSONMessage(t *testing.T) { + var db = storedb{} + var d = &delvry{body: []byte("Hello, world")} + var q = &msgq{delivery: d} + var creator = NewCreator(db, q) + creator.Run() + + if !d.nack { + t.Fatalf("Nack should have been called on the delivery") + } + + if d.ack { + t.Fatal("Ack shouldn't have been called on the delivery") + } +} + +/*func TestCreatorWorkNonAPIJobMessage(t *testing.T) { + var db = storedb{} + var d = &delvry{body: []byte("{\"test\": \"test\"}")} + var q = &msgq{delivery: d} + var creator = NewCreator(db, q) + creator.Run() + + if !d.nack { + t.Fatalf("Nack should have been called on the delivery") + } + + if d.ack { + t.Fatal("Ack shouldn't have been called on the delivery") + } +} + +func TestCreatorWorkRepoAlreadyExist(t *testing.T) { + var db = storedb{} + var d = &delvry{body: []byte("{\"name\": \"evermax/stargraph\"}")} + var q = &msgq{delivery: d} + var creator = NewCreator(db, q) + creator.Run() + + if !d.ack { + t.Fatal("Ack should have been called on the delivery") + } + if d.nack { + t.Fatalf("Nack shouldn't have been called on the delivery") + } + +}*/ + func TestGetAllTimestamps(t *testing.T) { filePathFormat := "testdata/distributed_stars_%d.json" expectedTimestamps := 16 @@ -82,3 +132,120 @@ func TestGetAllTimestamps(t *testing.T) { } dispatch.Stop() } + +type storedb struct { + addRepoFail bool + getRepoFail bool + putRepoFail bool + claimWorkFail bool + exist bool +} + +var ErrNoName error + +func (db storedb) AddRepo(repo github.RepoInfo) (store.ID, error) { + if repo.Name == "evermax/stargraph" { + return iD{}, store.ErrAlreadyExist + } + if repo.Name == "" { + return iD{}, ErrNoName + } + if repo.ID == 0 { + return iD{}, ErrNoName + } + if db.addRepoFail { + return iD{}, fmt.Errorf("Random Error") + } + return iD{}, nil +} + +func (db storedb) GetRepo(repo string) (github.RepoInfo, store.ID, error) { + if db.getRepoFail { + return github.RepoInfo{}, iD{}, fmt.Errorf("Random Error") + } + repoInfo := github.RepoInfo{} + (&repoInfo).SetExist(db.exist) + return repoInfo, iD{}, nil +} + +func (db storedb) PutRepo(repo github.RepoInfo, id store.ID) error { + if db.putRepoFail { + return fmt.Errorf("Random Error") + } + return nil +} + +func (db storedb) ClaimWork(repo github.RepoInfo, id store.ID) error { + if db.claimWorkFail { + return fmt.Errorf("Random Error") + } + return nil +} + +type iD struct{} + +func (id iD) Test() string { + return "" +} + +type msgq struct { + addQueue string + updateQueue string + addJobTriggered int + updateJobTriggered int + token string + delivery *delvry +} + +func (q *msgq) DeclareQueue(name string) error { + if name == "" { + return fmt.Errorf("Name is null") + } + if name == "add" { + q.addQueue = name + } + if name == "update" { + q.updateQueue = name + } + return nil +} + +func (q *msgq) Publish(name string, body []byte) error { + if q.addQueue != name && q.updateQueue != name { + // TODO check if indeed an non existing queue would indeed trigger error + return fmt.Errorf("No such Q %s", name) + } + if name == "add" { + q.addJobTriggered++ + } + if name == "update" { + q.updateJobTriggered++ + } + return nil +} + +func (q *msgq) Consume(name string, r mq.Receiver) error { + var forever chan bool + r(q.delivery, forever) + + return nil +} + +type delvry struct { + body []byte + ack bool + nack bool +} + +func (d *delvry) Body() []byte { + return d.body +} + +func (d *delvry) Ack(multiple bool) error { + d.ack = true + return nil +} +func (d *delvry) Nack(multiple, requeue bool) error { + d.nack = true + return nil +} diff --git a/service/service.go b/service/service.go index cab88ad..241d5e6 100644 --- a/service/service.go +++ b/service/service.go @@ -16,7 +16,7 @@ const ( // of the service being one or the other type SWorker interface { Type() string - Run(string, string) + Run() JobQueue() chan Job } @@ -30,14 +30,7 @@ func NewService(workers []SWorker, amqpURL, creatorQueue, updatorQueue, address dispatcher := NewDispatcher(maxWorker, jobQSize) defer dispatcher.Stop() for _, worker := range workers { - var qName string - switch worker.Type() { - case CreatorName: - qName = creatorQueue - case UpdatorName: - qName = updatorQueue - } - go worker.Run(amqpURL, qName) + go worker.Run() } // Start HTTP server here for status check and the one for heartbeat service := Service{ diff --git a/service/update/updator.go b/service/update/updator.go index 1c64be3..76fb23b 100644 --- a/service/update/updator.go +++ b/service/update/updator.go @@ -19,11 +19,10 @@ func (u Updator) Type() string { // Run create a connection to the AMQP server and listen to incoming requests // To update Github repository graphs. -func (u Updator) Run(jobQueue chan service.Job, amqpURL, addQueueN string) error { +func (u Updator) Run() { // Get all the timestamps from the database // Get the number of pages and the repo info from Github // Compare and go back until it is alright // Update the DB. // Set it to not worked on anymore - return nil }