Skip to content

Commit

Permalink
Implement WIP FTP file writing.
Browse files Browse the repository at this point in the history
  • Loading branch information
chfoo committed Oct 9, 2014
1 parent aaeded1 commit bf8b1b9
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 24 deletions.
29 changes: 29 additions & 0 deletions wpull/app_test.py
Expand Up @@ -1065,6 +1065,35 @@ def test_basic(self):
self.assertEqual(0, exit_code)
self.assertEqual(0, builder.factory['Statistics'].files)

@wpull.testing.async.async_test(timeout=DEFAULT_TIMEOUT)
def test_args(self):
arg_parser = AppArgumentParser()
args = arg_parser.parse_args([
self.get_url('/'),
self.get_url('/no_exist'),
'-r',
'--no-remove-listing',
'--level', '1',
'--tries', '1',
'--wait', '0',
'--no-host-directories',
])
builder = Builder(args)

with cd_tempdir():
app = builder.build()
exit_code = yield From(app.run())

self.assertEqual(8, exit_code)
self.assertEqual(4, builder.factory['Statistics'].files)

print(os.listdir())

self.assertTrue(os.path.exists('.listing'))
self.assertTrue(os.path.exists('example.txt'))
self.assertTrue(os.path.exists('example1/.listing'))
self.assertTrue(os.path.exists('example2/.listing'))


@trollius.coroutine
def tornado_future_adapter(future):
Expand Down
7 changes: 5 additions & 2 deletions wpull/ftp/command.py
Expand Up @@ -41,10 +41,13 @@ def reconnect(self):
Coroutine.
'''
# FIXME: implement some states so we don't need to close all the time
self._control_stream.close()

if self._control_stream.closed():
yield From(self._control_stream.reconnect())
else:
yield From(self._control_stream.write_command(Command('REIN')))
# else:
# yield From(self._control_stream.write_command(Command('REIN')))

reply = yield From(self._control_stream.read_reply())

Expand Down
29 changes: 22 additions & 7 deletions wpull/processor/ftp.py
Expand Up @@ -8,7 +8,7 @@

from wpull.backport.logging import BraceMessage as __
from wpull.body import Body
from wpull.errors import NetworkError, ProtocolError
from wpull.errors import NetworkError, ProtocolError, ServerError
from wpull.ftp.request import Request, ListingResponse, Response
from wpull.processor.base import BaseProcessor, BaseProcessorSession
from wpull.processor.rule import ResultRule, FetchRule
Expand Down Expand Up @@ -135,9 +135,9 @@ def process(self):
return

request = Request(self._url_item.url_info.url) # TODO: dependency inject
body = Body(directory=self._processor.root_path, hint='ftp_resp')
self._file_writer_session.process_request(request)

yield From(self._fetch(request, body))
yield From(self._fetch(request))

wait_time = self._result_rule.get_wait_time()

Expand All @@ -146,24 +146,39 @@ def process(self):
yield From(trollius.sleep(wait_time))

@trollius.coroutine
def _fetch(self, request, body):
def _fetch(self, request):
'''Fetch the request
Coroutine.
'''
_logger.info(_('Fetching ‘{url}’.').format(url=request.url))

response = None

def response_callback(dummy, callback_response):
nonlocal response
response = callback_response

self._file_writer_session.process_response(response)

if not response.body:
response.body = Body(directory=self._processor.root_path,
hint='resp_cb')

return response.body

try:
with self._processor.ftp_client.session() as session:
if not request.url_info.path.endswith('/'):
response = yield From(session.fetch(request, body))
response = yield From(session.fetch(request, callback=response_callback))
else:
response = yield From(session.fetch_file_listing(request, body))
except (NetworkError, ProtocolError) as error:
response = yield From(session.fetch_file_listing(request, callback=response_callback))
except (NetworkError, ProtocolError, ServerError) as error:
self._log_error(request, error)

action = self._result_rule.handle_error(
request, error, self._url_item)
_logger.debug(str(self._result_rule._statistics.errors))

if response:
response.body.close()
Expand Down
2 changes: 1 addition & 1 deletion wpull/processor/rule.py
Expand Up @@ -264,7 +264,7 @@ def handle_error(self, request, error, url_item):
Returns:
str: A value from :class:`.hook.Actions`.
'''
self._statistics.errors[type(error)] += 1
self._statistics.increment_error(error)
self._waiter.increment()

action = self.consult_error_hook(request, url_item.url_record, error)
Expand Down
31 changes: 17 additions & 14 deletions wpull/writer.py
Expand Up @@ -39,15 +39,14 @@ def process_request(self, request):
'''Rewrite the request if needed.
Args:
request: :class:`.http.request.Request`
request: :class:`.abstract.request.Request`
This function is called by a Processor after it has created the
Request, but before submitting it to the Engine.
Request, but before submitting it to a Client.
Returns:
The original Request or a modified Request
'''
pass

@abc.abstractmethod
def process_response(self, response):
Expand All @@ -56,7 +55,6 @@ def process_response(self, response):
This function is called by a Processor before any response or error
handling is done.
'''
pass

@abc.abstractmethod
def save_document(self, response):
Expand All @@ -68,7 +66,6 @@ def save_document(self, response):
Returns:
str: The filename of the document.
'''
pass

@abc.abstractmethod
def discard_document(self, response):
Expand All @@ -77,7 +74,6 @@ def discard_document(self, response):
This function is called by a Processor once the Processor deemed
the document should be deleted (i.e., a "404 Not Found" response).
'''
pass

@abc.abstractmethod
def extra_resource_path(self, suffix):
Expand All @@ -86,7 +82,6 @@ def extra_resource_path(self, suffix):
Returns:
str, None
'''
pass


class BaseFileWriter(BaseWriter):
Expand Down Expand Up @@ -234,12 +229,15 @@ def process_response(self, response):
if not self._filename:
return

code = response.status_code

if self._file_continuing:
self._process_file_continue_response(response)
elif 200 <= code <= 299 or 400 <= code:
if response.request.url_info.scheme == 'ftp':
self.open_file(self._filename, response)
else:
code = response.status_code

if self._file_continuing:
self._process_file_continue_response(response)
elif 200 <= code <= 299 or 400 <= code:
self.open_file(self._filename, response)

def _process_file_continue_response(self, response):
'''Process a partial content response.'''
Expand All @@ -257,7 +255,8 @@ def save_document(self, response):
if self._headers_included:
self.save_headers(self._filename, response)

if self._local_timestamping:
if self._local_timestamping and \
response.request.url_info.scheme != 'ftp':
self.set_timestamp(self._filename, response)

return self._filename
Expand Down Expand Up @@ -432,7 +431,11 @@ def get_filename(self, url_info):

parts.extend(dir_parts)

parts.append(url_to_filename(url, self._index, alt_char=alt_char))
parts.append(url_to_filename(
url,
'.listing' if url_info.scheme == 'ftp' else self._index,
alt_char=alt_char
))

parts = [
safe_filename(
Expand Down

0 comments on commit bf8b1b9

Please sign in to comment.