Skip to content

Commit

Permalink
Merge pull request #120 from pipermerriam/piper/gevent-connection-poo…
Browse files Browse the repository at this point in the history
…ling

Piper/gevent connection pooling
  • Loading branch information
pipermerriam committed Oct 24, 2016
2 parents 74a8178 + e534b4b commit 6571a8c
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Unreleased
-----------

* Introduced `KeepAliveRPCProvider` to correctly recycle HTTP connections and use HTTP keep alive

3.1.1
-----

Expand Down
13 changes: 11 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,19 @@ The ``Web3`` object
The common entrypoint for interacting with the Web3 library is the ``Web3``
class. You will need to instantiate a web3 instance.

Web3 takes a connection to existing Ethereum node (*Geth* or *Parity*).
This connection can be either over JSON-RPC using HTTP (TCP/IP)
or UNIX sockets (IPC).

.. code-block:: python
>>> from web3 import Web, RPCProvider, IPCProvider
>>> web3 = Web3(RPCProvider(host='localhost', port='8545'))
>>> from web3 import Web, KeepAliveRPCProvider, IPCProvider
# Note that you should create only one RPCProvider per
# process, as it recycles underlying TCP/IP network connections between
# your process and Ethereum node
>>> web3 = Web3(KeepAliveRPCProvider(host='localhost', port='8545'))
# or for an IPC based connection
>>> web3 = Web3(IPCProvider())
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"geventhttpclient>=1.3.1",
"ethereum-abi-utils>=0.2.1",
"pysha3>=0.3",
"pylru>=1.0.9",
],
py_modules=['web3'],
license="MIT",
Expand Down
8 changes: 5 additions & 3 deletions tests/providers/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from gevent import socket

from web3.providers.ipc import IPCProvider
from web3.providers.rpc import TestRPCProvider, RPCProvider
from web3.providers.rpc import TestRPCProvider, RPCProvider, KeepAliveRPCProvider


def get_open_port():
Expand All @@ -15,7 +15,7 @@ def get_open_port():
return port


@pytest.fixture(params=['tester', 'rpc', 'ipc'])
@pytest.fixture(params=['tester', 'rpc', 'ipc', 'keep-alive-rpc'])
def disconnected_provider(request):
"""
Supply a Provider that's not connected to a node.
Expand All @@ -30,7 +30,9 @@ def disconnected_provider(request):
provider.thread.kill()
return provider
elif request.param == 'rpc':
return RPCProvider(port=9999)
return RPCProvider(port=get_open_port())
elif request.param == 'keep-alive-rpc':
return KeepAliveRPCProvider(port=get_open_port())
elif request.param == 'ipc':
return IPCProvider(ipc_path='nonexistent')
else:
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ max-line-length= 100
exclude= tests/*

[testenv]
usedevelop=True
commands=
admin: py.test {posargs:tests/admin-module}
eth: py.test {posargs:tests/eth-module}
Expand Down
2 changes: 2 additions & 0 deletions web3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from web3.main import Web3
from web3.providers.rpc import (
RPCProvider,
KeepAliveRPCProvider,
TestRPCProvider,
)
from web3.providers.ipc import IPCProvider
Expand All @@ -17,5 +18,6 @@
"Web3",
"RPCProvider",
"TestRPCProvider",
"KeepAliveRPCProvider",
"IPCProvider",
]
108 changes: 107 additions & 1 deletion web3/providers/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,26 @@
from geventhttpclient import HTTPClient
import logging

import pylru

from .base import BaseProvider # noqa: E402


logger = logging.getLogger(__name__)


class RPCProvider(BaseProvider):
"""Create a RPC client.
.. note ::
You should preferably create only one client per process,
or otherwise underlying HTTP network connections may leak.
"""
def __init__(self,
host="127.0.0.1",
port="8545",
port=8545,
path="/",
ssl=False,
connection_timeout=10,
Expand All @@ -32,6 +44,12 @@ def __init__(self,

super(RPCProvider, self).__init__(*args, **kwargs)

def __str__(self):
return "RPC connection {}:{}".format(self.host, self.port)

def __repr__(self):
return self.__str__()

def make_request(self, method, params):
from web3 import __version__ as web3_version
request_data = self.encode_rpc_request(method, params)
Expand All @@ -57,6 +75,94 @@ def make_request(self, method, params):
return response_body


_client_cache = pylru.lrucache(128)


class KeepAliveRPCProvider(BaseProvider):
"""RPC-provider that handles HTTP keep-alive connection correctly.
HTTP client is recycled across requests. Create only one instance of
KeepAliveProvider per process.
"""
def __init__(self,
host="127.0.0.1",
port=8545,
path="/",
ssl=False,
connection_timeout=10,
network_timeout=10,
concurrency=10,
*args,
**kwargs):
"""Create a new RPC client with keep-alive connection pool.
:param concurrency: See :class:`geventhttpclient.HTTPClient`
:param connection_timeout: See :class:`geventhttpclient.HTTPClient`
:param network_timeout: See :class:`geventhttpclient.HTTPClient`
"""
self.host = host
self.port = int(port)
self.path = path
self.ssl = ssl
self.connection_timeout = connection_timeout
self.network_timeout = network_timeout
self.concurrency = concurrency

super(KeepAliveRPCProvider, self).__init__(*args, **kwargs)

self.client = self.get_or_create_client()

def get_or_create_client(self):
from web3 import __version__ as web3_version
global _client_cache

key = "{}:{}".format(self.host, self.port)

try:
# Get in-process client instance for this host
client = _client_cache[key]
logger.debug("Re-using HTTP client for RPC connection to %s", key)
except KeyError:
request_user_agent = 'Web3.py/{version}/{class_name}'.format(
version=web3_version,
class_name=type(self),
)

client = HTTPClient(
host=self.host,
port=self.port,
ssl=self.ssl,
connection_timeout=self.connection_timeout,
network_timeout=self.network_timeout,
concurrency=self.concurrency,
headers={
'Content-Type': 'application/json',
'User-Agent': request_user_agent,
},
)
_client_cache[key] = client
logger.debug(
"Created new keep-alive HTTP client for RPC connection to %s",
key,
)

return client

def __str__(self):
return "Keep-alive RPC connection {}:{}".format(self.host, self.port)

def __repr__(self):
return self.__str__()

def make_request(self, method, params):
request_data = self.encode_rpc_request(method, params)
response = self.client.post(self.path, body=request_data)
response_body = response.read()
return response_body


def is_testrpc_available():
try:
import testrpc # noqa: F401
Expand Down

0 comments on commit 6571a8c

Please sign in to comment.