# 并行网页抓取
1. 从多个数据源(多个远程服务器)而不只是一个数据源收集数据;
2. 收集数据的同时，在已收集到的数据上执行时间更长 / 更复杂的操作(例如图像分析或OCR处理)
3. 从大型 Web 服务收集数据，如果你已经付费，或者创建多个连接是使用协议允许的行为。


Python 既支持多进程(multiprocessing)，也支持多线程(multithreading)。多进程和多线程 可以实现相同的目标:同时执行两个编程任务，而不是像传统线性方式那样一次只执行一 个任务。

In [1]:
'''多线程爬取'''
import _thread
import time 

def print_time(threadName,delay,iterations):
    start = int(time.time())
    for i in range(0,iterations):
        time.sleep(delay)
        seconds_elapsed = str(int(time.time()) - start)
        print("{}{}".format(seconds_elapsed,threadName))

'''这个脚本开启了 3 个线程:
    一个线程每 3 秒打印一次“Fizz”
    另一个线程每 5 秒打印一次"Buzz”
    第三个线程每秒打印一次“Counter”
'''
try:
    _thread.start_new_thread(print_time,("Fizz",3,33))
    _thread.start_new_thread(print_time,("Buzz",5,20))
    _thread.start_new_thread(print_time,("Counter",1,100))
except:
    print("Error unable to start thread")
while 1:
    pass 


1Counter
2Counter
3Fizz
3Counter
4Counter
5Buzz
5Counter
6Fizz
6Counter
7Counter
8Counter
9Fizz
9Counter
10Buzz
10Counter
11Counter


KeyboardInterrupt: 

In [1]:
'''多线程爬取网站'''
from os import link
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re 
import random

import _thread
import time 
visited = []

def get_links(thread_name,bs):
    print("getting links in {}".format(thread_name))
    links =  bs.find("div",{"id":"bodyContent"}).find_all("a",href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link not in visited]

# 为线程定义一个函数
def sracpe_article(thread_name,path):
    '''需要注意的是，现在 scrape_article 函数的第一个动作，
    是把当前网页的路径添加到已经 浏览过的路径列表中。
    这会减小抓取两次的可能性，但是不会彻底解决这类问题。'''
    visited.append(path)
    html = urlopen('http://en.wikipedia.org{}'.format(path))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')
    title = bs.find("h1").get_text()
    print("scraping {} in thread {}".format(title,thread_name) )
    links = get_links(thread_name,bs)
    if len(links)>0:
        newArticle = links[random.randint(0,len(links) -1 )].attrs['href']
        print(newArticle)
        sracpe_article(thread_name,newArticle)

# 创建两个线程
try:
    _thread.start_new_thread(sracpe_article, ('Thread 1', '/wiki/Kevin_Bacon',))
    _thread.start_new_thread(sracpe_article, ('Thread 2', '/wiki/Monty_Python',))
except:
    print("error unable to start thread")
while 1:
    pass 

scraping Kevin Bacon in thread Thread 1
getting links in Thread 1
/wiki/David_Koepp
scraping Monty Python in thread Thread 2
getting links in Thread 2
/wiki/Michael_Palin
scraping David Koepp in thread Thread 1
getting links in Thread 1
/wiki/Empire_(film_magazine)
scraping Michael Palin in thread Thread 2
getting links in Thread 2
/wiki/Marriage_Guidance_Counsellor
scraping Empire (magazine) in thread Thread 1
getting links in Thread 1
/wiki/Empire_Awards
scraping Marriage Guidance Counsellor in thread Thread 2
getting links in Thread 2
/wiki/Michael_Palin
scraping Empire Awards in thread Thread 1
getting links in Thread 1
/wiki/Empire_Inspiration_Award


KeyboardInterrupt: 

In [2]:
'''使用队列数据结构来爬取网站
可以采用较少的数据库线程，每个线程都有独立的连接，从队列来回获取并存储 数据。这样可以实现更加可控的数据库连接
'''
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re 
import random
import _thread
from queue import Queue
import time 
import pymysql


def storage(quene):
    conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock',user='root', passwd='2022mysql', db='mysql', charset='utf8')

    cur = conn.cursor()
    cur.execute("use scraping")
    while 1:
        if not quene.empty():
            article = quene.get()
            cur.execute("select * from wiki_pages where path = %s",(article["path"]))
            if cur.rowcount==0:
                print("存储文章 {}".format(article["title"]))
                cur.execute("insert into wiki_pages (title,path) values(%s,%s)",(article['title'],article['path']))
                conn.commit()
            else:
                print("文章已经存在{}".format(article['title']))


