18 changes: 18 additions & 0 deletions internal/api/routes.go
@@ -22,6 +22,14 @@ func generate(c echo.Context) error {
return c.String(http.StatusOK, encryptedSignature)
}

// add adds a job to the job server.
//
// The request body should contain a JobRequest, which will be decrypted and
// passed to the job server. The response body will contain a JobResponse with
// the UUID of the added job.
//
// If there is an error, the response body will contain a JobError with an
// appropriate error message.
func add(jobServer *jobserver.JobServer) func(c echo.Context) error {
return func(c echo.Context) error {
jobRequest := types.JobRequest{}
@@ -36,10 +44,20 @@ func add(jobServer *jobserver.JobServer) func(c echo.Context) error {

uuid := jobServer.AddJob(*job)

// check if uuid is empty
if uuid == "" {
return c.JSON(http.StatusInternalServerError, types.JobError{Error: "Failed to add job"})
}

return c.JSON(http.StatusOK, types.JobResponse{UID: uuid})
}
}
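
A minimal sketch of how this handler might be exercised in a test, assuming a constructed *jobserver.JobServer and an already-encrypted JobRequest body; the "/job/add" route path is illustrative and not taken from this diff (uses net/http/httptest, strings, testing, and the echo framework):

// Sketch only: the route path and request body are assumptions.
func tryAddJob(t *testing.T, jobServer *jobserver.JobServer, encryptedBody string) {
	e := echo.New()
	e.POST("/job/add", add(jobServer))

	req := httptest.NewRequest(http.MethodPost, "/job/add", strings.NewReader(encryptedBody))
	rec := httptest.NewRecorder()
	e.ServeHTTP(rec, req)

	// Success: 200 with a JobResponse carrying the new job's UUID.
	// Failure: 500 with a JobError, e.g. "Failed to add job".
	t.Logf("status=%d body=%s", rec.Code, rec.Body.String())
}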

// status returns the result of a job. It responds with 404 if the job is not
// found, 500 if the job produced an error, 200 with an empty string if the job
// has not finished yet, and 200 with the sealed result once it has.
func status(jobServer *jobserver.JobServer) func(c echo.Context) error {
return func(c echo.Context) error {
res, exists := jobServer.GetJobResult(c.Param("job_id"))
3 changes: 2 additions & 1 deletion internal/jobs/twitter.go
@@ -158,7 +158,6 @@ func (ts *TwitterScraper) ScrapeTweetsByQuery(baseDir string, query string, coun
return nil, err
}

var tweets []*TweetResult
for _, tweet := range result.Data {
var newTweet twitterscraper.Tweet
newTweet.ID = tweet.ID
@@ -167,6 +166,7 @@
}

ts.statsCollector.Add(stats.TwitterTweets, uint(len(result.Data)))

return tweets, nil

}
@@ -184,6 +184,7 @@ func (ts *TwitterScraper) ScrapeTweetsByQuery(baseDir string, query string, coun
}

ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets)))
logrus.Info("Scraped tweets: ", len(tweets))
return tweets, nil
}

56 changes: 39 additions & 17 deletions internal/jobs/twitterx/scraper.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/masa-finance/tee-worker/pkg/client"
"github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
@@ -21,16 +22,28 @@ type TwitterXScraper struct {

type TwitterXSearchQueryResult struct {
Data []struct {
ID string `json:"id"`
EditHistoryTweetIds []string `json:"edit_history_tweet_ids"`
Text string `json:"text"`
EditHistoryTweetIds []string `json:"edit_history_tweet_ids"`
ID string `json:"id"`
} `json:"data"`
Meta struct {
NewestID string `json:"newest_id"`
OldestID string `json:"oldest_id"`
ResultCount int `json:"result_count"`
NextToken string `json:"next_token"`
} `json:"meta"`
Status string
Message string
}
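
For reference, a small sketch of the payload this struct is meant to decode; the values below are made up, and the layout follows the X API v2 recent-search response shape (assumes encoding/json, fmt, and logrus are imported):

// Illustrative sample only; values are fabricated.
sample := []byte(`{
	"data": [
		{"id": "1881", "text": "hello world", "edit_history_tweet_ids": ["1881"]}
	],
	"meta": {"newest_id": "1881", "oldest_id": "1881", "result_count": 1}
}`)

var res TwitterXSearchQueryResult
if err := json.Unmarshal(sample, &res); err != nil {
	logrus.WithError(err).Fatal("failed to unmarshal sample")
}
fmt.Println(res.Meta.ResultCount) // prints 1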

// SearchParams holds all possible search parameters
type SearchParams struct {
Query string // The search query
MaxResults int // Maximum number of results to return
NextToken string // Token for getting the next page of results
SinceID string // Returns results with a Tweet ID greater than this ID
UntilID string // Returns results with a Tweet ID less than this ID
TweetFields []string // Additional tweet fields to include
}

func NewTwitterXScraper(client *client.TwitterXClient) *TwitterXScraper {
@@ -60,23 +73,38 @@ func (s *TwitterXScraper) ScrapeTweetsByQuery(query string) (*TwitterXSearchQuer
// run the search
response, err := client.Get(endpoint)
if err != nil {
logrus.Error("failed to execute search query: %w", err)
return nil, fmt.Errorf("failed to execute search query: %w", err)
}
defer response.Body.Close()

// read the response body
var body []byte
body, err = io.ReadAll(response.Body)
if err != nil {
logrus.Error("failed to read response body: %w", err)
return nil, fmt.Errorf("failed to read response body: %w", err)
}

// check response status
if response.StatusCode != http.StatusOK {
body, _ := io.ReadAll(response.Body)
return nil, fmt.Errorf("unexpected status code %d: %s", response.StatusCode, string(body))
logrus.Errorf("unexpected status code %d", response.StatusCode)
return nil, fmt.Errorf("unexpected status code %d", response.StatusCode)
}

// unmarshal the response
var result TwitterXSearchQueryResult
err = json.NewDecoder(response.Body).Decode(&result)
if err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
if err := json.Unmarshal(body, &result); err != nil {
logrus.WithError(err).Error("failed to unmarshal response")
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}

logrus.WithFields(logrus.Fields{
"result_count": result.Meta.ResultCount,
"newest_id": result.Meta.NewestID,
"oldest_id": result.Meta.OldestID,
}).Info("Successfully scraped tweets by query")

return &result, nil
}

@@ -117,32 +145,26 @@ func (s *TwitterXScraper) ScrapeTweetsByQueryExtended(params SearchParams) (*Twi
// run the search
response, err := client.Get(endpoint)
if err != nil {
logrus.Errorf("failed to execute search query: %s", err)
return nil, fmt.Errorf("failed to execute search query: %w", err)
}
defer response.Body.Close()

// check response status
if response.StatusCode != http.StatusOK {
body, _ := io.ReadAll(response.Body)
logrus.Errorf("unexpected status code %d: %s", response.StatusCode, string(body))
return nil, fmt.Errorf("unexpected status code %d: %s", response.StatusCode, string(body))
}

// unmarshal the response
var result TwitterXSearchQueryResult
err = json.NewDecoder(response.Body).Decode(&result)
if err != nil {
logrus.Error("failed to decode response: %w", err)
return nil, fmt.Errorf("failed to decode response: %w", err)
}

logrus.Info("Successfully scraped tweets by query, result count: ", result.Meta.ResultCount)
return &result, nil
}

// SearchParams holds all possible search parameters
type SearchParams struct {
Query string // The search query
MaxResults int // Maximum number of results to return
NextToken string // Token for getting the next page of results
SinceID string // Returns results with a Tweet ID greater than this ID
UntilID string // Returns results with a Tweet ID less than this ID
TweetFields []string // Additional tweet fields to include
}
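
Taken together, a rough usage sketch of the extended search; construction of the underlying *client.TwitterXClient is omitted because its constructor is not part of this diff, and the query and field names are illustrative:

scraper := NewTwitterXScraper(xClient) // xClient: a previously built *client.TwitterXClient

result, err := scraper.ScrapeTweetsByQueryExtended(SearchParams{
	Query:       "(golang OR gopher) -is:retweet",
	MaxResults:  10,
	TweetFields: []string{"created_at", "author_id"},
})
if err != nil {
	logrus.WithError(err).Fatal("search failed")
}

for _, tweet := range result.Data {
	fmt.Println(tweet.ID, tweet.Text)
}

// result.Meta.NextToken can be passed back via SearchParams.NextToken to page
// through further results.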
16 changes: 0 additions & 16 deletions pkg/client/twitter_x_client.go
@@ -76,22 +76,6 @@ func (c *TwitterXClient) Get(endpointUrl string) (*http.Response, error) {
logrus.Errorf("error making GET request: %v", err)
return nil, fmt.Errorf("error making GET request: %w", err)
}
defer resp.Body.Close()

// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
logrus.Errorf("error reading response: %v", err)
return nil, fmt.Errorf("error reading response: %w", err)
}

logrus.Info("Response body: ", string(body))

// if the response is not 200, return an error
if resp.StatusCode != http.StatusOK {
logrus.Errorf("API request failed with status: %d", resp.StatusCode)
return nil, fmt.Errorf("API request failed with status: %d", resp.StatusCode)
}

return resp, nil
}