Skip to content

Commit

Permalink
Add experimental environment: tensorflow-serving (#1212)
Browse files Browse the repository at this point in the history
  • Loading branch information
life1347 committed Jul 2, 2019
1 parent 2ba66af commit b229d6e
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 30 deletions.
23 changes: 23 additions & 0 deletions environments/tensorflow-serving/Dockerfile
@@ -0,0 +1,23 @@
# Go toolchain version used by the builder stage below.
ARG GO_VERSION=1.9.2

# Stage 1: tensorflow/serving with CA certificates installed so the
# final image can make TLS connections.
FROM tensorflow/serving as serving
RUN apt update && apt install -y ca-certificates && rm -rf /var/lib/apt/lists/*

# Stage 2: compile the Fission environment server (server.go).
FROM golang:${GO_VERSION} AS builder

# Pre-modules GOPATH layout; the source must live under this import path.
ENV GOPATH /usr
ENV APP ${GOPATH}/src/github.com/fission/fission/environments/tensorflow-serving

WORKDIR ${APP}

ADD server.go ${APP}

# Fetch dependencies (GOPATH mode), then force a full rebuild (-a) of /server.
RUN go get
RUN go build -a -o /server server.go

# Stage 3: final image — tensorflow/serving plus the environment server binary.
FROM serving
WORKDIR /
COPY --from=builder /server /

# The environment server listens on 8888 (see server.go main()).
ENTRYPOINT ["/server"]
EXPOSE 8888
29 changes: 29 additions & 0 deletions environments/tensorflow-serving/README.md
@@ -0,0 +1,29 @@
# Fission: Tensorflow Serving Environment

This is the Tensorflow Serving environment for Fission.

It's a Docker image containing a Go runtime, along with a tensorflow serving service.

## How it works

Tensorflow Serving is a model serving service that supports both RESTful API and gRPC endpoints. In the current
implementation, the Go server launches `tensorflow_model_server` to load the model during specialization. When the Go
server receives requests from the router, it creates a reverse proxy that connects to the RESTful API endpoint exposed
by `tensorflow_model_server` and returns the upstream server's response to the user.

## Build this image

```
docker build -t USER/tensorflow-serving . && docker push USER/tensorflow-serving
```

## Using the image in fission

You can add this customized image to fission with "fission env create":

```
fission env create --name tensorflow --image USER/tensorflow-serving --version 2
```

After this, fission functions that have the env parameter set to the
same environment name as this command will use this environment.
205 changes: 205 additions & 0 deletions environments/tensorflow-serving/server.go
@@ -0,0 +1,205 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httputil"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
	// PortgRPC is the port tensorflow_model_server listens on for gRPC.
	PortgRPC = 8500
	// PortRestAPI is the port tensorflow_model_server listens on for its RESTful API.
	PortRestAPI = 8501
)

var (
	// specialized flips to true once a model has been loaded via /v2/specialize.
	// NOTE(review): this flag is written by the specialize handler and read by
	// the generic "/" handler from different request goroutines without any
	// synchronization — looks racy; confirm whether a sync/atomic guard is needed.
	specialized = false

	// MODEL_NAME is the model directory name handed to tensorflow_model_server
	// (--model_name) and used to build the REST predict path in main().
	// (Underscore name predates Go conventions; renaming touches several funcs.)
	MODEL_NAME = ""
)

type (
FunctionLoadRequest struct {
// FilePath is an absolute filesystem path to the
// function. What exactly is stored here is
// env-specific. Optional.
FilePath string `json:"filepath"`

// FunctionName has an environment-specific meaning;
// usually, it defines a function within a module
// containing multiple functions. Optional; default is
// environment-specific.
FunctionName string `json:"functionName"`

// URL to expose this function at. Optional; defaults
// to "/".
URL string `json:"url"`
}
)

// specializeHandler returns the handler for the legacy v1 specialize
// interface. This environment only implements v2, so it always logs the
// attempt and responds 501 Not Implemented.
func specializeHandler(logger *zap.Logger) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, _ *http.Request) {
		logger.Error("v1 interface is not implemented")
		w.WriteHeader(http.StatusNotImplemented)
	}
}

// specializeHandlerV2 returns the v2 specialize handler. It resolves the
// model directory from the load request (FilePath/FunctionName), launches
// tensorflow_model_server as a subprocess pointed at that model, waits for
// the subprocess's REST endpoint to accept connections, and then marks this
// container as specialized. Responds 400 if already specialized and 504 if
// the model server does not come up within 30 seconds.
func specializeHandlerV2(logger *zap.Logger) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		if specialized {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte("Not a generic container"))
			return
		}

		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			logger.Error("error reading request body", zap.Error(err))
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		var loadreq FunctionLoadRequest
		err = json.Unmarshal(body, &loadreq)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		// To ensure we load the model from the expected path: resolve the
		// requested path and reject anything that escapes FilePath (e.g. "..").
		basePath := fmt.Sprintf("%v/%v", loadreq.FilePath, loadreq.FunctionName)
		basePath, err = filepath.Abs(basePath)
		if err != nil {
			msg := "error getting absolute path of model"
			logger.Error(msg, zap.Error(err))
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			return
		} else if !strings.HasPrefix(basePath, loadreq.FilePath) {
			msg := "incorrect model base path"
			logger.Error(msg, zap.String("model_base_path", basePath))
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(msg))
			return
		}

		_, err = os.Stat(basePath)
		if err != nil {
			msg := "error checking model status"
			logger.Error(msg, zap.Error(err))
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// The model name served by tensorflow_model_server is the directory
		// name that holds the model.
		MODEL_NAME = filepath.Base(basePath)

		argModelBasePath := fmt.Sprintf("--model_base_path=%v", basePath)
		argModelName := fmt.Sprintf("--model_name=%v", MODEL_NAME)
		argPortgRPC := fmt.Sprintf("--port=%v", PortgRPC)
		argPortREST := fmt.Sprintf("--rest_api_port=%v", PortRestAPI)

		logger.Info(fmt.Sprintf("specializing: %v %v", loadreq.FunctionName, loadreq.FilePath))

		// Future: could be improved by keeping subprocess open while environment is specialized
		cmd := exec.Command("tensorflow_model_server",
			argPortgRPC, argPortREST, argModelName, argModelBasePath)

		cmd.Stdout = os.Stdout
		cmd.Stderr = os.Stderr

		err = cmd.Start()
		if err != nil {
			msg := "error starting tensorflow serving"
			logger.Error(msg, zap.Error(err))
			err = errors.Wrap(err, msg)
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			return
		}

		// Watch the subprocess with a goroutine-local error variable.
		// Fix: the original assigned the handler's shared `err` from this
		// goroutine, racing with the retry loop below. Fatal is deliberate:
		// if the model server dies, the environment must be restarted.
		go func() {
			if werr := cmd.Wait(); werr != nil {
				logger.Fatal("error running tensorflow serving", zap.Error(werr))
			}
		}()

		t := time.Now()
		retryInterval := 50 * time.Millisecond

		// tensorflow serving takes some time to load the model into memory;
		// keep retrying until its REST api server accepts connections or the
		// 30s deadline expires.
		for {
			if time.Since(t) > 30*time.Second {
				w.WriteHeader(http.StatusGatewayTimeout)
				return
			}
			// Fix: dial the PortRestAPI constant instead of a duplicated
			// hard-coded "localhost:8501" literal.
			conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%v", PortRestAPI))
			if err == nil {
				conn.Close()
				break
			}
			logger.Info(fmt.Sprintf("waiting for tensorflow serving to be ready: %v", err.Error()))
			time.Sleep(retryInterval)
			// Fix: cap the exponential backoff at 1s so the total wait cannot
			// drastically overshoot the 30s deadline.
			if retryInterval < time.Second {
				retryInterval *= 2
			}
		}

		specialized = true

		logger.Info("done")
	}
}

func readinessProbeHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// main wires up the environment's HTTP endpoints (/healthz readiness probe,
// v1 and v2 specialize, and a catch-all that reverse-proxies function
// requests to the tensorflow_model_server REST predict endpoint) and serves
// them on port 8888.
func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		log.Fatalf("can't initialize zap logger: %v", err)
	}
	defer logger.Sync()

	http.HandleFunc("/healthz", readinessProbeHandler)
	http.HandleFunc("/specialize", specializeHandler(logger.Named("specialize_handler")))
	http.HandleFunc("/v2/specialize", specializeHandlerV2(logger.Named("specialize_v2_handler")))

	// Generic route -- all http requests go to the user function.
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if !specialized {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("Generic container: no requests supported"))
			return
		}

		// TODO: replace it with gRPC (https://gist.github.com/mauri870/1f953a183ee6c186e70a0a72e78b088c)
		// Rewrite every request to the predict endpoint of the loaded model
		// on tensorflow_model_server's REST port (shared PortRestAPI const
		// instead of a duplicated hard-coded "localhost:8501").
		director := func(req *http.Request) {
			req.URL.Scheme = "http"
			req.URL.Host = fmt.Sprintf("localhost:%v", PortRestAPI)
			req.URL.Path = fmt.Sprintf("/v1/models/%v:predict", MODEL_NAME)
		}

		proxy := &httputil.ReverseProxy{
			Director: director,
		}
		proxy.ServeHTTP(w, r)
	})

	logger.Info("listening on 8888 ...")
	// Fix: the error from ListenAndServe was previously discarded, so a
	// failed bind (e.g. port already in use) exited silently with status 0.
	if err := http.ListenAndServe(":8888", nil); err != nil {
		logger.Fatal("error serving", zap.Error(err))
	}
}
23 changes: 23 additions & 0 deletions examples/tensorflow-serving/README.md
@@ -0,0 +1,23 @@
# Tensorflow Serving Environment Example

## Create Environment

```bash
$ fission env create --name tensorflow --image fission/tensorflow-serving --version 2
```

## Create Package

```bash
$ zip -r half_plus_two.zip ./half_plus_two
$ fission pkg create --env tensorflow --deploy half_plus_two.zip
```

## Create Function

Here, the `--entrypoint` represents the name of the top-level directory that contains the trained model.

```bash
$ fission fn create --name t1 --pkg <pkg name> --env tensorflow --entrypoint "half_plus_two"
$ fission fn test --name t1 --body '{"instances": [1.0, 2.0, 0.0]}' --method POST
```
Binary file not shown.
Binary file not shown.
Binary file not shown.
27 changes: 14 additions & 13 deletions hack/release-build.sh
Expand Up @@ -131,19 +131,20 @@ build_all_envs() {
local version=$1

# call with version, env dir, image name base, image name variant
build_env_image "$version" "nodejs" "node-env" ""
build_env_image "$version" "nodejs" "node-env" "debian"
build_env_image "$version" "binary" "binary-env" ""
build_env_image "$version" "dotnet" "dotnet-env" ""
build_env_image "$version" "dotnet20" "dotnet20-env" ""
build_env_image "$version" "go" "go-env" ""
build_env_image "$version" "go" "go-env" "1.11.4"
build_env_image "$version" "perl" "perl-env" ""
build_env_image "$version" "php7" "php-env" ""
build_env_image "$version" "python" "python-env" ""
build_env_image "$version" "python" "python-env" "2.7"
build_env_image "$version" "ruby" "ruby-env" ""
build_env_image "$version" "jvm" "jvm-env" ""
build_env_image "$version" "nodejs" "node-env" ""
build_env_image "$version" "nodejs" "node-env" "debian"
build_env_image "$version" "binary" "binary-env" ""
build_env_image "$version" "dotnet" "dotnet-env" ""
build_env_image "$version" "dotnet20" "dotnet20-env" ""
build_env_image "$version" "go" "go-env" ""
build_env_image "$version" "go" "go-env" "1.11.4"
build_env_image "$version" "perl" "perl-env" ""
build_env_image "$version" "php7" "php-env" ""
build_env_image "$version" "python" "python-env" ""
build_env_image "$version" "python" "python-env" "2.7"
build_env_image "$version" "ruby" "ruby-env" ""
build_env_image "$version" "jvm" "jvm-env" ""
build_env_image "$version" "tensorflow-serving" "tensorflow-serving-env" ""
}

build_env_builder_image() {
Expand Down
27 changes: 14 additions & 13 deletions hack/release.sh
Expand Up @@ -71,19 +71,20 @@ push_all_envs() {
local version=$1

# call with version, env dir, image name base, image name variant
push_env_image "$version" "nodejs" "node-env" ""
push_env_image "$version" "nodejs" "node-env" "debian"
push_env_image "$version" "binary" "binary-env" ""
push_env_image "$version" "dotnet" "dotnet-env" ""
push_env_image "$version" "dotnet20" "dotnet20-env" ""
push_env_image "$version" "go" "go-env" ""
push_env_image "$version" "go" "go-env" "1.11.4"
push_env_image "$version" "perl" "perl-env" ""
push_env_image "$version" "php7" "php-env" ""
push_env_image "$version" "python" "python-env" ""
push_env_image "$version" "python" "python-env" "2.7"
push_env_image "$version" "ruby" "ruby-env" ""
push_env_image "$version" "jvm" "jvm-env" ""
push_env_image "$version" "nodejs" "node-env" ""
push_env_image "$version" "nodejs" "node-env" "debian"
push_env_image "$version" "binary" "binary-env" ""
push_env_image "$version" "dotnet" "dotnet-env" ""
push_env_image "$version" "dotnet20" "dotnet20-env" ""
push_env_image "$version" "go" "go-env" ""
push_env_image "$version" "go" "go-env" "1.11.4"
push_env_image "$version" "perl" "perl-env" ""
push_env_image "$version" "php7" "php-env" ""
push_env_image "$version" "python" "python-env" ""
push_env_image "$version" "python" "python-env" "2.7"
push_env_image "$version" "ruby" "ruby-env" ""
push_env_image "$version" "jvm" "jvm-env" ""
push_env_image "$version" "tensorflow-serving" "tensorflow-serving-env" ""
}

push_env_builder_image() {
Expand Down
1 change: 1 addition & 0 deletions test/build_and_test.sh
Expand Up @@ -40,6 +40,7 @@ build_and_push_builder $BUILDER_IMAGE:$TAG
build_and_push_env_runtime python $REPO/python-env:$TAG
build_and_push_env_runtime jvm $REPO/jvm-env:$TAG
build_and_push_env_runtime go $REPO/go-env:$TAG
build_and_push_env_runtime tensorflow-serving $REPO/tensorflow-serving-env:$TAG

build_and_push_env_builder python $REPO/python-env-builder:$TAG $BUILDER_IMAGE:$TAG
build_and_push_env_builder jvm $REPO/jvm-env-builder:$TAG $BUILDER_IMAGE:$TAG
Expand Down

0 comments on commit b229d6e

Please sign in to comment.