diff --git a/.github/workflows/initiate_release.yml b/.github/workflows/initiate_release.yml index f45e15d..29c8229 100644 --- a/.github/workflows/initiate_release.yml +++ b/.github/workflows/initiate_release.yml @@ -28,7 +28,7 @@ jobs: git push -q -u origin "release-$VERSION" - name: Get changelog diff - uses: actions/github-script@v5 + uses: actions/github-script@v6 with: script: | const get_change_log_diff = require('./scripts/get_changelog_diff.js') diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 65bb61f..fd0a06e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -12,11 +12,11 @@ jobs: if: github.event.pull_request.merged && startsWith(github.head_ref, 'release-') runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 with: fetch-depth: 0 - - uses: actions/github-script@v5 + - uses: actions/github-script@v6 with: script: | const get_change_log_diff = require('./scripts/get_changelog_diff.js') diff --git a/.github/workflows/reviewdog.yml b/.github/workflows/reviewdog.yml index 2d15bf0..291d4f8 100644 --- a/.github/workflows/reviewdog.yml +++ b/.github/workflows/reviewdog.yml @@ -24,7 +24,7 @@ jobs: - name: Install golangci-lint run: - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.45.2 + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.46.0 - name: Reviewdog env: diff --git a/README.md b/README.md index 051bdde..b5fb872 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,10 @@ Request Feature
+--- +> ## 🚨 Breaking change in v7.0 < +> From version 7.0.0, all methods' first argument is `context.Context`. The reason is that we internally changed `http.NewRequest()` to `http.NewRequestWithContext()` so that it's easier to handle cancellations, timeouts and deadlines for our users. + ## About Stream stream-go2 is a Go client for [Stream](https://getstream.io) Feeds API. @@ -170,7 +174,7 @@ for _, activity := range resp.Results { You can retrieve flat feeds with [custom ranking](https://getstream.io/docs/#custom_ranking), using the dedicated method: ```go -resp, err := flat.GetActivitiesWithRanking("popularity") +resp, err := flat.GetActivitiesWithRanking(ctx, "popularity") if err != nil { // ... } @@ -179,7 +183,7 @@ if err != nil { #### Aggregated feeds ```go -resp, err := aggregated.GetActivities() +resp, err := aggregated.GetActivities(ctx) if err != nil { // ... } @@ -200,7 +204,7 @@ for _, group := range resp.Results { #### Notification feeds ```go -resp, err := notification.GetActivities() +resp, err := notification.GetActivities(ctx) if err != nil { // ... } @@ -227,6 +231,7 @@ You can pass supported options and filters when retrieving activities: ```go resp, err := flat.GetActivities( + ctx, stream.WithActivitiesIDGTE("f505b3fb-a212-11e7-..."), stream.WithActivitiesLimit(5), ..., @@ -238,7 +243,7 @@ resp, err := flat.GetActivities( Add a single activity: ```go -resp, err := feed.AddActivity(stream.Activity{Actor: "bob", ...}) +resp, err := feed.AddActivity(ctx, stream.Activity{Actor: "bob", ...}) if err != nil { // ... } @@ -255,7 +260,7 @@ a1 := stream.Activity{Actor: "bob", ...} a2 := stream.Activity{Actor: "john", ...} a3 := stream.Activity{Actor: "alice", ...} -resp, err := feed.AddActivities(a1, a2, a3) +resp, err := feed.AddActivities(ctx, a1, a2, a3) if err != nil { // ... } @@ -271,7 +276,7 @@ for _, activity := range resp.Activities { ### Updating activities ```go -_, err := feed.UpdateActivities(a1, a2, ...) +_, err := feed.UpdateActivities(ctx, a1, a2, ...) if err != nil { // ... } @@ -284,7 +289,7 @@ You can partial update activities identified either by ID: ``` go changesetA := stream.NewUpdateActivityRequestByID("f505b3fb-a212-11e7-...", map[string]interface{}{"key": "new-value"}, []string{"removed", "keys"}) changesetB := stream.NewUpdateActivityRequestByID("f707b3fb-a212-11e7-...", map[string]interface{}{"key": "new-value"}, []string{"removed", "keys"}) -resp, err := client.PartialUpdateActivities(changesetA, changesetB) +resp, err := client.PartialUpdateActivities(ctx, changesetA, changesetB) if err != nil { // ... } @@ -294,7 +299,7 @@ or by a ForeignID and timestamp pair: ``` go changesetA := stream.NewUpdateActivityRequestByForeignID("dothings:1", stream.Time{...}, map[string]interface{}{"key": "new-value"}, []string{"removed", "keys"}) changesetB := stream.NewUpdateActivityRequestByForeignID("dothings:2", stream.Time{...}, map[string]interface{}{"key": "new-value"}, []string{"removed", "keys"}) -resp, err := client.PartialUpdateActivities(changesetA, changesetB) +resp, err := client.PartialUpdateActivities(ctx, changesetA, changesetB) if err != nil { // ... } @@ -305,12 +310,12 @@ if err != nil { You can either remove activities by ID or ForeignID: ```go -_, err := feed.RemoveActivityByID("f505b3fb-a212-11e7-...") +_, err := feed.RemoveActivityByID(ctx, "f505b3fb-a212-11e7-...") if err != nil { // ... } -_, err := feed.RemoveActivityByForeignID("bob:123") +_, err := feed.RemoveActivityByForeignID(ctx, "bob:123") if err != nil { // ... } @@ -319,7 +324,7 @@ if err != nil { ### Following another feed ```go -_, err := feed.Follow(anotherFeed) +_, err := feed.Follow(ctx, anotherFeed) if err != nil { // ... } @@ -332,7 +337,8 @@ Beware that it's possible to follow only flat feeds. You can pass options to the `Follow` method. For example: ```go -_, err := feed.Follow(anotherFeed, +_, err := feed.Follow(ctx, + anotherFeed, stream.WithFollowFeedActivityCopyLimit(15), ..., ) @@ -345,7 +351,7 @@ _, err := feed.Follow(anotherFeed, Get the feeds that a feed is following: ```go -resp, err := feed.GetFollowing() +resp, err := feed.GetFollowing(ctx) if err != nil { // ... } @@ -360,6 +366,7 @@ You can pass options to `GetFollowing`: ```go resp, err := feed.GetFollowing( + ctx, stream.WithFollowingLimit(5), ..., ) @@ -368,7 +375,7 @@ resp, err := feed.GetFollowing( #### Followers ```go -resp, err := flat.GetFollowers() +resp, err := flat.GetFollowers(ctx) if err != nil { // ... } @@ -386,6 +393,7 @@ You can pass options to `GetFollowers`: ```go resp, err := feed.GetFollowing( + ctx, stream.WithFollowersLimit(5), ..., ) @@ -394,7 +402,7 @@ resp, err := feed.GetFollowing( ### Unfollowing a feed ```go -_, err := flat.Unfollow(anotherFeed) +_, err := flat.Unfollow(ctx, anotherFeed) if err != nil { // ... } @@ -403,7 +411,8 @@ if err != nil { You can pass options to `Unfollow`: ```go -_, err := flat.Unfollow(anotherFeed, +_, err := flat.Unfollow(ctx, + anotherFeed, stream.WithUnfollowKeepHistory(true), ..., ) @@ -416,7 +425,7 @@ Remove all old targets and set new ones (replace): ```go newTargets := []stream.Feed{f1, f2} -_, err := feed.UpdateToTargets(activity, stream.WithToTargetsNew(newTargets...)) +_, err := feed.UpdateToTargets(ctx, activity, stream.WithToTargetsNew(newTargets...)) if err != nil { // ... } @@ -429,6 +438,7 @@ add := []stream.Feed{target1, target2} remove := []stream.Feed{oldTarget1, oldTarget2} _, err := feed.UpdateToTargets( + ctx, activity, stream.WithToTargetsAdd(add), stream.WithToTargetsRemove(remove), @@ -445,8 +455,8 @@ Note: you can't mix `stream.WithToTargetsNew` with `stream.WithToTargetsAdd` or You can add the same activities to multiple feeds at once with the `(*Client).AddToMany` method ([docs](https://getstream.io/docs_rest/#add_to_many)): ```go -_, err := client.AddToMany(activity, - feed1, feed2, ..., +_, err := client.AddToMany(ctx, + activity, feed1, feed2, ..., ) if err != nil { // ... @@ -463,7 +473,7 @@ relationships := []stream.FollowRelationship{ ..., } -_, err := client.FollowMany(relationships) +_, err := client.FollowMany(ctx, relationships) if err != nil { // ... } @@ -515,7 +525,7 @@ event := stream.EngagementEvent{}. WithLocation("homepage") // Track the event(s) -_, err := analytics.TrackEngagement(event) +_, err := analytics.TrackEngagement(ctx, event) if err != nil { // ... } @@ -533,7 +543,7 @@ imp := stream.ImpressionEventData{}. WithLocation("storepage") // Track the events -_, err := analytics.TrackImpression(imp) +_, err := analytics.TrackImpression(ctx, imp) if err != nil { // ... } @@ -590,7 +600,7 @@ data := map[string]interface{}{ "source_feed_slug": "timeline", "target_feed_slug": "user", } -resp, err = personalization.Get("follow_recommendations", data) +resp, err = personalization.Get(ctx, "follow_recommendations", data) if err != nil { // ... } @@ -618,25 +628,25 @@ object := stream.CollectionObject{ "location": "North America", }, } -_, err = collections.Upsert("picture", object) +_, err = collections.Upsert(ctx, "picture", object) if err != nil { // ... } // Get the data from the "picture" collection for ID "123" and "456" -objects, err := collections.Select("picture", "123", "456") +objects, err := collections.Select(ctx, "picture", "123", "456") if err != nil { // ... } // Delete the data from the "picture" collection for picture with ID "123" -_, err = collections.Delete("picture", "123") +_, err = collections.Delete(ctx, "picture", "123") if err != nil { // ... } // Get a single collection object from the "pictures" collection with ID "123" -_, err = collections.Get("pictures", "123") +_, err = collections.Get(ctx, "pictures", "123") if err != nil { // ... } @@ -662,7 +672,7 @@ user := stream.User{ }, } -insertedUser, err := users.Add(user, false) +insertedUser, err := users.Add(ctx, user, false) if err != nil { // ... } @@ -672,12 +682,12 @@ newUserData :=map[string]interface{}{ "age": 7, } -updatedUser, err := users.Update("123", newUserData) +updatedUser, err := users.Update(ctx, "123", newUserData) if err != nil { // ... } -_, err = users.Delete("123") +_, err = users.Delete(ctx, "123") if err != nil { // ... } @@ -706,7 +716,7 @@ r := stream.AddReactionRequestObject{ TargetFeeds: []string{"user:bob", "timeline:alice"}, } -comment, err := reactions.Add(r) +comment, err := reactions.Add(ctx, r) if err != nil { // ... } @@ -716,13 +726,13 @@ like := stream.AddReactionRequestObject{ UserID: "456", } -childReaction, err := reactions.AddChild(comment.ID, like) +childReaction, err := reactions.AddChild(ctx, comment.ID, like) if err != nil { // ... } // If we fetch the "comment" reaction now, it will have the child reaction(s) present. -parent, err := reactions.Get(comment.ID) +parent, err := reactions.Get(ctx, comment.ID) if err != nil { // ... } @@ -732,14 +742,15 @@ for kind, children := range parent.ChildrenReactions { } // update the target feeds for the `comment` reaction -updatedReaction, err := reactions.Update(comment.ID, nil, []string{"timeline:jeff"}) +updatedReaction, err := reactions.Update(ctx, comment.ID, nil, []string{"timeline:jeff"}) if err != nil { // ... } // get all reactions for the activity "87a9eec0-fd5f-11e8-8080-80013fed2f5b", // paginated 5 at a time, including the activity data -response, err := reactions.Filter(stream.ByActivityID("87a9eec0-fd5f-11e8-8080-80013fed2f5b"), +response, err := reactions.Filter(ctx, + stream.ByActivityID("87a9eec0-fd5f-11e8-8080-80013fed2f5b"), stream.WithLimit(5), stream.WithActivityData()) if err != nil { @@ -754,13 +765,13 @@ for _, reaction := range response.Results{ } //get the next page of reactions -response, err = reactions.GetNextPageFilteredReactions(response) +response, err = reactions.GetNextPageFilteredReactions(ctx, response) if err != nil { // ... } // get all likes by user "123" -response, err = reactions.Filter(stream.ByUserID("123").ByKind("like")) +response, err = reactions.Filter(ctx, stream.ByUserID("123").ByKind("like")) if err != nil { // ... } @@ -783,7 +794,7 @@ u := stream.User{ } // We add a user -user, err := client.Users().Add(u, true) +user, err := client.Users().Add(ctx, u, true) if err != nil { // ... } @@ -797,7 +808,7 @@ c := stream.CollectionObject{ } // We add a collection object -collectionObject, err := client.Collections().Add("picture", c) +collectionObject, err := client.Collections().Add(ctx, "picture", c) if err != nil { // ... } @@ -813,19 +824,19 @@ act := stream.Activity{ // We add the activity to the user's feed feed, _ := client.FlatFeed("user", "123") -_, err = feed.AddActivity(act) +_, err = feed.AddActivity(ctx, act) if err != nil { // ... } -result, err := feed.GetActivities() +result, err := feed.GetActivities(ctx) if err != nil { // ... } fmt.Println(result.Results[0].Actor) // Will output the user reference fmt.Println(result.Results[0].Object) // Will output the collection reference -enrichedResult, err := feed.GetEnrichedActivities() +enrichedResult, err := feed.GetEnrichedActivities(ctx) if err != nil { // ... } diff --git a/aggregated_feed.go b/aggregated_feed.go index 402ff48..0e2b5f2 100644 --- a/aggregated_feed.go +++ b/aggregated_feed.go @@ -1,6 +1,7 @@ package stream import ( + "context" "encoding/json" ) @@ -12,8 +13,8 @@ type AggregatedFeed struct { // GetActivities requests and retrieves the activities and groups for the // aggregated feed. -func (f *AggregatedFeed) GetActivities(opts ...GetActivitiesOption) (*AggregatedFeedResponse, error) { - body, err := f.client.getActivities(f, opts...) +func (f *AggregatedFeed) GetActivities(ctx context.Context, opts ...GetActivitiesOption) (*AggregatedFeedResponse, error) { + body, err := f.client.getActivities(ctx, f, opts...) if err != nil { return nil, err } @@ -26,24 +27,24 @@ func (f *AggregatedFeed) GetActivities(opts ...GetActivitiesOption) (*Aggregated // GetActivitiesWithRanking returns the activities and groups for the given AggregatedFeed, // using the provided ranking method. -func (f *AggregatedFeed) GetActivitiesWithRanking(ranking string, opts ...GetActivitiesOption) (*AggregatedFeedResponse, error) { - return f.GetActivities(append(opts, withActivitiesRanking(ranking))...) +func (f *AggregatedFeed) GetActivitiesWithRanking(ctx context.Context, ranking string, opts ...GetActivitiesOption) (*AggregatedFeedResponse, error) { + return f.GetActivities(ctx, append(opts, withActivitiesRanking(ranking))...) } // GetNextPageActivities returns the activities for the given AggregatedFeed at the "next" page // of a previous *AggregatedFeedResponse response, if any. -func (f *AggregatedFeed) GetNextPageActivities(resp *AggregatedFeedResponse) (*AggregatedFeedResponse, error) { +func (f *AggregatedFeed) GetNextPageActivities(ctx context.Context, resp *AggregatedFeedResponse) (*AggregatedFeedResponse, error) { opts, err := resp.parseNext() if err != nil { return nil, err } - return f.GetActivities(opts...) + return f.GetActivities(ctx, opts...) } // GetEnrichedActivities requests and retrieves the enriched activities and groups for the // aggregated feed. -func (f *AggregatedFeed) GetEnrichedActivities(opts ...GetActivitiesOption) (*EnrichedAggregatedFeedResponse, error) { - body, err := f.client.getEnrichedActivities(f, opts...) +func (f *AggregatedFeed) GetEnrichedActivities(ctx context.Context, opts ...GetActivitiesOption) (*EnrichedAggregatedFeedResponse, error) { + body, err := f.client.getEnrichedActivities(ctx, f, opts...) if err != nil { return nil, err } @@ -56,16 +57,16 @@ func (f *AggregatedFeed) GetEnrichedActivities(opts ...GetActivitiesOption) (*En // GetNextPageEnrichedActivities returns the enriched activities for the given AggregatedFeed at the "next" page // of a previous *EnrichedAggregatedFeedResponse response, if any. -func (f *AggregatedFeed) GetNextPageEnrichedActivities(resp *EnrichedAggregatedFeedResponse) (*EnrichedAggregatedFeedResponse, error) { +func (f *AggregatedFeed) GetNextPageEnrichedActivities(ctx context.Context, resp *EnrichedAggregatedFeedResponse) (*EnrichedAggregatedFeedResponse, error) { opts, err := resp.parseNext() if err != nil { return nil, err } - return f.GetEnrichedActivities(opts...) + return f.GetEnrichedActivities(ctx, opts...) } // GetEnrichedActivitiesWithRanking returns the enriched activities and groups for the given AggregatedFeed, // using the provided ranking method. -func (f *AggregatedFeed) GetEnrichedActivitiesWithRanking(ranking string, opts ...GetActivitiesOption) (*EnrichedAggregatedFeedResponse, error) { - return f.GetEnrichedActivities(append(opts, withActivitiesRanking(ranking))...) +func (f *AggregatedFeed) GetEnrichedActivitiesWithRanking(ctx context.Context, ranking string, opts ...GetActivitiesOption) (*EnrichedAggregatedFeedResponse, error) { + return f.GetEnrichedActivities(ctx, append(opts, withActivitiesRanking(ranking))...) } diff --git a/aggregated_feed_test.go b/aggregated_feed_test.go index 31b293b..9fd2cc4 100644 --- a/aggregated_feed_test.go +++ b/aggregated_feed_test.go @@ -1,6 +1,7 @@ package stream_test import ( + "context" "fmt" "net/http" "testing" @@ -12,6 +13,7 @@ import ( ) func TestAggregatedFeedGetActivities(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) aggregated, _ := newAggregatedFeedWithUserID(client, "123") testCases := []struct { @@ -36,55 +38,56 @@ func TestAggregatedFeedGetActivities(t *testing.T) { } for _, tc := range testCases { - _, err := aggregated.GetActivities(tc.opts...) + _, err := aggregated.GetActivities(ctx, tc.opts...) assert.NoError(t, err) testRequest(t, requester.req, http.MethodGet, tc.url, "") - _, err = aggregated.GetActivitiesWithRanking("popularity", tc.opts...) + _, err = aggregated.GetActivitiesWithRanking(ctx, "popularity", tc.opts...) testRequest(t, requester.req, http.MethodGet, fmt.Sprintf("%s&ranking=popularity", tc.url), "") assert.NoError(t, err) - _, err = aggregated.GetEnrichedActivities(tc.opts...) + _, err = aggregated.GetEnrichedActivities(ctx, tc.opts...) assert.NoError(t, err) testRequest(t, requester.req, http.MethodGet, tc.enrichedURL, "") - _, err = aggregated.GetEnrichedActivitiesWithRanking("popularity", tc.opts...) + _, err = aggregated.GetEnrichedActivitiesWithRanking(ctx, "popularity", tc.opts...) testRequest(t, requester.req, http.MethodGet, fmt.Sprintf("%s&ranking=popularity", tc.enrichedURL), "") assert.NoError(t, err) } } func TestAggregatedFeedGetNextPageActivities(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) aggregated, _ := newAggregatedFeedWithUserID(client, "123") requester.resp = `{"next":"/api/v1.0/feed/aggregated/123/?id_lt=78c1a709-aff2-11e7-b3a7-a45e60be7d3b&limit=25"}` - resp, err := aggregated.GetActivities() + resp, err := aggregated.GetActivities(ctx) require.NoError(t, err) - _, err = aggregated.GetNextPageActivities(resp) + _, err = aggregated.GetNextPageActivities(ctx, resp) testRequest(t, requester.req, http.MethodGet, "https://api.stream-io-api.com/api/v1.0/feed/aggregated/123/?api_key=key&id_lt=78c1a709-aff2-11e7-b3a7-a45e60be7d3b&limit=25", "") require.NoError(t, err) requester.resp = `{"next":"/api/v1.0/enrich/feed/aggregated/123/?id_lt=78c1a709-aff2-11e7-b3a7-a45e60be7d3b&limit=25"}` - enrichedResp, err := aggregated.GetEnrichedActivities() + enrichedResp, err := aggregated.GetEnrichedActivities(ctx) require.NoError(t, err) - _, err = aggregated.GetNextPageEnrichedActivities(enrichedResp) + _, err = aggregated.GetNextPageEnrichedActivities(ctx, enrichedResp) testRequest(t, requester.req, http.MethodGet, "https://api.stream-io-api.com/api/v1.0/enrich/feed/aggregated/123/?api_key=key&id_lt=78c1a709-aff2-11e7-b3a7-a45e60be7d3b&limit=25", "") require.NoError(t, err) requester.resp = `{"next":123}` - _, err = aggregated.GetActivities() + _, err = aggregated.GetActivities(ctx) require.Error(t, err) requester.resp = `{"next":"123"}` - resp, err = aggregated.GetActivities() + resp, err = aggregated.GetActivities(ctx) require.NoError(t, err) - _, err = aggregated.GetNextPageActivities(resp) + _, err = aggregated.GetNextPageActivities(ctx, resp) require.Error(t, err) requester.resp = `{"next":"?q=a%"}` - resp, err = aggregated.GetActivities() + resp, err = aggregated.GetActivities(ctx) require.NoError(t, err) - _, err = aggregated.GetNextPageActivities(resp) + _, err = aggregated.GetNextPageActivities(ctx, resp) require.Error(t, err) } diff --git a/analytics.go b/analytics.go index 38f525c..42269b4 100644 --- a/analytics.go +++ b/analytics.go @@ -1,6 +1,7 @@ package stream import ( + "context" "encoding/json" ) @@ -11,18 +12,18 @@ type AnalyticsClient struct { } // TrackEngagement is used to send and track analytics EngagementEvents. -func (c *AnalyticsClient) TrackEngagement(events ...EngagementEvent) (*BaseResponse, error) { +func (c *AnalyticsClient) TrackEngagement(ctx context.Context, events ...EngagementEvent) (*BaseResponse, error) { endpoint := c.client.makeEndpoint("engagement/") data := map[string]interface{}{ "content_list": events, } - return decode(c.client.post(endpoint, data, c.client.authenticator.analyticsAuth)) + return decode(c.client.post(ctx, endpoint, data, c.client.authenticator.analyticsAuth)) } // TrackImpression is used to send and track analytics ImpressionEvents. -func (c *AnalyticsClient) TrackImpression(eventsData ImpressionEventsData) (*BaseResponse, error) { +func (c *AnalyticsClient) TrackImpression(ctx context.Context, eventsData ImpressionEventsData) (*BaseResponse, error) { endpoint := c.client.makeEndpoint("impression/") - return decode(c.client.post(endpoint, eventsData, c.client.authenticator.analyticsAuth)) + return decode(c.client.post(ctx, endpoint, eventsData, c.client.authenticator.analyticsAuth)) } // RedirectAndTrack is used to send and track analytics ImpressionEvents. It tracks diff --git a/analytics_test.go b/analytics_test.go index ce34044..9b804f6 100644 --- a/analytics_test.go +++ b/analytics_test.go @@ -1,6 +1,7 @@ package stream_test import ( + "context" "net/http" "net/url" "testing" @@ -12,6 +13,7 @@ import ( ) func TestAnalyticsTrackEngagement(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) analytics := client.Analytics() event1 := stream.EngagementEvent{}. @@ -33,7 +35,7 @@ func TestAnalyticsTrackEngagement(t *testing.T) { stream.NewEventFeature("size", "xxl"), ) - _, err := analytics.TrackEngagement(event1, event2) + _, err := analytics.TrackEngagement(ctx, event1, event2) require.NoError(t, err) expectedURL := "https://analytics.stream-io-api.com/analytics/v1.0/engagement/?api_key=key" expectedBody := `{"content_list":[{"boost":10,"content":"abcdef","feed_id":"timeline:123","label":"click","location":"hawaii","position":42,"user_data":{"alias":"John Doe","id":12345}},{"content":"aabbccdd","features":[{"group":"color","value":"red"},{"group":"size","value":"xxl"}],"feed_id":"timeline:123","label":"share","user_data":"bob"}]}` @@ -41,6 +43,7 @@ func TestAnalyticsTrackEngagement(t *testing.T) { } func TestAnalyticsTrackImpression(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) analytics := client.Analytics() imp := stream.ImpressionEventsData{}. @@ -54,7 +57,7 @@ func TestAnalyticsTrackImpression(t *testing.T) { WithLocation("hawaii"). WithPosition(42) - _, err := analytics.TrackImpression(imp) + _, err := analytics.TrackImpression(ctx, imp) require.NoError(t, err) expectedURL := "https://analytics.stream-io-api.com/analytics/v1.0/impression/?api_key=key" expectedBody := `{"content_list":["a","b","c","d"],"features":[{"group":"color","value":"red"},{"group":"size","value":"xxl"}],"feed_id":"timeline:123","location":"hawaii","position":42,"user_data":123}` diff --git a/client.go b/client.go index 8990a2e..4e7ece8 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package stream import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -158,7 +159,7 @@ func (c *Client) GenericFeed(targetID string) (Feed, error) { } // AddToMany adds an activity to multiple feeds at once. -func (c *Client) AddToMany(activity Activity, feeds ...Feed) error { +func (c *Client) AddToMany(ctx context.Context, activity Activity, feeds ...Feed) error { endpoint := c.makeEndpoint("feed/add_to_many/") ids := make([]string, len(feeds)) for i := range feeds { @@ -168,24 +169,24 @@ func (c *Client) AddToMany(activity Activity, feeds ...Feed) error { Activity: activity, FeedIDs: ids, } - _, err := c.post(endpoint, req, c.authenticator.feedAuth(resFeed, nil)) + _, err := c.post(ctx, endpoint, req, c.authenticator.feedAuth(resFeed, nil)) return err } // FollowMany creates multiple follows at once. -func (c *Client) FollowMany(relationships []FollowRelationship, opts ...FollowManyOption) error { +func (c *Client) FollowMany(ctx context.Context, relationships []FollowRelationship, opts ...FollowManyOption) error { endpoint := c.makeEndpoint("follow_many/") for _, opt := range opts { endpoint.addQueryParam(opt) } - _, err := c.post(endpoint, relationships, c.authenticator.feedAuth(resFollower, nil)) + _, err := c.post(ctx, endpoint, relationships, c.authenticator.feedAuth(resFollower, nil)) return err } // UnfollowMany removes multiple follow relationships at once. -func (c *Client) UnfollowMany(relationships []UnfollowRelationship) error { +func (c *Client) UnfollowMany(ctx context.Context, relationships []UnfollowRelationship) error { endpoint := c.makeEndpoint("unfollow_many/") - _, err := c.post(endpoint, relationships, c.authenticator.feedAuth(resFollower, nil)) + _, err := c.post(ctx, endpoint, relationships, c.authenticator.feedAuth(resFollower, nil)) return err } @@ -229,30 +230,30 @@ func (c *Client) Personalization() *PersonalizationClient { } // GetActivitiesByID returns activities for the current app having the given IDs. -func (c *Client) GetActivitiesByID(ids ...string) (*GetActivitiesResponse, error) { - return c.getAppActivities(makeRequestOption("ids", strings.Join(ids, ","))) +func (c *Client) GetActivitiesByID(ctx context.Context, ids ...string) (*GetActivitiesResponse, error) { + return c.getAppActivities(ctx, makeRequestOption("ids", strings.Join(ids, ","))) } // GetActivitiesByForeignID returns activities for the current app having the given foreign IDs and timestamps. -func (c *Client) GetActivitiesByForeignID(values ...ForeignIDTimePair) (*GetActivitiesResponse, error) { +func (c *Client) GetActivitiesByForeignID(ctx context.Context, values ...ForeignIDTimePair) (*GetActivitiesResponse, error) { foreignIDs := make([]string, len(values)) timestamps := make([]string, len(values)) for i, v := range values { foreignIDs[i] = v.ForeignID timestamps[i] = v.Timestamp.Format(TimeLayout) } - return c.getAppActivities( + return c.getAppActivities(ctx, makeRequestOption("foreign_ids", strings.Join(foreignIDs, ",")), makeRequestOption("timestamps", strings.Join(timestamps, ",")), ) } -func (c *Client) getAppActivities(values ...valuer) (*GetActivitiesResponse, error) { +func (c *Client) getAppActivities(ctx context.Context, values ...valuer) (*GetActivitiesResponse, error) { endpoint := c.makeEndpoint("activities/") for _, v := range values { endpoint.addQueryParam(v) } - data, err := c.get(endpoint, nil, c.authenticator.feedAuth(resActivities, nil)) + data, err := c.get(ctx, endpoint, nil, c.authenticator.feedAuth(resActivities, nil)) if err != nil { return nil, err } @@ -265,13 +266,13 @@ func (c *Client) getAppActivities(values ...valuer) (*GetActivitiesResponse, err } // GetEnrichedActivitiesByID returns enriched activities for the current app having the given IDs. -func (c *Client) GetEnrichedActivitiesByID(ids []string, opts ...GetActivitiesOption) (*GetEnrichedActivitiesResponse, error) { +func (c *Client) GetEnrichedActivitiesByID(ctx context.Context, ids []string, opts ...GetActivitiesOption) (*GetEnrichedActivitiesResponse, error) { options := []GetActivitiesOption{{makeRequestOption("ids", strings.Join(ids, ","))}} - return c.getAppEnrichedActivities(append(options, opts...)...) + return c.getAppEnrichedActivities(ctx, append(options, opts...)...) } // GetEnrichedActivitiesByForeignID returns enriched activities for the current app having the given foreign IDs and timestamps. -func (c *Client) GetEnrichedActivitiesByForeignID(values []ForeignIDTimePair, opts ...GetActivitiesOption) (*GetEnrichedActivitiesResponse, error) { +func (c *Client) GetEnrichedActivitiesByForeignID(ctx context.Context, values []ForeignIDTimePair, opts ...GetActivitiesOption) (*GetEnrichedActivitiesResponse, error) { foreignIDs := make([]string, len(values)) timestamps := make([]string, len(values)) for i, v := range values { @@ -283,15 +284,15 @@ func (c *Client) GetEnrichedActivitiesByForeignID(values []ForeignIDTimePair, op {makeRequestOption("timestamps", strings.Join(timestamps, ","))}, } - return c.getAppEnrichedActivities(append(options, opts...)...) + return c.getAppEnrichedActivities(ctx, append(options, opts...)...) } -func (c *Client) getAppEnrichedActivities(options ...GetActivitiesOption) (*GetEnrichedActivitiesResponse, error) { +func (c *Client) getAppEnrichedActivities(ctx context.Context, options ...GetActivitiesOption) (*GetEnrichedActivitiesResponse, error) { endpoint := c.makeEndpoint("enrich/activities/") for _, v := range options { endpoint.addQueryParam(v.requestOption) } - data, err := c.get(endpoint, nil, c.authenticator.feedAuth(resActivities, nil)) + data, err := c.get(ctx, endpoint, nil, c.authenticator.feedAuth(resActivities, nil)) if err != nil { return nil, err } @@ -304,26 +305,26 @@ func (c *Client) getAppEnrichedActivities(options ...GetActivitiesOption) (*GetE } // UpdateActivities updates existing activities. -func (c *Client) UpdateActivities(activities ...Activity) (*BaseResponse, error) { +func (c *Client) UpdateActivities(ctx context.Context, activities ...Activity) (*BaseResponse, error) { req := struct { Activities []Activity `json:"activities,omitempty"` }{ Activities: activities, } endpoint := c.makeEndpoint("activities/") - return decode(c.post(endpoint, req, c.authenticator.feedAuth(resActivities, nil))) + return decode(c.post(ctx, endpoint, req, c.authenticator.feedAuth(resActivities, nil))) } // PartialUpdateActivities performs a partial update on multiple activities with the given set and unset operations // specified by each changeset. This returns the affected activities. -func (c *Client) PartialUpdateActivities(changesets ...UpdateActivityRequest) (*UpdateActivitiesResponse, error) { +func (c *Client) PartialUpdateActivities(ctx context.Context, changesets ...UpdateActivityRequest) (*UpdateActivitiesResponse, error) { req := struct { Activities []UpdateActivityRequest `json:"changes,omitempty"` }{ Activities: changesets, } endpoint := c.makeEndpoint("activity/") - data, err := c.post(endpoint, req, c.authenticator.feedAuth(resActivities, nil)) + data, err := c.post(ctx, endpoint, req, c.authenticator.feedAuth(resActivities, nil)) if err != nil { return nil, err } @@ -336,8 +337,8 @@ func (c *Client) PartialUpdateActivities(changesets ...UpdateActivityRequest) (* // UpdateActivityByID performs a partial activity update with the given set and unset operations, returning the // affected activity, on the activity with the given ID. -func (c *Client) UpdateActivityByID(id string, set map[string]interface{}, unset []string) (*UpdateActivityResponse, error) { - return c.updateActivity(UpdateActivityRequest{ +func (c *Client) UpdateActivityByID(ctx context.Context, id string, set map[string]interface{}, unset []string) (*UpdateActivityResponse, error) { + return c.updateActivity(ctx, UpdateActivityRequest{ ID: &id, Set: set, Unset: unset, @@ -346,8 +347,8 @@ func (c *Client) UpdateActivityByID(id string, set map[string]interface{}, unset // UpdateActivityByForeignID performs a partial activity update with the given set and unset operations, returning the // affected activity, on the activity with the given foreign ID and timestamp. -func (c *Client) UpdateActivityByForeignID(foreignID string, timestamp Time, set map[string]interface{}, unset []string) (*UpdateActivityResponse, error) { - return c.updateActivity(UpdateActivityRequest{ +func (c *Client) UpdateActivityByForeignID(ctx context.Context, foreignID string, timestamp Time, set map[string]interface{}, unset []string) (*UpdateActivityResponse, error) { + return c.updateActivity(ctx, UpdateActivityRequest{ ForeignID: &foreignID, Time: ×tamp, Set: set, @@ -355,9 +356,9 @@ func (c *Client) UpdateActivityByForeignID(foreignID string, timestamp Time, set }) } -func (c *Client) updateActivity(req UpdateActivityRequest) (*UpdateActivityResponse, error) { +func (c *Client) updateActivity(ctx context.Context, req UpdateActivityRequest) (*UpdateActivityResponse, error) { endpoint := c.makeEndpoint("activity/") - data, err := c.post(endpoint, req, c.authenticator.feedAuth(resActivities, nil)) + data, err := c.post(ctx, endpoint, req, c.authenticator.feedAuth(resActivities, nil)) if err != nil { return nil, err } @@ -417,20 +418,20 @@ func (c *Client) makeEndpoint(format string, a ...interface{}) endpoint { } } -func (c *Client) get(endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { - return c.request(http.MethodGet, endpoint, data, authFn) +func (c *Client) get(ctx context.Context, endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { + return c.request(ctx, http.MethodGet, endpoint, data, authFn) } -func (c *Client) post(endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { - return c.request(http.MethodPost, endpoint, data, authFn) +func (c *Client) post(ctx context.Context, endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { + return c.request(ctx, http.MethodPost, endpoint, data, authFn) } -func (c *Client) put(endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { - return c.request(http.MethodPut, endpoint, data, authFn) +func (c *Client) put(ctx context.Context, endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { + return c.request(ctx, http.MethodPut, endpoint, data, authFn) } -func (c *Client) delete(endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { - return c.request(http.MethodDelete, endpoint, data, authFn) +func (c *Client) delete(ctx context.Context, endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { + return c.request(ctx, http.MethodDelete, endpoint, data, authFn) } func (c *Client) setBaseHeaders(r *http.Request) { @@ -438,7 +439,7 @@ func (c *Client) setBaseHeaders(r *http.Request) { r.Header.Set("X-Stream-Client", fmt.Sprintf("stream-go2-client-%s", Version)) } -func (c *Client) request(method string, endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { +func (c *Client) request(ctx context.Context, method string, endpoint endpoint, data interface{}, authFn authFunc) ([]byte, error) { var reader io.Reader if data != nil { payload, err := json.Marshal(data) @@ -448,7 +449,7 @@ func (c *Client) request(method string, endpoint endpoint, data interface{}, aut reader = bytes.NewReader(payload) } - req, err := http.NewRequest(method, endpoint.String(), reader) + req, err := http.NewRequestWithContext(ctx, method, endpoint.String(), reader) if err != nil { return nil, fmt.Errorf("cannot create request: %w", err) } @@ -491,9 +492,9 @@ func (c *Client) request(method string, endpoint endpoint, data interface{}, aut return body, nil } -func (c *Client) addActivity(feed Feed, activity Activity) (*AddActivityResponse, error) { +func (c *Client) addActivity(ctx context.Context, feed Feed, activity Activity) (*AddActivityResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/", feed.Slug(), feed.UserID()) - resp, err := c.post(endpoint, activity, c.authenticator.feedAuth(resFeed, feed)) + resp, err := c.post(ctx, endpoint, activity, c.authenticator.feedAuth(resFeed, feed)) if err != nil { return nil, err } @@ -504,14 +505,14 @@ func (c *Client) addActivity(feed Feed, activity Activity) (*AddActivityResponse return &out, nil } -func (c *Client) addActivities(feed Feed, activities ...Activity) (*AddActivitiesResponse, error) { +func (c *Client) addActivities(ctx context.Context, feed Feed, activities ...Activity) (*AddActivitiesResponse, error) { reqBody := struct { Activities []Activity `json:"activities,omitempty"` }{ Activities: activities, } endpoint := c.makeEndpoint("feed/%s/%s/", feed.Slug(), feed.UserID()) - resp, err := c.post(endpoint, reqBody, c.authenticator.feedAuth(resFeed, feed)) + resp, err := c.post(ctx, endpoint, reqBody, c.authenticator.feedAuth(resFeed, feed)) if err != nil { return nil, err } @@ -522,9 +523,9 @@ func (c *Client) addActivities(feed Feed, activities ...Activity) (*AddActivitie return &out, nil } -func (c *Client) removeActivityByID(feed Feed, activityID string) (*RemoveActivityResponse, error) { +func (c *Client) removeActivityByID(ctx context.Context, feed Feed, activityID string) (*RemoveActivityResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/%s/", feed.Slug(), feed.UserID(), activityID) - resp, err := c.delete(endpoint, nil, c.authenticator.feedAuth(resFeed, feed)) + resp, err := c.delete(ctx, endpoint, nil, c.authenticator.feedAuth(resFeed, feed)) if err != nil { return nil, err } @@ -535,10 +536,10 @@ func (c *Client) removeActivityByID(feed Feed, activityID string) (*RemoveActivi return &out, nil } -func (c *Client) removeActivityByForeignID(feed Feed, foreignID string) (*RemoveActivityResponse, error) { +func (c *Client) removeActivityByForeignID(ctx context.Context, feed Feed, foreignID string) (*RemoveActivityResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/%s/", feed.Slug(), feed.UserID(), foreignID) endpoint.addQueryParam(makeRequestOption("foreign_id", 1)) - resp, err := c.delete(endpoint, nil, c.authenticator.feedAuth(resFeed, feed)) + resp, err := c.delete(ctx, endpoint, nil, c.authenticator.feedAuth(resFeed, feed)) if err != nil { return nil, err } @@ -549,35 +550,35 @@ func (c *Client) removeActivityByForeignID(feed Feed, foreignID string) (*Remove return &out, nil } -func (c *Client) getActivities(feed Feed, opts ...GetActivitiesOption) ([]byte, error) { +func (c *Client) getActivities(ctx context.Context, feed Feed, opts ...GetActivitiesOption) ([]byte, error) { endpoint := c.makeEndpoint("feed/%s/%s/", feed.Slug(), feed.UserID()) - return c.getActivitiesInternal(endpoint, feed, opts...) + return c.getActivitiesInternal(ctx, endpoint, feed, opts...) } -func (c *Client) getEnrichedActivities(feed Feed, opts ...GetActivitiesOption) ([]byte, error) { +func (c *Client) getEnrichedActivities(ctx context.Context, feed Feed, opts ...GetActivitiesOption) ([]byte, error) { endpoint := c.makeEndpoint("enrich/feed/%s/%s/", feed.Slug(), feed.UserID()) - return c.getActivitiesInternal(endpoint, feed, opts...) + return c.getActivitiesInternal(ctx, endpoint, feed, opts...) } -func (c *Client) getActivitiesInternal(endpoint endpoint, feed Feed, opts ...GetActivitiesOption) ([]byte, error) { +func (c *Client) getActivitiesInternal(ctx context.Context, endpoint endpoint, feed Feed, opts ...GetActivitiesOption) ([]byte, error) { for _, opt := range opts { endpoint.addQueryParam(opt) } - return c.get(endpoint, nil, c.authenticator.feedAuth(resFeed, feed)) + return c.get(ctx, endpoint, nil, c.authenticator.feedAuth(resFeed, feed)) } -func (c *Client) follow(feed Feed, opts *followFeedOptions) (*BaseResponse, error) { +func (c *Client) follow(ctx context.Context, feed Feed, opts *followFeedOptions) (*BaseResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/follows/", feed.Slug(), feed.UserID()) - return decode(c.post(endpoint, opts, c.authenticator.feedAuth(resFollower, feed))) + return decode(c.post(ctx, endpoint, opts, c.authenticator.feedAuth(resFollower, feed))) } -func (c *Client) getFollowers(feed Feed, opts ...FollowersOption) (*FollowersResponse, error) { +func (c *Client) getFollowers(ctx context.Context, feed Feed, opts ...FollowersOption) (*FollowersResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/followers/", feed.Slug(), feed.UserID()) for _, opt := range opts { endpoint.addQueryParam(opt) } - resp, err := c.get(endpoint, nil, c.authenticator.feedAuth(resFollower, feed)) + resp, err := c.get(ctx, endpoint, nil, c.authenticator.feedAuth(resFollower, feed)) if err != nil { return nil, err } @@ -588,13 +589,13 @@ func (c *Client) getFollowers(feed Feed, opts ...FollowersOption) (*FollowersRes return &out, nil } -func (c *Client) getFollowing(feed Feed, opts ...FollowingOption) (*FollowingResponse, error) { +func (c *Client) getFollowing(ctx context.Context, feed Feed, opts ...FollowingOption) (*FollowingResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/follows/", feed.Slug(), feed.UserID()) for _, opt := range opts { endpoint.addQueryParam(opt) } - resp, err := c.get(endpoint, nil, c.authenticator.feedAuth(resFollower, feed)) + resp, err := c.get(ctx, endpoint, nil, c.authenticator.feedAuth(resFollower, feed)) if err != nil { return nil, err } @@ -605,16 +606,16 @@ func (c *Client) getFollowing(feed Feed, opts ...FollowingOption) (*FollowingRes return &out, nil } -func (c *Client) unfollow(feed Feed, target string, opts ...UnfollowOption) (*BaseResponse, error) { +func (c *Client) unfollow(ctx context.Context, feed Feed, target string, opts ...UnfollowOption) (*BaseResponse, error) { endpoint := c.makeEndpoint("feed/%s/%s/follows/%s/", feed.Slug(), feed.UserID(), target) for _, opt := range opts { endpoint.addQueryParam(opt) } - return decode(c.delete(endpoint, nil, c.authenticator.feedAuth(resFollower, feed))) + return decode(c.delete(ctx, endpoint, nil, c.authenticator.feedAuth(resFollower, feed))) } -func (c *Client) followStats(feed Feed, opts ...FollowStatOption) (*FollowStatResponse, error) { +func (c *Client) followStats(ctx context.Context, feed Feed, opts ...FollowStatOption) (*FollowStatResponse, error) { endpoint := c.makeEndpoint("stats/follow/") endpoint.addQueryParam(makeRequestOption("followers", feed.ID())) endpoint.addQueryParam(makeRequestOption("following", feed.ID())) @@ -622,7 +623,7 @@ func (c *Client) followStats(feed Feed, opts ...FollowStatOption) (*FollowStatRe endpoint.addQueryParam(opt) } - resp, err := c.get(endpoint, nil, c.authenticator.feedAuth(resFollower, nil)) + resp, err := c.get(ctx, endpoint, nil, c.authenticator.feedAuth(resFollower, nil)) if err != nil { return nil, err } @@ -633,7 +634,7 @@ func (c *Client) followStats(feed Feed, opts ...FollowStatOption) (*FollowStatRe return &out, nil } -func (c *Client) updateToTargets(feed Feed, activity Activity, opts ...UpdateToTargetsOption) (*UpdateToTargetsResponse, error) { +func (c *Client) updateToTargets(ctx context.Context, feed Feed, activity Activity, opts ...UpdateToTargetsOption) (*UpdateToTargetsResponse, error) { endpoint := c.makeEndpoint("feed_targets/%s/%s/activity_to_targets/", feed.Slug(), feed.UserID()) req := &updateToTargetsRequest{ @@ -644,7 +645,7 @@ func (c *Client) updateToTargets(feed Feed, activity Activity, opts ...UpdateToT opt(req) } - resp, err := c.post(endpoint, req, c.authenticator.feedAuth(resFeedTargets, feed)) + resp, err := c.post(ctx, endpoint, req, c.authenticator.feedAuth(resFeedTargets, feed)) if err != nil { return nil, err } diff --git a/client_internal_test.go b/client_internal_test.go index 54f7dc2..5cccb86 100644 --- a/client_internal_test.go +++ b/client_internal_test.go @@ -1,6 +1,7 @@ package stream import ( + "context" "fmt" "io" "net/http" @@ -193,6 +194,7 @@ func (r requester) Do(*http.Request) (*http.Response, error) { } func Test_requestErrors(t *testing.T) { + ctx := context.Background() testCases := []struct { data interface{} method string @@ -236,7 +238,7 @@ func Test_requestErrors(t *testing.T) { for _, tc := range testCases { c := &Client{requester: tc.requester} - _, err := c.request(tc.method, endpoint{url: &url.URL{}, query: url.Values{}}, tc.data, tc.authFn) + _, err := c.request(ctx, tc.method, endpoint{url: &url.URL{}, query: url.Values{}}, tc.data, tc.authFn) require.Error(t, err) assert.Equal(t, tc.expected.Error(), err.Error()) } diff --git a/client_test.go b/client_test.go index 68c7da1..3e34028 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,7 @@ package stream_test import ( + "context" "net/http" "strconv" "testing" @@ -14,11 +15,12 @@ import ( ) func TestHeaders(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) feed, err := client.FlatFeed("user", "123") require.NoError(t, err) - _, err = feed.GetActivities() + _, err = feed.GetActivities(ctx) require.NoError(t, err) assert.Equal(t, "application/json", requester.req.Header.Get("content-type")) assert.Regexp(t, "^stream-go2-client-v[0-9\\.]+$", requester.req.Header.Get("x-stream-client")) @@ -27,12 +29,13 @@ func TestHeaders(t *testing.T) { func TestAddToMany(t *testing.T) { var ( client, requester = newClient(t) + ctx = context.Background() activity = stream.Activity{Actor: "bob", Verb: "like", Object: "cake"} flat, _ = newFlatFeedWithUserID(client, "123") aggregated, _ = newAggregatedFeedWithUserID(client, "123") ) - err := client.AddToMany(activity, flat, aggregated) + err := client.AddToMany(ctx, activity, flat, aggregated) require.NoError(t, err) body := `{"activity":{"actor":"bob","object":"cake","verb":"like"},"feeds":["flat:123","aggregated:123"]}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/feed/add_to_many/?api_key=key", body) @@ -41,6 +44,7 @@ func TestAddToMany(t *testing.T) { func TestFollowMany(t *testing.T) { var ( client, requester = newClient(t) + ctx = context.Background() relationships = make([]stream.FollowRelationship, 3) flat, _ = newFlatFeedWithUserID(client, "123") ) @@ -50,12 +54,12 @@ func TestFollowMany(t *testing.T) { relationships[i] = stream.NewFollowRelationship(other, flat) } - err := client.FollowMany(relationships) + err := client.FollowMany(ctx, relationships) require.NoError(t, err) body := `[{"source":"aggregated:0","target":"flat:123"},{"source":"aggregated:1","target":"flat:123"},{"source":"aggregated:2","target":"flat:123"}]` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/follow_many/?api_key=key", body) - err = client.FollowMany(relationships, stream.WithFollowManyActivityCopyLimit(500)) + err = client.FollowMany(ctx, relationships, stream.WithFollowManyActivityCopyLimit(500)) require.NoError(t, err) body = `[{"source":"aggregated:0","target":"flat:123"},{"source":"aggregated:1","target":"flat:123"},{"source":"aggregated:2","target":"flat:123"}]` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/follow_many/?activity_copy_limit=500&api_key=key", body) @@ -64,6 +68,7 @@ func TestFollowMany(t *testing.T) { func TestFollowManyActivityCopyLimit(t *testing.T) { var ( client, requester = newClient(t) + ctx = context.Background() relationships = make([]stream.FollowRelationship, 3) flat, _ = newFlatFeedWithUserID(client, "123") ) @@ -73,12 +78,12 @@ func TestFollowManyActivityCopyLimit(t *testing.T) { relationships[i] = stream.NewFollowRelationship(other, flat, stream.WithFollowRelationshipActivityCopyLimit(i)) } - err := client.FollowMany(relationships) + err := client.FollowMany(ctx, relationships) require.NoError(t, err) body := `[{"source":"aggregated:0","target":"flat:123","activity_copy_limit":0},{"source":"aggregated:1","target":"flat:123","activity_copy_limit":1},{"source":"aggregated:2","target":"flat:123","activity_copy_limit":2}]` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/follow_many/?api_key=key", body) - err = client.FollowMany(relationships, stream.WithFollowManyActivityCopyLimit(123)) + err = client.FollowMany(ctx, relationships, stream.WithFollowManyActivityCopyLimit(123)) require.NoError(t, err) body = `[{"source":"aggregated:0","target":"flat:123","activity_copy_limit":0},{"source":"aggregated:1","target":"flat:123","activity_copy_limit":1},{"source":"aggregated:2","target":"flat:123","activity_copy_limit":2}]` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/follow_many/?activity_copy_limit=123&api_key=key", body) @@ -87,6 +92,7 @@ func TestFollowManyActivityCopyLimit(t *testing.T) { func TestUnfollowMany(t *testing.T) { var ( client, requester = newClient(t) + ctx = context.Background() relationships = make([]stream.UnfollowRelationship, 3) ) for i := range relationships { @@ -102,18 +108,20 @@ func TestUnfollowMany(t *testing.T) { relationships[i] = stream.NewUnfollowRelationship(src, tgt, options...) } - err := client.UnfollowMany(relationships) + err := client.UnfollowMany(ctx, relationships) require.NoError(t, err) body := `[{"source":"src:0","target":"tgt:0","keep_history":true},{"source":"src:1","target":"tgt:1","keep_history":false},{"source":"src:2","target":"tgt:2","keep_history":true}]` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/unfollow_many/?api_key=key", body) } func TestGetActivities(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) - _, err := client.GetActivitiesByID("foo", "bar", "baz") + _, err := client.GetActivitiesByID(ctx, "foo", "bar", "baz") require.NoError(t, err) testRequest(t, requester.req, http.MethodGet, "https://api.stream-io-api.com/api/v1.0/activities/?api_key=key&ids=foo%2Cbar%2Cbaz", "") _, err = client.GetActivitiesByForeignID( + ctx, stream.NewForeignIDTimePair("foo", stream.Time{}), stream.NewForeignIDTimePair("bar", stream.Time{Time: time.Time{}.Add(time.Second)}), ) @@ -122,11 +130,13 @@ func TestGetActivities(t *testing.T) { } func TestGetEnrichedActivities(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) - _, err := client.GetEnrichedActivitiesByID([]string{"foo", "bar", "baz"}, stream.WithEnrichReactionCounts()) + _, err := client.GetEnrichedActivitiesByID(ctx, []string{"foo", "bar", "baz"}, stream.WithEnrichReactionCounts()) require.NoError(t, err) testRequest(t, requester.req, http.MethodGet, "https://api.stream-io-api.com/api/v1.0/enrich/activities/?api_key=key&ids=foo%2Cbar%2Cbaz&withReactionCounts=true", "") _, err = client.GetEnrichedActivitiesByForeignID( + ctx, []stream.ForeignIDTimePair{ stream.NewForeignIDTimePair("foo", stream.Time{}), stream.NewForeignIDTimePair("bar", stream.Time{Time: time.Time{}.Add(time.Second)}), @@ -138,28 +148,31 @@ func TestGetEnrichedActivities(t *testing.T) { } func TestUpdateActivityByID(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) - _, err := client.UpdateActivityByID("abcdef", map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, []string{"a", "b", "c"}) + _, err := client.UpdateActivityByID(ctx, "abcdef", map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, []string{"a", "b", "c"}) require.NoError(t, err) body := `{"id":"abcdef","set":{"color":{"hex":"FF0000","rgb":"255,0,0"},"foo.bar":"baz","popularity":42},"unset":["a","b","c"]}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/activity/?api_key=key", body) - _, err = client.UpdateActivityByID("abcdef", map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, nil) + _, err = client.UpdateActivityByID(ctx, "abcdef", map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, nil) require.NoError(t, err) body = `{"id":"abcdef","set":{"color":{"hex":"FF0000","rgb":"255,0,0"},"foo.bar":"baz","popularity":42}}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/activity/?api_key=key", body) - _, err = client.UpdateActivityByID("abcdef", nil, []string{"a", "b", "c"}) + _, err = client.UpdateActivityByID(ctx, "abcdef", nil, []string{"a", "b", "c"}) require.NoError(t, err) body = `{"id":"abcdef","unset":["a","b","c"]}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/activity/?api_key=key", body) } func TestPartialUpdateActivities(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) _, err := client.PartialUpdateActivities( + ctx, stream.NewUpdateActivityRequestByID( "abcdef", map[string]interface{}{"foo.bar": "baz"}, @@ -177,6 +190,7 @@ func TestPartialUpdateActivities(t *testing.T) { tt, _ := time.Parse(stream.TimeLayout, "2006-01-02T15:04:05.999999") _, err = client.PartialUpdateActivities( + ctx, stream.NewUpdateActivityRequestByForeignID( "abcdef:123", stream.Time{Time: tt}, @@ -196,21 +210,22 @@ func TestPartialUpdateActivities(t *testing.T) { } func TestUpdateActivityByForeignID(t *testing.T) { + ctx := context.Background() client, requester := newClient(t) tt := stream.Time{Time: time.Date(2018, 6, 24, 11, 28, 0, 0, time.UTC)} - _, err := client.UpdateActivityByForeignID("fid:123", tt, map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, []string{"a", "b", "c"}) + _, err := client.UpdateActivityByForeignID(ctx, "fid:123", tt, map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, []string{"a", "b", "c"}) require.NoError(t, err) body := `{"foreign_id":"fid:123","time":"2018-06-24T11:28:00","set":{"color":{"hex":"FF0000","rgb":"255,0,0"},"foo.bar":"baz","popularity":42},"unset":["a","b","c"]}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/activity/?api_key=key", body) - _, err = client.UpdateActivityByForeignID("fid:123", tt, map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, nil) + _, err = client.UpdateActivityByForeignID(ctx, "fid:123", tt, map[string]interface{}{"foo.bar": "baz", "popularity": 42, "color": map[string]interface{}{"hex": "FF0000", "rgb": "255,0,0"}}, nil) require.NoError(t, err) body = `{"foreign_id":"fid:123","time":"2018-06-24T11:28:00","set":{"color":{"hex":"FF0000","rgb":"255,0,0"},"foo.bar":"baz","popularity":42}}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/activity/?api_key=key", body) - _, err = client.UpdateActivityByForeignID("fid:123", tt, nil, []string{"a", "b", "c"}) + _, err = client.UpdateActivityByForeignID(ctx, "fid:123", tt, nil, []string{"a", "b", "c"}) require.NoError(t, err) body = `{"foreign_id":"fid:123","time":"2018-06-24T11:28:00","unset":["a","b","c"]}` testRequest(t, requester.req, http.MethodPost, "https://api.stream-io-api.com/api/v1.0/activity/?api_key=key", body) diff --git a/collections.go b/collections.go index 4c0c5d5..00c277b 100644 --- a/collections.go +++ b/collections.go @@ -1,6 +1,7 @@ package stream import ( + "context" "encoding/json" "errors" "fmt" @@ -13,7 +14,7 @@ type CollectionsClient struct { } // Upsert creates new or updates existing objects for the given collection's name. -func (c *CollectionsClient) Upsert(collection string, objects ...CollectionObject) (*BaseResponse, error) { +func (c *CollectionsClient) Upsert(ctx context.Context, collection string, objects ...CollectionObject) (*BaseResponse, error) { if collection == "" { return nil, errors.New("collection name required") } @@ -23,12 +24,12 @@ func (c *CollectionsClient) Upsert(collection string, objects ...CollectionObjec collection: objects, }, } - return decode(c.client.post(endpoint, data, c.client.authenticator.collectionsAuth)) + return decode(c.client.post(ctx, endpoint, data, c.client.authenticator.collectionsAuth)) } // Select returns a list of CollectionObjects for the given collection name // having the given IDs. -func (c *CollectionsClient) Select(collection string, ids ...string) (*GetCollectionResponse, error) { +func (c *CollectionsClient) Select(ctx context.Context, collection string, ids ...string) (*GetCollectionResponse, error) { if collection == "" { return nil, errors.New("collection name required") } @@ -38,7 +39,7 @@ func (c *CollectionsClient) Select(collection string, ids ...string) (*GetCollec } endpoint := c.client.makeEndpoint("collections/") endpoint.addQueryParam(makeRequestOption("foreign_ids", strings.Join(foreignIDs, ","))) - resp, err := c.client.get(endpoint, nil, c.client.authenticator.collectionsAuth) + resp, err := c.client.get(ctx, endpoint, nil, c.client.authenticator.collectionsAuth) if err != nil { return nil, err } @@ -53,14 +54,14 @@ func (c *CollectionsClient) Select(collection string, ids ...string) (*GetCollec } // DeleteMany removes from a collection the objects having the given IDs. -func (c *CollectionsClient) DeleteMany(collection string, ids ...string) (*BaseResponse, error) { +func (c *CollectionsClient) DeleteMany(ctx context.Context, collection string, ids ...string) (*BaseResponse, error) { if collection == "" { return nil, errors.New("collection name required") } endpoint := c.client.makeEndpoint("collections/") endpoint.addQueryParam(makeRequestOption("collection_name", collection)) endpoint.addQueryParam(makeRequestOption("ids", strings.Join(ids, ","))) - return decode(c.client.delete(endpoint, nil, c.client.authenticator.collectionsAuth)) + return decode(c.client.delete(ctx, endpoint, nil, c.client.authenticator.collectionsAuth)) } func (c *CollectionsClient) decodeObject(resp []byte, err error) (*CollectionObjectResponse, error) { @@ -75,7 +76,7 @@ func (c *CollectionsClient) decodeObject(resp []byte, err error) (*CollectionObj } // Add adds a single object to a collection. -func (c *CollectionsClient) Add(collection string, object CollectionObject, opts ...AddObjectOption) (*CollectionObjectResponse, error) { +func (c *CollectionsClient) Add(ctx context.Context, collection string, object CollectionObject, opts ...AddObjectOption) (*CollectionObjectResponse, error) { if collection == "" { return nil, errors.New("collection name required") } @@ -90,21 +91,21 @@ func (c *CollectionsClient) Add(collection string, object CollectionObject, opts req.ID = object.ID req.Data = object.Data - return c.decodeObject(c.client.post(endpoint, req, c.client.authenticator.collectionsAuth)) + return c.decodeObject(c.client.post(ctx, endpoint, req, c.client.authenticator.collectionsAuth)) } // Get retrieves a collection object having the given ID. -func (c *CollectionsClient) Get(collection, id string) (*CollectionObjectResponse, error) { +func (c *CollectionsClient) Get(ctx context.Context, collection, id string) (*CollectionObjectResponse, error) { if collection == "" { return nil, errors.New("collection name required") } endpoint := c.client.makeEndpoint("collections/%s/%s/", collection, id) - return c.decodeObject(c.client.get(endpoint, nil, c.client.authenticator.collectionsAuth)) + return c.decodeObject(c.client.get(ctx, endpoint, nil, c.client.authenticator.collectionsAuth)) } // Update updates the given collection object's data. -func (c *CollectionsClient) Update(collection, id string, data map[string]interface{}) (*CollectionObjectResponse, error) { +func (c *CollectionsClient) Update(ctx context.Context, collection, id string, data map[string]interface{}) (*CollectionObjectResponse, error) { if collection == "" { return nil, errors.New("collection name required") } @@ -113,17 +114,17 @@ func (c *CollectionsClient) Update(collection, id string, data map[string]interf "data": data, } - return c.decodeObject(c.client.put(endpoint, reqData, c.client.authenticator.collectionsAuth)) + return c.decodeObject(c.client.put(ctx, endpoint, reqData, c.client.authenticator.collectionsAuth)) } // Delete removes from a collection the object having the given ID. -func (c *CollectionsClient) Delete(collection, id string) (*BaseResponse, error) { +func (c *CollectionsClient) Delete(ctx context.Context, collection, id string) (*BaseResponse, error) { if collection == "" { return nil, errors.New("collection name required") } endpoint := c.client.makeEndpoint("collections/%s/%s/", collection, id) - return decode(c.client.delete(endpoint, nil, c.client.authenticator.collectionsAuth)) + return decode(c.client.delete(ctx, endpoint, nil, c.client.authenticator.collectionsAuth)) } // CreateReference returns a new reference string in the form SO: