diff --git a/Makefile b/Makefile
index 1417711c..a695ab73 100644
--- a/Makefile
+++ b/Makefile
@@ -68,10 +68,13 @@ test: docker-build-test
 	docker run --user root $(ENV_FILE_ARG) -e LOG_LEVEL=debug -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(TEST_IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v $(TEST_ARGS)
 
 test-capabilities: docker-build-test
-	@docker run --user root $(ENV_FILE_ARG) -e LOG_LEVEL=debug -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(TEST_IMAGE) go test -coverprofile=coverage/coverage-capabilities.txt -covermode=atomic -v ./internal/capabilities
+	@docker run --user root $(ENV_FILE_ARG) -e LOG_LEVEL=debug -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(TEST_IMAGE) go test -v ./internal/capabilities
+
+test-api: docker-build-test
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/api
 
 test-jobs: docker-build-test
-	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -coverprofile=coverage/coverage-jobs.txt -covermode=atomic -v ./internal/jobs
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs
 
 test-twitter: docker-build-test
 	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/twitter_test.go ./internal/jobs/jobs_suite_test.go
@@ -83,7 +86,11 @@ test-reddit: docker-build-test
 	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/reddit_test.go ./internal/jobs/redditapify/client_test.go ./api/types/reddit/reddit_suite_test.go
 
 test-web: docker-build-test
-	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/webscraper_test.go ./internal/jobs/jobs_suite_test.go
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) sh -c "cd /app && go test -v ./internal/jobs/web_test.go ./internal/jobs/jobs_suite_test.go"
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) sh -c "cd /app && go test -v ./internal/jobs/webapify"
+
+test-llm: docker-build-test
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) sh -c "cd /app && go test -v ./internal/jobs/llmapify"
 
 test-telemetry: docker-build-test
-	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/telemetry_test.go ./internal/jobs/jobs_suite_test.go
+	@docker run --user root $(ENV_FILE_ARG) -v $(PWD)/.masa:/home/masa -v $(PWD)/coverage:/app/coverage --rm --workdir /app -e DATA_DIR=/home/masa $(TEST_IMAGE) go test -v ./internal/jobs/telemetry_test.go
\ No newline at end of file
diff --git a/go.mod b/go.mod
index ad14f922..8aef56f6 100644
--- a/go.mod
+++ b/go.mod
@@ -5,15 +5,13 @@ go 1.23.0
 toolchain go1.24.3
 
 require (
-	github.com/cenkalti/backoff v2.2.1+incompatible
 	github.com/edgelesssys/ego v1.7.2
-	github.com/gocolly/colly v1.2.0
 	github.com/google/uuid v1.6.0
 	github.com/imperatrona/twitter-scraper v0.0.18
 	github.com/joho/godotenv v1.5.1
 	github.com/labstack/echo-contrib v0.17.4
 	github.com/labstack/echo/v4 v4.13.4
-	github.com/masa-finance/tee-types v1.1.13
+	github.com/masa-finance/tee-types v1.1.15
 	github.com/onsi/ginkgo/v2 v2.23.4
 	github.com/onsi/gomega v1.38.0
 	github.com/sirupsen/logrus v1.9.3
@@ -28,24 +26,13 @@ require (
 )
 
 require (
-	github.com/PuerkitoBio/goquery v1.10.3 // indirect
-	github.com/andybalholm/cascadia v1.3.3 // indirect
-	github.com/antchfx/htmlquery v1.3.4 // indirect
-	github.com/antchfx/xmlquery v1.4.4 // indirect
-	github.com/antchfx/xpath v1.3.4 // indirect
 	github.com/go-jose/go-jose/v4 v4.1.2 // indirect
 	github.com/go-logr/logr v1.4.3 // indirect
-	github.com/gobwas/glob v0.2.3 // indirect
-	github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
-	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/go-cmp v0.7.0 // indirect
 	github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect
-	github.com/kennygrant/sanitize v1.2.4 // indirect
 	github.com/labstack/gommon v0.4.2
 	github.com/mattn/go-colorable v0.1.14 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
-	github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d // indirect
-	github.com/temoto/robotstxt v1.1.2 // indirect
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
 	github.com/valyala/fasttemplate v1.2.2 // indirect
 	golang.org/x/crypto v0.41.0 // indirect
@@ -55,7 +42,6 @@
 	golang.org/x/text v0.28.0 // indirect
 	golang.org/x/time v0.12.0 // indirect
 	golang.org/x/tools v0.35.0 // indirect
-	google.golang.org/appengine v1.6.8 // indirect
 	google.golang.org/protobuf v1.36.7 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/go.sum b/go.sum
index 6cf96c43..bdb89acb 100644
--- a/go.sum
+++ b/go.sum
@@ -1,18 +1,5 @@
 github.com/AlexEidt/Vidio v1.5.1 h1:tovwvtgQagUz1vifiL9OeWkg1fP/XUzFazFKh7tFtaE=
 github.com/AlexEidt/Vidio v1.5.1/go.mod h1:djhIMnWMqPrC3X6nB6ymGX6uWWlgw+VayYGKE1bNwmI=
-github.com/PuerkitoBio/goquery v1.10.3 h1:pFYcNSqHxBD06Fpj/KsbStFRsgRATgnf3LeXiUkhzPo=
-github.com/PuerkitoBio/goquery v1.10.3/go.mod h1:tMUX0zDMHXYlAQk6p35XxQMqMweEKB7iK7iLNd4RH4Y=
-github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM=
-github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA=
-github.com/antchfx/htmlquery v1.3.4 h1:Isd0srPkni2iNTWCwVj/72t7uCphFeor5Q8nCzj1jdQ=
-github.com/antchfx/htmlquery v1.3.4/go.mod h1:K9os0BwIEmLAvTqaNSua8tXLWRWZpocZIH73OzWQbwM=
-github.com/antchfx/xmlquery v1.4.4 h1:mxMEkdYP3pjKSftxss4nUHfjBhnMk4imGoR96FRY2dg=
-github.com/antchfx/xmlquery v1.4.4/go.mod h1:AEPEEPYE9GnA2mj5Ur2L5Q5/2PycJ0N9Fusrx9b12fc=
-github.com/antchfx/xpath v1.3.3/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
-github.com/antchfx/xpath v1.3.4 h1:1ixrW1VnXd4HurCj7qnqnR0jo14g8JMe20Fshg1Vgz4=
-github.com/antchfx/xpath v1.3.4/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
-github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
-github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -24,18 +11,6 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
 github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
 github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
-github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
-github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
-github.com/gocolly/colly v1.2.0 h1:qRz9YAn8FIH0qzgNUw+HT9UN7wm1oF9OBAilwEWpyrI=
-github.com/gocolly/colly v1.2.0/go.mod h1:Hof5T3ZswNVsOHYmba1u03W65HDWgpV5HifSuueE0EA=
-github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
-github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
-github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw=
-github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
-github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
-github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
-github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
 github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
@@ -45,8 +20,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
-github.com/kennygrant/sanitize v1.2.4 h1:gN25/otpP5vAsO2djbMhF/LQX6R7+O1TB4yv8NzpJ3o=
-github.com/kennygrant/sanitize v1.2.4/go.mod h1:LGsjYYtgxbetdg5owWB2mpgUL6e2nfw2eObZ0u0qvak=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -57,8 +30,8 @@ github.com/labstack/echo/v4 v4.13.4 h1:oTZZW+T3s9gAu5L8vmzihV7/lkXGZuITzTQkTEhcX
 github.com/labstack/echo/v4 v4.13.4/go.mod h1:g63b33BZ5vZzcIUF8AtRH40DrTlXnx4UMC8rBdndmjQ=
 github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
 github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
-github.com/masa-finance/tee-types v1.1.13 h1:bVXUEF8nXT3bhJE4kcDwcuzfQopid9BbIp0/OucClL4=
-github.com/masa-finance/tee-types v1.1.13/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc=
+github.com/masa-finance/tee-types v1.1.15 h1:DfTNAYsG5g3XPxzJ2kw1bbT536mOeux3ZxaAq8XnNLg=
+github.com/masa-finance/tee-types v1.1.15/go.mod h1:sB98t0axFlPi2d0zUPFZSQ84mPGwbr9eRY5yLLE3fSc=
 github.com/masa-finance/twitter-scraper v1.0.2 h1:him+wvYZHg/7EDdy73z1ceUywDJDRAhPLD2CSEa2Vfk=
 github.com/masa-finance/twitter-scraper v1.0.2/go.mod h1:38MY3g/h4V7Xl4HbW9lnkL8S3YiFZenBFv86hN57RG8=
 github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
@@ -73,17 +46,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
 github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
-github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA=
-github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
 github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
-github.com/temoto/robotstxt v1.1.2 h1:W2pOjSJ6SWvldyEuiFXNxz3xZ8aiWX5LbfDiOFd7Fxg=
-github.com/temoto/robotstxt v1.1.2/go.mod h1:+1AmkuG3IYkh1kv0d2qEB9Le88ehNO0zwOr3ujewlOo=
 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
 github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
@@ -97,7 +65,6 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY
 golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
 golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
 golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
-golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
 golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
 golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
 golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4=
@@ -116,7 +83,6 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
 golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
 golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
 golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
-golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
 golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
 golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -126,7 +92,6 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
 golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
-golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -140,7 +105,6 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
 golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
 golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
@@ -152,18 +116,15 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
 golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
 golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
 golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
-golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
 golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
-golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
 golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
 golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
 golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
@@ -177,11 +138,6 @@ golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxb
 golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0=
 golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
-google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
-google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
-google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
 google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/internal/api/api_test.go b/internal/api/api_test.go
index 3b1c2edb..3af0d603 100644
--- a/internal/api/api_test.go
+++ b/internal/api/api_test.go
@@ -70,41 +70,14 @@ var _ = Describe("API", func() {
 		cancel()
 	})
 
-	It("should submit an invalid job, and fail because of the malformed URL. no results containing google", func() {
-		// Step 1: Create the job request
-		job := types.Job{
-			Type: teetypes.WebJob,
-			Arguments: map[string]interface{}{
-				"url": "google",
-			},
-		}
-
-		// Step 2: Get a Job signature
-		jobSignature, err := clientInstance.CreateJobSignature(job)
-		Expect(err).NotTo(HaveOccurred())
-		Expect(jobSignature).NotTo(BeEmpty())
-
-		// Step 3: Submit the job
-		jobResult, err := clientInstance.SubmitJob(jobSignature)
-		Expect(err).NotTo(HaveOccurred())
-		Expect(jobResult.UUID).NotTo(BeEmpty())
-
-		// Step 4: Wait for the job result - should fail due to invalid URL
-		encryptedResult, err := jobResult.Get()
-		Expect(err).To(HaveOccurred())
-		Expect(encryptedResult).To(BeEmpty())
-
-		// The error should be about URL scheme validation
-		Expect(err.Error()).To(ContainSubstring("URL must include a scheme"))
-	})
-
 	It("should submit a job and get the correct result", func() {
 		// Step 1: Create the job request
+		// we use TikTok transcription here as it's supported by all workers without any unique config
 		job := types.Job{
-			Type: teetypes.WebJob,
+			Type: teetypes.TiktokJob,
 			Arguments: map[string]interface{}{
-				"url":   "https://google.com",
-				"depth": 1,
+				"type":      "transcription",
+				"video_url": "https://www.tiktok.com/@theblockrunner.com/video/7227579907361066282",
 			},
 		}
 
@@ -127,12 +100,10 @@ var _ = Describe("API", func() {
 
 		Expect(err).NotTo(HaveOccurred())
 		Expect(decryptedResult).NotTo(BeEmpty())
-		Expect(decryptedResult).To(ContainSubstring("google"))
 
 		result, err := jobResult.GetDecrypted(jobSignature)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(result).NotTo(BeEmpty())
-		Expect(result).To(ContainSubstring("google"))
 	})
 
 	It("bubble up errors", func() {
diff --git a/internal/capabilities/detector.go b/internal/capabilities/detector.go
index 005c60c0..27a4cd14 100644
--- a/internal/capabilities/detector.go
+++ b/internal/capabilities/detector.go
@@ -38,10 +38,12 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface
 	accounts := jc.GetStringSlice("twitter_accounts", nil)
 	apiKeys := jc.GetStringSlice("twitter_api_keys", nil)
 	apifyApiKey := jc.GetString("apify_api_key", "")
+	geminiApiKey := config.LlmApiKey(jc.GetString("gemini_api_key", ""))
 
 	hasAccounts := len(accounts) > 0
 	hasApiKeys := len(apiKeys) > 0
 	hasApifyKey := hasValidApifyKey(apifyApiKey)
+	hasLLMKey := geminiApiKey.IsValid()
 
 	// Add Twitter-specific capabilities based on available authentication
 	if hasAccounts {
@@ -73,6 +75,9 @@ func DetectCapabilities(jc config.JobConfiguration, jobServer JobServerInterface
 		s.Add(teetypes.TiktokSearchCaps...)
 		capabilities[teetypes.TiktokJob] = s.Items()
 
+		if hasLLMKey {
+			capabilities[teetypes.WebJob] = teetypes.WebCaps
+		}
 	}
 
 	// Add general TwitterJob capability if any Twitter auth is available
diff --git a/internal/capabilities/detector_test.go b/internal/capabilities/detector_test.go
index 40b71d40..5b0bdb4e 100644
--- a/internal/capabilities/detector_test.go
+++ b/internal/capabilities/detector_test.go
@@ -8,8 +8,8 @@ import (
 	. "github.com/onsi/gomega"
 
 	teetypes "github.com/masa-finance/tee-types/types"
-	"github.com/masa-finance/tee-worker/internal/config"
 	. "github.com/masa-finance/tee-worker/internal/capabilities"
+	"github.com/masa-finance/tee-worker/internal/config"
 )
 
 // MockJobServer implements JobServerInterface for testing
@@ -65,7 +65,6 @@ var _ = Describe("DetectCapabilities", func() {
 			config.JobConfiguration{},
 			nil,
 			teetypes.WorkerCapabilities{
-				teetypes.WebJob:       {teetypes.CapScraper},
 				teetypes.TelemetryJob: {teetypes.CapTelemetry},
 				teetypes.TiktokJob:    {teetypes.CapTranscription},
 			},
@@ -76,7 +75,6 @@ var _ = Describe("DetectCapabilities", func() {
 			},
 			nil,
 			teetypes.WorkerCapabilities{
-				teetypes.WebJob:               {teetypes.CapScraper},
 				teetypes.TelemetryJob:         {teetypes.CapTelemetry},
 				teetypes.TiktokJob:            {teetypes.CapTranscription},
 				teetypes.TwitterCredentialJob: teetypes.TwitterCredentialCaps,
@@ -89,7 +87,6 @@ var _ = Describe("DetectCapabilities", func() {
 			},
 			nil,
 			teetypes.WorkerCapabilities{
-				teetypes.WebJob:        {teetypes.CapScraper},
 				teetypes.TelemetryJob:  {teetypes.CapTelemetry},
 				teetypes.TiktokJob:     {teetypes.CapTranscription},
 				teetypes.TwitterApiJob: teetypes.TwitterAPICaps,
@@ -102,7 +99,6 @@ var _ = Describe("DetectCapabilities", func() {
 			},
 			nil,
 			teetypes.WorkerCapabilities{
-				teetypes.WebJob:       {teetypes.CapScraper},
 				teetypes.TelemetryJob: {teetypes.CapTelemetry},
 				teetypes.TiktokJob:    {teetypes.CapTranscription},
 				// Note: Mock elevated keys will be detected as basic since we can't make real API calls in tests
@@ -133,19 +129,19 @@ var _ = Describe("DetectCapabilities", func() {
 		},
 		Entry("Basic scrapers only",
 			config.JobConfiguration{},
-			[]string{"web", "telemetry", "tiktok"},
+			[]string{"telemetry", "tiktok"},
 		),
 		Entry("With Twitter accounts",
 			config.JobConfiguration{
 				"twitter_accounts": []string{"user1:pass1"},
 			},
-			[]string{"web", "telemetry", "tiktok", "twitter", "twitter-credential"},
+			[]string{"telemetry", "tiktok", "twitter", "twitter-credential"},
 		),
 		Entry("With Twitter API keys",
 			config.JobConfiguration{
 				"twitter_api_keys": []string{"key1"},
 			},
-			[]string{"web", "telemetry", "tiktok", "twitter", "twitter-api"},
+			[]string{"telemetry", "tiktok", "twitter", "twitter-api"},
 		),
 	)
 })
@@ -179,6 +175,27 @@ var _ = Describe("DetectCapabilities", func() {
 			_, hasReddit := caps[teetypes.RedditJob]
 			Expect(hasReddit).To(BeTrue(), "expected reddit capabilities to be present")
 		})
 
+		It("should add enhanced capabilities when valid Apify API key is provided alongside a Gemini API key", func() {
+			apifyKey := os.Getenv("APIFY_API_KEY")
+			if apifyKey == "" {
+				Skip("APIFY_API_KEY is not set")
+			}
+
+			geminiKey := os.Getenv("GEMINI_API_KEY")
+			if geminiKey == "" {
+				Skip("GEMINI_API_KEY is not set")
+			}
+
+			jc := config.JobConfiguration{
+				"apify_api_key":  apifyKey,
+				"gemini_api_key": geminiKey,
+			}
+			caps := DetectCapabilities(jc, nil)
+
+			// Web should be present
+			_, hasWeb := caps[teetypes.WebJob]
+			Expect(hasWeb).To(BeTrue(), "expected web capabilities to be present")
+		})
 	})
 })
diff --git a/internal/config/config.go b/internal/config/config.go
index e9c1b14c..bfe05915 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -159,6 +159,14 @@ func ReadConfig() JobConfiguration {
 		jc["apify_api_key"] = ""
 	}
 
+	geminiApiKey := os.Getenv("GEMINI_API_KEY")
+	if geminiApiKey != "" {
+		logrus.Info("Gemini API key found")
+		jc["gemini_api_key"] = geminiApiKey
+	} else {
+		jc["gemini_api_key"] = ""
+	}
+
 	tikTokLang := os.Getenv("TIKTOK_DEFAULT_LANGUAGE")
 	if tikTokLang == "" {
 		tikTokLang = "eng-US"
@@ -295,6 +303,41 @@ func (jc JobConfiguration) GetRedditConfig() RedditConfig {
 	}
 }
 
+// LlmApiKey represents an LLM API key with validation capabilities
+type LlmApiKey string
+
+// IsValid checks if the LLM API key is valid
+func (k LlmApiKey) IsValid() bool {
+	if k == "" {
+		return false
+	}
+
+	// TODO: Add actual Gemini API key validation with a handler
+	// For now, just check if it's not empty
+	return true
+}
+
+type LlmConfig struct {
+	GeminiApiKey LlmApiKey
+}
+
+// WebConfig represents the configuration needed for Web scraping via Apify
+type WebConfig struct {
+	LlmConfig
+	ApifyApiKey string
+}
+
+// GetWebConfig constructs a WebConfig directly from the JobConfiguration.
+// This eliminates the need for JSON marshaling/unmarshaling
+func (jc JobConfiguration) GetWebConfig() WebConfig {
+	return WebConfig{
+		LlmConfig: LlmConfig{
+			GeminiApiKey: LlmApiKey(jc.GetString("gemini_api_key", "")),
+		},
+		ApifyApiKey: jc.GetString("apify_api_key", ""),
+	}
+}
+
 // ParseLogLevel parses a string and returns the corresponding logrus.Level.
 func ParseLogLevel(logLevel string) logrus.Level {
 	switch strings.ToLower(logLevel) {
diff --git a/internal/jobs/llmapify/client.go b/internal/jobs/llmapify/client.go
new file mode 100644
index 00000000..e6d5b977
--- /dev/null
+++ b/internal/jobs/llmapify/client.go
@@ -0,0 +1,93 @@
+package llmapify
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+	teetypes "github.com/masa-finance/tee-types/types"
+	"github.com/masa-finance/tee-worker/internal/config"
+	"github.com/masa-finance/tee-worker/internal/jobs/stats"
+	"github.com/masa-finance/tee-worker/pkg/client"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	ActorID = "dusan.vystrcil~llm-dataset-processor"
+)
+
+var (
+	ErrProviderKeyRequired  = errors.New("llm provider key is required")
+	ErrFailedToCreateClient = errors.New("failed to create apify client")
+)
+
+type ApifyClient struct {
+	client         client.Apify
+	statsCollector *stats.StatsCollector
+	llmConfig      config.LlmConfig
+}
+
+// NewInternalClient is a function variable that can be replaced in tests.
+// It defaults to the actual implementation.
+var NewInternalClient = func(apiKey string) (client.Apify, error) {
+	return client.NewApifyClient(apiKey)
+}
+
+// NewClient creates a new LLM Apify client
+func NewClient(apiToken string, llmConfig config.LlmConfig, statsCollector *stats.StatsCollector) (*ApifyClient, error) {
+	client, err := NewInternalClient(apiToken)
+	if err != nil {
+		return nil, fmt.Errorf("%w: %v", ErrFailedToCreateClient, err)
+	}
+
+	if !llmConfig.GeminiApiKey.IsValid() {
+		return nil, ErrProviderKeyRequired
+	}
+
+	return &ApifyClient{
+		client:         client,
+		statsCollector: statsCollector,
+		llmConfig:      llmConfig,
+	}, nil
+}
+
+// ValidateApiKey tests if the Apify API token is valid
+func (c *ApifyClient) ValidateApiKey() error {
+	return c.client.ValidateApiKey()
+}
+
+func (c *ApifyClient) Process(workerID string, args teeargs.LLMProcessorArguments, cursor client.Cursor) ([]*teetypes.LLMProcessorResult, client.Cursor, error) {
+	if c.statsCollector != nil {
+		c.statsCollector.Add(workerID, stats.LLMQueries, 1)
+	}
+
+	input := args.ToLLMProcessorRequest()
+	input.LLMProviderApiKey = string(c.llmConfig.GeminiApiKey)
+
+	limit := uint(args.Items)
+	dataset, nextCursor, err := c.client.RunActorAndGetResponse(ActorID, input, cursor, limit)
+	if err != nil {
+		if c.statsCollector != nil {
+			c.statsCollector.Add(workerID, stats.LLMErrors, 1)
+		}
+		return nil, client.EmptyCursor, err
+	}
+
+	response := make([]*teetypes.LLMProcessorResult, 0, len(dataset.Data.Items))
+
+	for i, item := range dataset.Data.Items {
+		var resp teetypes.LLMProcessorResult
+		if err := json.Unmarshal(item, &resp); err != nil {
+			logrus.Warnf("Failed to unmarshal llm result at index %d: %v", i, err)
+			continue
+		}
+		response = append(response, &resp)
+	}
+
+	if c.statsCollector != nil {
+		c.statsCollector.Add(workerID, stats.LLMProcessedItems, uint(len(response)))
+	}
+
+	return response, nextCursor, nil
+}
diff --git a/internal/jobs/llmapify/client_test.go b/internal/jobs/llmapify/client_test.go
new file mode 100644
index 00000000..ed81ae60
--- /dev/null
+++ b/internal/jobs/llmapify/client_test.go
@@ -0,0 +1,274 @@
+package llmapify_test
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"os"
+	"strconv"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/masa-finance/tee-worker/internal/config"
+	"github.com/masa-finance/tee-worker/internal/jobs/llmapify"
+	"github.com/masa-finance/tee-worker/pkg/client"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+	teetypes "github.com/masa-finance/tee-types/types"
+)
+
+// MockApifyClient is a mock implementation of the ApifyClient.
+type MockApifyClient struct {
+	RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error)
+	ValidateApiKeyFunc         func() error
+}
+
+func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+	if m.RunActorAndGetResponseFunc != nil {
+		return m.RunActorAndGetResponseFunc(actorID, input, cursor, limit)
+	}
+	return nil, "", errors.New("RunActorAndGetResponseFunc not defined")
+}
+
+func (m *MockApifyClient) ValidateApiKey() error {
+	if m.ValidateApiKeyFunc != nil {
+		return m.ValidateApiKeyFunc()
+	}
+	return errors.New("ValidateApiKeyFunc not defined")
+}
+
+var _ = Describe("LLMApifyClient", func() {
+	var (
+		mockClient *MockApifyClient
+		llmClient  *llmapify.ApifyClient
+		apifyKey   string
+	)
+
+	BeforeEach(func() {
+		apifyKey = os.Getenv("APIFY_API_KEY")
+		mockClient = &MockApifyClient{}
+		// Replace the client creation function with one that returns the mock
+		llmapify.NewInternalClient = func(apiKey string) (client.Apify, error) {
+			return mockClient, nil
+		}
+		var err error
+		llmClient, err = llmapify.NewClient("test-token", config.LlmConfig{GeminiApiKey: "test-llm-key"}, nil)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	Describe("Process", func() {
+		It("should construct the correct actor input", func() {
+			args := teeargs.LLMProcessorArguments{
+				DatasetId: "test-dataset-id",
+				Prompt:    "test-prompt",
+			}
+
+			// Marshal and unmarshal to apply defaults
+			jsonData, err := json.Marshal(args)
+			Expect(err).ToNot(HaveOccurred())
+			err = json.Unmarshal(jsonData, &args)
+			Expect(err).ToNot(HaveOccurred())
+
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				Expect(actorID).To(Equal(llmapify.ActorID))
+				Expect(limit).To(Equal(uint(1)))
+
+				// Verify the input is correctly converted to LLMProcessorRequest
+				request, ok := input.(teetypes.LLMProcessorRequest)
+				Expect(ok).To(BeTrue())
+				Expect(request.InputDatasetId).To(Equal("test-dataset-id"))
+				Expect(request.Prompt).To(Equal("test-prompt"))
+				Expect(request.LLMProviderApiKey).To(Equal("test-llm-key"))                                            // should be set from constructor
+				Expect(request.Model).To(Equal(teeargs.LLMDefaultModel))                                               // default model
+				Expect(request.MultipleColumns).To(Equal(teeargs.LLMDefaultMultipleColumns))                           // default value
+				Expect(request.MaxTokens).To(Equal(teeargs.LLMDefaultMaxTokens))                                       // default value
+				Expect(request.Temperature).To(Equal(strconv.FormatFloat(teeargs.LLMDefaultTemperature, 'f', -1, 64))) // default value
+
+				return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil
+			}
+
+			_, _, processErr := llmClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(processErr).NotTo(HaveOccurred())
+		})
+
+		It("should handle errors from the apify client", func() {
+			expectedErr := errors.New("apify error")
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return nil, "", expectedErr
+			}
+
+			args := teeargs.LLMProcessorArguments{
+				DatasetId: "test-dataset-id",
+				Prompt:    "test-prompt",
+			}
+			_, _, err := llmClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(err).To(MatchError(expectedErr))
+		})
+
+		It("should handle JSON unmarshalling errors gracefully", func() {
+			invalidJSON := []byte(`{"llmresponse": 123}`) // llmresponse should be a string
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{invalidJSON},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			args := teeargs.LLMProcessorArguments{
+				DatasetId: "test-dataset-id",
+				Prompt:    "test-prompt",
+			}
+			results, _, err := llmClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(results).To(BeEmpty()) // The invalid item should be skipped
+		})
+
+		It("should correctly unmarshal valid items", func() {
+			llmResultJSON, _ := json.Marshal(map[string]any{
+				"llmresponse": "This is a summary of the webpage content.",
+			})
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{llmResultJSON},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			args := teeargs.LLMProcessorArguments{
+				DatasetId: "test-dataset-id",
+				Prompt:    "test-prompt",
+			}
+			results, cursor, err := llmClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(cursor).To(Equal(client.Cursor("next")))
+			Expect(results).To(HaveLen(1))
+			Expect(results[0].LLMResponse).To(Equal("This is a summary of the webpage content."))
+		})
+
+		It("should handle multiple valid results", func() {
+			llmResult1, _ := json.Marshal(map[string]any{
+				"llmresponse": "First summary.",
+			})
+			llmResult2, _ := json.Marshal(map[string]any{
+				"llmresponse": "Second summary.",
+			})
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{llmResult1, llmResult2},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			args := teeargs.LLMProcessorArguments{
+				DatasetId: "test-dataset-id",
+				Prompt:    "test-prompt",
+			}
+			results, _, err := llmClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(results).To(HaveLen(2))
+			Expect(results[0].LLMResponse).To(Equal("First summary."))
+			Expect(results[1].LLMResponse).To(Equal("Second summary."))
+		})
+
+		It("should use custom values when provided", func() {
+			args := teeargs.LLMProcessorArguments{
+				DatasetId:   "test-dataset-id",
+				Prompt:      "test-prompt",
+				MaxTokens:   500,
+				Temperature: 0.5,
+			}
+
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				request, ok := input.(teetypes.LLMProcessorRequest)
+				Expect(ok).To(BeTrue())
+				Expect(request.MaxTokens).To(Equal(uint(500)))
+				Expect(request.Temperature).To(Equal("0.5"))
+				Expect(request.LLMProviderApiKey).To(Equal("test-llm-key")) // should be set from constructor
+
+				return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil
+			}
+
+			_, _, err := llmClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+		})
+	})
+
+	Describe("ValidateApiKey", func() {
+		It("should validate the API key", func() {
+			mockClient.ValidateApiKeyFunc = func() error {
+				return nil
+			}
+			Expect(llmClient.ValidateApiKey()).To(Succeed())
+		})
+
+		It("should return error when validation fails", func() {
+			expectedErr := errors.New("invalid key")
+			mockClient.ValidateApiKeyFunc = func() error {
+				return expectedErr
+			}
+			Expect(llmClient.ValidateApiKey()).To(MatchError(expectedErr))
+		})
+	})
+
+	// Integration tests that use the real client
+	Context("Integration tests", func() {
+		It("should validate API key with real client when both APIFY_API_KEY and GEMINI_API_KEY are set", func() {
+			geminiKey := os.Getenv("GEMINI_API_KEY")
+			if apifyKey == "" || geminiKey == "" {
+				Skip("Both APIFY_API_KEY and GEMINI_API_KEY must be set for integration tests")
+			}
+
+			// Reset to use real client
+			llmapify.NewInternalClient = func(apiKey string) (client.Apify, error) {
+				return client.NewApifyClient(apiKey)
+			}
+
+			realClient, err := llmapify.NewClient(apifyKey, config.LlmConfig{GeminiApiKey: config.LlmApiKey(geminiKey)}, nil)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(realClient.ValidateApiKey()).To(Succeed())
+		})
+
+		It("should process a real dataset when both APIFY_API_KEY and GEMINI_API_KEY are set", func() {
+			geminiKey := os.Getenv("GEMINI_API_KEY")
+			if apifyKey == "" || geminiKey == "" {
+				Skip("Both APIFY_API_KEY and GEMINI_API_KEY must be set for integration tests")
+			}
+
+			// Reset to use real client
+			llmapify.NewInternalClient = func(apiKey string) (client.Apify, error) {
+				return client.NewApifyClient(apiKey)
+			}
+
+			realClient, err := llmapify.NewClient(apifyKey, config.LlmConfig{GeminiApiKey: config.LlmApiKey(geminiKey)}, nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			args := teeargs.LLMProcessorArguments{
+				DatasetId: "V6tyuuZIgfiETl1cl",
+				Prompt:    "summarize the content of this webpage ${markdown}",
+			}
+			// Marshal and unmarshal to apply defaults
+			jsonData, err := json.Marshal(args)
+			Expect(err).ToNot(HaveOccurred())
+			err = json.Unmarshal(jsonData, &args)
+			Expect(err).ToNot(HaveOccurred())
+
+			results, cursor, err := realClient.Process("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(results).NotTo(BeEmpty())
+			Expect(results[0]).NotTo(BeNil())
+			Expect(results[0].LLMResponse).NotTo(BeEmpty())
+			Expect(cursor).NotTo(BeEmpty())
+
+			prettyJSON, err := json.MarshalIndent(results, "", "  ")
+			Expect(err).NotTo(HaveOccurred())
+			fmt.Println(string(prettyJSON))
+		})
+	})
+})
diff --git a/internal/jobs/llmapify/llmapify_suite_test.go b/internal/jobs/llmapify/llmapify_suite_test.go
new file mode 100644
index 00000000..1533e4e1
--- /dev/null
+++ b/internal/jobs/llmapify/llmapify_suite_test.go
@@ -0,0 +1,13 @@
+package llmapify_test
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+func TestLLMApifyClient(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "LLMApify Client Suite")
+}
diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go
index ff7d3044..ada7ab98 100644
--- a/internal/jobs/stats/stats.go
+++ b/internal/jobs/stats/stats.go
@@ -25,9 +25,13 @@ const (
 	TwitterAuthErrors          StatType = "twitter_auth_errors"
 	TwitterRateErrors          StatType = "twitter_ratelimit_errors"
 	TwitterXSearchQueries      StatType = "twitterx_search" // TODO: investigate if this is needed or used...
-	WebSuccess                 StatType = "web_success"
+	WebQueries                 StatType = "web_queries"
+	WebScrapedPages            StatType = "web_scraped_pages"
+	WebProcessedPages          StatType = "web_processed_pages"
 	WebErrors                  StatType = "web_errors"
-	WebInvalid                 StatType = "web_invalid"
+	LLMQueries                 StatType = "llm_queries"
+	LLMProcessedItems          StatType = "llm_processed_items"
+	LLMErrors                  StatType = "llm_errors"
 	TikTokTranscriptionSuccess StatType = "tiktok_transcription_success"
 	TikTokTranscriptionErrors  StatType = "tiktok_transcription_errors"
 	TikTokVideos               StatType = "tiktok_returned_videos"
@@ -65,7 +69,7 @@ type StatsCollector struct {
 	Stats            *Stats
 	Chan             chan AddStat
 	jobServer        capabilities.JobServerInterface
-	jobConfiguration config.JobConfiguration
+	jobConfiguration config.JobConfiguration
 }
 
 // StartCollector starts a goroutine that listens to a channel for AddStat messages and updates the stats accordingly.
diff --git a/internal/jobs/telemetry_test.go b/internal/jobs/telemetry_test.go
index e3175045..7c2b4732 100644
--- a/internal/jobs/telemetry_test.go
+++ b/internal/jobs/telemetry_test.go
@@ -33,7 +33,8 @@ var _ = Describe("Telemetry Job", func() {
 	Context("Telemetry Data Fetching", func() {
 		It("should fetch telemetry data and log it", func() {
 			// Add some test stats to the collector
-			statsCollector.Add("test-worker-1", stats.WebSuccess, 5)
+			statsCollector.Add("test-worker-1", stats.WebQueries, 5)
+			statsCollector.Add("test-worker-1", stats.WebScrapedPages, 10)
 			statsCollector.Add("test-worker-1", stats.WebErrors, 2)
 			statsCollector.Add("test-worker-2", stats.TwitterScrapes, 10)
 			statsCollector.Add("test-worker-2", stats.TwitterTweets, 50)
diff --git a/internal/jobs/tiktok.go b/internal/jobs/tiktok.go
index e702426a..fff685c3 100644
--- a/internal/jobs/tiktok.go
+++ b/internal/jobs/tiktok.go
@@ -152,14 +152,6 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo
 		return types.JobResult{Error: "VideoURL is required"}, fmt.Errorf("videoURL is required")
 	}
 
-	// Use the enhanced language selection logic
-	selectedLanguageKey := tiktokArgs.GetLanguageCode() // This handles defaults automatically
-	if tiktokArgs.HasLanguagePreference() {
-		logrus.WithField("job_uuid", j.UUID).Infof("Using custom language preference: %s", selectedLanguageKey)
-	} else {
-		logrus.WithField("job_uuid", j.UUID).Infof("Using default language: %s", selectedLanguageKey)
-	}
-
 	// Sub-Step 3.1: Call TikTok Transcription API
 	apiRequestBody := map[string]string{"url": tiktokArgs.GetVideoURL()}
 	jsonBody, err := json.Marshal(apiRequestBody)
@@ -229,28 +221,42 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo
 	}
 
 	vttText := ""
+	languageCode := tiktokArgs.GetLanguageCode() // either requested or default
 
-	// Directly use the requested/default language; if missing, return an error
-	if transcript, ok := parsedAPIResponse.Transcripts[selectedLanguageKey]; ok && strings.TrimSpace(transcript) != "" {
-		vttText = transcript
+	if tiktokArgs.HasLanguagePreference() {
+		if transcript, ok := parsedAPIResponse.Transcripts[tiktokArgs.Language]; ok && strings.TrimSpace(transcript) != "" {
+			vttText = transcript
+		}
 	} else {
-		errMsg := fmt.Sprintf("Transcript for requested language %s not found in API response", selectedLanguageKey)
-		logrus.WithFields(logrus.Fields{
-			"job_uuid":       j.UUID,
-			"requested_lang": selectedLanguageKey,
-		}).Error(errMsg)
-		ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1)
-		return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg)
+		// Attempt to use the default language
+		if transcript, ok := parsedAPIResponse.Transcripts[languageCode]; ok && strings.TrimSpace(transcript) != "" {
+			vttText = transcript
+		} else {
+			// No preference and default not found - return the first available transcript
+			for langCode, transcript := range parsedAPIResponse.Transcripts {
+				if strings.TrimSpace(transcript) != "" {
+					vttText = transcript
+					languageCode = langCode
+					break
+				}
+			}
+		}
 	}
 
 	if vttText == "" {
-		errMsg := "Suitable transcript could not be extracted from API response"
-		logrus.WithField("job_uuid", j.UUID).Error(errMsg)
+		errMsg := "No transcripts found in API response"
+		if tiktokArgs.HasLanguagePreference() {
+			errMsg = fmt.Sprintf("Transcript for requested language %s not found in API response", languageCode)
+		}
+		logrus.WithFields(logrus.Fields{
+			"job_uuid":       j.UUID,
+			"requested_lang": languageCode,
+		}).Error(errMsg)
 		ttt.stats.Add(j.WorkerID, stats.TikTokTranscriptionErrors, 1)
 		return types.JobResult{Error: errMsg}, fmt.Errorf(errMsg)
 	}
 
-	logrus.Debugf("Job %s: Raw VTT content for language %s:\n%s", j.UUID, selectedLanguageKey, vttText)
+	logrus.Debugf("Job %s: Raw VTT content for language %s:\n%s", j.UUID, languageCode, vttText)
 
 	// Convert VTT to Plain Text
 	plainTextTranscription, err := convertVTTToPlainText(vttText)
@@ -265,7 +271,7 @@ func (ttt *TikTokTranscriber) executeTranscription(j types.Job, a *teeargs.TikTo
 	// Process Result & Return
 	resultData := teetypes.TikTokTranscriptionResult{
 		TranscriptionText: plainTextTranscription,
-		DetectedLanguage:  selectedLanguageKey,
+		DetectedLanguage:  languageCode,
 		VideoTitle:        parsedAPIResponse.VideoTitle,
 		OriginalURL:       tiktokArgs.GetVideoURL(),
 		ThumbnailURL:      parsedAPIResponse.ThumbnailURL,
diff --git a/internal/jobs/tiktok_test.go b/internal/jobs/tiktok_test.go
index c4f63730..cb8973c0 100644
--- a/internal/jobs/tiktok_test.go
+++ b/internal/jobs/tiktok_test.go
@@ -44,11 +44,10 @@ var _ = Describe("TikTok", func() {
 
 	Context("when a valid TikTok URL is provided", func() {
 		It("should successfully transcribe the video and record success stats", func(ctx SpecContext) {
-			videoURL := "https://www.tiktok.com/@.jake.ai/video/7516694182245813509"
+			videoURL := "https://www.tiktok.com/@theblockrunner.com/video/7227579907361066282"
 			jobArguments := map[string]interface{}{
 				"type":      teetypes.CapTranscription,
 				"video_url": videoURL,
-				// default language is eng-US from tee types
 			}
 
 			job := types.Job{
diff --git a/internal/jobs/web.go b/internal/jobs/web.go
new file mode 100644
index 00000000..75d0d0dc
--- /dev/null
+++ b/internal/jobs/web.go
@@ -0,0 +1,146 @@
+package jobs
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"github.com/sirupsen/logrus"
+
+	"github.com/masa-finance/tee-worker/api/types"
+	"github.com/masa-finance/tee-worker/internal/config"
+	"github.com/masa-finance/tee-worker/internal/jobs/llmapify"
+	"github.com/masa-finance/tee-worker/internal/jobs/stats"
+	"github.com/masa-finance/tee-worker/internal/jobs/webapify"
+	"github.com/masa-finance/tee-worker/pkg/client"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+	"github.com/masa-finance/tee-types/pkg/util"
+	teetypes "github.com/masa-finance/tee-types/types"
+)
+
+// WebApifyClient defines the interface for the Web Apify client to allow mocking in tests
+type WebApifyClient interface {
+	Scrape(workerID string, args teeargs.WebArguments, cursor client.Cursor) ([]*teetypes.WebScraperResult, string, client.Cursor, error)
+}
+
+// NewWebApifyClient is a function variable that can be replaced in tests.
+// It defaults to the actual implementation.
+var NewWebApifyClient = func(apiKey string, statsCollector *stats.StatsCollector) (WebApifyClient, error) {
+	return webapify.NewClient(apiKey, statsCollector)
+}
+
+// LLMApify is the interface for the LLM processor client.
+// Only the Process method is required for this flow
+type LLMApify interface {
+	Process(workerID string, args teeargs.LLMProcessorArguments, cursor client.Cursor) ([]*teetypes.LLMProcessorResult, client.Cursor, error)
+}
+
+// NewLLMApifyClient is a function variable to allow injection in tests
+var NewLLMApifyClient = func(apiKey string, llmConfig config.LlmConfig, statsCollector *stats.StatsCollector) (LLMApify, error) {
+	return llmapify.NewClient(apiKey, llmConfig, statsCollector)
+}
+
+type WebScraper struct {
+	configuration  config.WebConfig
+	statsCollector *stats.StatsCollector
+	capabilities   []teetypes.Capability
+}
+
+func NewWebScraper(jc config.JobConfiguration, statsCollector *stats.StatsCollector) *WebScraper {
+	cfg := jc.GetWebConfig()
+	logrus.Info("Web scraper via Apify initialized")
+	return &WebScraper{
+		configuration:  cfg,
+		statsCollector: statsCollector,
+		capabilities:   teetypes.WebCaps,
+	}
+}
+
+func (w *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) {
+	logrus.WithField("job_uuid", j.UUID).Info("Starting ExecuteJob for Web scrape")
+
+	// Require Gemini key for LLM processing in Web flow
+	if !w.configuration.GeminiApiKey.IsValid() {
+		msg := errors.New("Gemini API key is required for Web job")
+		return types.JobResult{Error: msg.Error()}, msg
+	}
+
+	jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments))
+	if err != nil {
+		msg := fmt.Errorf("failed to unmarshal job arguments: %w", err)
+		return types.JobResult{Error: msg.Error()}, msg
+	}
+
+	webArgs, ok := jobArgs.(*teeargs.WebArguments)
+	if !ok {
+		return types.JobResult{Error: "invalid argument type for Web job"}, errors.New("invalid argument type")
+	}
+	logrus.Debugf("web job args: %+v", *webArgs)
+
+	webClient, err := NewWebApifyClient(w.configuration.ApifyApiKey, w.statsCollector)
+	if err != nil {
+		return types.JobResult{Error: "error while scraping Web"}, fmt.Errorf("error creating Web Apify client: %w", err)
+	}
+
+	webResp, datasetId, cursor, err := webClient.Scrape(j.WorkerID, *webArgs, client.EmptyCursor)
+	if err != nil {
+		return types.JobResult{Error: fmt.Sprintf("error while scraping Web: %s", err.Error())}, fmt.Errorf("error scraping Web: %w", err)
+	}
+
+	// Run LLM processing and inject into results (Gemini key already validated)
+	if datasetId == "" {
+		return types.JobResult{Error: "missing dataset id from web scraping"}, errors.New("missing dataset id from web scraping")
+	}
+
+	llmClient, err := NewLLMApifyClient(w.configuration.ApifyApiKey, w.configuration.LlmConfig, w.statsCollector)
+	if err != nil {
+		return types.JobResult{Error: "error creating LLM Apify client"}, fmt.Errorf("failed to create LLM Apify client: %w", err)
+	}
+
+	llmArgs := teeargs.LLMProcessorArguments{
+		DatasetId:   datasetId,
+		Prompt:      "summarize the content of this webpage, focusing on keywords and topics: ${markdown}",
+		MaxTokens:   teeargs.LLMDefaultMaxTokens,
+		Temperature: teeargs.LLMDefaultTemperature,
+		Items:       uint(len(webResp)),
+	}
+	llmResp, _, llmErr := llmClient.Process(j.WorkerID, llmArgs, client.EmptyCursor)
+	if llmErr != nil {
+		return types.JobResult{Error: fmt.Sprintf("error while processing LLM: %s", llmErr.Error())}, fmt.Errorf("error processing LLM: %w", llmErr)
+	}
+
+	max := util.Min(len(webResp), len(llmResp))
+	for i := 0; i < max; i++ {
+		if webResp[i] != nil {
+			webResp[i].LLMResponse = llmResp[i].LLMResponse
+		}
+	}
+
+	data, err := json.Marshal(webResp)
+	if err != nil {
+		return types.JobResult{Error: "error marshalling Web response"}, fmt.Errorf("error marshalling Web response: %w", err)
+	}
+
+	if w.statsCollector != nil {
+		w.statsCollector.Add(j.WorkerID, stats.WebProcessedPages, uint(max))
+	}
+
+	return types.JobResult{
+		Data:       data,
+		Job:        j,
+		NextCursor: cursor.String(),
+	}, nil
+}
+
+// GetStructuredCapabilities returns the structured capabilities supported by the Web scraper
+// based on the available credentials and API keys
+func (ws *WebScraper) GetStructuredCapabilities() teetypes.WorkerCapabilities {
+	capabilities := make(teetypes.WorkerCapabilities)
+
+	if ws.configuration.ApifyApiKey != "" && ws.configuration.GeminiApiKey.IsValid() {
+		capabilities[teetypes.WebJob] = teetypes.WebCaps
+	}
+
+	return capabilities
+}
diff --git a/internal/jobs/web_test.go b/internal/jobs/web_test.go
new file mode 100644
index 00000000..b24d3b34
--- /dev/null
+++ b/internal/jobs/web_test.go
@@ -0,0 +1,249 @@
+package jobs_test
+
+import (
+	"encoding/json"
+	"errors"
+	"os"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/masa-finance/tee-worker/api/types"
+	"github.com/masa-finance/tee-worker/internal/config"
+	"github.com/masa-finance/tee-worker/internal/jobs"
+	"github.com/masa-finance/tee-worker/internal/jobs/llmapify"
+	"github.com/masa-finance/tee-worker/internal/jobs/stats"
+	"github.com/masa-finance/tee-worker/internal/jobs/webapify"
+	"github.com/masa-finance/tee-worker/pkg/client"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+	teetypes "github.com/masa-finance/tee-types/types"
+)
+
+// MockWebApifyClient is a mock implementation of the WebApifyClient.
+type MockWebApifyClient struct {
+	ScrapeFunc func(args teeargs.WebArguments) ([]*teetypes.WebScraperResult, string, client.Cursor, error)
+}
+
+func (m *MockWebApifyClient) Scrape(_ string, args teeargs.WebArguments, _ client.Cursor) ([]*teetypes.WebScraperResult, string, client.Cursor, error) {
+	if m != nil && m.ScrapeFunc != nil {
+		res, datasetId, next, err := m.ScrapeFunc(args)
+		return res, datasetId, next, err
+	}
+	return nil, "", client.EmptyCursor, nil
+}
+
+// MockLLMApifyClient is a mock implementation of the LLMApify interface
+// used to prevent external calls during unit tests.
+type MockLLMApifyClient struct { + ProcessFunc func(workerID string, args teeargs.LLMProcessorArguments, cursor client.Cursor) ([]*teetypes.LLMProcessorResult, client.Cursor, error) +} + +func (m *MockLLMApifyClient) Process(workerID string, args teeargs.LLMProcessorArguments, cursor client.Cursor) ([]*teetypes.LLMProcessorResult, client.Cursor, error) { + if m != nil && m.ProcessFunc != nil { + return m.ProcessFunc(workerID, args, cursor) + } + return []*teetypes.LLMProcessorResult{}, client.EmptyCursor, nil +} + +var _ = Describe("WebScraper", func() { + var ( + scraper *jobs.WebScraper + statsCollector *stats.StatsCollector + job types.Job + mockClient *MockWebApifyClient + mockLLM *MockLLMApifyClient + ) + + // Keep originals to restore after each test to avoid leaking globals + originalNewWebApifyClient := jobs.NewWebApifyClient + originalNewLLMApifyClient := jobs.NewLLMApifyClient + + BeforeEach(func() { + statsCollector = stats.StartCollector(128, config.JobConfiguration{}) + cfg := config.JobConfiguration{ + "apify_api_key": "test-key", + "gemini_api_key": "test-gemini-key", + } + scraper = jobs.NewWebScraper(cfg, statsCollector) + mockClient = &MockWebApifyClient{} + mockLLM = &MockLLMApifyClient{ + ProcessFunc: func(workerID string, args teeargs.LLMProcessorArguments, cursor client.Cursor) ([]*teetypes.LLMProcessorResult, client.Cursor, error) { + // Return a single empty summary to avoid changing expectations + return []*teetypes.LLMProcessorResult{{LLMResponse: ""}}, client.EmptyCursor, nil + }, + } + + // Replace the client creation function with one that returns the mocks + jobs.NewWebApifyClient = func(apiKey string, _ *stats.StatsCollector) (jobs.WebApifyClient, error) { + return mockClient, nil + } + jobs.NewLLMApifyClient = func(apiKey string, llmConfig config.LlmConfig, _ *stats.StatsCollector) (jobs.LLMApify, error) { + return mockLLM, nil + } + + job = types.Job{ + UUID: "test-uuid", + Type: teetypes.WebJob, + } + }) + + AfterEach(func() { + jobs.NewWebApifyClient = originalNewWebApifyClient + jobs.NewLLMApifyClient = originalNewLLMApifyClient + }) + + Context("ExecuteJob", func() { + It("should return an error for invalid arguments", func() { + job.Arguments = map[string]any{"invalid": "args"} + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(result.Error).To(ContainSubstring("failed to unmarshal job arguments")) + }) + + It("should call Scrape and return data and next cursor", func() { + job.Arguments = map[string]any{ + "type": teetypes.WebScraper, + "url": "https://example.com", + "max_depth": 1, + "max_pages": 2, + } + + mockClient.ScrapeFunc = func(args teeargs.WebArguments) ([]*teetypes.WebScraperResult, string, client.Cursor, error) { + Expect(args.URL).To(Equal("https://example.com")) + return []*teetypes.WebScraperResult{{URL: "https://example.com", Markdown: "# Hello"}}, "dataset-123", client.Cursor("next-cursor"), nil + } + + result, err := scraper.ExecuteJob(job) + Expect(err).NotTo(HaveOccurred()) + Expect(result.NextCursor).To(Equal("next-cursor")) + + var resp []*teetypes.WebScraperResult + err = json.Unmarshal(result.Data, &resp) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).To(HaveLen(1)) + Expect(resp[0]).NotTo(BeNil()) + Expect(resp[0].URL).To(Equal("https://example.com")) + }) + + It("should handle errors from the web client", func() { + job.Arguments = map[string]any{ + "type": teetypes.WebScraper, + "url": "https://example.com", + "max_depth": 0, + "max_pages": 1, + } + + expectedErr := errors.New("client 
error") + mockClient.ScrapeFunc = func(args teeargs.WebArguments) ([]*teetypes.WebScraperResult, string, client.Cursor, error) { + return nil, "", client.EmptyCursor, expectedErr + } + + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(ContainSubstring("client error"))) + Expect(result.Error).To(ContainSubstring("error while scraping Web: client error")) + }) + + It("should handle errors when creating the client", func() { + jobs.NewWebApifyClient = func(apiKey string, _ *stats.StatsCollector) (jobs.WebApifyClient, error) { + return nil, errors.New("client creation failed") + } + job.Arguments = map[string]any{ + "type": teetypes.WebScraper, + "url": "https://example.com", + "max_depth": 0, + "max_pages": 1, + } + + result, err := scraper.ExecuteJob(job) + Expect(err).To(HaveOccurred()) + Expect(result.Error).To(Equal("error while scraping Web")) + }) + }) + + // Integration tests that use the real client + Context("Integration tests", func() { + var ( + apifyKey string + geminiKey string + ) + + BeforeEach(func() { + apifyKey = os.Getenv("APIFY_API_KEY") + geminiKey = os.Getenv("GEMINI_API_KEY") + + if apifyKey == "" || geminiKey == "" { + Skip("APIFY_API_KEY and GEMINI_API_KEY required for integration web integration tests") + } + + // Reset to use real client for integration tests + jobs.NewWebApifyClient = func(apiKey string, s *stats.StatsCollector) (jobs.WebApifyClient, error) { + return webapify.NewClient(apiKey, s) + } + jobs.NewLLMApifyClient = func(apiKey string, llmConfig config.LlmConfig, s *stats.StatsCollector) (jobs.LLMApify, error) { + return llmapify.NewClient(apiKey, llmConfig, s) + } + }) + + It("should execute a real web scraping job when keys is set", func() { + cfg := config.JobConfiguration{ + "apify_api_key": apifyKey, + "gemini_api_key": geminiKey, + } + integrationStatsCollector := stats.StartCollector(128, cfg) + integrationScraper := jobs.NewWebScraper(cfg, integrationStatsCollector) + + maxDepth := 1 + maxPages := 3 + + job := types.Job{ + UUID: "integration-test-uuid", + Type: teetypes.WebJob, + Arguments: map[string]any{ + "type": teetypes.WebScraper, + "url": "https://docs.learnbittensor.org", + "max_depth": maxDepth, + "max_pages": maxPages, + }, + } + + result, err := integrationScraper.ExecuteJob(job) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Error).To(BeEmpty()) + Expect(result.Data).NotTo(BeEmpty()) + + var resp []*teetypes.WebScraperResult + err = json.Unmarshal(result.Data, &resp) + + Expect(err).NotTo(HaveOccurred()) + Expect(resp).To(HaveLen(3)) + + for i := 0; i < maxPages; i++ { + Expect(resp[i]).NotTo(BeNil()) + Expect(resp[i].URL).To(ContainSubstring("https://docs.learnbittensor.org")) + Expect(resp[i].LLMResponse).NotTo(BeEmpty()) + Expect(resp[i].Markdown).NotTo(BeEmpty()) + Expect(resp[i].Text).To(ContainSubstring("Bittensor")) + } + }) + + It("should expose capabilities only when both APIFY and GEMINI keys are present", func() { + cfg := config.JobConfiguration{ + "apify_api_key": apifyKey, + "gemini_api_key": geminiKey, + } + integrationStatsCollector := stats.StartCollector(128, cfg) + integrationScraper := jobs.NewWebScraper(cfg, integrationStatsCollector) + + caps := integrationScraper.GetStructuredCapabilities() + if apifyKey != "" && geminiKey != "" { + Expect(caps[teetypes.WebJob]).NotTo(BeEmpty()) + } else { + // Expect no capabilities when either key is missing + _, ok := caps[teetypes.WebJob] + Expect(ok).To(BeFalse()) + } + }) + }) +}) diff --git 
diff --git a/internal/jobs/webapify/client.go b/internal/jobs/webapify/client.go
new file mode 100644
index 00000000..601f23e9
--- /dev/null
+++ b/internal/jobs/webapify/client.go
@@ -0,0 +1,79 @@
+package webapify
+
+import (
+	"encoding/json"
+	"fmt"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+	teetypes "github.com/masa-finance/tee-types/types"
+	"github.com/masa-finance/tee-worker/internal/jobs/stats"
+	"github.com/masa-finance/tee-worker/pkg/client"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	ActorID = "apify~website-content-crawler"
+)
+
+type ApifyClient struct {
+	client         client.Apify
+	statsCollector *stats.StatsCollector
+}
+
+// NewInternalClient is a function variable that can be replaced in tests.
+// It defaults to the actual implementation.
+var NewInternalClient = func(apiKey string) (client.Apify, error) {
+	return client.NewApifyClient(apiKey)
+}
+
+// NewClient creates a new Web Apify client
+func NewClient(apiToken string, statsCollector *stats.StatsCollector) (*ApifyClient, error) {
+	client, err := NewInternalClient(apiToken)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create apify client: %w", err)
+	}
+
+	return &ApifyClient{
+		client:         client,
+		statsCollector: statsCollector,
+	}, nil
+}
+
+// ValidateApiKey tests if the Apify API token is valid
+func (c *ApifyClient) ValidateApiKey() error {
+	return c.client.ValidateApiKey()
+}
+
+func (c *ApifyClient) Scrape(workerID string, args teeargs.WebArguments, cursor client.Cursor) ([]*teetypes.WebScraperResult, string, client.Cursor, error) {
+	if c.statsCollector != nil {
+		c.statsCollector.Add(workerID, stats.WebQueries, 1)
+	}
+
+	input := args.ToWebScraperRequest()
+
+	limit := uint(args.MaxPages)
+	dataset, nextCursor, err := c.client.RunActorAndGetResponse(ActorID, input, cursor, limit)
+	if err != nil {
+		if c.statsCollector != nil {
+			c.statsCollector.Add(workerID, stats.WebErrors, 1)
+		}
+		return nil, "", client.EmptyCursor, err
+	}
+
+	response := make([]*teetypes.WebScraperResult, 0, len(dataset.Data.Items))
+
+	for i, item := range dataset.Data.Items {
+		var resp teetypes.WebScraperResult
+		if err := json.Unmarshal(item, &resp); err != nil {
+			logrus.Warnf("Failed to unmarshal scrape result at index %d: %v", i, err)
+			continue
+		}
+		response = append(response, &resp)
+	}
+
+	if c.statsCollector != nil {
+		c.statsCollector.Add(workerID, stats.WebScrapedPages, uint(len(response)))
+	}
+
+	return response, dataset.DatasetId, nextCursor, nil
+}
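For orientation, here is a hedged sketch of how a caller might use the new webapify client introduced above. It assumes only the identifiers visible in this diff (`webapify.NewClient`, `Scrape`, `teeargs.WebArguments`, `client.EmptyCursor`); passing a nil stats collector relies on the nil guards inside `Scrape`.

```go
// Hypothetical caller of the webapify client; not part of this diff.
package main

import (
	"fmt"

	teeargs "github.com/masa-finance/tee-types/args"
	"github.com/masa-finance/tee-worker/internal/jobs/webapify"
	"github.com/masa-finance/tee-worker/pkg/client"
)

func main() {
	// nil stats collector is tolerated: Scrape guards every statsCollector use
	c, err := webapify.NewClient("apify-token", nil)
	if err != nil {
		panic(err)
	}

	args := teeargs.WebArguments{URL: "https://example.com", MaxDepth: 0, MaxPages: 1}

	// Returns unmarshalled results, the dataset id, and a cursor for the next page
	results, datasetID, next, err := c.Scrape("worker-1", args, client.EmptyCursor)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(results), datasetID, next)
}
```
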
diff --git a/internal/jobs/webapify/client_test.go b/internal/jobs/webapify/client_test.go
new file mode 100644
index 00000000..b377ae53
--- /dev/null
+++ b/internal/jobs/webapify/client_test.go
@@ -0,0 +1,203 @@
+package webapify_test
+
+import (
+	"encoding/json"
+	"errors"
+	"os"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/masa-finance/tee-worker/internal/jobs/webapify"
+	"github.com/masa-finance/tee-worker/pkg/client"
+
+	teeargs "github.com/masa-finance/tee-types/args"
+)
+
+// MockApifyClient is a mock implementation of the client.Apify interface.
+type MockApifyClient struct {
+	RunActorAndGetResponseFunc func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error)
+	ValidateApiKeyFunc         func() error
+}
+
+func (m *MockApifyClient) RunActorAndGetResponse(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+	if m.RunActorAndGetResponseFunc != nil {
+		return m.RunActorAndGetResponseFunc(actorID, input, cursor, limit)
+	}
+	return nil, "", errors.New("RunActorAndGetResponseFunc not defined")
+}
+
+func (m *MockApifyClient) ValidateApiKey() error {
+	if m.ValidateApiKeyFunc != nil {
+		return m.ValidateApiKeyFunc()
+	}
+	return errors.New("ValidateApiKeyFunc not defined")
+}
+
+var _ = Describe("WebApifyClient", func() {
+	var (
+		mockClient *MockApifyClient
+		webClient  *webapify.ApifyClient
+		apifyKey   string
+		geminiKey  string
+	)
+
+	BeforeEach(func() {
+		apifyKey = os.Getenv("APIFY_API_KEY")
+		geminiKey = os.Getenv("GEMINI_API_KEY")
+		mockClient = &MockApifyClient{}
+		// Replace the client creation function with one that returns the mock
+		webapify.NewInternalClient = func(apiKey string) (client.Apify, error) {
+			return mockClient, nil
+		}
+		var err error
+		webClient, err = webapify.NewClient("test-token", nil)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	Describe("Scrape", func() {
+		It("should construct the correct actor input", func() {
+			args := teeargs.WebArguments{
+				URL:      "https://example.com",
+				MaxDepth: 1,
+				MaxPages: 2,
+			}
+
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				Expect(actorID).To(Equal(webapify.ActorID))
+				Expect(limit).To(Equal(uint(2)))
+				return &client.DatasetResponse{Data: client.ApifyDatasetData{Items: []json.RawMessage{}}}, "next", nil
+			}
+
+			_, _, _, err := webClient.Scrape("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+		})
+
+		It("should handle errors from the apify client", func() {
+			expectedErr := errors.New("apify error")
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return nil, "", expectedErr
+			}
+
+			args := teeargs.WebArguments{
+				URL:      "https://example.com",
+				MaxDepth: 0,
+				MaxPages: 1,
+			}
+			_, _, _, err := webClient.Scrape("test-worker", args, client.EmptyCursor)
+			Expect(err).To(MatchError(expectedErr))
+		})
+
+		It("should handle JSON unmarshalling errors gracefully", func() {
+			invalidJSON := []byte(`{"url": "test", "markdown": 123}`) // markdown should be a string
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{invalidJSON},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			args := teeargs.WebArguments{
+				URL:      "https://example.com",
+				MaxDepth: 0,
+				MaxPages: 1,
+			}
+			results, _, _, err := webClient.Scrape("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(results).To(BeEmpty()) // The invalid item should be skipped
+		})
+
+		It("should correctly unmarshal valid items", func() {
+			webResultJSON, _ := json.Marshal(map[string]any{
+				"url":      "https://example.com",
+				"markdown": "# Hello World",
+				"title":    "Example",
+			})
+			dataset := &client.DatasetResponse{
+				Data: client.ApifyDatasetData{
+					Items: []json.RawMessage{webResultJSON},
+				},
+			}
+			mockClient.RunActorAndGetResponseFunc = func(actorID string, input any, cursor client.Cursor, limit uint) (*client.DatasetResponse, client.Cursor, error) {
+				return dataset, "next", nil
+			}
+
+			args := teeargs.WebArguments{
+				URL:      "https://example.com",
+				MaxDepth: 0,
+				MaxPages: 1,
+			}
+			results, _, cursor, err := webClient.Scrape("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(cursor).To(Equal(client.Cursor("next")))
+			Expect(results).To(HaveLen(1))
+			Expect(results[0].URL).To(Equal("https://example.com"))
+			Expect(results[0].Markdown).To(Equal("# Hello World"))
+		})
+	})
+
+	Describe("ValidateApiKey", func() {
+		It("should validate the API key", func() {
+			mockClient.ValidateApiKeyFunc = func() error {
+				return nil
+			}
+			Expect(webClient.ValidateApiKey()).To(Succeed())
+		})
+
+		It("should return an error when validation fails", func() {
+			expectedErr := errors.New("invalid key")
+			mockClient.ValidateApiKeyFunc = func() error {
+				return expectedErr
+			}
+			Expect(webClient.ValidateApiKey()).To(MatchError(expectedErr))
+		})
+	})
+
+	// Integration tests that use the real client
+	Context("Integration tests", func() {
+		It("should validate the API key with the real client when APIFY_API_KEY is set", func() {
+			if apifyKey == "" || geminiKey == "" {
+				Skip("APIFY_API_KEY and GEMINI_API_KEY are required to run web integration tests")
+			}
+
+			// Reset to use the real client
+			webapify.NewInternalClient = func(apiKey string) (client.Apify, error) {
+				return client.NewApifyClient(apiKey)
+			}
+
+			realClient, err := webapify.NewClient(apifyKey, nil)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(realClient.ValidateApiKey()).To(Succeed())
+		})
+
+		It("should scrape a real URL when APIFY_API_KEY is set", func() {
+			if apifyKey == "" || geminiKey == "" {
+				Skip("APIFY_API_KEY and GEMINI_API_KEY are required to run web integration tests")
+			}
+
+			// Reset to use the real client
+			webapify.NewInternalClient = func(apiKey string) (client.Apify, error) {
+				return client.NewApifyClient(apiKey)
+			}
+
+			realClient, err := webapify.NewClient(apifyKey, nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			args := teeargs.WebArguments{
+				URL:      "https://example.com",
+				MaxDepth: 0,
+				MaxPages: 1,
+			}
+
+			results, datasetId, cursor, err := realClient.Scrape("test-worker", args, client.EmptyCursor)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(datasetId).NotTo(BeEmpty())
+			Expect(results).NotTo(BeEmpty())
+			Expect(results[0]).NotTo(BeNil())
+			Expect(results[0].URL).To(Equal("https://example.com/"))
+			Expect(cursor).NotTo(BeEmpty())
+		})
+	})
+})
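The unmarshalling tests above exercise a lenient decoding loop: a malformed dataset item is logged and skipped rather than failing the whole batch. A standalone sketch of the same technique follows; `Item` and `decodeLenient` are stand-ins for `teetypes.WebScraperResult` and the loop inside `Scrape`, not identifiers from the repo.

```go
// Minimal sketch of skip-on-error JSON decoding over raw dataset items.
package main

import (
	"encoding/json"
	"fmt"
)

type Item struct {
	URL      string `json:"url"`
	Markdown string `json:"markdown"`
}

func decodeLenient(raw []json.RawMessage) []*Item {
	out := make([]*Item, 0, len(raw))
	for i, r := range raw {
		var it Item
		if err := json.Unmarshal(r, &it); err != nil {
			// Bad item: report and move on, mirroring the logrus.Warnf + continue above
			fmt.Printf("skipping item %d: %v\n", i, err)
			continue
		}
		out = append(out, &it)
	}
	return out
}

func main() {
	raw := []json.RawMessage{
		json.RawMessage(`{"url":"https://example.com","markdown":"# ok"}`),
		json.RawMessage(`{"url":"https://example.com","markdown":123}`), // type mismatch, skipped
	}
	fmt.Println(len(decodeLenient(raw))) // prints 1
}
```
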
"github.com/onsi/gomega" +) + +func TestWebApifyClient(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "WebApify Client Suite") +} diff --git a/internal/jobs/webscraper.go b/internal/jobs/webscraper.go deleted file mode 100644 index e6b49fc9..00000000 --- a/internal/jobs/webscraper.go +++ /dev/null @@ -1,282 +0,0 @@ -package jobs - -import ( - "encoding/json" - "fmt" - "net/http" - "strconv" - "strings" - "time" - - teeargs "github.com/masa-finance/tee-types/args" - teetypes "github.com/masa-finance/tee-types/types" - - "github.com/cenkalti/backoff" - "github.com/gocolly/colly" - "github.com/masa-finance/tee-worker/api/types" - "github.com/masa-finance/tee-worker/internal/config" - "github.com/masa-finance/tee-worker/internal/jobs/stats" - "github.com/sirupsen/logrus" -) - -type WebScraper struct { - configuration WebScraperConfiguration - stats *stats.StatsCollector -} - -type WebScraperConfiguration struct { - Blacklist []string `json:"webscraper_blacklist"` -} - -func NewWebScraper(jc config.JobConfiguration, statsCollector *stats.StatsCollector) *WebScraper { - config := WebScraperConfiguration{} - jc.Unmarshal(&config) - return &WebScraper{ - configuration: config, - stats: statsCollector, - } -} - -// GetStructuredCapabilities returns the structured capabilities supported by the web scraper -func (ws *WebScraper) GetStructuredCapabilities() teetypes.WorkerCapabilities { - return teetypes.WorkerCapabilities{ - teetypes.WebJob: teetypes.AlwaysAvailableWebCaps, - } -} - -func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { - logrus.Info("Starting ExecuteJob for web scraper") - - // Step 1: Use centralized type-safe unmarshaller - jobArgs, err := teeargs.UnmarshalJobArguments(teetypes.JobType(j.Type), map[string]any(j.Arguments)) - if err != nil { - logrus.Warnf("Failed to unmarshal job arguments: %v", err) - ws.stats.Add(j.WorkerID, stats.WebInvalid, 1) - return types.JobResult{Error: fmt.Sprintf("Invalid arguments: %v", err)}, nil - } - - // Type assert to Web arguments - args, ok := jobArgs.(*teeargs.WebSearchArguments) - if !ok { - logrus.Errorf("Expected Web arguments for job ID %s, type %s", j.UUID, j.Type) - return types.JobResult{Error: "invalid argument type for Web job"}, nil - } - logrus.Debugf("Job arguments unmarshaled and validated successfully: %+v", args) - - // Step 2: Validate URL against blacklist - logrus.Debug("Validating URL against blacklist") - for _, u := range ws.configuration.Blacklist { - logrus.Debugf("Checking if URL contains blacklisted term: %s", u) - if strings.Contains(args.URL, u) { - logrus.Warnf("URL %s is blacklisted due to term: %s", args.URL, u) - ws.stats.Add(j.WorkerID, stats.WebInvalid, 1) - return types.JobResult{ - Error: fmt.Sprintf("URL blacklisted: %s", args.URL), - }, nil - } - } - logrus.Infof("URL %s passed blacklist validation", args.URL) - - // Step 3: Use enhanced methods for cleaner logic and validation - logrus.Debugf("Initiating web scraping for URL: %s (max_depth: %d, has_selector: %t, is_deep_scrape: %t)", - args.URL, args.GetEffectiveMaxDepth(), args.HasSelector(), args.IsDeepScrape()) - - // Perform web scraping using the effective max depth - result, err := scrapeWeb([]string{args.URL}, args.GetEffectiveMaxDepth()) - if err != nil { - logrus.Warnf("Web scraping failed for URL %s: %v", args.URL, err) - ws.stats.Add(j.WorkerID, stats.WebErrors, 1) - return types.JobResult{Error: err.Error()}, err - } - logrus.Debugf("Web scraping succeeded for URL %s: %v", args.URL, string(result)) - - // Step 4: 
Process result and return - logrus.Debugf("Updating statistics for successful web scraping") - ws.stats.Add(j.WorkerID, stats.WebSuccess, 1) - logrus.Infof("Returning web scraping result for URL %s", args.URL) - return types.JobResult{ - Data: result, - }, nil -} - -// Section represents a distinct part of a scraped webpage, typically defined by a heading. -// It contains a Title, representing the heading of the section, and Paragraphs, a slice of strings -// containing the text content found within that section. -type Section struct { - Title string `json:"title"` // Title is the heading text of the section. - Paragraphs []string `json:"paragraphs"` // Paragraphs contains all the text content of the section. - Images []string `json:"images"` // Images storing base64 - maybe!!? -} - -// CollectedData represents the aggregated result of the scraping process. -// It contains a slice of Section structs, each representing a distinct part of a scraped webpage. -type CollectedData struct { - Sections []Section `json:"sections"` // Sections is a collection of webpage sections that have been scraped. - Pages []string `json:"pages"` -} - -// scrapeWeb initiates the scraping process for the given list of URIs. -// It returns a CollectedData struct containing the scraped sections from each URI, -// and an error if any occurred during the scraping process. -// -// Parameters: -// - uri: []string - list of URLs to scrape -// - depth: int - depth of how many subpages to scrape -// -// Returns: -// - []byte - JSON representation of the collected data -// - error - any error that occurred during the scraping process -// -// Example usage: -// -// go func() { -// res, err := scraper.scrapeWeb([]string{"https://en.wikipedia.org/wiki/Maize"}, 5) -// if err != nil { -// logrus.WithError(err).Error("Error collecting data") -// return -// } -// logrus.WithField("result", string(res)).Info("Scraping completed") -// }() -func scrapeWeb(uri []string, depth int) ([]byte, error) { - logrus.Infof("Starting scrapeWeb with parameters: URIs=%v, Depth=%d", uri, depth) - // Set default depth to 1 if 0 is provided - if depth <= 0 { - logrus.Infof("Invalid depth (%d) provided, setting default depth to 1", depth) - depth = 1 - } - - logrus.Info("Initializing CollectedData struct") - var collectedData CollectedData - - logrus.Info("Creating new Colly collector") - c := colly.NewCollector( - colly.Async(true), // Enable asynchronous requests - colly.AllowURLRevisit(), - colly.IgnoreRobotsTxt(), - colly.MaxDepth(depth), - ) - logrus.Info("Colly collector created successfully") - - // Adjust the parallelism and delay based on your needs and server capacity - logrus.Info("Setting scraping limits with parallelism and delay") - limitRule := colly.LimitRule{ - DomainGlob: "*", - Parallelism: 4, // Increased parallelism - Delay: 500 * time.Millisecond, // Reduced delay - } - logrus.Info("Applying scraping limits to the collector") - if err := c.Limit(&limitRule); err != nil { - logrus.Errorf("[-] Unable to set scraper limit. Using default. Error: %v", err) - } - - // Increase the timeout slightly if necessary - logrus.Info("Setting request timeout to 240 seconds") - c.SetRequestTimeout(240 * time.Second) - - // Initialize a backoff strategy - logrus.Info("Initializing exponential backoff strategy") - backoffStrategy := backoff.NewExponentialBackOff() - - logrus.Info("Registering OnError callback to handle request errors") - c.OnError(func(r *colly.Response, err error) { - logrus.Errorf("Error occurred during request to URL: %s. 
StatusCode: %d, Error: %v", r.Request.URL, r.StatusCode, err) - if r.StatusCode == http.StatusTooManyRequests { - // Parse the Retry-After header (in seconds) - retryAfter, convErr := strconv.Atoi(r.Headers.Get("Retry-After")) - if convErr != nil { - // If not in seconds, it might be a date. Handle accordingly. - logrus.Warnf("Retry-After header is present but unrecognized format: %s", r.Headers.Get("Retry-After")) - } - // Calculate the next delay - nextDelay := backoffStrategy.NextBackOff() - if retryAfter > 0 { - nextDelay = time.Duration(retryAfter) * time.Second - } - logrus.Warnf("Rate limited for URL: %s. Retrying after %v", r.Request.URL, nextDelay) - time.Sleep(nextDelay) - // Retry the request - logrus.Info("Retrying the request") - _ = r.Request.Retry() - - } else { - logrus.Errorf("Request failed for URL: %s with error: %v", r.Request.URL, err) - logrus.Errorf("[-] Request URL: %s failed with error: %v", r.Request.URL, err) - } - }) - - logrus.Info("Registering OnHTML callback for h1, h2 elements (titles)") - c.OnHTML("h1, h2", func(e *colly.HTMLElement) { - logrus.Infof("Title (h1/h2) found: %s", e.Text) - // Directly append a new Section to collectedData.Sections - collectedData.Sections = append(collectedData.Sections, Section{Title: e.Text}) - }) - - logrus.Info("Registering OnHTML callback for paragraph elements") - c.OnHTML("p", func(e *colly.HTMLElement) { - logrus.Infof("Paragraph detected: %s", e.Text) - // Check if there are any sections to append paragraphs to - if len(collectedData.Sections) > 0 { - // Get a reference to the last section - lastSection := &collectedData.Sections[len(collectedData.Sections)-1] - // Append the paragraph to the last section - // Check for duplicate paragraphs before appending - isDuplicate := false - for _, paragraph := range lastSection.Paragraphs { - if paragraph == e.Text { - isDuplicate = true - break - } - } - // Handle dupes - if !isDuplicate { - lastSection.Paragraphs = append(lastSection.Paragraphs, e.Text) - } - } - }) - - logrus.Info("Registering OnHTML callback for image elements") - c.OnHTML("img", func(e *colly.HTMLElement) { - logrus.Infof("Image detected with source URL: %s", e.Attr("src")) - imageURL := e.Request.AbsoluteURL(e.Attr("src")) - if len(collectedData.Sections) > 0 { - lastSection := &collectedData.Sections[len(collectedData.Sections)-1] - lastSection.Images = append(lastSection.Images, imageURL) - } - }) - - logrus.Info("Registering OnHTML callback for anchor elements") - c.OnHTML("a", func(e *colly.HTMLElement) { - logrus.Infof("Link detected: %s", e.Attr("href")) - pageURL := e.Request.AbsoluteURL(e.Attr("href")) - // Check if the URL protocol is supported (http or https) - if strings.HasPrefix(pageURL, "http://") || strings.HasPrefix(pageURL, "https://") { - collectedData.Pages = append(collectedData.Pages, pageURL) - _ = e.Request.Visit(pageURL) - } - }) - - logrus.Infof("Starting to visit URLs: %v", uri) - for _, u := range uri { - err := c.Visit(u) - if err != nil { - logrus.Errorf("Failed to visit URL: %s. Error: %v", u, err) - continue - } - logrus.Infof("Visiting URL: %s", u) - err = c.Visit(u) - if err != nil { - logrus.Errorf("Failed to visit URL: %s. Error: %v", u, err) - return nil, err - } - } - - // Wait for all requests to finish - logrus.Info("Waiting for all requests to complete") - c.Wait() - - logrus.Info("Scraping completed, marshaling collected data into JSON format") - j, _ := json.Marshal(collectedData) - - logrus.Infof("Scraping successful. 
Returning data for URIs: %v", uri) - return j, nil -} diff --git a/internal/jobs/webscraper_test.go b/internal/jobs/webscraper_test.go deleted file mode 100644 index 88da18ee..00000000 --- a/internal/jobs/webscraper_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package jobs_test - -import ( - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - teetypes "github.com/masa-finance/tee-types/types" - "github.com/masa-finance/tee-worker/api/types" - "github.com/masa-finance/tee-worker/internal/config" - . "github.com/masa-finance/tee-worker/internal/jobs" - "github.com/masa-finance/tee-worker/internal/jobs/stats" -) - -var statsCollector *stats.StatsCollector - -var _ = Describe("Webscraper", func() { - BeforeEach(func() { - statsCollector = stats.StartCollector(128, config.JobConfiguration{}) - }) - - It("should scrape now", func() { - webScraper := NewWebScraper(config.JobConfiguration{}, statsCollector) - - j := types.Job{ - Type: teetypes.WebJob, - Arguments: map[string]interface{}{ - "url": "https://www.google.com", - }, - WorkerID: "test", - } - res, err := webScraper.ExecuteJob(j) - Expect(err).NotTo(HaveOccurred()) - Expect(res.Error).To(BeEmpty()) - - var scrapedData CollectedData - err = res.Unmarshal(&scrapedData) - Expect(err).NotTo(HaveOccurred()) - - Expect(scrapedData.Pages).ToNot(BeEmpty()) - - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebSuccess] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebErrors] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) - }) - - It("does not return data with invalid hosts", func() { - webScraper := NewWebScraper(config.JobConfiguration{}, statsCollector) - - j := types.Job{ - Type: teetypes.WebJob, - Arguments: map[string]interface{}{ - "url": "google", - }, - WorkerID: "test", - } - res, err := webScraper.ExecuteJob(j) - Expect(err).NotTo(HaveOccurred()) - Expect(res.Error).To(Equal("Invalid arguments: failed to unmarshal web job arguments: failed to unmarshal arguments: URL must include a scheme (http:// or https://)")) - - // Don't attempt to unmarshal since the job failed - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebSuccess] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebErrors] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebInvalid] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) - }) - - It("should allow to blacklist urls", func() { - webScraper := NewWebScraper(config.JobConfiguration{ - "webscraper_blacklist": []string{"https://google.com"}, - }, statsCollector) - - j := types.Job{ - Type: teetypes.WebJob, - Arguments: map[string]interface{}{ - "url": "https://google.com", - }, - WorkerID: "test", - } - res, err := webScraper.ExecuteJob(j) - Expect(err).ToNot(HaveOccurred()) - Expect(res.Error).To(Equal("URL blacklisted: https://google.com")) - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebSuccess] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) - Eventually(func() uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebErrors] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) - Eventually(func() 
uint { - return statsCollector.Stats.Stats[j.WorkerID][stats.WebInvalid] - }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) - }) -}) diff --git a/pkg/client/apify_client.go b/pkg/client/apify_client.go index 6e0f615c..64413420 100644 --- a/pkg/client/apify_client.go +++ b/pkg/client/apify_client.go @@ -56,7 +56,8 @@ type ApifyDatasetData struct { // DatasetResponse represents the response from getting dataset items type DatasetResponse struct { - Data ApifyDatasetData `json:"data"` + Data ApifyDatasetData `json:"data"` + DatasetId string `json:"dataset_id"` } // CursorData represents the pagination data stored in cursor @@ -341,6 +342,9 @@ PollLoop: return nil, "", fmt.Errorf("failed to get dataset items: %w", err) } + // Propagate dataset id for downstream consumers + dataset.DatasetId = runResp.Data.DefaultDatasetId + // 4. Generate next cursor if more data may be available var nextCursor Cursor if uint(len(dataset.Data.Items)) == limit { diff --git a/tee/masa-tee-worker.json b/tee/masa-tee-worker.json index ef2a245c..00678377 100644 --- a/tee/masa-tee-worker.json +++ b/tee/masa-tee-worker.json @@ -39,6 +39,7 @@ {"name": "TWITTER_ACCOUNTS", "fromHost":true}, {"name": "TWITTER_API_KEYS", "fromHost":true}, {"name": "APIFY_API_KEY", "fromHost":true}, + {"name": "GEMINI_API_KEY", "fromHost":true}, {"name": "TWITTER_SKIP_LOGIN_VERIFICATION", "fromHost":true}, {"name": "WEBSCRAPER_BLACKLIST", "fromHost":true} ],
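Context for the `DatasetId`/cursor changes in `pkg/client` above: the client only emits a next cursor when a full page was returned, so a consumer can page through an actor run by feeding each cursor back in until it comes back empty. A sketch under those assumptions follows; `drain` and its wiring are illustrative, not part of the repo, while the `client.Apify` method shape matches `RunActorAndGetResponse` as used in the tests above.

```go
// Illustrative paging loop over the cursor API introduced in this diff.
package main

import (
	"fmt"

	"github.com/masa-finance/tee-worker/pkg/client"
)

func drain(api client.Apify, actorID string, input any, pageSize uint) (int, error) {
	total := 0
	cursor := client.EmptyCursor
	for {
		dataset, next, err := api.RunActorAndGetResponse(actorID, input, cursor, pageSize)
		if err != nil {
			return total, err
		}
		total += len(dataset.Data.Items)
		// An empty next cursor means the previous page was short, i.e. no more data.
		if next == client.EmptyCursor {
			break
		}
		cursor = next
	}
	return total, nil
}

func main() { fmt.Println("wire drain() to a real or mock client.Apify to use") }
```
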