Skip to content

Commit

Permalink
Fixes #29 Add ability to do longpoll/continuous changes requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Traun Leyden committed Dec 22, 2016
1 parent c0f07a8 commit e691ce9
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 15 deletions.
4 changes: 4 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
NUM_UPDATERS_CMD_DEFAULT = 100
NUM_UPDATERS_CMD_DESC = "The number of unique users that will update documents. Each updater runs concurrently in it's own goroutine"

FEED_TYPE_CMD_NAME = "readerfeedtype"
FEED_TYPE_CMD_DEFAULT = "longpoll"
FEED_TYPE_CMD_DESC = "The changes feed type: normal or longpoll"

NUM_REVS_PER_DOC_CMD_NAME = "numrevsperdoc"
NUM_REVS_PER_DOC_CMD_DEFAULT = 5
NUM_REVS_PER_DOC_CMD_DESC = "The number of updates per doc (total revs will be numrevsperdoc * numrevsperupdate)"
Expand Down
8 changes: 8 additions & 0 deletions cmd/gateload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
glCreateWriters *bool
glNumRevsPerDoc *int
glNumUpdaters *int
glFeedType *string
)

var gateloadCmd = &cobra.Command{
Expand All @@ -41,6 +42,7 @@ var gateloadCmd = &cobra.Command{
NumChansPerReader: *glNumChansPerReader,
CreateReaders: *glCreateReaders,
NumRevGenerationsExpected: calcNumRevGenerationsExpected(),
FeedType: sgload.ChangesFeedType(*glFeedType),
}

updateLoadSpec := sgload.UpdateLoadSpec{
Expand Down Expand Up @@ -129,4 +131,10 @@ func init() {
NUM_UPDATERS_CMD_DESC,
)

glFeedType = gateloadCmd.PersistentFlags().String(
FEED_TYPE_CMD_NAME,
FEED_TYPE_CMD_DEFAULT,
FEED_TYPE_CMD_DESC,
)

}
8 changes: 8 additions & 0 deletions cmd/readload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
skipWriteload *bool
readLoadNumWriters *int
readLoadCreateWriters *bool
readLoadFeedType *string
logger log15.Logger
)

