# 爬虫并发下载

## 保存在内存

In [None]:
# 导入枚举
from enum import Enum, unique
from queue import Queue
from random import random
from time import sleep
from threading import Thread, current_thread
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup


def decode_page(page_bytes, charsets=('utf-8',)):
    page_html = None
    for charset in charsets:
        try:
            page_html = page_bytes.decode(charset)
            break
        except UnicodeDecodeError:
            pass
            # logging.error('[Decode]', err)
    return page_html


class Retry(object):

    def __init__(self, *, retry_times=3, wait_secs=1, errors=(Exception, )):
        """

        :param retry_times: 重试次数
        :param wait_secs: 最少等待时间
        :param errors: 抓取的错误列表
        """
        self.retry_times = retry_times
        self.wait_secs = wait_secs
        self.errors = errors

    # 可以将对象变成像函数一样可以调用
    def __call__(self, fn):

        def wrapper(*args, **kwargs):
            for _ in range(self.retry_times):
                try:
                    return fn(*args, **kwargs)
                # except后面可以跟元组
                except self.errors as e:
                    # logging.error(e)
                    # logging.info('[Retry]')
                    print(e)
                    sleep((random() + 1) * self.wait_secs)

            return None

        return wrapper


#  枚举的作用是定义常量，而且可以提供一些方法来对这些常量验证
#  保证里面没有重复值
@unique
class SpiderStatus(Enum):
    IDLE = 0
    WORKING = 1


class Spider(object):

    def __init__(self):
        self.status = SpiderStatus.IDLE

    # 重试的包装器
    @Retry()
    def fetch(self, current_url, *, charsets=('utf-8', ), user_agent=None, proxies=None):
        """

        :param current_url: 传入一个url
        :return: 一个html页面
        """
        thread_name = current_thread().name
        print(f'[{thread_name} Fetch]:{current_url}')
        headers = {'user-agent': user_agent} if user_agent else {}
        resp = requests.get(current_url,
                            headers=headers, proxies=proxies)

        return decode_page(resp.content, charsets) if resp.status_code == 200 else None

    def parse(self, html_page, *, domain='m.sohu.com'):
        """

        :param html_page: 传入一个html页面
        :return:返回一个新的url列表
        """
        soup = BeautifulSoup(html_page, 'lxml')
        url_links= []
        for a_tag in soup.body.select('a[href]'):
            # 解析url, 可以将url分解
            parser = urlparse(a_tag.attrs['href'])
            # 提取域名
            netloc = parser.netloc or domain
            # 提取协议
            scheme = parser.scheme or 'http'
            if scheme != 'javascript' and netloc == domain:
                # 提取路由
                path = parser.path
                # 提取参数
                query = '?' + parser.query if parser.query else ''
                # 新的格式化字符串的方式，可以直接将变量传进去
                full_url = f'{scheme}://{netloc}{path}{query}'
                if full_url not in visited_urls:
                    url_links.append(full_url)

        return url_links

    def extract(self, html_page):
        """

        :param html_page: 传入一个html页面
        :return: 返回提取到的数据
        """
        pass

    def store(self, data_dict):
        """

        :param data_dict: 传入数据
        :return:
        """
        pass


class SpiderThread(Thread):

    def __init__(self, name, spider, tasks_queue):
        """

        :param spider: 爬虫
        :param tasks_queue: 需要执行的任务
        """
        super().__init__(name=name, daemon=True)
        self.spider = spider
        self.tasks_queue = tasks_queue

    def run(self):
        while True:
            # 如果队列没东西时，会阻塞在这里，而且可以设置等待时间
            current_url = self.tasks_queue.get()
            visited_urls.add(current_url)
            self.spider.status = SpiderStatus.WORKING
            # 抓取页面
            html_page = self.spider.fetch(current_url)
            if html_page not in [None, '']:
                # 解析页面，提取其中的url
                url_links = self.spider.parse(html_page)
                for url_link in url_links:
                    self.tasks_queue.put(url_link)

            self.spider.status = SpiderStatus.IDLE


def is_any_alive(spider_threads):
    """

    :param spider_threads: 传入所有线程
    :return: 如果所有工作都完成，返回false, 如果有一个没结束，返回True
    """
    # 只要有任何一个结果是true 结果就是true 与all正好相反
    return any([spider_thread.spider.status == SpiderStatus.WORKING
                for spider_thread in spider_threads])

visited_urls = set()


def main():
    #  用队列是因为队列有锁，保证线程安全，
    #  而且可以设置最大长度，可以控制内存的
    #  先进先出，后进后出
    task_queue = Queue()
    # 将种子url放进队列， 并放在尾部
    task_queue.put('http://m.sohu.com/')
    #  创建一个线程队列
    spider_threads = [SpiderThread('t%d' % i, Spider(), task_queue) for i in range(10)]
    #  启动线程
    for spider_thread in spider_threads:
        spider_thread.start()
    #  如果队列内不为空， 或者还有任务没完成
    while not task_queue.empty() or is_any_alive(spider_threads):
        pass

    print('Over!')


if __name__ == '__main__':
    main()

## 保存在redis和mongodb中

In [None]:
# 导入枚举
from enum import Enum, unique
from hashlib import sha1
from random import random
from time import sleep
from threading import Thread, current_thread
from urllib.parse import urlparse

import pymongo
import requests
import redis
from bs4 import BeautifulSoup


def decode_page(page_bytes, charsets=('utf-8',)):
    page_html = None
    for charset in charsets:
        try:
            page_html = page_bytes.decode(charset)
            break
        except UnicodeDecodeError:
            pass
            # logging.error('[Decode]', err)
    return page_html


