Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated implementation to comply with S3 Read/Write API endpoint spec. #14

Merged
merged 11 commits into from
Apr 3, 2017
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ or update:

## Running


### With read from kafka enabled
`$GOPATH/bin/generic-rw-s3 --port=8080 --bucketName="bucketName" --bucketPrefix="bucketPrefix" --awsRegion="eu-west-1" --source-addresses="<proyx_address>" --source-group="<consumer_group>" --source-topic="<topic_to_read>" --source-queue="kafka"`

### With specified resource path
`$GOPATH/bin/generic-rw-s3 --port=8080 --resourcePath="concepts" --bucketName="bucketName" --bucketPrefix="bucketPrefix" --awsRegion="eu-west-1" --`


```
export|set PORT=8080
export|set BUCKET_NAME='bucketName"
Expand Down Expand Up @@ -49,6 +53,7 @@ export|set SRC_CONCURRENT_PROCESSING=true # Whether the consumer uses concurrent
```

## Endpoints
For complete API specification see [S3 Read/Write API Endpoint](https://docs.google.com/document/d/1Ck-o0Le9cXOfm-aVjiGmOT7ZTB5W5fDTsPqGkhzfa-U/edit#)

### PUT /UUID

Expand Down
26 changes: 19 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"os"
"time"
"fmt"
)

const (
Expand All @@ -30,6 +31,13 @@ func main() {
EnvVar: "APP_PORT",
})

resourcePath := app.String(cli.StringOpt{
Name: "resourcePath",
Value: "",
Desc: "Request path parameter to identify a resource, e.g. /concepts",
EnvVar: "RESOURCE_PATH",
})

awsRegion := app.String(cli.StringOpt{
Name: "awsRegion",
Value: "eu-west-1",
Expand Down Expand Up @@ -123,16 +131,18 @@ func main() {
Queue: *sourceQueue,
ConcurrentProcessing: *sourceConcurrentProcessing,
}

baseftrwapp.OutputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)
runServer(*port, *awsRegion, *bucketName, *bucketPrefix, *wrkSize, qConf)
if *resourcePath != "" {
*resourcePath = fmt.Sprintf( "/%s", *resourcePath)
}
runServer(*port, *resourcePath, *awsRegion, *bucketName, *bucketPrefix, *wrkSize, qConf)
}
log.SetLevel(log.InfoLevel)
log.Infof("Application started with args %s", os.Args)
app.Run(os.Args)
}