Expand All @@ -39,6 +40,7 @@ or it can pass a test session id from a previously run write load.`,
CreateReaders: *createReaders,
SkipWriteLoadSetup: *skipWriteload,
NumRevGenerationsExpected: 1, // Expect writer to add one rev
FeedType: sgload.ChangesFeedType(*readLoadFeedType),
}

logger.Info("Running readload scenario", "readLoadSpec", readLoadSpec)
Expand Down Expand Up @@ -127,4 +129,10 @@ func init() {
CREATE_WRITERS_CMD_DESC,
)

readLoadFeedType = readloadCmd.PersistentFlags().String(
FEED_TYPE_CMD_NAME,
FEED_TYPE_CMD_DEFAULT,
FEED_TYPE_CMD_DESC,
)

}
2 changes: 1 addition & 1 deletion sgload/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DataStore interface {
SetUserCreds(u UserCred)

// Get all the changes since the since value
Changes(sinceVal Sincer, limit int) (changes sgreplicate.Changes, newSinceVal Sincer, err error)
Changes(sinceVal Sincer, limit int, feedType ChangesFeedType) (changes sgreplicate.Changes, newSinceVal Sincer, err error)

// Does a bulk get on docs in bulk get request, discards actual docs
BulkGetDocuments(sgreplicate.BulkGetRequest) ([]sgreplicate.Document, error)
Expand Down
2 changes: 1 addition & 1 deletion sgload/mockdatastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (m *MockDataStore) SetUserCreds(u UserCred) {
// ignore these
}

func (m MockDataStore) Changes(sinceVal Sincer, limit int) (changes sgreplicate.Changes, newSinceVal Sincer, err error) {
func (m MockDataStore) Changes(sinceVal Sincer, limit int, feedType ChangesFeedType) (changes sgreplicate.Changes, newSinceVal Sincer, err error) {
return sgreplicate.Changes{}, nil, nil
}

Expand Down
9 changes: 8 additions & 1 deletion sgload/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Reader struct {
NumRevGenerationsExpected int // The expected generate that each doc is expected to reach
BatchSize int // The number of docs to pull in batch (_changes feed and bulk_get)
lastNumRevs int
feedType ChangesFeedType // Whether to use "feedtype=normal" or "feedtype=longpoll"

}

func NewReader(agentSpec AgentSpec) *Reader {
Expand All @@ -24,6 +26,7 @@ func NewReader(agentSpec AgentSpec) *Reader {
AgentSpec: agentSpec,
},
NumRevGenerationsExpected: 1,
feedType: FEED_TYPE_LONGPOLL,
}

reader.setupExpVarStats(readersProgressStats)
Expand All @@ -32,6 +35,10 @@ func NewReader(agentSpec AgentSpec) *Reader {

}

func (r *Reader) SetFeedType(feedType ChangesFeedType) {
r.feedType = feedType
}

func (r *Reader) SetChannels(sgChannels []string) {
r.SGChannels = sgChannels
}
Expand Down Expand Up @@ -225,7 +232,7 @@ func (r *Reader) pullMoreDocs(since Sincer) (pullMoreDocsResult, error) {

result := pullMoreDocsResult{}

changes, newSince, err := r.DataStore.Changes(since, r.BatchSize)
changes, newSince, err := r.DataStore.Changes(since, r.BatchSize, r.feedType)
if err != nil {
return false, err, result
}
Expand Down
1 change: 1 addition & 0 deletions sgload/readload_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (rlr ReadLoadRunner) createReaders(wg *sync.WaitGroup) ([]*Reader, error) {
}

reader := NewReader(agentSpec)
reader.SetFeedType(rlr.ReadLoadSpec.FeedType)
reader.SetChannels(sgChannels)
reader.SetBatchSize(rlr.ReadLoadSpec.BatchSize)
reader.SetNumDocsExpected(numDocsExpectedPerReader)
Expand Down
4 changes: 3 additions & 1 deletion sgload/readload_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ type ReadLoadSpec struct {
NumReaders int
NumChansPerReader int
NumRevGenerationsExpected int
SkipWriteLoadSetup bool // By default the readload scenario runs the writeload scenario first. If this is true, it will skip the writeload scenario.
SkipWriteLoadSetup bool // By default the readload scenario runs the writeload scenario first. If this is true, it will skip the writeload scenario.
FeedType ChangesFeedType // "Normal" or "Longpoll"

}

func (rls ReadLoadSpec) Validate() error {
Expand Down
4 changes: 2 additions & 2 deletions sgload/sg_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,14 @@ func (s SGDataStore) sgAdminURL() (string, error) {

}

func (s SGDataStore) Changes(sinceVal Sincer, limit int) (changes sgreplicate.Changes, newSinceVal Sincer, err error) {
func (s SGDataStore) Changes(sinceVal Sincer, limit int, feedType ChangesFeedType) (changes sgreplicate.Changes, newSinceVal Sincer, err error) {

changesFeedEndpoint, err := addEndpointToUrl(s.SyncGatewayUrl, "_changes")
if err != nil {
return sgreplicate.Changes{}, sinceVal, err
}

changesFeedParams := NewChangesFeedParams(sinceVal, limit)
changesFeedParams := NewChangesFeedParams(sinceVal, limit, feedType)

changesFeedUrl := fmt.Sprintf(
"%s?%s",
Expand Down
20 changes: 11 additions & 9 deletions sgload/sg_datastore_changes_feed_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ import (
"strings"
)

const FEED_TYPE_LONGPOLL = "longpoll"
const FEED_TYPE_NORMAL = "normal"
type ChangesFeedType string

const FEED_TYPE_LONGPOLL = ChangesFeedType("longpoll")
const FEED_TYPE_NORMAL = ChangesFeedType("normal")

type ChangesFeedParams struct {
feedType string // eg, "normal" or "longpoll"
limit int // eg, 50
heartbeatTimeMillis int // eg, 300000
feedStyle string // eg, "all_docs"
since Sincer // eg, "3",
feedType ChangesFeedType // eg, "normal" or "longpoll"
limit int // eg, 50
heartbeatTimeMillis int // eg, 300000
feedStyle string // eg, "all_docs"
since Sincer // eg, "3",
channels []string
}

func NewChangesFeedParams(sinceVal Sincer, limit int) *ChangesFeedParams {
func NewChangesFeedParams(sinceVal Sincer, limit int, feedType ChangesFeedType) *ChangesFeedParams {
return &ChangesFeedParams{
feedType: FEED_TYPE_NORMAL,
feedType: feedType,
limit: limit,
heartbeatTimeMillis: 30 * 1000,
feedStyle: "all_docs",
Expand Down

0 comments on commit e691ce9

Please sign in to comment.