Skip to content

Commit

Permalink
implement workers
Browse files Browse the repository at this point in the history
  • Loading branch information
greenpau committed Jan 4, 2024
1 parent 4bbd2ba commit 6c47a74
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 79 deletions.
4 changes: 3 additions & 1 deletion Makefile
Expand Up @@ -23,6 +23,7 @@ all:
--with github.com/greenpau/caddy-lambda@$(LATEST_GIT_COMMIT)=$(BUILD_DIR)
@bin/caddy fmt --overwrite assets/conf/api/Caddyfile
@#bin/caddy validate --config assets/conf/api/Caddyfile
@#bin/caddy run --config assets/conf/api/Caddyfile

.PHONY: linter
linter:
Expand Down Expand Up @@ -63,7 +64,8 @@ clean:
.PHONY: qtest
qtest: covdir
@echo "Perform quick tests ..."
@go test $(VERBOSE) -coverprofile=.coverage/coverage.out -run TestParseCaddyfile ./*.go
@#go test $(VERBOSE) -coverprofile=.coverage/coverage.out -run TestParseCaddyfile ./*.go
@go test $(VERBOSE) -coverprofile=.coverage/coverage.out -run TestFunctionExecutor ./*.go

.PHONY: dep
dep:
Expand Down
22 changes: 21 additions & 1 deletion README.md
Expand Up @@ -22,6 +22,8 @@ plugin, i.e. the plugin writes response headers and body.

## Getting Started

The `Caddyfile` config follows:

```
localhost {
route /api/* {
Expand All @@ -37,4 +39,22 @@ localhost {
respond "OK"
}
}
```
```

The `assets/scripts/api/hello_world/app/index.py` follows:

```py
import json

def handler(event: dict) -> dict:
print(f"event: {event}")
response = {
"body": json.dumps({"message": "hello world!"}),
"status_code": 200,
}
return response
```

The `response` dictionary is mandatory for a handler. he `status_code` and `body` are
mandatory fields of the `response`. The plugin writes `status_code` and `body` back to
the requestor.
6 changes: 4 additions & 2 deletions assets/conf/api/Caddyfile
@@ -1,14 +1,16 @@
{
debug
http_port 9080
https_port 9443
}

localhost:9080 {
localhost {
route /api/* {
lambda {
name hello_world
runtime python
python_executable {$HOME}/dev/go/src/github.com/greenpau/caddy-lambda/venv/bin/python
# python_executable {$HOME}/dev/go/src/github.com/greenpau/caddy-lambda/venv/bin/python
python_executable python
entrypoint assets/scripts/api/hello_world/app/index.py
function handler
}
Expand Down
7 changes: 4 additions & 3 deletions assets/scripts/api/hello_world/app/index.py
Expand Up @@ -14,9 +14,10 @@

import json

def handler(event: dict):
def handler(event: dict) -> dict:
print(f"event: {event}")
response = {
"body": "hello world!",
"body": json.dumps({"message": "hello world!", "event": event}),
"status_code": 200,
}
print(json.dumps(response))
return response
41 changes: 39 additions & 2 deletions config.go
Expand Up @@ -15,14 +15,20 @@
package lambda

import (
"strconv"

"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
"github.com/caddyserver/caddy/v2/caddyconfig/httpcaddyfile"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"go.uber.org/zap"
)

const (
pluginName = "lambda"
)

func init() {
httpcaddyfile.RegisterHandlerDirective("lambda", parseCaddyfile)
httpcaddyfile.RegisterHandlerDirective(pluginName, parseCaddyfile)
}

// parseCaddyfile sets up a handler for function execution
Expand All @@ -40,6 +46,22 @@ func ensureArgsCount(d *caddyfile.Dispenser, args []string, count int) error {
return nil
}

func ensureArgUint(d *caddyfile.Dispenser, name, arg string) (uint, error) {
n, err := strconv.Atoi(arg)
if err != nil {
return 0, d.Errf("failed to convert %s %s: %v", name, arg, err)
}
ns := strconv.Itoa(n)
if ns != arg {
return 0, d.Errf("failed to convert %s %s, resolved %s", name, arg, ns)
}
if n < 0 {
return 0, d.Errf("%s %s must be greater or equal to zero", name, arg)
}

return uint(n), nil
}

// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
// lambda [<matcher>] {
Expand Down Expand Up @@ -92,8 +114,19 @@ func (fex *FunctionExecutor) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return err
}
fex.EntrypointHandler = args[0]
case "workers":
args = d.RemainingArgs()
err := ensureArgsCount(d, args, 1)
if err != nil {
return err
}
count, err := ensureArgUint(d, "workers", args[0])
if err != nil {
return err
}
fex.MaxWorkersCount = count
default:
return d.Errf("unrecognized subdirective %q", d.Val())
return d.Errf("unsupported %s directive %q", pluginName, d.Val())
}
}
}
Expand All @@ -112,13 +145,17 @@ func (fex *FunctionExecutor) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
if fex.PythonExecutable == "" {
fex.PythonExecutable = "python"
}
if fex.MaxWorkersCount == 0 {
fex.MaxWorkersCount = 1
}
fex.logger.Debug(
"configured lambda function",
zap.String("name", fex.Name),
zap.String("runtime", fex.Runtime),
zap.String("python_executable", fex.PythonExecutable),
zap.String("entrypoint", fex.EntrypointPath),
zap.String("function", fex.EntrypointHandler),
zap.Uint("workers", fex.MaxWorkersCount),
)
default:
return d.Errf("lambda runtime is not set")
Expand Down
1 change: 1 addition & 0 deletions config_test.go
Expand Up @@ -38,6 +38,7 @@ func TestParseCaddyfile(t *testing.T) {
python_executable {$HOME}/dev/go/src/github.com/greenpau/caddy-lambda/venv/bin/python
entrypoint assets/scripts/api/hello_world/app/index.py
function handler
workers 1
}`),
want: `{
"foo": "bar"
Expand Down
56 changes: 42 additions & 14 deletions executor.go
Expand Up @@ -16,6 +16,7 @@ package lambda

import (
"net/http"
"time"

"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/google/uuid"
Expand Down Expand Up @@ -70,22 +71,49 @@ func (fex *FunctionExecutor) invoke(resp http.ResponseWriter, req *http.Request)
"invoked lambda function",
zap.String("lambda_name", fex.Name),
zap.String("request_id", requestID),
zap.String("method", req.Method),
zap.String("proto", req.Proto),
zap.String("host", req.Host),
zap.String("uri", req.RequestURI),
zap.String("remote_addr_port", req.RemoteAddr),
zap.Int64("content_length", req.ContentLength),
zap.Int("cookie_count", len(cookies)),
zap.String("user_agent", req.UserAgent()),
zap.String("referer", req.Referer()),
zap.Any("cookies", cookies),
zap.Any("query_params", queryParams),
zap.Any("headers", reqHeaders),
)

resp.WriteHeader(200)
resp.Write([]byte(`FOO`))
data := make(map[string]interface{})
data["request_id"] = requestID
data["method"] = req.Method
data["path"] = req.URL.Path
data["proto"] = req.Proto
data["host"] = req.Host
data["request_uri"] = req.RequestURI
data["remote_addr_port"] = req.RemoteAddr
data["cookies"] = cookies
data["headers"] = reqHeaders
data["query_params"] = queryParams

statusCode, body, err := fex.execWorker(data)
if err != nil {
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte(http.StatusText(http.StatusInternalServerError)))
return nil
}

resp.WriteHeader(statusCode)
resp.Write(body)
return nil
}

func (fex *FunctionExecutor) execWorker(data map[string]interface{}) (int, []byte, error) {
availableWorkers := 0
for {
for _, w := range fex.workers {
if w.Terminated {
continue
}
if w.InUse {
availableWorkers++
continue
}
return w.handle(fex.entrypointImport, fex.EntrypointHandler, data)
}
if availableWorkers < 1 {
break
}
time.Sleep(100 * time.Millisecond)
}
return http.StatusServiceUnavailable, []byte(http.StatusText(http.StatusServiceUnavailable)), nil
}

0 comments on commit 6c47a74

Please sign in to comment.