visited = []
def getlinks(thread_name,bs):
    print("获取链接{}".format(thread_name))
    links = bs.find('div', {'id':'bodyContent'}).find_all('a',href=re.compile('^(/wiki/)((?!:).)*$'))

    return [link for link in links if link not in visited]

def scrape_article(thread_name,path,queue):
    visited.append(path)
    html = urlopen('http://en.wikipedia.org{}'.format(path))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')

    title = bs.find('h1').get_text()
    print('Added {} for storage in thread {}'.format(title, thread_name))
    queue.put({"title":title, "path":path})
    links = getlinks(thread_name, bs)

    if len(links)>0:
        newArticle = links[random.randint(0,len(links)-1)].attrs['href']
        scrape_article(thread_name,newArticle,queue)

queue = Queue()
try:
    _thread.start_new_thread(scrape_article, ('Thread 1','/wiki/Kevin_Bacon', queue,))
    _thread.start_new_thread(scrape_article, ('Thread 2','/wiki/Monty_Python', queue,))
    _thread.start_new_thread(storage, (queue,))
except:
    print("error:unable to start threads")
while 1:
    pass 


Added Kevin Bacon for storage in thread Thread 1
获取链接Thread 1
存储文章 Kevin Bacon
Added Monty Python for storage in thread Thread 2
获取链接Thread 2
存储文章 Monty Python
Added Apollo 13 (film) for storage in thread Thread 1
获取链接Thread 1
存储文章 Apollo 13 (film)
Added Joan Miró for storage in thread Thread 2
获取链接Thread 2
存储文章 Joan Miró
Added Max Elliott Slade for storage in thread Thread 1
获取链接Thread 1
存储文章 Max Elliott Slade
Added Wayback Machine for storage in thread Thread 2
获取链接Thread 2
存储文章 Wayback Machine
Added Parenthood (1990 TV series) for storage in thread Thread 1
获取链接Thread 1
存储文章 Parenthood (1990 TV series)
Added Time capsule for storage in thread Thread 2
获取链接Thread 2
存储文章 Time capsule
Added Jasen Fisher for storage in thread Thread 1
获取链接Thread 1
存储文章 Jasen Fisher
Added George Edward Pendray for storage in thread Thread 2
获取链接Thread 2
存储文章 George Edward Pendray
Added The Witches (1990 film) for storage in thread Thread 1
获取链接Thread 1
存储文章 The Witches (1990 film)
Added Spaceflight for s

KeyboardInterrupt: 

-----------------------------
 threading 模块是 一个高级接口，可以让你轻松地使用线程，同时也暴露了 _thread 模块的所有特性。

In [None]:
import threading
import time 
def print_time(threadName,delay,iterations):
    start = int(time.time())
    for i in range(0,iterations):
        time.sleep(delay)
        seconds_elapsed = str(int(time.time()) - start)
        print("{}{}".format(seconds_elapsed,threadName))

threading.Thread(target=print_time, args=('Fizz', 3, 33)).start()
threading.Thread(target=print_time, args=('Buzz', 5, 20)).start()
threading.Thread(target=print_time, args=('Counter', 1, 100)).start()

**threading 模块的一个优点是，它可以轻松地创建其他线程都无法访问的线程局部数据 (local thread data)。这样做的好处是，如果你有若干线程，它们各自抓取不同的网站，那
么每个线程都可以跟踪自己访问的页面列表。**

In [None]:
'''局部数据可以随时创建，调用线程函数即可'''
from ast import arg
import threading
def crawler(url):
    data = threading.local()
    data.visited = []
    # 抓取网站
threading.Thread(target=crawler, args=('http://brookings.edu')).start()

这样就可以解决线程之间因为共享对象而<u>导致竞争条件</u>的问题。无论何时，只要不需要共享对象，就不要共享，保存在线程局部内存中即可。为了安全地在线程中共享对象，仍然 可以使用上一节中的 Queue 模块。

In [4]:
'''通常情况下，爬虫都需要运行很长时间。isAlive 函数可以确保爬虫在一个线程崩溃后重启:'''
import threading
import time 
class Crawler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.done = False
    def isDone(self):
        return self.done 
    def run(self):
        time.sleep(5)
        self.done = True
        raise Exception("something bad happebd!")

    
