Skip to content

Commit

Permalink
0.2.0b2:
Browse files Browse the repository at this point in the history
- fix server()
- improve gathering proxies
- improve reading data from proxy
  • Loading branch information
constverum committed Jun 2, 2016
1 parent 1986a42 commit 16e7a07
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 58 deletions.
1 change: 0 additions & 1 deletion MANIFEST.in
Expand Up @@ -3,4 +3,3 @@ include README.rst
include CHANGELOG.rst
include requirements.txt
include proxybroker/data/*.mmdb
include proxybroker/data/*.gif
4 changes: 2 additions & 2 deletions proxybroker/__init__.py
Expand Up @@ -16,8 +16,8 @@

__title__ = 'ProxyBroker'
__package__ = 'proxybroker'
__version__ = '0.2.0b1'
__short_description__ = '[Finder/Grabber/Checker] Finds public proxies on multiple sources and concurrently checks them (type, anonymity, country). HTTP(S) & SOCKS'
__version__ = '0.2.0b2'
__short_description__ = '[Finder/Checker/Server] Finds public proxies from multiple sources and concurrently checks them. Supports HTTP(S) and SOCKS4/5.'
__author__ = 'Constverum'
__author_email__ = 'constverum@gmail.com'
__url__ = 'https://github.com/constverum/ProxyBroker'
Expand Down
49 changes: 25 additions & 24 deletions proxybroker/api.py
Expand Up @@ -14,8 +14,11 @@
from .resolver import Resolver
from .providers import Provider, PROVIDERS

# Pause between grabbing cycles; in seconds.
GRAB_PAUSE = 180

GRAB_PAUSE = 180 # Pause between the cycles grabbing in seconds
# The maximum number of providers that are parsed concurrently
MAX_CONCURRENT_PROVIDERS = 3


class Broker:
Expand Down Expand Up @@ -265,32 +268,30 @@ async def _load(self, data, check=True):
self._done()

async def _grab(self, types=None, check=False):
# TODO: need refactoring
def _get_tasks(by=MAX_CONCURRENT_PROVIDERS):
providers = [pr for pr in self._providers if not types or
not pr.proto or bool(pr.proto & types.keys())]
while providers:
tasks = [asyncio.ensure_future(pr.get_proxies())
for pr in providers[:by]]
del providers[:by]
self._all_tasks.extend(tasks)
yield tasks
log.debug('Start grabbing proxies')
providers = [pr for pr in self._providers
if not types or not pr.proto or bool(pr.proto & types.keys())]

if not check:
tasks = [asyncio.ensure_future(pr.get_proxies()) for pr in providers]
self._all_tasks.extend(tasks)
for task in asyncio.as_completed(tasks):
proxies = await task
for proxy in proxies:
await self._handle(proxy, check=check)
else:
while True:
for pr in providers:
proxies = await pr.get_proxies()
while True:
for tasks in _get_tasks():
for task in asyncio.as_completed(tasks):
proxies = await task
for proxy in proxies:
await self._handle(proxy, check=check)
log.debug('Grab cycle is complete')
if self._server:
log.debug('sleeped')
await asyncio.sleep(GRAB_PAUSE)
log.debug('unsleeped')
else:
break
await self._on_check.join()
log.debug('Grab cycle is complete')
if self._server:
log.debug('fall asleep for %d seconds' % GRAB_PAUSE)
await asyncio.sleep(GRAB_PAUSE)
log.debug('awaked')
else:
break
await self._on_check.join()
self._done()

async def _handle(self, proxy, check=False):
Expand Down
12 changes: 6 additions & 6 deletions proxybroker/negotiators.py
Expand Up @@ -41,7 +41,7 @@ class Socks5Ngtr(BaseNegotiator):

async def negotiate(self, **kwargs):
await self._proxy.send(struct.pack('3B', 5, 1, 0))
resp = await self._proxy.recv(2, one_chunk=True)
resp = await self._proxy.recv(2)

if resp[0] == 0x05 and resp[1] == 0xff:
self._proxy.log('Failed (auth is required)', err=BadResponseError)
Expand All @@ -54,7 +54,7 @@ async def negotiate(self, **kwargs):
port = kwargs.get('port', 80)

await self._proxy.send(struct.pack('>8BH', 5, 1, 0, 1, *bip, port))
resp = await self._proxy.recv(10, one_chunk=True)
resp = await self._proxy.recv(10)

if resp[0] != 0x05 or resp[1] != 0x00:
self._proxy.log('Failed (invalid data)', err=BadResponseError)
Expand All @@ -73,7 +73,7 @@ async def negotiate(self, **kwargs):
port = kwargs.get('port', 80)

await self._proxy.send(struct.pack('>2BH5B', 4, 1, port, *bip, 0))
resp = await self._proxy.recv(8, one_chunk=True)
resp = await self._proxy.recv(8)

if resp[0] != 0x00 or resp[1] != 0x5A:
self._proxy.log('Failed (invalid data)', err=BadResponseError)
Expand All @@ -90,7 +90,7 @@ class Connect80Ngtr(BaseNegotiator):

async def negotiate(self, **kwargs):
await self._proxy.send(_CONNECT_request(kwargs.get('host'), 80))
resp = await self._proxy.recv(one_chunk=True)
resp = await self._proxy.recv(status_only=True)
code = get_status_code(resp)
if code != 200:
self._proxy.log('Connect: failed. HTTP status: %s' % code,
Expand All @@ -105,7 +105,7 @@ class Connect25Ngtr(BaseNegotiator):

async def negotiate(self, **kwargs):
await self._proxy.send(_CONNECT_request(kwargs.get('host'), 25))
resp = await self._proxy.recv(one_chunk=True)
resp = await self._proxy.recv(status_only=True)
code = get_status_code(resp)
if code != 200:
self._proxy.log('Connect: failed. HTTP status: %s' % code,
Expand All @@ -120,7 +120,7 @@ class HttpsNgtr(BaseNegotiator):

async def negotiate(self, **kwargs):
await self._proxy.send(_CONNECT_request(kwargs.get('host'), 443))
resp = await self._proxy.recv(one_chunk=True)
resp = await self._proxy.recv(status_only=True)
code = get_status_code(resp)
if code != 200:
self._proxy.log('Connect: failed. HTTP status: %s' % code,
Expand Down
26 changes: 10 additions & 16 deletions proxybroker/providers.py
Expand Up @@ -14,9 +14,6 @@
from .resolver import Resolver


MAX_CONCURRENT_PROVIDERS = 10


class Provider:
"""Proxy provider.
Expand All @@ -35,8 +32,6 @@ class Provider:
"""

_pattern = IPPortPatternGlobal
# The maximum number of providers that are parsed concurrently
_sem_providers = asyncio.Semaphore(MAX_CONCURRENT_PROVIDERS)

def __init__(self, url=None, proto=(), max_conn=4,
max_tries=3, timeout=20, loop=None):
Expand Down Expand Up @@ -79,20 +74,19 @@ async def get_proxies(self):
:return: :attr:`.proxies`
"""
with (await self._sem_providers):
log.debug('Try to get proxies from %s' % self.domain)
log.debug('Try to get proxies from %s' % self.domain)

await self._start_new_session()
if not self._session:
return []
await self._start_new_session()
if not self._session:
return []

try:
await self._pipe()
finally:
self._session.close()
try:
await self._pipe()
finally:
self._session.close()

log.debug('%d proxies received from %s: %s' % (
len(self.proxies), self.domain, self.proxies))
log.debug('%d proxies received from %s: %s' % (
len(self.proxies), self.domain, self.proxies))
return self.proxies

async def _start_new_session(self):
Expand Down
41 changes: 33 additions & 8 deletions proxybroker/proxy.py
Expand Up @@ -5,7 +5,7 @@
from collections import Counter

from .errors import *
from .utils import log
from .utils import log, parse_headers
from .resolver import Resolver
from .negotiators import NGTRS

Expand Down Expand Up @@ -262,6 +262,8 @@ async def connect(self, ssl=False):
self.log(msg, stime, err=err)

def close(self):
if self._closed:
return
self._closed = True
if self.writer:
# try:
Expand All @@ -287,16 +289,12 @@ async def send(self, req):
finally:
self.log('Request: %s%s' % (req, msg), err=err)

async def recv(self, length=65536, one_chunk=False):
async def recv(self, length=0, status_only=False):
resp, msg, err = b'', '', None
stime = time.time()
try:
while not self.reader.at_eof() and len(resp) < length:
data = await asyncio.wait_for(
self.reader.read(length), timeout=self._timeout)
resp += data
if not data or one_chunk:
break
resp = await asyncio.wait_for(
self._recv(length, status_only), timeout=self._timeout)
except asyncio.TimeoutError:
msg = 'Received: timeout'
err = ProxyTimeoutError(msg)
Expand All @@ -315,3 +313,30 @@ async def recv(self, length=65536, one_chunk=False):
msg += ': %s' % resp[:12]
self.log(msg, stime, err=err)
return resp

async def _recv(self, length=0, status_only=False):
    """Read a response from the proxy connection's stream reader.

    :param length: if non-zero, read exactly ``length`` bytes
        (the SOCKS negotiators request fixed-size replies of 2, 8
        or 10 bytes); on a premature EOF the partial data read so
        far is returned instead of raising.
    :param status_only: only consulted in the line-by-line mode
        (``length == 0``): stop reading right after the blank line
        that ends the HTTP headers, skipping the body.
    :return: the raw bytes received.
    """
    resp = b''
    if length:
        # Fixed-length mode: read an exact number of bytes.
        try:
            resp = await self.reader.readexactly(length)
        except asyncio.IncompleteReadError as e:
            # Peer closed early — keep whatever did arrive.
            resp = e.partial
    else:
        # HTTP mode: read line by line. The end of the body is
        # detected via Content-Length or chunked transfer encoding.
        body_size, body_recv, chunked = 0, 0, None
        while not self.reader.at_eof():
            line = await self.reader.readline()
            resp += line
            if body_size:
                # Headers already parsed: count body bytes until
                # Content-Length is satisfied.
                body_recv += len(line)
                if body_recv >= body_size:
                    break
            elif chunked and line == b'0\r\n':
                # A zero-size chunk marks the end of a chunked body.
                # NOTE(review): a body line that happens to equal
                # b'0\r\n' would also end the read early — confirm
                # this is acceptable for the checker's use case.
                break
            elif not body_size and line == b'\r\n':
                # Blank line: end of the header section.
                if status_only:
                    break
                headers = parse_headers(resp)
                body_size = int(headers.get('Content-Length', 0))
                if not body_size:
                    chunked = headers.get('Transfer-Encoding') == 'chunked'
    return resp
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -13,7 +13,7 @@

REQUIRES = ['aiodns', 'aiohttp', 'maxminddb']
PACKAGES = ['proxybroker', 'proxybroker.data']
PACKAGE_DATA = {'': ['LICENSE'], INFO['package']: ['data/*.gif', 'data/*.mmdb']}
PACKAGE_DATA = {'': ['LICENSE'], INFO['package']: ['data/*.mmdb']}

setup(
name=INFO['package'],
Expand Down

0 comments on commit 16e7a07

Please sign in to comment.