Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion deployment/build-and-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ steps:
args: ['push', '--all-tags', 'gcr.io/oss-vdb/importer']
waitFor: ['build-importer', 'cloud-build-queue']

- name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args: ['-c', 'docker pull gcr.io/oss-vdb/osv-worker:latest || exit 0']
id: 'pull-osv-worker'
waitFor: ['setup']
- name: gcr.io/cloud-builders/docker
args: ['build', '-t', 'gcr.io/oss-vdb/osv-worker:latest', '-t', 'gcr.io/oss-vdb/osv-worker:$COMMIT_SHA', '-f', 'cmd/worker/Dockerfile', '--cache-from', 'gcr.io/oss-vdb/osv-worker:latest', '--pull', '.']
dir: 'go'
id: 'build-osv-worker'
waitFor: ['pull-osv-worker']
- name: gcr.io/cloud-builders/docker
args: ['push', '--all-tags', 'gcr.io/oss-vdb/osv-worker']
waitFor: ['build-osv-worker', 'cloud-build-queue']

- name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args: ['-c', 'docker pull gcr.io/oss-vdb/exporter:latest || exit 0']
Expand Down Expand Up @@ -421,7 +435,7 @@ steps:
- name: 'gcr.io/cloud-builders/gcloud'
args: ['deploy', 'releases', 'create', 'osv-$SHORT_SHA', '--project=oss-vdb', '--region=us-central1',
'--delivery-pipeline=gke-workers', '--images',
"worker=gcr.io/oss-vdb/worker:$COMMIT_SHA,\
"worker=gcr.io/oss-vdb/osv-worker:$COMMIT_SHA,\
importer=gcr.io/oss-vdb/importer:$COMMIT_SHA,\
exporter=gcr.io/oss-vdb/exporter:$COMMIT_SHA,\
staging-api-test=gcr.io/oss-vdb-test/staging-api-test:$COMMIT_SHA,\
Expand Down Expand Up @@ -484,6 +498,7 @@ images:
- 'gcr.io/oss-vdb/worker-base:$COMMIT_SHA'
- 'gcr.io/oss-vdb/worker:$COMMIT_SHA'
- 'gcr.io/oss-vdb/importer:$COMMIT_SHA'
- 'gcr.io/oss-vdb/osv-worker:$COMMIT_SHA'
- 'gcr.io/oss-vdb/exporter:$COMMIT_SHA'
- 'gcr.io/oss-vdb/cron:$COMMIT_SHA'
- 'gcr.io/oss-vdb/alpine-cve-convert:$COMMIT_SHA'
Expand Down
2 changes: 1 addition & 1 deletion deployment/clouddeploy/gke-workers/base/scaler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ spec:
resource.labels.subscription_id: tasks
target:
type: AverageValue
averageValue: 1
averageValue: 10 # each worker can handle 10 tasks now
type: External
scaleTargetRef:
apiVersion: apps/v1
Expand Down
17 changes: 8 additions & 9 deletions deployment/clouddeploy/gke-workers/base/workers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,21 @@ spec:
- name: worker-private
image: worker
imagePullPolicy: Always
volumeMounts:
- mountPath: "/work"
name: "ssd"
env:
- name: GITTER_HOST
value: http://gitter-service:8888
securityContext:
privileged: true
- name: PUBSUB_SUBSCRIPTION
value: tasks
- name: DATASTORE_DATABASE_ID
value: "" # default
- name: FAILED_TASKS_TOPIC
value: failed-tasks
- name: NOTIFY_PYPI
value: "false"
resources:
requests:
cpu: 1
memory: "10G"
limits:
cpu: 2
memory: "13G"
volumes:
- name: "ssd"
hostPath:
path: "/mnt/disks/ssd0"
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,4 @@ spec:
- name: GOOGLE_CLOUD_PROJECT
value: oss-vdb-test
- name: OSV_VULNERABILITIES_BUCKET
value: osv-test-vulnerabilities
args:
# TODO(michaelkedar): ssh secrets
# TODO(michaelkedar): Somehow grab or enforce redis endpoint from terraform
- "--redis_host=10.102.25.214"
value: osv-test-vulnerabilities
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,5 @@ spec:
value: oss-vdb
- name: OSV_VULNERABILITIES_BUCKET
value: osv-vulnerabilities
args:
- "--ssh_key_public=/secrets/ssh.pub"
- "--ssh_key_private=/secrets/ssh"
- "--redis_host=10.102.25.213"
volumeMounts:
- mountPath: "/secrets"
name: "secrets"
volumes:
- name: secrets
secret:
secretName: secrets
items:
- key: ssh
path: ssh
mode: 0600
- key: ssh.pub
path: ssh.pub
- name: NOTIFY_PYPI
value: "true"
2 changes: 1 addition & 1 deletion deployment/deploy-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ steps:
- name: gcr.io/cloud-builders/gcloud
args: ['container', 'images', 'add-tag', '--quiet', 'gcr.io/oss-vdb/worker-base:$COMMIT_SHA', 'gcr.io/oss-vdb/worker-base:$TAG_NAME']
- name: gcr.io/cloud-builders/gcloud
args: ['container', 'images', 'add-tag', '--quiet', 'gcr.io/oss-vdb/worker:$COMMIT_SHA', 'gcr.io/oss-vdb/worker:$TAG_NAME']
args: ['container', 'images', 'add-tag', '--quiet', 'gcr.io/oss-vdb/osv-worker:$COMMIT_SHA', 'gcr.io/oss-vdb/osv-worker:$TAG_NAME']
- name: gcr.io/cloud-builders/gcloud
args: ['container', 'images', 'add-tag', '--quiet', 'gcr.io/oss-vdb/oss-fuzz-worker:$COMMIT_SHA', 'gcr.io/oss-vdb/oss-fuzz-worker:$TAG_NAME']
- name: gcr.io/cloud-builders/gcloud
Expand Down
31 changes: 31 additions & 0 deletions go/cmd/worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build stage: compiles the worker binary using a digest-pinned Go toolchain image.
FROM golang:1.26.2-alpine@sha256:f85330846cde1e57ca9ec309382da3b8e6ae3ab943d2739500e08c86393a21b1 AS build

WORKDIR /src

# Copy only the module files first so the dependency download layer can be
# reused from Docker's build cache when only source files change.
COPY ./go.mod /src/go.mod
COPY ./go.sum /src/go.sum
RUN go mod download && go mod verify


# Copy the rest of the build context and build ./cmd/worker with cgo disabled.
COPY ./ /src/
RUN CGO_ENABLED=0 go build -o worker ./cmd/worker

FROM gcr.io/distroless/static-debian12@sha256:20bc6c0bc4d625a22a8fde3e55f6515709b32055ef8fb9cfbddaa06d1760f838
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checking: does running this locally work fine? Are there no musl problems from compiling on Alpine, given that CGO is disabled (CGO_ENABLED=0)?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, running locally seems to work


COPY --from=build /src/worker /

ENTRYPOINT ["/worker"]
131 changes: 131 additions & 0 deletions go/cmd/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Package main implements the OSV worker for ingesting and enriching upstream vulns sent from the importer
package main

import (
"context"
"errors"
"flag"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"cloud.google.com/go/datastore"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/storage"
db "github.com/google/osv.dev/go/internal/database/datastore"
"github.com/google/osv.dev/go/internal/worker"
"github.com/google/osv.dev/go/internal/worker/pipeline/registry"
"github.com/google/osv.dev/go/logger"
"github.com/google/osv.dev/go/osv/clients"
"github.com/google/osv.dev/go/osv/ecosystem"
)

// main delegates all work to run and exits with status 1 if run reports an
// error. Keeping the logic in run lets its deferred cleanup execute before
// the process exits.
func main() {
	err := run()
	if err == nil {
		return
	}

	os.Exit(1)
}

// run wires the worker together and hands control to worker.Subscriber.Run.
//
// Configuration is read from the environment: GOOGLE_CLOUD_PROJECT and
// GITTER_HOST are mandatory, while PUBSUB_SUBSCRIPTION, DATASTORE_DATABASE_ID,
// OSV_VULNERABILITIES_BUCKET, FAILED_TASKS_TOPIC and NOTIFY_PYPI fall back to
// defaults. The -num-workers flag sets the subscriber's maximum number of
// outstanding messages.
//
// The context passed to every client is cancelled on SIGINT or SIGTERM.
//
// It returns an error when a mandatory variable is missing or a client cannot
// be created; otherwise it returns whatever the subscriber's Run returns.
func run() error {
	logger.InitGlobalLogger()
	defer logger.Close()

	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	logger.DebugContext(ctx, "worker starting")

	// Mandatory settings: refuse to start without them.
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	if projectID == "" {
		const msg = "GOOGLE_CLOUD_PROJECT environment variable is not set"
		logger.ErrorContext(ctx, msg)

		return errors.New(msg)
	}
	gitterURL := os.Getenv("GITTER_HOST")
	if gitterURL == "" {
		const msg = "GITTER_HOST environment variable is not set"
		logger.ErrorContext(ctx, msg)

		return errors.New(msg)
	}

	var workerCount int
	flag.IntVar(&workerCount, "num-workers", 10, "Number of workers used to process tasks")
	flag.Parse()

	// Optional settings with defaults.
	subscriptionID := envOrDefault("PUBSUB_SUBSCRIPTION", "tasks")
	// An empty database ID selects the (default) database.
	databaseID := envOrDefault("DATASTORE_DATABASE_ID", "")
	bucketName := envOrDefault("OSV_VULNERABILITIES_BUCKET", "osv-test-vulnerabilities")
	failedTopic := envOrDefault("FAILED_TASKS_TOPIC", "failed-tasks")
	// ParseBool yields false on error, so an unparseable value disables
	// PyPI notifications; the error is intentionally discarded.
	pypiEnabled, _ := strconv.ParseBool(envOrDefault("NOTIFY_PYPI", "false"))

	// Create the external clients the engine depends on.
	datastoreClient, err := datastore.NewClientWithDatabase(ctx, projectID, databaseID)
	if err != nil {
		logger.ErrorContext(ctx, "Failed to create datastore client", slog.Any("error", err))
		return err
	}
	defer datastoreClient.Close()

	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		logger.ErrorContext(ctx, "Failed to create storage client", slog.Any("error", err))
		return err
	}
	defer storageClient.Close()

	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		logger.ErrorContext(ctx, "Failed to create pubsub client", slog.Any("error", err))
		return err
	}
	defer pubsubClient.Close()

	// Assemble the engine together with its backing stores.
	engine := worker.Engine{
		Stores: worker.Stores{
			SourceRepo: db.NewSourceRepositoryStore(datastoreClient),
			Vulnerability: db.NewVulnerabilityStore(db.VulnStoreConfig{
				Client:               datastoreClient,
				GCS:                  clients.NewGCSClient(storageClient, bucketName),
				FailedWritePublisher: &clients.GCPPublisher{Publisher: pubsubClient.Publisher(failedTopic)},
			}),
			Relations:      db.NewRelationsStore(datastoreClient),
			ImportFindings: db.NewImportFindingsStore(datastoreClient),
		},
		Pipeline: registry.List,

		GitterHost:        gitterURL,
		GitterClient:      &http.Client{Timeout: 1 * time.Hour},
		EcosystemProvider: ecosystem.DefaultProvider,
	}

	// The PyPI publisher is only attached when notifications are enabled.
	if pypiEnabled {
		engine.NotifyPyPI = true
		engine.Stores.PyPIPublisher = &clients.GCPPublisher{Publisher: pubsubClient.Publisher("pypi-bridge")}
	}

	// Configure the Pub/Sub subscriber and run it.
	pubsubSub := pubsubClient.Subscriber(subscriptionID)
	pubsubSub.ReceiveSettings.MaxOutstandingMessages = workerCount
	// No byte limit - we can give lots of memory to these machines.
	pubsubSub.ReceiveSettings.MaxOutstandingBytes = -1
	pubsubSub.ReceiveSettings.MaxExtension = 6 * time.Hour
	pubsubSub.ReceiveSettings.MaxDurationPerAckExtension = 10 * time.Minute

	runner := worker.Subscriber{
		Engine:    engine,
		PubSubSub: pubsubSub,
	}

	return runner.Run(ctx)
}

// envOrDefault looks up the environment variable named key and returns its
// value. When the variable is not present in the environment, defaultValue is
// returned instead. A variable that is set to the empty string counts as
// present, so "" is returned in that case rather than defaultValue.
func envOrDefault(key, defaultValue string) string {
	value, exists := os.LookupEnv(key)
	if !exists {
		return defaultValue
	}

	return value
}
Loading