t = Crawler()
t.start()

while True:
    time.sleep(1)
    if t.isDone():
        print('Done')
        break
    if not t.isAlive():
        t = Crawler()
        t.start()


AttributeError: 'Crawler' object has no attribute 'isAlive'

--------------------------
# 多进程抓取

In [None]:
from multiprocessing import Process
import time 

def print_time(threadName,delay,iterations):
    start = int(time.time())
    for i in range(0,iterations):
        time.sleep(delay)
        seconds_elapsed = str(int(time.time()) - start)
        print (threadName if threadName else seconds_elapsed)
processes = []
processes.append(Process(target=print_time, args=('Counter', 1, 100)))
processes.append(Process(target=print_time, args=('Fizz', 3, 33)))
processes.append(Process(target=print_time, args=('Buzz', 5, 20)))
for p in processes:
    p.start()
for p in processes:
    p.join()


**多进程抓取网页**

In [1]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re 
import random
from multiprocessing import Process
import os 
import time 
visited = []
def get_links(bs):
    print('Getting links in {}'.format(os.getpid()))
    links = bs.find('div', {'id':'bodyContent'}).find_all('a',
             href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link not in visited]


def scrape_article(path):
    visited.append(path)
    html = urlopen('http://en.wikipedia.org{}'.format(path))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')
    title = bs.find('h1').get_text()
    print('Scraping {} in process {}'.format(title, os.getpid()))

    links = get_links(bs)
    if len(links)>0:
        newArticle = links[random.randint(0, len(links)-1)].attrs['href']
        print(newArticle)
        scrape_article(newArticle)

processes = []
processes.append(Process(target=scrape_article, args=('/wiki/Kevin_Bacon',)))
processes.append(Process(target=scrape_article, args=('/wiki/Monty_Python',)))
for p in processes:
    p.start()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'scrape_article' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'scrape_article' on <module '__main__' (built-in)>


**有一种方法可以让同一台机器上的进程互相通信，那就是用 Python 的两个对象:队 列和管线(pipe)。**
如果将网页的静态列表替换成某种抓取委托器(delegator)会怎样呢?爬虫以待抓取网页 路径的形式(例如 /wiki/Monty_Python)，从一个队列中获取一个任务，抓取结束后再将一 个“已发现 URL”的列表返回到另一个独立的队列中，这个队列将由抓取委托器来处理， 这样就只有新的 URL 会被添加到第一个任务队列中。

In [2]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
from multiprocessing import Process, Queue
import os
import time


def task_delegator(taskQueue,urlsQueue):
    # 为每个进程初始化一个任务
    visited = ['/wiki/Kevin_Bacon', '/wiki/Monty_Python']
    taskQueue.put('/wiki/Kevin_Bacon')
    taskQueue.put('/wiki/Monty_Python')

    while 1:
        # 检查urlsQueue中是否存在新链接需要处理
        if not urlsQueue.empty():
            links = [link for link in urlsQueue.get() if link not in visited]
            for link in links:
                # 向taskQueue中增加新链接
                taskQueue.put(link)

def get_links(bs):
    links = bs.find('div', {'id':'bodyContent'}).find_all('a',
             href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link.attrs['href'] for link in links]


def scrape_article(taskQueue,urlsQueue):
    while 1:
        while taskQueue.empty():
            # 如果任务队列为空，休息100毫秒
            # 这种情况应该极少发生
            time.sleep(.1)
        path = taskQueue.get()
        html = urlopen('http://en.wikipedia.org{}'.format(path))
        time.sleep(5)
        bs = BeautifulSoup(html, 'html.parser')
        title = bs.find('h1').get_text()
        print('Scraping {} in process {}'.format(title, os.getpid()))
        links = get_links(bs)
        # 发送这些链接到委托器进行处理
        urlsQueue.put(links)
processes = []
taskQueue = Queue()
urlsQueue = Queue()
processes.append(Process(target=task_delegator, args=(taskQueue, urlsQueue,)))
processes.append(Process(target=scrape_article, args=(taskQueue, urlsQueue,)))
processes.append(Process(target=scrape_article, args=(taskQueue, urlsQueue,)))
for p in processes:
    p.start()

Traceback (most recent call last):
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)exitcode = _main(fd, parent_sentinel)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main

  File "/Library/Frameworks/Python.framework/Ve