diff --git a/Dockerfile b/Dockerfile index 7ebc8e41..c5eac519 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -ARG egover=1.6.0 +ARG egover=1.7.2 ARG baseimage=ghcr.io/edgelesssys/ego-deploy:v${egover} ARG VERSION diff --git a/Makefile b/Makefile index 28fdaaaa..68f98a0b 100644 --- a/Makefile +++ b/Makefile @@ -76,5 +76,8 @@ test-jobs: docker-build-test test-twitter: docker-build-test @docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/twitter_test.go ./internal/jobs/jobs_suite_test.go +test-web: docker-build-test + @docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/webscraper_test.go ./internal/jobs/jobs_suite_test.go + test-telemetry: docker-build-test @docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/telemetry_test.go ./internal/jobs/jobs_suite_test.go \ No newline at end of file diff --git a/README.md b/README.md index fb5d9363..1828962b 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ The tee-worker requires various environment variables for operation. These shoul - `TWITTER_SKIP_LOGIN_VERIFICATION`: Set to `true` to skip Twitter's login verification step. This can help avoid rate limiting issues with Twitter's verify_credentials API endpoint when running multiple workers or processing large volumes of requests. - `TIKTOK_DEFAULT_LANGUAGE`: Default language for TikTok transcriptions (default: `eng-US`). - `TIKTOK_API_USER_AGENT`: User-Agent header for TikTok API requests (default: standard mobile browser user agent). +- `APIFY_API_KEY`: API key for Apify Twitter scraping services. Required for `twitter-apify` job type and enables enhanced follower/following data collection. - `LISTEN_ADDRESS`: The address the service listens on (default: `:8080`). - `RESULT_CACHE_MAX_SIZE`: Maximum number of job results to keep in the result cache (default: `1000`). - `RESULT_CACHE_MAX_AGE_SECONDS`: Maximum age (in seconds) to keep a result in the cache (default: `600`). @@ -72,11 +73,11 @@ The worker automatically detects and exposes capabilities based on available con **Core Services (Always Available):** 1. **`web`** - Web scraping services - - **Sub-capabilities**: `["web-scraper"]` + - **Sub-capabilities**: `["scraper"]` - **Requirements**: None (always available) 2. **`tiktok`** - TikTok video processing - - **Sub-capabilities**: `["tiktok-transcription"]` + - **Sub-capabilities**: `["transcription"]` - **Requirements**: None (always available) **Twitter Services (Configuration-Dependent):** @@ -90,12 +91,17 @@ The worker automatically detects and exposes capabilities based on available con - **Requirements**: `TWITTER_API_KEYS` environment variable 5. 
**`twitter`** - General Twitter scraping (uses best available auth) - - **Sub-capabilities**: Dynamic based on available authentication (same as credential or API depending on what's configured) - - **Requirements**: Either `TWITTER_ACCOUNTS` or `TWITTER_API_KEYS` + - **Sub-capabilities**: Dynamic based on available authentication (combines capabilities from credential, API, and Apify depending on what's configured) + - **Requirements**: Either `TWITTER_ACCOUNTS`, `TWITTER_API_KEYS`, or `APIFY_API_KEY` + - **Priority**: For follower/following operations: Apify > Credentials. For search operations: Credentials > API. + +6. **`twitter-apify`** - Twitter scraping using Apify's API (requires `APIFY_API_KEY`) + - **Sub-capabilities**: `["getfollowers", "getfollowing"]` + - **Requirements**: `APIFY_API_KEY` environment variable **Stats Service (Always Available):** -6. **`telemetry`** - Worker monitoring and stats +7. **`telemetry`** - Worker monitoring and stats - **Sub-capabilities**: `["telemetry"]` - **Requirements**: None (always available) @@ -142,7 +148,7 @@ curl -s localhost:8080/job/result \ All job types follow the same API flow above. Here are the available job types and their specific parameters: -#### `web-scraper` +#### `web` Scrapes content from web pages. **Parameters:** @@ -151,8 +157,9 @@ Scrapes content from web pages. ```json { - "type": "web-scraper", + "type": "web", "arguments": { + "type": "scraper", "url": "https://www.google.com", "depth": 1 } @@ -185,8 +192,9 @@ Transcribes TikTok videos to text. ```json { - "type": "tiktok-transcription", + "type": "tiktok", "arguments": { + "type": "transcription", "video_url": "https://www.tiktok.com/@coachty23/video/7502100651397172526", "language": "eng-US" } @@ -195,10 +203,11 @@ Transcribes TikTok videos to text. 
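For reference, a successful transcription job produces a result whose fields mirror the worker's transcription result type (`TranscriptionText`, `DetectedLanguage`, `VideoTitle`, `OriginalURL`, `ThumbnailURL`). The JSON below is an illustrative sketch only; the exact key names and casing depend on the struct tags defined in `tee-types`:

```json
{
  "transcription_text": "Hey everyone, welcome back to the channel...",
  "detected_language": "eng-US",
  "video_title": "Example TikTok video",
  "original_url": "https://www.tiktok.com/@coachty23/video/7502100651397172526",
  "thumbnail_url": "https://example.com/thumbnail.jpg"
}
```
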
#### Twitter Job Types -Twitter scraping is available through three job types: -- `twitter-scraper`: Uses best available auth method (credential or API) -- `twitter-credential-scraper`: Forces credential-based scraping (requires `TWITTER_ACCOUNTS`) -- `twitter-api-scraper`: Forces API-based scraping (requires `TWITTER_API_KEYS`) +Twitter scraping is available through four job types: +- `twitter`: Uses best available auth method (credential, API, or Apify) +- `twitter-credential`: Forces credential-based scraping (requires `TWITTER_ACCOUNTS`) +- `twitter-api`: Forces API-based scraping (requires `TWITTER_API_KEYS`) +- `twitter-apify`: Forces Apify-based scraping (requires `APIFY_API_KEY`) **Common Parameters:** - `type` (string, required): The operation type (see sub-capabilities below) @@ -211,7 +220,7 @@ Twitter scraping is available through three job types: **`searchbyquery`** - Search tweets using Twitter query syntax ```json { - "type": "twitter-scraper", + "type": "twitter", "arguments": { "type": "searchbyquery", "query": "climate change", @@ -223,7 +232,7 @@ Twitter scraping is available through three job types: **`searchbyfullarchive`** - Search full tweet archive (requires elevated API key for API-based scraping) ```json { - "type": "twitter-api-scraper", + "type": "twitter-api", "arguments": { "type": "searchbyfullarchive", "query": "NASA", @@ -235,7 +244,7 @@ Twitter scraping is available through three job types: **`getbyid`** - Get specific tweet by ID ```json { - "type": "twitter-scraper", + "type": "twitter", "arguments": { "type": "getbyid", "query": "1881258110712492142" @@ -246,7 +255,7 @@ Twitter scraping is available through three job types: **`getreplies`** - Get replies to a specific tweet ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "getreplies", "query": "1234567890", @@ -258,7 +267,7 @@ Twitter scraping is available through three job types: **`getretweeters`** - Get users who retweeted a specific tweet ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "getretweeters", "query": "1234567890", @@ -272,7 +281,7 @@ Twitter scraping is available through three job types: **`gettweets`** - Get tweets from a user's timeline ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "gettweets", "query": "NASA", @@ -284,7 +293,7 @@ Twitter scraping is available through three job types: **`getmedia`** - Get media (photos/videos) from a user ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "getmedia", "query": "NASA", @@ -296,7 +305,7 @@ Twitter scraping is available through three job types: **`gethometweets`** - Get authenticated user's home timeline (credential-based only) ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "gethometweets", "max_results": 30 @@ -307,7 +316,7 @@ Twitter scraping is available through three job types: **`getforyoutweets`** - Get "For You" timeline (credential-based only) ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "getforyoutweets", "max_results": 25 @@ -320,7 +329,7 @@ Twitter scraping is available through three job types: **`searchbyprofile`** - Get user profile information ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "searchbyprofile", "query": 
"NASA_Marshall" @@ -331,7 +340,7 @@ Twitter scraping is available through three job types: **`getprofilebyid`** - Get user profile by user ID ```json { - "type": "twitter-scraper", + "type": "twitter", "arguments": { "type": "getprofilebyid", "query": "44196397" @@ -342,7 +351,7 @@ Twitter scraping is available through three job types: **`getfollowers`** - Get followers of a profile ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "getfollowers", "query": "NASA", @@ -351,10 +360,23 @@ Twitter scraping is available through three job types: } ``` +**`getfollowers`** (using Apify for enhanced data) - Get followers with detailed profile information +```json +{ + "type": "twitter-apify", + "arguments": { + "type": "getfollowers", + "query": "NASA", + "max_results": 100, + "next_cursor": "optional_pagination_cursor" + } +} +``` + **`getfollowing`** - Get users that a profile is following ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "getfollowing", "query": "NASA", @@ -363,18 +385,43 @@ Twitter scraping is available through three job types: } ``` +**`getfollowing`** (using Apify for enhanced data) - Get following with detailed profile information +```json +{ + "type": "twitter-apify", + "arguments": { + "type": "getfollowing", + "query": "NASA", + "max_results": 100, + "next_cursor": "optional_pagination_cursor" + } +} +``` + ##### Other Operations **`gettrends`** - Get trending topics (no query required) ```json { - "type": "twitter-credential-scraper", + "type": "twitter-credential", "arguments": { "type": "gettrends" } } ``` +##### Return Types + +**Enhanced Profile Data with Apify**: When using `twitter-apify` for `getfollowers` or `getfollowing` operations, the response returns `ProfileResultApify` objects which include comprehensive profile information such as: +- Basic profile data (ID, name, screen name, location, description) +- Detailed follower/following counts and engagement metrics +- Profile appearance settings and colors +- Account verification and security status +- Privacy and interaction settings +- Business account information when available + +This enhanced data provides richer insights compared to standard credential or API-based profile results. 
+ ### Health Check Endpoints The service provides health check endpoints: diff --git a/api/types/job.go b/api/types/job.go index d15fd43d..a228c66c 100644 --- a/api/types/job.go +++ b/api/types/job.go @@ -180,3 +180,25 @@ func (jc JobConfiguration) GetBool(key string, def bool) bool { } return def } + +// TwitterScraperConfig represents the configuration needed for Twitter scraping +// This is defined here to avoid circular imports between api/types and internal/jobs +type TwitterScraperConfig struct { + Accounts []string + ApiKeys []string + ApifyApiKey string + DataDir string + SkipLoginVerification bool +} + +// GetTwitterConfig constructs a TwitterScraperConfig directly from the JobConfiguration +// This eliminates the need for JSON marshaling/unmarshaling +func (jc JobConfiguration) GetTwitterConfig() TwitterScraperConfig { + return TwitterScraperConfig{ + Accounts: jc.GetStringSlice("twitter_accounts", []string{}), + ApiKeys: jc.GetStringSlice("twitter_api_keys", []string{}), + ApifyApiKey: jc.GetString("apify_api_key", ""), + DataDir: jc.GetString("data_dir", ""), + SkipLoginVerification: jc.GetBool("skip_login_verification", false), + } +} diff --git a/cmd/tee-worker/config.go b/cmd/tee-worker/config.go index 9e3230c4..a782b4d2 100644 --- a/cmd/tee-worker/config.go +++ b/cmd/tee-worker/config.go @@ -103,6 +103,15 @@ func readConfig() types.JobConfiguration { jc["twitter_api_keys"] = []string{} } + // Apify API key loading + apifyApiKey := os.Getenv("APIFY_API_KEY") + if apifyApiKey != "" { + logrus.Info("Apify API key found") + jc["apify_api_key"] = apifyApiKey + } else { + jc["apify_api_key"] = "" + } + tikTokLang := os.Getenv("TIKTOK_DEFAULT_LANGUAGE") if tikTokLang == "" { tikTokLang = "eng-US" diff --git a/go.mod b/go.mod index df165298..52861b0f 100644 --- a/go.mod +++ b/go.mod @@ -2,20 +2,20 @@ module github.com/masa-finance/tee-worker go 1.23.0 -toolchain go1.24.0 +toolchain go1.24.3 require ( github.com/cenkalti/backoff v2.2.1+incompatible - github.com/edgelesssys/ego v1.5.4 + github.com/edgelesssys/ego v1.7.2 github.com/gocolly/colly v1.2.0 github.com/google/uuid v1.6.0 - github.com/imperatrona/twitter-scraper v0.0.0-00010101000000-000000000000 + github.com/imperatrona/twitter-scraper v0.0.18 github.com/joho/godotenv v1.5.1 - github.com/labstack/echo-contrib v0.17.3 - github.com/labstack/echo/v4 v4.13.3 - github.com/masa-finance/tee-types v1.1.3 - github.com/onsi/ginkgo/v2 v2.23.3 - github.com/onsi/gomega v1.36.2 + github.com/labstack/echo-contrib v0.17.4 + github.com/labstack/echo/v4 v4.13.4 + github.com/masa-finance/tee-types v1.1.6 + github.com/onsi/ginkgo/v2 v2.23.4 + github.com/onsi/gomega v1.38.0 github.com/sirupsen/logrus v1.9.3 ) @@ -24,21 +24,22 @@ replace github.com/imperatrona/twitter-scraper => github.com/masa-finance/twitte require ( github.com/AlexEidt/Vidio v1.5.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect ) require ( - github.com/PuerkitoBio/goquery v1.9.0 // indirect - github.com/andybalholm/cascadia v1.3.2 // indirect - github.com/antchfx/htmlquery v1.3.3 // indirect - github.com/antchfx/xmlquery v1.4.2 // indirect - github.com/antchfx/xpath v1.3.2 // indirect - github.com/go-jose/go-jose/v4 v4.0.4 // indirect - github.com/go-logr/logr v1.4.2 // indirect + github.com/PuerkitoBio/goquery v1.10.3 // indirect + github.com/andybalholm/cascadia v1.3.3 // indirect + github.com/antchfx/htmlquery v1.3.4 // indirect + github.com/antchfx/xmlquery v1.4.4 // indirect + github.com/antchfx/xpath v1.3.4 
// indirect + github.com/go-jose/go-jose/v4 v4.1.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/gobwas/glob v0.2.3 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect - github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect + github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect github.com/kennygrant/sanitize v1.2.4 // indirect github.com/labstack/gommon v0.4.2 github.com/mattn/go-colorable v0.1.14 // indirect @@ -47,14 +48,14 @@ require ( github.com/temoto/robotstxt v1.1.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect - golang.org/x/crypto v0.36.0 // indirect - golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e - golang.org/x/net v0.37.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect - golang.org/x/time v0.11.0 // indirect - golang.org/x/tools v0.30.0 // indirect + golang.org/x/crypto v0.41.0 // indirect + golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + golang.org/x/time v0.12.0 // indirect + golang.org/x/tools v0.35.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/protobuf v1.36.5 // indirect + google.golang.org/protobuf v1.36.7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7ad58184..b381ea6e 100644 --- a/go.sum +++ b/go.sum @@ -1,71 +1,78 @@ github.com/AlexEidt/Vidio v1.5.1 h1:tovwvtgQagUz1vifiL9OeWkg1fP/XUzFazFKh7tFtaE= github.com/AlexEidt/Vidio v1.5.1/go.mod h1:djhIMnWMqPrC3X6nB6ymGX6uWWlgw+VayYGKE1bNwmI= -github.com/PuerkitoBio/goquery v1.9.0 h1:zgjKkdpRY9T97Q5DCtcXwfqkcylSFIVCocZmn2huTp8= -github.com/PuerkitoBio/goquery v1.9.0/go.mod h1:cW1n6TmIMDoORQU5IU/P1T3tGFunOeXEpGP2WHRwkbY= -github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss= -github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU= -github.com/antchfx/htmlquery v1.3.3 h1:x6tVzrRhVNfECDaVxnZi1mEGrQg3mjE/rxbH2Pe6dNE= -github.com/antchfx/htmlquery v1.3.3/go.mod h1:WeU3N7/rL6mb6dCwtE30dURBnBieKDC/fR8t6X+cKjU= -github.com/antchfx/xmlquery v1.4.2 h1:MZKd9+wblwxfQ1zd1AdrTsqVaMjMCwow3IqkCSe00KA= -github.com/antchfx/xmlquery v1.4.2/go.mod h1:QXhvf5ldTuGqhd1SHNvvtlhhdQLks4dD0awIVhXIDTA= -github.com/antchfx/xpath v1.3.2 h1:LNjzlsSjinu3bQpw9hWMY9ocB80oLOWuQqFvO6xt51U= -github.com/antchfx/xpath v1.3.2/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= +github.com/PuerkitoBio/goquery v1.10.3 h1:pFYcNSqHxBD06Fpj/KsbStFRsgRATgnf3LeXiUkhzPo= +github.com/PuerkitoBio/goquery v1.10.3/go.mod h1:tMUX0zDMHXYlAQk6p35XxQMqMweEKB7iK7iLNd4RH4Y= +github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= +github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= +github.com/antchfx/htmlquery v1.3.4 h1:Isd0srPkni2iNTWCwVj/72t7uCphFeor5Q8nCzj1jdQ= +github.com/antchfx/htmlquery v1.3.4/go.mod h1:K9os0BwIEmLAvTqaNSua8tXLWRWZpocZIH73OzWQbwM= +github.com/antchfx/xmlquery v1.4.4 h1:mxMEkdYP3pjKSftxss4nUHfjBhnMk4imGoR96FRY2dg= +github.com/antchfx/xmlquery 
v1.4.4/go.mod h1:AEPEEPYE9GnA2mj5Ur2L5Q5/2PycJ0N9Fusrx9b12fc= +github.com/antchfx/xpath v1.3.3/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= +github.com/antchfx/xpath v1.3.4 h1:1ixrW1VnXd4HurCj7qnqnR0jo14g8JMe20Fshg1Vgz4= +github.com/antchfx/xpath v1.3.4/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/edgelesssys/ego v1.5.4 h1:ADc6t5j77mOfwwu+akZX/I41YzHoseYiBcM5aME+Hb0= -github.com/edgelesssys/ego v1.5.4/go.mod h1:t10m29KSwG2hKwWFIq7/vuzfoKhPIdevOXx8nm636iU= -github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E= -github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/edgelesssys/ego v1.7.2 h1:m1rPkrQBlVycE7ofzbijaZlZFUIUVwhGIYKks5FdLxU= +github.com/edgelesssys/ego v1.7.2/go.mod h1:MkciSCrXddC6YYsmUTXeoQwFsbs17ncR3KKB+Ul3uRM= +github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI= +github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gocolly/colly v1.2.0 h1:qRz9YAn8FIH0qzgNUw+HT9UN7wm1oF9OBAilwEWpyrI= github.com/gocolly/colly v1.2.0/go.mod h1:Hof5T3ZswNVsOHYmba1u03W65HDWgpV5HifSuueE0EA= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ= +github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= -github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 h1:xhMrHhTJ6zxu3gA4enFM9MLn9AY7613teCdFnlUVbSQ= +github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kennygrant/sanitize v1.2.4 h1:gN25/otpP5vAsO2djbMhF/LQX6R7+O1TB4yv8NzpJ3o= github.com/kennygrant/sanitize v1.2.4/go.mod h1:LGsjYYtgxbetdg5owWB2mpgUL6e2nfw2eObZ0u0qvak= -github.com/labstack/echo-contrib v0.17.3 h1:hj+qXksKZG1scSe9ksUXMtv7fZYN+PtQT+bPcYA3/TY= -github.com/labstack/echo-contrib v0.17.3/go.mod h1:TcRBrzW8jcC4JD+5Dc/pvOyAps0rtgzj7oBqoR3nYsc= -github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaafY= -github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/labstack/echo-contrib v0.17.4 h1:g5mfsrJfJTKv+F5uNKCyrjLK7js+ZW6HTjg4FnDxxgk= +github.com/labstack/echo-contrib v0.17.4/go.mod h1:9O7ZPAHUeMGTOAfg80YqQduHzt0CzLak36PZRldYrZ0= +github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcXEA= +github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.2 h1:lm+a0wh4i9RNymBa190ZTuO0VRcGyhQWf96rU2M8840= -github.com/masa-finance/tee-types v1.1.2/go.mod h1:hF+wFRjmYuD0qkAZvH55BizPpiI7GiZCDqWkkclQ2sE= -github.com/masa-finance/tee-types v1.1.3 h1:GPUzcy3n+MoN8TcYN6McwMePazPXr9nB/qmnLTOW0iQ= -github.com/masa-finance/tee-types v1.1.3/go.mod h1:hF+wFRjmYuD0qkAZvH55BizPpiI7GiZCDqWkkclQ2sE= +github.com/masa-finance/tee-types v1.1.6 h1:vw5gOK2ZoCnsmrjdY9NCUR9GY9c0VxvzwQy5V4sNemo= +github.com/masa-finance/tee-types v1.1.6/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/onsi/ginkgo/v2 v2.23.3 
h1:edHxnszytJ4lD9D5Jjc4tiDkPBZ3siDeJJkUZJJVkp0= -github.com/onsi/ginkgo/v2 v2.23.3/go.mod h1:zXTP6xIp3U8aVuXN8ENK9IXRaTjFnpVB9mGmaSRvxnM= -github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= -github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= +github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus= +github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= +github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY= +github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA= github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -82,16 +89,19 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -101,15 +111,14 @@ golang.org/x/net 
v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= -golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= -golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -117,6 +126,7 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -125,24 +135,24 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod 
h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -153,28 +163,30 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= -golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= -golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= -golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= -golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= +golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0= +golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine 
v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index e1f425ee..4746d343 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -88,17 +88,13 @@ var _ = Describe("API", func() { Expect(err).NotTo(HaveOccurred()) Expect(jobResult.UUID).NotTo(BeEmpty()) - // Step 4: Wait for the job result + // Step 4: Wait for the job result - should fail due to invalid URL encryptedResult, err := jobResult.Get() - Expect(err).NotTo(HaveOccurred()) - Expect(encryptedResult).NotTo(BeEmpty()) + Expect(err).To(HaveOccurred()) + Expect(encryptedResult).To(BeEmpty()) - // Step 5: Decrypt the result - decryptedResult, err := clientInstance.Decrypt(jobSignature, encryptedResult) - Expect(err).NotTo(HaveOccurred()) - Expect(decryptedResult).NotTo(BeEmpty()) - Expect(decryptedResult).NotTo(ContainSubstring("google")) - Expect(decryptedResult).To(ContainSubstring(`"pages":null`)) + // The error should be about URL scheme validation + Expect(err.Error()).To(ContainSubstring("URL must include a scheme")) }) It("should submit a job and get the correct result", func() { diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index 1290c6c8..bbdbb378 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -34,9 +34,11 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) // Check what Twitter authentication methods are available accounts := jc.GetStringSlice("twitter_accounts", nil) apiKeys := jc.GetStringSlice("twitter_api_keys", nil) + apifyApiKey := jc.GetString("apify_api_key", "") hasAccounts := len(accounts) > 0 hasApiKeys := len(apiKeys) > 0 + hasApifyKey := apifyApiKey != "" // Add Twitter-specific capabilities based on available authentication if hasAccounts { @@ -56,8 +58,13 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) capabilities[teetypes.TwitterApiJob] = apiCaps } + // Add Apify-specific capabilities based on available API key + if hasApifyKey { + capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps + } + // Add general TwitterJob capability if any Twitter auth is available - if hasAccounts || hasApiKeys { + if hasAccounts || hasApiKeys || hasApifyKey { var twitterJobCaps 
[]teetypes.Capability // Use the most comprehensive capabilities available if hasAccounts { @@ -73,6 +80,11 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) } } + // Add Apify capabilities if available + if hasApifyKey { + twitterJobCaps = append(twitterJobCaps, teetypes.TwitterApifyCaps...) + } + capabilities[teetypes.TwitterJob] = twitterJobCaps } diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index 0bb616c8..eb26d9f5 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -30,16 +30,16 @@ func TestDetectCapabilities(t *testing.T) { jc: types.JobConfiguration{}, jobServer: &MockJobServer{ capabilities: teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapWebScraper}, + teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, - teetypes.TiktokJob: {teetypes.CapTiktokTranscription}, + teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterJob: {teetypes.CapSearchByQuery, teetypes.CapGetById, teetypes.CapGetProfileById}, }, }, expected: teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapWebScraper}, + teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, - teetypes.TiktokJob: {teetypes.CapTiktokTranscription}, + teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterJob: {teetypes.CapSearchByQuery, teetypes.CapGetById, teetypes.CapGetProfileById}, }, }, @@ -48,9 +48,9 @@ func TestDetectCapabilities(t *testing.T) { jc: types.JobConfiguration{}, jobServer: nil, expected: teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapWebScraper}, + teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, - teetypes.TiktokJob: {teetypes.CapTiktokTranscription}, + teetypes.TiktokJob: {teetypes.CapTranscription}, }, }, { @@ -60,9 +60,9 @@ func TestDetectCapabilities(t *testing.T) { }, jobServer: nil, expected: teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapWebScraper}, + teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, - teetypes.TiktokJob: {teetypes.CapTiktokTranscription}, + teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterCredentialJob: teetypes.TwitterCredentialCaps, teetypes.TwitterJob: teetypes.TwitterCredentialCaps, }, @@ -74,9 +74,9 @@ func TestDetectCapabilities(t *testing.T) { }, jobServer: nil, expected: teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapWebScraper}, + teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, - teetypes.TiktokJob: {teetypes.CapTiktokTranscription}, + teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterApiJob: teetypes.TwitterAPICaps, teetypes.TwitterJob: teetypes.TwitterAPICaps, }, @@ -88,9 +88,9 @@ func TestDetectCapabilities(t *testing.T) { }, jobServer: nil, expected: teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapWebScraper}, + teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, - teetypes.TiktokJob: {teetypes.CapTiktokTranscription}, + teetypes.TiktokJob: {teetypes.CapTranscription}, // Note: Mock elevated keys will be detected as basic since we can't make real API calls in tests teetypes.TwitterApiJob: teetypes.TwitterAPICaps, teetypes.TwitterJob: teetypes.TwitterAPICaps, diff --git a/internal/jobs/tiktok_transcription.go b/internal/jobs/tiktok_transcription.go index a7b1f6b9..d7a704d1 100644 --- 
a/internal/jobs/tiktok_transcription.go +++ b/internal/jobs/tiktok_transcription.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/masa-finance/tee-types/args" + teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs/stats" @@ -104,27 +104,38 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { return types.JobResult{Error: "TikTok transcription endpoint is not configured for the worker"}, fmt.Errorf("tiktok transcription endpoint not configured") } - args := &args.TikTokTranscriptionArguments{} - if err := j.Arguments.Unmarshal(args); err != nil { - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + // Use the centralized type-safe unmarshaller + jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) + if err != nil { return types.JobResult{Error: "Failed to unmarshal job arguments"}, fmt.Errorf("unmarshal job arguments: %w", err) } - if args.VideoURL == "" { + // Type assert to TikTok arguments + tiktokArgs, ok := teeargs.AsTikTokArguments(jobArgs) + if !ok { + return types.JobResult{Error: "invalid argument type for TikTok job"}, fmt.Errorf("invalid argument type") + } + + // Use interface methods; no need to downcast + logrus.WithField("job_uuid", j.UUID).Infof("TikTok arguments validated: video_url=%s, language=%s, has_language_preference=%t", + tiktokArgs.GetVideoURL(), tiktokArgs.GetLanguageCode(), tiktokArgs.HasLanguagePreference()) + + // VideoURL validation is now handled by the unmarshaller, but we check again for safety + if tiktokArgs.GetVideoURL() == "" { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) return types.JobResult{Error: "VideoURL is required"}, fmt.Errorf("videoURL is required") } - // Sanitize/Validate VideoURL further if necessary (e.g., ensure it's a TikTok URL) - // Placeholder for language selection logic - selectedLanguageKey := args.Language - if selectedLanguageKey == "" { - selectedLanguageKey = ttt.configuration.DefaultLanguage + // Use the enhanced language selection logic + selectedLanguageKey := tiktokArgs.GetLanguageCode() // This handles defaults automatically + if tiktokArgs.HasLanguagePreference() { + logrus.WithField("job_uuid", j.UUID).Infof("Using custom language preference: %s", selectedLanguageKey) + } else { + logrus.WithField("job_uuid", j.UUID).Infof("Using default language: %s", selectedLanguageKey) } - // If still empty, a hardcoded default like "eng-US" or first available will be used later // Sub-Step 3.1: Call TikTok Transcription API - apiRequestBody := map[string]string{"url": args.VideoURL} + apiRequestBody := map[string]string{"url": tiktokArgs.GetVideoURL()} jsonBody, err := json.Marshal(apiRequestBody) if err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) @@ -149,7 +160,7 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { logrus.WithFields(logrus.Fields{ "job_uuid": j.UUID, - "url": args.VideoURL, + "url": tiktokArgs.GetVideoURL(), "method": "POST", "api_endpoint": ttt.configuration.TranscriptionEndpoint, }).Info("Calling TikTok Transcription API") @@ -246,7 +257,7 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { TranscriptionText: plainTextTranscription, DetectedLanguage: finalDetectedLanguage, VideoTitle: parsedAPIResponse.VideoTitle, - OriginalURL: args.VideoURL, + OriginalURL: 
tiktokArgs.GetVideoURL(), ThumbnailURL: parsedAPIResponse.ThumbnailURL, } diff --git a/internal/jobs/tiktok_transcription_test.go b/internal/jobs/tiktok_transcription_test.go index 33ab83f8..0b3b9aad 100644 --- a/internal/jobs/tiktok_transcription_test.go +++ b/internal/jobs/tiktok_transcription_test.go @@ -111,7 +111,7 @@ var _ = Describe("TikTokTranscriber", func() { }) Context("when arguments are invalid", func() { - It("should return an error if VideoURL is empty and record error stats", func() { + It("should return an error if VideoURL is empty and not record error stats", func() { jobArguments := map[string]interface{}{ "video_url": "", // Empty URL } @@ -129,7 +129,7 @@ var _ = Describe("TikTokTranscriber", func() { By("Checking for job execution errors") Expect(err).To(HaveOccurred(), "An error should occur for empty VideoURL") Expect(res.Error).NotTo(BeEmpty(), "JobResult.Error should detail the validation failure") - Expect(res.Error).To(ContainSubstring("VideoURL is required")) + Expect(res.Error).To(ContainSubstring("Failed to unmarshal job arguments")) Expect(res.Data).To(BeNil()) By("Verifying error statistics") @@ -142,7 +142,7 @@ var _ = Describe("TikTokTranscriber", func() { return 0 } return workerStatsMap[stats.TikTokTranscriptionErrors] - }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 1), "TikTokTranscriptionErrors count should be 1") + }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 0), "TikTokTranscriptionErrors count should be 0") Eventually(func() uint { if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { diff --git a/internal/jobs/twitter.go b/internal/jobs/twitter.go index b5e37e39..a84f235b 100644 --- a/internal/jobs/twitter.go +++ b/internal/jobs/twitter.go @@ -10,6 +10,7 @@ import ( "time" "github.com/masa-finance/tee-types/args" + teeargs "github.com/masa-finance/tee-types/args" teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/internal/jobs/twitterx" @@ -19,6 +20,7 @@ import ( "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/masa-finance/tee-worker/internal/jobs/twitter" + "github.com/masa-finance/tee-worker/internal/jobs/twitterapify" "github.com/sirupsen/logrus" ) @@ -103,60 +105,60 @@ func parseApiKeys(apiKeys []string) []*twitter.TwitterApiKey { }) } -func (ts *TwitterScraper) getAuthenticatedScraper(j types.Job, baseDir string, jobType teetypes.JobType) (*twitter.Scraper, *twitter.TwitterAccount, *twitter.TwitterApiKey, error) { +// getCredentialScraper returns a credential-based scraper and account +func (ts *TwitterScraper) getCredentialScraper(j types.Job, baseDir string) (*twitter.Scraper, *twitter.TwitterAccount, error) { if baseDir == "" { baseDir = ts.configuration.DataDir } - var account *twitter.TwitterAccount - var apiKey *twitter.TwitterApiKey - var scraper *twitter.Scraper + account := ts.accountManager.GetNextAccount() + if account == nil { + ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) + return nil, nil, fmt.Errorf("no Twitter credentials available") + } - switch jobType { - case teetypes.TwitterCredentialJob: - account = ts.accountManager.GetNextAccount() - if account == nil { - ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) - return nil, nil, nil, fmt.Errorf("no Twitter credentials available for credential-based scraping") - } - case teetypes.TwitterApiJob: - apiKey = ts.accountManager.GetNextApiKey() - if apiKey == nil { - 
ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) - return nil, nil, nil, fmt.Errorf("no Twitter API keys available for API-based scraping") - } - case teetypes.TwitterJob: - logrus.Debug("Using standard Twitter scraper - prefer credentials if available") - account = ts.accountManager.GetNextAccount() - if account == nil { - apiKey = ts.accountManager.GetNextApiKey() - if apiKey == nil { - ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) - return nil, nil, nil, fmt.Errorf("no Twitter accounts or API keys available") - } - } - default: - return nil, nil, nil, fmt.Errorf("unsupported job type: %s", jobType) + authConfig := twitter.AuthConfig{ + Account: account, + BaseDir: baseDir, + SkipLoginVerification: ts.configuration.SkipLoginVerification, + } + scraper := twitter.NewScraper(authConfig) + if scraper == nil { + ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) + logrus.Errorf("Authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("twitter authentication failed for %s", account.Username) } - if account != nil { - authConfig := twitter.AuthConfig{ - Account: account, - BaseDir: baseDir, - SkipLoginVerification: ts.configuration.SkipLoginVerification, - } - scraper = twitter.NewScraper(authConfig) - if scraper == nil { - ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) - logrus.Errorf("Authentication failed for %s", account.Username) - return nil, account, nil, fmt.Errorf("twitter authentication failed for %s", account.Username) - } - } else if apiKey != nil { - logrus.Info("Using API key only for this request") - } else { - return nil, nil, nil, fmt.Errorf("no authentication method available after selection logic") + return scraper, account, nil +} + +// getApiScraper returns a TwitterX API scraper and API key +func (ts *TwitterScraper) getApiScraper(j types.Job) (*twitterx.TwitterXScraper, *twitter.TwitterApiKey, error) { + apiKey := ts.accountManager.GetNextApiKey() + if apiKey == nil { + ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) + return nil, nil, fmt.Errorf("no Twitter API keys available") + } + + apiClient := client.NewTwitterXClient(apiKey.Key) + twitterXScraper := twitterx.NewTwitterXScraper(apiClient) + + return twitterXScraper, apiKey, nil +} + +// getApifyScraper returns an Apify client +func (ts *TwitterScraper) getApifyScraper(j types.Job) (*twitterapify.TwitterApifyClient, error) { + if ts.configuration.ApifyApiKey == "" { + ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) + return nil, fmt.Errorf("no Apify API key available") + } + + apifyScraper, err := twitterapify.NewTwitterApifyClient(ts.configuration.ApifyApiKey) + if err != nil { + ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) + return nil, fmt.Errorf("failed to create apify scraper: %w", err) } - return scraper, account, apiKey, nil + return apifyScraper, nil } func (ts *TwitterScraper) handleError(j types.Job, err error, account *twitter.TwitterAccount) bool { @@ -185,13 +187,10 @@ func filterMap[T any, R any](slice []T, f func(T) (R, bool)) []R { } func (ts *TwitterScraper) ScrapeFollowersForProfile(j types.Job, baseDir string, username string, count int) ([]*twitterscraper.Profile, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for ScrapeFollowersForProfile") - } 
ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") @@ -210,15 +209,11 @@ func (ts *TwitterScraper) ScrapeFollowersForProfile(j types.Job, baseDir string, func (ts *TwitterScraper) ScrapeTweetsProfile(j types.Job, baseDir string, username string) (twitterscraper.Profile, error) { logrus.Infof("[ScrapeTweetsProfile] Starting profile scraping for username: %s", username) - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { - logrus.Errorf("[ScrapeTweetsProfile] Failed to get authenticated scraper: %v", err) + logrus.Errorf("[ScrapeTweetsProfile] Failed to get credential scraper: %v", err) return twitterscraper.Profile{}, err } - if scraper == nil { - logrus.Errorf("[ScrapeTweetsProfile] Scraper is nil after authentication") - return twitterscraper.Profile{}, fmt.Errorf("scraper not initialized for ScrapeTweetsProfile") - } logrus.Infof("[ScrapeTweetsProfile] About to increment TwitterScrapes stat for WorkerID: %s", j.WorkerID) ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) @@ -248,39 +243,35 @@ func (ts *TwitterScraper) ScrapeTweetsByRecentSearchQuery(j types.Job, baseDir s } func (ts *TwitterScraper) queryTweets(j types.Job, baseQueryEndpoint string, baseDir string, query string, count int) ([]*teetypes.TweetResult, error) { - scraper, account, apiKey, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) - if err != nil { - return nil, err + // Try credentials first, fallback to API for CapSearchByQuery + scraper, account, err := ts.getCredentialScraper(j, baseDir) + if err == nil { + return ts.scrapeTweetsWithCredentials(j, query, count, scraper, account) } - if account != nil && scraper != nil { - return ts.scrapeTweetsWithCredentials(j, query, count, scraper, account) - } else if apiKey != nil { - return ts.scrapeTweetsWithApiKey(j, baseQueryEndpoint, query, count, apiKey) + // Fallback to API + twitterXScraper, apiKey, apiErr := ts.getApiScraper(j) + if apiErr != nil { + ts.statsCollector.Add(j.WorkerID, stats.TwitterAuthErrors, 1) + return nil, fmt.Errorf("no Twitter accounts or API keys available") } - return nil, fmt.Errorf("no valid authentication method (credentials or API key) found by getAuthenticatedScraper for queryTweets") + return ts.scrapeTweets(j, baseQueryEndpoint, query, count, twitterXScraper, apiKey) } func (ts *TwitterScraper) queryTweetsWithCredentials(j types.Job, baseDir string, query string, count int) ([]*teetypes.TweetResult, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterCredentialJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for queryTweetsWithCredentials") - } return ts.scrapeTweetsWithCredentials(j, query, count, scraper, account) } func (ts *TwitterScraper) queryTweetsWithApiKey(j types.Job, baseQueryEndpoint string, baseDir string, query string, count int) ([]*teetypes.TweetResult, error) { - _, _, apiKey, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterApiJob) + twitterXScraper, apiKey, err := ts.getApiScraper(j) if err != nil { return nil, err } - if apiKey == nil { - return nil, fmt.Errorf("API key not available for queryTweetsWithApiKey") - } - return ts.scrapeTweetsWithApiKey(j, baseQueryEndpoint, query, count, apiKey) + return 
ts.scrapeTweets(j, baseQueryEndpoint, query, count, twitterXScraper, apiKey) } func (ts *TwitterScraper) scrapeTweetsWithCredentials(j types.Job, query string, count int, scraper *twitter.Scraper, account *twitter.TwitterAccount) ([]*teetypes.TweetResult, error) { @@ -305,15 +296,14 @@ func (ts *TwitterScraper) scrapeTweetsWithCredentials(j types.Job, query string, return tweets, nil } -func (ts *TwitterScraper) scrapeTweetsWithApiKey(j types.Job, baseQueryEndpoint string, query string, count int, apiKey *twitter.TwitterApiKey) ([]*teetypes.TweetResult, error) { +// scrapeTweets uses an existing scraper instance +func (ts *TwitterScraper) scrapeTweets(j types.Job, baseQueryEndpoint string, query string, count int, twitterXScraper *twitterx.TwitterXScraper, apiKey *twitter.TwitterApiKey) ([]*teetypes.TweetResult, error) { ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) if baseQueryEndpoint == twitterx.TweetsAll && apiKey.Type == twitter.TwitterApiKeyTypeBase { return nil, fmt.Errorf("this API key is a base/Basic key and does not have access to full archive search. Please use an elevated/Pro API key") } - apiClient := client.NewTwitterXClient(apiKey.Key) - twitterXScraper := twitterx.NewTwitterXScraper(apiClient) tweets := make([]*teetypes.TweetResult, 0, count) cursor := "" @@ -404,16 +394,19 @@ EndLoop: return tweets, nil } +func (ts *TwitterScraper) scrapeTweetsWithApiKey(j types.Job, baseQueryEndpoint string, query string, count int, apiKey *twitter.TwitterApiKey) ([]*teetypes.TweetResult, error) { + apiClient := client.NewTwitterXClient(apiKey.Key) + twitterXScraper := twitterx.NewTwitterXScraper(apiClient) + return ts.scrapeTweets(j, baseQueryEndpoint, query, count, twitterXScraper, apiKey) +} + func (ts *TwitterScraper) ScrapeTweetByID(j types.Job, baseDir string, tweetID string) (*teetypes.TweetResult, error) { ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for ScrapeTweetByID") - } tweet, err := scraper.GetTweet(tweetID) if err != nil { @@ -430,13 +423,10 @@ func (ts *TwitterScraper) ScrapeTweetByID(j types.Job, baseDir string, tweetID s } func (ts *TwitterScraper) GetTweet(j types.Job, baseDir, tweetID string) (*teetypes.TweetResult, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for GetTweet") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) scrapedTweet, err := scraper.GetTweet(tweetID) @@ -453,13 +443,10 @@ func (ts *TwitterScraper) GetTweet(j types.Job, baseDir, tweetID string) (*teety } func (ts *TwitterScraper) GetTweetReplies(j types.Job, baseDir, tweetID string, cursor string) ([]*teetypes.TweetResult, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for GetTweetReplies") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var replies []*teetypes.TweetResult @@ -488,13 +475,10 @@ func (ts *TwitterScraper) GetTweetReplies(j types.Job, 
baseDir, tweetID string, } func (ts *TwitterScraper) GetTweetRetweeters(j types.Job, baseDir, tweetID string, count int, cursor string) ([]*twitterscraper.Profile, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for GetTweetRetweeters") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) retweeters, _, err := scraper.GetTweetRetweeters(tweetID, count, cursor) @@ -508,13 +492,10 @@ func (ts *TwitterScraper) GetTweetRetweeters(j types.Job, baseDir, tweetID strin } func (ts *TwitterScraper) GetUserTweets(j types.Job, baseDir, username string, count int, cursor string) ([]*teetypes.TweetResult, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, "", err } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for GetUserTweets") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var tweets []*teetypes.TweetResult @@ -551,13 +532,10 @@ func (ts *TwitterScraper) GetUserTweets(j types.Job, baseDir, username string, c } func (ts *TwitterScraper) GetUserMedia(j types.Job, baseDir, username string, count int, cursor string) ([]*teetypes.TweetResult, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, "", err } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for GetUserMedia") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var media []*teetypes.TweetResult @@ -615,13 +593,10 @@ func (ts *TwitterScraper) GetUserMedia(j types.Job, baseDir, username string, co } func (ts *TwitterScraper) GetHomeTweets(j types.Job, baseDir string, count int, cursor string) ([]*teetypes.TweetResult, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, "", err } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for GetHomeTweets") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var tweets []*teetypes.TweetResult @@ -661,13 +636,10 @@ func (ts *TwitterScraper) GetHomeTweets(j types.Job, baseDir string, count int, } func (ts *TwitterScraper) GetForYouTweets(j types.Job, baseDir string, count int, cursor string) ([]*teetypes.TweetResult, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, "", err } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for GetForYouTweets") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var tweets []*teetypes.TweetResult @@ -707,13 +679,10 @@ func (ts *TwitterScraper) GetForYouTweets(j types.Job, baseDir string, count int } func (ts *TwitterScraper) GetBookmarks(j types.Job, baseDir string, count int, cursor string) ([]*teetypes.TweetResult, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, "", 
err } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for GetBookmarks") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var bookmarks []*teetypes.TweetResult @@ -755,13 +724,10 @@ func (ts *TwitterScraper) GetBookmarks(j types.Job, baseDir string, count int, c } func (ts *TwitterScraper) GetProfileByID(j types.Job, baseDir, userID string) (*twitterscraper.Profile, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for GetProfileByID") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) profile, err := scraper.GetProfileByID(userID) @@ -845,13 +811,10 @@ func (ts *TwitterScraper) GetTweetByIDWithApiKey(j types.Job, tweetID string, ap } func (ts *TwitterScraper) SearchProfile(j types.Job, query string, count int) ([]*twitterscraper.ProfileResult, error) { - scraper, _, _, err := ts.getAuthenticatedScraper(j, ts.configuration.DataDir, teetypes.TwitterJob) + scraper, _, err := ts.getCredentialScraper(j, ts.configuration.DataDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for SearchProfile") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) var profiles []*twitterscraper.ProfileResult @@ -869,13 +832,10 @@ func (ts *TwitterScraper) SearchProfile(j types.Job, query string, count int) ([ } func (ts *TwitterScraper) GetTrends(j types.Job, baseDir string) ([]string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for GetTrends") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) trends, err := scraper.GetTrends() @@ -888,13 +848,10 @@ func (ts *TwitterScraper) GetTrends(j types.Job, baseDir string) ([]string, erro } func (ts *TwitterScraper) GetFollowers(j types.Job, baseDir, user string, count int, cursor string) ([]*twitterscraper.Profile, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, "", err } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for GetFollowers") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) followers, nextCursor, fetchErr := scraper.FetchFollowers(user, count, cursor) @@ -907,13 +864,10 @@ func (ts *TwitterScraper) GetFollowers(j types.Job, baseDir, user string, count } func (ts *TwitterScraper) GetFollowing(j types.Job, baseDir, username string, count int) ([]*twitterscraper.Profile, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) + scraper, account, err := ts.getCredentialScraper(j, baseDir) if err != nil { return nil, err } - if scraper == nil { - return nil, fmt.Errorf("scraper not initialized for GetFollowing") - } ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) following, _, fetchErr := scraper.FetchFollowing(username, count, "") @@ -925,14 +879,47 @@ func (ts *TwitterScraper) GetFollowing(j types.Job, baseDir, username string, co return following, nil } +// getFollowersApify retrieves followers using Apify +func (ts *TwitterScraper) getFollowersApify(j 
types.Job, username string, maxResults int, cursor string) ([]*teetypes.ProfileResultApify, string, error) {
+	apifyScraper, err := ts.getApifyScraper(j)
+	if err != nil {
+		return nil, "", err
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1)
+
+	followers, nextCursor, err := apifyScraper.GetFollowers(username, maxResults, cursor)
+	if err != nil {
+		return nil, "", err
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterProfiles, uint(len(followers)))
+	return followers, nextCursor, nil
+}
+
+// getFollowingApify retrieves the accounts a user follows using Apify
+func (ts *TwitterScraper) getFollowingApify(j types.Job, username string, maxResults int, cursor string) ([]*teetypes.ProfileResultApify, string, error) {
+	apifyScraper, err := ts.getApifyScraper(j)
+	if err != nil {
+		return nil, "", err
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1)
+
+	following, nextCursor, err := apifyScraper.GetFollowing(username, maxResults, cursor)
+	if err != nil {
+		return nil, "", err
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterProfiles, uint(len(following)))
+	return following, nextCursor, nil
+}
+
 func (ts *TwitterScraper) GetSpace(j types.Job, baseDir, spaceID string) (*twitterscraper.Space, error) {
-	scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob)
+	scraper, account, err := ts.getCredentialScraper(j, baseDir)
 	if err != nil {
 		return nil, err
 	}
-	if scraper == nil {
-		return nil, fmt.Errorf("scraper not initialized for GetSpace")
-	}

 	ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1)
 	space, err := scraper.GetSpace(spaceID)
@@ -944,35 +931,75 @@ func (ts *TwitterScraper) GetSpace(j types.Job, baseDir, spaceID string) (*twitt
 	return space, nil
 }

-type TwitterScraper struct {
-	configuration struct {
-		Accounts              []string `json:"twitter_accounts"`
-		ApiKeys               []string `json:"twitter_api_keys"`
-		DataDir               string   `json:"data_dir"`
-		SkipLoginVerification bool     `json:"skip_login_verification,omitempty"`
+func (ts *TwitterScraper) FetchHomeTweets(j types.Job, baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) {
+	scraper, account, err := ts.getCredentialScraper(j, baseDir)
+	if err != nil {
+		return nil, "", err
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1)
+	tweets, nextCursor, fetchErr := scraper.FetchHomeTweets(count, cursor)
+	if fetchErr != nil {
+		_ = ts.handleError(j, fetchErr, account)
+		return nil, "", fetchErr
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterTweets, uint(len(tweets)))
+	return tweets, nextCursor, nil
+}
+
+func (ts *TwitterScraper) FetchForYouTweets(j types.Job, baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) {
+	scraper, account, err := ts.getCredentialScraper(j, baseDir)
+	if err != nil {
+		return nil, "", err
+	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1)
+	tweets, nextCursor, fetchErr := scraper.FetchForYouTweets(count, cursor)
+	if fetchErr != nil {
+		_ = ts.handleError(j, fetchErr, account)
+		return nil, "", fetchErr
 	}
+
+	ts.statsCollector.Add(j.WorkerID, stats.TwitterTweets, uint(len(tweets)))
+	return tweets, nextCursor, nil
+}
+
+// TwitterScraperConfig is now defined in api/types to avoid duplication and circular
+// imports; the unified types.TwitterScraperConfig is used directly as the runtime
+// configuration.
+
+type TwitterScraper struct {
+	configuration  types.TwitterScraperConfig
 	accountManager *twitter.TwitterAccountManager
 	statsCollector *stats.StatsCollector
 	capabilities   map[teetypes.Capability]bool
 }

 func NewTwitterScraper(jc types.JobConfiguration, c *stats.StatsCollector) *TwitterScraper {
-	config := struct {
-		Accounts              []string `json:"twitter_accounts"`
-		ApiKeys               []string `json:"twitter_api_keys"`
-		DataDir               string   `json:"data_dir"`
-		SkipLoginVerification bool     `json:"skip_login_verification,omitempty"`
-	}{}
-	if err := jc.Unmarshal(&config); err != nil {
-		logrus.Errorf("Error unmarshalling Twitter scraper configuration: %v", err)
-		return nil
-	}
+	// Use direct config access instead of JSON marshaling/unmarshaling
+	config := jc.GetTwitterConfig()

 	accounts := parseAccounts(config.Accounts)
 	apiKeys := parseApiKeys(config.ApiKeys)
 	accountManager := twitter.NewTwitterAccountManager(accounts, apiKeys)
 	accountManager.DetectAllApiKeyTypes()

+	// Validate the Apify API key at startup if provided (similar to API key detection)
+	if config.ApifyApiKey != "" {
+		apifyScraper, err := twitterapify.NewTwitterApifyClient(config.ApifyApiKey)
+		if err != nil {
+			logrus.Errorf("Failed to create Apify scraper at startup: %v", err)
+			// Don't fail startup, just log the error - the key might start working later, or the failure may be transient
+		} else if err := apifyScraper.ValidateApiKey(); err != nil {
+			logrus.Errorf("Apify API key validation failed at startup: %v", err)
+			// Don't fail startup, just log the error - the key might start working later, or the failure may be transient
+		} else {
+			logrus.Infof("Apify API key validated successfully at startup")
+		}
+	}
+
 	if os.Getenv("TWITTER_SKIP_LOGIN_VERIFICATION") == "true" {
 		config.SkipLoginVerification = true
 	}
@@ -1037,6 +1064,11 @@ func (ts *TwitterScraper) GetStructuredCapabilities() teetypes.WorkerCapabilitie
 		capabilities[teetypes.TwitterApiJob] = apiCaps
 	}

+	// Add Apify-specific capabilities based on available API key
+	if ts.configuration.ApifyApiKey != "" {
+		capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps
+	}
+
 	// Add general twitter scraper capability (uses best available method)
 	if len(ts.configuration.Accounts) > 0 || len(ts.configuration.ApiKeys) > 0 {
 		var generalCaps []teetypes.Capability
@@ -1078,6 +1110,8 @@ func getScrapeStrategy(jobType teetypes.JobType) TwitterScrapeStrategy {
 		return &CredentialScrapeStrategy{}
 	case teetypes.TwitterApiJob:
 		return &ApiKeyScrapeStrategy{}
+	case teetypes.TwitterApifyJob:
+		return &ApifyScrapeStrategy{}
 	default:
 		return &DefaultScrapeStrategy{}
 	}
@@ -1086,11 +1120,12 @@ type CredentialScrapeStrategy struct{}

 func (s *CredentialScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) {
-	switch strings.ToLower(jobArgs.QueryType) {
-	case "searchbyquery":
+	capability := jobArgs.GetCapability()
+	switch capability {
+	case teetypes.CapSearchByQuery:
 		tweets, err := ts.queryTweetsWithCredentials(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(tweets, "", err)
-	case "searchbyfullarchive":
+	case teetypes.CapSearchByFullArchive:
 		logrus.Warn("Full archive search with credential-only implementation may have limited results")
 		tweets, err := ts.queryTweetsWithCredentials(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(tweets, "", err)
@@ -1102,31 +1137,26 @@ type ApiKeyScrapeStrategy struct{}
 func (s *ApiKeyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) {
-	switch strings.ToLower(jobArgs.QueryType) {
-	case "searchbyquery":
+	capability := jobArgs.GetCapability()
+	switch capability {
+	case teetypes.CapSearchByQuery:
 		tweets, err := ts.queryTweetsWithApiKey(j, twitterx.TweetsSearchRecent, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(tweets, "", err)
-	case "searchbyfullarchive":
+	case teetypes.CapSearchByFullArchive:
 		tweets, err := ts.queryTweetsWithApiKey(j, twitterx.TweetsAll, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(tweets, "", err)
-	case "getprofilebyid":
-		_, _, apiKey, err := ts.getAuthenticatedScraper(j, ts.configuration.DataDir, teetypes.TwitterApiJob)
+	case teetypes.CapGetProfileById:
+		_, apiKey, err := ts.getApiScraper(j)
 		if err != nil {
 			return types.JobResult{Error: err.Error()}, err
 		}
-		if apiKey == nil {
-			return types.JobResult{Error: "no API key available"}, fmt.Errorf("no API key available")
-		}
 		profile, err := ts.GetProfileByIDWithApiKey(j, jobArgs.Query, apiKey)
 		return processResponse(profile, "", err)
-	case "getbyid":
-		_, _, apiKey, err := ts.getAuthenticatedScraper(j, ts.configuration.DataDir, teetypes.TwitterApiJob)
+	case teetypes.CapGetById:
+		_, apiKey, err := ts.getApiScraper(j)
 		if err != nil {
 			return types.JobResult{Error: err.Error()}, err
 		}
-		if apiKey == nil {
-			return types.JobResult{Error: "no API key available"}, fmt.Errorf("no API key available")
-		}
 		tweet, err := ts.GetTweetByIDWithApiKey(j, jobArgs.Query, apiKey)
 		return processResponse(tweet, "", err)
 	default:
@@ -1134,14 +1164,48 @@ func (s *ApiKeyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs
 	}
 }

+type ApifyScrapeStrategy struct{}
+
+func (s *ApifyScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) {
+	capability := teetypes.Capability(jobArgs.QueryType)
+	switch capability {
+	case teetypes.CapGetFollowers:
+		followers, nextCursor, err := ts.getFollowersApify(j, jobArgs.Query, jobArgs.MaxResults, jobArgs.NextCursor)
+		return processResponse(followers, nextCursor, err)
+	case teetypes.CapGetFollowing:
+		following, nextCursor, err := ts.getFollowingApify(j, jobArgs.Query, jobArgs.MaxResults, jobArgs.NextCursor)
+		return processResponse(following, nextCursor, err)
+	default:
+		return types.JobResult{Error: fmt.Sprintf("unsupported capability %s for Apify job", capability)}, fmt.Errorf("unsupported capability %s for Apify job", capability)
+	}
+}
+
 type DefaultScrapeStrategy struct{}

+// DefaultScrapeStrategy dispatches on the QueryType that the centralized unmarshaller has already validated
 func (s *DefaultScrapeStrategy) Execute(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) {
-	switch strings.ToLower(jobArgs.QueryType) {
-	case "searchbyquery":
+	capability := teetypes.Capability(jobArgs.QueryType)
+	switch capability {
+	case teetypes.CapGetFollowers, teetypes.CapGetFollowing:
+		// Priority: Apify > Credentials for general TwitterJob
+		if ts.configuration.ApifyApiKey != "" {
+			// Use Apify strategy
+			apifyStrategy := &ApifyScrapeStrategy{}
+			return apifyStrategy.Execute(j, ts, jobArgs)
+		}
+		// Fall back to credential-based strategy
+		credentialStrategy := &CredentialScrapeStrategy{}
+		return credentialStrategy.Execute(j, ts, jobArgs)
+	case teetypes.CapSearchByQuery:
+		// Priority: Credentials > API for searchbyquery
+		if len(ts.configuration.Accounts) > 0 {
+			credentialStrategy := &CredentialScrapeStrategy{}
+			return credentialStrategy.Execute(j, ts, jobArgs)
+		}
+		// Fall back to API strategy
 		tweets, err := ts.queryTweets(j, twitterx.TweetsSearchRecent, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(tweets, "", err)
-	case "searchbyfullarchive":
+	case teetypes.CapSearchByFullArchive:
 		tweets, err := ts.queryTweets(j, twitterx.TweetsAll, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(tweets, "", err)
 	default:
@@ -1227,50 +1291,46 @@ func processResponse(response any, nextCursor string, err error) (types.JobResul
 	}
 }

 func defaultStrategyFallback(j types.Job, ts *TwitterScraper, jobArgs *args.TwitterSearchArguments) (types.JobResult, error) {
-	switch strings.ToLower(jobArgs.QueryType) {
-	case "searchbyprofile":
+	capability := jobArgs.GetCapability()
+	switch capability {
+	case teetypes.CapSearchByProfile:
 		profile, err := ts.ScrapeTweetsProfile(j, ts.configuration.DataDir, jobArgs.Query)
 		return processResponse(profile, "", err)
-	case "searchfollowers": // This is for ScrapeFollowersForProfile, which is not paginated by cursor in this context
-		followers, err := ts.ScrapeFollowersForProfile(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
-		return processResponse(followers, "", err)
-	case "getbyid":
-		tweet, err := ts.ScrapeTweetByID(j, ts.configuration.DataDir, jobArgs.Query)
+	case teetypes.CapGetById:
+		tweet, err := ts.GetTweet(j, ts.configuration.DataDir, jobArgs.Query)
 		return processResponse(tweet, "", err)
-	case "getreplies":
+	case teetypes.CapGetReplies:
 		// GetTweetReplies takes a cursor for a specific part of a thread, not general pagination of all replies.
 		// The retryWithCursor logic might not directly apply unless GetTweetReplies is adapted for broader pagination.
 		replies, err := ts.GetTweetReplies(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.NextCursor)
 		return processResponse(replies, jobArgs.NextCursor, err) // Pass the original NextCursor through, as it is specific to this page
-	case "getretweeters":
+	case teetypes.CapGetRetweeters:
 		// Similar to GetTweetReplies, the cursor addresses a specific page.
 		retweeters, err := ts.GetTweetRetweeters(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults, jobArgs.NextCursor)
 		// GetTweetRetweeters in twitterscraper returns (profiles, nextCursorStr, error), but the current
 		// ts.GetTweetRetweeters wrapper does not surface the next cursor. Update it if pagination is needed here;
 		// for now it is assumed to fetch one batch, or to paginate internally up to MaxResults.
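 		// A possible follow-up, sketched here with a hypothetical extended signature,
 		// would thread the library's cursor through to the caller:
 		//   retweeters, nextCursor, err := ts.GetTweetRetweeters(j, dir, query, max, cursor)
 		//   return processResponse(retweeters, nextCursor, err)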
 		return processResponse(retweeters, "", err) // Assuming no next cursor from this specific call structure
-	case "gettweets":
+	case teetypes.CapGetTweets:
 		return retryWithCursorAndQuery(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults, jobArgs.NextCursor, ts.GetUserTweets)
-	case "getmedia":
+	case teetypes.CapGetMedia:
 		return retryWithCursorAndQuery(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults, jobArgs.NextCursor, ts.GetUserMedia)
-	case "gethometweets":
+	case teetypes.CapGetHomeTweets:
 		return retryWithCursor(j, ts.configuration.DataDir, jobArgs.MaxResults, jobArgs.NextCursor, ts.GetHomeTweets)
-	case "getforyoutweets":
+	case teetypes.CapGetForYouTweets:
 		return retryWithCursor(j, ts.configuration.DataDir, jobArgs.MaxResults, jobArgs.NextCursor, ts.GetForYouTweets)
-	case "getbookmarks":
-		return retryWithCursor(j, ts.configuration.DataDir, jobArgs.MaxResults, jobArgs.NextCursor, ts.GetBookmarks)
-	case "getprofilebyid":
+	case teetypes.CapGetProfileById:
 		profile, err := ts.GetProfileByID(j, ts.configuration.DataDir, jobArgs.Query)
 		return processResponse(profile, "", err)
-	case "gettrends":
+	case teetypes.CapGetTrends:
 		trends, err := ts.GetTrends(j, ts.configuration.DataDir)
 		return processResponse(trends, "", err)
-	case "getfollowing":
+	case teetypes.CapGetFollowing:
 		following, err := ts.GetFollowing(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults)
 		return processResponse(following, "", err)
-	case "getfollowers":
+	case teetypes.CapGetFollowers:
 		return retryWithCursorAndQuery(j, ts.configuration.DataDir, jobArgs.Query, jobArgs.MaxResults, jobArgs.NextCursor, ts.GetFollowers)
-	case "getspace":
+	case teetypes.CapGetSpace:
 		space, err := ts.GetSpace(j, ts.configuration.DataDir, jobArgs.Query)
 		return processResponse(space, "", err)
 	}
@@ -1278,21 +1338,36 @@ func defaultStrategyFallback(j types.Job, ts *TwitterScraper, jobArgs *args.Twit

 // ExecuteJob runs a job using the appropriate scrape strategy based on the job type.
-// It first unmarshals the job arguments into a TwitterSearchArguments struct.
-// Then it runs the appropriate scrape strategy's Execute method, passing in the job, TwitterScraper, and job arguments.
-// If the result is empty, it returns an error.
-// If the result is not empty, it unmarshals the result into a slice of TweetResult and returns the result.
-// If the unmarshaling fails, it returns an error.
-// If the unmarshaled result is empty, it returns an error.
+// It first unmarshals the job arguments using the centralized type-safe unmarshaller,
+// then runs the matching strategy's Execute method with the job, scraper, and arguments.
+// The returned data is validated by unmarshalling it into the result type implied by the
+// requested operation (tweets, profiles, spaces, or trends); an empty or malformed result
+// yields an error.
 func (ts *TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) {
-	jobArgs := &args.TwitterSearchArguments{}
-	if err := j.Arguments.Unmarshal(jobArgs); err != nil {
+	// Unmarshal the job arguments with the centralized type-safe unmarshaller from tee-types.
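+	// As exercised by the error-handling tests in this change, the unmarshaller is
+	// expected to reject capabilities that are invalid for the job type, reject
+	// negative count/max_results values, fail fast on unknown job types, and fall
+	// back to the job type's default capability (searchbyquery) when none is given.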
+ jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) + if err != nil { logrus.Errorf("Error while unmarshalling job arguments for job ID %s, type %s: %v", j.UUID, j.Type, err) return types.JobResult{Error: "error unmarshalling job arguments"}, err } + // Type assert to Twitter arguments + twitterArgs, ok := teeargs.AsTwitterArguments(jobArgs) + if !ok { + logrus.Errorf("Expected Twitter arguments for job ID %s, type %s", j.UUID, j.Type) + return types.JobResult{Error: "invalid argument type for Twitter job"}, fmt.Errorf("invalid argument type") + } + + // Log the capability for debugging + logrus.Debugf("Executing Twitter job ID %s with capability: %s", j.UUID, twitterArgs.GetCapability()) + strategy := getScrapeStrategy(j.Type) - jobResult, err := strategy.Execute(j, ts, jobArgs) + + // Convert to concrete type for direct usage + args := twitterArgs.(*teeargs.TwitterSearchArguments) + + jobResult, err := strategy.Execute(j, ts, args) if err != nil { logrus.Errorf("Error executing job ID %s, type %s: %v", j.UUID, j.Type, err) return types.JobResult{Error: "error executing job"}, err @@ -1304,72 +1379,47 @@ func (ts *TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) { return types.JobResult{Error: "job result data is empty"}, fmt.Errorf("job result data is empty") } - // Check if this is a non-tweet operation that doesn't return tweet results - isNonTweetOperation := strings.ToLower(jobArgs.QueryType) == "searchbyprofile" || - strings.ToLower(jobArgs.QueryType) == "searchfollowers" || - strings.ToLower(jobArgs.QueryType) == "getretweeters" || - strings.ToLower(jobArgs.QueryType) == "getprofilebyid" || - strings.ToLower(jobArgs.QueryType) == "getbyid" || - strings.ToLower(jobArgs.QueryType) == "getspace" || - strings.ToLower(jobArgs.QueryType) == "gettrends" || - strings.ToLower(jobArgs.QueryType) == "getfollowing" || - strings.ToLower(jobArgs.QueryType) == "getfollowers" - - // Skip tweet validation for non-tweet operations - if !isNonTweetOperation { - // Unmarshal result to typed structure + switch { + case twitterArgs.IsSingleTweetOperation(): + var result *teetypes.TweetResult + if err := jobResult.Unmarshal(&result); err != nil { + logrus.Errorf("Error while unmarshalling single tweet result for job ID %s, type %s: %v", j.UUID, j.Type, err) + return types.JobResult{Error: "error unmarshalling single tweet result for final validation"}, err + } + case twitterArgs.IsMultipleTweetOperation(): var results []*teetypes.TweetResult if err := jobResult.Unmarshal(&results); err != nil { - logrus.Errorf("Error while unmarshalling job result for job ID %s, type %s: %v", j.UUID, j.Type, err) - return types.JobResult{Error: "error unmarshalling job result for final validation and result length check"}, err + logrus.Errorf("Error while unmarshalling multiple tweet result for job ID %s, type %s: %v", j.UUID, j.Type, err) + return types.JobResult{Error: "error unmarshalling multiple tweet result for final validation"}, err } - - // Final validation after unmarshaling - if len(results) == 0 { - logrus.Errorf("Job result is empty for job ID %s, type %s", j.UUID, j.Type) - return types.JobResult{Error: "job result is empty"}, fmt.Errorf("job result is empty") + case twitterArgs.IsSingleProfileOperation(): + var result *twitterscraper.Profile + if err := jobResult.Unmarshal(&result); err != nil { + logrus.Errorf("Error while unmarshalling single profile result for job ID %s, type %s: %v", j.UUID, j.Type, err) + return types.JobResult{Error: 
"error unmarshalling single profile result for final validation"}, err + } + case twitterArgs.IsMultipleProfileOperation(): + var results []*twitterscraper.Profile + if err := jobResult.Unmarshal(&results); err != nil { + logrus.Errorf("Error while unmarshalling multiple profile result for job ID %s, type %s: %v", j.UUID, j.Type, err) + return types.JobResult{Error: "error unmarshalling multiple profile result for final validation"}, err + } + case twitterArgs.IsSingleSpaceOperation(): + var result *twitterscraper.Space + if err := jobResult.Unmarshal(&result); err != nil { + logrus.Errorf("Error while unmarshalling single space result for job ID %s, type %s: %v", j.UUID, j.Type, err) + return types.JobResult{Error: "error unmarshalling single space result for final validation"}, err + } + case twitterArgs.IsTrendsOperation(): + var results []string + if err := jobResult.Unmarshal(&results); err != nil { + logrus.Errorf("Error while unmarshalling trends result for job ID %s, type %s: %v", j.UUID, j.Type, err) + return types.JobResult{Error: "error unmarshalling trends result for final validation"}, err } + default: + logrus.Errorf("Invalid operation type for job ID %s, type %s", j.UUID, j.Type) + return types.JobResult{Error: "invalid operation type"}, fmt.Errorf("invalid operation type") } return jobResult, nil } - -func (ts *TwitterScraper) FetchHomeTweets(j types.Job, baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) - if err != nil { - return nil, "", err - } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for FetchHomeTweets") - } - - ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) - tweets, nextCursor, fetchErr := scraper.FetchHomeTweets(count, cursor) - if fetchErr != nil { - _ = ts.handleError(j, fetchErr, account) - return nil, "", fetchErr - } - - ts.statsCollector.Add(j.WorkerID, stats.TwitterTweets, uint(len(tweets))) - return tweets, nextCursor, nil -} - -func (ts *TwitterScraper) FetchForYouTweets(j types.Job, baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - scraper, account, _, err := ts.getAuthenticatedScraper(j, baseDir, teetypes.TwitterJob) - if err != nil { - return nil, "", err - } - if scraper == nil { - return nil, "", fmt.Errorf("scraper not initialized for FetchForYouTweets") - } - - ts.statsCollector.Add(j.WorkerID, stats.TwitterScrapes, 1) - tweets, nextCursor, fetchErr := scraper.FetchForYouTweets(count, cursor) - if fetchErr != nil { - _ = ts.handleError(j, fetchErr, account) - return nil, "", fetchErr - } - - ts.statsCollector.Add(j.WorkerID, stats.TwitterTweets, uint(len(tweets))) - return tweets, nextCursor, nil -} diff --git a/internal/jobs/twitter_test.go b/internal/jobs/twitter_test.go index 21b99606..1da29053 100644 --- a/internal/jobs/twitter_test.go +++ b/internal/jobs/twitter_test.go @@ -55,6 +55,7 @@ var _ = Describe("Twitter Scraper", func() { var err error var twitterAccounts []string var twitterApiKeys []string + var apifyApiKey string BeforeEach(func() { logrus.SetLevel(logrus.DebugLevel) @@ -69,6 +70,7 @@ var _ = Describe("Twitter Scraper", func() { twitterAccounts = parseTwitterAccounts() twitterApiKeys = parseTwitterApiKeys() + apifyApiKey = os.Getenv("APIFY_API_KEY") // Skip all tests if neither auth method is available if len(twitterAccounts) == 0 && len(twitterApiKeys) == 0 { @@ -105,7 +107,7 @@ var _ = Describe("Twitter Scraper", func() { 
res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "searchbyquery", + "type": teetypes.CapSearchByQuery, "query": "NASA", "max_results": 1, }, @@ -130,7 +132,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterApiJob, Arguments: map[string]interface{}{ - "type": "searchbyquery", + "type": teetypes.CapSearchByQuery, "query": "NASA", "max_results": 1, }, @@ -156,7 +158,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "searchbyquery", + "type": teetypes.CapSearchByQuery, "query": "NASA", "max_results": 1, }, @@ -178,7 +180,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterJob, Arguments: map[string]interface{}{ - "type": "searchbyquery", + "type": teetypes.CapSearchByQuery, "query": "nasa", "max_results": 10, }, @@ -199,7 +201,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterApiJob, Arguments: map[string]interface{}{ - "type": "searchbyquery", + "type": teetypes.CapSearchByQuery, "query": "NASA", "max_results": 1, }, @@ -220,7 +222,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterApiJob, Arguments: map[string]interface{}{ - "type": "searchbyfullarchive", + "type": teetypes.CapSearchByFullArchive, "query": "NASA", "max_results": 1, }, @@ -245,7 +247,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterJob, Arguments: map[string]interface{}{ - "type": "searchbyquery", + "type": teetypes.CapSearchByQuery, "query": "nasa", "max_results": 10, }, @@ -275,7 +277,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "searchbyprofile", + "type": teetypes.CapSearchByProfile, "query": "NASA_Marshall", }, Timeout: 10 * time.Second, @@ -302,7 +304,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := twitterScraper.ExecuteJob(types.Job{ Type: teetypes.TwitterJob, Arguments: map[string]interface{}{ - "type": "getbyid", + "type": teetypes.CapGetById, "query": "1881258110712492142", }, Timeout: 10 * time.Second, @@ -325,7 +327,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getreplies", + "type": teetypes.CapGetReplies, "query": "1234567890", }, Timeout: 10 * time.Second, @@ -354,7 +356,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getretweeters", + "type": teetypes.CapGetRetweeters, "query": "1234567890", "max_results": 5, }, @@ -384,7 +386,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "gettweets", + "type": teetypes.CapGetTweets, "query": "NASA", "max_results": 5, }, @@ -414,7 +416,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := twitterScraper.ExecuteJob(types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getmedia", + "type": teetypes.CapGetMedia, "query": "NASA", "max_results": 5, }, @@ -437,7 +439,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: 
teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "gethometweets", + "type": teetypes.CapGetHomeTweets, "max_results": 5, }, Timeout: 10 * time.Second, @@ -466,7 +468,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getforyoutweets", + "type": teetypes.CapGetForYouTweets, "max_results": 5, }, Timeout: 10 * time.Second, @@ -497,7 +499,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getprofilebyid", + "type": teetypes.CapGetProfileById, "query": "44196397", // Elon Musk's Twitter ID }, Timeout: 10 * time.Second, @@ -525,7 +527,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getfollowing", + "type": teetypes.CapGetFollowing, "query": "NASA", "max_results": 5, }, @@ -555,7 +557,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "getfollowers", + "type": teetypes.CapGetFollowers, "query": "NASA", }, Timeout: 10 * time.Second, @@ -585,7 +587,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "gettrends", + "type": teetypes.CapGetTrends, }, Timeout: 10 * time.Second, } @@ -613,7 +615,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterApiJob, Arguments: map[string]interface{}{ - "type": "getbyid", + "type": teetypes.CapGetById, "query": "1881258110712492142", }, Timeout: 10 * time.Second, @@ -655,7 +657,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := scraper.ExecuteJob(types.Job{ Type: teetypes.TwitterApiJob, Arguments: map[string]interface{}{ - "type": "getprofilebyid", + "type": teetypes.CapGetProfileById, "query": "44196397", // Elon Musk's Twitter ID }, Timeout: 10 * time.Second, @@ -687,7 +689,7 @@ var _ = Describe("Twitter Scraper", func() { res, err := twitterScraper.ExecuteJob(types.Job{ Type: teetypes.TwitterJob, Arguments: map[string]interface{}{ - "type": "getspace", + "type": teetypes.CapGetSpace, "query": "1YpKkZEWlBaxj", }, Timeout: 10 * time.Second, @@ -707,7 +709,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterJob, Arguments: map[string]interface{}{ - "type": "getbookmarks", + "type": "getbookmarks", // not yet in teetypes until it's supported "max_results": 5, }, Timeout: 10 * time.Second, @@ -734,7 +736,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterApiJob, Arguments: map[string]interface{}{ - "type": "searchbyfullarchive", + "type": teetypes.CapSearchByFullArchive, "query": "AI", "max_results": 2, }, @@ -763,7 +765,7 @@ var _ = Describe("Twitter Scraper", func() { j := types.Job{ Type: teetypes.TwitterCredentialJob, Arguments: map[string]interface{}{ - "type": "searchbyfullarchive", + "type": teetypes.CapSearchByFullArchive, "query": "#AI", "max_results": 2, }, @@ -785,5 +787,198 @@ var _ = Describe("Twitter Scraper", func() { Expect(statsCollector.Stats.Stats[j.WorkerID][stats.TwitterScrapes]).To(BeNumerically("==", 1)) Expect(statsCollector.Stats.Stats[j.WorkerID][stats.TwitterTweets]).To(BeNumerically("==", uint(len(results)))) }) + + It("should use Apify for twitter-apify with getfollowers", func() { + if 
apifyApiKey == "" { + Skip("APIFY_API_KEY is not set") + } + scraper := NewTwitterScraper(types.JobConfiguration{ + "apify_api_key": apifyApiKey, + "data_dir": tempDir, + }, statsCollector) + res, err := scraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterApifyJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapGetFollowers, + "query": "elonmusk", + "max_results": 200, + }, + Timeout: 60 * time.Second, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Error).To(BeEmpty()) + + var followers []*teetypes.ProfileResultApify + err = res.Unmarshal(&followers) + Expect(err).NotTo(HaveOccurred()) + Expect(followers).ToNot(BeEmpty()) + Expect(followers[0].ScreenName).ToNot(BeEmpty()) + }) + + It("should use Apify for twitter-apify with getfollowing", func() { + if apifyApiKey == "" { + Skip("APIFY_API_KEY is not set") + } + scraper := NewTwitterScraper(types.JobConfiguration{ + "apify_api_key": apifyApiKey, + "data_dir": tempDir, + }, statsCollector) + res, err := scraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterApifyJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapGetFollowing, + "query": "elonmusk", + "max_results": 200, + }, + Timeout: 60 * time.Second, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Error).To(BeEmpty()) + + var following []*teetypes.ProfileResultApify + err = res.Unmarshal(&following) + Expect(err).NotTo(HaveOccurred()) + Expect(following).ToNot(BeEmpty()) + Expect(following[0].ScreenName).ToNot(BeEmpty()) + }) + + It("should prioritize Apify for general twitter job with getfollowers", func() { + if apifyApiKey == "" || len(twitterAccounts) == 0 { + Skip("APIFY_API_KEY or TWITTER_ACCOUNTS not set") + } + scraper := NewTwitterScraper(types.JobConfiguration{ + "apify_api_key": apifyApiKey, + "twitter_accounts": twitterAccounts, + "data_dir": tempDir, + }, statsCollector) + res, err := scraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapGetFollowers, + "query": "elonmusk", + "max_results": 200, + }, + Timeout: 60 * time.Second, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Error).To(BeEmpty()) + + // Should return ProfileResultApify (from Apify) not twitterscraper.Profile + var followers []*teetypes.ProfileResultApify + err = res.Unmarshal(&followers) + Expect(err).NotTo(HaveOccurred()) + Expect(followers).ToNot(BeEmpty()) + }) + }) + + // --- Error Handling Tests --- + Context("Error Handling", func() { + It("should handle negative count values in job arguments", func() { + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByQuery, + "query": "test", + "count": -5, // Invalid negative value + }, + Timeout: 10 * time.Second, + }) + Expect(err).To(HaveOccurred()) + Expect(res.Error).To(ContainSubstring("error unmarshalling job arguments")) + Expect(err.Error()).To(ContainSubstring("count must be non-negative")) + }) + + It("should handle negative max_results values in job arguments", func() { + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByQuery, + "query": "test", + "max_results": -10, // Invalid negative value + }, + Timeout: 10 * time.Second, + }) + Expect(err).To(HaveOccurred()) + Expect(res.Error).To(ContainSubstring("error unmarshalling job arguments")) + Expect(err.Error()).To(ContainSubstring("max_results must be non-negative")) + }) + + It("should handle invalid 
capability for job type", func() { + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterApiJob, // API job type + Arguments: map[string]interface{}{ + "type": "invalidcapability", // Invalid capability + "query": "test", + }, + Timeout: 10 * time.Second, + }) + Expect(err).To(HaveOccurred()) + Expect(res.Error).To(ContainSubstring("error unmarshalling job arguments")) + Expect(err.Error()).To(ContainSubstring("capability 'invalidcapability' is not valid for job type")) + }) + + It("should handle capability not available for specific job type", func() { + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterApiJob, // API job type - doesn't support getfollowers + Arguments: map[string]interface{}{ + "type": teetypes.CapGetFollowers, // Valid capability but not for TwitterApiJob + "query": "test", + }, + Timeout: 10 * time.Second, + }) + Expect(err).To(HaveOccurred()) + Expect(res.Error).To(ContainSubstring("error unmarshalling job arguments")) + Expect(err.Error()).To(ContainSubstring("capability 'getfollowers' is not valid for job type 'twitter-api'")) + }) + + It("should handle invalid JSON data structure", func() { + // Create a job with arguments that will cause JSON unmarshalling to fail + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByQuery, + "query": "test", + "max_results": "not_a_number", // String instead of int + }, + Timeout: 10 * time.Second, + }) + Expect(err).To(HaveOccurred()) + Expect(res.Error).To(ContainSubstring("error unmarshalling job arguments")) + Expect(err.Error()).To(ContainSubstring("failed to unmarshal")) + }) + + It("should handle jobs with unknown job type", func() { + // Test with an unknown job type - this should be caught by the unmarshaller + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: "unknown-job-type", // Invalid job type + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByQuery, + "query": "test", + }, + Timeout: 10 * time.Second, + }) + Expect(err).To(HaveOccurred()) + Expect(res.Error).To(ContainSubstring("error unmarshalling job arguments")) + Expect(err.Error()).To(ContainSubstring("unknown job type")) + }) + + It("should handle empty arguments map", func() { + res, err := twitterScraper.ExecuteJob(types.Job{ + Type: teetypes.TwitterJob, + Arguments: map[string]interface{}{}, // Empty arguments + Timeout: 10 * time.Second, + }) + // Empty arguments should now work with default capability (searchbyquery) + // The default capability will be used from JobDefaultCapabilityMap + if len(twitterAccounts) == 0 && len(twitterApiKeys) == 0 { + // If no auth is available, expect auth error + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("no Twitter")) + } else { + // If auth is available, it should work with default searchbyquery capability + Expect(err).NotTo(HaveOccurred()) + Expect(res.Error).To(BeEmpty()) + } + }) }) }) diff --git a/internal/jobs/twitterapify/client.go b/internal/jobs/twitterapify/client.go new file mode 100644 index 00000000..35458cce --- /dev/null +++ b/internal/jobs/twitterapify/client.go @@ -0,0 +1,203 @@ +package twitterapify + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "time" + + util "github.com/masa-finance/tee-types/pkg/util" + teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/pkg/client" + "github.com/sirupsen/logrus" +) + +const ( + TwitterFollowerActorID = 
"kaitoeasyapi~premium-x-follower-scraper-following-data" + MaxActorPolls = 60 // 5 minutes max wait time + ActorPollInterval = 5 * time.Second // polling interval between status checks + + // Actor run status constants + ActorStatusSucceeded = "SUCCEEDED" + ActorStatusFailed = "FAILED" + ActorStatusAborted = "ABORTED" +) + +// FollowerActorRunRequest represents the input for running the Twitter follower actor +type FollowerActorRunRequest struct { + UserNames []string `json:"user_names"` + UserIds []string `json:"user_ids"` + MaxFollowers int `json:"maxFollowers"` + MaxFollowings int `json:"maxFollowings"` + GetFollowers bool `json:"getFollowers"` + GetFollowing bool `json:"getFollowing"` +} + +// TwitterApifyClient wraps the generic Apify client for Twitter-specific operations +type TwitterApifyClient struct { + apifyClient *client.ApifyClient +} + +// CursorData represents the pagination data stored in cursor +type CursorData struct { + Offset int `json:"offset"` +} + +// NewTwitterApifyClient creates a new Twitter Apify client +func NewTwitterApifyClient(apiToken string) (*TwitterApifyClient, error) { + apifyClient, err := client.NewApifyClient(apiToken) + if err != nil { + return nil, fmt.Errorf("failed to create apify client: %w", err) + } + + return &TwitterApifyClient{ + apifyClient: apifyClient, + }, nil +} + +// ValidateApiKey tests if the Apify API token is valid +func (c *TwitterApifyClient) ValidateApiKey() error { + return c.apifyClient.ValidateApiKey() +} + +// GetFollowers retrieves followers for a username using Apify +func (c *TwitterApifyClient) GetFollowers(username string, maxResults int, cursor string) ([]*teetypes.ProfileResultApify, string, error) { + offset := parseCursor(cursor) + minimum := 200 + + // Ensure minimum of 200 as required by the actor + maxFollowers := util.Max(maxResults, minimum) + + input := FollowerActorRunRequest{ + UserNames: []string{username}, + UserIds: []string{}, // Explicitly set empty array as required by actor + MaxFollowers: maxFollowers, + MaxFollowings: minimum, // Actor requires minimum even when not used + GetFollowers: true, + GetFollowing: false, + } + + return c.runActorAndGetProfiles(input, offset, maxResults) +} + +// GetFollowing retrieves following for a username using Apify +func (c *TwitterApifyClient) GetFollowing(username string, maxResults int, cursor string) ([]*teetypes.ProfileResultApify, string, error) { + offset := parseCursor(cursor) + minimum := 200 + + // Ensure minimum of 200 as required by the actor + maxFollowings := util.Max(maxResults, minimum) + + input := FollowerActorRunRequest{ + UserNames: []string{username}, + UserIds: []string{}, // Explicitly set empty array as required by actor + MaxFollowers: minimum, // Actor requires minimum even when not used + MaxFollowings: maxFollowings, + GetFollowers: false, + GetFollowing: true, + } + + return c.runActorAndGetProfiles(input, offset, maxResults) +} + +// runActorAndGetProfiles runs the actor and retrieves profiles from the dataset +func (c *TwitterApifyClient) runActorAndGetProfiles(input FollowerActorRunRequest, offset, limit int) ([]*teetypes.ProfileResultApify, string, error) { + // 1. Run the actor + logrus.Infof("Starting Apify actor run for %v", input.UserNames) + runResp, err := c.apifyClient.RunActor(TwitterFollowerActorID, input) + if err != nil { + return nil, "", fmt.Errorf("failed to run actor: %w", err) + } + + // 2. 
Poll for completion + logrus.Infof("Polling for actor run completion: %s", runResp.Data.ID) + pollCount := 0 + + for { + status, err := c.apifyClient.GetActorRun(runResp.Data.ID) + if err != nil { + return nil, "", fmt.Errorf("failed to get actor run status: %w", err) + } + + logrus.Debugf("Actor run status: %s", status.Data.Status) + + if status.Data.Status == ActorStatusSucceeded { + logrus.Infof("Actor run completed successfully") + break + } else if status.Data.Status == ActorStatusFailed || status.Data.Status == ActorStatusAborted { + return nil, "", fmt.Errorf("actor run failed with status: %s", status.Data.Status) + } + + pollCount++ + if pollCount >= MaxActorPolls { + return nil, "", fmt.Errorf("actor run timed out after %d polls", MaxActorPolls) + } + + time.Sleep(ActorPollInterval) + } + + // 3. Get dataset items with pagination + logrus.Infof("Retrieving dataset items from: %s (offset: %d, limit: %d)", runResp.Data.DefaultDatasetId, offset, limit) + dataset, err := c.apifyClient.GetDatasetItems(runResp.Data.DefaultDatasetId, offset, limit) + if err != nil { + return nil, "", fmt.Errorf("failed to get dataset items: %w", err) + } + + // 4. Convert to ProfileResultApify + profiles := make([]*teetypes.ProfileResultApify, 0, len(dataset.Data.Items)) + for i, item := range dataset.Data.Items { + var profile teetypes.ProfileResultApify + if err := json.Unmarshal(item, &profile); err != nil { + logrus.Warnf("Failed to unmarshal profile at index %d: %v", i, err) + continue + } + profiles = append(profiles, &profile) + } + + // 5. Generate next cursor if more data may be available + var nextCursor string + if len(dataset.Data.Items) == limit { + nextCursor = generateCursor(offset + len(dataset.Data.Items)) + logrus.Debugf("Generated next cursor for offset %d", offset+len(dataset.Data.Items)) + } + + if len(dataset.Data.Items) == limit { + logrus.Infof("Successfully retrieved %d profiles; more may be available", len(profiles)) + } else { + logrus.Infof("Successfully retrieved %d profiles", len(profiles)) + } + return profiles, nextCursor, nil +} + +// parseCursor decodes a base64 cursor to get the offset +func parseCursor(cursor string) int { + if cursor == "" { + return 0 + } + + decoded, err := base64.StdEncoding.DecodeString(cursor) + if err != nil { + logrus.Warnf("Failed to decode cursor: %v", err) + return 0 + } + + var cursorData CursorData + if err := json.Unmarshal(decoded, &cursorData); err != nil { + logrus.Warnf("Failed to unmarshal cursor data: %v", err) + return 0 + } + + return cursorData.Offset +} + +// generateCursor encodes an offset as a base64 cursor +func generateCursor(offset int) string { + cursorData := CursorData{Offset: offset} + data, err := json.Marshal(cursorData) + if err != nil { + logrus.Warnf("Failed to marshal cursor data: %v", err) + return "" + } + + return base64.StdEncoding.EncodeToString(data) +} diff --git a/internal/jobs/webscraper.go b/internal/jobs/webscraper.go index aea90ee9..2a120afa 100644 --- a/internal/jobs/webscraper.go +++ b/internal/jobs/webscraper.go @@ -46,14 +46,21 @@ func (ws *WebScraper) GetStructuredCapabilities() teetypes.WorkerCapabilities { func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { logrus.Info("Starting ExecuteJob for web scraper") - // Step 1: Unmarshal arguments - args := &teeargs.WebSearchArguments{} - logrus.Info("Unmarshaling job arguments") - if err := j.Arguments.Unmarshal(args); err != nil { + // Step 1: Use centralized type-safe unmarshaller + jobArgs, err := 
teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) + if err != nil { logrus.Errorf("Failed to unmarshal job arguments: %v", err) - return types.JobResult{Error: fmt.Sprintf("Invalid arguments: %v", err)}, err + ws.stats.Add(j.WorkerID, stats.WebInvalid, 1) + return types.JobResult{Error: fmt.Sprintf("Invalid arguments: %v", err)}, nil + } + + // Type assert to Web arguments + args, ok := teeargs.AsWebArguments(jobArgs) + if !ok { + logrus.Errorf("Expected Web arguments for job ID %s, type %s", j.UUID, j.Type) + return types.JobResult{Error: "invalid argument type for Web job"}, nil } - logrus.Infof("Job arguments unmarshaled successfully: %+v", args) + logrus.Infof("Job arguments unmarshaled and validated successfully: %+v", args) // Step 2: Validate URL against blacklist logrus.Info("Validating URL against blacklist") @@ -70,9 +77,12 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { } logrus.Infof("URL %s passed blacklist validation", args.URL) - // Step 3: Perform web scraping - logrus.Infof("Initiating web scraping for URL: %s with depth: %d", args.URL, args.Depth) - result, err := scrapeWeb([]string{args.URL}, args.Depth) + // Step 3: Use enhanced methods for cleaner logic and validation + logrus.Infof("Initiating web scraping for URL: %s (max_depth: %d, has_selector: %t, is_deep_scrape: %t)", + args.URL, args.GetEffectiveMaxDepth(), args.HasSelector(), args.IsDeepScrape()) + + // Perform web scraping using the effective max depth + result, err := scrapeWeb([]string{args.URL}, args.GetEffectiveMaxDepth()) if err != nil { logrus.Errorf("Web scraping failed for URL %s: %v", args.URL, err) ws.stats.Add(j.WorkerID, stats.WebErrors, 1) diff --git a/internal/jobs/webscraper_test.go b/internal/jobs/webscraper_test.go index b23ea048..185e356c 100644 --- a/internal/jobs/webscraper_test.go +++ b/internal/jobs/webscraper_test.go @@ -34,7 +34,7 @@ var _ = Describe("Webscraper", func() { Expect(res.Error).To(BeEmpty()) var scrapedData CollectedData - res.Unmarshal(&scrapedData) + err = res.Unmarshal(&scrapedData) Expect(err).NotTo(HaveOccurred()) Expect(scrapedData.Pages).ToNot(BeEmpty()) @@ -59,39 +59,35 @@ var _ = Describe("Webscraper", func() { } res, err := webScraper.ExecuteJob(j) Expect(err).NotTo(HaveOccurred()) - Expect(res.Error).To(BeEmpty()) + Expect(res.Error).To(Equal("Invalid arguments: failed to unmarshal web job arguments: failed to unmarshal arguments: URL must include a scheme (http:// or https://)")) - var scrapedData CollectedData - res.Unmarshal(&scrapedData) - Expect(err).NotTo(HaveOccurred()) - - Expect(scrapedData.Pages).To(BeEmpty()) + // Don't attempt to unmarshal since the job failed Eventually(func() uint { return statsCollector.Stats.Stats[j.WorkerID][stats.WebSuccess] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) Eventually(func() uint { return statsCollector.Stats.Stats[j.WorkerID][stats.WebErrors] }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) Eventually(func() uint { return statsCollector.Stats.Stats[j.WorkerID][stats.WebInvalid] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) }) It("should allow to blacklist urls", func() { webScraper := NewWebScraper(types.JobConfiguration{ - "webscraper_blacklist": []string{"google"}, + "webscraper_blacklist": []string{"https://google.com"}, }, 
statsCollector) j := types.Job{ Type: teetypes.WebJob, Arguments: map[string]interface{}{ - "url": "google", + "url": "https://google.com", }, WorkerID: "test", } res, err := webScraper.ExecuteJob(j) Expect(err).ToNot(HaveOccurred()) - Expect(res.Error).To(Equal("URL blacklisted: google")) + Expect(res.Error).To(Equal("URL blacklisted: https://google.com")) Eventually(func() uint { return statsCollector.Stats.Stats[j.WorkerID][stats.WebSuccess] }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) diff --git a/internal/jobserver/jobserver.go b/internal/jobserver/jobserver.go index d02acea1..45cfd9a6 100644 --- a/internal/jobserver/jobserver.go +++ b/internal/jobserver/jobserver.go @@ -93,6 +93,9 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { teetypes.TwitterApiJob: { w: jobs.NewTwitterScraper(jc, s), // Uses the same implementation as standard Twitter scraper }, + teetypes.TwitterApifyJob: { + w: jobs.NewTwitterScraper(jc, s), // Register Apify job type with Twitter scraper + }, teetypes.TelemetryJob: { w: jobs.NewTelemetryJob(jc, s), }, diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go new file mode 100644 index 00000000..a4a2bc92 --- /dev/null +++ b/pkg/client/apify_client.go @@ -0,0 +1,256 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/sirupsen/logrus" +) + +const ( + apifyBaseURL = "https://api.apify.com/v2" +) + +// ApifyClient represents a client for the Apify API +type ApifyClient struct { + apiToken string + baseUrl string + options *Options +} + +// ActorRunResponse represents the response from running an actor +type ActorRunResponse struct { + Data struct { + ID string `json:"id"` + Status string `json:"status"` + DefaultDatasetId string `json:"defaultDatasetId"` + } `json:"data"` +} + +// DatasetResponse represents the response from getting dataset items +type DatasetResponse struct { + Data struct { + Items []json.RawMessage `json:"items"` + Count int `json:"count"` + Offset int `json:"offset"` + Limit int `json:"limit"` + } `json:"data"` +} + +// NewApifyClient creates a new Apify client with functional options +func NewApifyClient(apiToken string, opts ...Option) (*ApifyClient, error) { + logrus.Info("Creating new ApifyClient with API token") + + options, err := NewOptions(opts...) 
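+	// NewOptions applies the functional options (for example, a custom *http.Client)
+	// over the client defaults; the resulting Options supplies the HttpClient used
+	// for every request this client makes.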
+	if err != nil {
+		return nil, fmt.Errorf("failed to create options: %w", err)
+	}
+
+	return &ApifyClient{
+		apiToken: apiToken,
+		baseUrl:  apifyBaseURL,
+		options:  options,
+	}, nil
+}
+
+// HTTPClient exposes the configured http client
+func (c *ApifyClient) HTTPClient() *http.Client {
+	return c.options.HttpClient
+}
+
+// RunActor runs an actor with the given input
+func (c *ApifyClient) RunActor(actorId string, input interface{}) (*ActorRunResponse, error) {
+	url := fmt.Sprintf("%s/acts/%s/runs?token=%s", c.baseUrl, actorId, c.apiToken)
+	logrus.Infof("Running actor %s", actorId)
+
+	// Marshal input to JSON
+	inputJSON, err := json.Marshal(input)
+	if err != nil {
+		logrus.Errorf("error marshaling actor input: %v", err)
+		return nil, fmt.Errorf("error marshaling actor input: %w", err)
+	}
+
+	// Create request
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(inputJSON))
+	if err != nil {
+		logrus.Errorf("error creating POST request: %v", err)
+		return nil, fmt.Errorf("error creating POST request: %w", err)
+	}
+
+	// Add headers
+	req.Header.Add("Content-Type", "application/json")
+
+	// Make the request
+	resp, err := c.options.HttpClient.Do(req)
+	if err != nil {
+		logrus.Errorf("error making POST request: %v", err)
+		return nil, fmt.Errorf("error making POST request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Read response body
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		logrus.Errorf("error reading response body: %v", err)
+		return nil, fmt.Errorf("error reading response body: %w", err)
+	}
+
+	// Check response status
+	if resp.StatusCode != http.StatusCreated {
+		logrus.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
+		return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
+	}
+
+	// Parse response
+	var runResp ActorRunResponse
+	if err := json.Unmarshal(body, &runResp); err != nil {
+		logrus.Errorf("error parsing response: %v", err)
+		return nil, fmt.Errorf("error parsing response: %w", err)
+	}
+
+	logrus.Infof("Actor run started with ID: %s", runResp.Data.ID)
+	return &runResp, nil
+}
+
+// GetActorRun gets the status of an actor run
+func (c *ApifyClient) GetActorRun(runId string) (*ActorRunResponse, error) {
+	url := fmt.Sprintf("%s/actor-runs/%s?token=%s", c.baseUrl, runId, c.apiToken)
+	logrus.Debugf("Getting actor run status: %s", runId)
+
+	// Create request
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		logrus.Errorf("error creating GET request: %v", err)
+		return nil, fmt.Errorf("error creating GET request: %w", err)
+	}
+
+	// Make the request
+	resp, err := c.options.HttpClient.Do(req)
+	if err != nil {
+		logrus.Errorf("error making GET request: %v", err)
+		return nil, fmt.Errorf("error making GET request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Read response body
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		logrus.Errorf("error reading response body: %v", err)
+		return nil, fmt.Errorf("error reading response body: %w", err)
+	}
+
+	// Check response status
+	if resp.StatusCode != http.StatusOK {
+		logrus.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
+		return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
+	}
+
+	// Parse response
+	var runResp ActorRunResponse
+	if err := json.Unmarshal(body, &runResp); err != nil {
+		logrus.Errorf("error parsing response: %v", err)
+		return nil, fmt.Errorf("error parsing response: %w", err)
+	}
+
+	return &runResp, nil
+}
+
+// GetDatasetItems gets items from a dataset with pagination
+func (c *ApifyClient) GetDatasetItems(datasetId string, offset, limit int) (*DatasetResponse, error) {
+	url := fmt.Sprintf("%s/datasets/%s/items?token=%s&offset=%d&limit=%d",
+		c.baseUrl, datasetId, c.apiToken, offset, limit)
+	logrus.Debugf("Getting dataset items: %s (offset: %d, limit: %d)", datasetId, offset, limit)
+
+	// Create request
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		logrus.Errorf("error creating GET request: %v", err)
+		return nil, fmt.Errorf("error creating GET request: %w", err)
+	}
+
+	// Make the request
+	resp, err := c.options.HttpClient.Do(req)
+	if err != nil {
+		logrus.Errorf("error making GET request: %v", err)
+		return nil, fmt.Errorf("error making GET request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Read response body
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		logrus.Errorf("error reading response body: %v", err)
+		return nil, fmt.Errorf("error reading response body: %w", err)
+	}
+
+	// Check response status
+	if resp.StatusCode != http.StatusOK {
+		logrus.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
+		return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
+	}
+
+	// Parse response - Apify returns a direct array of items, not wrapped in a data object
+	var items []json.RawMessage
+	if err := json.Unmarshal(body, &items); err != nil {
+		logrus.Errorf("error parsing response: %v", err)
+		return nil, fmt.Errorf("error parsing response: %w", err)
+	}
+
+	// Create a DatasetResponse object with the items and estimated pagination info
+	datasetResp := &DatasetResponse{
+		Data: struct {
+			Items  []json.RawMessage `json:"items"`
+			Count  int               `json:"count"`
+			Offset int               `json:"offset"`
+			Limit  int               `json:"limit"`
+		}{
+			Items:  items,
+			Count:  len(items),
+			Offset: offset,
+			Limit:  limit,
+		},
+	}
+
+	logrus.Debugf("Retrieved %d items from dataset", len(items))
+	return datasetResp, nil
+}
+
+// ValidateApiKey tests if the API token is valid by making a request to /users/me
+// This endpoint doesn't consume any actor runs or quotas - it's perfect for validation
+func (c *ApifyClient) ValidateApiKey() error {
+	url := fmt.Sprintf("%s/users/me?token=%s", c.baseUrl, c.apiToken)
+	logrus.Debug("Testing Apify API token")
+
+	// Create request
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		logrus.Errorf("error creating auth test request: %v", err)
+		return fmt.Errorf("error creating auth test request: %w", err)
+	}
+
+	// Make the request
+	resp, err := c.options.HttpClient.Do(req)
+	if err != nil {
+		logrus.Errorf("error making auth test request: %v", err)
+		return fmt.Errorf("error making auth test request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Check response status
+	switch resp.StatusCode {
+	case http.StatusOK:
+		logrus.Debug("Apify API token validation successful")
+		return nil
+	case http.StatusUnauthorized:
+		return fmt.Errorf("invalid Apify API token")
+	case http.StatusForbidden:
+		return fmt.Errorf("insufficient permissions for Apify API token")
+	case http.StatusTooManyRequests:
+		return fmt.Errorf("rate limit exceeded")
+	default:
+		return fmt.Errorf("Apify API auth test failed with status: %d", resp.StatusCode)
+	}
+}
diff --git a/pkg/client/twitter_x_client.go b/pkg/client/twitter_x_client.go
index d799f047..b4e4b930 100644
--- a/pkg/client/twitter_x_client.go
+++ b/pkg/client/twitter_x_client.go
@@ -1,11 +1,10 @@
 package client

 import (
-	"encoding/json"
 	"fmt"
-	"github.com/sirupsen/logrus"
-
"io" "net/http" + + "github.com/sirupsen/logrus" ) const ( @@ -19,19 +18,6 @@ type TwitterXClient struct { httpClient *http.Client } -// AuthResponse Simple auth response structure -type AuthResponse struct { - Data struct { - ID string `json:"id"` - Name string `json:"name"` - Username string `json:"username"` - } `json:"data"` - Errors []struct { - Message string `json:"message"` - Code int `json:"code"` - } `json:"errors,omitempty"` -} - func NewTwitterXClient(apiKey string) *TwitterXClient { logrus.Info("Creating new TwitterXClient with API key") // test if the API key is valid before returning the client @@ -79,54 +65,3 @@ func (c *TwitterXClient) Get(endpointUrl string) (*http.Response, error) { return resp, nil } - -// TestAuth tests if the API key is valid by making a request to /2/users/me -func (c *TwitterXClient) testAuth() error { - // Create request - req, err := http.NewRequest("GET", baseURL+"/users/me", nil) - if err != nil { - return fmt.Errorf("error creating auth test request: %w", err) - } - - // Add headers - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", c.apiKey)) - req.Header.Add("Content-Type", "application/json") - - // Make request - resp, err := c.httpClient.Do(req) - if err != nil { - return fmt.Errorf("error making auth test request: %w", err) - } - defer resp.Body.Close() - - // Read response body - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response: %w", err) - } - - // Parse response - var authResp AuthResponse - if err := json.Unmarshal(body, &authResp); err != nil { - return fmt.Errorf("error parsing response: %w", err) - } - - // Check for errors - if len(authResp.Errors) > 0 { - return fmt.Errorf("API error: %s (code: %d)", - authResp.Errors[0].Message, - authResp.Errors[0].Code) - } - - // Check response status - switch resp.StatusCode { - case http.StatusOK: - return nil - case http.StatusUnauthorized: - return fmt.Errorf("invalid API key") - case http.StatusTooManyRequests: - return fmt.Errorf("rate limit exceeded") - default: - return fmt.Errorf("API auth test failed with status: %d", resp.StatusCode) - } -} diff --git a/tee/masa-tee-worker.json b/tee/masa-tee-worker.json index 5de682ca..ef2a245c 100644 --- a/tee/masa-tee-worker.json +++ b/tee/masa-tee-worker.json @@ -38,6 +38,7 @@ {"name": "TIKTOK_DEFAULT_LANGUAGE", "fromHost":true}, {"name": "TWITTER_ACCOUNTS", "fromHost":true}, {"name": "TWITTER_API_KEYS", "fromHost":true}, + {"name": "APIFY_API_KEY", "fromHost":true}, {"name": "TWITTER_SKIP_LOGIN_VERIFICATION", "fromHost":true}, {"name": "WEBSCRAPER_BLACKLIST", "fromHost":true} ],