From 5f9fd3b094cc85fc8cc1aca04e91706c34fa4c13 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Fri, 15 Aug 2025 22:21:47 +0200 Subject: [PATCH 01/19] feat: supports tiktok apify capabilities --- Makefile | 3 + go.mod | 3 +- go.sum | 4 +- internal/capabilities/detector.go | 10 + internal/capabilities/detector_test.go | 32 ++ internal/jobs/stats/stats.go | 4 +- .../{tiktok_transcription.go => tiktok.go} | 298 ++++++++++-------- ...k_transcription_test.go => tiktok_test.go} | 175 +++++++++- internal/jobs/tiktokapify/client.go | 97 ++++++ internal/jobserver/jobserver.go | 2 +- 10 files changed, 483 insertions(+), 145 deletions(-) rename internal/jobs/{tiktok_transcription.go => tiktok.go} (50%) rename internal/jobs/{tiktok_transcription_test.go => tiktok_test.go} (54%) create mode 100644 internal/jobs/tiktokapify/client.go diff --git a/Makefile b/Makefile index 68f98a0b..2cd13a1e 100644 --- a/Makefile +++ b/Makefile @@ -76,6 +76,9 @@ test-jobs: docker-build-test test-twitter: docker-build-test @docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/twitter_test.go ./internal/jobs/jobs_suite_test.go +test-tiktok: docker-build-test + @docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/tiktok_test.go ./internal/jobs/jobs_suite_test.go + test-web: docker-build-test @docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/webscraper_test.go ./internal/jobs/jobs_suite_test.go diff --git a/go.mod b/go.mod index 52861b0f..ebec5515 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,8 @@ require ( github.com/joho/godotenv v1.5.1 github.com/labstack/echo-contrib v0.17.4 github.com/labstack/echo/v4 v4.13.4 - github.com/masa-finance/tee-types v1.1.6 + // FIXME: update to 1.1.7 once released + github.com/masa-finance/tee-types v1.1.7-0.20250815192551-781daf346571 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.38.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index b381ea6e..0aa0d403 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.6 h1:vw5gOK2ZoCnsmrjdY9NCUR9GY9c0VxvzwQy5V4sNemo= -github.com/masa-finance/tee-types v1.1.6/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.7-0.20250815192551-781daf346571 h1:Ua/FW15oL9n8ZcyJJVgKcIu2EUlYgqm3lxdYPawHyzE= +github.com/masa-finance/tee-types v1.1.7-0.20250815192551-781daf346571/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index bbdbb378..cde7d177 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -61,9 +61,19 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) // Add Apify-specific capabilities based on available API key if hasApifyKey { capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps + // Merge TikTok search caps with any existing (keep transcription) + existing := capabilities[teetypes.TiktokJob] + merged := append([]teetypes.Capability{}, existing...) + for _, c := range teetypes.TiktokSearchCaps { + if !slices.Contains(merged, c) { + merged = append(merged, c) + } + } + capabilities[teetypes.TiktokJob] = merged } // Add general TwitterJob capability if any Twitter auth is available + // TODO: this will get cleaned up with unique twitter capabilities if hasAccounts || hasApiKeys || hasApifyKey { var twitterJobCaps []teetypes.Capability // Use the most comprehensive capabilities available diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index eb26d9f5..3b8b04d5 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -180,3 +180,35 @@ func TestDetectCapabilities_ScraperTypes(t *testing.T) { }) } } + +func TestDetectCapabilities_Apify(t *testing.T) { + jc := types.JobConfiguration{ + "apify_api_key": "dummy", + } + + caps := DetectCapabilities(jc, nil) + + // TikTok should gain search capabilities + tiktokCaps, ok := caps[teetypes.TiktokJob] + if !ok { + t.Fatalf("expected tiktok capabilities to be present") + } + if !slices.Contains(tiktokCaps, teetypes.CapSearchByQuery) { + t.Errorf("expected tiktok to include capability %q", teetypes.CapSearchByQuery) + } + if !slices.Contains(tiktokCaps, teetypes.CapSearchByTrending) { + t.Errorf("expected tiktok to include capability %q", teetypes.CapSearchByTrending) + } + + // Twitter-Apify job should be present with follower/following capabilities + twitterApifyCaps, ok := caps[teetypes.TwitterApifyJob] + if !ok { + t.Fatalf("expected twitter-apify capabilities to be present") + } + if !slices.Contains(twitterApifyCaps, teetypes.CapGetFollowers) { + t.Errorf("expected twitter-apify to include capability %q", teetypes.CapGetFollowers) + } + if !slices.Contains(twitterApifyCaps, teetypes.CapGetFollowing) { + t.Errorf("expected twitter-apify to include capability %q", teetypes.CapGetFollowing) + } +} diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go index 5eac0bd6..0c24882a 100644 --- a/internal/jobs/stats/stats.go +++ b/internal/jobs/stats/stats.go @@ -24,12 +24,14 @@ const ( TwitterErrors StatType = "twitter_errors" TwitterAuthErrors StatType = "twitter_auth_errors" TwitterRateErrors StatType = "twitter_ratelimit_errors" - TwitterXSearchQueries StatType = "twitterx_search" + TwitterXSearchQueries StatType = "twitterx_search" // TODO: investigate if this is needed or used... WebSuccess StatType = "web_success" WebErrors StatType = "web_errors" WebInvalid StatType = "web_invalid" TikTokTranscriptionSuccess StatType = "tiktok_transcription_success" TikTokTranscriptionErrors StatType = "tiktok_transcription_errors" + TikTokVideos StatType = "tiktok_returned_videos" + TikTokErrors StatType = "tiktok_errors" // TODO: Should we add stats for calls to each of the Twitter capabilities to decouple business / scoring logic? ) diff --git a/internal/jobs/tiktok_transcription.go b/internal/jobs/tiktok.go similarity index 50% rename from internal/jobs/tiktok_transcription.go rename to internal/jobs/tiktok.go index d7a704d1..60d28eed 100644 --- a/internal/jobs/tiktok_transcription.go +++ b/internal/jobs/tiktok.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "io" "net/http" "strings" "time" @@ -13,6 +12,8 @@ import ( teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs/stats" + "github.com/masa-finance/tee-worker/internal/jobs/tiktokapify" + "github.com/masa-finance/tee-worker/pkg/client" "github.com/sirupsen/logrus" ) @@ -27,6 +28,7 @@ type TikTokTranscriptionConfiguration struct { APIReferer string `json:"tiktok_api_referer,omitempty"` APIUserAgent string `json:"tiktok_api_user_agent,omitempty"` DefaultLanguage string `json:"tiktok_default_language,omitempty"` // e.g., "eng-US" + ApifyApiKey string `json:"apify_api_key,omitempty"` } // TikTokTranscriber is the main job struct for handling TikTok transcriptions. @@ -38,8 +40,13 @@ type TikTokTranscriber struct { // GetStructuredCapabilities returns the structured capabilities supported by the TikTok transcriber func (t *TikTokTranscriber) GetStructuredCapabilities() teetypes.WorkerCapabilities { + caps := make([]teetypes.Capability, 0, len(teetypes.AlwaysAvailableTiktokCaps)+len(teetypes.TiktokSearchCaps)) + caps = append(caps, teetypes.AlwaysAvailableTiktokCaps...) + if t.configuration.ApifyApiKey != "" { + caps = append(caps, teetypes.TiktokSearchCaps...) + } return teetypes.WorkerCapabilities{ - teetypes.TiktokJob: teetypes.AlwaysAvailableTiktokCaps, + teetypes.TiktokJob: caps, } } @@ -55,38 +62,42 @@ func NewTikTokTranscriber(jc types.JobConfiguration, statsCollector *stats.Stats // Get configurable values from job configuration if err := jc.Unmarshal(&config); err != nil { - logrus.WithError(err).Debug("TikTokTranscriber: Could not unmarshal job configuration, using all defaults") + logrus.WithError(err).Warn("failed to unmarshal TikTokTranscriptionConfiguration from JobConfiguration, using defaults where applicable") } - - // Set defaults for configurable values if not provided - if config.DefaultLanguage == "" { - config.DefaultLanguage = "eng-US" + // Ensure Apify key aligns with Twitter's pattern (explicit getter wins) + config.ApifyApiKey = jc.GetString("apify_api_key", config.ApifyApiKey) + if config.ApifyApiKey != "" { + if c, err := tiktokapify.NewTikTokApifyClient(config.ApifyApiKey); err != nil { + logrus.Errorf("Failed to create Apify client at startup: %v", err) + } else if err := c.ValidateApiKey(); err != nil { + logrus.Errorf("Apify API key validation failed at startup: %v", err) + } else { + logrus.Infof("Apify API key validated successfully at startup") + } } + // Note: APIUserAgent is optional, it can be set later or use a default if config.APIUserAgent == "" { - config.APIUserAgent = "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Mobile Safari/537.36" + config.APIUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" } - // Log the actual configuration values being used - logrus.WithFields(logrus.Fields{ - "transcription_endpoint": config.TranscriptionEndpoint, - "api_origin": config.APIOrigin, - "api_referer": config.APIReferer, - "api_user_agent": config.APIUserAgent, - "default_language": config.DefaultLanguage, - }).Info("TikTokTranscriber initialized with configuration") - - httpClient := &http.Client{ - Timeout: 30 * time.Second, // Sensible default timeout + // If a default language is set in the configuration, use it + if config.DefaultLanguage == "" { + config.DefaultLanguage = "eng-US" } return &TikTokTranscriber{ configuration: config, stats: statsCollector, - httpClient: httpClient, + httpClient: &http.Client{Timeout: 30 * time.Second}, } } +// NewTikTokScraper is an alias constructor to align with Twitter's naming pattern +func NewTikTokScraper(jc types.JobConfiguration, statsCollector *stats.StatsCollector) *TikTokTranscriber { + return NewTikTokTranscriber(jc, statsCollector) +} + // APIResponse is used to unmarshal the JSON response from the transcription API. type APIResponse struct { VideoTitle string `json:"videoTitle"` @@ -97,12 +108,7 @@ type APIResponse struct { // ExecuteJob processes a single TikTok transcription job. func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { - logrus.WithField("job_uuid", j.UUID).Info("Starting ExecuteJob for TikTok transcription") - - if ttt.configuration.TranscriptionEndpoint == "" { - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "TikTok transcription endpoint is not configured for the worker"}, fmt.Errorf("tiktok transcription endpoint not configured") - } + logrus.WithField("job_uuid", j.UUID).Info("Starting ExecuteJob for TikTok job") // Use the centralized type-safe unmarshaller jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) @@ -110,170 +116,184 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { return types.JobResult{Error: "Failed to unmarshal job arguments"}, fmt.Errorf("unmarshal job arguments: %w", err) } - // Type assert to TikTok arguments - tiktokArgs, ok := teeargs.AsTikTokArguments(jobArgs) + // Branch by argument type (transcription vs search) + if transcriptionArgs, ok := teeargs.AsTikTokTranscriptionArguments(jobArgs); ok { + return ttt.executeTranscription(j, transcriptionArgs) + } + if searchByQueryArgs, ok := teeargs.AsTikTokSearchByQueryArguments(jobArgs); ok { + return ttt.executeSearchByQuery(j, searchByQueryArgs) + } + if searchByTrendingArgs, ok := teeargs.AsTikTokSearchByTrendingArguments(jobArgs); ok { + return ttt.executeSearchByTrending(j, searchByTrendingArgs) + } + + // Fallback: treat as searchbyquery (default capability) + searchByQueryArgs, ok := teeargs.AsTikTokSearchByQueryArguments(jobArgs) if !ok { return types.JobResult{Error: "invalid argument type for TikTok job"}, fmt.Errorf("invalid argument type") } + return ttt.executeSearchByQuery(j, searchByQueryArgs) +} - // Use interface methods; no need to downcast - logrus.WithField("job_uuid", j.UUID).Infof("TikTok arguments validated: video_url=%s, language=%s, has_language_preference=%t", - tiktokArgs.GetVideoURL(), tiktokArgs.GetLanguageCode(), tiktokArgs.HasLanguagePreference()) - - // VideoURL validation is now handled by the unmarshaller, but we check again for safety - if tiktokArgs.GetVideoURL() == "" { +// executeTranscription calls the external transcription service and returns a normalized result +func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTokTranscriptionArguments) (types.JobResult, error) { + if ttt.configuration.TranscriptionEndpoint == "" { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "VideoURL is required"}, fmt.Errorf("videoURL is required") + return types.JobResult{Error: "TikTok transcription endpoint is not configured for the worker"}, fmt.Errorf("tiktok transcription endpoint not configured") } - // Use the enhanced language selection logic - selectedLanguageKey := tiktokArgs.GetLanguageCode() // This handles defaults automatically - if tiktokArgs.HasLanguagePreference() { - logrus.WithField("job_uuid", j.UUID).Infof("Using custom language preference: %s", selectedLanguageKey) - } else { - logrus.WithField("job_uuid", j.UUID).Infof("Using default language: %s", selectedLanguageKey) + reqBody := map[string]any{ + "video_url": a.GetVideoURL(), } - - // Sub-Step 3.1: Call TikTok Transcription API - apiRequestBody := map[string]string{"url": tiktokArgs.GetVideoURL()} - jsonBody, err := json.Marshal(apiRequestBody) - if err != nil { - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "Failed to marshal API request body"}, fmt.Errorf("marshal API request body: %w", err) + if a.HasLanguagePreference() { + reqBody["language"] = a.GetLanguageCode() } - req, err := http.NewRequest("POST", ttt.configuration.TranscriptionEndpoint, bytes.NewBuffer(jsonBody)) + payload, _ := json.Marshal(reqBody) + req, err := http.NewRequest(http.MethodPost, ttt.configuration.TranscriptionEndpoint, bytes.NewReader(payload)) if err != nil { - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "Failed to create API request"}, fmt.Errorf("create API request: %w", err) + return types.JobResult{Error: "Failed to create request"}, fmt.Errorf("create request: %w", err) } - req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") if ttt.configuration.APIOrigin != "" { req.Header.Set("Origin", ttt.configuration.APIOrigin) } if ttt.configuration.APIReferer != "" { req.Header.Set("Referer", ttt.configuration.APIReferer) } - // User-Agent is set from config or default in NewTikTokTranscriber - req.Header.Set("User-Agent", ttt.configuration.APIUserAgent) - - logrus.WithFields(logrus.Fields{ - "job_uuid": j.UUID, - "url": tiktokArgs.GetVideoURL(), - "method": "POST", - "api_endpoint": ttt.configuration.TranscriptionEndpoint, - }).Info("Calling TikTok Transcription API") + if ttt.configuration.APIUserAgent != "" { + req.Header.Set("User-Agent", ttt.configuration.APIUserAgent) + } - apiResp, err := ttt.httpClient.Do(req) + resp, err := ttt.httpClient.Do(req) if err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "API request failed"}, fmt.Errorf("API request execution: %w", err) + return types.JobResult{Error: "Failed to call transcription endpoint"}, fmt.Errorf("http call: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: fmt.Sprintf("transcription endpoint returned status %d", resp.StatusCode)}, fmt.Errorf("unexpected status %d", resp.StatusCode) } - defer apiResp.Body.Close() - if apiResp.StatusCode != http.StatusOK { - // Try to read body for more error details from API - bodyBytes, _ := io.ReadAll(apiResp.Body) - errMsg := fmt.Sprintf("API request failed with status code %d. Response: %s", apiResp.StatusCode, string(bodyBytes)) - logrus.WithField("job_uuid", j.UUID).Error(errMsg) + var apiResp APIResponse + if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil { + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: "Failed to parse response"}, fmt.Errorf("decode response: %w", err) + } + if apiResp.Error != "" { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + return types.JobResult{Error: apiResp.Error}, fmt.Errorf("api error: %s", apiResp.Error) } - var parsedAPIResponse APIResponse - if err := json.NewDecoder(apiResp.Body).Decode(&parsedAPIResponse); err != nil { + // Pick transcript language + chosenLang := a.GetLanguageCode() + transcriptVTT, ok := apiResp.Transcripts[chosenLang] + if !ok { + for lang, v := range apiResp.Transcripts { + chosenLang = lang + transcriptVTT = v + break + } + } + if transcriptVTT == "" { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "Failed to parse API response"}, fmt.Errorf("parse API response: %w", err) + return types.JobResult{Error: "no transcripts available in response"}, fmt.Errorf("no transcripts available") } - if parsedAPIResponse.Error != "" { - errMsg := fmt.Sprintf("API returned an error: %s", parsedAPIResponse.Error) - logrus.WithField("job_uuid", j.UUID).Error(errMsg) + text, err := convertVTTToPlainText(transcriptVTT) + if err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + return types.JobResult{Error: "failed to parse transcript"}, fmt.Errorf("parse vtt: %w", err) + } + + result := teetypes.TikTokTranscriptionResult{ + TranscriptionText: text, + DetectedLanguage: chosenLang, + VideoTitle: apiResp.VideoTitle, + OriginalURL: a.GetVideoURL(), + ThumbnailURL: apiResp.ThumbnailURL, } - // Sub-Step 3.2: Extract Transcription and Metadata - if len(parsedAPIResponse.Transcripts) == 0 { - errMsg := "No transcripts found in API response" - logrus.WithField("job_uuid", j.UUID).Warn(errMsg) - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) // Or a different stat for "no_transcript_found" - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + data, err := json.Marshal(result) + if err != nil { + return types.JobResult{Error: "failed to marshal result"}, fmt.Errorf("marshal result: %w", err) } - vttText := "" - finalDetectedLanguage := "" + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionSuccess, 1) + return types.JobResult{Data: data}, nil +} - // Try requested/default language - if selectedLanguageKey != "" { - if transcript, ok := parsedAPIResponse.Transcripts[selectedLanguageKey]; ok { - vttText = transcript - finalDetectedLanguage = selectedLanguageKey - } +// executeSearchByQuery runs the epctex/tiktok-search-scraper actor and returns results +func (ttt *TikTokTranscriber) executeSearchByQuery(j types.Job, a *teeargs.TikTokSearchByQueryArguments) (types.JobResult, error) { + if ttt.configuration.ApifyApiKey == "" { + ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + return types.JobResult{Error: "Apify API key not configured for searchbyquery"}, fmt.Errorf("missing Apify API key") } - // If not found, try a hardcoded common default or first available - if vttText == "" { - commonDefault := "eng-US" // As per spec - if transcript, ok := parsedAPIResponse.Transcripts[commonDefault]; ok { - vttText = transcript - finalDetectedLanguage = commonDefault - } else { // Pick the first one available if commonDefault also not found - for lang, transcript := range parsedAPIResponse.Transcripts { - vttText = transcript - finalDetectedLanguage = lang - logrus.WithFields(logrus.Fields{ - "job_uuid": j.UUID, - "requested_lang": selectedLanguageKey, - "fallback_used": finalDetectedLanguage, - }).Info("Requested/default language not found, using first available transcript") - break - } - } + c, err := tiktokapify.NewTikTokApifyClient(ttt.configuration.ApifyApiKey) + if err != nil { + ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + return types.JobResult{Error: "Failed to create Apify client"}, fmt.Errorf("apify client: %w", err) } - if vttText == "" { - errMsg := "Suitable transcript could not be extracted from API response" - logrus.WithField("job_uuid", j.UUID).Error(errMsg) - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + limit := a.MaxItems + if limit <= 0 { + limit = 20 } - logrus.Debugf("Job %s: Raw VTT content for language %s:\n%s", j.UUID, finalDetectedLanguage, vttText) + items, next, err := c.SearchByQuery(*a, client.EmptyCursor, limit) + if err != nil { + ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + return types.JobResult{Error: err.Error()}, err + } - // Convert VTT to Plain Text - plainTextTranscription, err := convertVTTToPlainText(vttText) + data, err := json.Marshal(items) if err != nil { - // This error is more about our parsing than the API - errMsg := fmt.Sprintf("Failed to convert VTT to plain text: %v", err) - logrus.WithField("job_uuid", j.UUID).Error(errMsg) - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + // Do not increment error stats for marshal errors; not the worker's fault + return types.JobResult{Error: "Failed to marshal results"}, fmt.Errorf("marshal results: %w", err) } - // Process Result & Return - resultData := teetypes.TikTokTranscriptionResult{ - TranscriptionText: plainTextTranscription, - DetectedLanguage: finalDetectedLanguage, - VideoTitle: parsedAPIResponse.VideoTitle, - OriginalURL: tiktokArgs.GetVideoURL(), - ThumbnailURL: parsedAPIResponse.ThumbnailURL, + // Increment returned videos based on the number of items + ttt.stats.Add(j.WorkerID, stats.TikTokVideos, uint(len(items))) + return types.JobResult{Data: data, NextCursor: next.String()}, nil +} + +// executeSearchByTrending runs the lexis-solutions/tiktok-trending-videos-scraper actor and returns results +func (ttt *TikTokTranscriber) executeSearchByTrending(j types.Job, a *teeargs.TikTokSearchByTrendingArguments) (types.JobResult, error) { + if ttt.configuration.ApifyApiKey == "" { + ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + return types.JobResult{Error: "Apify API key not configured for searchbytrending"}, fmt.Errorf("missing Apify API key") } - jsonData, err := json.Marshal(resultData) + c, err := tiktokapify.NewTikTokApifyClient(ttt.configuration.ApifyApiKey) if err != nil { - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "Failed to marshal result data"}, fmt.Errorf("marshal result data: %w", err) + ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + return types.JobResult{Error: "Failed to create Apify client"}, fmt.Errorf("apify client: %w", err) } - logrus.WithFields(logrus.Fields{ - "job_uuid": j.UUID, - "video_title": resultData.VideoTitle, - "detected_language": resultData.DetectedLanguage, - }).Info("Successfully processed TikTok transcription job") - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionSuccess, 1) - return types.JobResult{Data: jsonData}, nil + limit := a.MaxItems + if limit <= 0 { + limit = 20 + } + + items, next, err := c.SearchByTrending(*a, client.EmptyCursor, limit) + if err != nil { + ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + return types.JobResult{Error: err.Error()}, err + } + + data, err := json.Marshal(items) + if err != nil { + // Do not increment error stats for marshal errors; not the worker's fault + return types.JobResult{Error: "Failed to marshal results"}, fmt.Errorf("marshal results: %w", err) + } + + // Increment returned videos based on the number of items + ttt.stats.Add(j.WorkerID, stats.TikTokVideos, uint(len(items))) + return types.JobResult{Data: data, NextCursor: next.String()}, nil } // convertVTTToPlainText parses a VTT string and extracts the dialogue lines. diff --git a/internal/jobs/tiktok_transcription_test.go b/internal/jobs/tiktok_test.go similarity index 54% rename from internal/jobs/tiktok_transcription_test.go rename to internal/jobs/tiktok_test.go index 0b3b9aad..f2759ca2 100644 --- a/internal/jobs/tiktok_transcription_test.go +++ b/internal/jobs/tiktok_test.go @@ -2,6 +2,8 @@ package jobs_test import ( "encoding/json" + "fmt" + "os" "strings" "time" @@ -15,7 +17,7 @@ import ( "github.com/sirupsen/logrus" ) -var _ = Describe("TikTokTranscriber", func() { +var _ = Describe("TikTok", func() { var statsCollector *stats.StatsCollector var tikTokTranscriber *TikTokTranscriber var jobConfig types.JobConfiguration @@ -156,4 +158,175 @@ var _ = Describe("TikTokTranscriber", func() { }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 0), "TikTokTranscriptionSuccess count should be 0") }) }) + + Context("TikTok Apify search", func() { + It("should search by query via Apify", func() { + apifyKey := os.Getenv("APIFY_API_KEY") + if apifyKey == "" { + Skip("APIFY_API_KEY is not set") + } + + jobConfig := types.JobConfiguration{ + "apify_api_key": apifyKey, + } + t := NewTikTokTranscriber(jobConfig, statsCollector) + + j := types.Job{ + Type: teetypes.TiktokJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByQuery, + "search": []string{"crypto", "ai"}, + "max_items": 5, + "end_page": 1, + "proxy": map[string]any{"use_apify_proxy": true}, + }, + WorkerID: "tiktok-test-worker-search-query", + Timeout: 60 * time.Second, + } + + res, err := t.ExecuteJob(j) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Error).To(BeEmpty()) + + var items []*teetypes.TikTokSearchByQueryResult + err = json.Unmarshal(res.Data, &items) + Expect(err).NotTo(HaveOccurred()) + Expect(items).NotTo(BeEmpty()) + + for _, item := range items { + fmt.Println("Video: ", item.URL) + } + + expectedCount := uint(len(items)) + Eventually(func() uint { + if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { + return 0 + } + workerStatsMap := statsCollector.Stats.Stats[j.WorkerID] + if workerStatsMap == nil { + return 0 + } + return workerStatsMap[stats.TikTokVideos] + }, 15*time.Second, 250*time.Millisecond).Should(BeNumerically("==", expectedCount), "TikTokVideos count should equal returned items") + + Eventually(func() uint { + if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { + return 0 + } + workerStatsMap := statsCollector.Stats.Stats[j.WorkerID] + if workerStatsMap == nil { + return 0 + } + return workerStatsMap[stats.TikTokErrors] + }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 0), "TikTokErrors should be 0 on success") + }) + + It("should search trending via Apify", func() { + apifyKey := os.Getenv("APIFY_API_KEY") + if apifyKey == "" { + Skip("APIFY_API_KEY is not set") + } + + jobConfig := types.JobConfiguration{ + "apify_api_key": apifyKey, + } + t := NewTikTokTranscriber(jobConfig, statsCollector) + + j := types.Job{ + Type: teetypes.TiktokJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByTrending, + "country_code": "US", + "sort_by": "vv", + "max_items": 5, + "period": "7", + }, + WorkerID: "tiktok-test-worker-search-trending", + Timeout: 60 * time.Second, + } + + res, err := t.ExecuteJob(j) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Error).To(BeEmpty()) + + var items []*teetypes.TikTokSearchByTrending + err = json.Unmarshal(res.Data, &items) + Expect(err).NotTo(HaveOccurred()) + Expect(items).NotTo(BeEmpty()) + + for _, item := range items { + fmt.Println("Video: ", item.Title) + } + + expectedCount := uint(len(items)) + Eventually(func() uint { + if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { + return 0 + } + workerStatsMap := statsCollector.Stats.Stats[j.WorkerID] + if workerStatsMap == nil { + return 0 + } + return workerStatsMap[stats.TikTokVideos] + }, 15*time.Second, 250*time.Millisecond).Should(BeNumerically("==", expectedCount), "TikTokVideos count should equal returned items") + + Eventually(func() uint { + if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { + return 0 + } + workerStatsMap := statsCollector.Stats.Stats[j.WorkerID] + if workerStatsMap == nil { + return 0 + } + return workerStatsMap[stats.TikTokErrors] + }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 0), "TikTokErrors should be 0 on success") + + fmt.Println(items) + + }) + + It("should increment TikTokErrors when Apify key is missing", func() { + // No APIFY_API_KEY provided in config + jobConfig := types.JobConfiguration{} + t := NewTikTokTranscriber(jobConfig, statsCollector) + + j := types.Job{ + Type: teetypes.TiktokJob, + Arguments: map[string]interface{}{ + "type": teetypes.CapSearchByQuery, + "search": []string{"tiktok"}, + "max_items": 1, + "end_page": 1, + }, + WorkerID: "tiktok-test-worker-missing-key", + Timeout: 10 * time.Second, + } + + res, err := t.ExecuteJob(j) + Expect(err).To(HaveOccurred()) + Expect(res.Error).NotTo(BeEmpty()) + + Eventually(func() uint { + if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { + return 0 + } + workerStatsMap := statsCollector.Stats.Stats[j.WorkerID] + if workerStatsMap == nil { + return 0 + } + return workerStatsMap[stats.TikTokErrors] + }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 1), "TikTokErrors should increment by 1 for missing API key") + + Consistently(func() uint { + if statsCollector == nil || statsCollector.Stats == nil || statsCollector.Stats.Stats == nil { + return 0 + } + workerStatsMap := statsCollector.Stats.Stats[j.WorkerID] + if workerStatsMap == nil { + return 0 + } + return workerStatsMap[stats.TikTokVideos] + }, 1*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 0), "TikTokVideos should remain 0 on error") + }) + }) }) diff --git a/internal/jobs/tiktokapify/client.go b/internal/jobs/tiktokapify/client.go new file mode 100644 index 00000000..8af59f29 --- /dev/null +++ b/internal/jobs/tiktokapify/client.go @@ -0,0 +1,97 @@ +package tiktokapify + +import ( + "encoding/json" + "fmt" + + teeargs "github.com/masa-finance/tee-types/args" + teetypes "github.com/masa-finance/tee-types/types" + "github.com/masa-finance/tee-worker/pkg/client" +) + +const ( + // Actors + SearchActorID = "epctex~tiktok-search-scraper" // must rent this actor from apify explicitly + TrendingActorID = "lexis-solutions~tiktok-trending-videos-scraper" // must rent this actor from apify explicitly +) + +type TikTokApifyClient struct { + apify *client.ApifyClient +} + +func NewTikTokApifyClient(apiToken string) (*TikTokApifyClient, error) { + apifyClient, err := client.NewApifyClient(apiToken) + if err != nil { + return nil, fmt.Errorf("failed to create Apify client: %w", err) + } + return &TikTokApifyClient{apify: apifyClient}, nil +} + +// ValidateApiKey validates the underlying Apify API token +func (c *TikTokApifyClient) ValidateApiKey() error { + return c.apify.ValidateApiKey() +} + +// SearchByQuery runs the search actor and returns typed results +func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArguments, cursor client.Cursor, limit int) ([]*teetypes.TikTokSearchByQueryResult, client.Cursor, error) { + // Map snake_case fields to Apify actor's expected camelCase input + startUrls := input.StartUrls + if startUrls == nil { + startUrls = []string{} + } + searchTerms := input.Search + if searchTerms == nil { + searchTerms = []string{} + } + + apifyInput := map[string]any{ + "search": searchTerms, + "startUrls": startUrls, + "maxItems": input.MaxItems, + "endPage": input.EndPage, + } + if input.Proxy != nil { + apifyInput["proxy"] = map[string]any{"useApifyProxy": input.Proxy.UseApifyProxy} + } + + dataset, next, err := c.apify.RunActorAndGetResponse(SearchActorID, apifyInput, cursor, limit) + if err != nil { + return nil, "", fmt.Errorf("apify run (search): %w", err) + } + + var results []*teetypes.TikTokSearchByQueryResult + for _, raw := range dataset.Data.Items { + var item teetypes.TikTokSearchByQueryResult + if err := json.Unmarshal(raw, &item); err != nil { + // If structure differs for some items, skip + continue + } + results = append(results, &item) + } + return results, next, nil +} + +// SearchByTrending runs the trending actor and returns typed results +func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendingArguments, cursor client.Cursor, limit int) ([]*teetypes.TikTokSearchByTrending, client.Cursor, error) { + apifyInput := map[string]any{ + "countryCode": input.CountryCode, + "sortBy": input.SortBy, + "maxItems": input.MaxItems, + "period": input.Period, + } + + dataset, next, err := c.apify.RunActorAndGetResponse(TrendingActorID, apifyInput, cursor, limit) + if err != nil { + return nil, "", fmt.Errorf("apify run (trending): %w", err) + } + + var results []*teetypes.TikTokSearchByTrending + for _, raw := range dataset.Data.Items { + var item teetypes.TikTokSearchByTrending + if err := json.Unmarshal(raw, &item); err != nil { + continue + } + results = append(results, &item) + } + return results, next, nil +} diff --git a/internal/jobserver/jobserver.go b/internal/jobserver/jobserver.go index 45cfd9a6..74b87b16 100644 --- a/internal/jobserver/jobserver.go +++ b/internal/jobserver/jobserver.go @@ -100,7 +100,7 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { w: jobs.NewTelemetryJob(jc, s), }, teetypes.TiktokJob: { - w: jobs.NewTikTokTranscriber(jc, s), + w: jobs.NewTikTokScraper(jc, s), }, } // Validate that all workers were initialized successfully From 0263ea5645f0f5551227faf6f46ad5707898293d Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Fri, 15 Aug 2025 22:44:35 +0200 Subject: [PATCH 02/19] fix: tiktok test --- internal/jobs/tiktok_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/jobs/tiktok_test.go b/internal/jobs/tiktok_test.go index f2759ca2..ec4d151e 100644 --- a/internal/jobs/tiktok_test.go +++ b/internal/jobs/tiktok_test.go @@ -115,6 +115,7 @@ var _ = Describe("TikTok", func() { Context("when arguments are invalid", func() { It("should return an error if VideoURL is empty and not record error stats", func() { jobArguments := map[string]interface{}{ + "type": teetypes.CapTranscription, "video_url": "", // Empty URL } @@ -237,7 +238,7 @@ var _ = Describe("TikTok", func() { Arguments: map[string]interface{}{ "type": teetypes.CapSearchByTrending, "country_code": "US", - "sort_by": "vv", + "sort_by": "repost", "max_items": 5, "period": "7", }, @@ -280,9 +281,6 @@ var _ = Describe("TikTok", func() { } return workerStatsMap[stats.TikTokErrors] }, 5*time.Second, 100*time.Millisecond).Should(BeNumerically("==", 0), "TikTokErrors should be 0 on success") - - fmt.Println(items) - }) It("should increment TikTokErrors when Apify key is missing", func() { From 71ddc8d7af62b2663c027db5b2e91bc94c3d7236 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Fri, 15 Aug 2025 22:50:42 +0200 Subject: [PATCH 03/19] fix: transcription test --- internal/jobs/tiktok_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/jobs/tiktok_test.go b/internal/jobs/tiktok_test.go index ec4d151e..430d69c9 100644 --- a/internal/jobs/tiktok_test.go +++ b/internal/jobs/tiktok_test.go @@ -45,6 +45,7 @@ var _ = Describe("TikTok", func() { It("should successfully transcribe the video and record success stats", func(ctx SpecContext) { videoURL := "https://www.tiktok.com/@coachty23/video/7502100651397172526" jobArguments := map[string]interface{}{ + "type": teetypes.CapTranscription, "video_url": videoURL, "language": "eng-US", // Request a specific language } From 70a4067c98cfd7dd9408755ef3feaec2f45a1cb2 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Fri, 15 Aug 2025 23:01:23 +0200 Subject: [PATCH 04/19] fix: tiktok transcription test --- internal/jobs/tiktok.go | 2 +- internal/jobs/tiktok_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 60d28eed..0f86804c 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -143,7 +143,7 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo } reqBody := map[string]any{ - "video_url": a.GetVideoURL(), + "url": a.GetVideoURL(), } if a.HasLanguagePreference() { reqBody["language"] = a.GetLanguageCode() diff --git a/internal/jobs/tiktok_test.go b/internal/jobs/tiktok_test.go index 430d69c9..a819b671 100644 --- a/internal/jobs/tiktok_test.go +++ b/internal/jobs/tiktok_test.go @@ -43,7 +43,7 @@ var _ = Describe("TikTok", func() { Context("when a valid TikTok URL is provided", func() { It("should successfully transcribe the video and record success stats", func(ctx SpecContext) { - videoURL := "https://www.tiktok.com/@coachty23/video/7502100651397172526" + videoURL := "https://www.tiktok.com/@theblockrunner.com/video/7227579907361066282" jobArguments := map[string]interface{}{ "type": teetypes.CapTranscription, "video_url": videoURL, From 02ae853f086efec8cc31efb5491f38c64063e9a6 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 21 Aug 2025 02:12:48 +0200 Subject: [PATCH 05/19] fix: favor set --- internal/capabilities/detector.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index cde7d177..e036fe48 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -4,6 +4,7 @@ import ( "slices" "strings" + util "github.com/masa-finance/tee-types/pkg/util" teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs/twitter" @@ -63,13 +64,9 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) capabilities[teetypes.TwitterApifyJob] = teetypes.TwitterApifyCaps // Merge TikTok search caps with any existing (keep transcription) existing := capabilities[teetypes.TiktokJob] - merged := append([]teetypes.Capability{}, existing...) - for _, c := range teetypes.TiktokSearchCaps { - if !slices.Contains(merged, c) { - merged = append(merged, c) - } - } - capabilities[teetypes.TiktokJob] = merged + s := util.NewSet(existing...) + s.Add(teetypes.TiktokSearchCaps...) + capabilities[teetypes.TiktokJob] = s.Items() } // Add general TwitterJob capability if any Twitter auth is available From 4d8b13ab52fa375b610e46f0b3d82a1899c699a0 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 21 Aug 2025 02:21:22 +0200 Subject: [PATCH 06/19] fix: merge conflict on tiktok job --- internal/jobs/tiktok.go | 58 ++++++++--------------------------------- 1 file changed, 11 insertions(+), 47 deletions(-) diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index a39d88a6..0f86804c 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -188,58 +188,22 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo return types.JobResult{Error: apiResp.Error}, fmt.Errorf("api error: %s", apiResp.Error) } - // Sub-Step 3.2: Extract Transcription and Metadata - if len(parsedAPIResponse.Transcripts) == 0 { - errMsg := "No transcripts found in API response" - logrus.WithField("job_uuid", j.UUID).Warn(errMsg) - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) // Or a different stat for "no_transcript_found" - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) - } - - vttText := "" - - // Directly use the requested/default language; if missing, return an error - if transcript, ok := parsedAPIResponse.Transcripts[selectedLanguageKey]; ok && strings.TrimSpace(transcript) != "" { - vttText = transcript - } else { - errMsg := fmt.Sprintf("Transcript for requested language %s not found in API response", selectedLanguageKey) - logrus.WithFields(logrus.Fields{ - "job_uuid": j.UUID, - "requested_lang": selectedLanguageKey, - }).Error(errMsg) - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + // Pick transcript language + chosenLang := a.GetLanguageCode() + transcriptVTT, ok := apiResp.Transcripts[chosenLang] + if !ok { + for lang, v := range apiResp.Transcripts { + chosenLang = lang + transcriptVTT = v + break + } } - - if vttText == "" { - errMsg := "Suitable transcript could not be extracted from API response" - logrus.WithField("job_uuid", j.UUID).Error(errMsg) + if transcriptVTT == "" { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) return types.JobResult{Error: "no transcripts available in response"}, fmt.Errorf("no transcripts available") } - logrus.Debugf("Job %s: Raw VTT content for language %s:\n%s", j.UUID, selectedLanguageKey, vttText) - - // Convert VTT to Plain Text - plainTextTranscription, err := convertVTTToPlainText(vttText) - if err != nil { - // This error is more about our parsing than the API - errMsg := fmt.Sprintf("Failed to convert VTT to plain text: %v", err) - logrus.WithField("job_uuid", j.UUID).Error(errMsg) - ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) - } - - // Process Result & Return - resultData := teetypes.TikTokTranscriptionResult{ - TranscriptionText: plainTextTranscription, - DetectedLanguage: selectedLanguageKey, - VideoTitle: parsedAPIResponse.VideoTitle, - OriginalURL: tiktokArgs.GetVideoURL(), - ThumbnailURL: parsedAPIResponse.ThumbnailURL, - } - - jsonData, err := json.Marshal(resultData) + text, err := convertVTTToPlainText(transcriptVTT) if err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) return types.JobResult{Error: "failed to parse transcript"}, fmt.Errorf("parse vtt: %w", err) From bd7c7e7f8657d982b590a480aadf4fe09e802ba9 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 21 Aug 2025 02:26:39 +0200 Subject: [PATCH 07/19] fix: merge with latest transcription fix --- internal/jobs/tiktok.go | 161 +++++++++++++++++++++++++++++----------- 1 file changed, 118 insertions(+), 43 deletions(-) diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 0f86804c..7330f637 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "strings" "time" @@ -137,93 +138,167 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { // executeTranscription calls the external transcription service and returns a normalized result func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTokTranscriptionArguments) (types.JobResult, error) { + logrus.WithField("job_uuid", j.UUID).Info("Starting ExecuteJob for TikTok transcription") + if ttt.configuration.TranscriptionEndpoint == "" { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) return types.JobResult{Error: "TikTok transcription endpoint is not configured for the worker"}, fmt.Errorf("tiktok transcription endpoint not configured") } - reqBody := map[string]any{ - "url": a.GetVideoURL(), + // Use the centralized type-safe unmarshaller + jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) + if err != nil { + return types.JobResult{Error: "Failed to unmarshal job arguments"}, fmt.Errorf("unmarshal job arguments: %w", err) + } + + // Type assert to TikTok arguments + tiktokArgs, ok := teeargs.AsTikTokTranscriptionArguments(jobArgs) + if !ok { + return types.JobResult{Error: "invalid argument type for TikTok job"}, fmt.Errorf("invalid argument type") + } + + // Use interface methods; no need to downcast + logrus.WithField("job_uuid", j.UUID).Infof("TikTok arguments validated: video_url=%s, language=%s, has_language_preference=%t", + tiktokArgs.GetVideoURL(), tiktokArgs.GetLanguageCode(), tiktokArgs.HasLanguagePreference()) + + // VideoURL validation is now handled by the unmarshaller, but we check again for safety + if tiktokArgs.GetVideoURL() == "" { + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: "VideoURL is required"}, fmt.Errorf("videoURL is required") + } + + // Use the enhanced language selection logic + selectedLanguageKey := tiktokArgs.GetLanguageCode() // This handles defaults automatically + if tiktokArgs.HasLanguagePreference() { + logrus.WithField("job_uuid", j.UUID).Infof("Using custom language preference: %s", selectedLanguageKey) + } else { + logrus.WithField("job_uuid", j.UUID).Infof("Using default language: %s", selectedLanguageKey) } - if a.HasLanguagePreference() { - reqBody["language"] = a.GetLanguageCode() + + // Sub-Step 3.1: Call TikTok Transcription API + apiRequestBody := map[string]string{"url": tiktokArgs.GetVideoURL()} + jsonBody, err := json.Marshal(apiRequestBody) + if err != nil { + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: "Failed to marshal API request body"}, fmt.Errorf("marshal API request body: %w", err) } - payload, _ := json.Marshal(reqBody) - req, err := http.NewRequest(http.MethodPost, ttt.configuration.TranscriptionEndpoint, bytes.NewReader(payload)) + req, err := http.NewRequest("POST", ttt.configuration.TranscriptionEndpoint, bytes.NewBuffer(jsonBody)) if err != nil { - return types.JobResult{Error: "Failed to create request"}, fmt.Errorf("create request: %w", err) + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: "Failed to create API request"}, fmt.Errorf("create API request: %w", err) } + req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "application/json") if ttt.configuration.APIOrigin != "" { req.Header.Set("Origin", ttt.configuration.APIOrigin) } if ttt.configuration.APIReferer != "" { req.Header.Set("Referer", ttt.configuration.APIReferer) } - if ttt.configuration.APIUserAgent != "" { - req.Header.Set("User-Agent", ttt.configuration.APIUserAgent) - } + // User-Agent is set from config or default in NewTikTokTranscriber + req.Header.Set("User-Agent", ttt.configuration.APIUserAgent) + + logrus.WithFields(logrus.Fields{ + "job_uuid": j.UUID, + "url": tiktokArgs.GetVideoURL(), + "method": "POST", + "api_endpoint": ttt.configuration.TranscriptionEndpoint, + }).Info("Calling TikTok Transcription API") - resp, err := ttt.httpClient.Do(req) + apiResp, err := ttt.httpClient.Do(req) if err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "Failed to call transcription endpoint"}, fmt.Errorf("http call: %w", err) + return types.JobResult{Error: "API request failed"}, fmt.Errorf("API request execution: %w", err) } - defer resp.Body.Close() + defer apiResp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { + if apiResp.StatusCode != http.StatusOK { + // Try to read body for more error details from API + bodyBytes, _ := io.ReadAll(apiResp.Body) + errMsg := fmt.Sprintf("API request failed with status code %d. Response: %s", apiResp.StatusCode, string(bodyBytes)) + logrus.WithField("job_uuid", j.UUID).Error(errMsg) ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: fmt.Sprintf("transcription endpoint returned status %d", resp.StatusCode)}, fmt.Errorf("unexpected status %d", resp.StatusCode) + return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) } - var apiResp APIResponse - if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil { + var parsedAPIResponse APIResponse + if err := json.NewDecoder(apiResp.Body).Decode(&parsedAPIResponse); err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "Failed to parse response"}, fmt.Errorf("decode response: %w", err) + return types.JobResult{Error: "Failed to parse API response"}, fmt.Errorf("parse API response: %w", err) } - if apiResp.Error != "" { + + if parsedAPIResponse.Error != "" { + errMsg := fmt.Sprintf("API returned an error: %s", parsedAPIResponse.Error) + logrus.WithField("job_uuid", j.UUID).Error(errMsg) ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: apiResp.Error}, fmt.Errorf("api error: %s", apiResp.Error) + return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) } - // Pick transcript language - chosenLang := a.GetLanguageCode() - transcriptVTT, ok := apiResp.Transcripts[chosenLang] - if !ok { - for lang, v := range apiResp.Transcripts { - chosenLang = lang - transcriptVTT = v - break - } + // Sub-Step 3.2: Extract Transcription and Metadata + if len(parsedAPIResponse.Transcripts) == 0 { + errMsg := "No transcripts found in API response" + logrus.WithField("job_uuid", j.UUID).Warn(errMsg) + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) // Or a different stat for "no_transcript_found" + return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) + } + + vttText := "" + + // Directly use the requested/default language; if missing, return an error + if transcript, ok := parsedAPIResponse.Transcripts[selectedLanguageKey]; ok && strings.TrimSpace(transcript) != "" { + vttText = transcript + } else { + errMsg := fmt.Sprintf("Transcript for requested language %s not found in API response", selectedLanguageKey) + logrus.WithFields(logrus.Fields{ + "job_uuid": j.UUID, + "requested_lang": selectedLanguageKey, + }).Error(errMsg) + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) } - if transcriptVTT == "" { + + if vttText == "" { + errMsg := "Suitable transcript could not be extracted from API response" + logrus.WithField("job_uuid", j.UUID).Error(errMsg) ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "no transcripts available in response"}, fmt.Errorf("no transcripts available") + return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) } - text, err := convertVTTToPlainText(transcriptVTT) + logrus.Debugf("Job %s: Raw VTT content for language %s:\n%s", j.UUID, selectedLanguageKey, vttText) + + // Convert VTT to Plain Text + plainTextTranscription, err := convertVTTToPlainText(vttText) if err != nil { + // This error is more about our parsing than the API + errMsg := fmt.Sprintf("Failed to convert VTT to plain text: %v", err) + logrus.WithField("job_uuid", j.UUID).Error(errMsg) ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) - return types.JobResult{Error: "failed to parse transcript"}, fmt.Errorf("parse vtt: %w", err) + return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg) } - result := teetypes.TikTokTranscriptionResult{ - TranscriptionText: text, - DetectedLanguage: chosenLang, - VideoTitle: apiResp.VideoTitle, - OriginalURL: a.GetVideoURL(), - ThumbnailURL: apiResp.ThumbnailURL, + // Process Result & Return + resultData := teetypes.TikTokTranscriptionResult{ + TranscriptionText: plainTextTranscription, + DetectedLanguage: selectedLanguageKey, + VideoTitle: parsedAPIResponse.VideoTitle, + OriginalURL: tiktokArgs.GetVideoURL(), + ThumbnailURL: parsedAPIResponse.ThumbnailURL, } - data, err := json.Marshal(result) + jsonData, err := json.Marshal(resultData) if err != nil { - return types.JobResult{Error: "failed to marshal result"}, fmt.Errorf("marshal result: %w", err) + ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1) + return types.JobResult{Error: "Failed to marshal result data"}, fmt.Errorf("marshal result data: %w", err) } + logrus.WithFields(logrus.Fields{ + "job_uuid": j.UUID, + "video_title": resultData.VideoTitle, + "detected_language": resultData.DetectedLanguage, + }).Info("Successfully processed TikTok transcription job") ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionSuccess, 1) - return types.JobResult{Data: data}, nil + return types.JobResult{Data: jsonData}, nil } // executeSearchByQuery runs the epctex/tiktok-search-scraper actor and returns results From 3d11e6c356a8dfcd38868c0436a9289ecf1ae1ae Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 21 Aug 2025 02:30:14 +0200 Subject: [PATCH 08/19] fix: hardcodes eng-US in test --- internal/jobs/tiktok_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/jobs/tiktok_test.go b/internal/jobs/tiktok_test.go index 8490d1ad..dd2a0e22 100644 --- a/internal/jobs/tiktok_test.go +++ b/internal/jobs/tiktok_test.go @@ -47,6 +47,7 @@ var _ = Describe("TikTok", func() { jobArguments := map[string]interface{}{ "type": teetypes.CapTranscription, "video_url": videoURL, + "language": "eng-US", // default language is eng-US from tee types } From c4c6ab6173ecf7fda56ddaf67434fa4e7285c4aa Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 21 Aug 2025 02:32:20 +0200 Subject: [PATCH 09/19] fix: use default language --- internal/jobs/tiktok_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/jobs/tiktok_test.go b/internal/jobs/tiktok_test.go index dd2a0e22..8490d1ad 100644 --- a/internal/jobs/tiktok_test.go +++ b/internal/jobs/tiktok_test.go @@ -47,7 +47,6 @@ var _ = Describe("TikTok", func() { jobArguments := map[string]interface{}{ "type": teetypes.CapTranscription, "video_url": videoURL, - "language": "eng-US", // default language is eng-US from tee types } From f3234da02fc5df4992cc1eea06542f65a55f2290 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Thu, 21 Aug 2025 02:33:35 +0200 Subject: [PATCH 10/19] fix: point to latest tee types --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ebec5515..2c2236d8 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/labstack/echo-contrib v0.17.4 github.com/labstack/echo/v4 v4.13.4 // FIXME: update to 1.1.7 once released - github.com/masa-finance/tee-types v1.1.7-0.20250815192551-781daf346571 + github.com/masa-finance/tee-types v1.1.8-0.20250821003151-e9cd1f3350fb github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.38.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 0aa0d403..0f63bba7 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.7-0.20250815192551-781daf346571 h1:Ua/FW15oL9n8ZcyJJVgKcIu2EUlYgqm3lxdYPawHyzE= -github.com/masa-finance/tee-types v1.1.7-0.20250815192551-781daf346571/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.8-0.20250821003151-e9cd1f3350fb h1:n0HmrxsOELoweYODOoJRMKoGRk67BMsR5B2zDYeIwFY= +github.com/masa-finance/tee-types v1.1.8-0.20250821003151-e9cd1f3350fb/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= From b40df3a98fd5f0a10c75ba6c0c4fe73d94673cab Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 20:22:18 +0200 Subject: [PATCH 11/19] chore: updates tee types and casts correctly --- go.mod | 3 ++- go.sum | 4 ++-- internal/jobs/tiktok.go | 10 +++++----- internal/jobs/tiktokapify/client.go | 10 ++++------ internal/jobs/twitter.go | 6 +++--- internal/jobs/webscraper.go | 2 +- 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index a899daea..660ee857 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,8 @@ require ( github.com/joho/godotenv v1.5.1 github.com/labstack/echo-contrib v0.17.4 github.com/labstack/echo/v4 v4.13.4 - github.com/masa-finance/tee-types v1.1.10 + // FIXME: remove this once the types are released + github.com/masa-finance/tee-types v1.1.11-0.20250826175307-b42bf7bb17e0 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.38.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 5c3c5310..8dc1d7a2 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.10 h1:mn/wF84Yg6tXH+JigRwluWaBwHT8SeNMzGVDgclC+08= -github.com/masa-finance/tee-types v1.1.10/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.11-0.20250826175307-b42bf7bb17e0 h1:f5FYfxfJ5xoAejRrleiIfHLl0DGHfEBn7hLLDaRXoi8= +github.com/masa-finance/tee-types v1.1.11-0.20250826175307-b42bf7bb17e0/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 7330f637..3222dbd7 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -118,18 +118,18 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { } // Branch by argument type (transcription vs search) - if transcriptionArgs, ok := teeargs.AsTikTokTranscriptionArguments(jobArgs); ok { + if transcriptionArgs, ok := jobArgs.(*teeargs.TikTokTranscriptionArguments); ok { return ttt.executeTranscription(j, transcriptionArgs) } - if searchByQueryArgs, ok := teeargs.AsTikTokSearchByQueryArguments(jobArgs); ok { + if searchByQueryArgs, ok := jobArgs.(*teeargs.TikTokSearchByQueryArguments); ok { return ttt.executeSearchByQuery(j, searchByQueryArgs) } - if searchByTrendingArgs, ok := teeargs.AsTikTokSearchByTrendingArguments(jobArgs); ok { + if searchByTrendingArgs, ok := jobArgs.(*teeargs.TikTokSearchByTrendingArguments); ok { return ttt.executeSearchByTrending(j, searchByTrendingArgs) } // Fallback: treat as searchbyquery (default capability) - searchByQueryArgs, ok := teeargs.AsTikTokSearchByQueryArguments(jobArgs) + searchByQueryArgs, ok := jobArgs.(*teeargs.TikTokSearchByQueryArguments) if !ok { return types.JobResult{Error: "invalid argument type for TikTok job"}, fmt.Errorf("invalid argument type") } @@ -152,7 +152,7 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo } // Type assert to TikTok arguments - tiktokArgs, ok := teeargs.AsTikTokTranscriptionArguments(jobArgs) + tiktokArgs, ok := jobArgs.(*teeargs.TikTokTranscriptionArguments) if !ok { return types.JobResult{Error: "invalid argument type for TikTok job"}, fmt.Errorf("invalid argument type") } diff --git a/internal/jobs/tiktokapify/client.go b/internal/jobs/tiktokapify/client.go index 8af59f29..081fadb6 100644 --- a/internal/jobs/tiktokapify/client.go +++ b/internal/jobs/tiktokapify/client.go @@ -16,7 +16,7 @@ const ( ) type TikTokApifyClient struct { - apify *client.ApifyClient + apify client.Apify } func NewTikTokApifyClient(apiToken string) (*TikTokApifyClient, error) { @@ -33,7 +33,7 @@ func (c *TikTokApifyClient) ValidateApiKey() error { } // SearchByQuery runs the search actor and returns typed results -func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArguments, cursor client.Cursor, limit int) ([]*teetypes.TikTokSearchByQueryResult, client.Cursor, error) { +func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArguments, cursor client.Cursor, limit uint) ([]*teetypes.TikTokSearchByQueryResult, client.Cursor, error) { // Map snake_case fields to Apify actor's expected camelCase input startUrls := input.StartUrls if startUrls == nil { @@ -49,9 +49,7 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum "startUrls": startUrls, "maxItems": input.MaxItems, "endPage": input.EndPage, - } - if input.Proxy != nil { - apifyInput["proxy"] = map[string]any{"useApifyProxy": input.Proxy.UseApifyProxy} + "proxy": map[string]any{"useApifyProxy": true}, } dataset, next, err := c.apify.RunActorAndGetResponse(SearchActorID, apifyInput, cursor, limit) @@ -72,7 +70,7 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum } // SearchByTrending runs the trending actor and returns typed results -func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendingArguments, cursor client.Cursor, limit int) ([]*teetypes.TikTokSearchByTrending, client.Cursor, error) { +func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendingArguments, cursor client.Cursor, limit uint) ([]*teetypes.TikTokSearchByTrending, client.Cursor, error) { apifyInput := map[string]any{ "countryCode": input.CountryCode, "sortBy": input.SortBy, diff --git a/internal/jobs/twitter.go b/internal/jobs/twitter.go index 116671b4..71b77d85 100644 --- a/internal/jobs/twitter.go +++ b/internal/jobs/twitter.go @@ -1347,7 +1347,7 @@ func (ts *TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) { } // Type assert to Twitter arguments - twitterArgs, ok := teeargs.AsTwitterArguments(jobArgs) + twitterArgs, ok := jobArgs.(*teeargs.TwitterSearchArguments) if !ok { logrus.Errorf("Expected Twitter arguments for job ID %s, type %s", j.UUID, j.Type) return types.JobResult{Error: "invalid argument type for Twitter job"}, fmt.Errorf("invalid argument type") @@ -1358,8 +1358,8 @@ func (ts *TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) { strategy := getScrapeStrategy(j.Type) - // Convert to concrete type for direct usage - args := twitterArgs.(*teeargs.TwitterSearchArguments) + // Use the already cast concrete type directly + args := twitterArgs jobResult, err := strategy.Execute(j, ts, args) if err != nil { diff --git a/internal/jobs/webscraper.go b/internal/jobs/webscraper.go index 7e4d652c..a90fe55d 100644 --- a/internal/jobs/webscraper.go +++ b/internal/jobs/webscraper.go @@ -55,7 +55,7 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { } // Type assert to Web arguments - args, ok := teeargs.AsWebArguments(jobArgs) + args, ok := jobArgs.(*teeargs.WebSearchArguments) if !ok { logrus.Errorf("Expected Web arguments for job ID %s, type %s", j.UUID, j.Type) return types.JobResult{Error: "invalid argument type for Web job"}, nil From a8d01d67eee9bb3dabface702cec420f6f17e3e0 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 20:35:06 +0200 Subject: [PATCH 12/19] fix: converts limit to uint and refactors detector test --- internal/capabilities/detector_test.go | 255 +++++++++++-------------- internal/jobs/tiktok.go | 2 +- 2 files changed, 111 insertions(+), 146 deletions(-) diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index 3b8b04d5..06acdd76 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -1,12 +1,14 @@ -package capabilities +package capabilities_test import ( - "reflect" "slices" - "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/api/types" + . "github.com/masa-finance/tee-worker/internal/capabilities" ) // MockJobServer implements JobServerInterface for testing @@ -18,17 +20,32 @@ func (m *MockJobServer) GetWorkerCapabilities() teetypes.WorkerCapabilities { return m.capabilities } -func TestDetectCapabilities(t *testing.T) { - tests := []struct { - name string - jc types.JobConfiguration - jobServer JobServerInterface - expected teetypes.WorkerCapabilities - }{ - { - name: "With JobServer - gets capabilities from workers", - jc: types.JobConfiguration{}, - jobServer: &MockJobServer{ +var _ = Describe("DetectCapabilities", func() { + DescribeTable("capability detection scenarios", + func(jc types.JobConfiguration, jobServer JobServerInterface, expected teetypes.WorkerCapabilities) { + got := DetectCapabilities(jc, jobServer) + + // Extract job type keys and sort for consistent comparison + gotKeys := make([]string, 0, len(got)) + for jobType := range got { + gotKeys = append(gotKeys, jobType.String()) + } + + expectedKeys := make([]string, 0, len(expected)) + for jobType := range expected { + expectedKeys = append(expectedKeys, jobType.String()) + } + + // Sort both slices for comparison + slices.Sort(gotKeys) + slices.Sort(expectedKeys) + + // Compare the sorted slices + Expect(gotKeys).To(Equal(expectedKeys)) + }, + Entry("With JobServer - gets capabilities from workers", + types.JobConfiguration{}, + &MockJobServer{ capabilities: teetypes.WorkerCapabilities{ teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, @@ -36,58 +53,54 @@ func TestDetectCapabilities(t *testing.T) { teetypes.TwitterJob: {teetypes.CapSearchByQuery, teetypes.CapGetById, teetypes.CapGetProfileById}, }, }, - expected: teetypes.WorkerCapabilities{ + teetypes.WorkerCapabilities{ teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterJob: {teetypes.CapSearchByQuery, teetypes.CapGetById, teetypes.CapGetProfileById}, }, - }, - { - name: "Without JobServer - basic capabilities only", - jc: types.JobConfiguration{}, - jobServer: nil, - expected: teetypes.WorkerCapabilities{ + ), + Entry("Without JobServer - basic capabilities only", + types.JobConfiguration{}, + nil, + teetypes.WorkerCapabilities{ teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, teetypes.TiktokJob: {teetypes.CapTranscription}, }, - }, - { - name: "With Twitter accounts - adds credential capabilities", - jc: types.JobConfiguration{ + ), + Entry("With Twitter accounts - adds credential capabilities", + types.JobConfiguration{ "twitter_accounts": []string{"account1", "account2"}, }, - jobServer: nil, - expected: teetypes.WorkerCapabilities{ + nil, + teetypes.WorkerCapabilities{ teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterCredentialJob: teetypes.TwitterCredentialCaps, teetypes.TwitterJob: teetypes.TwitterCredentialCaps, }, - }, - { - name: "With Twitter API keys - adds API capabilities", - jc: types.JobConfiguration{ + ), + Entry("With Twitter API keys - adds API capabilities", + types.JobConfiguration{ "twitter_api_keys": []string{"key1", "key2"}, }, - jobServer: nil, - expected: teetypes.WorkerCapabilities{ + nil, + teetypes.WorkerCapabilities{ teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, teetypes.TiktokJob: {teetypes.CapTranscription}, teetypes.TwitterApiJob: teetypes.TwitterAPICaps, teetypes.TwitterJob: teetypes.TwitterAPICaps, }, - }, - { - name: "With mock elevated Twitter API keys - only basic capabilities detected", - jc: types.JobConfiguration{ + ), + Entry("With mock elevated Twitter API keys - only basic capabilities detected", + types.JobConfiguration{ "twitter_api_keys": []string{"Bearer abcd1234-ELEVATED"}, }, - jobServer: nil, - expected: teetypes.WorkerCapabilities{ + nil, + teetypes.WorkerCapabilities{ teetypes.WebJob: {teetypes.CapScraper}, teetypes.TelemetryJob: {teetypes.CapTelemetry}, teetypes.TiktokJob: {teetypes.CapTranscription}, @@ -95,120 +108,72 @@ func TestDetectCapabilities(t *testing.T) { teetypes.TwitterApiJob: teetypes.TwitterAPICaps, teetypes.TwitterJob: teetypes.TwitterAPICaps, }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := DetectCapabilities(tt.jc, tt.jobServer) - - // Extract job type keys and sort for consistent comparison - gotKeys := make([]string, 0, len(got)) - for jobType := range got { - gotKeys = append(gotKeys, jobType.String()) + ), + ) + + Context("Scraper Types", func() { + DescribeTable("scraper type detection", + func(jc types.JobConfiguration, expectedKeys []string) { + caps := DetectCapabilities(jc, nil) + + jobNames := make([]string, 0, len(caps)) + for jobType := range caps { + jobNames = append(jobNames, jobType.String()) + } + + // Sort both slices for comparison + slices.Sort(jobNames) + expectedSorted := make([]string, len(expectedKeys)) + copy(expectedSorted, expectedKeys) + slices.Sort(expectedSorted) + + // Compare the sorted slices + Expect(jobNames).To(Equal(expectedSorted)) + }, + Entry("Basic scrapers only", + types.JobConfiguration{}, + []string{"web", "telemetry", "tiktok"}, + ), + Entry("With Twitter accounts", + types.JobConfiguration{ + "twitter_accounts": []string{"user1:pass1"}, + }, + []string{"web", "telemetry", "tiktok", "twitter", "twitter-credential"}, + ), + Entry("With Twitter API keys", + types.JobConfiguration{ + "twitter_api_keys": []string{"key1"}, + }, + []string{"web", "telemetry", "tiktok", "twitter", "twitter-api"}, + ), + ) + }) + + Context("Apify Integration", func() { + It("should add enhanced capabilities when Apify API key is provided", func() { + jc := types.JobConfiguration{ + "apify_api_key": "dummy", } - expectedKeys := make([]string, 0, len(tt.expected)) - for jobType := range tt.expected { - expectedKeys = append(expectedKeys, jobType.String()) - } + caps := DetectCapabilities(jc, nil) - // Sort both slices for comparison - slices.Sort(gotKeys) - slices.Sort(expectedKeys) + // TikTok should gain search capabilities + tiktokCaps, ok := caps[teetypes.TiktokJob] + Expect(ok).To(BeTrue(), "expected tiktok capabilities to be present") + Expect(tiktokCaps).To(ContainElement(teetypes.CapSearchByQuery), "expected tiktok to include CapSearchByQuery capability") + Expect(tiktokCaps).To(ContainElement(teetypes.CapSearchByTrending), "expected tiktok to include CapSearchByTrending capability") - // Compare the sorted slices - if !reflect.DeepEqual(gotKeys, expectedKeys) { - t.Errorf("DetectCapabilities() job types = %v, want %v", gotKeys, expectedKeys) - } + // Twitter-Apify job should be present with follower/following capabilities + twitterApifyCaps, ok := caps[teetypes.TwitterApifyJob] + Expect(ok).To(BeTrue(), "expected twitter-apify capabilities to be present") + Expect(twitterApifyCaps).To(ContainElement(teetypes.CapGetFollowers), "expected twitter-apify to include CapGetFollowers capability") + Expect(twitterApifyCaps).To(ContainElement(teetypes.CapGetFollowing), "expected twitter-apify to include CapGetFollowing capability") }) - } -} + }) +}) // Helper function to check if a job type exists in capabilities func hasJobType(capabilities teetypes.WorkerCapabilities, jobName string) bool { _, exists := capabilities[teetypes.JobType(jobName)] return exists } - -func TestDetectCapabilities_ScraperTypes(t *testing.T) { - tests := []struct { - name string - jc types.JobConfiguration - expectedKeys []string // scraper names we expect - }{ - { - name: "Basic scrapers only", - jc: types.JobConfiguration{}, - expectedKeys: []string{"web", "telemetry", "tiktok"}, - }, - { - name: "With Twitter accounts", - jc: types.JobConfiguration{ - "twitter_accounts": []string{"user1:pass1"}, - }, - expectedKeys: []string{"web", "telemetry", "tiktok", "twitter", "twitter-credential"}, - }, - { - name: "With Twitter API keys", - jc: types.JobConfiguration{ - "twitter_api_keys": []string{"key1"}, - }, - expectedKeys: []string{"web", "telemetry", "tiktok", "twitter", "twitter-api"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - caps := DetectCapabilities(tt.jc, nil) - - jobNames := make([]string, 0, len(caps)) - for jobType := range caps { - jobNames = append(jobNames, jobType.String()) - } - - // Sort both slices for comparison - slices.Sort(jobNames) - expectedSorted := make([]string, len(tt.expectedKeys)) - copy(expectedSorted, tt.expectedKeys) - slices.Sort(expectedSorted) - - // Compare the sorted slices - if !reflect.DeepEqual(jobNames, expectedSorted) { - t.Errorf("Expected capabilities %v, got %v", expectedSorted, jobNames) - } - }) - } -} - -func TestDetectCapabilities_Apify(t *testing.T) { - jc := types.JobConfiguration{ - "apify_api_key": "dummy", - } - - caps := DetectCapabilities(jc, nil) - - // TikTok should gain search capabilities - tiktokCaps, ok := caps[teetypes.TiktokJob] - if !ok { - t.Fatalf("expected tiktok capabilities to be present") - } - if !slices.Contains(tiktokCaps, teetypes.CapSearchByQuery) { - t.Errorf("expected tiktok to include capability %q", teetypes.CapSearchByQuery) - } - if !slices.Contains(tiktokCaps, teetypes.CapSearchByTrending) { - t.Errorf("expected tiktok to include capability %q", teetypes.CapSearchByTrending) - } - - // Twitter-Apify job should be present with follower/following capabilities - twitterApifyCaps, ok := caps[teetypes.TwitterApifyJob] - if !ok { - t.Fatalf("expected twitter-apify capabilities to be present") - } - if !slices.Contains(twitterApifyCaps, teetypes.CapGetFollowers) { - t.Errorf("expected twitter-apify to include capability %q", teetypes.CapGetFollowers) - } - if !slices.Contains(twitterApifyCaps, teetypes.CapGetFollowing) { - t.Errorf("expected twitter-apify to include capability %q", teetypes.CapGetFollowing) - } -} diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 3222dbd7..1caf4f21 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -354,7 +354,7 @@ func (ttt *TikTokTranscriber) executeSearchByTrending(j types.Job, a *teeargs.Ti limit = 20 } - items, next, err := c.SearchByTrending(*a, client.EmptyCursor, limit) + items, next, err := c.SearchByTrending(*a, client.EmptyCursor, uint(limit)) if err != nil { ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) return types.JobResult{Error: err.Error()}, err From 11ca1d9bdee0e3c63052ff32a4f04222cf2666b5 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 20:39:57 +0200 Subject: [PATCH 13/19] chore: adds suite for capabilities testing --- internal/capabilities/capabilities_suite_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 internal/capabilities/capabilities_suite_test.go diff --git a/internal/capabilities/capabilities_suite_test.go b/internal/capabilities/capabilities_suite_test.go new file mode 100644 index 00000000..224f32a0 --- /dev/null +++ b/internal/capabilities/capabilities_suite_test.go @@ -0,0 +1,13 @@ +package capabilities_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCapabilities(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Capabilities Suite") +} From a369e7d164fa89910c7714d72f0edc20a16b5e57 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 21:00:06 +0200 Subject: [PATCH 14/19] chore: updates tiktok client with dedicated structs --- internal/jobs/tiktokapify/client.go | 61 +++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/internal/jobs/tiktokapify/client.go b/internal/jobs/tiktokapify/client.go index 081fadb6..63a88a8b 100644 --- a/internal/jobs/tiktokapify/client.go +++ b/internal/jobs/tiktokapify/client.go @@ -15,6 +15,21 @@ const ( TrendingActorID = "lexis-solutions~tiktok-trending-videos-scraper" // must rent this actor from apify explicitly ) +type TikTokSearchByQueryRequest struct { + SearchTerms []string `json:"search"` + StartUrls []string `json:"startUrls"` + MaxItems uint `json:"maxItems"` + EndPage uint `json:"endPage"` + Proxy map[string]any `json:"proxy"` +} + +type TikTokSearchByTrendingRequest struct { + CountryCode string `json:"countryCode"` + SortBy string `json:"sortBy"` + MaxItems uint `json:"maxItems"` + Period string `json:"period"` +} + type TikTokApifyClient struct { apify client.Apify } @@ -44,12 +59,24 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum searchTerms = []string{} } - apifyInput := map[string]any{ - "search": searchTerms, - "startUrls": startUrls, - "maxItems": input.MaxItems, - "endPage": input.EndPage, - "proxy": map[string]any{"useApifyProxy": true}, + // Create structured request using the TikTokSearchByQueryRequest struct + request := TikTokSearchByQueryRequest{ + SearchTerms: searchTerms, + StartUrls: startUrls, + MaxItems: input.MaxItems, + EndPage: input.EndPage, + Proxy: map[string]any{"useApifyProxy": true}, + } + + // Convert struct to map[string]any for Apify client + requestBytes, err := json.Marshal(request) + if err != nil { + return nil, "", fmt.Errorf("failed to marshal request: %w", err) + } + + var apifyInput map[string]any + if err := json.Unmarshal(requestBytes, &apifyInput); err != nil { + return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } dataset, next, err := c.apify.RunActorAndGetResponse(SearchActorID, apifyInput, cursor, limit) @@ -61,7 +88,7 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum for _, raw := range dataset.Data.Items { var item teetypes.TikTokSearchByQueryResult if err := json.Unmarshal(raw, &item); err != nil { - // If structure differs for some items, skip + // Skip any items whose structure doesn't match continue } results = append(results, &item) @@ -71,11 +98,21 @@ func (c *TikTokApifyClient) SearchByQuery(input teeargs.TikTokSearchByQueryArgum // SearchByTrending runs the trending actor and returns typed results func (c *TikTokApifyClient) SearchByTrending(input teeargs.TikTokSearchByTrendingArguments, cursor client.Cursor, limit uint) ([]*teetypes.TikTokSearchByTrending, client.Cursor, error) { - apifyInput := map[string]any{ - "countryCode": input.CountryCode, - "sortBy": input.SortBy, - "maxItems": input.MaxItems, - "period": input.Period, + request := TikTokSearchByTrendingRequest{ + CountryCode: input.CountryCode, + SortBy: input.SortBy, + MaxItems: uint(input.MaxItems), + Period: input.Period, + } + + requestBytes, err := json.Marshal(request) + if err != nil { + return nil, "", fmt.Errorf("failed to marshal request: %w", err) + } + + var apifyInput map[string]any + if err := json.Unmarshal(requestBytes, &apifyInput); err != nil { + return nil, "", fmt.Errorf("failed to unmarshal to map: %w", err) } dataset, next, err := c.apify.RunActorAndGetResponse(TrendingActorID, apifyInput, cursor, limit) From 00581ce40eaf2710f3b55f1d3db1711cefac2e0a Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 21:09:34 +0200 Subject: [PATCH 15/19] chore: adds tiktok auth errors --- internal/jobs/stats/stats.go | 1 + internal/jobs/tiktok.go | 14 ++------------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go index 1500c431..ebd2ad4d 100644 --- a/internal/jobs/stats/stats.go +++ b/internal/jobs/stats/stats.go @@ -32,6 +32,7 @@ const ( TikTokTranscriptionErrors StatType = "tiktok_transcription_errors" TikTokVideos StatType = "tiktok_returned_videos" TikTokErrors StatType = "tiktok_errors" + TikTokAuthErrors StatType = "tiktok_auth_errors" RedditReturnedItems StatType = "reddit_returned_items" RedditQueries StatType = "reddit_queries" RedditErrors StatType = "reddit_errors" diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 1caf4f21..8fa3b6bf 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -303,14 +303,9 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo // executeSearchByQuery runs the epctex/tiktok-search-scraper actor and returns results func (ttt *TikTokTranscriber) executeSearchByQuery(j types.Job, a *teeargs.TikTokSearchByQueryArguments) (types.JobResult, error) { - if ttt.configuration.ApifyApiKey == "" { - ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) - return types.JobResult{Error: "Apify API key not configured for searchbyquery"}, fmt.Errorf("missing Apify API key") - } - c, err := tiktokapify.NewTikTokApifyClient(ttt.configuration.ApifyApiKey) if err != nil { - ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + ttt.stats.Add(j.WorkerID, stats.TikTokAuthErrors, 1) return types.JobResult{Error: "Failed to create Apify client"}, fmt.Errorf("apify client: %w", err) } @@ -338,14 +333,9 @@ func (ttt *TikTokTranscriber) executeSearchByQuery(j types.Job, a *teeargs.TikTo // executeSearchByTrending runs the lexis-solutions/tiktok-trending-videos-scraper actor and returns results func (ttt *TikTokTranscriber) executeSearchByTrending(j types.Job, a *teeargs.TikTokSearchByTrendingArguments) (types.JobResult, error) { - if ttt.configuration.ApifyApiKey == "" { - ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) - return types.JobResult{Error: "Apify API key not configured for searchbytrending"}, fmt.Errorf("missing Apify API key") - } - c, err := tiktokapify.NewTikTokApifyClient(ttt.configuration.ApifyApiKey) if err != nil { - ttt.stats.Add(j.WorkerID, stats.TikTokErrors, 1) + ttt.stats.Add(j.WorkerID, stats.TikTokAuthErrors, 1) return types.JobResult{Error: "Failed to create Apify client"}, fmt.Errorf("apify client: %w", err) } From a7c1ec6bacc0e6229ecf047ca34dc32464424f0b Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 21:42:23 +0200 Subject: [PATCH 16/19] chore: moves apify key detection to capability detection --- internal/capabilities/detector.go | 26 +++++++++++++++++++++++++- internal/capabilities/detector_test.go | 16 +++++++++++++--- internal/jobs/tiktok.go | 11 +---------- internal/jobs/twitter.go | 13 +------------ 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go index 5984c834..11c82c85 100644 --- a/internal/capabilities/detector.go +++ b/internal/capabilities/detector.go @@ -10,6 +10,8 @@ import ( teetypes "github.com/masa-finance/tee-types/types" "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs/twitter" + "github.com/masa-finance/tee-worker/pkg/client" + "github.com/sirupsen/logrus" ) // JobServerInterface defines the methods we need from JobServer to avoid circular dependencies @@ -39,7 +41,7 @@ func DetectCapabilities(jc types.JobConfiguration, jobServer JobServerInterface) hasAccounts := len(accounts) > 0 hasApiKeys := len(apiKeys) > 0 - hasApifyKey := apifyApiKey != "" + hasApifyKey := hasValidApifyKey(apifyApiKey) // Add Twitter-specific capabilities based on available authentication if hasAccounts { @@ -132,3 +134,25 @@ func parseApiKeys(apiKeys []string) []*twitter.TwitterApiKey { } return result } + +// hasValidApifyKey checks if the provided Apify API key is valid by attempting to validate it +func hasValidApifyKey(apifyApiKey string) bool { + if apifyApiKey == "" { + return false + } + + // Create temporary Apify client and validate the key + apifyClient, err := client.NewApifyClient(apifyApiKey) + if err != nil { + logrus.Errorf("Failed to create Apify client during capability detection: %v", err) + return false + } + + if err := apifyClient.ValidateApiKey(); err != nil { + logrus.Errorf("Apify API key validation failed during capability detection: %v", err) + return false + } + + logrus.Infof("Apify API key validated successfully during capability detection") + return true +} diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go index 06acdd76..2a2cbb0c 100644 --- a/internal/capabilities/detector_test.go +++ b/internal/capabilities/detector_test.go @@ -1,6 +1,7 @@ package capabilities_test import ( + "os" "slices" . "github.com/onsi/ginkgo/v2" @@ -150,14 +151,19 @@ var _ = Describe("DetectCapabilities", func() { }) Context("Apify Integration", func() { - It("should add enhanced capabilities when Apify API key is provided", func() { + It("should add enhanced capabilities when valid Apify API key is provided", func() { + apifyKey := os.Getenv("APIFY_API_KEY") + if apifyKey == "" { + Skip("APIFY_API_KEY is not set") + } + jc := types.JobConfiguration{ - "apify_api_key": "dummy", + "apify_api_key": apifyKey, } caps := DetectCapabilities(jc, nil) - // TikTok should gain search capabilities + // TikTok should gain search capabilities with valid key tiktokCaps, ok := caps[teetypes.TiktokJob] Expect(ok).To(BeTrue(), "expected tiktok capabilities to be present") Expect(tiktokCaps).To(ContainElement(teetypes.CapSearchByQuery), "expected tiktok to include CapSearchByQuery capability") @@ -168,6 +174,10 @@ var _ = Describe("DetectCapabilities", func() { Expect(ok).To(BeTrue(), "expected twitter-apify capabilities to be present") Expect(twitterApifyCaps).To(ContainElement(teetypes.CapGetFollowers), "expected twitter-apify to include CapGetFollowers capability") Expect(twitterApifyCaps).To(ContainElement(teetypes.CapGetFollowing), "expected twitter-apify to include CapGetFollowing capability") + + // Reddit should be present + _, hasReddit := caps[teetypes.RedditJob] + Expect(hasReddit).To(BeTrue(), "expected reddit capabilities to be present") }) }) }) diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 8fa3b6bf..0f51bf4b 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -65,17 +65,8 @@ func NewTikTokTranscriber(jc types.JobConfiguration, statsCollector *stats.Stats if err := jc.Unmarshal(&config); err != nil { logrus.WithError(err).Warn("failed to unmarshal TikTokTranscriptionConfiguration from JobConfiguration, using defaults where applicable") } - // Ensure Apify key aligns with Twitter's pattern (explicit getter wins) + // Get Apify key from configuration (validation now handled at startup by capability detection) config.ApifyApiKey = jc.GetString("apify_api_key", config.ApifyApiKey) - if config.ApifyApiKey != "" { - if c, err := tiktokapify.NewTikTokApifyClient(config.ApifyApiKey); err != nil { - logrus.Errorf("Failed to create Apify client at startup: %v", err) - } else if err := c.ValidateApiKey(); err != nil { - logrus.Errorf("Apify API key validation failed at startup: %v", err) - } else { - logrus.Infof("Apify API key validated successfully at startup") - } - } // Note: APIUserAgent is optional, it can be set later or use a default if config.APIUserAgent == "" { diff --git a/internal/jobs/twitter.go b/internal/jobs/twitter.go index 71b77d85..d3c9ff8c 100644 --- a/internal/jobs/twitter.go +++ b/internal/jobs/twitter.go @@ -980,18 +980,7 @@ func NewTwitterScraper(jc types.JobConfiguration, c *stats.StatsCollector) *Twit accountManager.DetectAllApiKeyTypes() // Validate Apify API key at startup if provided (similar to API key detection) - if config.ApifyApiKey != "" { - apifyScraper, err := twitterapify.NewTwitterApifyClient(config.ApifyApiKey) - if err != nil { - logrus.Errorf("Failed to create Apify scraper at startup: %v", err) - // Don't fail startup, just log the error - the key might work later or be temporary - } else if err := apifyScraper.ValidateApiKey(); err != nil { - logrus.Errorf("Apify API key validation failed at startup: %v", err) - // Don't fail startup, just log the error - the key might work later or be temporary - } else { - logrus.Infof("Apify API key validated successfully at startup") - } - } + // Apify API key validation now handled at startup by capability detection if os.Getenv("TWITTER_SKIP_LOGIN_VERIFICATION") == "true" { config.SkipLoginVerification = true From 3ef735636a3e08b794795a9b5cfa9a63cf415684 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 21:48:41 +0200 Subject: [PATCH 17/19] chore: adds tiktok query count --- internal/jobs/stats/stats.go | 1 + internal/jobs/tiktok.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go index ebd2ad4d..8f91bc72 100644 --- a/internal/jobs/stats/stats.go +++ b/internal/jobs/stats/stats.go @@ -31,6 +31,7 @@ const ( TikTokTranscriptionSuccess StatType = "tiktok_transcription_success" TikTokTranscriptionErrors StatType = "tiktok_transcription_errors" TikTokVideos StatType = "tiktok_returned_videos" + TikTokQueries StatType = "tiktok_queries" TikTokErrors StatType = "tiktok_errors" TikTokAuthErrors StatType = "tiktok_auth_errors" RedditReturnedItems StatType = "reddit_returned_items" diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 0f51bf4b..4f4a40b1 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -319,6 +319,7 @@ func (ttt *TikTokTranscriber) executeSearchByQuery(j types.Job, a *teeargs.TikTo // Increment returned videos based on the number of items ttt.stats.Add(j.WorkerID, stats.TikTokVideos, uint(len(items))) + ttt.stats.Add(j.WorkerID, stats.TikTokQueries, 1) return types.JobResult{Data: data, NextCursor: next.String()}, nil } @@ -349,6 +350,7 @@ func (ttt *TikTokTranscriber) executeSearchByTrending(j types.Job, a *teeargs.Ti // Increment returned videos based on the number of items ttt.stats.Add(j.WorkerID, stats.TikTokVideos, uint(len(items))) + ttt.stats.Add(j.WorkerID, stats.TikTokQueries, 1) return types.JobResult{Data: data, NextCursor: next.String()}, nil } From f8eb1afec5c3fb9abd8368ac97216c09a1b1fc6d Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Tue, 26 Aug 2025 22:04:26 +0200 Subject: [PATCH 18/19] chore: update types --- go.mod | 3 +-- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 660ee857..3696a985 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/labstack/echo-contrib v0.17.4 github.com/labstack/echo/v4 v4.13.4 - // FIXME: remove this once the types are released - github.com/masa-finance/tee-types v1.1.11-0.20250826175307-b42bf7bb17e0 + github.com/masa-finance/tee-types v1.1.11 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.38.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 8dc1d7a2..2eb0313f 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= -github.com/masa-finance/tee-types v1.1.11-0.20250826175307-b42bf7bb17e0 h1:f5FYfxfJ5xoAejRrleiIfHLl0DGHfEBn7hLLDaRXoi8= -github.com/masa-finance/tee-types v1.1.11-0.20250826175307-b42bf7bb17e0/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= +github.com/masa-finance/tee-types v1.1.11 h1:/8YwfLAcY1qr0BwoJBtxxDtMfc32I0niO+6R3tCWDdk= +github.com/masa-finance/tee-types v1.1.11/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc= github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk= github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= From fd311c2ef7d62a3655fe56fd1b012b044e579dc5 Mon Sep 17 00:00:00 2001 From: grantdfoster Date: Wed, 27 Aug 2025 17:47:05 +0200 Subject: [PATCH 19/19] fix: return error if capability is not supported --- internal/jobs/tiktok.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go index 4f4a40b1..73949af2 100644 --- a/internal/jobs/tiktok.go +++ b/internal/jobs/tiktok.go @@ -111,20 +111,13 @@ func (ttt *TikTokTranscriber) ExecuteJob(j types.Job) (types.JobResult, error) { // Branch by argument type (transcription vs search) if transcriptionArgs, ok := jobArgs.(*teeargs.TikTokTranscriptionArguments); ok { return ttt.executeTranscription(j, transcriptionArgs) - } - if searchByQueryArgs, ok := jobArgs.(*teeargs.TikTokSearchByQueryArguments); ok { + } else if searchByQueryArgs, ok := jobArgs.(*teeargs.TikTokSearchByQueryArguments); ok { return ttt.executeSearchByQuery(j, searchByQueryArgs) - } - if searchByTrendingArgs, ok := jobArgs.(*teeargs.TikTokSearchByTrendingArguments); ok { + } else if searchByTrendingArgs, ok := jobArgs.(*teeargs.TikTokSearchByTrendingArguments); ok { return ttt.executeSearchByTrending(j, searchByTrendingArgs) - } - - // Fallback: treat as searchbyquery (default capability) - searchByQueryArgs, ok := jobArgs.(*teeargs.TikTokSearchByQueryArguments) - if !ok { + } else { return types.JobResult{Error: "invalid argument type for TikTok job"}, fmt.Errorf("invalid argument type") } - return ttt.executeSearchByQuery(j, searchByQueryArgs) } // executeTranscription calls the external transcription service and returns a normalized result