Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

refactored httpclient handling #991

Merged
merged 2 commits into from Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 4 additions & 26 deletions client.go
Expand Up @@ -25,7 +25,6 @@ import (
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/url"
"sort"
Expand All @@ -46,14 +45,13 @@ type ClientOptions struct {
// InternalHTTPClient represents a client to the Pilosa cluster.
type InternalHTTPClient struct {
defaultURI *URI
options *ClientOptions

// The client to use for HTTP communication.
HTTPClient *http.Client
}

// NewInternalHTTPClient returns a new instance of InternalHTTPClient to connect to host.
func NewInternalHTTPClient(host string, options *ClientOptions) (*InternalHTTPClient, error) {
func NewInternalHTTPClient(host string, remoteClient *http.Client) (*InternalHTTPClient, error) {
if host == "" {
return nil, ErrHostRequired
}
Expand All @@ -63,34 +61,14 @@ func NewInternalHTTPClient(host string, options *ClientOptions) (*InternalHTTPCl
return nil, err
}

client := NewInternalHTTPClientFromURI(uri, options)
client := NewInternalHTTPClientFromURI(uri, remoteClient)
return client, nil
}

func NewInternalHTTPClientFromURI(defaultURI *URI, options *ClientOptions) *InternalHTTPClient {
if options == nil {
options = &ClientOptions{}
}
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
if options.TLS != nil {
transport.TLSClientConfig = options.TLS
}
client := &http.Client{Transport: transport}
func NewInternalHTTPClientFromURI(defaultURI *URI, remoteClient *http.Client) *InternalHTTPClient {
return &InternalHTTPClient{
defaultURI: defaultURI,
HTTPClient: client,
HTTPClient: remoteClient,
}
}

Expand Down
34 changes: 21 additions & 13 deletions client_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"net/http"
"reflect"
"testing"

Expand All @@ -43,6 +44,13 @@ func createCluster(c *pilosa.Cluster) ([]*test.Server, []*test.Holder) {
return server, hldr
}

var defaultClient *http.Client

func init() {
defaultClient = pilosa.GetHTTPClient(nil)

}

// Test distributed TopN Row count across 3 nodes.
func TestClient_MultiNode(t *testing.T) {
cluster := test.NewCluster(3)
Expand All @@ -54,23 +62,23 @@ func TestClient_MultiNode(t *testing.T) {
}

s[0].Handler.Executor.ExecuteFn = func(ctx context.Context, index string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
e := pilosa.NewExecutor(nil)
e := pilosa.NewExecutor(defaultClient)
e.Holder = hldr[0].Holder
e.Scheme = cluster.Nodes[0].Scheme
e.Host = cluster.Nodes[0].Host
e.Cluster = cluster
return e.Execute(ctx, index, query, slices, opt)
}
s[1].Handler.Executor.ExecuteFn = func(ctx context.Context, index string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
e := pilosa.NewExecutor(nil)
e := pilosa.NewExecutor(defaultClient)
e.Holder = hldr[1].Holder
e.Scheme = cluster.Nodes[1].Scheme
e.Host = cluster.Nodes[1].Host
e.Cluster = cluster
return e.Execute(ctx, index, query, slices, opt)
}
s[2].Handler.Executor.ExecuteFn = func(ctx context.Context, index string, query *pql.Query, slices []uint64, opt *pilosa.ExecOptions) ([]interface{}, error) {
e := pilosa.NewExecutor(nil)
e := pilosa.NewExecutor(defaultClient)
e.Holder = hldr[2].Holder
e.Scheme = cluster.Nodes[2].Scheme
e.Host = cluster.Nodes[2].Host
Expand Down Expand Up @@ -135,9 +143,9 @@ func TestClient_MultiNode(t *testing.T) {

// Connect to each node to compare results.
client := make([]*test.Client, 3)
client[0] = test.MustNewClient(s[0].Host())
client[1] = test.MustNewClient(s[1].Host())
client[2] = test.MustNewClient(s[2].Host())
client[0] = test.MustNewClient(s[0].Host(), defaultClient)
client[1] = test.MustNewClient(s[1].Host(), defaultClient)
client[2] = test.MustNewClient(s[2].Host(), defaultClient)

topN := 4
queryRequest := &internal.QueryRequest{
Expand Down Expand Up @@ -218,7 +226,7 @@ func TestClient_Import(t *testing.T) {
s.Handler.Holder = hldr.Holder

// Send import request.
c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)
if err := c.Import(context.Background(), "i", "f", 0, []pilosa.Bit{
{RowID: 0, ColumnID: 1},
{RowID: 0, ColumnID: 5},
Expand Down Expand Up @@ -269,7 +277,7 @@ func TestClient_ImportInverseEnabled(t *testing.T) {
s.Handler.Holder = hldr.Holder

// Send import request.
c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)
if err := c.Import(context.Background(), "i", "f", 0, []pilosa.Bit{
{RowID: 0, ColumnID: 1},
{RowID: 0, ColumnID: 5},
Expand Down Expand Up @@ -318,7 +326,7 @@ func TestClient_ImportValue(t *testing.T) {
s.Handler.Holder = hldr.Holder

// Send import request.
c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)
if err := c.ImportValue(context.Background(), "i", "f", fld.Name, 0, []pilosa.FieldValue{
{ColumnID: 1, Value: -10},
{ColumnID: 2, Value: 20},
Expand Down Expand Up @@ -355,7 +363,7 @@ func TestClient_BackupRestore(t *testing.T) {
s.Handler.Cluster.Nodes[0].Host = s.Host()
s.Handler.Holder = hldr.Holder

c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)

// Backup from frame.
var buf bytes.Buffer
Expand Down Expand Up @@ -420,7 +428,7 @@ func TestClient_BackupInverseView(t *testing.T) {
s.Handler.Cluster.Nodes[0].Host = s.Host()
s.Handler.Holder = hldr.Holder

c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)

// Backup from frame.
var buf bytes.Buffer
Expand Down Expand Up @@ -457,7 +465,7 @@ func TestClient_BackupInvalidView(t *testing.T) {
s.Handler.Cluster.Nodes[0].Host = s.Host()
s.Handler.Holder = hldr.Holder

c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)

// Backup from frame.
var buf bytes.Buffer
Expand Down Expand Up @@ -487,7 +495,7 @@ func TestClient_FragmentBlocks(t *testing.T) {
s.Handler.Holder = hldr.Holder

// Retrieve blocks.
c := test.MustNewClient(s.Host())
c := test.MustNewClient(s.Host(), defaultClient)
blocks, err := c.FragmentBlocks(context.Background(), "i", "f", pilosa.ViewStandard, 0)
if err != nil {
t.Fatal(err)
Expand Down
8 changes: 4 additions & 4 deletions ctl/common.go
Expand Up @@ -2,6 +2,7 @@ package ctl

import (
"crypto/tls"

"github.com/pilosa/pilosa"
"github.com/spf13/pflag"
)
Expand All @@ -22,19 +23,18 @@ func SetTLSConfig(flags *pflag.FlagSet, certificatePath *string, certificateKeyP
// CommandClient returns a pilosa.InternalHTTPClient for the command
func CommandClient(cmd CommandWithTLSSupport) (*pilosa.InternalHTTPClient, error) {
tlsConfig := cmd.TLSConfiguration()
var clientOptions *pilosa.ClientOptions
var TLSConfig *tls.Config
if tlsConfig.CertificatePath != "" && tlsConfig.CertificateKeyPath != "" {
cert, err := tls.LoadX509KeyPair(tlsConfig.CertificatePath, tlsConfig.CertificateKeyPath)
if err != nil {
return nil, err
}
TLSConfig := &tls.Config{
TLSConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: tlsConfig.SkipVerify,
}
clientOptions = &pilosa.ClientOptions{TLS: TLSConfig}
}
client, err := pilosa.NewInternalHTTPClient(cmd.TLSHost(), clientOptions)
client, err := pilosa.NewInternalHTTPClient(cmd.TLSHost(), pilosa.GetHTTPClient(TLSConfig))
if err != nil {
return nil, err
}
Expand Down
24 changes: 11 additions & 13 deletions executor.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"sort"
"time"

Expand Down Expand Up @@ -51,12 +52,9 @@ type Executor struct {
}

// NewExecutor returns a new instance of Executor.
func NewExecutor(clientOptions *ClientOptions) *Executor {
if clientOptions == nil {
clientOptions = &ClientOptions{}
}
func NewExecutor(remoteClient *http.Client) *Executor {
return &Executor{
client: NewInternalHTTPClientFromURI(nil, clientOptions),
client: NewInternalHTTPClientFromURI(nil, remoteClient),
}
}

Expand Down Expand Up @@ -968,7 +966,7 @@ func (e *Executor) executeClearBitView(ctx context.Context, index string, c *pql
}

// Forward call to remote node otherwise.
if res, err := e.exec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt); err != nil {
if res, err := e.remoteExec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt); err != nil {
return false, err
} else {
ret = res[0].(bool)
Expand Down Expand Up @@ -1074,7 +1072,7 @@ func (e *Executor) executeSetBitView(ctx context.Context, index string, c *pql.C
}

// Forward call to remote node otherwise.
if res, err := e.exec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt); err != nil {
if res, err := e.remoteExec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt); err != nil {
return false, err
} else {
ret = res[0].(bool)
Expand Down Expand Up @@ -1141,7 +1139,7 @@ func (e *Executor) executeSetFieldValue(ctx context.Context, index string, c *pq
resp := make(chan error, len(nodes))
for _, node := range nodes {
go func(node *Node) {
_, err := e.exec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt)
_, err := e.remoteExec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt)
resp <- err
}(node)
}
Expand Down Expand Up @@ -1199,7 +1197,7 @@ func (e *Executor) executeSetRowAttrs(ctx context.Context, index string, c *pql.
resp := make(chan error, len(nodes))
for _, node := range nodes {
go func(node *Node) {
_, err := e.exec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt)
_, err := e.remoteExec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt)
resp <- err
}(node)
}
Expand Down Expand Up @@ -1286,7 +1284,7 @@ func (e *Executor) executeBulkSetRowAttrs(ctx context.Context, index string, cal
resp := make(chan error, len(nodes))
for _, node := range nodes {
go func(node *Node) {
_, err := e.exec(ctx, node, index, &pql.Query{Calls: calls}, nil, opt)
_, err := e.remoteExec(ctx, node, index, &pql.Query{Calls: calls}, nil, opt)
resp <- err
}(node)
}
Expand Down Expand Up @@ -1345,7 +1343,7 @@ func (e *Executor) executeSetColumnAttrs(ctx context.Context, index string, c *p
resp := make(chan error, len(nodes))
for _, node := range nodes {
go func(node *Node) {
_, err := e.exec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt)
_, err := e.remoteExec(ctx, node, index, &pql.Query{Calls: []*pql.Call{c}}, nil, opt)
resp <- err
}(node)
}
Expand All @@ -1361,7 +1359,7 @@ func (e *Executor) executeSetColumnAttrs(ctx context.Context, index string, c *p
}

// exec executes a PQL query remotely for a set of slices on a node.
func (e *Executor) exec(ctx context.Context, node *Node, index string, q *pql.Query, slices []uint64, opt *ExecOptions) (results []interface{}, err error) {
func (e *Executor) remoteExec(ctx context.Context, node *Node, index string, q *pql.Query, slices []uint64, opt *ExecOptions) (results []interface{}, err error) {
// Encode request object.
pbreq := &internal.QueryRequest{
Query: q.String(),
Expand Down Expand Up @@ -1511,7 +1509,7 @@ func (e *Executor) mapper(ctx context.Context, ch chan mapResponse, nodes []*Nod
if n.Host == e.Host {
resp.result, resp.err = e.mapperLocal(ctx, nodeSlices, mapFn, reduceFn)
} else if !opt.Remote {
results, err := e.exec(ctx, n, index, &pql.Query{Calls: []*pql.Call{c}}, nodeSlices, opt)
results, err := e.remoteExec(ctx, n, index, &pql.Query{Calls: []*pql.Call{c}}, nodeSlices, opt)
if len(results) > 0 {
resp.result = results[0]
}
Expand Down
11 changes: 6 additions & 5 deletions fragment.go
Expand Up @@ -28,6 +28,7 @@ import (
"io"
"io/ioutil"
"log"
"net/http"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -1677,9 +1678,9 @@ func (h *blockHasher) WriteValue(v uint64) {
type FragmentSyncer struct {
Fragment *Fragment

Host string
Cluster *Cluster
ClientOptions *ClientOptions
Host string
Cluster *Cluster
RemoteClient *http.Client

Closing <-chan struct{}
}
Expand Down Expand Up @@ -1714,7 +1715,7 @@ func (s *FragmentSyncer) SyncFragment() error {
}

// Retrieve remote blocks.
client, err := NewInternalHTTPClient(node.Host, s.ClientOptions)
client, err := NewInternalHTTPClient(node.Host, s.RemoteClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -1793,7 +1794,7 @@ func (s *FragmentSyncer) syncBlock(id int) error {
return nil
}

client, err := NewInternalHTTPClient(node.Host, s.ClientOptions)
client, err := NewInternalHTTPClient(node.Host, s.RemoteClient)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions handler.go
Expand Up @@ -56,9 +56,9 @@ type Handler struct {
StatusHandler StatusHandler

// Local hostname & cluster configuration.
URI *URI
Cluster *Cluster
ClientOptions *ClientOptions
URI *URI
Cluster *Cluster
RemoteClient *http.Client

Router *mux.Router

Expand Down Expand Up @@ -1506,7 +1506,7 @@ func (h *Handler) handlePostFrameRestore(w http.ResponseWriter, r *http.Request)
}

// Create a client for the remote cluster.
client := NewInternalHTTPClientFromURI(host, h.ClientOptions)
client := NewInternalHTTPClientFromURI(host, h.RemoteClient)

// Determine the maximum number of slices.
maxSlices, err := client.MaxSliceByIndex(r.Context())
Expand Down