Skip to content

Commit

Permalink
added kafka-websocket example
Browse files Browse the repository at this point in the history
  • Loading branch information
jprobinson committed Dec 19, 2015
1 parent 1a80062 commit 6f15bbc
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 0 deletions.
11 changes: 11 additions & 0 deletions examples/pubsub/api-kafka-websocket-pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# `api-kafka-websocket-pubsub`
* This is an example based on a prototype from an NYTimes hack week. It mixes `gizmo/server.SimpleServer`, `gizmo/server.SimpleService`, `gizmo/pubsub.KafkaPublisher`, `gizmo/pubsub.KafkaSubscriber` and `gorilla/websocket` and was used to test out realtime, collaborative crossword games.
* The server offers 3 endpoints to allow users to:
1. Create a new topic on Kafka (visit http://localhost:8080/svc/v1/create to get a 'stream ID')
2. Upgrade a request to a websocket connection and expose the topic over it
3. Serve an HTML page that demos the service.(visit http://localhost:8080/svc/v1/demo/{stream_id from 'create'})

### This demo requires Kafka and Zookeeper to be installed and running localally by default.
* To install and run on OS X, run: `brew install kafka` and then `zookeeper-server-start.sh /usr/local/etc/kafka/zookeeper.properties` to run Zookeeper and `kafka-server-start.sh /usr/local/etc/kafka/server.properties` to start a Kafka broker.

### The config in this example is loaded via a local JSON file and the default `gizmo/config.Config` struct.
11 changes: 11 additions & 0 deletions examples/pubsub/api-kafka-websocket-pubsub/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Server": {
"Log":"",
"HTTPPort":8080
},
"Kafka":{
"BrokerHosts":["localhost:9092"],
"Partition":0,
"MaxRetry":1
}
}
33 changes: 33 additions & 0 deletions examples/pubsub/api-kafka-websocket-pubsub/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"flag"

"github.com/nytimes/gizmo/config"
"github.com/nytimes/gizmo/pubsub"
"github.com/nytimes/gizmo/server"

"github.com/nytimes/gizmo/examples/pubsub/api-kafka-websocket-pubsub/service"
)

func main() {
cfg := config.NewConfig("./config.json")

// set the pubsub's Log to be the same as server's
pubsub.Log = server.Log

// in case we want to override the port or log location via CLI
flag.Parse()
config.SetServerOverrides(cfg.Server)

server.Init("gamestream-example", cfg.Server)

err := server.Register(service.NewStreamService(cfg.Server.HTTPPort, cfg.Kafka))
if err != nil {
server.Log.Fatal(err)
}

if err = server.Run(); err != nil {
server.Log.Fatal(err)
}
}
56 changes: 56 additions & 0 deletions examples/pubsub/api-kafka-websocket-pubsub/service/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package service

import (
"fmt"
"net/http"
"os"
"os/exec"
"time"

"github.com/nytimes/gizmo/server"
)

// CreateStream is a JSON endpoint for creating a new topic in Kafka.
func (s *StreamService) CreateStream(r *http.Request) (int, interface{}, error) {
id := time.Now().Unix()
topic := topicName(id)
err := createTopic(topic)
if err != nil {
return http.StatusInternalServerError, nil, jsonErr{err}
}

server.LogWithFields(r).WithField("topic", topic).Info("created new topic")

return http.StatusOK, struct {
Status string `json:"status"`
StreamID int64 `json:"stream_id"`
}{"success!", id}, nil
}

func topicName(id int64) string {
return fmt.Sprintf("stream-%d", id)
}

func createTopic(name string) error {
cmd := exec.Command("kafka-topics.sh",
"--create",
"--zookeeper",
"localhost:2181",
"--replication-factor",
"1",
"--partition",
"1",
"--topic",
name)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
return cmd.Run()
}

type jsonErr struct {
Err error `json:"error"`
}

