From 74d669b92b314443c4e4d068ebafbe54fbb2b562 Mon Sep 17 00:00:00 2001 From: Matt Silverlock Date: Wed, 14 Feb 2018 11:21:15 -0800 Subject: [PATCH] [build] Fix line endings; Travis config. --- .travis.yml | 8 +- LICENSE | 2 +- aggregate.go | 206 +++++++++---------- firestore_store.go | 242 +++++++++++----------- search.go | 496 ++++++++++++++++++++++----------------------- server.go | 9 + 6 files changed, 486 insertions(+), 477 deletions(-) diff --git a/.travis.yml b/.travis.yml index bbd4d77..8df9aa4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ matrix: before_install: - go get github.com/mitchellh/gox + install: - # skip @@ -20,8 +21,7 @@ script: - diff -u <(echo -n) <(gofmt -d .) - go vet $(go list ./... | grep -v /vendor/) - go test -v -race ./... - - if [ "${LATEST}" = "true" ]; then gox -os="linux darwin" -arch="amd64" -output="centimentd.." - -ldflags "-X main.Rev=`git rev-parse --short HEAD`" -verbose ./...; fi + - if [ "${LATEST}" = "true" ]; then gox -os="linux darwin" -arch="amd64" -output="centimentd.{{.OS}}.{{.Arch}}" -ldflags "-X main.Rev=`git rev-parse --short HEAD`" -verbose ./...; fi deploy: provider: releases @@ -29,8 +29,8 @@ deploy: api_key: secure: i3RaRoIL+BYxBHemfLF4m8yMtGn7W/r12+jXyJlLCOvTV22Z2bZLXGeyxnHz/gj4W9xN5g/nyPLOIY9ak1/IsYajKQXZRmmDljLyLQnCh7DYGvWfLxDnsE0C5BUyTtn5a2c8hK+9kudpK77OMYwuw1HMfRt/JyYo/i8RHdaK3TzMCvp+c+58DhwpJm5YU+8iO3LKwb0lQGaF5JJBNJd1nR3K8L/tkWC9oZ7mv5528RhxtpIYgU4Lsk70RpvcwgSYcjwIb0M4wZgNlXFCuO2WDb3TY7CVzO6hoUymE65rW2iTQrc4EP7U4ulxjUgF6QcLblyQZ8Oz+kzJf/D4BkdNg4pu6MrQLmeMt7DEjliafMqPgWJmKBch/5W0BSkIp5ksDFNHck3qIK79KcCsCbnBYXyOYYfA+Pl6uZXgpeZ+ti7N/Z+eKrCT2FemSkxEzbQ65bWzINC+26Hy6/tpNzzWvmKgM91DvNi3axEn+IuvzzTfKohSvd03Ti3zHBOUwxkG6Tu7VNAHODghbIiyGCOFikcvRyGjYDzLIE8sdgG9WfpNSg9lDCoY+dj2uc73v5Y8NnGp9pfot3A8dBYwXeVcftrwTYVuZ0o3Kd7lHMgqcNtQCNn9eoy9xsQGOC11Pgi7vXGSwEQOvxebfUksseevdMceO4+ZgD4gxt5e92saFjc= file: - - centimentd.darwin.amd64 - - centimentd.linux.amd64 + - "centimentd.darwin.amd64" + - "centimentd.linux.amd64" on: repo: elithrar/centiment tags: true diff --git a/LICENSE b/LICENSE index d535c5a..cc8aab6 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ BSD 3-Clause License -Copyright (c) 2017, Matt Silverlock +Copyright (c) 2017-2018, Matt Silverlock All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/aggregate.go b/aggregate.go index da5a351..4264fd0 100644 --- a/aggregate.go +++ b/aggregate.go @@ -1,103 +1,103 @@ -package centiment - -import ( - "context" - - "github.com/go-kit/kit/log" - "github.com/pkg/errors" -) - -// Aggregator aggregates results from an analysis run. -type Aggregator struct { - logger log.Logger - db DB -} - -// NewAggregator creates a new Aggregator: call Run to collect results and save -// them to the given DB. -func NewAggregator(logger log.Logger, db DB) (*Aggregator, error) { - agg := &Aggregator{ - db: db, - logger: logger, - } - - return agg, nil -} - -// Run an aggregatation on the provided results. -func (ag *Aggregator) Run(ctx context.Context, results <-chan *AnalyzerResult) error { - var sentiments = make(map[string]*Sentiment) - - // TODO(matt): Handle cancellation. Use for-select here with two cases. - for res := range results { - topic := res.SearchTerm.Topic - if sentiments[topic] == nil { - sentiments[topic] = &Sentiment{} - } - - // Update the rolling aggregate for each topic. - sentiments[topic] = ag.updateAggregate( - res.Score, - res.Magnitude, - res.TweetID, - sentiments[topic], - ) - - sentiments[topic].populateWithSearch(res.SearchTerm) - } - - for topic, sentiment := range sentiments { - sentiment.finalize() - id, err := ag.db.SaveSentiment(ctx, *sentiment) - if err != nil { - // TODO(matt): Implement retry logic w/ back-off. - ag.logger.Log( - "err", errors.Wrap(err, "failed to save topic"), - "topic", topic, - ) - continue - } - - ag.logger.Log( - "state", "saved", - "topic", sentiment.Topic, - "slug", sentiment.Slug, - "id", id, - "score", sentiment.Score, - "count", sentiment.Count, - "stddev", sentiment.StdDev, - "variance", sentiment.Variance, - ) - } - - return nil -} - -func (ag *Aggregator) updateAggregate(score float32, magnitude float32, tweetID int64, sentiment *Sentiment) *Sentiment { - sentiment.Count++ - oldAverage := sentiment.Score - sentiment.Score = updateAverage(score, sentiment.Score, sentiment.Count) - sentiment.Variance = updateVariance( - score, - sentiment.Variance, - oldAverage, - sentiment.Score, - sentiment.Count, - ) - - // Record the largest (newest) Tweet ID we've seen across our results for this - // topic, as the checkpoint for future searches. - if tweetID > sentiment.LastSeenID { - sentiment.LastSeenID = tweetID - } - - return sentiment -} - -func updateAverage(value float32, currentAverage float64, count int64) float64 { - return currentAverage + ((float64(value) - currentAverage) / float64(count)) -} - -func updateVariance(value float32, variance float64, oldAverage float64, newAverage float64, count int64) float64 { - return variance + (float64(value)-oldAverage)*(float64(value)-newAverage) -} +package centiment + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" +) + +// Aggregator aggregates results from an analysis run. +type Aggregator struct { + logger log.Logger + db DB +} + +// NewAggregator creates a new Aggregator: call Run to collect results and save +// them to the given DB. +func NewAggregator(logger log.Logger, db DB) (*Aggregator, error) { + agg := &Aggregator{ + db: db, + logger: logger, + } + + return agg, nil +} + +// Run an aggregatation on the provided results. +func (ag *Aggregator) Run(ctx context.Context, results <-chan *AnalyzerResult) error { + var sentiments = make(map[string]*Sentiment) + + // TODO(matt): Handle cancellation. Use for-select here with two cases. + for res := range results { + topic := res.SearchTerm.Topic + if sentiments[topic] == nil { + sentiments[topic] = &Sentiment{} + } + + // Update the rolling aggregate for each topic. + sentiments[topic] = ag.updateAggregate( + res.Score, + res.Magnitude, + res.TweetID, + sentiments[topic], + ) + + sentiments[topic].populateWithSearch(res.SearchTerm) + } + + for topic, sentiment := range sentiments { + sentiment.finalize() + id, err := ag.db.SaveSentiment(ctx, *sentiment) + if err != nil { + // TODO(matt): Implement retry logic w/ back-off. + ag.logger.Log( + "err", errors.Wrap(err, "failed to save topic"), + "topic", topic, + ) + continue + } + + ag.logger.Log( + "state", "saved", + "topic", sentiment.Topic, + "slug", sentiment.Slug, + "id", id, + "score", sentiment.Score, + "count", sentiment.Count, + "stddev", sentiment.StdDev, + "variance", sentiment.Variance, + ) + } + + return nil +} + +func (ag *Aggregator) updateAggregate(score float32, magnitude float32, tweetID int64, sentiment *Sentiment) *Sentiment { + sentiment.Count++ + oldAverage := sentiment.Score + sentiment.Score = updateAverage(score, sentiment.Score, sentiment.Count) + sentiment.Variance = updateVariance( + score, + sentiment.Variance, + oldAverage, + sentiment.Score, + sentiment.Count, + ) + + // Record the largest (newest) Tweet ID we've seen across our results for this + // topic, as the checkpoint for future searches. + if tweetID > sentiment.LastSeenID { + sentiment.LastSeenID = tweetID + } + + return sentiment +} + +func updateAverage(value float32, currentAverage float64, count int64) float64 { + return currentAverage + ((float64(value) - currentAverage) / float64(count)) +} + +func updateVariance(value float32, variance float64, oldAverage float64, newAverage float64, count int64) float64 { + return variance + (float64(value)-oldAverage)*(float64(value)-newAverage) +} diff --git a/firestore_store.go b/firestore_store.go index 19b44b8..cf9aa41 100644 --- a/firestore_store.go +++ b/firestore_store.go @@ -1,121 +1,121 @@ -package centiment - -import ( - "context" - - "github.com/gosimple/slug" - - "cloud.google.com/go/firestore" - "github.com/pkg/errors" - "google.golang.org/api/iterator" -) - -// SaveSentiment saves a Sentiment to the datastore, and returns generated ID of -// the new record. -func (fs *Firestore) SaveSentiment(ctx context.Context, sentiment Sentiment) (string, error) { - var id string - - collection := fs.Store.Collection(fs.CollectionName) - ref, _, err := collection.Add(ctx, sentiment) - if err != nil { - return id, errors.Wrapf(err, "failed to save sentiment (%#v)", sentiment) - } - - return ref.ID, nil -} - -// GetSentimentsByTopic fetches all historical sentiments for the given topic, -// up to limit records. Providing a limit of 0 (or less) will -// fetch all records. Records are ordered from most recent to least recent. -// -// An error (ErrNoResultsFound) will be returned if no records were found. -func (fs *Firestore) GetSentimentsByTopic(ctx context.Context, topic string, limit int) ([]*Sentiment, error) { - return fs.getSentimentsByField(ctx, "topic", topic, limit) -} - -// GetSentimentsBySlug fetches all historical sentiments for the given slug -// ("slugified" topic name) up to limit records. Providing a limit of 0 (or -// less) will fetch all records. Records are ordered from most recent to least -// recent. -// -// An error (ErrNoResultsFound) will be returned if no records were found. -func (fs *Firestore) GetSentimentsBySlug(ctx context.Context, topicSlug string, limit int) ([]*Sentiment, error) { - if !slug.IsSlug(topicSlug) { - return nil, errors.Wrapf(ErrInvalidSlug, "%s is not a valid URL slug", topicSlug) - } - return fs.getSentimentsByField(ctx, "slug", topicSlug, limit) -} - -// getSentimentsByField returns a slice of Sentiments where field == name, ordered by the most recent timestamp up to limit. -// -// An error (ErrNoResultsFound) will be returned if no records were found. -func (fs *Firestore) getSentimentsByField(ctx context.Context, field string, name string, limit int) ([]*Sentiment, error) { - collection := fs.Store.Collection(fs.CollectionName) - query := collection.Where(field, "==", name).OrderBy("fetchedAt", firestore.Desc) - - if limit > 0 { - query = query.Limit(limit) - } - - iter := query.Documents(ctx) - sentiments := make([]*Sentiment, 0, limit) - - // Fetch all Sentiments, marshalling them and appending to our - // result slice. - for { - doc, err := iter.Next() - - if err == iterator.Done { - break - } - - if err != nil { - return nil, err - } - - var result *Sentiment - if err := doc.DataTo(&result); err != nil { - return nil, err - } - - result.ID = doc.Ref.ID - sentiments = append(sentiments, result) - } - - if len(sentiments) == 0 { - return nil, ErrNoResultsFound - } - - return sentiments, nil -} - -// GetSentimentByID fetches an existing Sentiment by its ID. It will return a -// nil value and no error if no record was found. -func (fs *Firestore) GetSentimentByID(ctx context.Context, id string) (*Sentiment, error) { - collection := fs.Store.Collection(fs.CollectionName) - iter := collection.Where(firestore.DocumentID, "==", id).Limit(1).Documents(ctx) - - var sentiment *Sentiment - - // Confirm we have the correct document by ID. - for { - doc, err := iter.Next() - if doc.Ref.ID == id { - if err := doc.DataTo(sentiment); err != nil { - return nil, err - } - - break - } - - if err == iterator.Done { - break - } - - if err != nil { - return nil, err - } - } - - return sentiment, nil -} +package centiment + +import ( + "context" + + "github.com/gosimple/slug" + + "cloud.google.com/go/firestore" + "github.com/pkg/errors" + "google.golang.org/api/iterator" +) + +// SaveSentiment saves a Sentiment to the datastore, and returns generated ID of +// the new record. +func (fs *Firestore) SaveSentiment(ctx context.Context, sentiment Sentiment) (string, error) { + var id string + + collection := fs.Store.Collection(fs.CollectionName) + ref, _, err := collection.Add(ctx, sentiment) + if err != nil { + return id, errors.Wrapf(err, "failed to save sentiment (%#v)", sentiment) + } + + return ref.ID, nil +} + +// GetSentimentsByTopic fetches all historical sentiments for the given topic, +// up to limit records. Providing a limit of 0 (or less) will +// fetch all records. Records are ordered from most recent to least recent. +// +// An error (ErrNoResultsFound) will be returned if no records were found. +func (fs *Firestore) GetSentimentsByTopic(ctx context.Context, topic string, limit int) ([]*Sentiment, error) { + return fs.getSentimentsByField(ctx, "topic", topic, limit) +} + +// GetSentimentsBySlug fetches all historical sentiments for the given slug +// ("slugified" topic name) up to limit records. Providing a limit of 0 (or +// less) will fetch all records. Records are ordered from most recent to least +// recent. +// +// An error (ErrNoResultsFound) will be returned if no records were found. +func (fs *Firestore) GetSentimentsBySlug(ctx context.Context, topicSlug string, limit int) ([]*Sentiment, error) { + if !slug.IsSlug(topicSlug) { + return nil, errors.Wrapf(ErrInvalidSlug, "%s is not a valid URL slug", topicSlug) + } + return fs.getSentimentsByField(ctx, "slug", topicSlug, limit) +} + +// getSentimentsByField returns a slice of Sentiments where field == name, ordered by the most recent timestamp up to limit. +// +// An error (ErrNoResultsFound) will be returned if no records were found. +func (fs *Firestore) getSentimentsByField(ctx context.Context, field string, name string, limit int) ([]*Sentiment, error) { + collection := fs.Store.Collection(fs.CollectionName) + query := collection.Where(field, "==", name).OrderBy("fetchedAt", firestore.Desc) + + if limit > 0 { + query = query.Limit(limit) + } + + iter := query.Documents(ctx) + sentiments := make([]*Sentiment, 0, limit) + + // Fetch all Sentiments, marshalling them and appending to our + // result slice. + for { + doc, err := iter.Next() + + if err == iterator.Done { + break + } + + if err != nil { + return nil, err + } + + var result *Sentiment + if err := doc.DataTo(&result); err != nil { + return nil, err + } + + result.ID = doc.Ref.ID + sentiments = append(sentiments, result) + } + + if len(sentiments) == 0 { + return nil, ErrNoResultsFound + } + + return sentiments, nil +} + +// GetSentimentByID fetches an existing Sentiment by its ID. It will return a +// nil value and no error if no record was found. +func (fs *Firestore) GetSentimentByID(ctx context.Context, id string) (*Sentiment, error) { + collection := fs.Store.Collection(fs.CollectionName) + iter := collection.Where(firestore.DocumentID, "==", id).Limit(1).Documents(ctx) + + var sentiment *Sentiment + + // Confirm we have the correct document by ID. + for { + doc, err := iter.Next() + if doc.Ref.ID == id { + if err := doc.DataTo(sentiment); err != nil { + return nil, err + } + + break + } + + if err == iterator.Done { + break + } + + if err != nil { + return nil, err + } + } + + return sentiment, nil +} diff --git a/search.go b/search.go index c1a0ce0..8c9703a 100644 --- a/search.go +++ b/search.go @@ -1,248 +1,248 @@ -package centiment - -import ( - "context" - "math" - "net/http" - "net/url" - "strconv" - "strings" - "sync" - "time" - - "github.com/ChimeraCoder/anaconda" - "github.com/go-kit/kit/log" - "github.com/gosimple/slug" - "github.com/pkg/errors" - "golang.org/x/text/unicode/norm" - languagepb "google.golang.org/genproto/googleapis/cloud/language/v1" -) - -// SearchTerm represents the term or phrase to search for a given topic. -type SearchTerm struct { - // The human-readable topic of the search. - Topic string - // The Twitter search query - // Ref: https://developer.twitter.com/en/docs/tweets/search/guides/standard-operators - Query string -} - -func (st *SearchTerm) buildQuery() string { - st.Query = strings.TrimSpace(st.Query) - return st.Query -} - -// SearchResult represents the result of a search against Twitter, and -// encapsulates a Tweet. -type SearchResult struct { - searchTerm *SearchTerm - tweetID int64 - retweet bool - content string -} - -// sentimentRequest prepares a SearchResult for sentiment analysis. -func (s *SearchResult) sentimentRequest() *languagepb.AnalyzeSentimentRequest { - return &languagepb.AnalyzeSentimentRequest{ - Document: &languagepb.Document{ - Source: &languagepb.Document_Content{ - Content: string(norm.NFC.Bytes([]byte(s.content))), - }, - Type: languagepb.Document_PLAIN_TEXT, - Language: "en", - }, - } -} - -// Searcher is a worker pool that searches Twitter for the given -// set of search terms. Call NewSearcher to configure a new pool. -// Pools are safe to use concurrently. -// -// The "Run" method on Searcher should be used to begin a search. -type Searcher struct { - twitterClient *anaconda.TwitterApi - db DB - httpClient *http.Client - logger log.Logger - wg sync.WaitGroup - searchTerms []*SearchTerm - minResults int - maxAge time.Duration -} - -// NewSearcher creates a new Searcher with the given search terms. It will attempt to fetch minResults per search term and return tweets newer than maxAge. -func NewSearcher(logger log.Logger, terms []*SearchTerm, minResults int, maxAge time.Duration, client *anaconda.TwitterApi, db DB) (*Searcher, error) { - if terms == nil || len(terms) < 1 { - return nil, errors.New("searcher: terms must not be nil or empty") - } - - // TODO(matt): Create validate() method on *SearchTerm type instead? - for _, t := range terms { - if t.Topic == "" { - return nil, errors.New("searcher: search topics must not be empty") - } - - if t.Query == "" { - return nil, errors.New("searcher: search queries must not be empty") - } - } - - if minResults < 1 { - return nil, errors.New("searcher: minResults must be > 0") - } - - sr := &Searcher{ - twitterClient: client, - httpClient: &http.Client{}, - db: db, - maxAge: maxAge, - minResults: minResults, - logger: logger, - searchTerms: terms, - } - - sr.twitterClient.HttpClient = sr.httpClient - - _, err := sr.twitterClient.VerifyCredentials() - if err != nil { - return nil, errors.Wrap(err, "could not authenticate to Twitter API") - } - - return sr, nil -} - -// Run performs a concurrent search against the configured terms, and returns -// results onto the provided searched channel. -// -// Run returns when searches have completed, and can be cancelled by wrapping -// the provided context with context.WithCancel and calling the provided -// CancelFunc. -func (sr *Searcher) Run(ctx context.Context, searched chan<- *SearchResult) error { - sr.wg.Add(len(sr.searchTerms)) - for _, term := range sr.searchTerms { - go sr.search(ctx, *term, searched) - } - - sr.wg.Wait() - close(searched) - - return nil -} - -func (sr *Searcher) getLastSeenID(ctx context.Context, st SearchTerm) (int64, error) { - topicSlug := slug.Make(st.Topic) - sentiments, err := sr.db.GetSentimentsBySlug( - ctx, - topicSlug, - 1, - ) - if err != nil { - return 0, err - } - - if len(sentiments) != 1 { - return 0, errors.Errorf("ambiguous number of sentiments returned: want %d, got %d", 1, len(sentiments)) - } - - return sentiments[0].LastSeenID, nil -} - -func (sr *Searcher) search(ctx context.Context, st SearchTerm, searched chan<- *SearchResult) { - defer sr.wg.Done() - - fromID, err := sr.getLastSeenID(ctx, st) - if err != nil { - // Log the error, but proceed without the checkpoint. - sr.logger.Log("err", err, "topic", st.Topic) - } - - params := url.Values{} - params.Set("result_type", "recent") - params.Set("lang", "en") - if sr.minResults > 100 { - params.Set("count", "100") - } else { - params.Set("count", strconv.Itoa(sr.minResults)) - - } - - term := st.buildQuery() - sr.logger.Log( - "status", "searching", - "topic", st.Topic, - "query", st.Query, - "fromID", fromID, - ) - - var ( - collected int // Total tweets collected - seen int // Total tweets seen - // Acts as our paginaton cursor. We use this to fetch the next set (older) results. - // Ref: https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines - cursor int64 = math.MaxInt64 - ) - - // If we see 3x the minimum result count, and have not collected sufficient - // results, we give up the search until the next run. This may occur when we - // are attempting to fetch too many tweets at short intervals for a search - // query with minimal results. - for (collected < sr.minResults) || (seen >= sr.minResults*3) { - select { - // Cancel before the next fetch, but still allow any fetched tweets to be - // processed. - case <-ctx.Done(): - sr.logger.Log("status", "closing", "err", ctx.Err()) - return - default: - } - - // Only fetch tweets older than our cursor - params.Set("max_id", strconv.FormatInt(cursor-1, 10)) - // Don't fetch tweets older than since_id - params.Set("since_id", strconv.FormatInt(fromID, 10)) - - sr.twitterClient.ReturnRateLimitError(true) - resp, err := sr.twitterClient.GetSearch(term, params) - if err != nil { - // TODO(matt): Inspect & log rate limit errors. - // TODO(matt): Implement retry logic. - sr.logger.Log("err", err, "msg", "Twitter API error") - return - } - - for _, status := range resp.Statuses { - // Track the oldest (lowest) tweet ID as our pagination cursor. - if cursor > status.Id { - cursor = status.Id - } - - t, err := status.CreatedAtTime() - if err != nil { - seen++ - continue - } - - // Skip "old" results to ensure relevance. - if time.Since(t) > sr.maxAge { - seen++ - continue - } - - var retweet bool - if status.RetweetedStatus != nil { - retweet = true - } - - s := &SearchResult{ - searchTerm: &st, - tweetID: status.Id, - retweet: retweet, - content: status.Text, - } - - searched <- s - collected++ - seen++ - } - } -} +package centiment + +import ( + "context" + "math" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/ChimeraCoder/anaconda" + "github.com/go-kit/kit/log" + "github.com/gosimple/slug" + "github.com/pkg/errors" + "golang.org/x/text/unicode/norm" + languagepb "google.golang.org/genproto/googleapis/cloud/language/v1" +) + +// SearchTerm represents the term or phrase to search for a given topic. +type SearchTerm struct { + // The human-readable topic of the search. + Topic string + // The Twitter search query + // Ref: https://developer.twitter.com/en/docs/tweets/search/guides/standard-operators + Query string +} + +func (st *SearchTerm) buildQuery() string { + st.Query = strings.TrimSpace(st.Query) + return st.Query +} + +// SearchResult represents the result of a search against Twitter, and +// encapsulates a Tweet. +type SearchResult struct { + searchTerm *SearchTerm + tweetID int64 + retweet bool + content string +} + +// sentimentRequest prepares a SearchResult for sentiment analysis. +func (s *SearchResult) sentimentRequest() *languagepb.AnalyzeSentimentRequest { + return &languagepb.AnalyzeSentimentRequest{ + Document: &languagepb.Document{ + Source: &languagepb.Document_Content{ + Content: string(norm.NFC.Bytes([]byte(s.content))), + }, + Type: languagepb.Document_PLAIN_TEXT, + Language: "en", + }, + } +} + +// Searcher is a worker pool that searches Twitter for the given +// set of search terms. Call NewSearcher to configure a new pool. +// Pools are safe to use concurrently. +// +// The "Run" method on Searcher should be used to begin a search. +type Searcher struct { + twitterClient *anaconda.TwitterApi + db DB + httpClient *http.Client + logger log.Logger + wg sync.WaitGroup + searchTerms []*SearchTerm + minResults int + maxAge time.Duration +} + +// NewSearcher creates a new Searcher with the given search terms. It will attempt to fetch minResults per search term and return tweets newer than maxAge. +func NewSearcher(logger log.Logger, terms []*SearchTerm, minResults int, maxAge time.Duration, client *anaconda.TwitterApi, db DB) (*Searcher, error) { + if terms == nil || len(terms) < 1 { + return nil, errors.New("searcher: terms must not be nil or empty") + } + + // TODO(matt): Create validate() method on *SearchTerm type instead? + for _, t := range terms { + if t.Topic == "" { + return nil, errors.New("searcher: search topics must not be empty") + } + + if t.Query == "" { + return nil, errors.New("searcher: search queries must not be empty") + } + } + + if minResults < 1 { + return nil, errors.New("searcher: minResults must be > 0") + } + + sr := &Searcher{ + twitterClient: client, + httpClient: &http.Client{}, + db: db, + maxAge: maxAge, + minResults: minResults, + logger: logger, + searchTerms: terms, + } + + sr.twitterClient.HttpClient = sr.httpClient + + _, err := sr.twitterClient.VerifyCredentials() + if err != nil { + return nil, errors.Wrap(err, "could not authenticate to Twitter API") + } + + return sr, nil +} + +// Run performs a concurrent search against the configured terms, and returns +// results onto the provided searched channel. +// +// Run returns when searches have completed, and can be cancelled by wrapping +// the provided context with context.WithCancel and calling the provided +// CancelFunc. +func (sr *Searcher) Run(ctx context.Context, searched chan<- *SearchResult) error { + sr.wg.Add(len(sr.searchTerms)) + for _, term := range sr.searchTerms { + go sr.search(ctx, *term, searched) + } + + sr.wg.Wait() + close(searched) + + return nil +} + +func (sr *Searcher) getLastSeenID(ctx context.Context, st SearchTerm) (int64, error) { + topicSlug := slug.Make(st.Topic) + sentiments, err := sr.db.GetSentimentsBySlug( + ctx, + topicSlug, + 1, + ) + if err != nil { + return 0, err + } + + if len(sentiments) != 1 { + return 0, errors.Errorf("ambiguous number of sentiments returned: want %d, got %d", 1, len(sentiments)) + } + + return sentiments[0].LastSeenID, nil +} + +func (sr *Searcher) search(ctx context.Context, st SearchTerm, searched chan<- *SearchResult) { + defer sr.wg.Done() + + fromID, err := sr.getLastSeenID(ctx, st) + if err != nil { + // Log the error, but proceed without the checkpoint. + sr.logger.Log("err", err, "topic", st.Topic) + } + + params := url.Values{} + params.Set("result_type", "recent") + params.Set("lang", "en") + if sr.minResults > 100 { + params.Set("count", "100") + } else { + params.Set("count", strconv.Itoa(sr.minResults)) + + } + + term := st.buildQuery() + sr.logger.Log( + "status", "searching", + "topic", st.Topic, + "query", st.Query, + "fromID", fromID, + ) + + var ( + collected int // Total tweets collected + seen int // Total tweets seen + // Acts as our paginaton cursor. We use this to fetch the next set (older) results. + // Ref: https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines + cursor int64 = math.MaxInt64 + ) + + // If we see 3x the minimum result count, and have not collected sufficient + // results, we give up the search until the next run. This may occur when we + // are attempting to fetch too many tweets at short intervals for a search + // query with minimal results. + for (collected < sr.minResults) || (seen >= sr.minResults*3) { + select { + // Cancel before the next fetch, but still allow any fetched tweets to be + // processed. + case <-ctx.Done(): + sr.logger.Log("status", "closing", "err", ctx.Err()) + return + default: + } + + // Only fetch tweets older than our cursor + params.Set("max_id", strconv.FormatInt(cursor-1, 10)) + // Don't fetch tweets older than since_id + params.Set("since_id", strconv.FormatInt(fromID, 10)) + + sr.twitterClient.ReturnRateLimitError(true) + resp, err := sr.twitterClient.GetSearch(term, params) + if err != nil { + // TODO(matt): Inspect & log rate limit errors. + // TODO(matt): Implement retry logic. + sr.logger.Log("err", err, "msg", "Twitter API error") + return + } + + for _, status := range resp.Statuses { + // Track the oldest (lowest) tweet ID as our pagination cursor. + if cursor > status.Id { + cursor = status.Id + } + + t, err := status.CreatedAtTime() + if err != nil { + seen++ + continue + } + + // Skip "old" results to ensure relevance. + if time.Since(t) > sr.maxAge { + seen++ + continue + } + + var retweet bool + if status.RetweetedStatus != nil { + retweet = true + } + + s := &SearchResult{ + searchTerm: &st, + tweetID: status.Id, + retweet: retweet, + content: status.Text, + } + + searched <- s + collected++ + seen++ + } + } +} diff --git a/server.go b/server.go index 8abe90e..dee02c3 100644 --- a/server.go +++ b/server.go @@ -65,6 +65,7 @@ func (ep *Endpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// AddIndexEndpoints adds the entrypoint/index handlers to the given router. func AddIndexEndpoints(r *mux.Router, env *Env) *mux.Router { h := func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s\n", env.Hostname) @@ -74,6 +75,8 @@ func AddIndexEndpoints(r *mux.Router, env *Env) *mux.Router { return r } +// AddSentimentEndpoints adds the sentiment endpoints to the given router, and +// returns an instance of the Subrouter. func AddSentimentEndpoints(r *mux.Router, env *Env) *mux.Router { s := r.PathPrefix("/sentiments").Subrouter() s.Handle("/{topicSlug}", &Endpoint{Env: env, Handler: sentimentHandler}) @@ -89,6 +92,8 @@ func AddSentimentEndpoints(r *mux.Router, env *Env) *mux.Router { return s } +// AddMetricEndpoints adds the metric/debugging endpoints to the given router, and +// returns an instance of the Subrouter. func AddMetricEndpoints(r *mux.Router, env *Env) *mux.Router { m := r.PathPrefix("/metrics").Subrouter() m.Handle("/{profile}", &Endpoint{Env: env, Handler: metricsHandler}) @@ -96,6 +101,8 @@ func AddMetricEndpoints(r *mux.Router, env *Env) *mux.Router { return m } +// AddHealthCheckEndpoints adds the health check endpoints to the given router, and +// returns an instance of the Subrouter. func AddHealthCheckEndpoints(r *mux.Router, env *Env) *mux.Router { // App Engine health checks. h := r.PathPrefix("/health").Subrouter() @@ -104,6 +111,7 @@ func AddHealthCheckEndpoints(r *mux.Router, env *Env) *mux.Router { return h } +// LogRequest logs each HTTP request, using the given logger. func LogRequest(logger log.Logger) func(http.Handler) http.Handler { fn := func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -111,6 +119,7 @@ func LogRequest(logger log.Logger) func(http.Handler) http.Handler { next.ServeHTTP(w, r) logger.Log( "method", r.Method, + "host", r.Host, "url", r.URL.String(), "ip", r.RemoteAddr, "forwarded-ip", r.Header.Get("X-Forwarded-For"),