Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - V2 alpha #1

Merged
merged 91 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
51a4e5d
added go modules support
Mar 16, 2020
7c9ae3e
managed to build
Apr 6, 2020
1a8b50c
add configuration, fix dockerfile, add azure
Apr 19, 2020
cf8a1a1
add more coverage for producer-consumer
Apr 21, 2020
34cfed2
refactor, add naive e2e test scenario, better support for sigterm han…
Apr 22, 2020
868543d
add support for shared access key
Apr 22, 2020
ebd1911
small fixes, bootstrap sqs provider
Apr 22, 2020
b858ab8
add sqs tests
Apr 22, 2020
3bc698b
add support for writeToSource error handler
Apr 23, 2020
a33e6bd
fix
Apr 23, 2020
ea3bad3
refactor backoff
Apr 23, 2020
08c604a
add documentation for azure source
Apr 23, 2020
d3d6503
update some defaults, add some documentation
Apr 23, 2020
68d4da1
add error codes
Apr 23, 2020
069bf2e
update producer default
Apr 23, 2020
8eef23a
fixed metrics, add support for writeToSource Error
Apr 27, 2020
9d44c5a
try to add ci
Apr 27, 2020
62803de
another attempt
Apr 27, 2020
1827b2f
fix
Apr 27, 2020
5dbedff
-
Apr 27, 2020
477c17f
added sqs test
Apr 27, 2020
981a6f7
add error tests and cleand a bit
Apr 27, 2020
23a83d4
small fix in worker
Apr 27, 2020
7bbced6
add delay
Apr 27, 2020
33838f3
azurerite fixes
Apr 27, 2020
401155b
fixed error handling and tests
Apr 27, 2020
f495655
add cleaning for azurite
Apr 27, 2020
47412b5
add push for working versions
Apr 30, 2020
2ff7c03
add more params
Apr 30, 2020
9a871b6
-
Apr 30, 2020
8fde78d
change retryable to abort
Apr 30, 2020
fcf7bc7
move metrics to a different place
Apr 30, 2020
2dd4ceb
small experiment with docker buildx
May 1, 2020
52e1f68
fix
May 1, 2020
9e0839b
another attempt
May 1, 2020
0df5c2b
-
May 1, 2020
4409824
try to add cache in registry
May 1, 2020
5a77eb4
another
May 1, 2020
ecbfc3a
-
May 1, 2020
e951a3b
-
May 1, 2020
aa0b252
another attempt
May 3, 2020
1646a24
try with bake
May 3, 2020
cf124e6
-
May 3, 2020
2b17064
another try
May 3, 2020
9d67637
-
May 3, 2020
61f7bd6
-
May 3, 2020
e4786b3
-
May 3, 2020
571b6f8
-
May 3, 2020
31b2078
fix
May 3, 2020
0423cff
attempt to add folder cache
May 3, 2020
0e8c309
-
May 3, 2020
73b2291
-
May 3, 2020
ee64d1a
-
May 3, 2020
f68643e
fix
May 3, 2020
b11fa4d
remove cache
May 3, 2020
8e0270d
add write to dockerhub
May 4, 2020
cf44752
another test with builder
May 4, 2020
2b40776
-
May 4, 2020
6b52780
-
May 4, 2020
ab0099b
-
May 4, 2020
90170f8
-
May 4, 2020
754f103
-
May 4, 2020
94356f9
add login
May 4, 2020
fd632ee
-
May 4, 2020
c80dab7
-
May 4, 2020
f376151
-
May 4, 2020
2b51c8a
-
May 4, 2020
c82e00c
add commandline parsing
May 8, 2020
c4cdc41
poc servicebus
May 8, 2020
4556cb7
-
May 8, 2020
45ac77c
add servicebus to providers
May 8, 2020
0f7db99
fix package name
May 8, 2020
1716540
refactor pipes
May 11, 2020
c4451b6
refactor in configuration
May 11, 2020
a7ae9e2
add default source
May 11, 2020
b68d10f
added multiple sources, add tests, fix lots of bugs
May 11, 2020
1f19a18
added none handler, changed config a bit
May 12, 2020
f5922bd
add support for env variables.
May 12, 2020
4b3b4c4
small fixes
May 13, 2020
eeeb17d
fix in metrics and complete handling
May 13, 2020
3a64c64
add support for prefetchCount, fix some issues with request context.
May 13, 2020
72b8cd7
added error param to abort
May 13, 2020
0ad6e72
fix logger
May 13, 2020
c1156a5
fix
May 13, 2020
a9bab28
fix scope in logger, fix azure complete status
May 13, 2020
e049bae
typo
May 13, 2020
1826705
add support fo service-bus hacks
May 13, 2020
d0269b6
fix shutdown
May 13, 2020
9360f95
add support for multiple source for pipe in config
May 13, 2020
97e43a7
remove print
May 13, 2020
2f2ea0c
fix panic
May 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
integration/**/*
Dockerfile
docs/**/*
benchmark/**/*
.vscode
38 changes: 38 additions & 0 deletions .github/workflows/push.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
on: push
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: install latest buildx
run: mkdir -p ~/.docker/cli-plugins && wget -O ~/.docker/cli-plugins/docker-buildx https://github.com/docker/buildx/releases/download/v0.4.1/buildx-v0.4.1.linux-amd64 && chmod a+x ~/.docker/cli-plugins/docker-buildx
- name: build
env:
DOCKER_BUILDKIT: 1
DOCKER_CLI_EXPERIMENTAL: enabled
working-directory: ./integration/docker
run: docker buildx bake -f ./docker-compose.base.yaml -f ./docker-compose.worker.yaml -f ./docker-compose.producer.yaml --load
- name: test-multi
working-directory: ./integration/tests
run: ./multi.sh
- name: Push to GitHub Packages
uses: docker/build-push-action@v1
env:
DOCKER_BUILDKIT: 1
with:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
registry: docker.pkg.github.com
repository: soluto/dqd/app
tag_with_sha: ${{ github.ref == 'refs/heads/master'}}
tag_with_ref: ${{ github.ref != 'refs/heads/master' || startsWith(github.ref, 'refs/tags/') }}
- name: Push to Dockerhub
uses: docker/build-push-action@v1
env:
DOCKER_BUILDKIT: 1
with:
username: solutodqd
password: ${{ secrets.DOCKERHUB_TOKEN }}
repository: soluto/dqd
tag_with_sha: ${{ github.ref == 'refs/heads/master'}}
tag_with_ref: ${{ github.ref != 'refs/heads/master' || startsWith(github.ref, 'refs/tags/') }}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
*.out

