From ebb6d04f1f59818a19525123d32bd64f163063fe Mon Sep 17 00:00:00 2001 From: "Dylan.Wen" Date: Tue, 7 Jul 2015 18:25:14 +0800 Subject: [PATCH 1/3] add http transport implementation for golang server --- lib/go/thrift/http_client.go | 2 + lib/go/thrift/http_transport.go | 366 ++++++++++++++++++++++++++++++++ 2 files changed, 368 insertions(+) create mode 100644 lib/go/thrift/http_transport.go diff --git a/lib/go/thrift/http_client.go b/lib/go/thrift/http_client.go index b7cb10191b5..1993f7bba12 100644 --- a/lib/go/thrift/http_client.go +++ b/lib/go/thrift/http_client.go @@ -21,6 +21,7 @@ package thrift import ( "bytes" + "fmt" "io" "net/http" "net/url" @@ -177,6 +178,7 @@ func (p *THttpClient) Flush() error { return NewTTransportExceptionFromError(err) } p.header.Add("Content-Type", "application/x-thrift") + p.header.Add("Content-Length", fmt.Sprintf("%d", p.requestBuffer.Len())) req.Header = p.header response, err := client.Do(req) if err != nil { diff --git a/lib/go/thrift/http_transport.go b/lib/go/thrift/http_transport.go new file mode 100644 index 00000000000..1ede0c2e77d --- /dev/null +++ b/lib/go/thrift/http_transport.go @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import ( + "bytes" + "fmt" + "io" + "strconv" + "time" +) + +const ( + BAD_STATUS = 5 + NO_MORE_DATA = 6 +) + +var ( + // The golang language spec doesn't allow array constants, + // so these constants are listed as vars here. + THttpTransportStatusLineSep = []byte{' '} + THttpTransportHeaderColon = []byte{':'} + THttpTransportChunkSemicolon = []byte{';'} + THttpTransportCRLF = []byte{'\r', '\n'} +) + +var ( + THttpTransportBufferSize = 1024 +) + +type THttpTransport struct { + transport TTransport + + readBuffer *bytes.Buffer + writeBuffer *bytes.Buffer + readHeadersDone bool + contentLength int + chunked bool +} + +func NewTHttpTransport(transport TTransport) *THttpTransport { + return &THttpTransport{ + transport: transport, + readBuffer: bytes.NewBuffer(make([]byte, 0, THttpTransportBufferSize)), + writeBuffer: bytes.NewBuffer(make([]byte, 0, THttpTransportBufferSize)), + } +} + +func (p *THttpTransport) init() { + p.readBuffer.Reset() + p.writeBuffer.Reset() + p.readHeadersDone = false + p.contentLength = 0 + p.chunked = false +} + +func (p *THttpTransport) Open() error { + return p.transport.Open() +} + +func (p *THttpTransport) IsOpen() bool { + return p.transport.IsOpen() +} + +func (p *THttpTransport) Close() error { + p.init() + return p.transport.Close() +} + +func (p *THttpTransport) Read(buf []byte) (l int, err error) { + n, _ := p.readBuffer.Read(buf) + if n > 0 { + return n, nil + } + p.readBuffer.Reset() + got, err := p.readMoteData() + if got == 0 { + return 0, err + } + n, _ = p.readBuffer.Read(buf) + return n, err +} + +func (p *THttpTransport) readMoteData() (int, error) { + if !p.readHeadersDone { + p.readHeaders() + } + var size int + var err error + if p.chunked { + size, err = p.readChunked() + } else { + size, err = p.readContent(p.contentLength) + } + return size, err +} + +func (p *THttpTransport) readHeaders() error { + // initialize headers status variables + p.contentLength = 0 + p.chunked = false + + readStatusLineDone := false + for { + // read until headers are finished + line, err := p.readLine() + if err != nil { + return err + } + if len(line) == 0 { + if readStatusLineDone { + p.readHeadersDone = true + return nil + } else { + // Must have been an HTTP 100, keep going for another status line + readStatusLineDone = false + } + } else { + if !readStatusLineDone { + if err := p.parseStatusLine(line); err != nil { + return err + } + readStatusLineDone = true + } else { + p.parseHeader(line) + } + } + } +} + +func (p *THttpTransport) refill() error { + buf := make([]byte, THttpTransportBufferSize) + n, err := p.transport.Read(buf) + if n == 0 { + return NewTTransportException(NO_MORE_DATA, "Could not read more data") + } + read := buf[:n] + p.readBuffer.Write(read) + if err == nil || err == io.EOF { + return nil + } + return err +} + +func (p *THttpTransport) readLine() ([]byte, error) { + for { + rbuf := p.readBuffer.Bytes() + i := bytes.Index(rbuf, THttpTransportCRLF) + if i != -1 { + // length is equal to first index of CRLF + line := p.readBuffer.Next(i) + // skip CRLF + p.readBuffer.Next(len(THttpTransportCRLF)) + return line, nil + } else { + if err := p.refill(); err != nil { + return nil, err + } + } + } +} + +func (p *THttpTransport) parseStatusLine(line []byte) error { + s := bytes.Split(line, THttpTransportStatusLineSep) + if len(s) != 3 { + return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) + } + method := string(s[0]) + path := s[1] + http := s[2] + if len(path) == 0 || len(http) == 0 { + return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) + } + switch method { + case "POST": + // POST method ok, looking for content. + return nil + case "OPTIONS": + // preflight OPTIONS method, we don't need further content. + header := fmt.Sprintf( + "HTTP/1.1 200 OK%s"+ + "Date: %s%s"+ + "Access-Control-Allow-Origin: *%s"+ + "Access-Control-Allow-Methods: POST, OPTIONS%s"+ + "Access-Control-Allow-Headers: Content-Type%s"+ + "%s", + THttpTransportCRLF, + getTimeRFC1123(), THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF) + // Flush the write buffer and reset header variables + if err := p.flushWriteBuffer(&header); err != nil { + return err + } + p.readHeadersDone = false + return nil + default: + return NewTTransportException(BAD_STATUS, "Bad Status (unsupported method): "+string(line)) + } +} + +func (p *THttpTransport) parseHeader(line []byte) error { + s := bytes.Split(line, THttpTransportHeaderColon) + if len(s) != 2 { + return nil + } + key, value := string(s[0]), string(s[1]) + switch key { + case "Content-Length": + l, err := strconv.Atoi(value) + if err != nil { + return err + } + p.chunked = false + p.contentLength = l + return nil + case "Transfer-Encoding": + if value == "chunked" { + p.chunked = true + } + } + return nil +} + +func getTimeRFC1123() string { + return time.Now().Format(time.RFC1123) +} + +func (p *THttpTransport) readChunked() (int, error) { + var length int + line, err := p.readLine() + if err != nil { + return 0, err + } + chunkSize, err := p.parseChunkSize(line) + if err != nil { + return 0, err + } + if chunkSize == 0 { + if err = p.readChunkedFooters(); err != nil { + return 0, err + } + } else { + // read chunk data + length, err = p.readContent(chunkSize) + if err != nil { + return 0, err + } + // read trailing CRLF after chunk + if _, err = p.readLine(); err != nil { + return length, err + } + } + return length, nil +} + +func (p *THttpTransport) parseChunkSize(line []byte) (int, error) { + s := bytes.SplitN(line, THttpTransportChunkSemicolon, 1) + sizeStr := string(s[0]) + return strconv.Atoi(sizeStr) +} + +func (p *THttpTransport) readChunkedFooters() error { + // End of all chunks, read footer lines untils a blank line + for { + line, err := p.readLine() + if err != nil { + return err + } + if len(line) == 0 { + break + } + } + return nil +} + +func (p *THttpTransport) readContent(size int) (int, error) { + for { + if size < len(p.readBuffer.Bytes()) { + // have enough data in read buffer + break + } + if len(p.readBuffer.Bytes()) == 0 { + // We have given all the data, reset buffer + p.readBuffer.Reset() + } + if err := p.refill(); err != nil { + return 0, nil + } + } + return size, nil +} + +func (p *THttpTransport) Write(buf []byte) (int, error) { + return p.writeBuffer.Write(buf) +} + +func (p *THttpTransport) flushWriteBuffer(header *string) error { + // Write back the header, data and then flush and reset write buffer + if _, err := p.transport.Write([]byte(*header)); err != nil { + return err + } + if _, err := p.transport.Write(p.writeBuffer.Bytes()); err != nil { + return err + } + if err := p.transport.Flush(); err != nil { + return err + } + p.writeBuffer.Reset() + return nil +} + +func (p *THttpTransport) Flush() error { + header := fmt.Sprintf( + "HTTP/1.1 200 OK%s"+ + "Date: %s%s"+ + "Server: Thrift%s"+ + "Access-Control-Allow-Origin: *%s"+ + "Content-Type: application/x-thrift%s"+ + "Content-Length: %d%s"+ + "Connetion: Keep-Alive%s"+ + "%s", + THttpTransportCRLF, + getTimeRFC1123(), THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + len(p.writeBuffer.Bytes()), THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + ) + if err := p.flushWriteBuffer(&header); err != nil { + return err + } + return nil +} + +type tHttpTransportFactory struct { +} + +func NewTHttpTransportFactory() TTransportFactory { + return &tHttpTransportFactory{} +} + +func (p *tHttpTransportFactory) GetTransport(base TTransport) TTransport { + return NewTHttpTransport(base) +} From 6c318e50adba846d9d0a0794a0f2d217ecdcb507 Mon Sep 17 00:00:00 2001 From: "Dylan.Wen" Date: Wed, 8 Jul 2015 14:20:34 +0800 Subject: [PATCH 2/3] add test for golang http transport --- lib/go/thrift/http_transport.go | 110 ++++++++++++++++++--------- lib/go/thrift/http_transport_test.go | 29 +++++++ 2 files changed, 102 insertions(+), 37 deletions(-) create mode 100644 lib/go/thrift/http_transport_test.go diff --git a/lib/go/thrift/http_transport.go b/lib/go/thrift/http_transport.go index 1ede0c2e77d..236a4b80189 100644 --- a/lib/go/thrift/http_transport.go +++ b/lib/go/thrift/http_transport.go @@ -23,6 +23,7 @@ import ( "bytes" "fmt" "io" + "net/http" "strconv" "time" ) @@ -39,12 +40,32 @@ var ( THttpTransportHeaderColon = []byte{':'} THttpTransportChunkSemicolon = []byte{';'} THttpTransportCRLF = []byte{'\r', '\n'} + THttpTransportVersions = [][]byte{[]byte("HTTP/1.0"), []byte("HTTP/1.1")} + THttpTransportStatusCodes = []int{http.StatusOK} ) var ( THttpTransportBufferSize = 1024 ) +func isValidVersion(ver []byte, versions [][]byte) bool { + for _, v := range versions { + if bytes.Equal(ver, v) { + return true + } + } + return false +} + +func isValidStatusCode(code int, codes []int) bool { + for _, c := range codes { + if code == c { + return true + } + } + return false +} + type THttpTransport struct { transport TTransport @@ -67,8 +88,6 @@ func (p *THttpTransport) init() { p.readBuffer.Reset() p.writeBuffer.Reset() p.readHeadersDone = false - p.contentLength = 0 - p.chunked = false } func (p *THttpTransport) Open() error { @@ -100,7 +119,9 @@ func (p *THttpTransport) Read(buf []byte) (l int, err error) { func (p *THttpTransport) readMoteData() (int, error) { if !p.readHeadersDone { - p.readHeaders() + if err := p.readHeaders(); err != nil { + return 0, err + } } var size int var err error @@ -108,6 +129,8 @@ func (p *THttpTransport) readMoteData() (int, error) { size, err = p.readChunked() } else { size, err = p.readContent(p.contentLength) + // reset the state for reentry + p.readHeadersDone = false } return size, err } @@ -149,7 +172,7 @@ func (p *THttpTransport) refill() error { buf := make([]byte, THttpTransportBufferSize) n, err := p.transport.Read(buf) if n == 0 { - return NewTTransportException(NO_MORE_DATA, "Could not read more data") + return err } read := buf[:n] p.readBuffer.Write(read) @@ -182,40 +205,51 @@ func (p *THttpTransport) parseStatusLine(line []byte) error { if len(s) != 3 { return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) } - method := string(s[0]) - path := s[1] - http := s[2] - if len(path) == 0 || len(http) == 0 { - return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) - } - switch method { - case "POST": - // POST method ok, looking for content. - return nil - case "OPTIONS": - // preflight OPTIONS method, we don't need further content. - header := fmt.Sprintf( - "HTTP/1.1 200 OK%s"+ - "Date: %s%s"+ - "Access-Control-Allow-Origin: *%s"+ - "Access-Control-Allow-Methods: POST, OPTIONS%s"+ - "Access-Control-Allow-Headers: Content-Type%s"+ - "%s", - THttpTransportCRLF, - getTimeRFC1123(), THttpTransportCRLF, - THttpTransportCRLF, - THttpTransportCRLF, - THttpTransportCRLF, - THttpTransportCRLF) - // Flush the write buffer and reset header variables - if err := p.flushWriteBuffer(&header); err != nil { + if isValidVersion(s[0], THttpTransportVersions) { + // http response + statusCode, err := strconv.Atoi(string(s[1])) + if err != nil { return err } - p.readHeadersDone = false + if !isValidStatusCode(statusCode, THttpTransportStatusCodes) { + return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) + } return nil - default: - return NewTTransportException(BAD_STATUS, "Bad Status (unsupported method): "+string(line)) + } else if isValidVersion(s[2], THttpTransportVersions) { + // http request + if len(s[1]) == 0 { + return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) + } + switch string(s[0]) { + case "POST": + // POST method ok, looking for content. + return nil + case "OPTIONS": + // preflight OPTIONS method, we don't need further content. + header := fmt.Sprintf( + "HTTP/1.1 200 OK%s"+ + "Date: %s%s"+ + "Access-Control-Allow-Origin: *%s"+ + "Access-Control-Allow-Methods: POST, OPTIONS%s"+ + "Access-Control-Allow-Headers: Content-Type%s"+ + "%s", + THttpTransportCRLF, + getTimeRFC1123(), THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF, + THttpTransportCRLF) + // Flush the write buffer and reset header variables + if err := p.flushWriteBuffer(&header); err != nil { + return err + } + p.readHeadersDone = false + return nil + default: + return NewTTransportException(BAD_STATUS, "Bad Status (unsupported method): "+string(line)) + } } + return NewTTransportException(BAD_STATUS, "Bad Status: "+string(line)) } func (p *THttpTransport) parseHeader(line []byte) error { @@ -223,7 +257,7 @@ func (p *THttpTransport) parseHeader(line []byte) error { if len(s) != 2 { return nil } - key, value := string(s[0]), string(s[1]) + key, value := string(bytes.TrimSpace(s[0])), string(bytes.TrimSpace(s[1])) switch key { case "Content-Length": l, err := strconv.Atoi(value) @@ -259,6 +293,8 @@ func (p *THttpTransport) readChunked() (int, error) { if err = p.readChunkedFooters(); err != nil { return 0, err } + // reset the state for reentry + p.readHeadersDone = false } else { // read chunk data length, err = p.readContent(chunkSize) @@ -295,7 +331,7 @@ func (p *THttpTransport) readChunkedFooters() error { func (p *THttpTransport) readContent(size int) (int, error) { for { - if size < len(p.readBuffer.Bytes()) { + if size <= len(p.readBuffer.Bytes()) { // have enough data in read buffer break } @@ -304,7 +340,7 @@ func (p *THttpTransport) readContent(size int) (int, error) { p.readBuffer.Reset() } if err := p.refill(); err != nil { - return 0, nil + return 0, err } } return size, nil diff --git a/lib/go/thrift/http_transport_test.go b/lib/go/thrift/http_transport_test.go new file mode 100644 index 00000000000..2f426d4680a --- /dev/null +++ b/lib/go/thrift/http_transport_test.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import ( + "testing" +) + +func TestHttpTransport(t *testing.T) { + trans := NewTHttpTransport(NewTMemoryBuffer()) + TransportTest(t, trans, trans) +} From b1e4bb7db9af82bd078cfaaf67df27a3fb72e108 Mon Sep 17 00:00:00 2001 From: "Dylan.Wen" Date: Wed, 8 Jul 2015 16:38:39 +0800 Subject: [PATCH 3/3] simplify the used interfaces a little bit --- lib/go/thrift/http_transport.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/go/thrift/http_transport.go b/lib/go/thrift/http_transport.go index 236a4b80189..ac1d29d64be 100644 --- a/lib/go/thrift/http_transport.go +++ b/lib/go/thrift/http_transport.go @@ -331,11 +331,11 @@ func (p *THttpTransport) readChunkedFooters() error { func (p *THttpTransport) readContent(size int) (int, error) { for { - if size <= len(p.readBuffer.Bytes()) { + if size <= p.readBuffer.Len() { // have enough data in read buffer break } - if len(p.readBuffer.Bytes()) == 0 { + if p.readBuffer.Len() == 0 { // We have given all the data, reset buffer p.readBuffer.Reset() }