From 28c7e2708fee40d0397256b6d4b42b8fe90ab7c0 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 04:54:02 +0200 Subject: [PATCH 01/10] feat: rented actor detection --- internal/actors/actors.go | 11 ++++++ internal/capabilities/detector.go | 38 +++++++++++++++++---- internal/jobs/llmapify/client.go | 7 ++-- internal/jobs/llmapify/client_test.go | 3 +- internal/jobs/redditapify/client.go | 7 ++-- internal/jobs/redditapify/client_test.go | 9 ++--- internal/jobs/stats/stats.go | 30 +++++++---------- internal/jobs/tiktokapify/client.go | 11 ++---- internal/jobs/twitterapify/client.go | 7 ++-- internal/jobs/webapify/client.go | 7 ++-- internal/jobs/webapify/client_test.go | 3 +- pkg/client/apify_client.go | 43 ++++++++++++++++++++++++ 12 files changed, 117 insertions(+), 59 deletions(-) create mode 100644 internal/actors/actors.go diff --git a/internal/actors/actors.go b/internal/actors/actors.go new file mode 100644 index 00000000..15d4d637 --- /dev/null +++ b/internal/actors/actors.go @@ -0,0 +1,11 @@ +package actors + +// Apify actors that are used in various job types +const ( + RedditScraper = "trudax~reddit-scraper" + TikTokSearchScraper = "epctex~tiktok-search-scraper" + TikTokTrendingScraper = "lexis-solutions~tiktok-trending-videos-scraper" + LLMDatasetProcessor = "dusan.vystrcil~llm-dataset-processor" + TwitterFollowers = "kaitoeasyapi~premium-x-follower-scraper-following-data" + WebScraper = "apify~website-content-crawler" +) diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index 27a4cd14..eb9ab895 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -8,6 +8,7 @@ import ( util "github.com/masa-finance/tee-types/pkg/util" teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/jobs/twitter" "github.com/masa-finance/tee-worker/pkg/client" @@ -64,16 +65,39 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface } // Add Apify-specific capabilities based on available API key - // TODO: We should verify whether each of the actors is actually available through this API key if hasApifyKey { + // Add default Apify capabilities capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps - capabilities[teetypes.RedditJob] = teetypes.RedditCaps - // Merge TikTok search caps with any existing - existing := capabilities[teetypes.TiktokJob] - s := util.NewSet(existing...) - s.Add(teetypes.TiktokSearchCaps...) - capabilities[teetypes.TiktokJob] = s.Items() + // Create an Apify client for probing rented actors + c, err := client.NewApifyClient(apifyApiKey) + if err != nil { + logrus.Errorf("Failed to create Apify client for access probe: %v", err) + } else { + // Reddit access probe + if ok, _ := c.(*client.ApifyClient).ProbeActorAccess(actors.RedditScraper, map[string]any{}); ok { + capabilities[teetypes.RedditJob] = teetypes.RedditCaps + } else { + logrus.Warnf("Apify token does not have access to actor %s", actors.RedditScraper) + } + + // TikTok probes (search and trending handled independently) + tiktokCapSet := util.NewSet(capabilities[teetypes.TiktokJob]...) + + if ok, _ := c.(*client.ApifyClient).ProbeActorAccess(actors.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { + tiktokCapSet.Add(teetypes.CapSearchByQuery) + } else { + logrus.Warnf("Apify token does not have access to actor %s", actors.TikTokSearchScraper) + } + if ok, _ := c.(*client.ApifyClient).ProbeActorAccess(actors.TikTokTrendingScraper, map[string]any{}); ok { + tiktokCapSet.Add(teetypes.CapSearchByTrending) + } else { + logrus.Warnf("Apify token does not have access to actor %s", actors.TikTokTrendingScraper) + } + + capabilities[teetypes.TiktokJob] = tiktokCapSet.Items() + + } if hasLLMKey { capabilities[teetypes.WebJob] = teetypes.WebCaps diff --git a/internal/jobs/llmapify/client.go b/internal/jobs/llmapify/client.go index e6d5b977..4d8f2169 100644 --- a/internal/jobs/llmapify/client.go +++ b/internal/jobs/llmapify/client.go @@ -7,16 +7,13 @@ import ( teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/pkg/client" "github.com/sirupsen/logrus" ) -const ( - ActorID = "dusan.vystrcil~llm-dataset-processor" -) - var ( ErrProviderKeyRequired = errors.New("llm provider key is required") ErrFailedToCreateClient = errors.New("failed to create apify client") @@ -66,7 +63,7 @@ func (c *ApifyClient) Process(workerID string, args teeargs.LLMProcessorArgument input.LLMProviderApiKey = string(c.llmConfig.GeminiApiKey) limit := uint(args.Items) - dataset, nextCursor, err := c.client.RunActorAndGetResponse(ActorID, input, cursor, limit) + dataset, nextCursor, err := c.client.RunActorAndGetResponse(actors.LLMDatasetProcessor, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.LLMErrors, 1) diff --git a/internal/jobs/llmapify/client_test.go b/internal/jobs/llmapify/client_test.go index ed81ae60..96136481 100644 --- a/internal/jobs/llmapify/client_test.go +++ b/internal/jobs/llmapify/client_test.go @@ -10,6 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/jobs/llmapify" "github.com/masa-finance/tee-worker/pkg/client" @@ -71,7 +72,7 @@ var _ = Describe("LLMApifyClient", func() { Expect(err).ToNot(HaveOccurred()) mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(llmapify.ActorID)) + Expect(actorID).To(Equal(actors.LLMDatasetProcessor)) Expect(limit).To(Equal(uint(1))) // Verify the input is correctly converted to LLMProcessorRequest diff --git a/internal/jobs/redditapify/client.go b/internal/jobs/redditapify/client.go index 70a2ba9b..456ef49b 100644 --- a/internal/jobs/redditapify/client.go +++ b/internal/jobs/redditapify/client.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" "github.com/masa-finance/tee-worker/api/types/reddit" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/pkg/client" @@ -15,10 +16,6 @@ import ( teetypes "github.com/masa-finance/tee-types/types" ) -const ( - RedditActorID = "trudax~reddit-scraper" // must rent this actor from apify explicitly -) - // CommonArgs holds the parameters that all Reddit searches support, in a single struct type CommonArgs struct { Sort teetypes.RedditSortType @@ -167,7 +164,7 @@ func (c *RedditApifyClient) queryReddit(workerID string, input RedditActorReques c.statsCollector.Add(workerID, stats.RedditQueries, 1) } - dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(RedditActorID, input, cursor, limit) + dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(actors.RedditScraper, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.RedditErrors, 1) diff --git a/internal/jobs/redditapify/client_test.go b/internal/jobs/redditapify/client_test.go index 913fd17d..29dba003 100644 --- a/internal/jobs/redditapify/client_test.go +++ b/internal/jobs/redditapify/client_test.go @@ -8,6 +8,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/jobs/redditapify" "github.com/masa-finance/tee-worker/pkg/client" @@ -59,7 +60,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{MaxPosts: 10} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(redditapify.RedditActorID)) + Expect(actorID).To(Equal(actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.StartUrls).To(Equal(urls)) Expect(*req.PostDateLimit).To(BeTemporally("~", after, time.Second)) @@ -84,7 +85,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{MaxComments: 5} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(redditapify.RedditActorID)) + Expect(actorID).To(Equal(actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -107,7 +108,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(redditapify.RedditActorID)) + Expect(actorID).To(Equal(actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -127,7 +128,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(redditapify.RedditActorID)) + Expect(actorID).To(Equal(actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go index ada7ab98..e41edbfe 100644 --- a/internal/jobs/stats/stats.go +++ b/internal/jobs/stats/stats.go @@ -6,12 +6,16 @@ import ( "time" teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/internal/capabilities" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/versioning" "github.com/sirupsen/logrus" ) +// WorkerCapabilitiesProvider abstracts capability retrieval to avoid import cycles +type WorkerCapabilitiesProvider interface { + GetWorkerCapabilities() teetypes.WorkerCapabilities +} + // These are the types of statistics that we can add. The value is the JSON key that will be used for serialization. type StatType string @@ -68,7 +72,7 @@ type Stats struct { type StatsCollector struct { Stats *Stats Chan chan AddStat - jobServer capabilities.JobServerInterface + jobServer WorkerCapabilitiesProvider jobConfiguration config.JobConfiguration } @@ -83,12 +87,6 @@ func StartCollector(bufSize uint, jc config.JobConfiguration) *StatsCollector { ApplicationVersion: versioning.ApplicationVersion, } - // Initial capability detection without JobServer (basic capabilities only) - // Full capability detection will happen when JobServer is set - s.ReportedCapabilities = capabilities.DetectCapabilities(jc, nil) - - logrus.Infof("Initial structured capabilities: %+v", s.ReportedCapabilities) - ch := make(chan AddStat, bufSize) go func(s *Stats, ch chan AddStat) { @@ -99,11 +97,7 @@ func StartCollector(bufSize uint, jc config.JobConfiguration) *StatsCollector { if _, ok := s.Stats[stat.WorkerID]; !ok { s.Stats[stat.WorkerID] = make(map[StatType]uint) } - if _, ok := s.Stats[stat.WorkerID][stat.Type]; ok { - s.Stats[stat.WorkerID][stat.Type] += stat.Num - } else { - s.Stats[stat.WorkerID][stat.Type] = stat.Num - } + s.Stats[stat.WorkerID][stat.Type] += stat.Num s.Unlock() logrus.Debugf("Added %d to stat %s. Current stats: %#v", stat.Num, stat.Type, s) } @@ -132,16 +126,16 @@ func (s *StatsCollector) SetWorkerID(workerID string) { s.Stats.WorkerID = workerID } -// SetJobServer sets the JobServer reference and updates capabilities with full detection -func (s *StatsCollector) SetJobServer(js capabilities.JobServerInterface) { +// SetJobServer sets the JobServer reference and updates capabilities +func (s *StatsCollector) SetJobServer(js WorkerCapabilitiesProvider) { s.jobServer = js - // Now that we have the JobServer, update capabilities with full detection + // Now that we have the JobServer, update capabilities s.Stats.Lock() defer s.Stats.Unlock() - // Auto-detect capabilities using the JobServer - s.Stats.ReportedCapabilities = capabilities.DetectCapabilities(s.jobConfiguration, js) + // Get capabilities from the JobServer directly + s.Stats.ReportedCapabilities = js.GetWorkerCapabilities() logrus.Infof("Updated structured capabilities with JobServer: %+v", s.Stats.ReportedCapabilities) } diff --git a/internal/jobs/tiktokapify/client.go b/internal/jobs/tiktokapify/client.go index 63a88a8b..0502a561 100644 --- a/internal/jobs/tiktokapify/client.go +++ b/internal/jobs/tiktokapify/client.go @@ -6,15 +6,10 @@ import ( teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/pkg/client" ) -const ( - // Actors - SearchActorID = "epctex~tiktok-search-scraper" // must rent this actor from apify explicitly - TrendingActorID = "lexis-solutions~tiktok-trending-videos-scraper" // must rent this actor from apify explicitly -) - type TikTokSearchByQueryRequest struct { SearchTerms []string `json:"search"` StartUrls []string `json:"startUrls"` @@ -79,7 +74,7 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } - dataset, next, err := c.apify.RunActorAndGetResponse(SearchActorID, apifyInput, cursor, limit) + dataset, next, err := c.apify.RunActorAndGetResponse(actors.TikTokSearchScraper, apifyInput, cursor, limit) if err != nil { return nil, "", fmt.Errorf("apify run (search): %w", err) } @@ -115,7 +110,7 @@ func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendin return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } - dataset, next, err := c.apify.RunActorAndGetResponse(TrendingActorID, apifyInput, cursor, limit) + dataset, next, err := c.apify.RunActorAndGetResponse(actors.TikTokTrendingScraper, apifyInput, cursor, limit) if err != nil { return nil, "", fmt.Errorf("apify run (trending): %w", err) } diff --git a/internal/jobs/twitterapify/client.go b/internal/jobs/twitterapify/client.go index f9c40d45..6fe9eaa2 100644 --- a/internal/jobs/twitterapify/client.go +++ b/internal/jobs/twitterapify/client.go @@ -6,14 +6,11 @@ import ( util "github.com/masa-finance/tee-types/pkg/util" teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/pkg/client" "github.com/sirupsen/logrus" ) -const ( - TwitterFollowerActorID = "kaitoeasyapi~premium-x-follower-scraper-following-data" -) - // FollowerActorRunRequest represents the input for running the Twitter follower actor type FollowerActorRunRequest struct { UserNames []string `json:"user_names"` @@ -86,7 +83,7 @@ func (c *TwitterApifyClient) GetFollowing(username string, cursor client.Cursor, // getProfiles runs the actor and retrieves profiles from the dataset func (c *TwitterApifyClient) getProfiles(input FollowerActorRunRequest, cursor client.Cursor, limit uint) ([]*teetypes.ProfileResultApify, client.Cursor, error) { - dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(TwitterFollowerActorID, input, cursor, limit) + dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(actors.TwitterFollowers, input, cursor, limit) if err != nil { return nil, client.EmptyCursor, err } diff --git a/internal/jobs/webapify/client.go b/internal/jobs/webapify/client.go index 601f23e9..508e27f8 100644 --- a/internal/jobs/webapify/client.go +++ b/internal/jobs/webapify/client.go @@ -6,15 +6,12 @@ import ( teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/pkg/client" "github.com/sirupsen/logrus" ) -const ( - ActorID = "apify~website-content-crawler" -) - type ApifyClient struct { client client.Apify statsCollector *stats.StatsCollector @@ -52,7 +49,7 @@ func (c *ApifyClient) Scrape(workerID string, args teeargs.WebArguments, cursor input := args.ToWebScraperRequest() limit := uint(args.MaxPages) - dataset, nextCursor, err := c.client.RunActorAndGetResponse(ActorID, input, cursor, limit) + dataset, nextCursor, err := c.client.RunActorAndGetResponse(actors.WebScraper, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.WebErrors, 1) diff --git a/internal/jobs/webapify/client_test.go b/internal/jobs/webapify/client_test.go index b377ae53..61dcc916 100644 --- a/internal/jobs/webapify/client_test.go +++ b/internal/jobs/webapify/client_test.go @@ -8,6 +8,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/masa-finance/tee-worker/internal/actors" "github.com/masa-finance/tee-worker/internal/jobs/webapify" "github.com/masa-finance/tee-worker/pkg/client" @@ -64,7 +65,7 @@ var _ = Describe("WebApifyClient", func() { } mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(webapify.ActorID)) + Expect(actorID).To(Equal(actors.WebScraper)) Expect(limit).To(Equal(uint(2))) return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil } diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go index 64413420..095968cf 100644 --- a/pkg/client/apify_client.go +++ b/pkg/client/apify_client.go @@ -96,6 +96,49 @@ func (c *ApifyClient) HTTPClient() *http.Client { return c.httpOptions.HttpClient } +// AbortActorRun sends an abort request for a given actor run ID +func (c *ApifyClient) AbortActorRun(runId string) error { + url := fmt.Sprintf("%s/actor-runs/%s/abort?token=%s", c.baseUrl, runId, c.apiToken) + logrus.Infof("Stopping actor run: %s", runId) + + req, err := http.NewRequest("POST", url, nil) + if err != nil { + return fmt.Errorf("error creating abort request: %w", err) + } + + resp, err := c.httpOptions.HttpClient.Do(req) + if err != nil { + return fmt.Errorf("error making abort request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { // Apify returns 200 OK on abort + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code on abort %d: %s", resp.StatusCode, string(body)) + } + return nil +} + +// ProbeActorAccess attempts to start a run and immediately abort to verify access +// Returns true if the token can start the actor (permission/rental present) +// Some actors require a default input to be provided +func (c *ApifyClient) ProbeActorAccess(actorId string, input map[string]any) (bool, error) { + // Use empty input; most actors accept defaults. We do not wait for finish. + runResp, err := c.RunActor(actorId, input) + if err != nil { + // RunActor already wraps status and message; treat any non-201 as no access + return false, err + } + // Best-effort abort to avoid consuming resources + if runResp != nil && runResp.Data.ID != "" { + if err := c.AbortActorRun(runResp.Data.ID); err != nil { + // Do not fail access detection if abort fails; just log + logrus.Warnf("Failed to abort probe run %s for actor %s: %v", runResp.Data.ID, actorId, err) + } + } + return true, nil +} + // RunActor runs an actor with the given input func (c *ApifyClient) RunActor(actorId string, input any) (*ActorRunResponse, error) { url := fmt.Sprintf("%s/acts/%s/runs?token=%s", c.baseUrl, actorId, c.apiToken) From f276ed90915308c998551280986900dc2a471df2 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 05:11:06 +0200 Subject: [PATCH 02/10] chore: copilot comments --- internal/capabilities/detector.go | 6 +++--- internal/jobs/llmapify/client_test.go | 8 ++++++++ internal/jobs/redditapify/client_test.go | 8 ++++++++ internal/jobs/webapify/client_test.go | 8 ++++++++ pkg/client/apify_client.go | 1 + 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index eb9ab895..c025671f 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -75,7 +75,7 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface logrus.Errorf("Failed to create Apify client for access probe: %v", err) } else { // Reddit access probe - if ok, _ := c.(*client.ApifyClient).ProbeActorAccess(actors.RedditScraper, map[string]any{}); ok { + if ok, _ := c.ProbeActorAccess(actors.RedditScraper, map[string]any{}); ok { capabilities[teetypes.RedditJob] = teetypes.RedditCaps } else { logrus.Warnf("Apify token does not have access to actor %s", actors.RedditScraper) @@ -84,12 +84,12 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface // TikTok probes (search and trending handled independently) tiktokCapSet := util.NewSet(capabilities[teetypes.TiktokJob]...) - if ok, _ := c.(*client.ApifyClient).ProbeActorAccess(actors.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { + if ok, _ := c.ProbeActorAccess(actors.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { tiktokCapSet.Add(teetypes.CapSearchByQuery) } else { logrus.Warnf("Apify token does not have access to actor %s", actors.TikTokSearchScraper) } - if ok, _ := c.(*client.ApifyClient).ProbeActorAccess(actors.TikTokTrendingScraper, map[string]any{}); ok { + if ok, _ := c.ProbeActorAccess(actors.TikTokTrendingScraper, map[string]any{}); ok { tiktokCapSet.Add(teetypes.CapSearchByTrending) } else { logrus.Warnf("Apify token does not have access to actor %s", actors.TikTokTrendingScraper) diff --git a/internal/jobs/llmapify/client_test.go b/internal/jobs/llmapify/client_test.go index 96136481..bfc1d0ec 100644 --- a/internal/jobs/llmapify/client_test.go +++ b/internal/jobs/llmapify/client_test.go @@ -23,6 +23,7 @@ import ( type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error + ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) } func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { @@ -39,6 +40,13 @@ func (m *MockApifyClient) ValidateApiKey() error { return errors.New("ValidateApiKeyFunc not defined") } +func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { + if m.ProbeActorAccessFunc != nil { + return m.ProbeActorAccessFunc(actorID, input) + } + return false, errors.New("ProbeActorAccessFunc not defined") +} + var _ = Describe("LLMApifyClient", func() { var ( mockClient *MockApifyClient diff --git a/internal/jobs/redditapify/client_test.go b/internal/jobs/redditapify/client_test.go index 29dba003..64141d44 100644 --- a/internal/jobs/redditapify/client_test.go +++ b/internal/jobs/redditapify/client_test.go @@ -20,6 +20,7 @@ import ( type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error + ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) } func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { @@ -36,6 +37,13 @@ func (m *MockApifyClient) ValidateApiKey() error { return errors.New("ValidateApiKeyFunc not defined") } +func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { + if m.ProbeActorAccessFunc != nil { + return m.ProbeActorAccessFunc(actorID, input) + } + return false, errors.New("ProbeActorAccessFunc not defined") +} + var _ = Describe("RedditApifyClient", func() { var ( mockClient *MockApifyClient diff --git a/internal/jobs/webapify/client_test.go b/internal/jobs/webapify/client_test.go index 61dcc916..65363611 100644 --- a/internal/jobs/webapify/client_test.go +++ b/internal/jobs/webapify/client_test.go @@ -19,6 +19,7 @@ import ( type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error + ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) } func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { @@ -35,6 +36,13 @@ func (m *MockApifyClient) ValidateApiKey() error { return errors.New("ValidateApiKeyFunc not defined") } +func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { + if m.ProbeActorAccessFunc != nil { + return m.ProbeActorAccessFunc(actorID, input) + } + return false, errors.New("ProbeActorAccessFunc not defined") +} + var _ = Describe("WebApifyClient", func() { var ( mockClient *MockApifyClient diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go index 095968cf..e3d67c71 100644 --- a/pkg/client/apify_client.go +++ b/pkg/client/apify_client.go @@ -28,6 +28,7 @@ const ( type Apify interface { RunActorAndGetResponse(actorId string, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) ValidateApiKey() error + ProbeActorAccess(actorId string, input map[string]any) (bool, error) } // ApifyClient represents a client for the Apify API From cbf4ff45e4a70d46017471d0585a3f1d19a4ecfb Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 18:21:10 +0200 Subject: [PATCH 03/10] chore: cleans up actor imports --- internal/actors/actors.go | 11 ----------- internal/apify/actors.go | 19 +++++++++++++++++++ internal/capabilities/detector.go | 14 +++++++------- internal/jobs/llmapify/client.go | 4 ++-- internal/jobs/llmapify/client_test.go | 14 +++++++------- internal/jobs/redditapify/client.go | 4 ++-- internal/jobs/redditapify/client_test.go | 20 ++++++++++---------- internal/jobs/tiktokapify/client.go | 6 +++--- internal/jobs/twitterapify/client.go | 4 ++-- internal/jobs/webapify/client.go | 4 ++-- internal/jobs/webapify/client_test.go | 14 +++++++------- 11 files changed, 61 insertions(+), 53 deletions(-) delete mode 100644 internal/actors/actors.go create mode 100644 internal/apify/actors.go diff --git a/internal/actors/actors.go b/internal/actors/actors.go deleted file mode 100644 index 15d4d637..00000000 --- a/internal/actors/actors.go +++ /dev/null @@ -1,11 +0,0 @@ -package actors - -// Apify actors that are used in various job types -const ( - RedditScraper = "trudax~reddit-scraper" - TikTokSearchScraper = "epctex~tiktok-search-scraper" - TikTokTrendingScraper = "lexis-solutions~tiktok-trending-videos-scraper" - LLMDatasetProcessor = "dusan.vystrcil~llm-dataset-processor" - TwitterFollowers = "kaitoeasyapi~premium-x-follower-scraper-following-data" - WebScraper = "apify~website-content-crawler" -) diff --git a/internal/apify/actors.go b/internal/apify/actors.go new file mode 100644 index 00000000..e548ada3 --- /dev/null +++ b/internal/apify/actors.go @@ -0,0 +1,19 @@ +package apify + +type actorIds struct { + RedditScraper string + TikTokSearchScraper string + TikTokTrendingScraper string + LLMDatasetProcessor string + TwitterFollowers string + WebScraper string +} + +var Actors = actorIds{ + RedditScraper: "trudax~reddit-scraper", + TikTokSearchScraper: "epctex~tiktok-search-scraper", + TikTokTrendingScraper: "lexis-solutions~tiktok-trending-videos-scraper", + LLMDatasetProcessor: "dusan.vystrcil~llm-dataset-processor", + TwitterFollowers: "kaitoeasyapi~premium-x-follower-scraper-following-data", + WebScraper: "apify~website-content-crawler", +} diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index c025671f..3d8de4c6 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -8,7 +8,7 @@ import ( util "github.com/masa-finance/tee-types/pkg/util" teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/jobs/twitter" "github.com/masa-finance/tee-worker/pkg/client" @@ -75,24 +75,24 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface logrus.Errorf("Failed to create Apify client for access probe: %v", err) } else { // Reddit access probe - if ok, _ := c.ProbeActorAccess(actors.RedditScraper, map[string]any{}); ok { + if ok, _ := c.ProbeActorAccess(apify.Actors.RedditScraper, map[string]any{}); ok { capabilities[teetypes.RedditJob] = teetypes.RedditCaps } else { - logrus.Warnf("Apify token does not have access to actor %s", actors.RedditScraper) + logrus.Warnf("Apify token does not have access to actor %s", apify.Actors.RedditScraper) } // TikTok probes (search and trending handled independently) tiktokCapSet := util.NewSet(capabilities[teetypes.TiktokJob]...) - if ok, _ := c.ProbeActorAccess(actors.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { + if ok, _ := c.ProbeActorAccess(apify.Actors.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { tiktokCapSet.Add(teetypes.CapSearchByQuery) } else { - logrus.Warnf("Apify token does not have access to actor %s", actors.TikTokSearchScraper) + logrus.Warnf("Apify token does not have access to actor %s", apify.Actors.TikTokSearchScraper) } - if ok, _ := c.ProbeActorAccess(actors.TikTokTrendingScraper, map[string]any{}); ok { + if ok, _ := c.ProbeActorAccess(apify.Actors.TikTokTrendingScraper, map[string]any{}); ok { tiktokCapSet.Add(teetypes.CapSearchByTrending) } else { - logrus.Warnf("Apify token does not have access to actor %s", actors.TikTokTrendingScraper) + logrus.Warnf("Apify token does not have access to actor %s", apify.Actors.TikTokTrendingScraper) } capabilities[teetypes.TiktokJob] = tiktokCapSet.Items() diff --git a/internal/jobs/llmapify/client.go b/internal/jobs/llmapify/client.go index 4d8f2169..dde3d8b3 100644 --- a/internal/jobs/llmapify/client.go +++ b/internal/jobs/llmapify/client.go @@ -7,7 +7,7 @@ import ( teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/pkg/client" @@ -63,7 +63,7 @@ func (c *ApifyClient) Process(workerID string, args teeargs.LLMProcessorArgument input.LLMProviderApiKey = string(c.llmConfig.GeminiApiKey) limit := uint(args.Items) - dataset, nextCursor, err := c.client.RunActorAndGetResponse(actors.LLMDatasetProcessor, input, cursor, limit) + dataset, nextCursor, err := c.client.RunActorAndGetResponse(apify.Actors.LLMDatasetProcessor, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.LLMErrors, 1) diff --git a/internal/jobs/llmapify/client_test.go b/internal/jobs/llmapify/client_test.go index bfc1d0ec..4ed7e463 100644 --- a/internal/jobs/llmapify/client_test.go +++ b/internal/jobs/llmapify/client_test.go @@ -10,7 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/jobs/llmapify" "github.com/masa-finance/tee-worker/pkg/client" @@ -23,7 +23,7 @@ import ( type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error - ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) + ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) } func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { @@ -41,10 +41,10 @@ func (m *MockApifyClient) ValidateApiKey() error { } func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { - if m.ProbeActorAccessFunc != nil { - return m.ProbeActorAccessFunc(actorID, input) - } - return false, errors.New("ProbeActorAccessFunc not defined") + if m.ProbeActorAccessFunc != nil { + return m.ProbeActorAccessFunc(actorID, input) + } + return false, errors.New("ProbeActorAccessFunc not defined") } var _ = Describe("LLMApifyClient", func() { @@ -80,7 +80,7 @@ var _ = Describe("LLMApifyClient", func() { Expect(err).ToNot(HaveOccurred()) mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(actors.LLMDatasetProcessor)) + Expect(actorID).To(Equal(apify.Actors.LLMDatasetProcessor)) Expect(limit).To(Equal(uint(1))) // Verify the input is correctly converted to LLMProcessorRequest diff --git a/internal/jobs/redditapify/client.go b/internal/jobs/redditapify/client.go index 456ef49b..a5084b3c 100644 --- a/internal/jobs/redditapify/client.go +++ b/internal/jobs/redditapify/client.go @@ -8,7 +8,7 @@ import ( "github.com/sirupsen/logrus" "github.com/masa-finance/tee-worker/api/types/reddit" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/pkg/client" @@ -164,7 +164,7 @@ func (c *RedditApifyClient) queryReddit(workerID string, input RedditActorReques c.statsCollector.Add(workerID, stats.RedditQueries, 1) } - dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(actors.RedditScraper, input, cursor, limit) + dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(apify.Actors.RedditScraper, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.RedditErrors, 1) diff --git a/internal/jobs/redditapify/client_test.go b/internal/jobs/redditapify/client_test.go index 64141d44..dfa97efd 100644 --- a/internal/jobs/redditapify/client_test.go +++ b/internal/jobs/redditapify/client_test.go @@ -8,7 +8,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/jobs/redditapify" "github.com/masa-finance/tee-worker/pkg/client" @@ -20,7 +20,7 @@ import ( type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error - ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) + ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) } func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { @@ -38,10 +38,10 @@ func (m *MockApifyClient) ValidateApiKey() error { } func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { - if m.ProbeActorAccessFunc != nil { - return m.ProbeActorAccessFunc(actorID, input) - } - return false, errors.New("ProbeActorAccessFunc not defined") + if m.ProbeActorAccessFunc != nil { + return m.ProbeActorAccessFunc(actorID, input) + } + return false, errors.New("ProbeActorAccessFunc not defined") } var _ = Describe("RedditApifyClient", func() { @@ -68,7 +68,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{MaxPosts: 10} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(actors.RedditScraper)) + Expect(actorID).To(Equal(apify.Actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.StartUrls).To(Equal(urls)) Expect(*req.PostDateLimit).To(BeTemporally("~", after, time.Second)) @@ -93,7 +93,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{MaxComments: 5} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(actors.RedditScraper)) + Expect(actorID).To(Equal(apify.Actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -116,7 +116,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(actors.RedditScraper)) + Expect(actorID).To(Equal(apify.Actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -136,7 +136,7 @@ var _ = Describe("RedditApifyClient", func() { args := redditapify.CommonArgs{} mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(actors.RedditScraper)) + Expect(actorID).To(Equal(apify.Actors.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) diff --git a/internal/jobs/tiktokapify/client.go b/internal/jobs/tiktokapify/client.go index 0502a561..91bfd89a 100644 --- a/internal/jobs/tiktokapify/client.go +++ b/internal/jobs/tiktokapify/client.go @@ -6,7 +6,7 @@ import ( teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/pkg/client" ) @@ -74,7 +74,7 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } - dataset, next, err := c.apify.RunActorAndGetResponse(actors.TikTokSearchScraper, apifyInput, cursor, limit) + dataset, next, err := c.apify.RunActorAndGetResponse(apify.Actors.TikTokSearchScraper, apifyInput, cursor, limit) if err != nil { return nil, "", fmt.Errorf("apify run (search): %w", err) } @@ -110,7 +110,7 @@ func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendin return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } - dataset, next, err := c.apify.RunActorAndGetResponse(actors.TikTokTrendingScraper, apifyInput, cursor, limit) + dataset, next, err := c.apify.RunActorAndGetResponse(apify.Actors.TikTokTrendingScraper, apifyInput, cursor, limit) if err != nil { return nil, "", fmt.Errorf("apify run (trending): %w", err) } diff --git a/internal/jobs/twitterapify/client.go b/internal/jobs/twitterapify/client.go index 6fe9eaa2..372702ea 100644 --- a/internal/jobs/twitterapify/client.go +++ b/internal/jobs/twitterapify/client.go @@ -6,7 +6,7 @@ import ( util "github.com/masa-finance/tee-types/pkg/util" teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/pkg/client" "github.com/sirupsen/logrus" ) @@ -83,7 +83,7 @@ func (c *TwitterApifyClient) GetFollowing(username string, cursor client.Cursor, // getProfiles runs the actor and retrieves profiles from the dataset func (c *TwitterApifyClient) getProfiles(input FollowerActorRunRequest, cursor client.Cursor, limit uint) ([]*teetypes.ProfileResultApify, client.Cursor, error) { - dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(actors.TwitterFollowers, input, cursor, limit) + dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(apify.Actors.TwitterFollowers, input, cursor, limit) if err != nil { return nil, client.EmptyCursor, err } diff --git a/internal/jobs/webapify/client.go b/internal/jobs/webapify/client.go index 508e27f8..a32b484d 100644 --- a/internal/jobs/webapify/client.go +++ b/internal/jobs/webapify/client.go @@ -6,7 +6,7 @@ import ( teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/pkg/client" "github.com/sirupsen/logrus" @@ -49,7 +49,7 @@ func (c *ApifyClient) Scrape(workerID string, args teeargs.WebArguments, cursor input := args.ToWebScraperRequest() limit := uint(args.MaxPages) - dataset, nextCursor, err := c.client.RunActorAndGetResponse(actors.WebScraper, input, cursor, limit) + dataset, nextCursor, err := c.client.RunActorAndGetResponse(apify.Actors.WebScraper, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.WebErrors, 1) diff --git a/internal/jobs/webapify/client_test.go b/internal/jobs/webapify/client_test.go index 65363611..78bc0fe6 100644 --- a/internal/jobs/webapify/client_test.go +++ b/internal/jobs/webapify/client_test.go @@ -8,7 +8,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/masa-finance/tee-worker/internal/actors" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/masa-finance/tee-worker/internal/jobs/webapify" "github.com/masa-finance/tee-worker/pkg/client" @@ -19,7 +19,7 @@ import ( type MockApifyClient struct { RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error - ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) + ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) } func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { @@ -37,10 +37,10 @@ func (m *MockApifyClient) ValidateApiKey() error { } func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { - if m.ProbeActorAccessFunc != nil { - return m.ProbeActorAccessFunc(actorID, input) - } - return false, errors.New("ProbeActorAccessFunc not defined") + if m.ProbeActorAccessFunc != nil { + return m.ProbeActorAccessFunc(actorID, input) + } + return false, errors.New("ProbeActorAccessFunc not defined") } var _ = Describe("WebApifyClient", func() { @@ -73,7 +73,7 @@ var _ = Describe("WebApifyClient", func() { } mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(actors.WebScraper)) + Expect(actorID).To(Equal(apify.Actors.WebScraper)) Expect(limit).To(Equal(uint(2))) return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil } From 297fb5a95fd4adaca7b5f9e887c8b15466896b6f Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 18:52:40 +0200 Subject: [PATCH 04/10] feat: update actors config --- internal/apify/actors.go | 69 +++++++++++++++++++++--- internal/capabilities/detector.go | 12 ++--- internal/jobs/llmapify/client.go | 2 +- internal/jobs/llmapify/client_test.go | 22 ++++---- internal/jobs/redditapify/client.go | 2 +- internal/jobs/redditapify/client_test.go | 30 +++++------ internal/jobs/tiktokapify/client.go | 4 +- internal/jobs/twitterapify/client.go | 2 +- internal/jobs/webapify/client.go | 2 +- internal/jobs/webapify/client_test.go | 18 +++---- pkg/client/apify_client.go | 11 ++-- 11 files changed, 115 insertions(+), 59 deletions(-) diff --git a/internal/apify/actors.go b/internal/apify/actors.go index e548ada3..7131ed22 100644 --- a/internal/apify/actors.go +++ b/internal/apify/actors.go @@ -1,15 +1,21 @@ package apify +import teetypes "github.com/masa-finance/tee-types/types" + +type ActorId string + +type defaultActorInput map[string]any + type actorIds struct { - RedditScraper string - TikTokSearchScraper string - TikTokTrendingScraper string - LLMDatasetProcessor string - TwitterFollowers string - WebScraper string + RedditScraper ActorId + TikTokSearchScraper ActorId + TikTokTrendingScraper ActorId + LLMDatasetProcessor ActorId + TwitterFollowers ActorId + WebScraper ActorId } -var Actors = actorIds{ +var ActorIds = actorIds{ RedditScraper: "trudax~reddit-scraper", TikTokSearchScraper: "epctex~tiktok-search-scraper", TikTokTrendingScraper: "lexis-solutions~tiktok-trending-videos-scraper", @@ -17,3 +23,52 @@ var Actors = actorIds{ TwitterFollowers: "kaitoeasyapi~premium-x-follower-scraper-following-data", WebScraper: "apify~website-content-crawler", } + +var ( + rentedActorIds = []ActorId{ + ActorIds.RedditScraper, + ActorIds.TikTokSearchScraper, + ActorIds.TikTokTrendingScraper, + } +) + +type ActorConfig struct { + ActorId ActorId + Input defaultActorInput + Capabilities []teetypes.Capability + JobType teetypes.JobType +} + +// Actors is a list of actor configurations for Apify. Omitting LLM for now as it's not a standalone actor / has no dedicated capabilities +var Actors = []ActorConfig{ + { + ActorId: ActorIds.RedditScraper, + Input: defaultActorInput{}, + Capabilities: []teetypes.Capability{teetypes.CapScrapeUrls}, + JobType: teetypes.RedditJob, + }, + { + ActorId: ActorIds.TikTokSearchScraper, + Input: defaultActorInput{"proxy": map[string]any{"useApifyProxy": true}}, + Capabilities: []teetypes.Capability{teetypes.CapSearchByQuery}, + JobType: teetypes.TiktokJob, + }, + { + ActorId: ActorIds.TikTokTrendingScraper, + Input: defaultActorInput{}, + Capabilities: []teetypes.Capability{teetypes.CapSearchByTrending}, + JobType: teetypes.TiktokJob, + }, + { + ActorId: ActorIds.TwitterFollowers, + Input: defaultActorInput{}, + Capabilities: teetypes.TwitterApifyCaps, + JobType: teetypes.TwitterApifyJob, + }, + { + ActorId: ActorIds.WebScraper, + Input: defaultActorInput{}, + Capabilities: []teetypes.Capability{teetypes.CapScraper}, + JobType: teetypes.WebJob, + }, +} diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index 3d8de4c6..401ef701 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -75,24 +75,24 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface logrus.Errorf("Failed to create Apify client for access probe: %v", err) } else { // Reddit access probe - if ok, _ := c.ProbeActorAccess(apify.Actors.RedditScraper, map[string]any{}); ok { + if ok, _ := c.ProbeActorAccess(apify.ActorIds.RedditScraper, map[string]any{}); ok { capabilities[teetypes.RedditJob] = teetypes.RedditCaps } else { - logrus.Warnf("Apify token does not have access to actor %s", apify.Actors.RedditScraper) + logrus.Warnf("Apify token does not have access to actor %s", apify.ActorIds.RedditScraper) } // TikTok probes (search and trending handled independently) tiktokCapSet := util.NewSet(capabilities[teetypes.TiktokJob]...) - if ok, _ := c.ProbeActorAccess(apify.Actors.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { + if ok, _ := c.ProbeActorAccess(apify.ActorIds.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { tiktokCapSet.Add(teetypes.CapSearchByQuery) } else { - logrus.Warnf("Apify token does not have access to actor %s", apify.Actors.TikTokSearchScraper) + logrus.Warnf("Apify token does not have access to actor %s", apify.ActorIds.TikTokSearchScraper) } - if ok, _ := c.ProbeActorAccess(apify.Actors.TikTokTrendingScraper, map[string]any{}); ok { + if ok, _ := c.ProbeActorAccess(apify.ActorIds.TikTokTrendingScraper, map[string]any{}); ok { tiktokCapSet.Add(teetypes.CapSearchByTrending) } else { - logrus.Warnf("Apify token does not have access to actor %s", apify.Actors.TikTokTrendingScraper) + logrus.Warnf("Apify token does not have access to actor %s", apify.ActorIds.TikTokTrendingScraper) } capabilities[teetypes.TiktokJob] = tiktokCapSet.Items() diff --git a/internal/jobs/llmapify/client.go b/internal/jobs/llmapify/client.go index dde3d8b3..554178e9 100644 --- a/internal/jobs/llmapify/client.go +++ b/internal/jobs/llmapify/client.go @@ -63,7 +63,7 @@ func (c *ApifyClient) Process(workerID string, args teeargs.LLMProcessorArgument input.LLMProviderApiKey = string(c.llmConfig.GeminiApiKey) limit := uint(args.Items) - dataset, nextCursor, err := c.client.RunActorAndGetResponse(apify.Actors.LLMDatasetProcessor, input, cursor, limit) + dataset, nextCursor, err := c.client.RunActorAndGetResponse(apify.ActorIds.LLMDatasetProcessor, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.LLMErrors, 1) diff --git a/internal/jobs/llmapify/client_test.go b/internal/jobs/llmapify/client_test.go index 4ed7e463..0ee0f6f6 100644 --- a/internal/jobs/llmapify/client_test.go +++ b/internal/jobs/llmapify/client_test.go @@ -21,12 +21,12 @@ import ( // MockApifyClient is a mock implementation of the ApifyClient. type MockApifyClient struct { - RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) + RunActorAndGetResponseFunc func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error - ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) + ProbeActorAccessFunc func(actorID apify.ActorId, input map[string]any) (bool, error) } -func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { +func (m *MockApifyClient) RunActorAndGetResponse(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { if m.RunActorAndGetResponseFunc != nil { return m.RunActorAndGetResponseFunc(actorID, input, cursor, limit) } @@ -40,7 +40,7 @@ func (m *MockApifyClient) ValidateApiKey() error { return errors.New("ValidateApiKeyFunc not defined") } -func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { +func (m *MockApifyClient) ProbeActorAccess(actorID apify.ActorId, input map[string]any) (bool, error) { if m.ProbeActorAccessFunc != nil { return m.ProbeActorAccessFunc(actorID, input) } @@ -79,8 +79,8 @@ var _ = Describe("LLMApifyClient", func() { err = json.Unmarshal(jsonData, &args) Expect(err).ToNot(HaveOccurred()) - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(apify.Actors.LLMDatasetProcessor)) + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(apify.ActorIds.LLMDatasetProcessor)) Expect(limit).To(Equal(uint(1))) // Verify the input is correctly converted to LLMProcessorRequest @@ -103,7 +103,7 @@ var _ = Describe("LLMApifyClient", func() { It("should handle errors from the apify client", func() { expectedErr := errors.New("apify error") - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return nil, "", expectedErr } @@ -122,7 +122,7 @@ var _ = Describe("LLMApifyClient", func() { Items: []json.RawMessage{invalidJSON}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } @@ -144,7 +144,7 @@ var _ = Describe("LLMApifyClient", func() { Items: []json.RawMessage{llmResultJSON}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } @@ -171,7 +171,7 @@ var _ = Describe("LLMApifyClient", func() { Items: []json.RawMessage{llmResult1, llmResult2}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } @@ -194,7 +194,7 @@ var _ = Describe("LLMApifyClient", func() { Temperature: 0.5, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { request, ok := input.(teetypes.LLMProcessorRequest) Expect(ok).To(BeTrue()) Expect(request.MaxTokens).To(Equal(uint(500))) diff --git a/internal/jobs/redditapify/client.go b/internal/jobs/redditapify/client.go index a5084b3c..e90e7e75 100644 --- a/internal/jobs/redditapify/client.go +++ b/internal/jobs/redditapify/client.go @@ -164,7 +164,7 @@ func (c *RedditApifyClient) queryReddit(workerID string, input RedditActorReques c.statsCollector.Add(workerID, stats.RedditQueries, 1) } - dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(apify.Actors.RedditScraper, input, cursor, limit) + dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(apify.ActorIds.RedditScraper, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.RedditErrors, 1) diff --git a/internal/jobs/redditapify/client_test.go b/internal/jobs/redditapify/client_test.go index dfa97efd..1712157e 100644 --- a/internal/jobs/redditapify/client_test.go +++ b/internal/jobs/redditapify/client_test.go @@ -18,12 +18,12 @@ import ( // MockApifyClient is a mock implementation of the ApifyClient. type MockApifyClient struct { - RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) + RunActorAndGetResponseFunc func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error - ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) + ProbeActorAccessFunc func(actorID apify.ActorId, input map[string]any) (bool, error) } -func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { +func (m *MockApifyClient) RunActorAndGetResponse(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { if m.RunActorAndGetResponseFunc != nil { return m.RunActorAndGetResponseFunc(actorID, input, cursor, limit) } @@ -37,7 +37,7 @@ func (m *MockApifyClient) ValidateApiKey() error { return errors.New("ValidateApiKeyFunc not defined") } -func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { +func (m *MockApifyClient) ProbeActorAccess(actorID apify.ActorId, input map[string]any) (bool, error) { if m.ProbeActorAccessFunc != nil { return m.ProbeActorAccessFunc(actorID, input) } @@ -67,8 +67,8 @@ var _ = Describe("RedditApifyClient", func() { after := time.Now() args := redditapify.CommonArgs{MaxPosts: 10} - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(apify.Actors.RedditScraper)) + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(apify.ActorIds.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.StartUrls).To(Equal(urls)) Expect(*req.PostDateLimit).To(BeTemporally("~", after, time.Second)) @@ -92,8 +92,8 @@ var _ = Describe("RedditApifyClient", func() { after := time.Now() args := redditapify.CommonArgs{MaxComments: 5} - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(apify.Actors.RedditScraper)) + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(apify.ActorIds.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -115,8 +115,8 @@ var _ = Describe("RedditApifyClient", func() { queries := []string{"golang"} args := redditapify.CommonArgs{} - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(apify.Actors.RedditScraper)) + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(apify.ActorIds.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -135,8 +135,8 @@ var _ = Describe("RedditApifyClient", func() { queries := []string{"gopher"} args := redditapify.CommonArgs{} - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(apify.Actors.RedditScraper)) + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(apify.ActorIds.RedditScraper)) req := input.(redditapify.RedditActorRequest) Expect(req.Searches).To(Equal(queries)) Expect(req.StartUrls).To(BeNil()) @@ -154,7 +154,7 @@ var _ = Describe("RedditApifyClient", func() { Context("queryReddit", func() { It("should handle errors from the apify client", func() { expectedErr := errors.New("apify error") - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return nil, "", expectedErr } _, _, err := redditClient.SearchUsers("", []string{"test"}, false, redditapify.CommonArgs{}, "", 10) @@ -168,7 +168,7 @@ var _ = Describe("RedditApifyClient", func() { Items: []json.RawMessage{invalidJSON}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } @@ -186,7 +186,7 @@ var _ = Describe("RedditApifyClient", func() { Items: []json.RawMessage{userJSON}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } diff --git a/internal/jobs/tiktokapify/client.go b/internal/jobs/tiktokapify/client.go index 91bfd89a..6ad22f24 100644 --- a/internal/jobs/tiktokapify/client.go +++ b/internal/jobs/tiktokapify/client.go @@ -74,7 +74,7 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } - dataset, next, err := c.apify.RunActorAndGetResponse(apify.Actors.TikTokSearchScraper, apifyInput, cursor, limit) + dataset, next, err := c.apify.RunActorAndGetResponse(apify.ActorIds.TikTokSearchScraper, apifyInput, cursor, limit) if err != nil { return nil, "", fmt.Errorf("apify run (search): %w", err) } @@ -110,7 +110,7 @@ func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendin return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } - dataset, next, err := c.apify.RunActorAndGetResponse(apify.Actors.TikTokTrendingScraper, apifyInput, cursor, limit) + dataset, next, err := c.apify.RunActorAndGetResponse(apify.ActorIds.TikTokTrendingScraper, apifyInput, cursor, limit) if err != nil { return nil, "", fmt.Errorf("apify run (trending): %w", err) } diff --git a/internal/jobs/twitterapify/client.go b/internal/jobs/twitterapify/client.go index 372702ea..cbdddca5 100644 --- a/internal/jobs/twitterapify/client.go +++ b/internal/jobs/twitterapify/client.go @@ -83,7 +83,7 @@ func (c *TwitterApifyClient) GetFollowing(username string, cursor client.Cursor, // getProfiles runs the actor and retrieves profiles from the dataset func (c *TwitterApifyClient) getProfiles(input FollowerActorRunRequest, cursor client.Cursor, limit uint) ([]*teetypes.ProfileResultApify, client.Cursor, error) { - dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(apify.Actors.TwitterFollowers, input, cursor, limit) + dataset, nextCursor, err := c.apifyClient.RunActorAndGetResponse(apify.ActorIds.TwitterFollowers, input, cursor, limit) if err != nil { return nil, client.EmptyCursor, err } diff --git a/internal/jobs/webapify/client.go b/internal/jobs/webapify/client.go index a32b484d..e59d550d 100644 --- a/internal/jobs/webapify/client.go +++ b/internal/jobs/webapify/client.go @@ -49,7 +49,7 @@ func (c *ApifyClient) Scrape(workerID string, args teeargs.WebArguments, cursor input := args.ToWebScraperRequest() limit := uint(args.MaxPages) - dataset, nextCursor, err := c.client.RunActorAndGetResponse(apify.Actors.WebScraper, input, cursor, limit) + dataset, nextCursor, err := c.client.RunActorAndGetResponse(apify.ActorIds.WebScraper, input, cursor, limit) if err != nil { if c.statsCollector != nil { c.statsCollector.Add(workerID, stats.WebErrors, 1) diff --git a/internal/jobs/webapify/client_test.go b/internal/jobs/webapify/client_test.go index 78bc0fe6..a23867dd 100644 --- a/internal/jobs/webapify/client_test.go +++ b/internal/jobs/webapify/client_test.go @@ -17,12 +17,12 @@ import ( // MockApifyClient is a mock implementation of the ApifyClient. type MockApifyClient struct { - RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) + RunActorAndGetResponseFunc func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) ValidateApiKeyFunc func() error - ProbeActorAccessFunc func(actorID string, input map[string]any) (bool, error) + ProbeActorAccessFunc func(actorID apify.ActorId, input map[string]any) (bool, error) } -func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { +func (m *MockApifyClient) RunActorAndGetResponse(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { if m.RunActorAndGetResponseFunc != nil { return m.RunActorAndGetResponseFunc(actorID, input, cursor, limit) } @@ -36,7 +36,7 @@ func (m *MockApifyClient) ValidateApiKey() error { return errors.New("ValidateApiKeyFunc not defined") } -func (m *MockApifyClient) ProbeActorAccess(actorID string, input map[string]any) (bool, error) { +func (m *MockApifyClient) ProbeActorAccess(actorID apify.ActorId, input map[string]any) (bool, error) { if m.ProbeActorAccessFunc != nil { return m.ProbeActorAccessFunc(actorID, input) } @@ -72,8 +72,8 @@ var _ = Describe("WebApifyClient", func() { MaxPages: 2, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { - Expect(actorID).To(Equal(apify.Actors.WebScraper)) + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + Expect(actorID).To(Equal(apify.ActorIds.WebScraper)) Expect(limit).To(Equal(uint(2))) return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil } @@ -84,7 +84,7 @@ var _ = Describe("WebApifyClient", func() { It("should handle errors from the apify client", func() { expectedErr := errors.New("apify error") - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return nil, "", expectedErr } @@ -104,7 +104,7 @@ var _ = Describe("WebApifyClient", func() { Items: []json.RawMessage{invalidJSON}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } @@ -129,7 +129,7 @@ var _ = Describe("WebApifyClient", func() { Items: []json.RawMessage{webResultJSON}, }, } - mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { + mockClient.RunActorAndGetResponseFunc = func(actorID apify.ActorId, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) { return dataset, "next", nil } diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go index e3d67c71..fafad74f 100644 --- a/pkg/client/apify_client.go +++ b/pkg/client/apify_client.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + "github.com/masa-finance/tee-worker/internal/apify" "github.com/sirupsen/logrus" ) @@ -26,9 +27,9 @@ const ( // Apify provides an interface for interacting with the Apify API. type Apify interface { - RunActorAndGetResponse(actorId string, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) + RunActorAndGetResponse(actorId apify.ActorId, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) ValidateApiKey() error - ProbeActorAccess(actorId string, input map[string]any) (bool, error) + ProbeActorAccess(actorId apify.ActorId, input map[string]any) (bool, error) } // ApifyClient represents a client for the Apify API @@ -123,7 +124,7 @@ func (c *ApifyClient) AbortActorRun(runId string) error { // ProbeActorAccess attempts to start a run and immediately abort to verify access // Returns true if the token can start the actor (permission/rental present) // Some actors require a default input to be provided -func (c *ApifyClient) ProbeActorAccess(actorId string, input map[string]any) (bool, error) { +func (c *ApifyClient) ProbeActorAccess(actorId apify.ActorId, input map[string]any) (bool, error) { // Use empty input; most actors accept defaults. We do not wait for finish. runResp, err := c.RunActor(actorId, input) if err != nil { @@ -141,7 +142,7 @@ func (c *ApifyClient) ProbeActorAccess(actorId string, input map[string]any) (bo } // RunActor runs an actor with the given input -func (c *ApifyClient) RunActor(actorId string, input any) (*ActorRunResponse, error) { +func (c *ApifyClient) RunActor(actorId apify.ActorId, input any) (*ActorRunResponse, error) { url := fmt.Sprintf("%s/acts/%s/runs?token=%s", c.baseUrl, actorId, c.apiToken) logrus.Infof("Running actor %s", actorId) @@ -335,7 +336,7 @@ var ( ) // runActorAndGetProfiles runs the actor and retrieves profiles from the dataset -func (c *ApifyClient) RunActorAndGetResponse(actorId string, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) { +func (c *ApifyClient) RunActorAndGetResponse(actorId apify.ActorId, input any, cursor Cursor, limit uint) (*DatasetResponse, Cursor, error) { var offset uint if cursor != EmptyCursor { offset = parseCursor(cursor) From 17d2c2c0225eb56e70ad0f34c2aebe86313d3318 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 19:03:47 +0200 Subject: [PATCH 05/10] fix: loops over actors for readiness --- internal/apify/actors.go | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/internal/apify/actors.go b/internal/apify/actors.go index 7131ed22..c9cf0134 100644 --- a/internal/apify/actors.go +++ b/internal/apify/actors.go @@ -4,8 +4,6 @@ import teetypes "github.com/masa-finance/tee-types/types" type ActorId string -type defaultActorInput map[string]any - type actorIds struct { RedditScraper ActorId TikTokSearchScraper ActorId @@ -24,17 +22,11 @@ var ActorIds = actorIds{ WebScraper: "apify~website-content-crawler", } -var ( - rentedActorIds = []ActorId{ - ActorIds.RedditScraper, - ActorIds.TikTokSearchScraper, - ActorIds.TikTokTrendingScraper, - } -) +type defaultActorInput map[string]any type ActorConfig struct { ActorId ActorId - Input defaultActorInput + DefaultInput defaultActorInput Capabilities []teetypes.Capability JobType teetypes.JobType } @@ -43,32 +35,32 @@ type ActorConfig struct { var Actors = []ActorConfig{ { ActorId: ActorIds.RedditScraper, - Input: defaultActorInput{}, - Capabilities: []teetypes.Capability{teetypes.CapScrapeUrls}, + DefaultInput: defaultActorInput{}, + Capabilities: teetypes.RedditCaps, JobType: teetypes.RedditJob, }, { ActorId: ActorIds.TikTokSearchScraper, - Input: defaultActorInput{"proxy": map[string]any{"useApifyProxy": true}}, + DefaultInput: defaultActorInput{"proxy": map[string]any{"useApifyProxy": true}}, Capabilities: []teetypes.Capability{teetypes.CapSearchByQuery}, JobType: teetypes.TiktokJob, }, { ActorId: ActorIds.TikTokTrendingScraper, - Input: defaultActorInput{}, + DefaultInput: defaultActorInput{}, Capabilities: []teetypes.Capability{teetypes.CapSearchByTrending}, JobType: teetypes.TiktokJob, }, { ActorId: ActorIds.TwitterFollowers, - Input: defaultActorInput{}, + DefaultInput: defaultActorInput{}, Capabilities: teetypes.TwitterApifyCaps, JobType: teetypes.TwitterApifyJob, }, { ActorId: ActorIds.WebScraper, - Input: defaultActorInput{}, - Capabilities: []teetypes.Capability{teetypes.CapScraper}, + DefaultInput: defaultActorInput{}, + Capabilities: teetypes.WebCaps, JobType: teetypes.WebJob, }, } From c7c9ccc97185659ebdd21aab4db1e1e9a7d0a0db Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 19:11:27 +0200 Subject: [PATCH 06/10] fix: loop actually implemented --- internal/capabilities/detector.go | 54 ++++++++++++++----------------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index 401ef701..600b02e9 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -64,43 +64,37 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface capabilities[teetypes.TwitterApiJob] = apiCaps } - // Add Apify-specific capabilities based on available API key if hasApifyKey { - // Add default Apify capabilities - capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps - - // Create an Apify client for probing rented actors + // Create an Apify client for probing actors c, err := client.NewApifyClient(apifyApiKey) if err != nil { - logrus.Errorf("Failed to create Apify client for access probe: %v", err) + logrus.Errorf("Failed to create Apify client for access probes: %v", err) } else { - // Reddit access probe - if ok, _ := c.ProbeActorAccess(apify.ActorIds.RedditScraper, map[string]any{}); ok { - capabilities[teetypes.RedditJob] = teetypes.RedditCaps - } else { - logrus.Warnf("Apify token does not have access to actor %s", apify.ActorIds.RedditScraper) + // Aggregate capabilities per job from accessible actors + jobToSet := map[teetypes.JobType]*util.Set[teetypes.Capability]{} + + for _, actor := range apify.Actors { + // Web requires a valid Gemini API key + if actor.JobType == teetypes.WebJob && !hasLLMKey { + logrus.Debug("Skipping Web actor due to missing Gemini key") + continue + } + + if ok, _ := c.ProbeActorAccess(actor.ActorId, map[string]any(actor.DefaultInput)); ok { + if _, exists := jobToSet[actor.JobType]; !exists { + jobToSet[actor.JobType] = util.NewSet[teetypes.Capability]() + } + jobToSet[actor.JobType].Add(actor.Capabilities...) + } else { + logrus.Warnf("Apify token does not have access to actor %s", actor.ActorId) + } } - // TikTok probes (search and trending handled independently) - tiktokCapSet := util.NewSet(capabilities[teetypes.TiktokJob]...) - - if ok, _ := c.ProbeActorAccess(apify.ActorIds.TikTokSearchScraper, map[string]any{"proxy": map[string]any{"useApifyProxy": true}}); ok { - tiktokCapSet.Add(teetypes.CapSearchByQuery) - } else { - logrus.Warnf("Apify token does not have access to actor %s", apify.ActorIds.TikTokSearchScraper) + // Union accessible-actor caps into existing caps + for job, set := range jobToSet { + existingCaps := util.NewSet(capabilities[job]...) + capabilities[job] = existingCaps.Add(set.Items()...).Items() } - if ok, _ := c.ProbeActorAccess(apify.ActorIds.TikTokTrendingScraper, map[string]any{}); ok { - tiktokCapSet.Add(teetypes.CapSearchByTrending) - } else { - logrus.Warnf("Apify token does not have access to actor %s", apify.ActorIds.TikTokTrendingScraper) - } - - capabilities[teetypes.TiktokJob] = tiktokCapSet.Items() - - } - - if hasLLMKey { - capabilities[teetypes.WebJob] = teetypes.WebCaps } } From bf522bfe9fdb150512756d566262f6be231efee6 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 19:21:48 +0200 Subject: [PATCH 07/10] fix: default inputs --- internal/apify/actors.go | 2 +- internal/capabilities/detector_test.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/apify/actors.go b/internal/apify/actors.go index c9cf0134..30b0fc71 100644 --- a/internal/apify/actors.go +++ b/internal/apify/actors.go @@ -53,7 +53,7 @@ var Actors = []ActorConfig{ }, { ActorId: ActorIds.TwitterFollowers, - DefaultInput: defaultActorInput{}, + DefaultInput: defaultActorInput{"maxFollowers": 200, "maxFollowings": 200}, Capabilities: teetypes.TwitterApifyCaps, JobType: teetypes.TwitterApifyJob, }, diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index 5b0bdb4e..4caf4bc3 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -147,7 +147,7 @@ var _ = Describe("DetectCapabilities", func() { }) Context("Apify Integration", func() { - It("should add enhanced capabilities when valid Apify API key is provided", func() { + FIt("should add enhanced capabilities when valid Apify API key is provided", func() { apifyKey := os.Getenv("APIFY_API_KEY") if apifyKey == "" { Skip("APIFY_API_KEY is not set") @@ -171,9 +171,13 @@ var _ = Describe("DetectCapabilities", func() { Expect(twitterApifyCaps).To(ContainElement(teetypes.CapGetFollowers), "expected twitter-apify to include CapGetFollowers capability") Expect(twitterApifyCaps).To(ContainElement(teetypes.CapGetFollowing), "expected twitter-apify to include CapGetFollowing capability") - // Reddit should be present - _, hasReddit := caps[teetypes.RedditJob] + // Reddit should be present (only if rented!) + redditCaps, hasReddit := caps[teetypes.RedditJob] Expect(hasReddit).To(BeTrue(), "expected reddit capabilities to be present") + Expect(redditCaps).To(ContainElement(teetypes.CapScrapeUrls), "expected reddit to include CapScrapeUrls capability") + Expect(redditCaps).To(ContainElement(teetypes.CapSearchPosts), "expected reddit to include CapSearchPosts capability") + Expect(redditCaps).To(ContainElement(teetypes.CapSearchUsers), "expected reddit to include CapSearchUsers capability") + Expect(redditCaps).To(ContainElement(teetypes.CapSearchCommunities), "expected reddit to include CapSearchCommunities capability") }) It("should add enhanced capabilities when valid Apify API key is provided alongside a Gemini API key", func() { apifyKey := os.Getenv("APIFY_API_KEY") From 02bdd01fb57d835a8e1212f9207c7f16b643097a Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 19:21:59 +0200 Subject: [PATCH 08/10] fix: remove focused test --- internal/capabilities/detector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index 4caf4bc3..c948ff98 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -147,7 +147,7 @@ var _ = Describe("DetectCapabilities", func() { }) Context("Apify Integration", func() { - FIt("should add enhanced capabilities when valid Apify API key is provided", func() { + It("should add enhanced capabilities when valid Apify API key is provided", func() { apifyKey := os.Getenv("APIFY_API_KEY") if apifyKey == "" { Skip("APIFY_API_KEY is not set") From 641701a46213e046f1dc6e7ddddd4876d416189b Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 19:29:37 +0200 Subject: [PATCH 09/10] fix: remove unecessary casting --- internal/capabilities/detector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index 600b02e9..aa663b00 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -80,7 +80,7 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface continue } - if ok, _ := c.ProbeActorAccess(actor.ActorId, map[string]any(actor.DefaultInput)); ok { + if ok, _ := c.ProbeActorAccess(actor.ActorId, actor.DefaultInput); ok { if _, exists := jobToSet[actor.JobType]; !exists { jobToSet[actor.JobType] = util.NewSet[teetypes.Capability]() } From 1ccae9a6272ca0c6ffa51297db8f40447f210235 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 18 Sep 2025 19:34:31 +0200 Subject: [PATCH 10/10] fix: web detection --- internal/apify/actors.go | 2 +- internal/capabilities/detector_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/apify/actors.go b/internal/apify/actors.go index 30b0fc71..3d858d3e 100644 --- a/internal/apify/actors.go +++ b/internal/apify/actors.go @@ -59,7 +59,7 @@ var Actors = []ActorConfig{ }, { ActorId: ActorIds.WebScraper, - DefaultInput: defaultActorInput{}, + DefaultInput: defaultActorInput{"startUrls": []map[string]any{{"url": "https://docs.learnbittensor.org"}}}, Capabilities: teetypes.WebCaps, JobType: teetypes.WebJob, }, diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index c948ff98..57b9e7e8 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -197,8 +197,9 @@ var _ = Describe("DetectCapabilities", func() { caps := DetectCapabilities(jc, nil) // Web should be present - _, hasWeb := caps[teetypes.WebJob] + webCaps, hasWeb := caps[teetypes.WebJob] Expect(hasWeb).To(BeTrue(), "expected web capabilities to be present") + Expect(webCaps).To(ContainElement(teetypes.CapScraper), "expected web to include CapScraper capability") }) }) })