node_modules
.vscode/launch.json
.vscode/launch.json
cloud-envs
38 changes: 25 additions & 13 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
# ------------- Build ----------------------
FROM golang:1.10.1-stretch as build
# syntax = docker/dockerfile:1.0-experimental
FROM golang:1.14.0-alpine as builder
RUN apk update && apk add --no-cache git ca-certificates && update-ca-certificates

ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOBIN=$GOPATH/bin
WORKDIR /src
ENV UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser

WORKDIR /src
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/root/.cache/go-build go mod download
RUN go mod verify
COPY . .
RUN go get .
RUN go build -o main .

# ------------- Release ----------------------
FROM scratch as release
RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /go/bin/dqd

FROM scratch

COPY --from=build /etc/ssl/certs/ /etc/ssl/certs/
COPY --from=build /src/main /
CMD [ "/main" ]
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/dqd /dqd
COPY --from=builder /etc/passwd /etc/passwd
COPY --from=builder /etc/group /etc/group
USER appuser:appuser
ENTRYPOINT ["/dqd"]
54 changes: 54 additions & 0 deletions cmd/dqd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cmd

import (
"fmt"
"os"
"strings"

gofigure "github.com/NCAR/go-figure"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)

func ConfigurationError(err error) {
fmt.Println(err)
pflag.Usage()
os.Exit(1)
}

