Skip to content

Commit

Permalink
Merge pull request #2 from affix/feature/openfaas_connector_sdk
Browse files Browse the repository at this point in the history
FIX #1 Move to OpenFaaS Connector SDK
  • Loading branch information
affix committed Jun 9, 2019
2 parents 9a4dc55 + 427b34f commit eecb275
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 27 deletions.
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ RUN apk -U add curl git && \
apk del curl
WORKDIR /go/src/github.com/affix/sidekiq-connector

COPY vendor vendor
COPY types types
COPY Gopkg.lock Gopkg.lock
COPY Gopkg.toml Gopkg.toml
Expand Down
44 changes: 40 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@
[[constraint]]
branch = "master"
name = "github.com/jrallison/go-workers"

[[constraint]]
name = "github.com/openfaas-incubator/connector-sdk"
version = "0.3.1"
7 changes: 4 additions & 3 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ spec:

def image = "jenkins/jnlp-slave"
node(label) {
stage('Build Docker and Push image') {
stage('Build Container') {
container('docker') {
checkout scm
sh "docker build -t affixxx/sidekiq-connector:latest ."
def scmVars = checkout scm
sh "docker build -t affixxx/sidekiq-connector:${scmVars.GIT_COMMIT} ."
if(env.BRANCH_NAME == "master") {
withDockerRegistry([ credentialsId: "dockerhub", url: "" ]) {
sh "docker tag affixxx/sidekiq-connector:${scmVars.GIT_COMMIT} affixxx/sidekiq-connector:latest"
sh "docker push affixxx/sidekiq-connector:latest"
}
}
Expand Down
50 changes: 31 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
"strings"
"time"

"github.com/affix/sidekiq-connector/types"
"github.com/jrallison/go-workers"
"github.com/openfaas-incubator/connector-sdk/types"
)

type connectorConfig struct {
gatewayURL string
upstreamTimeout time.Duration
queues []string
printResponse bool
Expand All @@ -30,10 +29,24 @@ func main() {
topicMap := types.NewTopicMap()

lookupBuilder := types.FunctionLookupBuilder{
GatewayURL: config.gatewayURL,
Client: types.MakeClient(config.upstreamTimeout),
Client: types.MakeClient(config.upstreamTimeout),
}

creds := types.GetCredentials()
controllerconfig := &types.ControllerConfig{
RebuildInterval: time.Millisecond * 1000,
GatewayURL: "http://127.0.0.1:8080",

This comment has been minimized.

Copy link
@alexellis

alexellis Jun 9, 2019

Collaborator

This needs to be configurable?

PrintResponse: true,
PrintResponseBody: true,
}

controller := types.NewController(creds, controllerconfig)

receiver := ResponseReceiver{}
controller.Subscribe(&receiver)

controller.BeginMapBuilder()

ticker := time.NewTicker(config.rebuildInterval)
go synchronizeLookups(ticker, &lookupBuilder, &topicMap)

Expand All @@ -45,7 +58,7 @@ func main() {
})

for _, queue := range config.queues {
handler := makeMessageHandler(&topicMap, config, queue)
handler := makeMessageHandler(controller, queue)
workers.Process(queue, handler, 10)
}

Expand All @@ -68,21 +81,15 @@ func synchronizeLookups(ticker *time.Ticker,
}
}

func makeMessageHandler(topicMap *types.TopicMap, config connectorConfig, queue string) func(msg *workers.Msg) {

invoker := types.Invoker{
PrintResponse: config.printResponse,
Client: types.MakeClient(config.upstreamTimeout),
GatewayURL: config.gatewayURL,
}
func makeMessageHandler(controller *types.Controller, queue string) func(msg *workers.Msg) {

mcb := func(msg *workers.Msg) {
msgJson, err := json.Marshal(msg.Args)

if err != nil {
log.Fatal(err.Error())
}
invoker.Invoke(topicMap, queue, &msgJson)
controller.Invoke(queue, &msgJson)
}
return mcb
}
Expand All @@ -106,11 +113,6 @@ func buildConnectorConfig() connectorConfig {
log.Fatal(`Provide a list of queues i.e. queues="payment_published,slack_joined"`)
}

gatewayURL := "http://gateway:8080"
if val, exists := os.LookupEnv("gateway_url"); exists {
gatewayURL = val
}

upstreamTimeout := time.Second * 30
rebuildInterval := time.Second * 3

Expand All @@ -134,11 +136,21 @@ func buildConnectorConfig() connectorConfig {
}

return connectorConfig{
gatewayURL: gatewayURL,
upstreamTimeout: upstreamTimeout,
queues: queues,
rebuildInterval: rebuildInterval,
redis_host: redis,
printResponse: printResponse,
}
}

type ResponseReceiver struct {
}

func (ResponseReceiver) Response(res types.InvokerResponse) {
if res.Error != nil {
log.Printf("tester got error: %s", res.Error.Error())
} else {
log.Printf("tester got result: [%d] %s => %s (%d) bytes", res.Status, res.Topic, res.Function, len(*res.Body))
}
}
Empty file removed vendor/.gitkeep
Empty file.

0 comments on commit eecb275

Please sign in to comment.