From f368315e856cc038b93f5f7e9bfcf08f5e2e8444 Mon Sep 17 00:00:00 2001
From: mcamou
Date: Thu, 21 Aug 2025 16:57:37 +0200
Subject: [PATCH 1/6] Implement Reddit scraper

---
 .gitignore                               |   8 +-
 Makefile                                 |   9 +-
 api/types/job.go                         |  13 ++
 api/types/reddit/reddit.go               | 190 ++++++++++++++++++
 api/types/reddit/reddit_suite_test.go    |  13 ++
 api/types/reddit/reddit_test.go          | 150 ++++++++++++++
 go.mod                                   |   3 +
 go.sum                                   |   4 +-
 internal/capabilities/detector.go        |   6 +-
 internal/jobs/reddit.go                  | 111 +++++++++++
 internal/jobs/reddit_test.go             | 232 ++++++++++++++++++++++
 internal/jobs/redditapify/client.go      | 174 +++++++++++++++++
 internal/jobs/redditapify/client_test.go | 239 +++++++++++++++++++++++
 internal/jobs/twitter.go                 |  33 ++--
 internal/jobs/twitterapify/client.go     |  16 +-
 internal/jobs/webscraper.go              |  15 +-
 internal/jobserver/jobserver.go          |   9 +-
 internal/jobserver/jobserver_test.go     |   6 +-
 internal/jobserver/worker.go             |   5 +-
 pkg/client/apify_client.go               |  54 ++---
 20 files changed, 1216 insertions(+), 74 deletions(-)
 create mode 100644 api/types/reddit/reddit.go
 create mode 100644 api/types/reddit/reddit_suite_test.go
 create mode 100644 api/types/reddit/reddit_test.go
 create mode 100644 internal/jobs/reddit.go
 create mode 100644 internal/jobs/reddit_test.go
 create mode 100644 internal/jobs/redditapify/client.go
 create mode 100644 internal/jobs/redditapify/client_test.go

diff --git a/.gitignore b/.gitignore
index bf81d40d..72e5a502 100644
--- a/.gitignore
+++ b/.gitignore
@@ -74,9 +74,13 @@ snippets.txt
 dist/
 bp-todo.md
 
+# Test files
 .masa/.env
+.masa/*
+
 # TEE
 tee/private.pem
 
-.aider*
-.masa/*
\ No newline at end of file
+# LLM
+.aider*
+GEMINI.md
diff --git a/Makefile b/Makefile
index c29b44cf..b45da9fd 100644
--- a/Makefile
+++ b/Makefile
@@ -61,11 +61,11 @@ docker-build-test: tee/private.pem
 	@docker build --target=dependencies --build-arg baseimage=builder --secret id=private_key,src=./tee/private.pem -t $(TEST_IMAGE) -f Dockerfile .
ci-test:
-	@go test -coverprofile=coverage/coverage.txt -covermode=atomic -v $(TEST_ARGS)
+	go test -coverprofile=coverage/coverage.txt -covermode=atomic -v $(TEST_ARGS)
 
 .PHONY: test
 test: docker-build-test
-	@docker run --user root $(ENV_FILE_ARG) -e LOG_LEVEL=debug -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(TEST_IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v $(TEST_ARGS)
+	docker run --user root $(ENV_FILE_ARG) -e LOG_LEVEL=debug -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(TEST_IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v $(TEST_ARGS)
 
 test-capabilities: docker-build-test
 	@docker run --user root $(ENV_FILE_ARG) -e LOG_LEVEL=debug -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(TEST_IMAGE) go test -coverprofile=coverage/coverage-capabilities.txt -covermode=atomic -v ./internal/capabilities
@@ -79,8 +79,11 @@ test-twitter: docker-build-test
 test-tiktok: docker-build-test
 	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/tiktok_transcription_test.go ./internal/jobs/jobs_suite_test.go
 
+test-reddit: docker-build-test
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/reddit_test.go ./internal/jobs/jobs_suite_test.go
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./api/types/reddit/... ./internal/jobs/redditapify/...
+
 test-web: docker-build-test
 	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/webscraper_test.go ./internal/jobs/jobs_suite_test.go
 
 test-telemetry: docker-build-test
-	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/telemetry_test.go ./internal/jobs/jobs_suite_test.go
\ No newline at end of file
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/telemetry_test.go ./internal/jobs/jobs_suite_test.go
diff --git a/api/types/job.go b/api/types/job.go
index a228c66c..7313683c 100644
--- a/api/types/job.go
+++ b/api/types/job.go
@@ -202,3 +202,16 @@ func (jc JobConfiguration) GetTwitterConfig() TwitterScraperConfig {
 		SkipLoginVerification: jc.GetBool("skip_login_verification", false),
 	}
 }
+
+// RedditConfig represents the configuration needed for Reddit scraping via Apify
+type RedditConfig struct {
+	ApifyApiKey string
+}
+
+// GetRedditConfig constructs a RedditConfig directly from the JobConfiguration.
+// This eliminates the need for JSON marshaling/unmarshaling.
+func (jc JobConfiguration) GetRedditConfig() RedditConfig {
+	return RedditConfig{
+		ApifyApiKey: jc.GetString("apify_api_key", ""),
+	}
+}
diff --git a/api/types/reddit/reddit.go b/api/types/reddit/reddit.go
new file mode 100644
index 00000000..b8c4fcb8
--- /dev/null
+++ b/api/types/reddit/reddit.go
@@ -0,0 +1,190 @@
+package reddit
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/masa-finance/tee-types/pkg/util"
+)
+
+type QueryType string
+
+const (
+	ScrapeUrls        QueryType = "scrapeurls"
+	SearchPosts       QueryType = "searchposts"
+	SearchUsers       QueryType = "searchusers"
+	SearchCommunities QueryType = "searchcommunities"
+)
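+
+// Illustrative job-argument payloads that select each query type. The key
+// names follow the tests in internal/jobs/reddit_test.go; the string values
+// are assumed to match the teetypes.RedditQueryType constants mirrored above:
+//
+//	{"type": "searchposts", "queries": ["post-query"]}
+//	{"type": "scrapeurls", "urls": [{"url": "https://www.reddit.com/u/zaphod/", "method": "GET"}]}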
+ +var AllQueryTypes = util.NewSet(ScrapeUrls, SearchPosts, SearchUsers, SearchCommunities) + +type SortType string + +const ( + SortRelevance SortType = "relevance" + SortHot SortType = "hot" + SortTop SortType = "top" + SortNew SortType = "new" + SortRising SortType = "rising" + SortComments SortType = "comments" +) + +var AllSortTypes = util.NewSet( + SortRelevance, + SortHot, + SortTop, + SortNew, + SortRising, + SortComments, +) + +// StartURL represents a single start URL for the Apify Reddit scraper. +type StartURL struct { + URL string `json:"url"` + Method string `json:"method"` +} + +type ResponseType string + +const ( + UserResponse ResponseType = "user" + PostResponse ResponseType = "post" + CommentResponse ResponseType = "comment" + CommunityResponse ResponseType = "community" +) + +// User represents the data structure for a Reddit user from the Apify scraper. +type User struct { + ID string `json:"id"` + URL string `json:"url"` + Username string `json:"username"` + UserIcon string `json:"userIcon"` + PostKarma int `json:"postKarma"` + CommentKarma int `json:"commentKarma"` + Description string `json:"description"` + Over18 bool `json:"over18"` + CreatedAt time.Time `json:"createdAt"` + ScrapedAt time.Time `json:"scrapedAt"` + DataType string `json:"dataType"` +} + +// Post represents the data structure for a Reddit post from the Apify scraper. +type Post struct { + ID string `json:"id"` + ParsedID string `json:"parsedId"` + URL string `json:"url"` + Username string `json:"username"` + Title string `json:"title"` + CommunityName string `json:"communityName"` + ParsedCommunityName string `json:"parsedCommunityName"` + Body string `json:"body"` + HTML *string `json:"html"` + NumberOfComments int `json:"numberOfComments"` + UpVotes int `json:"upVotes"` + IsVideo bool `json:"isVideo"` + IsAd bool `json:"isAd"` + Over18 bool `json:"over18"` + CreatedAt time.Time `json:"createdAt"` + ScrapedAt time.Time `json:"scrapedAt"` + DataType string `json:"dataType"` +} + +// Comment represents the data structure for a Reddit comment from the Apify scraper. +type Comment struct { + ID string `json:"id"` + ParsedID string `json:"parsedId"` + URL string `json:"url"` + ParentID string `json:"parentId"` + Username string `json:"username"` + Category string `json:"category"` + CommunityName string `json:"communityName"` + Body string `json:"body"` + CreatedAt time.Time `json:"createdAt"` + ScrapedAt time.Time `json:"scrapedAt"` + UpVotes int `json:"upVotes"` + NumberOfReplies int `json:"numberOfreplies"` + HTML string `json:"html"` + DataType string `json:"dataType"` +} + +// Community represents the data structure for a Reddit community from the Apify scraper. 
+type Community struct { + ID string `json:"id"` + Name string `json:"name"` + Title string `json:"title"` + HeaderImage string `json:"headerImage"` + Description string `json:"description"` + Over18 bool `json:"over18"` + CreatedAt time.Time `json:"createdAt"` + ScrapedAt time.Time `json:"scrapedAt"` + NumberOfMembers int `json:"numberOfMembers"` + URL string `json:"url"` + DataType string `json:"dataType"` +} + +type TypeSwitch struct { + Type ResponseType `json:"dataType"` +} + +type Response struct { + TypeSwitch *TypeSwitch + User *User + Post *Post + Comment *Comment + Community *Community +} + +func (t *Response) UnmarshalJSON(data []byte) error { + t.TypeSwitch = &TypeSwitch{} + if err := json.Unmarshal(data, &t.TypeSwitch); err != nil { + return fmt.Errorf("failed to unmarshal reddit response type: %w", err) + } + + switch t.TypeSwitch.Type { + case UserResponse: + t.User = &User{} + if err := json.Unmarshal(data, t.User); err != nil { + return fmt.Errorf("failed to unmarshal reddit user: %w", err) + } + case PostResponse: + t.Post = &Post{} + if err := json.Unmarshal(data, t.Post); err != nil { + return fmt.Errorf("failed to unmarshal reddit post: %w", err) + } + case CommentResponse: + t.Comment = &Comment{} + if err := json.Unmarshal(data, t.Comment); err != nil { + return fmt.Errorf("failed to unmarshal reddit comment: %w", err) + } + case CommunityResponse: + t.Community = &Community{} + if err := json.Unmarshal(data, t.Community); err != nil { + return fmt.Errorf("failed to unmarshal reddit community: %w", err) + } + default: + return fmt.Errorf("unknown Reddit response type during unmarshal: %s", t.TypeSwitch.Type) + } + return nil +} + +// MarshalJSON implements the json.Marshaler interface for Response. +// It unwraps the inner struct (User, Post, Comment, or Community) and marshals it directly. +func (t *Response) MarshalJSON() ([]byte, error) { + if t.TypeSwitch == nil { + return []byte("null"), nil + } + + switch t.TypeSwitch.Type { + case UserResponse: + return json.Marshal(t.User) + case PostResponse: + return json.Marshal(t.Post) + case CommentResponse: + return json.Marshal(t.Comment) + case CommunityResponse: + return json.Marshal(t.Community) + default: + return nil, fmt.Errorf("unknown Reddit response type during marshal: %s", t.TypeSwitch.Type) + } +} diff --git a/api/types/reddit/reddit_suite_test.go b/api/types/reddit/reddit_suite_test.go new file mode 100644 index 00000000..22d7fd62 --- /dev/null +++ b/api/types/reddit/reddit_suite_test.go @@ -0,0 +1,13 @@ +package reddit_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestReddit(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Reddit Suite") +} \ No newline at end of file diff --git a/api/types/reddit/reddit_test.go b/api/types/reddit/reddit_test.go new file mode 100644 index 00000000..2f15c742 --- /dev/null +++ b/api/types/reddit/reddit_test.go @@ -0,0 +1,150 @@ +package reddit_test + +import ( + "encoding/json" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/masa-finance/tee-worker/api/types/reddit" +) + +var _ = Describe("Response", func() { + Context("Unmarshalling JSON", func() { + It("should correctly unmarshal a UserResponse", func() { + jsonData := `{"type": "user", "id": "u1", "username": "testuser"}` + var resp reddit.Response + err := json.Unmarshal([]byte(jsonData), &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.User).NotTo(BeNil()) + Expect(resp.User.ID).To(Equal("u1")) + Expect(resp.User.Username).To(Equal("testuser")) + Expect(resp.Post).To(BeNil()) + Expect(resp.Comment).To(BeNil()) + Expect(resp.Community).To(BeNil()) + }) + + It("should correctly unmarshal a PostResponse", func() { + jsonData := `{"type": "post", "id": "p1", "title": "Test Post"}` + var resp reddit.Response + err := json.Unmarshal([]byte(jsonData), &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Post).NotTo(BeNil()) + Expect(resp.Post.ID).To(Equal("p1")) + Expect(resp.Post.Title).To(Equal("Test Post")) + Expect(resp.User).To(BeNil()) + Expect(resp.Comment).To(BeNil()) + Expect(resp.Community).To(BeNil()) + }) + + It("should correctly unmarshal a CommentResponse", func() { + jsonData := `{"type": "comment", "id": "c1", "body": "Test Comment"}` + var resp reddit.Response + err := json.Unmarshal([]byte(jsonData), &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Comment).NotTo(BeNil()) + Expect(resp.Comment.ID).To(Equal("c1")) + Expect(resp.Comment.Body).To(Equal("Test Comment")) + Expect(resp.User).To(BeNil()) + Expect(resp.Post).To(BeNil()) + Expect(resp.Community).To(BeNil()) + }) + + It("should correctly unmarshal a CommunityResponse", func() { + jsonData := `{"type": "community", "id": "co1", "name": "Test Community"}` + var resp reddit.Response + err := json.Unmarshal([]byte(jsonData), &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Community).NotTo(BeNil()) + Expect(resp.Community.ID).To(Equal("co1")) + Expect(resp.Community.Name).To(Equal("Test Community")) + Expect(resp.User).To(BeNil()) + Expect(resp.Post).To(BeNil()) + Expect(resp.Comment).To(BeNil()) + }) + + It("should return an error for an unknown type", func() { + jsonData := `{"type": "unknown", "id": "u1"}` + var resp reddit.Response + err := json.Unmarshal([]byte(jsonData), &resp) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("unknown Reddit response type: unknown")) + }) + + It("should return an error for invalid JSON", func() { + jsonData := `{"type": "user", "id": "u1"` + var resp reddit.Response + err := json.Unmarshal([]byte(jsonData), &resp) + Expect(err).To(HaveOccurred()) + }) + }) + + Context("Marshalling JSON", func() { + It("should correctly marshal a UserResponse", func() { + resp := reddit.Response{ + TypeSwitch: &reddit.TypeSwitch{Type: reddit.UserResponse}, + User: &reddit.User{ID: "u1", Username: "testuser", DataType: "user"}, + } + jsonData, err := json.Marshal(&resp) + Expect(err).NotTo(HaveOccurred()) + expectedJSON := `{"id":"u1","url":"","username":"testuser","userIcon":"","postKarma":0,"commentKarma":0,"description":"","over18":false,"createdAt":"0001-01-01T00:00:00Z","scrapedAt":"0001-01-01T00:00:00Z","dataType":"user"}` + Expect(jsonData).To(MatchJSON(expectedJSON)) + }) + + It("should correctly marshal a PostResponse", func() { + resp := reddit.Response{ + TypeSwitch: &reddit.TypeSwitch{Type: reddit.PostResponse}, + Post: &reddit.Post{ID: "p1", Title: "Test Post", DataType: "post"}, + } + jsonData, err := json.Marshal(&resp) + Expect(err).NotTo(HaveOccurred()) + 
expectedJSON := `{"id":"p1","parsedId":"","url":"","username":"","title":"Test Post","communityName":"","parsedCommunityName":"","body":"","html":null,"numberOfComments":0,"upVotes":0,"isVideo":false,"isAd":false,"over18":false,"createdAt":"0001-01-01T00:00:00Z","scrapedAt":"0001-01-01T00:00:00Z","dataType":"post"}` + Expect(jsonData).To(MatchJSON(expectedJSON)) + }) + + It("should correctly marshal a CommentResponse", func() { + now := time.Now().UTC() + resp := reddit.Response{ + TypeSwitch: &reddit.TypeSwitch{Type: reddit.CommentResponse}, + Comment: &reddit.Comment{ID: "c1", Body: "Test Comment", CreatedAt: now, ScrapedAt: now, DataType: "comment"}, + } + jsonData, err := json.Marshal(&resp) + Expect(err).NotTo(HaveOccurred()) + + expectedComment := &reddit.Comment{ID: "c1", Body: "Test Comment", CreatedAt: now, ScrapedAt: now, DataType: "comment"} + expectedJSON, _ := json.Marshal(expectedComment) + Expect(jsonData).To(MatchJSON(expectedJSON)) + }) + + It("should correctly marshal a CommunityResponse", func() { + now := time.Now().UTC() + resp := reddit.Response{ + TypeSwitch: &reddit.TypeSwitch{Type: reddit.CommunityResponse}, + Community: &reddit.Community{ID: "co1", Name: "Test Community", CreatedAt: now, ScrapedAt: now, DataType: "community"}, + } + jsonData, err := json.Marshal(&resp) + Expect(err).NotTo(HaveOccurred()) + + expectedCommunity := &reddit.Community{ID: "co1", Name: "Test Community", CreatedAt: now, ScrapedAt: now, DataType: "community"} + expectedJSON, _ := json.Marshal(expectedCommunity) + Expect(jsonData).To(MatchJSON(expectedJSON)) + }) + + It("should return an error for an unknown type", func() { + resp := reddit.Response{ + TypeSwitch: &reddit.TypeSwitch{Type: "unknown"}, + } + _, err := json.Marshal(&resp) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("unknown Reddit response type: unknown")) + }) + + It("should marshal to null if TypeSwitch is nil", func() { + resp := reddit.Response{} + jsonData, err := json.Marshal(&resp) + Expect(err).NotTo(HaveOccurred()) + Expect(string(jsonData)).To(Equal("null")) + }) + }) +}) diff --git a/go.mod b/go.mod index 15fbf216..bfbb2b87 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,9 @@ require ( replace github.com/imperatrona/twitter-scraper => github.com/masa-finance/twitter-scraper v1.0.2 +// FIXME: Remove once the correct version of tee-types is tagged +replace github.com/masa-finance/tee-types => github.com/masa-finance/tee-types v1.1.8-0.20250821113038-3a5a22710aa3 + require ( github.com/AlexEidt/Vidio v1.5.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect diff --git a/go.sum b/go.sum index c6e9ffa7..dd2ba8f0 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.7 h1:VF55egisbUCAsfyhvGM26rzt+oKfJgHqcK/LW4uQ9M4= -github.com/masa-finance/tee-types v1.1.7/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.8-0.20250821113038-3a5a22710aa3 h1:XgubCLsiFOfJ7O2Ak4+qmGUwxADVzgim6fnMGs5Xb/c= +github.com/masa-finance/tee-types v1.1.8-0.20250821113038-3a5a22710aa3/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 
h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index bbdbb378..32720507 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -7,6 +7,7 @@ import ( teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs/twitter" + "maps" ) // JobServerInterface defines the methods we need from JobServer to avoid circular dependencies @@ -27,9 +28,7 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) capabilities := make(teetypes.WorkerCapabilities) // Start with always available capabilities - for jobType, caps := range teetypes.AlwaysAvailableCapabilities { - capabilities[jobType] = caps - } + maps.Copy(capabilities, teetypes.AlwaysAvailableCapabilities) // Check what Twitter authentication methods are available accounts := jc.GetStringSlice("twitter_accounts", nil) @@ -61,6 +60,7 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) // Add Apify-specific capabilities based on available API key if hasApifyKey { capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps + capabilities[teetypes.RedditJob] = teetypes.RedditCaps } // Add general TwitterJob capability if any Twitter auth is available diff --git a/internal/jobs/reddit.go b/internal/jobs/reddit.go new file mode 100644 index 00000000..818d2e2a --- /dev/null +++ b/internal/jobs/reddit.go @@ -0,0 +1,111 @@ +package jobs + +import ( + "encoding/json" + "errors" + "fmt" + + "time" + + "github.com/sirupsen/logrus" + + "github.com/masa-finance/tee-worker/api/types" + "github.com/masa-finance/tee-worker/api/types/reddit" + "github.com/masa-finance/tee-worker/internal/jobs/redditapify" + "github.com/masa-finance/tee-worker/internal/jobs/stats" + "github.com/masa-finance/tee-worker/pkg/client" + + teeargs "github.com/masa-finance/tee-types/args" + teetypes "github.com/masa-finance/tee-types/types" +) + +// RedditApifyClient defines the interface for the Reddit Apify client. +// This allows for mocking in tests. +type RedditApifyClient interface { + ScrapeUrls(urls []teetypes.RedditStartURL, after time.Time, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) + SearchPosts(queries []string, after time.Time, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) + SearchCommunities(queries []string, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) + SearchUsers(queries []string, skipPosts bool, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) +} + +// NewRedditApifyClient is a function variable that can be replaced in tests. +// It defaults to the actual implementation. 
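+// For example, the suite in internal/jobs/reddit_test.go swaps in a mock:
+//
+//	jobs.NewRedditApifyClient = func(apiKey string) (jobs.RedditApifyClient, error) {
+//		return &MockRedditApifyClient{}, nil
+//	}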
+var NewRedditApifyClient = func(apiKey string) (RedditApifyClient, error) {
+	return redditapify.NewClient(apiKey)
+}
+
+type RedditScraper struct {
+	configuration  types.RedditConfig
+	statsCollector *stats.StatsCollector
+	capabilities   []teetypes.Capability
+}
+
+func NewRedditScraper(jc types.JobConfiguration, statsCollector *stats.StatsCollector) *RedditScraper {
+	return &RedditScraper{
+		configuration:  jc.GetRedditConfig(),
+		statsCollector: statsCollector,
+		capabilities:   teetypes.RedditCaps,
+	}
+}
+
+func (r *RedditScraper) ExecuteJob(j types.Job) (types.JobResult, error) {
+	logrus.WithField("job_uuid", j.UUID).Info("Starting ExecuteJob for Reddit scrape")
+
+	jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments))
+	if err != nil {
+		msg := fmt.Errorf("failed to unmarshal job arguments: %w", err)
+		return types.JobResult{Error: msg.Error()}, msg
+	}
+
+	// Type assert to Reddit arguments
+	redditArgs, ok := jobArgs.(*teeargs.RedditArguments)
+	if !ok {
+		return types.JobResult{Error: "invalid argument type for Reddit job"}, errors.New("invalid argument type")
+	}
+	logrus.Debugf("reddit job args: %+v", *redditArgs)
+
+	redditClient, err := NewRedditApifyClient(r.configuration.ApifyApiKey)
+	if err != nil {
+		return types.JobResult{Error: "error while scraping Reddit"}, fmt.Errorf("error creating Reddit Apify client: %w", err)
+	}
+
+	commonArgs := redditapify.CommonArgs{}
+	commonArgs.CopyFromArgs(redditArgs)
+
+	switch redditArgs.QueryType {
+	case teetypes.RedditScrapeUrls:
+		resp, cursor, err := redditClient.ScrapeUrls(redditArgs.URLs, redditArgs.After, commonArgs, client.Cursor(redditArgs.NextCursor), redditArgs.MaxResults)
+		return processRedditResponse(j, resp, cursor, err)
+
+	case teetypes.RedditSearchUsers:
+		resp, cursor, err := redditClient.SearchUsers(redditArgs.Queries, redditArgs.SkipPosts, commonArgs, client.Cursor(redditArgs.NextCursor), redditArgs.MaxResults)
+		return processRedditResponse(j, resp, cursor, err)
+
+	case teetypes.RedditSearchPosts:
+		resp, cursor, err := redditClient.SearchPosts(redditArgs.Queries, redditArgs.After, commonArgs, client.Cursor(redditArgs.NextCursor), redditArgs.MaxResults)
+		return processRedditResponse(j, resp, cursor, err)
+
+	case teetypes.RedditSearchCommunities:
+		resp, cursor, err := redditClient.SearchCommunities(redditArgs.Queries, commonArgs, client.Cursor(redditArgs.NextCursor), redditArgs.MaxResults)
+		return processRedditResponse(j, resp, cursor, err)
+
+	default:
+		return types.JobResult{Error: "invalid type for Reddit job"}, fmt.Errorf("invalid type for Reddit job: %s", redditArgs.QueryType)
+	}
+}
+
+func processRedditResponse(j types.Job, resp []*reddit.Response, cursor client.Cursor, err error) (types.JobResult, error) {
+	if err != nil {
+		return types.JobResult{Error: fmt.Sprintf("error while scraping Reddit: %s", err.Error())}, fmt.Errorf("error scraping Reddit: %w", err)
+	}
+
+	data, err := json.Marshal(resp)
+	if err != nil {
+		return types.JobResult{Error: "error marshalling Reddit response"}, fmt.Errorf("error marshalling Reddit response: %w", err)
+	}
+	return types.JobResult{
+		Data:       data,
+		Job:        j,
+		NextCursor: cursor.String(),
+	}, nil
+}
diff --git a/internal/jobs/reddit_test.go b/internal/jobs/reddit_test.go
new file mode 100644
index 00000000..5b77c148
--- /dev/null
+++ b/internal/jobs/reddit_test.go
@@ -0,0 +1,232 @@
+package jobs_test
+
+import (
+	"encoding/json"
+	"errors"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. 
"github.com/onsi/gomega" + "github.com/sirupsen/logrus" + + "github.com/masa-finance/tee-worker/api/types" + "github.com/masa-finance/tee-worker/api/types/reddit" + "github.com/masa-finance/tee-worker/internal/jobs" + "github.com/masa-finance/tee-worker/internal/jobs/redditapify" + "github.com/masa-finance/tee-worker/internal/jobs/stats" + "github.com/masa-finance/tee-worker/pkg/client" + + teetypes "github.com/masa-finance/tee-types/types" +) + +// MockRedditApifyClient is a mock implementation of the RedditApifyClient. +type MockRedditApifyClient struct { + ScrapeUrlsFunc func(urls []teetypes.RedditStartURL, after time.Time, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) + SearchPostsFunc func(queries []string, after time.Time, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) + SearchCommunitiesFunc func(queries []string, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) + SearchUsersFunc func(queries []string, skipPosts bool, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) +} + +func (m *MockRedditApifyClient) ScrapeUrls(urls []teetypes.RedditStartURL, after time.Time, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + if m != nil && m.ScrapeUrlsFunc != nil { + res, cursor, err := m.ScrapeUrlsFunc(urls, after, args, cursor, maxResults) + for i, r := range res { + logrus.Debugf("Scrape URLs result %d: %+v", i, r) + } + return res, cursor, err + } + return nil, "", nil +} + +func (m *MockRedditApifyClient) SearchPosts(queries []string, after time.Time, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + if m != nil && m.SearchPostsFunc != nil { + return m.SearchPostsFunc(queries, after, args, cursor, maxResults) + } + return nil, "", nil +} + +func (m *MockRedditApifyClient) SearchCommunities(queries []string, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + if m != nil && m.SearchCommunitiesFunc != nil { + return m.SearchCommunitiesFunc(queries, args, cursor, maxResults) + } + return nil, "", nil +} + +func (m *MockRedditApifyClient) SearchUsers(queries []string, skipPosts bool, args redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + if m != nil && m.SearchUsersFunc != nil { + return m.SearchUsersFunc(queries, skipPosts, args, cursor, maxResults) + } + return nil, "", nil +} + +var _ = Describe("RedditScraper", func() { + var ( + scraper *jobs.RedditScraper + statsCollector *stats.StatsCollector + job types.Job + mockClient *MockRedditApifyClient + ) + + BeforeEach(func() { + statsCollector = stats.StartCollector(128, types.JobConfiguration{}) + cfg := types.JobConfiguration{ + "apify_api_key": "test-key", + } + scraper = jobs.NewRedditScraper(cfg, statsCollector) + mockClient = &MockRedditApifyClient{} + + // Replace the client creation function with one that returns the mock + jobs.NewRedditApifyClient = func(apiKey string) (jobs.RedditApifyClient, error) { + return mockClient, nil + } + + job = types.Job{ + UUID: "test-uuid", + Type: teetypes.RedditJob, + } + }) + + Context("ExecuteJob", func() { + It("should return an error for invalid arguments", func() { + job.Arguments = 
map[string]any{"invalid": "args"} + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(result.Error).To(ContainSubstring("failed to unmarshal job arguments")) + }) + + It("should call ScrapeUrls for the correct QueryType", func() { + job.Arguments = map[string]any{ + "type": teetypes.RedditScrapeUrls, + "urls": []teetypes.RedditStartURL{{URL: "https://www.reddit.com/u/zaphod/", Method: "GET"}}, + } + + mockClient.ScrapeUrlsFunc = func(urls []teetypes.RedditStartURL, after time.Time, cArgs redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + Expect(urls).To(HaveLen(1)) + Expect(urls[0].URL).To(Equal("https://www.reddit.com/u/zaphod/")) + return []*reddit.Response{{TypeSwitch: &reddit.TypeSwitch{Type: reddit.UserResponse}, User: &reddit.User{ID: "user1", DataType: string(reddit.UserResponse)}}}, "next", nil + } + + result, err := scraper.ExecuteJob(job) + Expect(err).NotTo(HaveOccurred()) + Expect(result.NextCursor).To(Equal("next")) + var resp []*reddit.Response + err = json.Unmarshal(result.Data, &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).To(HaveLen(1)) + Expect(resp[0]).NotTo(BeNil()) + Expect(resp[0].User).NotTo(BeNil()) + Expect(resp[0].User.ID).To(Equal("user1")) + }) + + It("should call SearchUsers for the correct QueryType", func() { + job.Arguments = map[string]any{ + "type": teetypes.RedditSearchUsers, + "queries": []string{"user-query"}, + } + + mockClient.SearchUsersFunc = func(queries []string, skipPosts bool, cArgs redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + Expect(queries).To(Equal([]string{"user-query"})) + return []*reddit.Response{{TypeSwitch: &reddit.TypeSwitch{Type: reddit.UserResponse}, User: &reddit.User{ID: "user2", DataType: string(reddit.UserResponse)}}}, "next-user", nil + } + + result, err := scraper.ExecuteJob(job) + Expect(err).NotTo(HaveOccurred()) + Expect(result.NextCursor).To(Equal("next-user")) + var resp []*reddit.Response + err = json.Unmarshal(result.Data, &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).To(HaveLen(1)) + Expect(resp[0]).NotTo(BeNil()) + Expect(resp[0].User).NotTo(BeNil()) + Expect(resp[0].User.ID).To(Equal("user2")) + }) + + It("should call SearchPosts for the correct QueryType", func() { + job.Arguments = map[string]any{ + "type": teetypes.RedditSearchPosts, + "queries": []string{"post-query"}, + } + + mockClient.SearchPostsFunc = func(queries []string, after time.Time, cArgs redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + Expect(queries).To(Equal([]string{"post-query"})) + return []*reddit.Response{{TypeSwitch: &reddit.TypeSwitch{Type: reddit.PostResponse}, Post: &reddit.Post{ID: "post1", DataType: string(reddit.PostResponse)}}}, "next-post", nil + } + + result, err := scraper.ExecuteJob(job) + Expect(err).NotTo(HaveOccurred()) + Expect(result.NextCursor).To(Equal("next-post")) + var resp []*reddit.Response + err = json.Unmarshal(result.Data, &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).To(HaveLen(1)) + Expect(resp[0]).NotTo(BeNil()) + Expect(resp[0].Post).NotTo(BeNil()) + Expect(resp[0].Post.ID).To(Equal("post1")) + }) + + It("should call SearchCommunities for the correct QueryType", func() { + job.Arguments = map[string]any{ + "type": teetypes.RedditSearchCommunities, + "queries": []string{"community-query"}, + } + + mockClient.SearchCommunitiesFunc = func(queries []string, cArgs 
redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + Expect(queries).To(Equal([]string{"community-query"})) + return []*reddit.Response{{TypeSwitch: &reddit.TypeSwitch{Type: reddit.CommunityResponse}, Community: &reddit.Community{ID: "comm1", DataType: string(reddit.CommunityResponse)}}}, "next-comm", nil + } + + result, err := scraper.ExecuteJob(job) + Expect(err).NotTo(HaveOccurred()) + Expect(result.NextCursor).To(Equal("next-comm")) + var resp []*reddit.Response + err = json.Unmarshal(result.Data, &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).To(HaveLen(1)) + Expect(resp[0]).NotTo(BeNil()) + Expect(resp[0].Community).NotTo(BeNil()) + Expect(resp[0].Community.ID).To(Equal("comm1")) + }) + + It("should return an error for an invalid QueryType", func() { + job.Arguments = map[string]any{ + "type": "invalid-type", + } + + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(ContainSubstring("invalid type"))) + Expect(result.Error).To(ContainSubstring("invalid type")) + }) + + It("should handle errors from the reddit client", func() { + job.Arguments = map[string]any{ + "type": teetypes.RedditSearchPosts, + "queries": []string{"post-query"}, + } + + expectedErr := errors.New("client error") + mockClient.SearchPostsFunc = func(queries []string, after time.Time, cArgs redditapify.CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) { + return nil, "", expectedErr + } + + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(ContainSubstring("client error"))) + Expect(result.Error).To(ContainSubstring("error while scraping Reddit: client error")) + }) + + It("should handle errors when creating the client", func() { + jobs.NewRedditApifyClient = func(apiKey string) (jobs.RedditApifyClient, error) { + return nil, errors.New("client creation failed") + } + job.Arguments = map[string]any{ + "type": teetypes.RedditSearchPosts, + "queries": []string{"post-query"}, + } + + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(result.Error).To(Equal("error while scraping Reddit")) + }) + }) +}) diff --git a/internal/jobs/redditapify/client.go b/internal/jobs/redditapify/client.go new file mode 100644 index 00000000..cb7dff64 --- /dev/null +++ b/internal/jobs/redditapify/client.go @@ -0,0 +1,174 @@ +package redditapify + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/sirupsen/logrus" + + "github.com/masa-finance/tee-worker/api/types/reddit" + "github.com/masa-finance/tee-worker/pkg/client" + + teeargs "github.com/masa-finance/tee-types/args" + teetypes "github.com/masa-finance/tee-types/types" +) + +const ( + RedditActorID = "trudax~reddit-scraper" +) + +// CommonArgs holds the parameters that all Reddit searches support, in a single struct +type CommonArgs struct { + Sort teetypes.RedditSortType + IncludeNSFW bool + MaxItems uint + MaxPosts uint + MaxComments uint + MaxCommunities uint + MaxUsers uint +} + +func (ca *CommonArgs) CopyFromArgs(a *teeargs.RedditArguments) { + ca.Sort = a.Sort + ca.IncludeNSFW = a.IncludeNSFW + ca.MaxItems = a.MaxItems + ca.MaxPosts = a.MaxPosts + ca.MaxComments = a.MaxComments + ca.MaxCommunities = a.MaxCommunities + ca.MaxUsers = a.MaxUsers +} + +func (args *CommonArgs) ToActorRequest() RedditActorRequest { + return RedditActorRequest{ + Sort: args.Sort, + IncludeNSFW: args.IncludeNSFW, + MaxItems: args.MaxItems, + MaxPostCount: 
args.MaxPosts,
+		MaxComments:         args.MaxComments,
+		MaxCommunitiesCount: args.MaxCommunities,
+		MaxUserCount:        args.MaxUsers,
+	}
+}
+
+// RedditActorRequest represents the query parameters for the Apify Reddit Scraper actor.
+// Based on the input schema of https://apify.com/trudax/reddit-scraper
+type RedditActorRequest struct {
+	Type                teetypes.RedditQueryType  `json:"type,omitempty"`
+	Searches            []string                  `json:"searches,omitempty"`
+	StartUrls           []teetypes.RedditStartURL `json:"startUrls,omitempty"`
+	Sort                teetypes.RedditSortType   `json:"sort,omitempty"`
+	PostDateLimit       *time.Time                `json:"postDateLimit,omitempty"`
+	IncludeNSFW         bool                      `json:"includeNSFW"`
+	MaxItems            uint                      `json:"maxItems,omitempty"`            // Total number of items to scrape
+	MaxPostCount        uint                      `json:"maxPostCount,omitempty"`        // Max number of posts per page
+	MaxComments         uint                      `json:"maxComments,omitempty"`         // Max number of comments per page
+	MaxCommunitiesCount uint                      `json:"maxCommunitiesCount,omitempty"` // Max number of communities per page
+	MaxUserCount        uint                      `json:"maxUserCount,omitempty"`        // Max number of users per page
+	SearchComments      bool                      `json:"searchComments"`
+	SearchCommunities   bool                      `json:"searchCommunities"`
+	SearchPosts         bool                      `json:"searchPosts"`
+	SearchUsers         bool                      `json:"searchUsers"`
+	SkipUserPosts       bool                      `json:"skipUserPosts"`
+	SkipComments        bool                      `json:"skipComments"`
+}
+
+// RedditApifyClient wraps the generic Apify client for Reddit-specific operations
+type RedditApifyClient struct {
+	apifyClient client.Apify
+}
+
+// NewInternalClient is a function variable that can be replaced in tests.
+// It defaults to the actual implementation.
+var NewInternalClient = func(apiKey string) (client.Apify, error) {
+	return client.NewApifyClient(apiKey)
+}
+
+// NewClient creates a new Reddit Apify client
+func NewClient(apiToken string) (*RedditApifyClient, error) {
+	apifyClient, err := NewInternalClient(apiToken)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create apify client: %w", err)
+	}
+
+	return &RedditApifyClient{
+		apifyClient: apifyClient,
+	}, nil
+}
+
+// ValidateApiKey tests if the Apify API token is valid
+func (c *RedditApifyClient) ValidateApiKey() error {
+	return c.apifyClient.ValidateApiKey()
+}
+
+// ScrapeUrls scrapes Reddit URLs
+func (c *RedditApifyClient) ScrapeUrls(urls []teetypes.RedditStartURL, after time.Time, args CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) {
+	input := args.ToActorRequest()
+	input.StartUrls = urls
+	input.PostDateLimit = &after
+	input.Searches = nil
+	input.SearchUsers = true
+	input.SearchComments = true
+	input.SearchPosts = true
+	input.SearchCommunities = true
+	input.SkipUserPosts = input.MaxPostCount == 0
+
+	return c.queryReddit(input, cursor, maxResults)
+}
+
+// SearchPosts searches Reddit posts
+func (c *RedditApifyClient) SearchPosts(queries []string, after time.Time, args CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) {
+	input := args.ToActorRequest()
+	input.Searches = queries
+	input.StartUrls = nil
+	input.PostDateLimit = &after
+	input.Type = "posts"
+
+	input.SearchPosts = true
+	input.SkipComments = input.MaxComments == 0
+
+	return c.queryReddit(input, cursor, maxResults)
+}
+
+// SearchCommunities searches Reddit communities
+func (c *RedditApifyClient) SearchCommunities(queries []string, args CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) {
+	input := args.ToActorRequest()
+	input.Searches = queries
+	input.StartUrls = nil
+	input.Type = "communities"
+	input.SearchCommunities = true
+
+	return c.queryReddit(input, cursor, maxResults)
+}
+
+// SearchUsers searches Reddit users
+func (c *RedditApifyClient) SearchUsers(queries []string, skipPosts bool, args CommonArgs, cursor client.Cursor, maxResults uint) ([]*reddit.Response, client.Cursor, error) {
+	input := args.ToActorRequest()
+	input.Searches = queries
+	input.StartUrls = nil
+	input.SkipUserPosts = skipPosts
+	input.Type = "users"
+	input.SearchUsers = true
+
+	return c.queryReddit(input, cursor, maxResults)
+}
+
+// queryReddit runs the actor and retrieves responses from the dataset
+func (c *RedditApifyClient) queryReddit(input RedditActorRequest, cursor client.Cursor, limit uint) ([]*reddit.Response, client.Cursor, error) {
+	dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(RedditActorID, input, cursor, limit)
+	if err != nil {
+		return nil, client.EmptyCursor, err
+	}
+
+	response := make([]*reddit.Response, 0, len(dataset.Data.Items))
+	for i, item := range dataset.Data.Items {
+		var resp reddit.Response
+		if err := json.Unmarshal(item, &resp); err != nil {
+			logrus.Warnf("Failed to unmarshal response at index %d: %v", i, err)
+			continue
+		}
+		response = append(response, &resp)
+	}
+
+	return response, nextCursor, nil
+}
diff --git a/internal/jobs/redditapify/client_test.go b/internal/jobs/redditapify/client_test.go
new file mode 100644
index 00000000..8cc023c3
--- /dev/null
+++ b/internal/jobs/redditapify/client_test.go
@@ -0,0 +1,239 @@
+package redditapify_test
+
+import (
+	"encoding/json"
+	"errors"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/masa-finance/tee-worker/internal/jobs/redditapify"
+	"github.com/masa-finance/tee-worker/pkg/client"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+	teetypes "github.com/masa-finance/tee-types/types"
+)
+
+// FIXME: Really test with a live API key
+// FIXME: Fix documentation
+// MockApifyClient is a mock implementation of the ApifyClient.
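+// It stands in for the client.Apify interface that redditapify.NewInternalClient
+// returns, so tests can inject it through that variable (see BeforeEach below).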
+type MockApifyClient struct { + RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) + ValidateApiKeyFunc func() error +} + +func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + if m.RunActorAndGetResponseFunc != nil { + return m.RunActorAndGetResponseFunc(actorID, input, cursor, limit) + } + return nil, "", errors.New("RunActorAndGetResponseFunc not defined") +} + +func (m *MockApifyClient) ValidateApiKey() error { + if m.ValidateApiKeyFunc != nil { + return m.ValidateApiKeyFunc() + } + return errors.New("ValidateApiKeyFunc not defined") +} + +var _ = Describe("RedditApifyClient", func() { + var ( + mockClient *MockApifyClient + redditClient *redditapify.RedditApifyClient + ) + + BeforeEach(func() { + mockClient = &MockApifyClient{} + // Replace the client creation function with one that returns the mock + redditapify.NewInternalClient = func(apiKey string) (client.Apify, error) { + return mockClient, nil + } + var err error + redditClient, err = redditapify.NewClient("test-token") + Expect(err).NotTo(HaveOccurred()) + }) + + Describe("ScrapeUrls", func() { + It("should construct the correct actor input", func() { + urls := []teetypes.RedditStartURL{{URL: "http://reddit.com/r/golang"}} + after := time.Now() + args := redditapify.CommonArgs{MaxPosts: 10} + + mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(redditapify.RedditActorID)) + req := input.(redditapify.RedditActorRequest) + Expect(req.StartUrls).To(Equal(urls)) + Expect(*req.PostDateLimit).To(BeTemporally("~", after, time.Second)) + Expect(req.Searches).To(BeNil()) + Expect(req.SearchUsers).To(BeTrue()) + Expect(req.SearchPosts).To(BeTrue()) + Expect(req.SearchCommunities).To(BeTrue()) + Expect(req.SkipUserPosts).To(BeFalse()) + Expect(req.MaxPostCount).To(Equal(uint(10))) + return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil + } + + _, _, err := redditClient.ScrapeUrls(urls, after, args, "", 100) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("SearchPosts", func() { + It("should construct the correct actor input", func() { + queries := []string{"golang"} + after := time.Now() + args := redditapify.CommonArgs{MaxComments: 5} + + mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(redditapify.RedditActorID)) + req := input.(redditapify.RedditActorRequest) + Expect(req.Searches).To(Equal(queries)) + Expect(req.StartUrls).To(BeNil()) + Expect(*req.PostDateLimit).To(BeTemporally("~", after, time.Second)) + Expect(req.Type).To(Equal(teetypes.RedditQueryType("posts"))) + Expect(req.SearchPosts).To(BeTrue()) + Expect(req.SkipComments).To(BeFalse()) + Expect(req.MaxComments).To(Equal(uint(5))) + return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil + } + + _, _, err := redditClient.SearchPosts(queries, after, args, "", 100) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("SearchCommunities", func() { + It("should construct the correct actor input", func() { + queries := []string{"golang"} + args := redditapify.CommonArgs{} + + mockClient.RunActorAndGetResponseFunc 
= func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				Expect(actorID).To(Equal(redditapify.RedditActorID))
+				req := input.(redditapify.RedditActorRequest)
+				Expect(req.Searches).To(Equal(queries))
+				Expect(req.StartUrls).To(BeNil())
+				Expect(req.Type).To(Equal(teetypes.RedditQueryType("communities")))
+				Expect(req.SearchCommunities).To(BeTrue())
+				return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil
+			}
+
+			_, _, err := redditClient.SearchCommunities(queries, args, "", 100)
+			Expect(err).NotTo(HaveOccurred())
+		})
+	})
+
+	Describe("SearchUsers", func() {
+		It("should construct the correct actor input", func() {
+			queries := []string{"gopher"}
+			args := redditapify.CommonArgs{}
+
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				Expect(actorID).To(Equal(redditapify.RedditActorID))
+				req := input.(redditapify.RedditActorRequest)
+				Expect(req.Searches).To(Equal(queries))
+				Expect(req.StartUrls).To(BeNil())
+				Expect(req.Type).To(Equal(teetypes.RedditQueryType("users")))
+				Expect(req.SearchUsers).To(BeTrue())
+				Expect(req.SkipUserPosts).To(BeTrue())
+				return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil
+			}
+
+			_, _, err := redditClient.SearchUsers(queries, true, args, "", 100)
+			Expect(err).NotTo(HaveOccurred())
+		})
+	})
+
+	Context("queryReddit", func() {
+		It("should handle errors from the apify client", func() {
+			expectedErr := errors.New("apify error")
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return nil, "", expectedErr
+			}
+			_, _, err := redditClient.SearchUsers([]string{"test"}, false, redditapify.CommonArgs{}, "", 10)
+			Expect(err).To(MatchError(expectedErr))
+		})
+
+		It("should handle JSON unmarshalling errors gracefully", func() {
+			invalidJSON := []byte(`{"dataType": "user", "id": 123}`) // id should be a string
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{invalidJSON},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			// This is a bit of a hack to test the private queryReddit method
+			// We call a public method that uses it
+			profiles, _, err := redditClient.SearchUsers([]string{"test"}, false, redditapify.CommonArgs{}, "", 10)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(profiles).To(BeEmpty()) // The invalid item should be skipped
+		})
+
+		It("should correctly unmarshal valid items", func() {
+			userJSON, _ := json.Marshal(map[string]any{"dataType": "user", "id": "u1", "username": "testuser"})
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{userJSON},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			profiles, cursor, err := redditClient.SearchUsers([]string{"test"}, false, redditapify.CommonArgs{}, "", 10)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(cursor).To(Equal(client.Cursor("next")))
+			Expect(profiles).To(HaveLen(1))
+			Expect(profiles[0].User).NotTo(BeNil())
+			
Expect(profiles[0].User.ID).To(Equal("u1")) + }) + }) + + Describe("CommonArgs", func() { + It("should copy from RedditArguments correctly", func() { + redditArgs := &teeargs.RedditArguments{ + Sort: teetypes.RedditSortTop, + IncludeNSFW: true, + MaxItems: 1, + MaxPosts: 2, + MaxComments: 3, + MaxCommunities: 4, + MaxUsers: 5, + } + commonArgs := redditapify.CommonArgs{} + commonArgs.CopyFromArgs(redditArgs) + + Expect(commonArgs.Sort).To(Equal(teetypes.RedditSortTop)) + Expect(commonArgs.IncludeNSFW).To(BeTrue()) + Expect(commonArgs.MaxItems).To(Equal(uint(1))) + Expect(commonArgs.MaxPosts).To(Equal(uint(2))) + Expect(commonArgs.MaxComments).To(Equal(uint(3))) + Expect(commonArgs.MaxCommunities).To(Equal(uint(4))) + Expect(commonArgs.MaxUsers).To(Equal(uint(5))) + }) + + It("should convert to RedditActorRequest correctly", func() { + commonArgs := redditapify.CommonArgs{ + Sort: teetypes.RedditSortNew, + IncludeNSFW: true, + MaxItems: 10, + MaxPosts: 20, + MaxComments: 30, + MaxCommunities: 40, + MaxUsers: 50, + } + actorReq := commonArgs.ToActorRequest() + + Expect(actorReq.Sort).To(Equal(teetypes.RedditSortNew)) + Expect(actorReq.IncludeNSFW).To(BeTrue()) + Expect(actorReq.MaxItems).To(Equal(uint(10))) + Expect(actorReq.MaxPostCount).To(Equal(uint(20))) + Expect(actorReq.MaxComments).To(Equal(uint(30))) + Expect(actorReq.MaxCommunitiesCount).To(Equal(uint(40))) + Expect(actorReq.MaxUserCount).To(Equal(uint(50))) + }) + }) +}) diff --git a/internal/jobs/twitter.go b/internal/jobs/twitter.go index e04dc57b..116671b4 100644 --- a/internal/jobs/twitter.go +++ b/internal/jobs/twitter.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/masa-finance/tee-types/args" teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" @@ -266,7 +265,7 @@ func (ts *TwitterScraper) queryTweetsWithCredentials(j types.Job, baseDir string return ts.scrapeTweetsWithCredentials(j, query, count, scraper, account) } -func (ts *TwitterScraper) queryTweetsWithApiKey(j types.Job, baseQueryEndpoint string, baseDir string, query string, count int) ([]*teetypes.TweetResult, error) { +func (ts *TwitterScraper) queryTweetsWithApiKey(j types.Job, baseQueryEndpoint string, query string, count int) ([]*teetypes.TweetResult, error) { twitterXScraper, apiKey, err := ts.getApiScraper(j) if err != nil { return nil, err @@ -394,12 +393,6 @@ EndLoop: return tweets, nil } -func (ts *TwitterScraper) scrapeTweetsWithApiKey(j types.Job, baseQueryEndpoint string, query string, count int, apiKey *twitter.TwitterApiKey) ([]*teetypes.TweetResult, error) { - apiClient := client.NewTwitterXClient(apiKey.Key) - twitterXScraper := twitterx.NewTwitterXScraper(apiClient) - return ts.scrapeTweets(j, baseQueryEndpoint, query, count, twitterXScraper, apiKey) -} - func (ts *TwitterScraper) ScrapeTweetByID(j types.Job, baseDir string, tweetID string) (*teetypes.TweetResult, error) { ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) @@ -880,7 +873,7 @@ func (ts *TwitterScraper) GetFollowing(j types.Job, baseDir, username string, co } // getFollowersApify retrieves followers using Apify -func (ts *TwitterScraper) getFollowersApify(j types.Job, username string, maxResults int, cursor client.Cursor) ([]*teetypes.ProfileResultApify, client.Cursor, error) { +func (ts *TwitterScraper) getFollowersApify(j types.Job, username string, maxResults uint, cursor client.Cursor) ([]*teetypes.ProfileResultApify, client.Cursor, error) { apifyScraper, err := ts.getApifyScraper(j) if err != nil { return nil, "", 
err @@ -898,7 +891,7 @@ func (ts *TwitterScraper) getFollowersApify(j types.Job, username string, maxRes } // getFollowingApify retrieves following using Apify -func (ts *TwitterScraper) getFollowingApify(j types.Job, username string, maxResults int, cursor client.Cursor) ([]*teetypes.ProfileResultApify, client.Cursor, error) { +func (ts *TwitterScraper) getFollowingApify(j types.Job, username string, maxResults uint, cursor client.Cursor) ([]*teetypes.ProfileResultApify, client.Cursor, error) { apifyScraper, err := ts.getApifyScraper(j) if err != nil { return nil, "", err @@ -1101,7 +1094,7 @@ func (ts *TwitterScraper) GetStructuredCapabilities() teetypes.WorkerCapabilitie } type TwitterScrapeStrategy interface { - Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) + Execute(j types.Job, ts *TwitterScraper, jobArgs *teeargs.TwitterSearchArguments) (types.JobResult, error) } func getScrapeStrategy(jobType teetypes.JobType) TwitterScrapeStrategy { @@ -1119,7 +1112,7 @@ func getScrapeStrategy(jobType teetypes.JobType) TwitterScrapeStrategy { type CredentialScrapeStrategy struct{} -func (s *CredentialScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) { +func (s *CredentialScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *teeargs.TwitterSearchArguments) (types.JobResult, error) { capability := jobArgs.GetCapability() switch capability { case teetypes.CapSearchByQuery: @@ -1136,14 +1129,14 @@ func (s *CredentialScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobA type ApiKeyScrapeStrategy struct{} -func (s *ApiKeyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) { +func (s *ApiKeyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *teeargs.TwitterSearchArguments) (types.JobResult, error) { capability := jobArgs.GetCapability() switch capability { case teetypes.CapSearchByQuery: - tweets, err := ts.queryTweetsWithApiKey(j, twitterx.TweetsSearchRecent, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults) + tweets, err := ts.queryTweetsWithApiKey(j, twitterx.TweetsSearchRecent, jobArgs.Query, jobArgs.MaxResults) return processResponse(tweets, "", err) case teetypes.CapSearchByFullArchive: - tweets, err := ts.queryTweetsWithApiKey(j, twitterx.TweetsAll, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults) + tweets, err := ts.queryTweetsWithApiKey(j, twitterx.TweetsAll, jobArgs.Query, jobArgs.MaxResults) return processResponse(tweets, "", err) case teetypes.CapGetProfileById: _, apiKey, err := ts.getApiScraper(j) @@ -1166,14 +1159,14 @@ func (s *ApiKeyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs type ApifyScrapeStrategy struct{} -func (s *ApifyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) { +func (s *ApifyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *teeargs.TwitterSearchArguments) (types.JobResult, error) { capability := teetypes.Capability(jobArgs.QueryType) switch capability { case teetypes.CapGetFollowers: - followers, nextCursor, err := ts.getFollowersApify(j, jobArgs.Query, jobArgs.MaxResults, client.Cursor(jobArgs.NextCursor)) + followers, nextCursor, err := ts.getFollowersApify(j, jobArgs.Query, uint(jobArgs.MaxResults), client.Cursor(jobArgs.NextCursor)) return processResponse(followers, nextCursor.String(), err) case 
teetypes.CapGetFollowing: - following, nextCursor, err := ts.getFollowingApify(j, jobArgs.Query, jobArgs.MaxResults, client.Cursor(jobArgs.NextCursor)) + following, nextCursor, err := ts.getFollowingApify(j, jobArgs.Query, uint(jobArgs.MaxResults), client.Cursor(jobArgs.NextCursor)) return processResponse(following, nextCursor.String(), err) default: return types.JobResult{Error: fmt.Sprintf("unsupported capability %s for Apify job", capability)}, fmt.Errorf("unsupported capability %s for Apify job", capability) @@ -1183,7 +1176,7 @@ func (s *ApifyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs * type DefaultScrapeStrategy struct{} // FIXED: Now using validated QueryType from centralized unmarshaller (addresses the TODO comment) -func (s *DefaultScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) { +func (s *DefaultScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *teeargs.TwitterSearchArguments) (types.JobResult, error) { capability := teetypes.Capability(jobArgs.QueryType) switch capability { case teetypes.CapGetFollowers, teetypes.CapGetFollowing: @@ -1290,7 +1283,7 @@ func processResponse(response any, nextCursor string, err error) (types.JobResul return types.JobResult{Data: dat, NextCursor: nextCursor}, nil } -func defaultStrategyFallback(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) { +func defaultStrategyFallback(j types.Job, ts *TwitterScraper, jobArgs *teeargs.TwitterSearchArguments) (types.JobResult, error) { capability := jobArgs.GetCapability() switch capability { case teetypes.CapSearchByProfile: diff --git a/internal/jobs/twitterapify/client.go b/internal/jobs/twitterapify/client.go index bb67e879..f9c40d45 100644 --- a/internal/jobs/twitterapify/client.go +++ b/internal/jobs/twitterapify/client.go @@ -18,15 +18,15 @@ const ( type FollowerActorRunRequest struct { UserNames []string `json:"user_names"` UserIds []string `json:"user_ids"` - MaxFollowers int `json:"maxFollowers"` - MaxFollowings int `json:"maxFollowings"` + MaxFollowers uint `json:"maxFollowers"` + MaxFollowings uint `json:"maxFollowings"` GetFollowers bool `json:"getFollowers"` GetFollowing bool `json:"getFollowing"` } // TwitterApifyClient wraps the generic Apify client for Twitter-specific operations type TwitterApifyClient struct { - apifyClient *client.ApifyClient + apifyClient client.Apify } // NewTwitterApifyClient creates a new Twitter Apify client @@ -47,8 +47,8 @@ func (c *TwitterApifyClient) ValidateApiKey() error { } // GetFollowers retrieves followers for a username using Apify -func (c *TwitterApifyClient) GetFollowers(username string, maxResults int, cursor client.Cursor) ([]*teetypes.ProfileResultApify, client.Cursor, error) { - minimum := 200 +func (c *TwitterApifyClient) GetFollowers(username string, maxResults uint, cursor client.Cursor) ([]*teetypes.ProfileResultApify, client.Cursor, error) { + minimum := uint(200) // Ensure minimum of 200 as required by the actor maxFollowers := util.Max(maxResults, minimum) @@ -66,8 +66,8 @@ func (c *TwitterApifyClient) GetFollowers(username string, maxResults int, curso } // GetFollowing retrieves following for a username using Apify -func (c *TwitterApifyClient) GetFollowing(username string, cursor client.Cursor, maxResults int) ([]*teetypes.ProfileResultApify, client.Cursor, error) { - minimum := 200 +func (c *TwitterApifyClient) GetFollowing(username string, cursor client.Cursor, maxResults uint) 
([]*teetypes.ProfileResultApify, client.Cursor, error) { + minimum := uint(200) // Ensure minimum of 200 as required by the actor maxFollowings := util.Max(maxResults, minimum) @@ -85,7 +85,7 @@ func (c *TwitterApifyClient) GetFollowing(username string, cursor client.Cursor, } // getProfiles runs the actor and retrieves profiles from the dataset -func (c *TwitterApifyClient) getProfiles(input FollowerActorRunRequest, cursor client.Cursor, limit int) ([]*teetypes.ProfileResultApify, client.Cursor, error) { +func (c *TwitterApifyClient) getProfiles(input FollowerActorRunRequest, cursor client.Cursor, limit uint) ([]*teetypes.ProfileResultApify, client.Cursor, error) { dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(TwitterFollowerActorID, input, cursor, limit) if err != nil { return nil, client.EmptyCursor, err diff --git a/internal/jobs/webscraper.go b/internal/jobs/webscraper.go index 2a120afa..7e4d652c 100644 --- a/internal/jobs/webscraper.go +++ b/internal/jobs/webscraper.go @@ -49,7 +49,7 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { // Step 1: Use centralized type-safe unmarshaller jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) if err != nil { - logrus.Errorf("Failed to unmarshal job arguments: %v", err) + logrus.Warnf("Failed to unmarshal job arguments: %v", err) ws.stats.Add(j.WorkerID, stats.WebInvalid, 1) return types.JobResult{Error: fmt.Sprintf("Invalid arguments: %v", err)}, nil } @@ -60,16 +60,15 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { logrus.Errorf("Expected Web arguments for job ID %s, type %s", j.UUID, j.Type) return types.JobResult{Error: "invalid argument type for Web job"}, nil } - logrus.Infof("Job arguments unmarshaled and validated successfully: %+v", args) + logrus.Debugf("Job arguments unmarshaled and validated successfully: %+v", args) // Step 2: Validate URL against blacklist - logrus.Info("Validating URL against blacklist") + logrus.Debug("Validating URL against blacklist") for _, u := range ws.configuration.Blacklist { logrus.Debugf("Checking if URL contains blacklisted term: %s", u) if strings.Contains(args.URL, u) { logrus.Warnf("URL %s is blacklisted due to term: %s", args.URL, u) ws.stats.Add(j.WorkerID, stats.WebInvalid, 1) - logrus.Errorf("Blacklisted URL: %s", args.URL) return types.JobResult{ Error: fmt.Sprintf("URL blacklisted: %s", args.URL), }, nil @@ -78,20 +77,20 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { logrus.Infof("URL %s passed blacklist validation", args.URL) // Step 3: Use enhanced methods for cleaner logic and validation - logrus.Infof("Initiating web scraping for URL: %s (max_depth: %d, has_selector: %t, is_deep_scrape: %t)", + logrus.Debugf("Initiating web scraping for URL: %s (max_depth: %d, has_selector: %t, is_deep_scrape: %t)", args.URL, args.GetEffectiveMaxDepth(), args.HasSelector(), args.IsDeepScrape()) // Perform web scraping using the effective max depth result, err := scrapeWeb([]string{args.URL}, args.GetEffectiveMaxDepth()) if err != nil { - logrus.Errorf("Web scraping failed for URL %s: %v", args.URL, err) + logrus.Warnf("Web scraping failed for URL %s: %v", args.URL, err) ws.stats.Add(j.WorkerID, stats.WebErrors, 1) return types.JobResult{Error: err.Error()}, err } - logrus.Infof("Web scraping succeeded for URL %s: %v", args.URL, result) + logrus.Debugf("Web scraping succeeded for URL %s: %v", args.URL, string(result)) // Step 4: Process result and return - 
logrus.Info("Updating statistics for successful web scraping") + logrus.Debugf("Updating statistics for successful web scraping") ws.stats.Add(j.WorkerID, stats.WebSuccess, 1) logrus.Infof("Returning web scraping result for URL %s", args.URL) return types.JobResult{ diff --git a/internal/jobserver/jobserver.go b/internal/jobserver/jobserver.go index 45cfd9a6..58abd86e 100644 --- a/internal/jobserver/jobserver.go +++ b/internal/jobserver/jobserver.go @@ -96,12 +96,15 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { teetypes.TwitterApifyJob: { w: jobs.NewTwitterScraper(jc, s), // Register Apify job type with Twitter scraper }, - teetypes.TelemetryJob: { - w: jobs.NewTelemetryJob(jc, s), - }, teetypes.TiktokJob: { w: jobs.NewTikTokTranscriber(jc, s), }, + teetypes.RedditJob: { + w: jobs.NewRedditScraper(jc, s), + }, + teetypes.TelemetryJob: { + w: jobs.NewTelemetryJob(jc, s), + }, } // Validate that all workers were initialized successfully for jobType, workerEntry := range jobworkers { diff --git a/internal/jobserver/jobserver_test.go b/internal/jobserver/jobserver_test.go index 92312d8d..3a133e8c 100644 --- a/internal/jobserver/jobserver_test.go +++ b/internal/jobserver/jobserver_test.go @@ -53,7 +53,8 @@ var _ = Describe("Jobserver", func() { Arguments: map[string]interface{}{ "url": "google", }, - Nonce: "1234567890", + Nonce: "1234567890", + WorkerID: "miner3", }) Expect(uuid).To(BeEmpty()) @@ -82,7 +83,8 @@ var _ = Describe("Jobserver", func() { Arguments: map[string]interface{}{ "url": "google", }, - Nonce: "1234567890", + Nonce: "1234567890", + WorkerID: "miner3", }) Expect(uuid).ToNot(BeEmpty()) diff --git a/internal/jobserver/worker.go b/internal/jobserver/worker.go index acb8d4eb..ec55110f 100644 --- a/internal/jobserver/worker.go +++ b/internal/jobserver/worker.go @@ -46,7 +46,10 @@ func (js *JobServer) doWork(j types.Job) error { result, err := w.w.ExecuteJob(j) if err != nil { - result.Error = err.Error() + logrus.Infof("Error executing job type %s: %s", j.Type, err.Error()) + if len(result.Error) == 0 { + result.Error = err.Error() + } } result.Job = j diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go index a33b6916..d14ca0c6 100644 --- a/pkg/client/apify_client.go +++ b/pkg/client/apify_client.go @@ -24,6 +24,12 @@ const ( ActorStatusAborted = "ABORTED" ) +// Apify provides an interface for interacting with the Apify API. 
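+// RunActorAndGetResponse runs an actor, polls until the run completes, and returns one page of dataset items together with a cursor for the next page; ValidateApiKey checks that the configured API token is accepted by Apify.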
+type Apify interface { + RunActorAndGetResponse(actorId string, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) + ValidateApiKey() error +} + // ApifyClient represents a client for the Apify API type ApifyClient struct { apiToken string @@ -40,19 +46,22 @@ type ActorRunResponse struct { } `json:"data"` } +// ApifyDatasetData holds the items from an Apify dataset +type ApifyDatasetData struct { + Items []json.RawMessage `json:"items"` + Count uint `json:"count"` + Offset uint `json:"offset"` + Limit uint `json:"limit"` +} + // DatasetResponse represents the response from getting dataset items type DatasetResponse struct { - Data struct { - Items []json.RawMessage `json:"items"` - Count int `json:"count"` - Offset int `json:"offset"` - Limit int `json:"limit"` - } `json:"data"` + Data ApifyDatasetData `json:"data"` } // CursorData represents the pagination data stored in cursor type CursorData struct { - Offset int `json:"offset"` + Offset uint `json:"offset"` } // Cursor represents an encoded CursorData @@ -66,7 +75,7 @@ func (c Cursor) String() string { } // NewApifyClient creates a new Apify client with functional options -func NewApifyClient(apiToken string, opts ...Option) (*ApifyClient, error) { +func NewApifyClient(apiToken string, opts ...Option) (Apify, error) { logrus.Info("Creating new ApifyClient with API token") options, err := NewOptions(opts...) @@ -87,7 +96,7 @@ func (c *ApifyClient) HTTPClient() *http.Client { } // RunActor runs an actor with the given input -func (c *ApifyClient) RunActor(actorId string, input interface{}) (*ActorRunResponse, error) { +func (c *ApifyClient) RunActor(actorId string, input any) (*ActorRunResponse, error) { url := fmt.Sprintf("%s/acts/%s/runs?token=%s", c.baseUrl, actorId, c.apiToken) logrus.Infof("Running actor %s", actorId) @@ -184,7 +193,7 @@ func (c *ApifyClient) GetActorRun(runId string) (*ActorRunResponse, error) { } // GetDatasetItems gets items from a dataset with pagination -func (c *ApifyClient) GetDatasetItems(datasetId string, offset, limit int) (*DatasetResponse, error) { +func (c *ApifyClient) GetDatasetItems(datasetId string, offset, limit uint) (*DatasetResponse, error) { url := fmt.Sprintf("%s/datasets/%s/items?token=%s&offset=%d&limit=%d", c.baseUrl, datasetId, c.apiToken, offset, limit) logrus.Debugf("Getting dataset items: %s (offset: %d, limit: %d)", datasetId, offset, limit) @@ -228,12 +237,12 @@ func (c *ApifyClient) GetDatasetItems(datasetId string, offset, limit int) (*Dat datasetResp := &DatasetResponse{ Data: struct { Items []json.RawMessage `json:"items"` - Count int `json:"count"` - Offset int `json:"offset"` - Limit int `json:"limit"` + Count uint `json:"count"` + Offset uint `json:"offset"` + Limit uint `json:"limit"` }{ Items: items, - Count: len(items), + Count: uint(len(items)), Offset: offset, Limit: limit, }, @@ -286,8 +295,8 @@ var ( ) // runActorAndGetProfiles runs the actor and retrieves profiles from the dataset -func (c *ApifyClient) RunActorAndGetResponse(actorId string, input any, cursor Cursor, limit int) (*DatasetResponse, Cursor, error) { - var offset int +func (c *ApifyClient) RunActorAndGetResponse(actorId string, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) { + var offset uint if cursor != EmptyCursor { offset = parseCursor(cursor) } @@ -339,12 +348,13 @@ PollLoop: // 4. 
Generate next cursor if more data may be available var nextCursor Cursor - if len(dataset.Data.Items) == limit { - nextCursor = generateCursor(offset + len(dataset.Data.Items)) - logrus.Debugf("Generated next cursor for offset %d", offset+len(dataset.Data.Items)) + if uint(len(dataset.Data.Items)) == limit { + nextOffset := offset + uint(len(dataset.Data.Items)) + nextCursor = generateCursor(nextOffset) + logrus.Debugf("Generated next cursor for offset %d", nextOffset) } - if len(dataset.Data.Items) == limit { + if uint(len(dataset.Data.Items)) == limit { logrus.Infof("Successfully retrieved %d profiles; more may be available", len(dataset.Data.Items)) } else { logrus.Infof("Successfully retrieved %d profiles", len(dataset.Data.Items)) @@ -353,7 +363,7 @@ PollLoop: } // parseCursor decodes a base64 cursor to get the offset -func parseCursor(cursor Cursor) int { +func parseCursor(cursor Cursor) uint { if cursor == "" { return 0 } @@ -374,7 +384,7 @@ func parseCursor(cursor Cursor) int { } // generateCursor encodes an offset as a base64 cursor -func generateCursor(offset int) Cursor { +func generateCursor(offset uint) Cursor { cursorData := CursorData{Offset: offset} data, err := json.Marshal(cursorData) if err != nil { From bc77dcb185b21c6a6ed861332d8a23633fc9067d Mon Sep 17 00:00:00 2001 From: mcamou Date: Thu, 21 Aug 2025 17:06:41 +0200 Subject: [PATCH 2/6] Test fixes --- api/types/reddit/reddit_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/api/types/reddit/reddit_test.go b/api/types/reddit/reddit_test.go index 2f15c742..3eef7089 100644 --- a/api/types/reddit/reddit_test.go +++ b/api/types/reddit/reddit_test.go @@ -13,7 +13,7 @@ import ( var _ = Describe("Response", func() { Context("Unmarshalling JSON", func() { It("should correctly unmarshal a UserResponse", func() { - jsonData := `{"type": "user", "id": "u1", "username": "testuser"}` + jsonData := `{"dataType": "user", "id": "u1", "username": "testuser"}` var resp reddit.Response err := json.Unmarshal([]byte(jsonData), &resp) Expect(err).NotTo(HaveOccurred()) @@ -26,7 +26,7 @@ var _ = Describe("Response", func() { }) It("should correctly unmarshal a PostResponse", func() { - jsonData := `{"type": "post", "id": "p1", "title": "Test Post"}` + jsonData := `{"dataType": "post", "id": "p1", "title": "Test Post"}` var resp reddit.Response err := json.Unmarshal([]byte(jsonData), &resp) Expect(err).NotTo(HaveOccurred()) @@ -39,7 +39,7 @@ var _ = Describe("Response", func() { }) It("should correctly unmarshal a CommentResponse", func() { - jsonData := `{"type": "comment", "id": "c1", "body": "Test Comment"}` + jsonData := `{"dataType": "comment", "id": "c1", "body": "Test Comment"}` var resp reddit.Response err := json.Unmarshal([]byte(jsonData), &resp) Expect(err).NotTo(HaveOccurred()) @@ -52,7 +52,7 @@ var _ = Describe("Response", func() { }) It("should correctly unmarshal a CommunityResponse", func() { - jsonData := `{"type": "community", "id": "co1", "name": "Test Community"}` + jsonData := `{"dataType": "community", "id": "co1", "name": "Test Community"}` var resp reddit.Response err := json.Unmarshal([]byte(jsonData), &resp) Expect(err).NotTo(HaveOccurred()) @@ -65,11 +65,11 @@ var _ = Describe("Response", func() { }) It("should return an error for an unknown type", func() { - jsonData := `{"type": "unknown", "id": "u1"}` + jsonData := `{"dataType": "unknown", "id": "u1"}` var resp reddit.Response err := json.Unmarshal([]byte(jsonData), &resp) Expect(err).To(HaveOccurred()) - 
Expect(err.Error()).To(ContainSubstring("unknown Reddit response type: unknown")) + Expect(err.Error()).To(ContainSubstring("unknown Reddit response type during unmarshal: unknown")) }) It("should return an error for invalid JSON", func() { From 20f5576c372dbd73feaccfc63d8a4c7d14b9dd61 Mon Sep 17 00:00:00 2001 From: mcamou Date: Thu, 21 Aug 2025 18:20:22 +0200 Subject: [PATCH 3/6] Some debugging info and update README --- README.md | 2 +- api/types/job.go | 4 ++++ internal/jobs/reddit.go | 4 +++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1828962b..8b8c3d95 100644 --- a/README.md +++ b/README.md @@ -119,7 +119,7 @@ SIG=$(curl -s localhost:8080/job/generate \ -H "Content-Type: application/json" \ -H "Authorization: Bearer ${API_KEY}" \ -d '{ - "type": "web-scraper", + "type": "web", "arguments": { "url": "https://example.com", "depth": 1 diff --git a/api/types/job.go b/api/types/job.go index 7313683c..2f91ba7d 100644 --- a/api/types/job.go +++ b/api/types/job.go @@ -31,6 +31,10 @@ type Job struct { Timeout time.Duration `json:"timeout"` } +func (j Job) String() string { + return fmt.Sprintf("UUID: %s Type: %s Arguments: %s", j.UUID, j.Type, j.Arguments) +} + var letterRunes = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*()_+") func randStringRunes(n int) string { diff --git a/internal/jobs/reddit.go b/internal/jobs/reddit.go index 818d2e2a..f305a964 100644 --- a/internal/jobs/reddit.go +++ b/internal/jobs/reddit.go @@ -41,8 +41,10 @@ type RedditScraper struct { } func NewRedditScraper(jc types.JobConfiguration, statsCollector *stats.StatsCollector) *RedditScraper { + config := jc.GetRedditConfig() + logrus.Info("Reddit scraper via Apify initialized") return &RedditScraper{ - configuration: jc.GetRedditConfig(), + configuration: config, statsCollector: statsCollector, capabilities: teetypes.RedditCaps, } From c633b24ca4174a425bc49d3d0de1fd71f82115d1 Mon Sep 17 00:00:00 2001 From: mcamou Date: Thu, 21 Aug 2025 19:11:10 +0200 Subject: [PATCH 4/6] README updates, and add more explicit error messages --- README.md | 81 +++++++++++++++++++++++++++++++++++++++--- api/types/encrypted.go | 7 ++-- internal/api/routes.go | 2 +- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 8b8c3d95..078f65e1 100644 --- a/README.md +++ b/README.md @@ -80,28 +80,32 @@ The worker automatically detects and exposes capabilities based on available con - **Sub-capabilities**: `["transcription"]` - **Requirements**: None (always available) +3. **`reddit`** - Reddit scraping services + - **Sub-capabilities**: `["scrapeurls","searchposts","searchusers","searchcommunities"]` + - **Requirements**: `APIFY_API_KEY` environment variable + **Twitter Services (Configuration-Dependent):** -3. **`twitter-credential`** - Twitter scraping with credentials +4. **`twitter-credential`** - Twitter scraping with credentials - **Sub-capabilities**: `["searchbyquery", "searchbyfullarchive", "searchbyprofile", "getbyid", "getreplies", "getretweeters", "gettweets", "getmedia", "gethometweets", "getforyoutweets", "getprofilebyid", "gettrends", "getfollowing", "getfollowers", "getspace"]` - **Requirements**: `TWITTER_ACCOUNTS` environment variable -4. **`twitter-api`** - Twitter scraping with API keys +5. 
**`twitter-api`** - Twitter scraping with API keys - **Sub-capabilities**: `["searchbyquery", "getbyid", "getprofilebyid"]` (basic), plus `["searchbyfullarchive"]` for elevated API keys - **Requirements**: `TWITTER_API_KEYS` environment variable -5. **`twitter`** - General Twitter scraping (uses best available auth) +6. **`twitter`** - General Twitter scraping (uses best available auth) - **Sub-capabilities**: Dynamic based on available authentication (combines capabilities from credential, API, and Apify depending on what's configured) - **Requirements**: Either `TWITTER_ACCOUNTS`, `TWITTER_API_KEYS`, or `APIFY_API_KEY` - **Priority**: For follower/following operations: Apify > Credentials. For search operations: Credentials > API. -6. **`twitter-apify`** - Twitter scraping using Apify's API (requires `APIFY_API_KEY`) +7. **`twitter-apify`** - Twitter scraping using Apify's API (requires `APIFY_API_KEY`) - **Sub-capabilities**: `["getfollowers", "getfollowing"]` - **Requirements**: `APIFY_API_KEY` environment variable **Stats Service (Always Available):** -7. **`telemetry`** - Worker monitoring and stats +8. **`telemetry`** - Worker monitoring and stats - **Sub-capabilities**: `["telemetry"]` - **Requirements**: None (always available) @@ -201,6 +205,73 @@ Transcribes TikTok videos to text. } ``` +#### Reddit Job Types + +There are four different types of Reddit searches: + +- `scrapeurls`: Gets the content of one or more Reddit URLs +- `searchposts`: Searches posts and comments +- `searchusers`: Searches user profiles +- `searchcommunities`: Searches communities + +**Parameters** (all are optional except where noted) + +- `urls` (array of object with `url` and `method` keys, required for `scrapeurls`): Each element contains a Reddit URL to scrape together with the HTTP method to use (`"GET"` by default). +- `queries` (array of string, required for all job types except `scrapeurls`): Each element is a string to search for. +- `sort` (string): What to order by. Possible values are `"relevance"`, `"hot"`, `"top"`, `"new"`, `"rising"` and `"comments"`. +- `include_nsfw` (boolean): Whether to include content tagged NSFW. Default is `false`. +- `skip_posts` (boolean): If `true`, `searchusers` will not return user posts. Default is `false`. +- `after` (string, ISO8601 timestamp): Only return entries created after this date/time. +- `max_items` (nonnegative integer): How many items to load in the server cache (page through them using the cursor). Default is 10. +- `max_results` (nonnegative integer): How many results to return per page. Default is 10. +- `max_posts` (nonnegative integer): How many posts to return per page maximum. Default is 10. +- `max_comments` (nonnegative integer): How many comments to return per page maximum. Default is 10. +- `max_communities` (nonnegative integer): How many communities to return per page maximum. Default is 2. +- `max_users` (nonnegative integer): How many users to return per page maximum. Default is 2. +- `next_cursor` (string, optional): Pagination cursor.
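+
+As a sketch of how pagination works across pages, a follow-up request echoes the `next_cursor` returned by the previous page. The cursor value below is illustrative only: it is the base64 encoding of `{"offset":10}`, matching the cursor format used by the Apify client in this patch:
+
+``` json
+{
+  "type": "reddit",
+  "arguments": {
+    "type": "searchposts",
+    "queries": ["NASA"],
+    "max_results": 10,
+    "next_cursor": "eyJvZmZzZXQiOjEwfQ=="
+  }
+}
+```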
+ +##### Reddit Search Operations + +**`scrapeurls`** - Scrape Reddit URLs + +``` json +{ + "type": "reddit", + "arguments": { + "type": "scrapeurls", + "urls": [ + { + "url": "https://reddit.com/r/ArtificialIntelligence", + "method": "GET" + }, + { + "url": "https://reddit.com/u/TheTelegraph" + } + ], + "sort": "new", + "include_nsfw": true, + "max_items": 100 + } +} +``` + +**`searchusers`** - Search Reddit users + +``` json +{ + "type": "reddit", + "arguments": { + "type": "searchusers", + "queries": [ + "NASA", + "European Space Agency" + ], + "sort": "relevance", + "skip_posts": true + } +} +``` + #### Twitter Job Types Twitter scraping is available through four job types: diff --git a/api/types/encrypted.go b/api/types/encrypted.go index 90e7c7c1..1d5f36ad 100644 --- a/api/types/encrypted.go +++ b/api/types/encrypted.go @@ -2,6 +2,7 @@ package types import ( "encoding/json" + "fmt" "github.com/masa-finance/tee-worker/pkg/tee" ) @@ -14,17 +15,17 @@ type EncryptedRequest struct { func (payload EncryptedRequest) Unseal() (string, error) { jobRequest, err := tee.Unseal(payload.EncryptedRequest) if err != nil { - return "", err + return "", fmt.Errorf("error while unsealing the encrypted request: %w", err) } job := Job{} if err := json.Unmarshal(jobRequest, &job); err != nil { - return "", err + return "", fmt.Errorf("error while unmarshalling the job request: %w", err) } dat, err := tee.UnsealWithKey(job.Nonce, payload.EncryptedResult) if err != nil { - return "", err + return "", fmt.Errorf("error while unsealing the job result: %w", err) } return string(dat), nil diff --git a/internal/api/routes.go b/internal/api/routes.go index 79231df3..7c631b52 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -108,7 +108,7 @@ func result(c echo.Context) error { result, err := payload.Unseal() if err != nil { - logrus.Errorf("Error while binding for getting result: %s", err) + logrus.Errorf("Error while unsealing payload for getting result: %s", err) return c.JSON(http.StatusInternalServerError, types.JobError{Error: err.Error()}) } From 20bd1f1e8667be434e3c585a688e340cf7df388e Mon Sep 17 00:00:00 2001 From: mcamou Date: Thu, 21 Aug 2025 19:43:23 +0200 Subject: [PATCH 5/6] Update README and test --- README.md | 34 +++++++++++++++++++++++++++++++++ api/types/reddit/reddit_test.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 078f65e1..2be1c560 100644 --- a/README.md +++ b/README.md @@ -272,6 +272,40 @@ There are four different types of Reddit searches: } ``` +**`searchposts`** - Search Reddit posts + +``` json +{ + "type": "reddit", + "arguments": { + "type": "searchposts", + "queries": [ + "NASA", + "European Space Agency" + ], + "max_items": 100, + "max_results": 10, + "max_posts": 5 + } +} +``` + +**`searchcommunities`** - Search Reddit communities + +``` json +{ + "type": "reddit", + "arguments": { + "type": "searchposts", + "queries": [ + "Artificial Intelligence" + ], + "max_items": 100, + "max_results": 10 + } +} +``` + #### Twitter Job Types Twitter scraping is available through four job types: diff --git a/api/types/reddit/reddit_test.go b/api/types/reddit/reddit_test.go index 3eef7089..d12319e4 100644 --- a/api/types/reddit/reddit_test.go +++ b/api/types/reddit/reddit_test.go @@ -137,7 +137,7 @@ var _ = Describe("Response", func() { } _, err := json.Marshal(&resp) Expect(err).To(HaveOccurred()) -
Expect(err.Error()).To(ContainSubstring("unknown Reddit response type during marshal: unknown")) }) It("should marshal to null if TypeSwitch is nil", func() { diff --git a/go.mod b/go.mod index bfbb2b87..2e80f229 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( replace github.com/imperatrona/twitter-scraper => github.com/masa-finance/twitter-scraper v1.0.2 // FIXME: Remove once the correct version of tee-types is tagged -replace github.com/masa-finance/tee-types => github.com/masa-finance/tee-types v1.1.8-0.20250821113038-3a5a22710aa3 +replace github.com/masa-finance/tee-types => github.com/masa-finance/tee-types v1.1.8-0.20250821165509-06dd40db19ea require ( github.com/AlexEidt/Vidio v1.5.1 // indirect diff --git a/go.sum b/go.sum index dd2ba8f0..a2943562 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.8-0.20250821113038-3a5a22710aa3 h1:XgubCLsiFOfJ7O2Ak4+qmGUwxADVzgim6fnMGs5Xb/c= -github.com/masa-finance/tee-types v1.1.8-0.20250821113038-3a5a22710aa3/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.8-0.20250821165509-06dd40db19ea h1:cUxF5cwrFVUUq4C0gnGgYJssAi6SyEPnxsOWYD7ryeE= +github.com/masa-finance/tee-types v1.1.8-0.20250821165509-06dd40db19ea/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= From 97de63b816384b3993a390febe8c1d50af02e82c Mon Sep 17 00:00:00 2001 From: mcamou Date: Fri, 22 Aug 2025 11:58:48 +0200 Subject: [PATCH 6/6] Update tee-types and PR comments --- README.md | 2 +- go.mod | 5 +---- go.sum | 4 ++-- internal/jobs/reddit.go | 2 +- internal/jobs/redditapify/client_test.go | 2 -- pkg/client/apify_client.go | 7 +------ 6 files changed, 6 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 2be1c560..c8850862 100644 --- a/README.md +++ b/README.md @@ -296,7 +296,7 @@ There are four different types of Reddit searches: { "type": "reddit", "arguments": { - "type": "searchposts", + "type": "searchcommunities", "queries": [ "Artificial Intelligence" ], diff --git a/go.mod b/go.mod index 2e80f229..bb88eb33 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/labstack/echo-contrib v0.17.4 github.com/labstack/echo/v4 v4.13.4 - github.com/masa-finance/tee-types v1.1.7 + github.com/masa-finance/tee-types v1.1.9 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.38.0 github.com/sirupsen/logrus v1.9.3 @@ -21,9 +21,6 @@ require ( replace github.com/imperatrona/twitter-scraper => github.com/masa-finance/twitter-scraper v1.0.2 -// FIXME: Remove once the correct version of tee-types is tagged -replace github.com/masa-finance/tee-types => github.com/masa-finance/tee-types v1.1.8-0.20250821165509-06dd40db19ea - require ( github.com/AlexEidt/Vidio v1.5.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect diff --git a/go.sum b/go.sum index a2943562..a33a8255 100644 --- a/go.sum +++ 
b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.8-0.20250821165509-06dd40db19ea h1:cUxF5cwrFVUUq4C0gnGgYJssAi6SyEPnxsOWYD7ryeE= -github.com/masa-finance/tee-types v1.1.8-0.20250821165509-06dd40db19ea/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.9 h1:Wdw5l+GkwXPjzJfVRULpSUw1WPH4KCirOCtU2Doc3So= +github.com/masa-finance/tee-types v1.1.9/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= diff --git a/internal/jobs/reddit.go b/internal/jobs/reddit.go index f305a964..07272768 100644 --- a/internal/jobs/reddit.go +++ b/internal/jobs/reddit.go @@ -59,7 +59,7 @@ func (r *RedditScraper) ExecuteJob(j types.Job) (types.JobResult, error) { return types.JobResult{Error: msg.Error()}, msg } - // Type assert to TikTok arguments + // Type assert to Reddit arguments redditArgs, ok := jobArgs.(*teeargs.RedditArguments) if !ok { return types.JobResult{Error: "invalid argument type for Reddit job"}, errors.New("invalid argument type") diff --git a/internal/jobs/redditapify/client_test.go b/internal/jobs/redditapify/client_test.go index 8cc023c3..b6f075da 100644 --- a/internal/jobs/redditapify/client_test.go +++ b/internal/jobs/redditapify/client_test.go @@ -15,8 +15,6 @@ import ( teetypes "github.com/masa-finance/tee-types/types" ) -// FIXME: Really test with a live API key -// FIXME: Fix documentation // MockApifyClient is a mock implementation of the ApifyClient. type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go index d14ca0c6..9f3c637a 100644 --- a/pkg/client/apify_client.go +++ b/pkg/client/apify_client.go @@ -235,12 +235,7 @@ func (c *ApifyClient) GetDatasetItems(datasetId string, offset, limit uint) (*Da // Create a DatasetResponse object with the items and estimated pagination info datasetResp := &DatasetResponse{ - Data: struct { - Items []json.RawMessage `json:"items"` - Count uint `json:"count"` - Offset uint `json:"offset"` - Limit uint `json:"limit"` - }{ + Data: ApifyDatasetData{ Items: items, Count: uint(len(items)), Offset: offset,
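To illustrate how the pieces above fit together, here is a minimal caller-side sketch (not part of the patch). The actor ID and input are placeholders, and it assumes `EmptyCursor` is the exported empty-cursor sentinel used throughout `pkg/client`:

``` go
package main

import (
	"fmt"
	"log"

	"github.com/masa-finance/tee-worker/pkg/client"
)

func main() {
	// NewApifyClient now returns the Apify interface rather than *ApifyClient.
	apify, err := client.NewApifyClient("your-apify-token")
	if err != nil {
		log.Fatal(err)
	}

	cursor := client.EmptyCursor
	for page := 0; page < 3; page++ {
		// Each call runs the actor and fetches one page of dataset items;
		// the returned cursor encodes the next offset as base64 of {"offset":N}.
		resp, next, err := apify.RunActorAndGetResponse(
			"hypothetical~actor", // placeholder actor ID
			map[string]any{"queries": []string{"NASA"}}, // placeholder input
			cursor,
			10, // page size (uint)
		)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("page %d: %d items\n", page, len(resp.Data.Items))
		if next == client.EmptyCursor {
			break // fewer items than the limit: no further pages
		}
		cursor = next
	}
}
```

The move from `int` to `uint` for offsets, limits, and counts in this series reflects that negative values were never meaningful for dataset pagination.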