Skip to content

Commit

Permalink
Fixes after race testing TestUpdatePhase #18
Browse files Browse the repository at this point in the history
- Updated twitter.ClientWrapper struct and methods to better synchronise ClientWrapper.RateLimits and ClientWrapper.TweetCap (17/04/2023 - 12:05:17)
- Added the twitter.ClientWrapper.RateLimit method which returns the latest rate limit for the given BindingType. This was in order to make accessing the sync.Map that holds the rate limits a bit easier (17/04/2023 - 12:06:05)
- Access token that is held in reddit.Client can now only be accessed and set using methods. This is because there is now a RWMutex that manages synchronisation for it (17/04/2023 - 12:06:58)
- Changed all accesses to ClientWrapper.RateLimits in update.go to instead use the ClientWrapper.RateLimit method (17/04/2023 - 12:07:56)
- Updated gapi to v1.0.1 to consolidate synchronisation fixes (17/04/2023 - 12:10:15)
  • Loading branch information
andygello555 committed Apr 17, 2023
1 parent f7c515c commit 6ab667f
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
cloud.google.com/go/iam v0.7.0 // indirect
cloud.google.com/go/pubsub v1.10.0 // indirect
github.com/andygello555/agem v1.0.2 // indirect
github.com/andygello555/gapi v1.0.0 // indirect
github.com/andygello555/gapi v1.0.1 // indirect
github.com/andygello555/go-steamcmd v1.0.0 // indirect
github.com/andygello555/url-fmt v1.0.0 // indirect
github.com/antonmedv/expr v1.12.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/andygello555/agem v1.0.2 h1:+WvErgnQYSmSZfviOnGbqn6HC2Z5Gkq4jPv5xLP7X
github.com/andygello555/agem v1.0.2/go.mod h1:QX1Da5PpvjO8oujQ2oOIVOxsI63t2mwzGMg2w2vnbzM=
github.com/andygello555/gapi v1.0.0 h1:kqxk37lFjAiGQZpwYD52L2FM9h0uvUvYhZSuZ4huZ20=
github.com/andygello555/gapi v1.0.0/go.mod h1:dzB8kvK74KuQTGFo7EvmRbtuu78Y9neyXmnwlbd9UHo=
github.com/andygello555/gapi v1.0.1 h1:FnGGYVl2SkpkD5sAcroApSVeR0/wtFotI8ywvBEhtBA=
github.com/andygello555/gapi v1.0.1/go.mod h1:dzB8kvK74KuQTGFo7EvmRbtuu78Y9neyXmnwlbd9UHo=
github.com/andygello555/go-steamcmd v1.0.0 h1:fci0WASRmbLH4bmRifPojyOXI+3QBYZiTMxznd9pRVo=
github.com/andygello555/go-steamcmd v1.0.0/go.mod h1:NXdNjIrm0ALxOtVbEH9TFwU7fvJB6vZJmfG10rShonQ=
github.com/andygello555/gotils/v2 v2.1.0 h1:/nL14tDKiVyVNY/iBU8nQZHVYmQF4A5WoUbWUC0XvAY=
Expand Down
9 changes: 2 additions & 7 deletions reddit/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/http"
"net/url"
"strings"
"time"
)

