diff --git a/internal/api/start.go b/internal/api/start.go index b4d7e8b5..3bc4fd8f 100644 --- a/internal/api/start.go +++ b/internal/api/start.go @@ -23,8 +23,8 @@ func Start(ctx context.Context, listenAddress, dataDIR string, standalone bool, e := echo.New() // Default loglevel - logLevel := jc.GetString("log_level", "info") - e.Logger.SetLevel(parseLogLevel(logLevel)) + level := jc.GetLogLevel() + e.Logger.SetLevel(parseLogLevel(level.String())) // Jobserver instance maxJobs, _ := jc.GetInt("max_jobs", 10) @@ -63,7 +63,7 @@ func Start(ctx context.Context, listenAddress, dataDIR string, standalone bool, debug.PUT("/loglevel", func(c echo.Context) error { levelStr := c.QueryParam("level") if levelStr == "" { - levelStr = jc.GetString("log_level", "info") + levelStr = jc.GetLogLevel().String() } // Set logrus log level diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index ceaf3f7f..6ccfd4ac 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -21,15 +21,10 @@ type JobServerInterface interface { } // DetectCapabilities automatically detects available capabilities based on configuration -// If jobServer is provided, it will use the actual worker capabilities +// Always performs real capability detection by probing APIs and actors to ensure accurate reporting func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface) teetypes.WorkerCapabilities { - // If we have a JobServer, get capabilities directly from the workers - if jobServer != nil { - return jobServer.GetWorkerCapabilities() - } - - // Fallback to basic detection if no JobServer is available - // This maintains backward compatibility and is used during initialization + // Always perform real capability detection to ensure accurate reporting + // This guarantees miners report only capabilities they actually have access to capabilities := make(teetypes.WorkerCapabilities) // Start with always available capabilities 
diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index 57b9e7e8..7c263adc 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -44,7 +44,7 @@ var _ = Describe("DetectCapabilities", func() { // Compare the sorted slices Expect(gotKeys).To(Equal(expectedKeys)) }, - Entry("With JobServer - gets capabilities from workers", + Entry("With JobServer - performs real detection (JobServer ignored)", config.JobConfiguration{}, &MockJobServer{ capabilities: teetypes.WorkerCapabilities{ @@ -55,10 +55,8 @@ var _ = Describe("DetectCapabilities", func() { }, }, teetypes.WorkerCapabilities{ - teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, teetypes.TiktokJob: {teetypes.CapTranscription}, - teetypes.TwitterJob: {teetypes.CapSearchByQuery, teetypes.CapGetById, teetypes.CapGetProfileById}, }, ), Entry("Without JobServer - basic capabilities only", diff --git a/internal/config/config.go b/internal/config/config.go index f286b3bb..9bd4d233 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -33,11 +33,6 @@ func ReadConfig() JobConfiguration { // that is needed for the job jc := JobConfiguration{} - logLevel := os.Getenv("LOG_LEVEL") - level := ParseLogLevel(logLevel) - jc["log_level"] = level.String() - SetLogLevel(level) - dataDir := os.Getenv("DATA_DIR") if dataDir == "" { dataDir = "/home/masa" @@ -57,6 +52,12 @@ func ReadConfig() JobConfiguration { fmt.Println("Failed reading env file. Running in simulation mode, reading from environment variables") } + // Parse LOG_LEVEL after .env file is loaded + logLevel := os.Getenv("LOG_LEVEL") + level := ParseLogLevel(logLevel) + jc["log_level"] = level + SetLogLevel(level) + bufSizeStr := os.Getenv("STATS_BUF_SIZE") if bufSizeStr == "" { bufSizeStr = "128" @@ -66,18 +67,18 @@ func ReadConfig() JobConfiguration { logrus.Errorf("Error parsing STATS_BUF_SIZE: %s. 
Setting to default.", err) bufSize = 128 } - jc["stats_buf_size"] = uint(bufSize) + jc["stats_buf_size"] = bufSize - maxJobsStr := os.Getenv("STATS_BUF_SIZE") + maxJobsStr := os.Getenv("MAX_JOBS") if maxJobsStr == "" { maxJobsStr = "10" } maxJobs, err := strconv.Atoi(maxJobsStr) if err != nil { logrus.Errorf("Error parsing MAX_JOBS %s. Setting to default.", err) - bufSize = 10 + maxJobs = 10 } - jc["max_jobs"] = uint(maxJobs) + jc["max_jobs"] = maxJobs listenAddress := os.Getenv("LISTEN_ADDRESS") if listenAddress == "" { @@ -278,6 +279,16 @@ func (jc JobConfiguration) GetBool(key string, def bool) bool { return def } +// GetLogLevel safely extracts a logrus.Level from JobConfiguration, with a default fallback +func (jc JobConfiguration) GetLogLevel() logrus.Level { + if v, ok := jc["log_level"]; ok { + if level, ok := v.(logrus.Level); ok { + return level + } + } + return logrus.InfoLevel // default +} + // TwitterScraperConfig represents the configuration needed for Twitter scraping // This is defined here to avoid circular imports between api/types and internal/jobs type TwitterScraperConfig struct { @@ -379,7 +390,10 @@ func ParseLogLevel(logLevel string) logrus.Level { case "error": return logrus.ErrorLevel default: - logrus.Error("Invalid log level", "level", logLevel, "setting_to", logrus.InfoLevel.String()) + logrus.WithFields(logrus.Fields{ + "level": logLevel, + "setting_to": logrus.InfoLevel.String(), + }).Error("Invalid log level") return logrus.InfoLevel } } diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go index e41edbfe..d7e184a0 100644 --- a/internal/jobs/stats/stats.go +++ b/internal/jobs/stats/stats.go @@ -6,6 +6,7 @@ import ( "time" teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/internal/capabilities" "github.com/masa-finance/tee-worker/internal/config" "github.com/masa-finance/tee-worker/internal/versioning" "github.com/sirupsen/logrus" @@ -134,8 +135,9 @@ func (s *StatsCollector) 
SetJobServer(js WorkerCapabilitiesProvider) { s.Stats.Lock() defer s.Stats.Unlock() - // Get capabilities from the JobServer directly - s.Stats.ReportedCapabilities = js.GetWorkerCapabilities() + // Use real capability detection to ensure accurate reporting + // This probes actual APIs and actors to verify access + s.Stats.ReportedCapabilities = capabilities.DetectCapabilities(s.jobConfiguration, js) - logrus.Infof("Updated structured capabilities with JobServer: %+v", s.Stats.ReportedCapabilities) + logrus.Infof("Updated structured capabilities with real detection: %+v", s.Stats.ReportedCapabilities) } diff --git a/internal/jobs/twitter.go b/internal/jobs/twitter.go index d72fe750..88553026 100644 --- a/internal/jobs/twitter.go +++ b/internal/jobs/twitter.go @@ -117,9 +117,8 @@ func (ts *TwitterScraper) getCredentialScraper(j types.Job, baseDir string) (*tw } authConfig := twitter.AuthConfig{ - Account: account, - BaseDir: baseDir, - SkipLoginVerification: ts.configuration.SkipLoginVerification, + Account: account, + BaseDir: baseDir, } scraper := twitter.NewScraper(authConfig) if scraper == nil { diff --git a/internal/jobs/twitter/auth.go b/internal/jobs/twitter/auth.go index 6d03ede7..62f421bc 100644 --- a/internal/jobs/twitter/auth.go +++ b/internal/jobs/twitter/auth.go @@ -1,8 +1,6 @@ package twitter import ( - "fmt" - "github.com/sirupsen/logrus" ) @@ -11,10 +9,6 @@ type AuthConfig struct { // Account-based auth Account *TwitterAccount BaseDir string - // SkipLoginVerification when true, skips the IsLoggedIn check after loading cookies - // This can help avoid rate limiting on Twitter's verify_credentials endpoint - // Default is false (verification enabled) - SkipLoginVerification bool } func NewScraper(config AuthConfig) *Scraper { @@ -27,47 +21,13 @@ func NewScraper(config AuthConfig) *Scraper { scraper := &Scraper{Scraper: newTwitterScraper()} - // Configure whether to skip login verification - 
scraper.SetSkipLoginVerification(config.SkipLoginVerification) - // Try loading cookies if err := LoadCookies(scraper.Scraper, config.Account, config.BaseDir); err == nil { logrus.Debugf("Cookies loaded for user %s.", config.Account.Username) - if scraper.IsLoggedIn() { - logrus.Debugf("Already logged in as %s.", config.Account.Username) - return scraper - } + scraper.SetBearerToken() + return scraper } else { - logrus.Warnf("Failed to load cookies for user %s: %v", config.Account.Username, err) - } - - RandomSleep() - - if err := scraper.Login(config.Account.Username, config.Account.Password, config.Account.TwoFACode); err != nil { - logrus.WithError(err).Warnf("Login failed for %s", config.Account.Username) + logrus.WithError(err).Warnf("Failed to load cookies for user %s", config.Account.Username) return nil } - - RandomSleep() - - if err := SaveCookies(scraper.Scraper, config.Account, config.BaseDir); err != nil { - logrus.WithError(err).Errorf("Failed to save cookies for %s", config.Account.Username) - } - - logrus.Debugf("Login successful for %s", config.Account.Username) - return scraper -} - -func (scraper *Scraper) Login(username, password string, twoFACode ...string) error { - - var err error - if len(twoFACode) > 0 { - err = scraper.Scraper.Login(username, password, twoFACode[0]) - } else { - err = scraper.Scraper.Login(username, password) - } - if err != nil { - return fmt.Errorf("login failed: %v", err) - } - return nil } diff --git a/internal/jobs/twitter/common.go b/internal/jobs/twitter/common.go index 94a6d84c..332a6ff4 100644 --- a/internal/jobs/twitter/common.go +++ b/internal/jobs/twitter/common.go @@ -81,13 +81,9 @@ func getAuthenticatedScraper(baseDir string) (*Scraper, *TwitterAccount, error) return nil, nil, fmt.Errorf("all accounts are rate-limited") } - // Check if we should skip login verification from environment - skipVerification := os.Getenv("TWITTER_SKIP_LOGIN_VERIFICATION") == "true" - authConfig := AuthConfig{ - Account: account, - 
BaseDir: baseDir, - SkipLoginVerification: skipVerification, + Account: account, + BaseDir: baseDir, } scraper := NewScraper(authConfig) diff --git a/internal/jobs/twitter/cookies.go b/internal/jobs/twitter/cookies.go index db68fcd0..af2fbdac 100644 --- a/internal/jobs/twitter/cookies.go +++ b/internal/jobs/twitter/cookies.go @@ -32,12 +32,6 @@ func SaveCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseD } func LoadCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseDir string) error { - - // let's logout first before loading cookies - if err := scraper.Logout(); err != nil { // logout first - logrus.Errorf("Error logging out: %v", err) // log error but continue - } - logrus.Debugf("Loading cookies for user %s", account.Username) cookieFile := filepath.Join(baseDir, fmt.Sprintf("%s_twitter_cookies.json", account.Username)) diff --git a/internal/jobs/twitter/scraper.go b/internal/jobs/twitter/scraper.go index d46aec94..36eac85c 100644 --- a/internal/jobs/twitter/scraper.go +++ b/internal/jobs/twitter/scraper.go @@ -13,24 +13,7 @@ func newTwitterScraper() *twitterscraper.Scraper { return twitterscraper.New() } -// SetSkipLoginVerification configures whether to skip the Twitter login verification API call -// Setting this to true will avoid rate limiting on Twitter's verify_credentials endpoint -func (s *Scraper) SetSkipLoginVerification(skip bool) *Scraper { - s.skipLoginVerification = skip - return s -} - -// IsLoggedIn checks if the scraper is logged in -// If skipLoginVerification is true, it will assume the session is valid without making an API call -func (s *Scraper) IsLoggedIn() bool { - - // TODO: we somehow need to set the bearer token regardless. so calling this to set it. - // if the skip verification is set, we'll just return true. 
- loggedIn := s.Scraper.IsLoggedIn() - if s.skipLoginVerification { - return true // Skip the verification API call to avoid rate limits - } - - // whatever the scraper returns, we return - return loggedIn +// SetBearerToken invokes the underlying scraper's IsLoggedIn purely for its side effect of setting the bearer token; the boolean result is intentionally discarded. +func (s *Scraper) SetBearerToken() { + s.Scraper.IsLoggedIn() +} diff --git a/internal/versioning/version.go b/internal/versioning/version.go index aa451595..3e5dc6dc 100644 --- a/internal/versioning/version.go +++ b/internal/versioning/version.go @@ -5,5 +5,5 @@ var ( // XXX: Bump this value only when there are protocol changes that makes the oracle // incompatible between version! - TEEWorkerVersion = `gamma` + TEEWorkerVersion = `delta` )