func Load() (*viper.Viper, error) {
v := viper.New()
configFiles := pflag.CommandLine.StringSlice("config", []string{"./dqd.yaml"}, "Load a DQD config file")
configDirs := pflag.CommandLine.StringSlice("configDir", []string{"/etc/dqd", "/dqd/config"}, "Lookup for config files in these folders")
configOverrides := pflag.CommandLine.StringSlice("set", []string{}, "Override specific configuration keys")
pflag.Parse()
v.SetConfigType("yaml")
v.SetEnvPrefix("dqd")
v.AutomaticEnv()
err := gofigure.Parse(v, *configDirs)
if err != nil {
panic(fmt.Errorf("Fatal error config file: %s \n", err))
}
for _, f := range *configFiles {
r, err := os.Open(f)
defer r.Close()
if err == nil {
v.MergeConfig(r)
} else {
if f != "./dqd.yaml" {
return nil, err
}
}

}

for _, override := range *configOverrides {
entry := strings.Split(override, "=")
if len(entry) != 2 {
return nil, fmt.Errorf("invalid set value '%v'", override)
}
v.Set(entry[0], entry[1])
}

return v, nil
}
173 changes: 173 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package config

import (
"fmt"

"github.com/soluto/dqd/handlers"
"github.com/soluto/dqd/listeners"
"github.com/soluto/dqd/pipe"
"github.com/soluto/dqd/providers/azure"
"github.com/soluto/dqd/providers/servicebus"
"github.com/soluto/dqd/providers/sqs"
"github.com/soluto/dqd/utils"
v1 "github.com/soluto/dqd/v1"
"github.com/spf13/viper"
)

type App struct {
Sources map[string]*v1.Source
Listeners []listeners.Listener
Workers []*pipe.Worker
}

var sourceProviders = map[string]struct {
v1.ConsumerFactory
v1.ProducerFactory
}{
"azure-queue": {
&azure.AzureQueueClientFactory{},
&azure.AzureQueueClientFactory{},
},
"sqs": {
&sqs.SQSClientFactory{},
&sqs.SQSClientFactory{},
},
"service-bus": {
&servicebus.ServiceBusClientFactory{},
&servicebus.ServiceBusClientFactory{},
},
"io": {
&utils.IoSourceFactory{},
&utils.IoSourceFactory{},
},
}

func createSources(v *viper.Viper) map[string]*v1.Source {
sources := map[string]*v1.Source{}
for sourceName, subSource := range utils.ViperSubMap(v, "sources") {
sourceType := subSource.GetString("type")
factory, exist := sourceProviders[sourceType]
if !exist {
panic(fmt.Errorf("FATAL - Unkown source provider:%v", sourceType))
}
sources[sourceName] = v1.NewSource(factory, factory, subSource, sourceName)
}
return sources
}

func getSource(sources map[string]*v1.Source, sourceName string) *v1.Source {
source, exists := sources[sourceName]
if !exists {
panic(fmt.Sprintf("Missing source definition: %v", sourceName))
}
return source
}

func getPipeSources(sources map[string]*v1.Source, v *viper.Viper) (pipeSources []*v1.Source) {
sourcesConfig := v.GetStringSlice("sources")
for _, s := range sourcesConfig {
pipeSources = append(pipeSources, getSource(sources, s))
}
if len(pipeSources) == 0 {
pipeSources = []*v1.Source{getSource(sources, v.GetString("source"))}
}
return
}

func createHandler(v *viper.Viper) handlers.Handler {
if v == nil {
panic("no handler define for pipe, use 'none' handler if it's the desired behavior")
}
if v.Get("none") != nil {
return handlers.None
}
v.SetDefault("http.path", "/")
v.SetDefault("http.host", "localhost")
v.SetDefault("http.port", 80)

httpEndpoint := v.GetString("http.endpoint")
if httpEndpoint == "" {
httpEndpoint = fmt.Sprintf("http://%v:%v%v", v.GetString("http.host"), v.GetString("http.port"), v.GetString("http.path"))
}
return handlers.NewHttpHandler(httpEndpoint)
}

