Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func main() {

// Step 1: Create the job request
job := types.Job{
Type: "web-scraper",
Type: "web",
Arguments: map[string]interface{}{
"url": "https://google.com",
"depth": 1,
Expand Down
1 change: 1 addition & 0 deletions internal/capabilities/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface)
}

// Add Apify-specific capabilities based on available API key
// TODO: We should verify whether each of the actors is actually available through this API key
if hasApifyKey {
capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps
capabilities[teetypes.RedditJob] = teetypes.RedditCaps
Expand Down
6 changes: 0 additions & 6 deletions internal/capabilities/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ func TestDetectCapabilities(t *testing.T) {
}
}

// hasJobType reports whether capabilities holds an entry for the job type
// named by jobName.
func hasJobType(capabilities teetypes.WorkerCapabilities, jobName string) bool {
	// Convert the plain string to the typed key before the lookup.
	key := teetypes.JobType(jobName)
	if _, found := capabilities[key]; found {
		return true
	}
	return false
}

func TestDetectCapabilities_ScraperTypes(t *testing.T) {
tests := []struct {
name string
Expand Down
14 changes: 14 additions & 0 deletions internal/jobs/reddit.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,17 @@ func processRedditResponse(j types.Job, resp []*reddit.Response, cursor client.C
NextCursor: cursor.String(),
}, nil
}

// GetStructuredCapabilities returns the structured capabilities supported by this Reddit scraper
// based on whether an Apify API key is configured
func (rs *RedditScraper) GetStructuredCapabilities() teetypes.WorkerCapabilities {
	caps := make(teetypes.WorkerCapabilities)

	// Without an Apify API key nothing is advertised; return the empty set.
	if rs.configuration.ApifyApiKey == "" {
		return caps
	}

	// TODO: We should verify whether each of the actors is actually available through this API key
	caps[teetypes.RedditJob] = teetypes.RedditCaps
	return caps
}
4 changes: 4 additions & 0 deletions internal/jobs/twitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (ts *TwitterScraper) getApiScraper(j types.Job) (*twitterx.TwitterXScraper,

// getApifyScraper returns an Apify client
func (ts *TwitterScraper) getApifyScraper(j types.Job) (*twitterapify.TwitterApifyClient, error) {
// TODO: We should verify whether each of the actors is actually available through this API key
if ts.configuration.ApifyApiKey == "" {
ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1)
return nil, fmt.Errorf("no Apify API key available")
Expand Down Expand Up @@ -980,6 +981,7 @@ func NewTwitterScraper(jc types.JobConfiguration, c *stats.StatsCollector) *Twit
accountManager.DetectAllApiKeyTypes()

// Validate Apify API key at startup if provided (similar to API key detection)
// TODO: We should verify whether each of the actors is actually available through this API key
if config.ApifyApiKey != "" {
apifyScraper, err := twitterapify.NewTwitterApifyClient(config.ApifyApiKey)
if err != nil {
Expand Down Expand Up @@ -1058,6 +1060,7 @@ func (ts *TwitterScraper) GetStructuredCapabilities() teetypes.WorkerCapabilitie
}

// Add Apify-specific capabilities based on available API key
// TODO: We should verify whether each of the actors is actually available through this API key
if ts.configuration.ApifyApiKey != "" {
capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps
}
Expand Down Expand Up @@ -1181,6 +1184,7 @@ func (s *DefaultScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs
switch capability {
case teetypes.CapGetFollowers, teetypes.CapGetFollowing:
// Priority: Apify > Credentials for general TwitterJob
// TODO: We should verify whether each of the actors is actually available through this API key
if ts.configuration.ApifyApiKey != "" {
// Use Apify strategy
apifyStrategy := &ApifyScrapeStrategy{}
Expand Down
21 changes: 7 additions & 14 deletions internal/jobserver/jobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,19 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer {
return js
}

// CapabilityProvider is an interface for workers that can report their capabilities.
type CapabilityProvider interface {
// GetStructuredCapabilities returns the capabilities the implementer
// supports, as a teetypes.WorkerCapabilities value.
GetStructuredCapabilities() teetypes.WorkerCapabilities
}

// GetWorkerCapabilities returns the structured capabilities for all registered workers
func (js *JobServer) GetWorkerCapabilities() teetypes.WorkerCapabilities {
// Use a map to deduplicate capabilities by job type
jobTypeCapMap := make(map[teetypes.JobType]map[teetypes.Capability]struct{})

for _, workerEntry := range js.jobWorkers {
if provider, ok := workerEntry.w.(CapabilityProvider); ok {
workerCapabilities := provider.GetStructuredCapabilities()
for jobType, capabilities := range workerCapabilities {
if _, exists := jobTypeCapMap[jobType]; !exists {
jobTypeCapMap[jobType] = make(map[teetypes.Capability]struct{})
}
for _, capability := range capabilities {
jobTypeCapMap[jobType][capability] = struct{}{}
}
workerCapabilities := workerEntry.w.GetStructuredCapabilities()
for jobType, capabilities := range workerCapabilities {
if _, exists := jobTypeCapMap[jobType]; !exists {
jobTypeCapMap[jobType] = make(map[teetypes.Capability]struct{})
}
for _, capability := range capabilities {
jobTypeCapMap[jobType][capability] = struct{}{}
}
}
}
Expand Down
22 changes: 12 additions & 10 deletions internal/jobserver/jobserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

teetypes "github.com/masa-finance/tee-types/types"

"github.com/masa-finance/tee-worker/api/types"
"github.com/masa-finance/tee-worker/internal/config"
. "github.com/masa-finance/tee-worker/internal/jobserver"
Expand All @@ -22,8 +24,8 @@ var _ = Describe("Jobserver", func() {
jobserver := NewJobServer(2, types.JobConfiguration{})

uuid, err := jobserver.AddJob(types.Job{
Type: "web-scraper",
Arguments: map[string]interface{}{
Type: teetypes.WebJob,
Arguments: map[string]any{
"url": "google",
},
})
Expand All @@ -49,8 +51,8 @@ var _ = Describe("Jobserver", func() {
jobserver := NewJobServer(2, types.JobConfiguration{})

uuid, err := jobserver.AddJob(types.Job{
Type: "web-scraper",
Arguments: map[string]interface{}{
Type: teetypes.WebJob,
Arguments: map[string]any{
"url": "google",
},
Nonce: "1234567890",
Expand All @@ -62,9 +64,9 @@ var _ = Describe("Jobserver", func() {
Expect(err.Error()).To(ContainSubstring("this job is not from a whitelisted miner"))

uuid, err = jobserver.AddJob(types.Job{
Type: "web-scraper",
Type: teetypes.WebJob,
WorkerID: "miner1",
Arguments: map[string]interface{}{
Arguments: map[string]any{
"url": "google",
},
Nonce: "1234567891",
Expand All @@ -79,8 +81,8 @@ var _ = Describe("Jobserver", func() {
jobserver := NewJobServer(2, types.JobConfiguration{})

uuid, err := jobserver.AddJob(types.Job{
Type: "web-scraper",
Arguments: map[string]interface{}{
Type: teetypes.WebJob,
Arguments: map[string]any{
"url": "google",
},
Nonce: "1234567890",
Expand All @@ -94,8 +96,8 @@ var _ = Describe("Jobserver", func() {
Expect(exists).ToNot(BeTrue())

uuid, err = jobserver.AddJob(types.Job{
Type: "web-scraper",
Arguments: map[string]interface{}{
Type: teetypes.WebJob,
Arguments: map[string]any{
"url": "google",
},
Nonce: "1234567890",
Expand Down
2 changes: 2 additions & 0 deletions internal/jobserver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

teetypes "github.com/masa-finance/tee-types/types"
"github.com/masa-finance/tee-worker/api/types"
"github.com/sirupsen/logrus"
)
Expand All @@ -25,6 +26,7 @@ func (js *JobServer) worker(c context.Context) {
}

// worker describes a job handler: it can report the capabilities it supports
// and execute a single job.
type worker interface {
// GetStructuredCapabilities returns the capabilities this worker supports,
// as a teetypes.WorkerCapabilities value.
GetStructuredCapabilities() teetypes.WorkerCapabilities
// ExecuteJob runs j and returns its result, or an error.
ExecuteJob(j types.Job) (types.JobResult, error)
}

Expand Down
Loading