Skip to content

Commit

Permalink
Go back to urllib3 instead of requests
Browse files Browse the repository at this point in the history
  • Loading branch information
George Sakkis committed Jul 3, 2012
1 parent 100f2b1 commit 48cb94a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
58 changes: 36 additions & 22 deletions pyes/connection_http.py
@@ -1,18 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from requests.exceptions import RequestException
from . import logger
from .exceptions import NoServerAvailable
from .fakettypes import Method, RestResponse
from time import time
from urllib import urlencode
from urlparse import urlparse
import random
import threading
import requests
from .exceptions import NoServerAvailable
from .fakettypes import Method, RestResponse
from . import logger
import urllib3

__all__ = ["connect"]

DEFAULT_SERVER = ("http", "127.0.0.1", 9200)
SESSION = requests.session()
POOL_MANAGER = urllib3.PoolManager()


class Connection(object):
Expand Down Expand Up @@ -51,41 +52,54 @@ def __init__(self, servers=None, retry_time=60, max_retries=3, timeout=None,
self._retry_time = retry_time
self._max_retries = max_retries
self._timeout = timeout
if isinstance(basic_auth, dict):
self._auth = (basic_auth["username"], basic_auth["password"])
if basic_auth:
self._headers = urllib3.make_headers(basic_auth="%(username)s:%(password)s" % basic_auth)
else:
self._auth = basic_auth
self._headers = {}
self._lock = threading.RLock()
self._local = threading.local()

def execute(self, request):
"""Execute a request and return a response"""
url = request.uri
if request.parameters:
url += '?' + urlencode(request.parameters)

if request.headers:
headers = dict(self._headers, **request.headers)
else:
headers = self._headers

kwargs = dict(
method=Method._VALUES_TO_NAMES[request.method],
url=url,
body=request.body,
headers=headers,
timeout=self._timeout,
)

retry = 0
server = getattr(self._local, "server", None)
while True:
if not server:
self._local.server = server = self._get_server()
try:
response = SESSION.request(
method=Method._VALUES_TO_NAMES[request.method],
url=server + request.uri,
params=request.parameters,
data=request.body,
headers=request.headers,
auth=self._auth,
timeout=self._timeout,
)
return RestResponse(status=response.status_code,
body=response.content,
parse_result = urlparse(server)
conn = POOL_MANAGER.connection_from_host(parse_result.hostname,
parse_result.port,
parse_result.scheme)
response = conn.urlopen(**kwargs)
return RestResponse(status=response.status,
body=response.data,
headers=response.headers)
except RequestException:
except urllib3.exceptions.HTTPError:
self._drop_server(server)
self._local.server = server = None
if retry >= self._max_retries:
logger.error("Client error: bailing out after %d failed retries",
self._max_retries, exc_info=1)
raise NoServerAvailable
logger.debug("Client error: %d retries left", self._max_retries - retry)
logger.exception("Client error: %d retries left", self._max_retries - retry)
retry += 1

def _get_server(self):
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -17,4 +17,4 @@ upload-dir = docs/.build/html
[bdist_rpm]
requires = python-thrift
python-multiprocessing==2.6.2.1
requests==0.9.1
urllib3==1.4
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -60,7 +60,7 @@ def run(self, *args, **kwargs):
TestCommand.run(self, *args, **kwargs)


install_requires = ["requests"]
install_requires = ["urllib3"]

#if not sys.platform.startswith("java"):
# install_requires += [ "thrift", ]
Expand Down

0 comments on commit 48cb94a

Please sign in to comment.