Skip to content
This repository has been archived by the owner on Jun 11, 2020. It is now read-only.

Commit

Permalink
Add pubsub and bucket trigger support (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssttevee authored and iangudger committed Jul 18, 2018
1 parent 342ddb2 commit c2205a8
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 4 deletions.
9 changes: 6 additions & 3 deletions README.md
Expand Up @@ -43,6 +43,12 @@ Use the Cygwin Terminal to run the commands as described below. Note that `make

The commands described below may be run as-is using Command Prompt, or prefixed with `./` using Windows PowerShell (i.e. `./make` or `./make godev`). Note that `make test` won't work under Windows.

### External Dependencies

The `events` sub-package depends on the following libraries:
* `google.golang.org/api/pubsub/v1`
* `google.golang.org/api/storage/v1`

## Hello, world!
A demo hello world example is included. To try it out, simply skip to the [Deployment](#deployment) section.

Expand Down Expand Up @@ -84,9 +90,6 @@ Run ```make``` to compile and package your code. Upload the generated ```functio
### Vagrant
Run ```vagrant up``` to start the envirement. Run ```vagrant ssh``` to connect to the envirement. Run ```cd /vagrant``` to access the respority files. The instructions in [Local Testing](#local-testing) and [Deployment](#deployment) should now work.

## Limitations
* This has only been tested for HTTP trigger functions. Non-HTTP trigger functions will use a different endpoint (not ```/execute```).

## Troubleshooting
Some versions of Node.js (especially those packaged for Ubuntu) name their main binary ```nodejs``` instead of ```node```. The symptom of this problem is an error about the ```node``` binary not being found in the path even though Node.js is installed. This can be fixed with ```sudo ln -s $(which nodejs) /usr/local/bin/node```. There's also a package called `nodejs-legacy` that can be installed in some Debian and Ubuntu distros using `apt` that creates a symlink `node` in `/usr/bin/`

Expand Down
163 changes: 163 additions & 0 deletions events/events.go
@@ -0,0 +1,163 @@
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"encoding/base64"
"encoding/json"
"net/http"
"runtime/debug"
"time"

"google.golang.org/api/pubsub/v1"
"google.golang.org/api/storage/v1"

"../nodego"
)

// JSTime is a wrapper for time.Time to decode the time from Javascript.
type JSTime struct {
time.Time
}

// UnmarshalJSON parses a JSON string to time.Time.
func (t *JSTime) UnmarshalJSON(b []byte) (err error) {
if string(b) == `"null"` {
t.Time = time.Time{}
return
}

t.Time, err = time.Parse(`"2006-01-02T15:04:05.000Z"`, string(b))
return
}

// EventContext holds the data associated with the event that triggered the
// execution of the function along with metadata of the event itself.
type EventContext struct {
EventID string `json:"eventId"`
Timestamp JSTime `json:"timestamp"`
EventType string `json:"eventType"`
Resource string `json:"resource"`
}

// Event is the basic data structure passed to functions by non-HTTP
// triggers.
type Event struct {
Context EventContext
Data json.RawMessage
}

// UnmarshalJSON parses a JSON string to time.Time.
func (e *Event) UnmarshalJSON(b []byte) error {
raws := map[string]json.RawMessage{}
if err := json.Unmarshal(b, &raws); err != nil {
return err
}

rawContext, ok := raws["context"]
if !ok {
rawContext = b
}

if err := json.Unmarshal(rawContext, &e.Context); err != nil {
return err
}

e.Data = raws["data"]
return nil
}

// PubSubMessage is a wrapper for pubsub.PubsubMessage.
type PubSubMessage struct {
pubsub.PubsubMessage

Data []byte
}

// PubSubMessage unmarshals the event data as a pub sub message.
func (e *Event) PubSubMessage() (*PubSubMessage, error) {
var msg pubsub.PubsubMessage
if err := json.Unmarshal(e.Data, &msg); err != nil {
return nil, err
}

decoded, err := base64.StdEncoding.DecodeString(msg.Data)
if err != nil {
return nil, err
}

return &PubSubMessage{
PubsubMessage: msg,
Data: decoded,
}, nil
}

// StorageObject is a wrapper for storage.Object.
type StorageObject struct {
storage.Object

// TODO consider adding pre-parsed time fields.
}

// StorageObject unmarshals the event data as a storage event.
func (e *Event) StorageObject() (*StorageObject, error) {
var obj storage.Object
if err := json.Unmarshal(e.Data, &obj); err != nil {
return nil, err
}

return &StorageObject{
Object: obj,
}, nil
}

// Handler returns http.Handler that parses the body for a function event.
func Handler(handler func(*Event) error) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// TODO potentially extract information from the request path.
//
// PubSub and Bucket triggers have the following request path
// structures respectively:
//
// /execute/_ah/push-handlers/pubsub/projects/{PROJECT_NAME}/topics/{TOPIC_NAME}
// /execute/_ah/push-handlers/pubsub/projects/{ARBITRARY_VALUE}/topics/{ARBITRARY_VALUE}
//
// It seems that, for the time being, bucket triggers are actually just
// pubsub triggers internally.

// TODO flush logs before sending response, as in worker.js

defer func() {
if r := recover(); r != nil {
w.WriteHeader(http.StatusInternalServerError)
nodego.ErrorLogger.Printf("%s:\n\n%s\n", r, debug.Stack())
}
}()

defer r.Body.Close()

var event Event
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
nodego.ErrorLogger.Print("Failed to decode event: ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if err := handler(&event); err != nil {
nodego.ErrorLogger.Print(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
40 changes: 40 additions & 0 deletions examples/bucket.go
@@ -0,0 +1,40 @@
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"net/http"

"../events"
"../nodego"
)

func main() {
flag.Parse()

http.HandleFunc(nodego.BucketTrigger, events.Handler(func(event *events.Event) error {
obj, err := event.StorageObject()
if err != nil {
return err
}

nodego.InfoLogger.Printf("%s was last updated at %s", obj.Name, obj.Updated)

return nil
}))

nodego.TakeOver()
}
33 changes: 33 additions & 0 deletions examples/http.go
@@ -0,0 +1,33 @@
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"fmt"
"net/http"

"../nodego"
)

func main() {
flag.Parse()

http.HandleFunc(nodego.HTTPTrigger, func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, I'm native Go!")
})

nodego.TakeOver()
}
40 changes: 40 additions & 0 deletions examples/pubsub.go
@@ -0,0 +1,40 @@
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"net/http"