class Retry(object):

    def __init__(self, *, retry_times=3, wait_secs=1, errors=(Exception, )):
        """

        :param retry_times: 重试次数
        :param wait_secs: 最少等待时间
        :param errors: 抓取的错误列表
        """
        self.retry_times = retry_times
        self.wait_secs = wait_secs
        self.errors = errors

    # 可以将对象变成像函数一样可以调用
    def __call__(self, fn):

        def wrapper(*args, **kwargs):
            for _ in range(self.retry_times):
                try:
                    return fn(*args, **kwargs)
                # except后面可以跟元组
                except self.errors as e:
                    # logging.error(e)
                    # logging.info('[Retry]')
                    print(e)
                    sleep((random() + 1) * self.wait_secs)

            return None

        return wrapper


#  枚举的作用是定义常量，而且可以提供一些方法来对这些常量验证
#  保证里面没有重复值
@unique
class SpiderStatus(Enum):
    IDLE = 0
    WORKING = 1


class Spider(object):

    def __init__(self):
        self.status = SpiderStatus.IDLE

    # 重试的包装器
    @Retry()
    def fetch(self, current_url, *, charsets=('utf-8', ), user_agent=None, proxies=None):
        """

        :param current_url: 传入一个url
        :return: 一个html页面
        """
        thread_name = current_thread().name
        print(f'[{thread_name} Fetch]:{current_url}')
        headers = {'user-agent': user_agent} if user_agent else {}
        resp = requests.get(current_url,
                            headers=headers, proxies=proxies)

        return decode_page(resp.content, charsets) if resp.status_code == 200 else None

    def parse(self, html_page, *, domain='m.sohu.com'):
        """

        :param html_page: 传入一个html页面
        :return:返回一个新的url列表
        """
        soup = BeautifulSoup(html_page, 'lxml')
        url_links= []
        for a_tag in soup.body.select('a[href]'):
            # 解析url, 可以将url分解
            parser = urlparse(a_tag.attrs['href'])
            # 提取域名
            netloc = parser.netloc or domain
            # 提取协议
            scheme = parser.scheme or 'http'
            if scheme != 'javascript' and netloc == domain:
                # 提取路由
                path = parser.path
                # 提取参数
                query = '?' + parser.query if parser.query else ''
                # 新的格式化字符串的方式，可以直接将变量传进去
                full_url = f'{scheme}://{netloc}{path}{query}'
                if not redis_client.sismember('visited_urls', full_url) :
                    redis_client.rpush('m_sohu_task', full_url)

    def extract(self, html_page):
        """

        :param html_page: 传入一个html页面
        :return: 返回提取到的数据
        """
        pass

    def store(self, data_dict):
        """

        :param data_dict: 传入数据
        :return:
        """
        pass


class SpiderThread(Thread):

    def __init__(self, name, spider):
        """

        :param spider: 爬虫
        :param tasks_queue: 需要执行的任务
        """
        super().__init__(name=name, daemon=True)
        self.spider = spider

    def run(self):
        while True:

            # 如果队列没东西时，会阻塞在这里，而且可以设置等待时间
            current_url = redis_client.lpop('m_sohu_task')
            # 抓取不到继续抓取直到抓到为止
            while not current_url:
                current_url = redis_client.lpop('m_sohu_task')
            self.spider.status = SpiderStatus.WORKING
            current_url = current_url.decode('utf-8')
            if not redis_client.sismember('visited_urls', current_url):
                redis_client.sadd('visited_urls', current_url)
                # 抓取页面
                html_page = self.spider.fetch(current_url)
                if html_page not in [None, '']:
                    hasher = hasher_proto.copy()
                    hasher.update(current_url.encode('utf-8'))
                    doc_id = hasher.hexdigest()
                    #  find 和 findone 的区别
                    # find如果找不到数据，返回一个游标
                    # findone如果找不到数据，返回False
                    if sohu_data_coll.find({'_id': doc_id}).count() == 0:
                        print(sohu_data_coll)
                        sohu_data_coll.insert_one(
                            {'_id':doc_id,
                             'url':current_url,
                             'page': html_page})

                    # 如果往mongo中放二进制数据,需要用下面的方法包装
                    # from bson import Binary
                    # bson这个包千万不要自己下，而是要下mongo时自带下载，不然mongo操作中会包莫名错误
                    # Binary()
                    # 解析页面，提取其中的url
                    self.spider.parse(html_page)

                self.spider.status = SpiderStatus.IDLE


def is_any_alive(spider_threads):
    """

    :param spider_threads: 传入所有线程
    :return: 如果所有工作都完成，返回false, 如果有一个没结束，返回True
    """
    # 只要有任何一个结果是true 结果就是true 与all正好相反
    return any([spider_thread.spider.status == SpiderStatus.WORKING
                for spider_thread in spider_threads])


redis_client = redis.Redis(host='47.98.172.171',
                               port=11223, password='544619417wxz')

mongo_client = pymongo.MongoClient(host='47.98.172.171', port=27017)
db = mongo_client.msohu
sohu_data_coll = db.webpages
hasher_proto = sha1()


def main():
    print(mongo_client)
    # 判断redis是否有m_sohu_task这个键
    if not redis_client.exists('m_sohu_task'):
        # 将种子url放进队列， 并放在尾部
        redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
    else:
        pass
    #  创建一个线程队列
    # redis本身是线程安全的是，单线程异步io模式
    spider_threads = [SpiderThread('t%d' % i, Spider()) for i in range(10)]
    #  启动线程
    for spider_thread in spider_threads:
        spider_thread.start()
    #  如果redis中m_sohu_task长度是否为0， 或者还有任务没完成
    while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
        pass

    print('Over!')


if __name__ == '__main__':
    main()