func runServer(port string, awsRegion string, bucketName string, bucketPrefix string, wrks int, qConf consumer.QueueConfig) {
func runServer(port string, resourcePath string, awsRegion string, bucketName string, bucketPrefix string, wrks int, qConf consumer.QueueConfig) {
hc := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand Down Expand Up @@ -165,17 +175,19 @@ func runServer(port string, awsRegion string, bucketName string, bucketPrefix st
rh := service.NewReaderHandler(r)

servicesRouter := mux.NewRouter()
service.Handlers(servicesRouter, wh, rh)
service.Handlers(servicesRouter, wh, rh, resourcePath)
service.AddAdminHandlers(servicesRouter, svc, bucketName, w, r)

qp := service.NewQProcessor(w)

log.Infof("listening on %v", port)

c := consumer.NewConsumer(qConf, qp.ProcessMsg, hc)
if qConf.Topic != "" {
c := consumer.NewConsumer(qConf, qp.ProcessMsg, hc)
go c.Start()
defer c.Stop()
}

go c.Start()
defer c.Stop()
if err := http.ListenAndServe(":"+port, nil); err != nil {
log.Fatalf("Unable to start server: %v", err)
}
Expand Down
12 changes: 6 additions & 6 deletions service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rcrowley/go-metrics"
"net/http"
"time"
"fmt"
)

func AddAdminHandlers(servicesRouter *mux.Router, svc s3iface.S3API, bucketName string, writer Writer, reader Reader) {
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c *checker) gtgCheckHandler(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
}

func Handlers(servicesRouter *mux.Router, wh WriterHandler, rh ReaderHandler) {
func Handlers(servicesRouter *mux.Router, wh WriterHandler, rh ReaderHandler, resourcePath string) {
mh := handlers.MethodHandler{
"PUT": http.HandlerFunc(wh.HandleWrite),
"GET": http.HandlerFunc(rh.HandleGet),
Expand All @@ -101,9 +102,8 @@ func Handlers(servicesRouter *mux.Router, wh WriterHandler, rh ReaderHandler) {
ah := handlers.MethodHandler{
"GET": http.HandlerFunc(rh.HandleGetAll),
}

servicesRouter.Handle("/{uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}}", mh)
servicesRouter.Handle("/__count", ch)
servicesRouter.Handle("/__ids", ih)
servicesRouter.Handle("/", ah)
servicesRouter.Handle(fmt.Sprintf("%s%s", resourcePath, "/{uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}}"), mh)
servicesRouter.Handle(fmt.Sprintf("%s%s", resourcePath, "/__count"), ch)
servicesRouter.Handle(fmt.Sprintf("%s%s", resourcePath, "/__ids"), ih)
servicesRouter.Handle(fmt.Sprintf("%s%s", resourcePath, "/"), ah)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test for /__count, /__ids and /.

}
100 changes: 71 additions & 29 deletions service/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

const (
ExpectedContentType = "application/json"
EmptyResourcePath = ""
NonEmptyResourcePath = "/notempty"
)

func TestAddAdminHandlers(t *testing.T) {
Expand Down Expand Up @@ -86,11 +88,47 @@ func TestAddAdminHandlers(t *testing.T) {
})
}

func TestMethodHandlerAcceptsNonEmptyResourcePath(t *testing.T) {
r := mux.NewRouter()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having re-read these tests, I think that they're testing the same as the other tests unnecessarily. I think going down the named routes route would actually be better to test the setting of the resource path.
r.Get("mh").URL("uuid", <uuid>) == /notempty/<uuid>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind changing but I am worried whether there will be a quorum? Other reviewers may think otherwise...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could take out the additional tests and see if coverage goes down if you aren't convinced. Also Pete just told me that you can see inline the coverage if you use Atom as an editor - I am going to check it out now :)

mw := &mockWriter{}
mr := &mockReader{}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, NonEmptyResourcePath)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("PUT", withNonEmptyResourcePath("/22f53313-85c6-46b2-94e7-cfde9322f26c"), "PAYLOAD"))
assert.Equal(t, 201, rec.Code)
}

func TestCountHandlerAcceptsNonEmptyResourcePath(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{}
Handlers(r, WriterHandler{}, NewReaderHandler(mr), NonEmptyResourcePath)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("GET", withNonEmptyResourcePath("/__count"), ""))
assert.Equal(t, 200, rec.Code)
}

func TestIdsHandlerAcceptsNonEmptyResourcePath(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{}
Handlers(r, WriterHandler{}, NewReaderHandler(mr), NonEmptyResourcePath)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("GET", withNonEmptyResourcePath("/__ids"), ""))
assert.Equal(t, 200, rec.Code)
}
func TestGetAllHandlerAcceptsNonEmptyResourcePath(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{}
Handlers(r, WriterHandler{}, NewReaderHandler(mr), NonEmptyResourcePath)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("GET", withNonEmptyResourcePath("/"), ""))
assert.Equal(t, 200, rec.Code)
}

func TestWriteHandlerNewContentReturnsCreated(t *testing.T) {
r := mux.NewRouter()
mw := &mockWriter{}
mr := &mockReader{}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{})
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, EmptyResourcePath)

rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("PUT", "/22f53313-85c6-46b2-94e7-cfde9322f26c", "PAYLOAD"))
Expand All @@ -102,11 +140,15 @@ func TestWriteHandlerNewContentReturnsCreated(t *testing.T) {
assert.Equal(t, "{\"message\":\"CREATED\"}", rec.Body.String())
}

func withNonEmptyResourcePath(endpoint string) string {
return NonEmptyResourcePath + endpoint
}

func TestWriteHandlerUpdateContentReturnsOK(t *testing.T) {
r := mux.NewRouter()
mw := &mockWriter{}
mr := &mockReader{found: true}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{})
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, EmptyResourcePath)

rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("PUT", "/89d15f70-640d-11e4-9803-0800200c9a66", "PAYLOAD"))
Expand All @@ -123,7 +165,7 @@ func TestWriterHandlerFailReadingBody(t *testing.T) {
r := mux.NewRouter()
mw := &mockWriter{}
mr := &mockReader{}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{})
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, EmptyResourcePath)

rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequestBodyFail("PUT", "/22f53313-85c6-46b2-94e7-cfde9322f26c"))
Expand All @@ -135,19 +177,19 @@ func TestWriterHandlerFailWrite(t *testing.T) {
r := mux.NewRouter()
mw := &mockWriter{returnError: errors.New("error writing")}
mr := &mockReader{}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{})
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, EmptyResourcePath)

rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("PUT", "/22f53313-85c6-46b2-94e7-cfde9322f26c", "PAYLOAD"))
assert.Equal(t, 500, rec.Code)
assert.Equal(t, "{\"message\":\"Unknown internal error\"}", rec.Body.String())
assert.Equal(t, 503, rec.Code)
assert.Equal(t, "{\"message\":\"Service currently unavailable\"}", rec.Body.String())
}

func TestWriterHandlerDeleteReturnsOK(t *testing.T) {
r := mux.NewRouter()
mw := &mockWriter{}
mr := &mockReader{}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{})
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, EmptyResourcePath)

rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("DELETE", "/22f53313-85c6-46b2-94e7-cfde9322f26c", ""))
Expand All @@ -156,94 +198,94 @@ func TestWriterHandlerDeleteReturnsOK(t *testing.T) {
assert.Empty(t, rec.Body.String())
}

func TestWriterHandlerDeleteFailsReturns500(t *testing.T) {
func TestWriterHandlerDeleteFailsReturns503(t *testing.T) {
r := mux.NewRouter()
mw := &mockWriter{returnError: errors.New("Some error from writer")}
mr := &mockReader{}
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{})
Handlers(r, NewWriterHandler(mw, mr), ReaderHandler{}, EmptyResourcePath)

rec := httptest.NewRecorder()
r.ServeHTTP(rec, newRequest("DELETE", "/22f53313-85c6-46b2-94e7-cfde9322f26c", ""))
assert.Equal(t, 500, rec.Code)
assert.Equal(t, "{\"message\":\"Unknown internal error\"}", rec.Body.String())
assert.Equal(t, 503, rec.Code)
assert.Equal(t, "{\"message\":\"Service currently unavailable\"}", rec.Body.String())
}

func TestReadHandlerForUUID(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{payload: "Some content", returnCT: "return/type"}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/22f53313-85c6-46b2-94e7-cfde9322f26c", 200, "Some content", "return/type")
}

func TestReadHandlerForUUIDAndNoContentType(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{payload: "Some content"}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/22f53313-85c6-46b2-94e7-cfde9322f26c", 200, "Some content", "")
}

func TestReadHandlerForUUIDNotFound(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/22f53313-85c6-46b2-94e7-cfde9322f26c", 404, "{\"message\":\"Item not found\"}", ExpectedContentType)
}

func TestReadHandlerForErrorFromReader(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{payload: "something came back but", returnError: errors.New("Some error from reader though")}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
assertRequestAndResponseFromRouter(t, r, "/22f53313-85c6-46b2-94e7-cfde9322f26c", 500, "{\"message\":\"Unknown internal error\"}", ExpectedContentType)
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/22f53313-85c6-46b2-94e7-cfde9322f26c", 503, "{\"message\":\"Service currently unavailable\"}", ExpectedContentType)
}

func TestReadHandlerForErrorReadingBody(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{rc: &mockReaderCloser{err: errors.New("Some error")}}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)

assertRequestAndResponseFromRouter(t, r, "/22f53313-85c6-46b2-94e7-cfde9322f26c", 502, "{\"message\":\"Error while communicating to other service\"}", ExpectedContentType)
}

func TestReadHandlerCountOK(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{count: 1337}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/__count", 200, "1337", ExpectedContentType)
}

func TestReadHandlerCountFailsReturnsInternalServerError(t *testing.T) {
func TestReadHandlerCountFailsReturnsServiceUnavailable(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{returnError: errors.New("Some error from reader though")}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
assertRequestAndResponseFromRouter(t, r, "/__count", 500, "{\"message\":\"Unknown internal error\"}", ExpectedContentType)
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/__count", 503, "{\"message\":\"Service currently unavailable\"}", ExpectedContentType)
}

func TestReaderHandlerIdsOK(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{payload: "PAYLOAD"}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/__ids", 200, "PAYLOAD", "application/octet-stream")
}

func TestReaderHandlerIdsFailsReturnsInternalServerError(t *testing.T) {
func TestReaderHandlerIdsFailsReturnsServiceUnavailable(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{returnError: errors.New("Some error from reader though")}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
assertRequestAndResponseFromRouter(t, r, "/__ids", 500, "{\"message\":\"Unknown internal error\"}", ExpectedContentType)
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/__ids", 503, "{\"message\":\"Service currently unavailable\"}", ExpectedContentType)
}

func TestHandleGetAllOK(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{payload: "PAYLOAD"}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/", 200, "PAYLOAD", "application/octet-stream")
}

func TestHandleGetAllFailsReturnsInternalServerError(t *testing.T) {
func TestHandleGetAllFailsReturnsServiceUnavailable(t *testing.T) {
r := mux.NewRouter()
mr := &mockReader{returnError: errors.New("Some error from reader though")}
Handlers(r, WriterHandler{}, NewReaderHandler(mr))
assertRequestAndResponseFromRouter(t, r, "/", 500, "{\"message\":\"Unknown internal error\"}", ExpectedContentType)
Handlers(r, WriterHandler{}, NewReaderHandler(mr), EmptyResourcePath)
assertRequestAndResponseFromRouter(t, r, "/", 503, "{\"message\":\"Service currently unavailable\"}", ExpectedContentType)
}

func assertRequestAndResponseFromRouter(t testing.TB, r *mux.Router, url string, expectedStatus int, expectedBody string, expectedContentType string) *httptest.ResponseRecorder {
Expand Down
Loading