6 changes: 3 additions & 3 deletions internal/api/start.go
@@ -23,8 +23,8 @@ func Start(ctx context.Context, listenAddress, dataDIR string, standalone bool,
 	e := echo.New()
 
 	// Default loglevel
-	logLevel := jc.GetString("log_level", "info")
-	e.Logger.SetLevel(parseLogLevel(logLevel))
+	level := jc.GetLogLevel()
+	e.Logger.SetLevel(parseLogLevel(level.String()))
 
 	// Jobserver instance
 	maxJobs, _ := jc.GetInt("max_jobs", 10)
@@ -63,7 +63,7 @@ func Start(ctx context.Context, listenAddress, dataDIR string, standalone bool,
 	debug.PUT("/loglevel", func(c echo.Context) error {
 		levelStr := c.QueryParam("level")
 		if levelStr == "" {
-			levelStr = jc.GetString("log_level", "info")
+			levelStr = jc.GetLogLevel().String()
 		}
 
 		// Set logrus log level
11 changes: 3 additions & 8 deletions internal/capabilities/detector.go
@@ -21,15 +21,10 @@ type JobServerInterface interface {
 }
 
 // DetectCapabilities automatically detects available capabilities based on configuration
-// If jobServer is provided, it will use the actual worker capabilities
+// Always performs real capability detection by probing APIs and actors to ensure accurate reporting
 func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface) teetypes.WorkerCapabilities {
-	// If we have a JobServer, get capabilities directly from the workers
-	if jobServer != nil {
-		return jobServer.GetWorkerCapabilities()
-	}
-
-	// Fallback to basic detection if no JobServer is available
-	// This maintains backward compatibility and is used during initialization
+	// Always perform real capability detection to ensure accurate reporting
+	// This guarantees miners report only capabilities they actually have access to
 	capabilities := make(teetypes.WorkerCapabilities)
 
 	// Start with always available capabilities
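For readers skimming the diff: the new detector flow seeds the capability map with the always-available jobs and only adds the ones a probe verifies. The sketch below illustrates that shape with local stand-in types; the names Capability, alwaysAvailable, detectSketch, and the twitterAccounts probe parameter are illustrative assumptions, not identifiers from this PR.

package main

import "fmt"

// Local stand-ins for the tee-types identifiers seen in this diff; the real
// definitions live in github.com/masa-finance/tee-types/types.
type Capability string
type WorkerCapabilities map[string][]Capability

// alwaysAvailable mirrors the "always available capabilities" the detector
// starts from, per the comments in detector.go above.
var alwaysAvailable = WorkerCapabilities{
	"web":       {"scraper"},
	"telemetry": {"telemetry"},
	"tiktok":    {"transcription"},
}

// detectSketch follows the flow the new comments describe: seed with the
// always-available jobs, then add capabilities only when access is verified.
// The real code probes APIs and actors; here a non-empty account list stands
// in for a successful probe.
func detectSketch(twitterAccounts []string) WorkerCapabilities {
	capabilities := make(WorkerCapabilities)
	for job, caps := range alwaysAvailable {
		capabilities[job] = caps
	}
	if len(twitterAccounts) > 0 {
		capabilities["twitter"] = []Capability{"searchbyquery", "getbyid", "getprofilebyid"}
	}
	return capabilities
}

func main() {
	fmt.Println(detectSketch(nil))                   // basic capabilities only
	fmt.Println(detectSketch([]string{"user:pass"})) // twitter capabilities added
}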
4 changes: 1 addition & 3 deletions internal/capabilities/detector_test.go
@@ -44,7 +44,7 @@ var _ = Describe("DetectCapabilities", func() {
 			// Compare the sorted slices
 			Expect(gotKeys).To(Equal(expectedKeys))
 		},
-		Entry("With JobServer - gets capabilities from workers",
+		Entry("With JobServer - performs real detection (JobServer ignored)",
 			config.JobConfiguration{},
 			&MockJobServer{
 				capabilities: teetypes.WorkerCapabilities{
@@ -55,10 +55,8 @@
 				},
 			},
 			teetypes.WorkerCapabilities{
 				teetypes.WebJob: {teetypes.CapScraper},
 				teetypes.TelemetryJob: {teetypes.CapTelemetry},
 				teetypes.TiktokJob: {teetypes.CapTranscription},
-				teetypes.TwitterJob: {teetypes.CapSearchByQuery, teetypes.CapGetById, teetypes.CapGetProfileById},
 			},
 		),
 		Entry("Without JobServer - basic capabilities only",
34 changes: 24 additions & 10 deletions internal/config/config.go
@@ -33,11 +33,6 @@ func ReadConfig() JobConfiguration {
 	// that is needed for the job
 	jc := JobConfiguration{}
 
-	logLevel := os.Getenv("LOG_LEVEL")
-	level := ParseLogLevel(logLevel)
-	jc["log_level"] = level.String()
-	SetLogLevel(level)
-
 	dataDir := os.Getenv("DATA_DIR")
 	if dataDir == "" {
 		dataDir = "/home/masa"
@@ -57,6 +52,12 @@
 		fmt.Println("Failed reading env file. Running in simulation mode, reading from environment variables")
 	}
 
+	// Parse LOG_LEVEL after .env file is loaded
+	logLevel := os.Getenv("LOG_LEVEL")
+	level := ParseLogLevel(logLevel)
+	jc["log_level"] = level
+	SetLogLevel(level)
+
 	bufSizeStr := os.Getenv("STATS_BUF_SIZE")
 	if bufSizeStr == "" {
 		bufSizeStr = "128"
@@ -66,18 +67,18 @@
logrus.Errorf("Error parsing STATS_BUF_SIZE: %s. Setting to default.", err)
bufSize = 128
}
jc["stats_buf_size"] = uint(bufSize)
jc["stats_buf_size"] = bufSize

maxJobsStr := os.Getenv("STATS_BUF_SIZE")
maxJobsStr := os.Getenv("MAX_JOBS")
if maxJobsStr == "" {
maxJobsStr = "10"
}
maxJobs, err := strconv.Atoi(maxJobsStr)
if err != nil {
logrus.Errorf("Error parsing MAX_JOBS %s. Setting to default.", err)
bufSize = 10
maxJobs = 10
}
jc["max_jobs"] = uint(maxJobs)
jc["max_jobs"] = maxJobs

listenAddress := os.Getenv("LISTEN_ADDRESS")
if listenAddress == "" {
@@ -278,6 +279,16 @@ func (jc JobConfiguration) GetBool(key string, def bool) bool {
 	return def
 }
 
+// GetLogLevel safely extracts a logrus.Level from JobConfiguration, with a default fallback
+func (jc JobConfiguration) GetLogLevel() logrus.Level {
+	if v, ok := jc["log_level"]; ok {
+		if level, ok := v.(logrus.Level); ok {
+			return level
+		}
+	}
+	return logrus.InfoLevel // default
+}
+
 // TwitterScraperConfig represents the configuration needed for Twitter scraping
 // This is defined here to avoid circular imports between api/types and internal/jobs
 type TwitterScraperConfig struct {
@@ -379,7 +390,10 @@ func ParseLogLevel(logLevel string) logrus.Level {
case "error":
return logrus.ErrorLevel
default:
logrus.Error("Invalid log level", "level", logLevel, "setting_to", logrus.InfoLevel.String())
logrus.WithFields(logrus.Fields{
"level": logLevel,
"setting_to": logrus.InfoLevel.String(),
}).Error("Invalid log level")
return logrus.InfoLevel
}
}
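Note the coupling this file introduces: ReadConfig now stores the parsed logrus.Level itself under "log_level" (previously level.String()), and GetLogLevel recovers it with a type assertion, falling back to InfoLevel otherwise. A minimal sketch of the round trip, assuming JobConfiguration is a map[string]interface{} as its indexing in ReadConfig suggests:

package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

// Assumed shape of JobConfiguration, based on how ReadConfig indexes it.
type JobConfiguration map[string]interface{}

// GetLogLevel as added in this PR: type-assert the stored logrus.Level and
// fall back to InfoLevel when the key is missing or holds another type.
func (jc JobConfiguration) GetLogLevel() logrus.Level {
	if v, ok := jc["log_level"]; ok {
		if level, ok := v.(logrus.Level); ok {
			return level
		}
	}
	return logrus.InfoLevel
}

func main() {
	jc := JobConfiguration{}
	fmt.Println(jc.GetLogLevel()) // info: missing key falls back

	jc["log_level"] = logrus.DebugLevel // typed value, as ReadConfig now stores it
	fmt.Println(jc.GetLogLevel())       // debug

	jc["log_level"] = "debug"     // the old string form no longer satisfies the assertion
	fmt.Println(jc.GetLogLevel()) // info: fallback again
}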
8 changes: 5 additions & 3 deletions internal/jobs/stats/stats.go
@@ -6,6 +6,7 @@ import (
"time"

teetypes "github.com/masa-finance/tee-types/types"
"github.com/masa-finance/tee-worker/internal/capabilities"
"github.com/masa-finance/tee-worker/internal/config"
"github.com/masa-finance/tee-worker/internal/versioning"
"github.com/sirupsen/logrus"
@@ -134,8 +135,9 @@ func (s *StatsCollector) SetJobServer(js WorkerCapabilitiesProvider) {
 	s.Stats.Lock()
 	defer s.Stats.Unlock()
 
-	// Get capabilities from the JobServer directly
-	s.Stats.ReportedCapabilities = js.GetWorkerCapabilities()
+	// Use real capability detection to ensure accurate reporting
+	// This probes actual APIs and actors to verify access
+	s.Stats.ReportedCapabilities = capabilities.DetectCapabilities(s.jobConfiguration, js)
 
-	logrus.Infof("Updated structured capabilities with JobServer: %+v", s.Stats.ReportedCapabilities)
+	logrus.Infof("Updated structured capabilities with real detection: %+v", s.Stats.ReportedCapabilities)
 }
5 changes: 2 additions & 3 deletions internal/jobs/twitter.go
@@ -117,9 +117,8 @@ func (ts *TwitterScraper) getCredentialScraper(j types.Job, baseDir string) (*tw
 	}
 
 	authConfig := twitter.AuthConfig{
-		Account:               account,
-		BaseDir:               baseDir,
-		SkipLoginVerification: ts.configuration.SkipLoginVerification,
+		Account: account,
+		BaseDir: baseDir,
 	}
 	scraper := twitter.NewScraper(authConfig)
 	if scraper == nil {
46 changes: 3 additions & 43 deletions internal/jobs/twitter/auth.go
@@ -1,8 +1,6 @@
 package twitter
 
 import (
-	"fmt"
-
 	"github.com/sirupsen/logrus"
 )
 
@@ -11,10 +9,6 @@ type AuthConfig struct {
 	// Account-based auth
 	Account *TwitterAccount
 	BaseDir string
-	// SkipLoginVerification when true, skips the IsLoggedIn check after loading cookies
-	// This can help avoid rate limiting on Twitter's verify_credentials endpoint
-	// Default is false (verification enabled)
-	SkipLoginVerification bool
 }
 
 func NewScraper(config AuthConfig) *Scraper {
@@ -27,47 +21,13 @@ func NewScraper(config AuthConfig) *Scraper {
 
 	scraper := &Scraper{Scraper: newTwitterScraper()}
 
-	// Configure whether to skip login verification
-	scraper.SetSkipLoginVerification(config.SkipLoginVerification)
-
 	// Try loading cookies
 	if err := LoadCookies(scraper.Scraper, config.Account, config.BaseDir); err == nil {
 		logrus.Debugf("Cookies loaded for user %s.", config.Account.Username)
-		if scraper.IsLoggedIn() {
-			logrus.Debugf("Already logged in as %s.", config.Account.Username)
-			return scraper
-		}
+		scraper.SetBearerToken()
+		return scraper
 	} else {
-		logrus.Warnf("Failed to load cookies for user %s: %v", config.Account.Username, err)
-	}
-
-	RandomSleep()
-
-	if err := scraper.Login(config.Account.Username, config.Account.Password, config.Account.TwoFACode); err != nil {
-		logrus.WithError(err).Warnf("Login failed for %s", config.Account.Username)
+		logrus.WithError(err).Warnf("Failed to load cookies for user %s", config.Account.Username)
 		return nil
 	}
-
-	RandomSleep()
-
-	if err := SaveCookies(scraper.Scraper, config.Account, config.BaseDir); err != nil {
-		logrus.WithError(err).Errorf("Failed to save cookies for %s", config.Account.Username)
-	}
-
-	logrus.Debugf("Login successful for %s", config.Account.Username)
-	return scraper
 }
-
-func (scraper *Scraper) Login(username, password string, twoFACode ...string) error {
-
-	var err error
-	if len(twoFACode) > 0 {
-		err = scraper.Scraper.Login(username, password, twoFACode[0])
-	} else {
-		err = scraper.Scraper.Login(username, password)
-	}
-	if err != nil {
-		return fmt.Errorf("login failed: %v", err)
-	}
-	return nil
-}
8 changes: 2 additions & 6 deletions internal/jobs/twitter/common.go
@@ -81,13 +81,9 @@ func getAuthenticatedScraper(baseDir string) (*Scraper, *TwitterAccount, error)
 		return nil, nil, fmt.Errorf("all accounts are rate-limited")
 	}
 
-	// Check if we should skip login verification from environment
-	skipVerification := os.Getenv("TWITTER_SKIP_LOGIN_VERIFICATION") == "true"
-
 	authConfig := AuthConfig{
-		Account:               account,
-		BaseDir:               baseDir,
-		SkipLoginVerification: skipVerification,
+		Account: account,
+		BaseDir: baseDir,
 	}
 
 	scraper := NewScraper(authConfig)
6 changes: 0 additions & 6 deletions internal/jobs/twitter/cookies.go
@@ -32,12 +32,6 @@ func SaveCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseD
 }
 
 func LoadCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseDir string) error {
-
-	// let's logout first before loading cookies
-	if err := scraper.Logout(); err != nil { // logout first
-		logrus.Errorf("Error logging out: %v", err) // log error but continue
-	}
-
 	logrus.Debugf("Loading cookies for user %s", account.Username)
 	cookieFile := filepath.Join(baseDir, fmt.Sprintf("%s_twitter_cookies.json", account.Username))
 
23 changes: 3 additions & 20 deletions internal/jobs/twitter/scraper.go
@@ -13,24 +13,7 @@ func newTwitterScraper() *twitterscraper.Scraper {
 	return twitterscraper.New()
 }
 
-// SetSkipLoginVerification configures whether to skip the Twitter login verification API call
-// Setting this to true will avoid rate limiting on Twitter's verify_credentials endpoint
-func (s *Scraper) SetSkipLoginVerification(skip bool) *Scraper {
-	s.skipLoginVerification = skip
-	return s
-}
-
-// IsLoggedIn checks if the scraper is logged in
-// If skipLoginVerification is true, it will assume the session is valid without making an API call
-func (s *Scraper) IsLoggedIn() bool {
-
-	// TODO: we somehow need to set the bearer token regardless. so calling this to set it.
-	// if the skip verification is set, we'll just return true.
-	loggedIn := s.Scraper.IsLoggedIn()
-	if s.skipLoginVerification {
-		return true // Skip the verification API call to avoid rate limits
-	}
-
-	// whatever the scraper returns, we return
-	return loggedIn
+// SetBearerToken sets the bearer token via scraper's IsLoggedIn check
+func (s *Scraper) SetBearerToken() {
+	s.Scraper.IsLoggedIn()
 }
2 changes: 1 addition & 1 deletion internal/versioning/version.go
@@ -5,5 +5,5 @@ var (
 
 	// XXX: Bump this value only when there are protocol changes that makes the oracle
 	// incompatible between version!
-	TEEWorkerVersion = `gamma`
+	TEEWorkerVersion = `delta`
 )