func (e jsonErr) Error() string {
return e.Err.Error()
}
80 changes: 80 additions & 0 deletions examples/pubsub/api-kafka-websocket-pubsub/service/demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package service

import (
"html/template"
"net/http"

"github.com/nytimes/gizmo/server"
"github.com/nytimes/gizmo/web"
)

// Demo will serve an HTML page that demonstrates how to use the 'stream'
// endpoint.
func (s *StreamService) Demo(w http.ResponseWriter, r *http.Request) {
vals := struct {
Port int
StreamID int64
}{
s.port,
web.GetInt64Var(r, "stream_id"),
}

w.Header().Set("Content-Type", "text/html; charset=utf-8")
err := demoTempl.Execute(w, &vals)
if err != nil {
server.Log.Error("template error ", err)
http.Error(w, "problems loading HTML", http.StatusInternalServerError)
}
}

var demoTempl = template.Must(template.New("demo").Parse(demoHTML))

const demoHTML = `<!DOCTYPE html>
<html lang="en">
<head>
<title>StreamService Demo</title>
</head>
<body>
<h1>Welcome to the stream for {{ .StreamID }}!</h1>
<p>Open multiple tabs to see messages broadcast across all views</p>
<div id="consumed" style="float:left; width:50%">
</div>
<div id="published" style="float:left">
</div>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
<script type="text/javascript">
(function()
{
var conn = new WebSocket(
"ws://localhost:{{ .Port }}/svc/v1/stream/{{ .StreamID }}"
);
// consume from websocket/Kafka
conn.onmessage = function(evt)
{
var evts = $("#consumed");
evts.prepend("<p> Received: " + evt.data + "</p>");
}
// publish to websocket/Kafka
setInterval(publishMessage(conn), 1000);
function publishMessage(conn) {
return function() {
var alpha = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
var msg = '{"game":"crossword","user_id":12345,"time":'+new Date().getTime()+',"cell":' + Math.floor(
(Math.random() * 10) + 1) +
',"value":"' + alpha.charAt(Math.floor(
Math.random() * alpha.length)) + '"}'
conn.send(msg);
var evts = $("#published");
evts.prepend("<p> Sent: " + msg + "</p>");
}
}
})();
</script>
</body>
</html>`
50 changes: 50 additions & 0 deletions examples/pubsub/api-kafka-websocket-pubsub/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package service

import (
"net/http"

"github.com/nytimes/gizmo/config"
"github.com/nytimes/gizmo/server"
)

// StreamService offers three endpoints: one to create a new topic in
// Kafka, a second to expose the topic over a websocket and a third
// to host a web page that provides a demo.
type StreamService struct {
port int
cfg *config.Kafka
}

// NewStreamService will return a new stream service instance.
// If the given config is empty, it will default to localhost.
func NewStreamService(port int, cfg *config.Kafka) *StreamService {
if cfg == nil {
cfg = &config.Kafka{BrokerHosts: []string{"localhost:9092"}}
}
return &StreamService{port, cfg}
}

// Prefix is the string prefixed to all endpoint routes.
func (s *StreamService) Prefix() string {
return "/svc/v1"
}

// Middleware in this service will do nothing.
func (s *StreamService) Middleware(h http.Handler) http.Handler {
return server.NoCacheHandler(h)
}

// Endpoints returns the two endpoints for our stream service.
func (s *StreamService) Endpoints() map[string]map[string]http.HandlerFunc {
return map[string]map[string]http.HandlerFunc{
"/create": map[string]http.HandlerFunc{
"GET": server.JSONToHTTP(s.CreateStream).ServeHTTP,
},
"/stream/{stream_id:[0-9]+}": map[string]http.HandlerFunc{
"GET": s.Stream,
},
"/demo/{stream_id:[0-9]+}": map[string]http.HandlerFunc{
"GET": s.Demo,
},
}
}
Loading

0 comments on commit 6f15bbc

Please sign in to comment.