"../events"
"../nodego"
)

func main() {
flag.Parse()

http.HandleFunc(nodego.PubSubTrigger, events.Handler(func(event *events.Event) error {
msg, err := event.PubSubMessage()
if err != nil {
return err
}

nodego.InfoLogger.Printf("Your message: %s", msg.Data)

return nil
}))

nodego.TakeOver()
}
18 changes: 17 additions & 1 deletion nodego/env.go
Expand Up @@ -23,7 +23,7 @@ import (
// Variables copied from worker.js.
var (
codeLocationDir = os.Getenv("CODE_LOCATION")
packageJsonFile = codeLocationDir + "/package.json"
packageJSONFile = codeLocationDir + "/package.json"
entryPoint = os.Getenv("ENTRY_POINT")
supervisorHostname = os.Getenv("SUPERVISOR_HOSTNAME")
supervisorInternalPort = os.Getenv("SUPERVISOR_INTERNAL_PORT")
Expand Down Expand Up @@ -55,4 +55,20 @@ func max(a, b int64) int64 {
return b
}

// HTTPTrigger is the pattern to pass to http.Handle or http.HandleFunc to
// handle HTTP requests.
const HTTPTrigger = executePrefix

// PubSubTrigger is the pattern to pass to http.Handle or http.HandleFunc to
// handle Cloud Pub/Sub messages.
//
// Currently, pub sub trigger request paths are of the form:
// /execute/_ah/push-handlers/pubsub/projects/{PROJECT}/topics/{TOPIC}
const PubSubTrigger = "/"

// BucketTrigger is the pattern to pass to http.Handle or http.HandleFunc to
// handle Cloud Storage bucket events.
//
// Currently, storage bucket trigger request paths are of the form:
// /execute/_ah/push-handlers/pubsub/projects/{ARBITRARY_VALUE}/topics/{ARBITRARY_VALUE}
const BucketTrigger = "/"
2 changes: 2 additions & 0 deletions provision.sh
Expand Up @@ -29,3 +29,5 @@ tar -xvzf godeb-amd64.tar.gz
./godeb install
cd
rm -rf /tmp/go

go get -d -u google.golang.org/api/pubsub/v1 google.golang.org/api/storage/v1

0 comments on commit c2205a8

Please sign in to comment.