Skip to content

Commit

Permalink
creator use MessageQueue interface and start tests
Browse files Browse the repository at this point in the history
  • Loading branch information
evermax committed Jun 10, 2016
1 parent a860536 commit a3e6e46
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 16 deletions.
4 changes: 3 additions & 1 deletion lib/mq/imq.go
Expand Up @@ -7,6 +7,8 @@ package mq
type MessageQueue interface {
DeclareQueue(string) error
Publish(string, []byte) error
// Consume should block until the underlying connection to is closed
// or the Receiver function send a bool to the channel it's passed
Consume(string, Receiver) error
}

Expand All @@ -17,7 +19,7 @@ type MessageQueue interface {
// acknowledge or non acknowledge the message for it to be
// requeued or not.
// The boolean channel can make the message queue stop consuming the queue.
// If so, it will be needed to call Consume again.
// If so, it will be needed to call Consume again, because the function will return.
type Receiver func(Delivery, chan bool)

// Delivery interface provide a wrapper for the message and acknowledgment
Expand Down
12 changes: 8 additions & 4 deletions service/creator/creator.go
Expand Up @@ -28,7 +28,6 @@ type Creator struct {

// NewCreator creates a new creator
func NewCreator(db store.Store, queue mq.MessageQueue) Creator {
// connect to the AMQP server
return Creator{
t: service.CreatorName,
db: db,
Expand All @@ -52,7 +51,8 @@ func (c Creator) Run() error {
}

func (c Creator) receiveMessage(d mq.Delivery, forever chan bool) {
log.Printf("Received a message: %s", d.Body)
body := d.Body()
log.Printf("Received a message: %s", body)

err := c.creatorWork(d.Body())
if err == store.ErrAlreadyExist {
Expand All @@ -77,13 +77,17 @@ func (c Creator) creatorWork(body []byte) error {
}

repoInfo := github.RepoInfo{
WorkedOn: true,
ID: apiJob.RepoInfo.ID,
Name: apiJob.RepoInfo.Name,
Count: apiJob.RepoInfo.Count,
CreationDate: apiJob.RepoInfo.CreationDate,
WorkedOn: true,
}

// Create the repository on the store, claim the work
key, err := c.db.AddRepo(repoInfo)
if err != nil {
return fmt.Errorf("Adding to store error with %s: %v", body, err)
return err
}

timestamps, err := GetAllTimestamps(c.jobQueue, 100, apiJob.Token, repoInfo)
Expand Down
167 changes: 167 additions & 0 deletions service/creator/creator_test.go
Expand Up @@ -9,6 +9,8 @@ import (
"testing"

"github.com/evermax/stargraph/github"
"github.com/evermax/stargraph/lib/mq"
"github.com/evermax/stargraph/lib/store"
"github.com/evermax/stargraph/service"
)

Expand All @@ -25,6 +27,54 @@ func (info mockRepoInfo) URL() string {
return info.url
}

func TestCreatorWorkNonJSONMessage(t *testing.T) {
var db = storedb{}
var d = &delvry{body: []byte("Hello, world")}
var q = &msgq{delivery: d}
var creator = NewCreator(db, q)
creator.Run()

if !d.nack {
t.Fatalf("Nack should have been called on the delivery")
}

if d.ack {
t.Fatal("Ack shouldn't have been called on the delivery")
}
}

/*func TestCreatorWorkNonAPIJobMessage(t *testing.T) {
var db = storedb{}
var d = &delvry{body: []byte("{\"test\": \"test\"}")}
var q = &msgq{delivery: d}
var creator = NewCreator(db, q)
creator.Run()
if !d.nack {
t.Fatalf("Nack should have been called on the delivery")
}
if d.ack {
t.Fatal("Ack shouldn't have been called on the delivery")
}
}
func TestCreatorWorkRepoAlreadyExist(t *testing.T) {
var db = storedb{}
var d = &delvry{body: []byte("{\"name\": \"evermax/stargraph\"}")}
var q = &msgq{delivery: d}
var creator = NewCreator(db, q)
creator.Run()
if !d.ack {
t.Fatal("Ack should have been called on the delivery")
}
if d.nack {
t.Fatalf("Nack shouldn't have been called on the delivery")
}
}*/

func TestGetAllTimestamps(t *testing.T) {
filePathFormat := "testdata/distributed_stars_%d.json"
expectedTimestamps := 16
Expand Down Expand Up @@ -82,3 +132,120 @@ func TestGetAllTimestamps(t *testing.T) {
}
dispatch.Stop()
}

type storedb struct {
addRepoFail bool
getRepoFail bool
putRepoFail bool
claimWorkFail bool
exist bool
}

var ErrNoName error

func (db storedb) AddRepo(repo github.RepoInfo) (store.ID, error) {
if repo.Name == "evermax/stargraph" {
return iD{}, store.ErrAlreadyExist
}
if repo.Name == "" {
return iD{}, ErrNoName
}
if repo.ID == 0 {
return iD{}, ErrNoName
}
if db.addRepoFail {
return iD{}, fmt.Errorf("Random Error")
}
return iD{}, nil
}

func (db storedb) GetRepo(repo string) (github.RepoInfo, store.ID, error) {
if db.getRepoFail {
return github.RepoInfo{}, iD{}, fmt.Errorf("Random Error")
}
repoInfo := github.RepoInfo{}
(&repoInfo).SetExist(db.exist)
return repoInfo, iD{}, nil
}

func (db storedb) PutRepo(repo github.RepoInfo, id store.ID) error {
if db.putRepoFail {
return fmt.Errorf("Random Error")
}
return nil
}

func (db storedb) ClaimWork(repo github.RepoInfo, id store.ID) error {
if db.claimWorkFail {
return fmt.Errorf("Random Error")
}
return nil
}

type iD struct{}

func (id iD) Test() string {
return ""
}

type msgq struct {
addQueue string
updateQueue string
addJobTriggered int
updateJobTriggered int
token string
delivery *delvry
}

func (q *msgq) DeclareQueue(name string) error {
if name == "" {
return fmt.Errorf("Name is null")
}
if name == "add" {
q.addQueue = name
}
if name == "update" {
q.updateQueue = name
}
return nil
}

func (q *msgq) Publish(name string, body []byte) error {
if q.addQueue != name && q.updateQueue != name {
// TODO check if indeed an non existing queue would indeed trigger error
return fmt.Errorf("No such Q %s", name)
}
if name == "add" {
q.addJobTriggered++
}
if name == "update" {
q.updateJobTriggered++
}
return nil
}

func (q *msgq) Consume(name string, r mq.Receiver) error {
var forever chan bool
r(q.delivery, forever)

return nil
}

type delvry struct {
body []byte
ack bool
nack bool
}

func (d *delvry) Body() []byte {
return d.body
}

func (d *delvry) Ack(multiple bool) error {
d.ack = true
return nil
}
func (d *delvry) Nack(multiple, requeue bool) error {
d.nack = true
return nil
}
11 changes: 2 additions & 9 deletions service/service.go
Expand Up @@ -16,7 +16,7 @@ const (
// of the service being one or the other
type SWorker interface {
Type() string
Run(string, string)
Run()
JobQueue() chan Job
}

Expand All @@ -30,14 +30,7 @@ func NewService(workers []SWorker, amqpURL, creatorQueue, updatorQueue, address
dispatcher := NewDispatcher(maxWorker, jobQSize)
defer dispatcher.Stop()
for _, worker := range workers {
var qName string
switch worker.Type() {
case CreatorName:
qName = creatorQueue
case UpdatorName:
qName = updatorQueue
}
go worker.Run(amqpURL, qName)
go worker.Run()
}
// Start HTTP server here for status check and the one for heartbeat
service := Service{
Expand Down
3 changes: 1 addition & 2 deletions service/update/updator.go
Expand Up @@ -19,11 +19,10 @@ func (u Updator) Type() string {

// Run create a connection to the AMQP server and listen to incoming requests
// To update Github repository graphs.
func (u Updator) Run(jobQueue chan service.Job, amqpURL, addQueueN string) error {
func (u Updator) Run() {
// Get all the timestamps from the database
// Get the number of pages and the repo info from Github
// Compare and go back until it is alright
// Update the DB.
// Set it to not worked on anymore
return nil
}

0 comments on commit a3e6e46

Please sign in to comment.