From f3cdc15370aea5d85eeab35600d89c0749e71374 Mon Sep 17 00:00:00 2001 From: Cebtenzzre Date: Sat, 15 Dec 2018 11:57:52 -0500 Subject: [PATCH] tumblr_backup: --save-notes Included revisions: - Remove log_queue, better status and account logic - Better tracking and synchronization on ThreadPool.queue.qsize - Remove remaining_posts - Remove getting_tup - Put back the account parameter - Make typing optional Fixes #169 --- note_scraper.py | 240 +++++++++++++++++++++++++++++++++++++++++++++++ tumblr_backup.py | 214 ++++++++++++++++++++++++++++++++---------- util.py | 48 ++++++++++ 3 files changed, 453 insertions(+), 49 deletions(-) create mode 100644 note_scraper.py diff --git a/note_scraper.py b/note_scraper.py new file mode 100644 index 0000000..59eb5c4 --- /dev/null +++ b/note_scraper.py @@ -0,0 +1,240 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, print_function, with_statement + +import contextlib +import io +import itertools +import re +import ssl +import sys +import time +import traceback + +from bs4 import BeautifulSoup + +from util import HAVE_SSL_CTX, HTTP_TIMEOUT, to_bytes, to_native_str + +try: + from typing import TYPE_CHECKING +except ImportError: + TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import List, Text + +try: + from http.client import HTTPException +except ImportError: + from httplib import HTTPException # type: ignore[no-redef] + +try: + from http.cookiejar import MozillaCookieJar +except ImportError: + from cookielib import MozillaCookieJar # type: ignore[no-redef] + +try: + from urllib.error import HTTPError, URLError + from urllib.parse import quote, urlparse, urlsplit, urlunsplit + from urllib.request import BaseHandler, HTTPCookieProcessor, HTTPSHandler, build_opener +except ImportError: + from urllib import quote # type: ignore[attr-defined,no-redef] + from urllib2 import (BaseHandler, HTTPCookieProcessor, HTTPSHandler, HTTPError, URLError, # type: ignore[no-redef] + build_opener) + from urlparse import urlparse, urlsplit, urlunsplit # type: ignore[no-redef] + +EXIT_SUCCESS = 0 +EXIT_SAFE_MODE = 2 + + +def log(url, msg): + global msg_pipe + url_msg = ", URL '{}'".format(url) if url != post_url else '' + print('[Note Scraper] Post {}{}: {}'.format(ident, url_msg, msg), file=msg_pipe) + + +class WebCrawler(object): + + # Python 2.x urllib.always_safe is private in Python 3.x; its content is copied here + _ALWAYS_SAFE_BYTES = (b'ABCDEFGHIJKLMNOPQRSTUVWXYZ' + b'abcdefghijklmnopqrstuvwxyz' + b'0123456789' b'_.-') + + _reserved = b';/?:@&=+$|,#' # RFC 3986 (Generic Syntax) + _unreserved_marks = b"-_.!~*'()" # RFC 3986 sec 2.3 + _safe_chars = _ALWAYS_SAFE_BYTES + b'%' + _reserved + _unreserved_marks + + TRY_LIMIT = 2 # For code 429, only give it one extra try + + def __init__(self, noverify, cookiefile, notes_limit): + self.notes_limit = notes_limit + self.lasturl = None + + handlers = [] # type: List[BaseHandler] # pytype: disable=invalid-annotation + if HAVE_SSL_CTX: + context = ssl._create_unverified_context() if noverify else ssl.create_default_context() + handlers.append(HTTPSHandler(context=context)) + if cookiefile: + cookies = MozillaCookieJar(cookiefile) + cookies.load() + + # Session cookies are denoted by either `expires` field set to an empty string or 0. MozillaCookieJar only + # recognizes the former (see https://bugs.python.org/issue17164). 
+ for cookie in cookies: + if cookie.expires == 0: + cookie.expires = None + cookie.discard = True + + handlers.append(HTTPCookieProcessor(cookies)) + + self.opener = build_opener(*handlers) + + @classmethod + def quote_unsafe(cls, string): + return quote(to_bytes(string), cls._safe_chars) + + # Based on w3lib.safe_url_string + @classmethod + def iri_to_uri(cls, iri): + parts = urlsplit(iri) + + # IDNA encoding can fail for too long labels (>63 characters) or missing labels (e.g. http://.example.com) + try: + netloc = parts.netloc.encode('idna').decode('ascii') + except UnicodeError: + netloc = parts.netloc + + return urlunsplit(tuple(itertools.chain( + (to_native_str(parts.scheme), to_native_str(netloc).rstrip(':')), + (cls.quote_unsafe(getattr(parts, p)) for p in ('path', 'query', 'fragment')), + ))) + + def ratelimit_sleep(self, headers): + rlr = headers.get('X-Rate-Limit-Reset') + if rlr is None: + return False + + try: + irlr = int(rlr) + except ValueError: + log(self.lasturl, "Expected integer X-Rate-Limit-Reset, got '{}'".format(rlr)) + return False + + now = time.time() + if irlr < now: + log(self.lasturl, 'Warning: X-Rate-Limit-Reset is {} seconds in the past'.format(now - irlr)) + return True + + sleep_dur = irlr - now + if sleep_dur > 20 * 60: + log(self.lasturl, 'Refusing to sleep for {} minutes, giving up'.format(round(sleep_dur / 60))) + return False + + log(self.lasturl, 'Rate limited, sleeping for {} seconds as requested'.format(round(sleep_dur))) + time.sleep(sleep_dur) + return True + + def urlopen(self, iri): + self.lasturl = iri + uri = self.iri_to_uri(iri) + try_count = 0 + while True: + try: + with contextlib.closing(self.opener.open(uri, timeout=HTTP_TIMEOUT)) as resp: + try_count += 1 + parsed_uri = urlparse(resp.geturl()) + if re.match(r'(www\.)?tumblr\.com', parsed_uri.netloc) and parsed_uri.path == '/safe-mode': + sys.exit(EXIT_SAFE_MODE) + return resp.read().decode('utf-8', errors='ignore') + except HTTPError as e: + if e.code == 429 and try_count < self.TRY_LIMIT and self.ratelimit_sleep(e.headers): + continue + raise + + @staticmethod + def get_more_link(soup, base, notes_url): + global ident + element = soup.find('a', class_='more_notes_link') + if not element: + return None + onclick = element.get_attribute_list('onclick')[0] + if not onclick: + log(notes_url, 'No onclick attribute, probably a dashboard-only blog') + return None + match = re.search(r";tumblrReq\.open\('GET','([^']+)'", onclick) + if not match: + log(notes_url, 'tumblrReq regex failed, did Tumblr update?') + return None + path = match.group(1) + if not path.startswith('/'): + path = '/' + path + return base + path + + @staticmethod + def append_notes(soup, notes_list, notes_url): + notes = soup.find('ol', class_='notes') + if notes is None: + log(notes_url, 'Response HTML does not have a notes list') + return False + notes = notes.find_all('li') + for n in reversed(notes): + if 'more_notes_link_container' not in n.get('class', []): + notes_list.append(n.prettify()) + return True + + def get_notes(self, post_url): + parsed_uri = urlparse(post_url) + base = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_uri) + + notes_10k = 0 + notes_list = [] # type: List[Text] + + notes_url = post_url + while True: + resp_str = self.urlopen(notes_url) + if resp_str is None: + break + + soup = BeautifulSoup(resp_str, 'lxml') + if not self.append_notes(soup, notes_list, notes_url): + break + + old_notes_url, notes_url = notes_url, self.get_more_link(soup, base, notes_url) + if (not notes_url) or notes_url == 
old_notes_url: + break + + if len(notes_list) > (notes_10k + 1) * 10000: + notes_10k += 1 + log(notes_url, 'Note: {} notes retrieved so far'.format(notes_10k * 10000)) + if self.notes_limit != 0 and len(notes_list) > self.notes_limit: + log(notes_url, 'Warning: Reached notes limit, stopping early.') + break + + return u''.join(notes_list) + + +if __name__ == '__main__': + post_url, ident, noverify, notes_limit, cookiefile, msg_fd = sys.argv[1:] + + with io.open(int(msg_fd), 'w') as msg_pipe: + crawler = WebCrawler(bool(int(noverify)), cookiefile, int(notes_limit)) + + try: + notes = crawler.get_notes(post_url) + except KeyboardInterrupt: + sys.exit() # Ignore these so they don't propogate into the parent + except HTTPError as e: + log(crawler.lasturl, 'HTTP Error {} {}'.format(e.code, e.reason)) + sys.exit() + except URLError as e: + log(crawler.lasturl, 'URL Error: {}'.format(e.reason)) + sys.exit() + except HTTPException as e: + log(crawler.lasturl, 'HTTP Exception: {}'.format(e)) + sys.exit() + except Exception: + log(crawler.lasturl, 'Caught an exception') + traceback.print_exc(file=msg_pipe) + sys.exit() + + print(notes, end=u'') diff --git a/tumblr_backup.py b/tumblr_backup.py index 798892b..c4273b6 100755 --- a/tumblr_backup.py +++ b/tumblr_backup.py @@ -5,11 +5,13 @@ # standard Python library imports import errno +import fcntl import hashlib import imghdr import io import locale import os +import py_compile import re import ssl import sys @@ -21,7 +23,7 @@ from os.path import join, split, splitext from xml.sax.saxutils import escape -from util import to_bytes, to_unicode +from util import HAVE_SSL_CTX, HTTP_TIMEOUT, LockedQueue, PY3, to_bytes, to_unicode try: from typing import TYPE_CHECKING @@ -49,6 +51,14 @@ except ImportError: import Queue as queue # type: ignore[no-redef] +if PY3: + import subprocess +else: + try: + import subprocess32 as subprocess # pytype: disable=import-error + except ImportError: + subprocess = None + try: from urllib.request import urlopen from urllib.parse import urlencode, urlparse, quote @@ -76,6 +86,11 @@ except ImportError: youtube_dl = None +try: + import bs4 +except ImportError: + bs4 = None + # These builtins have new names in Python 3 try: long, xrange # type: ignore[has-type] @@ -105,6 +120,8 @@ def test_jpg(h, f): imghdr.tests.append(test_jpg) +script_dir = os.path.dirname(os.path.realpath(__file__)) + # variable directory names, will be set in TumblrBackup.backup() save_folder = '' media_folder = '' @@ -133,7 +150,6 @@ def test_jpg(h, f): MAX_POSTS = 50 -HTTP_TIMEOUT = 90 HTTP_CHUNK_SIZE = 1024 * 1024 # get your own API key at https://www.tumblr.com/oauth/apps @@ -148,8 +164,7 @@ def test_jpg(h, f): TIME_ENCODING = locale.getlocale(locale.LC_TIME)[1] or FILE_ENCODING -have_ssl_ctx = sys.version_info >= (2, 7, 9) -if have_ssl_ctx: +if HAVE_SSL_CTX: ssl_ctx = ssl.create_default_context() def tb_urlopen(url): return urlopen(url, timeout=HTTP_TIMEOUT, context=ssl_ctx) @@ -157,24 +172,48 @@ def tb_urlopen(url): def tb_urlopen(url): return urlopen(url, timeout=HTTP_TIMEOUT) +# Guards open fds without O_CLOEXEC to avoid leaking them +cloexec_lock = threading.Lock() +disable_note_scraper = set() # type: Set[str] +disablens_lock = threading.Lock() -def log(msg, account=None): - if options.quiet: - return - # Separate terminator - it = (i for i, c in enumerate(reversed(msg)) if c not in '\r\n') - try: - idx = len(msg) - next(it) - except StopIteration: - idx = 0 - msg, term = msg[:idx], msg[idx:] +class Logger(object): + def __init__(self): + self.lock 
= threading.Lock() + self.backup_account = None # type: Optional[str] + self.status_msg = None # type: Optional[str] + + def __call__(self, msg, account=False): + with self.lock: + for line in msg.splitlines(True): + self._print(line, account) + self._print(self.status_msg, account=True) + sys.stdout.flush() + + def status(self, msg): + self.status_msg = msg + self('') + + def _print(self, msg, account=False): + if options.quiet: + return + if account: # Optional account prefix + msg = '{}: {}'.format(self.backup_account, msg) + + # Separate terminator + it = (i for i, c in enumerate(reversed(msg)) if c not in '\r\n') + try: + idx = len(msg) - next(it) + except StopIteration: + idx = 0 + msg, term = msg[:idx], msg[idx:] + + pad = ' ' * (80 - len(msg)) # Pad to 80 chars + print(msg + pad + term, end='') + - if account is not None: # Optional account prefix - msg = '{}: {}'.format(account, msg) - msg += ''.join([' ' for _ in range(80 - len(msg))]) # Pad to 80 chars - print(msg + term, end='') - sys.stdout.flush() +log = Logger() def mkdir(dir, recursive=False): @@ -583,11 +622,11 @@ def backup(self, account): long(splitext(split(f)[1])[0]) for f in glob(path_to(post_dir, '*' + post_ext)) ) - log('Backing up posts after {}\r'.format(ident_max), account) + log.status('Backing up posts after {}\r'.format(ident_max)) except ValueError: # max() arg is an empty sequence pass else: - log('Getting basic information\r', account) + log.status('Getting basic information\r') # start by calling the API with just a single post resp = apiparse(base, 1) @@ -612,7 +651,7 @@ def backup(self, account): TumblrPost.post_header = self.header(body_class='post') # start the thread pool - backup_pool = ThreadPool(account) + backup_pool = ThreadPool() # returns whether any posts from this batch were saved def _backup(posts): @@ -641,6 +680,7 @@ def _backup(posts): continue elif 'trail' in p and p['trail'] and 'is_current_item' not in p['trail'][-1]: continue + backup_pool.add_work(post.save_content) self.post_count += 1 return True @@ -651,7 +691,9 @@ def _backup(posts): i = options.skip while True: # find the upper bound - log('Getting posts {} to {} (of {} expected)\r'.format(i, i + MAX_POSTS - 1, count_estimate), account) + log.status('Getting {}posts {} to {} (of {} expected)\r'.format( + 'liked ' if options.likes else '', i, i + MAX_POSTS - 1, count_estimate, + )) resp = apiparse(base, MAX_POSTS, i) if resp is None: @@ -662,7 +704,7 @@ def _backup(posts): posts = resp[posts_key] # `_backup(posts)` can be empty even when `posts` is not if we don't backup reblogged posts if not posts or not _backup(posts): - log('Backup complete: Found empty set of posts\n', account) + log('Backup complete: Found empty set of posts\n', account=True) break i += MAX_POSTS @@ -676,17 +718,17 @@ def _backup(posts): # postprocessing if not options.blosxom and (self.post_count or options.count == 0): - log('Getting avatar and style\r', account) + log.status('Getting avatar and style\r') get_avatar() get_style() if not have_custom_css: save_style() - log('Building index\r', account) + log.status('Building index\r') ix = Indices(self) ix.build_index() ix.save_index() - log('{} posts backed up\n'.format(self.post_count), account) + log('{} {}posts backed up\n'.format(self.post_count, 'liked ' if options.likes else ''), account=True) self.total_count += self.post_count @@ -698,7 +740,7 @@ def __init__(self, post, backup_account): self.content = '' self.post = post self.backup_account = backup_account - self.json_content = json.dumps(post, 
sort_keys=True, indent=4, separators=(',', ': ')) + self.json_content = to_unicode(json.dumps(post, sort_keys=True, indent=4, separators=(',', ': '))) self.creator = post['blog_name'] self.ident = str(post['id']) self.url = post['post_url'] @@ -804,8 +846,9 @@ def append_try(elt, fmt=u'%s'): elif self.typ == 'audio': def make_player(src_): - append(u'
<p><audio controls><source src="%s" type="audio/mpeg">%s<br>\n<a href="%s">%s</a></audio></p>'
-                       % (src_, "Your browser does not support the audio element.", src_, "Audio file"))
+                append(u'<p><audio controls><source src="{src}" type="audio/mpeg">{}<br>\n<a href="{src}">{}</a></audio></p>
' + .format('Your browser does not support the audio element.', 'Audio file', src=src_)) src = None audio_url = get_try('audio_url') or get_try('audio_source_url') @@ -1019,14 +1062,69 @@ def get_post(self): foot = [] if self.tags: foot.append(u''.join(self.tag_link(t) for t in self.tags)) - if self.note_count: - foot.append(u'%d note%s' % (self.note_count, 's'[self.note_count == 1:])) if self.source_title and self.source_url: foot.append(u'%s' % (self.source_url, self.source_title) ) + + notes_html = u'' + if options.save_notes and self.backup_account not in disable_note_scraper: + with cloexec_lock: + msg_fd_rd, msg_fd_wr = os.pipe() + try: + if sys.version_info[:2] >= (3, 4): + # O_CLOEXEC is default in Python 3.4 and newer, unset it for the child side + os.set_inheritable(msg_fd_wr, True) + else: + # O_CLOEXEC must be set manually in older Python, set it for the parent side + fcntl.fcntl(msg_fd_rd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + + args = [sys.executable, '-m', 'note_scraper', self.url, self.ident, + str(int(options.no_ssl_verify)), str(options.notes_limit or 0), + options.cookiefile or '', str(msg_fd_wr)] + env = os.environ.copy() + env['PYTHONPATH'] = script_dir + # stdout is captured, stderr goes to our stderr, msg_fd handles informational messages + process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=False, env=env) + except: + os.close(msg_fd_rd) + raise + finally: + os.close(msg_fd_wr) + + try: + with io.open(msg_fd_rd) as msg_file: + for line in msg_file: + log(line) + + notes_html = process.communicate()[0].decode('utf-8') + except: + process.terminate() + process.wait() + raise + + if process.returncode == 2: # EXIT_SAFE_MODE + # Safe mode is blocking us, disable note scraping for this blog + notes_html = u'' + with disablens_lock: + # Check if another thread already set this + if self.backup_account not in disable_note_scraper: + disable_note_scraper.add(self.backup_account) + log('[Note Scraper] Blocked by safe mode - scraping disabled for {}\n'.format( + self.backup_account + )) + + notes_str = u'{} note{}'.format(self.note_count, 's'[self.note_count == 1:]) + if notes_html.strip(): + foot.append(u'
<details><summary>{}</summary>\n'.format(notes_str))
+                foot.append(u'<ol class="notes">    ')
+                foot.append(notes_html)
+                foot.append(u'</ol></details>
') + else: + foot.append(notes_str) + if foot: - post += u'\n' % u' — '.join(foot) + post += u'\n'.format(u'\n'.join(foot)) post += u'\n\n' return post @@ -1116,17 +1214,16 @@ def get_post(self, in_tag_index): class ThreadPool(object): - def __init__(self, account, thread_count=20, max_queue=1000): - self.account = account - self.queue = queue.Queue(max_queue) # type: Queue[Callable[[], None]] + def __init__(self, thread_count=20, max_queue=1000): + self.queue = LockedQueue(threading.RLock(), max_queue) # type: LockedQueue[Callable[[], None]] self.quit = threading.Event() self.abort = threading.Event() self.threads = [threading.Thread(target=self.handler) for _ in range(thread_count)] for t in self.threads: t.start() - def add_work(self, work): - self.queue.put(work) + def add_work(self, *args, **kwargs): + self.queue.put(*args, **kwargs) def wait(self): self.quit.set() @@ -1135,21 +1232,26 @@ def wait(self): def cancel(self): self.abort.set() for i, t in enumerate(self.threads, start=1): - log('Stopping threads {}{}\r'.format(' ' * i, '.' * (len(self.threads) - i))) + log.status('Stopping threads {}{}\r'.format(' ' * i, '.' * (len(self.threads) - i))) t.join() + with self.queue.mutex: + self.queue.queue.clear() + self.queue.all_tasks_done.notify_all() + def handler(self): while not self.abort.is_set(): - try: - work = self.queue.get(timeout=0.1) - except queue.Empty: - if self.quit.is_set(): - break - continue + with self.queue.mutex: + try: + work = self.queue.get(block=not self.quit.is_set(), timeout=0.1) + except queue.Empty: + if self.quit.is_set(): + break + continue + qsize = self.queue.qsize() - qsize = self.queue.qsize() if self.quit.is_set() and qsize % MAX_POSTS == 0: - log('{} remaining posts to save\r'.format(qsize), self.account) + log.status('{} remaining posts to save\r'.format(qsize)) try: work() @@ -1197,7 +1299,9 @@ def __call__(self, parser, namespace, values, option_string=None): parser.add_argument('--save-video', action='store_true', help='save all video files') parser.add_argument('--save-video-tumblr', action='store_true', help='save only Tumblr video files') parser.add_argument('--save-audio', action='store_true', help='save audio files') - parser.add_argument('--cookiefile', help='cookie file for youtube-dl') + parser.add_argument('--save-notes', action='store_true', help='save a list of notes for each post') + parser.add_argument('--notes-limit', type=int, metavar='COUNT', help='limit requested notes to COUNT, per-post') + parser.add_argument('--cookiefile', help='cookie file for youtube-dl and --save-notes') parser.add_argument('-j', '--json', action='store_true', help='save the original JSON source') parser.add_argument('-b', '--blosxom', action='store_true', help='save the posts in blosxom format') parser.add_argument('-r', '--reverse-month', action='store_false', @@ -1243,7 +1347,7 @@ def __call__(self, parser, namespace, values, option_string=None): if not re.match(r'^\d{4}(\d\d)?(\d\d)?$', options.period): parser.error("Period must be 'y', 'm', 'd' or YYYY[MM[DD]]") set_period() - if have_ssl_ctx and options.no_ssl_verify: + if HAVE_SSL_CTX and options.no_ssl_verify: ssl_ctx = ssl._create_unverified_context() # Otherwise, it's an old Python version without SSL verification, # so this is the default. 
@@ -1270,6 +1374,17 @@ def __call__(self, parser, namespace, values, option_string=None): parser.error("--save-video: module 'youtube_dl' is not installed") if options.cookiefile is not None and not os.access(options.cookiefile, os.R_OK): parser.error('--cookiefile: file cannot be read') + if options.save_notes: + if not subprocess: + parser.error("--save-notes: Python is older than 3.2 and module 'subprocess32' is not installed") + if not bs4: + parser.error("--save-notes: module 'bs4' is not installed") + py_compile.compile(join(script_dir, 'note_scraper.py')) + if options.notes_limit is not None: + if not options.save_notes: + parser.error('--notes-limit requires --save-notes') + if options.notes_limit < 1: + parser.error('--notes-limit: Value must be at least 1') if not API_KEY: sys.stderr.write('''\ @@ -1280,6 +1395,7 @@ def __call__(self, parser, namespace, values, option_string=None): tb = TumblrBackup() try: for account in blogs: + log.backup_account = account tb.backup(account) except KeyboardInterrupt: sys.exit(EXIT_INTERRUPT) diff --git a/util.py b/util.py index a5e0d98..dba02eb 100644 --- a/util.py +++ b/util.py @@ -1,8 +1,27 @@ # -*- coding: utf-8 -*- +from __future__ import absolute_import, division, print_function, with_statement + import sys +import threading + +try: + from typing import TYPE_CHECKING +except ImportError: + TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import Generic, TypeVar + +try: + import queue +except ImportError: + import Queue as queue # type: ignore[no-redef] PY3 = sys.version_info[0] >= 3 +HAVE_SSL_CTX = sys.version_info >= (2, 7, 9) + +HTTP_TIMEOUT = 90 def to_unicode(string, encoding='utf-8', errors='strict'): @@ -22,3 +41,32 @@ def to_native_str(string, encoding='utf-8', errors='strict'): return to_unicode(string, encoding, errors) else: return to_bytes(string, encoding, errors) + + +if TYPE_CHECKING: + T = TypeVar('T') + + class GenericQueue(queue.Queue[T], Generic[T]): + pass +else: + T = None + + class FakeGenericMeta(type): + def __getitem__(cls, item): + return cls + + if PY3: + exec("""class GenericQueue(queue.Queue, metaclass=FakeGenericMeta): + pass""") + else: + class GenericQueue(queue.Queue, object): + __metaclass__ = FakeGenericMeta + + +class LockedQueue(GenericQueue[T]): + def __init__(self, lock, maxsize=0): + super(LockedQueue, self).__init__(maxsize) + self.mutex = lock + self.not_empty = threading.Condition(lock) + self.not_full = threading.Condition(lock) + self.all_tasks_done = threading.Condition(lock)
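
For reference, the note_scraper.py introduced by this patch runs as a child process: tumblr_backup.py passes the post URL, the post ID, an SSL-verification flag, the notes limit, the cookie file path, and an inherited pipe fd on the command line, reads progress messages back over that pipe, and reads the scraped notes HTML from the child's stdout. Below is a minimal, hand-written sketch (Python 3, not part of the patch) that drives the module the same way; the post URL and ID are placeholders, and it assumes note_scraper.py sits in the current working directory.

import os
import subprocess
import sys

# Hypothetical public post; substitute any Tumblr post URL plus its numeric ID.
post_url = 'https://example.tumblr.com/post/123456789'
post_id = '123456789'

# The scraper writes "[Note Scraper] ..." status lines to the fd passed as its
# last argument and the collected notes HTML to stdout, mirroring tumblr_backup.py.
msg_rd, msg_wr = os.pipe()
os.set_inheritable(msg_wr, True)  # pipe fds are non-inheritable by default on Python 3.4+

args = [sys.executable, '-m', 'note_scraper',
        post_url, post_id,
        '0',   # noverify: 0 = verify SSL certificates
        '0',   # notes_limit: 0 = no limit
        '',    # cookiefile: empty = no cookie jar
        str(msg_wr)]
env = dict(os.environ, PYTHONPATH=os.getcwd())  # directory containing note_scraper.py
proc = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=False, env=env)
os.close(msg_wr)  # keep only the child's copy so the read below sees EOF when it exits

with os.fdopen(msg_rd) as msg_file:
    for line in msg_file:
        print(line, end='')  # informational messages from the scraper

notes_html = proc.communicate()[0].decode('utf-8')
print('{} bytes of notes HTML'.format(len(notes_html)))

From the command line the feature is enabled per blog with something like
tumblr_backup.py --save-notes --notes-limit 5000 --cookiefile cookies.txt example
where the cookie file is a Netscape/Mozilla-format cookies.txt, the same format youtube-dl accepts.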