func createWorkers(v *viper.Viper, sources map[string]*v1.Source) []*pipe.Worker {
var wList []*pipe.Worker
pipesConfig := utils.ViperSubMap(v, "pipes")
for name, pipeConfig := range pipesConfig {
pipeConfig.SetDefault("rate.init", 10)
pipeConfig.SetDefault("rate.min", 1)
pipeConfig.SetDefault("rate.window", "30s")
pipeConfig.SetDefault("source", "default")
handler := createHandler(pipeConfig.Sub("handler"))

pipeSources := getPipeSources(sources, pipeConfig)

var opts = []pipe.WorkerOption{}
writeToSource := pipeConfig.GetString("onError.writeTo.source")
if writeToSource != "" {
opts = append(opts, pipe.WithErrorSource(getSource(sources, writeToSource)))
}

if pipeConfig.IsSet("rate.fixed") {
opts = append(opts, pipe.WithFixedRate(pipeConfig.GetInt("rate.fixed")))
} else {
opts = append(opts, pipe.WithDynamicRate(pipeConfig.GetInt("rate.init"),
pipeConfig.GetInt("rate.min"),
pipeConfig.GetDuration("rate.window")))
}
output := pipeConfig.GetString("output")
if output != "" {
opts = append(opts, pipe.WithOutput(getSource(sources, output)))
} else {
opts = append(opts, pipe.WithDynamicRate(pipeConfig.GetInt("rate.init"),
pipeConfig.GetInt("rate.min"),
pipeConfig.GetDuration("rate.window")))
}

wList = append(wList, pipe.NewWorker(
name,
pipeSources,
handler,
opts...,
))
}
return wList
}

func createListeners(v *viper.Viper, sources map[string]*v1.Source) []listeners.Listener {
v.SetDefault("listeners.http.host", "0.0.0.0:9999")
host := v.GetString("listeners.http.host")
listener := listeners.Http(host)
for _, s := range sources {
listener.Add(s, viper.New())
}
return []listeners.Listener{listener}
}

func CreateApp(v *viper.Viper) (_ *App, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
}
}()

err = utils.NormalizeEntityConfig(v, "pipe", "pipes")
if err != nil {
return
}
err = utils.NormalizeEntityConfig(v, "source", "sources")
if err != nil {
return
}

sources := createSources(v)
listeners := createListeners(v, sources)
workers := createWorkers(v, sources)
return &App{
sources,
listeners,
workers,
}, nil
}
Empty file added docs/examples/azure.yaml
Empty file.
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# DQD

20 changes: 20 additions & 0 deletions docs/sources/azure.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@


```yaml
source:
type: azure-queue

# Location
storageAccount: test
queue: dqd
#connection: http://azure:10001/devstoreaccount1 useful for local testing

# Credentials
storageAccountKey: ****
sasToken: ****

# Options
visibilityTimeoutInSeconds: 100 # defaults to 60
maxDequeueCount: 1 # deaults to 5
retryVisiblityTimeoutInSeconds: [10, 500, 600]
```
10 changes: 10 additions & 0 deletions docs/sources/service-bus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
```yaml
source:
type: service-bus

connectionString: ""
topic: "my-topic"
subscription: "my sub"

prefetchCount: 30 #defaults to 1
```
19 changes: 19 additions & 0 deletions docs/sources/sqs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

# SQS Source

Credentials are taken by the aws sdk defaukt provider chain:
https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html

```yaml
source:
type: sqs


# Location
url:
region: us-east-1
#endpoint: http://sqs:9324 useful for local testing

# Options
visibilityTimeoutInSeconds: 100 # defaults to 30
```
Loading