var API = api.NewAPI(nil, api.Schema{
Expand All @@ -22,12 +21,8 @@ var API = api.NewAPI(nil, api.Schema{
return api.HTTPRequest{Request: req}
}).SetResponseMethod(func(binding api.Binding[accessTokenResponse, AccessToken], response accessTokenResponse, args ...any) AccessToken {
client := binding.Attrs()["client"].(*Client)
client.AccessToken = &AccessToken{
accessTokenResponse: response,
FetchedTime: time.Now().UTC(),
ExpireTime: time.Now().UTC().Add(time.Second * time.Duration(response.ExpiresIn)),
}
return *client.AccessToken
client.setAccessToken(response)
return *client.AccessToken()
}).AddAttrs(
func(client api.Client) (string, any) { return "client", client },
).SetName("access_token")),
Expand Down
44 changes: 40 additions & 4 deletions reddit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ func (r *RateLimit) String() string {
const requestsPeriodCountMax = 300

type Client struct {
Config Config
AccessToken *AccessToken
Config Config
accessTokenMutex sync.RWMutex
accessToken *AccessToken
// rateLimits is a sync.Map of binding names to references to api.RateLimit(s). I.e.
// map[string]api.RateLimit
rateLimits sync.Map
Expand Down Expand Up @@ -156,8 +157,37 @@ func (c *Client) Log(msg string) {
log.WARNING.Println(msg)
}

func (c *Client) AccessToken() *AccessToken {
c.accessTokenMutex.RLock()
defer c.accessTokenMutex.RUnlock()
if c.accessToken == nil {
return nil
}
return &AccessToken{
accessTokenResponse: c.accessToken.accessTokenResponse,
FetchedTime: c.accessToken.FetchedTime,
ExpireTime: c.accessToken.ExpireTime,
}
}

func (c *Client) setAccessToken(response accessTokenResponse) {
c.accessTokenMutex.Lock()
defer c.accessTokenMutex.Unlock()
c.accessToken = &AccessToken{
accessTokenResponse: response,
FetchedTime: time.Now().UTC(),
ExpireTime: time.Now().UTC().Add(time.Second * time.Duration(response.ExpiresIn)),
}
}

func (c *Client) RefreshToken() (err error) {
if c.AccessToken == nil || c.AccessToken.Expired() {
refresh := func() bool {
c.accessTokenMutex.RLock()
defer c.accessTokenMutex.RUnlock()
return c.accessToken == nil || c.accessToken.Expired()
}()

if refresh {
if _, err = API.Execute("access_token"); err != nil {
err = errors.Wrap(err, "could not fetched access_token for Reddit API")
}
Expand Down Expand Up @@ -240,7 +270,13 @@ func (c *Client) Run(ctx context.Context, bindingName string, attrs map[string]a
}

// Then we will set the headers from the AccessToken for this request
for header, values := range c.AccessToken.Headers() {
headers := func() http.Header {
c.accessTokenMutex.RLock()
defer c.accessTokenMutex.RUnlock()
return c.accessToken.Headers()
}()

for header, values := range headers {
for _, value := range values {
request.Header.Add(header, value)
}
Expand Down
133 changes: 93 additions & 40 deletions twitter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
)

type ClientWrapper struct {
Client *twitter.Client
Config Config
Mutex sync.Mutex
RateLimits map[BindingType]*twitter.RateLimit
Client *twitter.Client
Config Config
TweetCapMutex sync.RWMutex
// RateLimits stores the twitter.RateLimit for each BindingType that has been executed. Actual map type:
// map[BindingType]*twitter.RateLimit
RateLimits sync.Map
TweetCap *TweetCap
}

Expand Down Expand Up @@ -69,7 +71,7 @@ func ClientCreate(config Config) error {
Host: "https://api.twitter.com",
},
Config: config,
RateLimits: make(map[BindingType]*twitter.RateLimit),
RateLimits: sync.Map{},
}
if err := Client.GetTweetCap(); err != nil {
return errors.Wrap(err, "could not create client, could not get Tweet cap")
Expand Down Expand Up @@ -103,6 +105,13 @@ func (tc *TweetCap) CheckFresh() bool {
return !(tc.LastFetched.Before(tc.Resets) && time.Now().UTC().After(tc.Resets))
}

// RateLimit returns the latest twitter.RateLimit for the given BindingType. The bool represents whether there is a
// twitter.RateLimit for the given BindingType.
func (w *ClientWrapper) RateLimit(bindingType BindingType) (*twitter.RateLimit, bool) {
rateLimitAny, ok := w.RateLimits.Load(bindingType)
return rateLimitAny.(*twitter.RateLimit), ok
}

// GetTweetCap will first check if the TweetCap cache file exists. If so, then it will read the TweetCap from there.
// Otherwise, it will log in to the Twitter developer portal using browser.Browser and parse the details on the
// dashboard to a TweetCap instance.
Expand All @@ -116,20 +125,37 @@ func (w *ClientWrapper) GetTweetCap() (err error) {
return err
}

w.TweetCap = &TweetCap{}
if err = json.Unmarshal(tweetCapData, w.TweetCap); err != nil {
return err
if err = func() error {
w.TweetCapMutex.Lock()
defer w.TweetCapMutex.Unlock()
w.TweetCap = &TweetCap{}
return json.Unmarshal(tweetCapData, w.TweetCap)
}(); err != nil {
return
}
}

tweetCapStringer := func() string {
w.TweetCapMutex.RLock()
defer w.TweetCapMutex.RUnlock()
return w.TweetCap.String()
}

// If the TweetCap was not loaded from a file, or the TweetCap is not fresh then we will fetch the TweetCap using a
// browser.Browser.
if w.TweetCap == nil || !w.TweetCap.CheckFresh() {
nilTweetCap, refresh := func() (bool, bool) {
w.TweetCapMutex.RLock()
defer w.TweetCapMutex.RUnlock()
nilTweetCap := w.TweetCap == nil
return nilTweetCap, nilTweetCap || !w.TweetCap.CheckFresh()
}()

if refresh {
message := "not fresh"
if w.TweetCap == nil {
if nilTweetCap {
message = "nil"
} else {
message += " (" + w.TweetCap.String() + ")"
message += " (" + tweetCapStringer() + ")"
}
log.INFO.Printf("\tTweetCap is %s, fetching TweetCap from web", message)
var b *browser.Browser
Expand Down Expand Up @@ -272,34 +298,52 @@ func (w *ClientWrapper) GetTweetCap() (err error) {
tweetCap.Resets.Location(),
)
tweetCap.LastFetched = time.Now().UTC()
w.TweetCap = tweetCap
log.INFO.Printf("\t\tSuccessfully fetched new TweetCap from web: %s", w.TweetCap.String())

func() {
w.TweetCapMutex.Lock()
defer w.TweetCapMutex.Unlock()
w.TweetCap = tweetCap
}()

log.INFO.Printf("\t\tSuccessfully fetched new TweetCap from web: %s", tweetCapStringer())
if err = w.WriteTweetCap(); err != nil {
return errors.Wrap(err, "could not write TweetCap cache file")
}
log.INFO.Printf("\tSuccessfully wrote TweetCap to %s", w.Config.TwitterTweetCapLocation())
}
log.INFO.Printf("Successfully fetched TweetCap: %s", w.TweetCap.String())

log.INFO.Printf("Successfully fetched TweetCap: %s", tweetCapStringer())
return nil
}

// WriteTweetCap will write the TweetCap to a JSON cache file located at DefaultTweetCapLocation.
func (w *ClientWrapper) WriteTweetCap() (err error) {
// Marshall the TweetCap to JSON (whilst holding a read lock)
var jsonBytes []byte
if jsonBytes, err = json.Marshal(w.TweetCap); err != nil {
if jsonBytes, err = func() ([]byte, error) {
w.TweetCapMutex.RLock()
defer w.TweetCapMutex.RUnlock()
return json.Marshal(w.TweetCap)
}(); err != nil {
return errors.Wrap(err, "could not Marshal TweetCap to JSON")
}

// Create a file for the TweetCap at the location from the config
var file *os.File
if file, err = os.Create(w.Config.TwitterTweetCapLocation()); err != nil {
return errors.Wrapf(err, "could not create TweetCap file %s", w.Config.TwitterTweetCapLocation())
}

// Defer a function to close the file
defer func(file *os.File) {
err = myErrors.MergeErrors(err, errors.Wrapf(
file.Close(),
"could not close TweetCap file %s",
w.Config.TwitterTweetCapLocation(),
))
}(file)

// Write the TweetCap JSON bytes to the file
if _, err = file.Write(jsonBytes); err != nil {
return errors.Wrapf(err, "could not write to TweetCap file %s", w.Config.TwitterTweetCapLocation())
}
Expand All @@ -313,8 +357,9 @@ func (w *ClientWrapper) SetTweetCap(used int, remaining int, total int, resets t
"Setting TweetCap to: %d used, %d remaining, %d total, %s reset, %s fetched",
used, remaining, total, resets.String(), lastFetched.String(),
)
w.Mutex.Lock()
defer w.Mutex.Unlock()

w.TweetCapMutex.Lock()
defer w.TweetCapMutex.Unlock()
if w.TweetCap == nil {
w.TweetCap = &TweetCap{}
}
Expand Down Expand Up @@ -356,28 +401,24 @@ func (w *ClientWrapper) CheckRateLimit(binding *Binding, totalResources int) (er

// We create copies of both the tweet cap and the current rate limit for this action (if there is one) so we don't
// have to lock the mutex for a long time.
w.Mutex.Lock()
tweetCap := TweetCap{
Used: w.TweetCap.Used,
Remaining: w.TweetCap.Remaining,
Total: w.TweetCap.Total,
Resets: w.TweetCap.Resets,
LastFetched: w.TweetCap.LastFetched,
}
rateLimit, rateLimitOk := w.RateLimits[binding.Type]
if rateLimitOk {
rateLimit = &twitter.RateLimit{
Limit: rateLimit.Limit,
Remaining: rateLimit.Remaining,
Reset: rateLimit.Reset,
tweetCap := func() TweetCap {
w.TweetCapMutex.RLock()
defer w.TweetCapMutex.RUnlock()
tweetCap := TweetCap{
Used: w.TweetCap.Used,
Remaining: w.TweetCap.Remaining,
Total: w.TweetCap.Total,
Resets: w.TweetCap.Resets,
LastFetched: w.TweetCap.LastFetched,
}
}
w.Mutex.Unlock()
return tweetCap
}()
rateLimit, rateLimitOk := w.RateLimit(binding.Type)

// If we don't have enough of our monthly tweet cap remaining to fetch the requested number of tweets then we will
// return an error.
if binding.ResourceType == Tweet && totalResources > tweetCap.Remaining {
log.WARNING.Printf("TweetCap check for %s failed when requesting %d tweets (%s)", binding.Type.String(), totalResources, w.TweetCap.String())
log.WARNING.Printf("TweetCap check for %s failed when requesting %d tweets (%s)", binding.Type.String(), totalResources, tweetCap.String())
return &RateLimitError{
Config: w.Config.TwitterRateLimits(),
TweetCap: &tweetCap,
Expand Down Expand Up @@ -472,8 +513,7 @@ func (w *ClientWrapper) ExecuteBinding(bindingType BindingType, options *Binding
// and the number of requests remaining.
updateRateLimits := func() {
rateLimit := bindingResult.RateLimit()
w.Mutex.Lock()
latestRateLimit, ok := w.RateLimits[bindingType]
latestRateLimit, ok := w.RateLimit(bindingType)
update := !ok
if ok {
// We initialise some booleans to make things more readable
Expand All @@ -482,20 +522,31 @@ func (w *ClientWrapper) ExecuteBinding(bindingType BindingType, options *Binding
afterPeriod := rateLimit.Reset.Time().After(latestRateLimit.Reset.Time())
update = (samePeriod && smallerRemaining) || afterPeriod
}

if update {
log.INFO.Printf(
"Updating RateLimit for %s to %d/%d (%s)",
bindingType.String(), rateLimit.Remaining, rateLimit.Limit, rateLimit.Reset.Time().String(),
)
w.RateLimits[bindingType] = rateLimit
w.RateLimits.Store(bindingType, rateLimit)
} else {
log.WARNING.Printf(
"RateLimit for %s: %d/%d (%s) was not newer than the currently cached RateLimit for %s: %d/%d (%s)",
bindingType.String(), rateLimit.Remaining, rateLimit.Limit, rateLimit.Reset.Time().String(),
bindingType.String(), latestRateLimit.Remaining, latestRateLimit.Limit, latestRateLimit.Reset.Time().String(),
)
}
w.Mutex.Unlock()
}

getTweetCap := func(offset int) (used int, remaining int, total int, resets time.Time, lastFetched time.Time) {
w.TweetCapMutex.RLock()
defer w.TweetCapMutex.RUnlock()
used = w.TweetCap.Used + offset
remaining = w.TweetCap.Remaining - offset
total = w.TweetCap.Total
resets = w.TweetCap.Resets
lastFetched = w.TweetCap.LastFetched
return
}

var response any
Expand Down Expand Up @@ -574,7 +625,8 @@ func (w *ClientWrapper) ExecuteBinding(bindingType BindingType, options *Binding

// Finally, we update the rate limits. First up it's the Tweet cap (if the BindingResourceType is Tweet)
if binding.ResourceType == Tweet {
if err = w.SetTweetCap(w.TweetCap.Used+offset, w.TweetCap.Remaining-offset, w.TweetCap.Total, w.TweetCap.Resets, w.TweetCap.LastFetched); err != nil {
used, remaining, total, resets, lastFetched := getTweetCap(offset)
if err = w.SetTweetCap(used, remaining, total, resets, lastFetched); err != nil {
return bindingResult, errors.Wrapf(err, "could not set TweetCap after request no. %d", requestNo-1)
}
}
Expand Down Expand Up @@ -614,7 +666,8 @@ func (w *ClientWrapper) ExecuteBinding(bindingType BindingType, options *Binding

// Finally, we update the rate limits. First up it's the Tweet cap (if the BindingResourceType is Tweet)
if binding.ResourceType == Tweet {
if err = w.SetTweetCap(w.TweetCap.Used+maxResults, w.TweetCap.Remaining-maxResults, w.TweetCap.Total, w.TweetCap.Resets, w.TweetCap.LastFetched); err != nil {
used, remaining, total, resets, lastFetched := getTweetCap(maxResults)
if err = w.SetTweetCap(used, remaining, total, resets, lastFetched); err != nil {
return bindingResult, errors.Wrapf(err, "could not set TweetCap after singleton %s request", binding.Type.String())
}
}
Expand Down
13 changes: 3 additions & 10 deletions update.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,7 @@ func twitterBatchProducer(
}

// Check if there is a new rate limit
myTwitter.Client.Mutex.Lock()
rateLimit, ok = myTwitter.Client.RateLimits[myTwitter.RecentSearch]
myTwitter.Client.Mutex.Unlock()
rateLimit, ok = myTwitter.Client.RateLimit(myTwitter.RecentSearch)
log.INFO.Printf("Twitter batchNo: %d) I'm awake. New RateLimit?: %t", batchNo, ok)

// Set low and high for the next batch
Expand Down Expand Up @@ -709,10 +707,7 @@ func UpdatePhase(developerIDs []string, state *ScoutState) (err error) {
len(unscrapedTwitterDevelopers), len(unscrapedTwitterDevelopers),
)

myTwitter.Client.Mutex.Lock()
rateLimit, ok := myTwitter.Client.RateLimits[myTwitter.RecentSearch]
myTwitter.Client.Mutex.Unlock()

rateLimit, ok := myTwitter.Client.RateLimit(myTwitter.RecentSearch)
// If we cannot find the rate limit, or it has already ended then we will keep making singleton requests
// until we refresh the rate limit.
for !ok || rateLimit.Reset.Time().Before(time.Now().UTC()) {
Expand All @@ -737,9 +732,7 @@ func UpdatePhase(developerIDs []string, state *ScoutState) (err error) {
}

// Get the rate limit again
myTwitter.Client.Mutex.Lock()
rateLimit, ok = myTwitter.Client.RateLimits[myTwitter.RecentSearch]
myTwitter.Client.Mutex.Unlock()
rateLimit, ok = myTwitter.Client.RateLimit(myTwitter.RecentSearch)
}

log.WARNING.Printf("Managed to get rate limit for RecentSearch: %v", rateLimit)
Expand Down

0 comments on commit 6ab667f

Please sign in to comment.