In [7]:
import requests
from bs4 import BeautifulSoup
import queue
import threading
import re
import csv
import functools
import os
from time import time

In [8]:
# Module-wide lock used by getProxy() around its check-and-refill of the
# shared `proxies` queue, so only one thread refreshes the pool at a time.
lock = threading.Lock()

In [9]:
# Decorator that times a function call, prints the duration and appends it to
# a CSV log.
_TIMING_CSV_PATH = '运行时间统计.csv'

# Serialises appends to the timing log. Decorated functions run in many
# threads at once; without this, the "file exists? -> write header" check and
# the row append could interleave (duplicate header rows, mixed-up lines).
_TIMING_CSV_LOCK = threading.Lock()


def excute_time(func):
    """Decorate ``func`` so every call is timed and logged.

    After each successful call the elapsed wall-clock seconds are printed
    (truncated to an int) and appended as a row (函数 / 运行时间 / 线程数) to
    ``运行时间统计.csv`` in the current working directory. The header row is
    written only when the file does not exist yet. ``线程数`` is taken from a
    ``thread_num`` keyword argument if the caller passed one, otherwise it is
    left empty.

    The wrapped function's return value is passed through unchanged. If the
    call raises, nothing is logged and the exception propagates.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time()
        result = func(*args, **kwargs)
        spend = time() - start
        print('%s() 运行时间: %d' % (func.__name__, spend))
        thread_num = kwargs.get('thread_num', '')

        headers = ['函数', '运行时间', '线程数']
        data = {'函数': func.__name__, '运行时间': spend, '线程数': thread_num}
        with _TIMING_CSV_LOCK:
            # Both cases append; the header is only added on first creation.
            write_header = not os.path.isfile(_TIMING_CSV_PATH)
            with open(_TIMING_CSV_PATH, 'a', newline='') as f:
                writer = csv.DictWriter(f, headers)
                if write_header:
                    writer.writeheader()
                writer.writerow(data)

        return result
    return wrapper

In [10]:
class jobObject:
    """Plain record for one job posting scraped from a 51job search page.

    Fields (all default to ``None`` until filled in):
      tag_href -- link to the posting's detail page (``href`` of the row's ``<a>``)
      job      -- job title (``title`` attribute of the same ``<a>``)
      company, address, salary -- text of the row cells whose class matches ``t[2-4]``
      tag      -- list of tag strings collected from the detail page by getTag()

    ``__slots__`` keeps instances small and rejects misspelled attribute
    names. Every slot is initialised, so a record whose detail page failed to
    load can still be written out instead of raising AttributeError.
    """
    __slots__ = ('tag_href', 'job', 'company', 'address', 'salary', 'tag')

    def __init__(self, tag_href=None, job=None, company=None, address=None,
                 salary=None, tag=None):
        self.tag_href = tag_href
        self.job = job
        self.company = company
        self.address = address
        self.salary = salary
        self.tag = tag

    def __repr__(self):
        fields = ', '.join('%s=%r' % (name, getattr(self, name, None))
                           for name in self.__slots__)
        return '%s(%s)' % (type(self).__name__, fields)

In [11]:
def ipToQueue(url='https://free-proxy-list.net/', timeout=10):
    """Scrape candidate proxy addresses from a proxy-list page into a queue.

    Downloads ``url``, finds the first ``div.table-responsive`` and, for each
    ``<tr>`` of its ``<tbody>``, takes the first two ``<td>`` cells as IP and
    port.

    :param url: proxy-list page to scrape (defaults to free-proxy-list.net).
    :param timeout: seconds to wait for the page. Without a timeout a stalled
        connection would block forever.
    :returns: ``queue.Queue`` of ``'ip:port'`` strings, in table order.
    :raises RuntimeError: if the page has no proxy table.
    :raises requests.RequestException: on network failure.
    """
    q = queue.Queue()
    r = requests.get(url, timeout=timeout)
    soup = BeautifulSoup(r.text, features='html5lib')
    div_tag = soup.find('div', 'table-responsive')
    if div_tag is None or div_tag.tbody is None:
        raise RuntimeError('proxy table not found at %s' % url)

    # find_all('tr') ignores whitespace text nodes, which .contents would include.
    for tr in div_tag.tbody.find_all('tr'):
        cells = tr.find_all('td', recursive=False)
        if len(cells) < 2:
            continue
        ip = cells[0].get_text(strip=True)
        port = cells[1].get_text(strip=True)
        # Skip rows with empty cells instead of failing on None + str.
        if ip and port:
            q.put(ip + ':' + port)

    return q
    

In [12]:
def getIPool(ip_queue):
    """Convert a queue of ``'ip:port'`` strings into a queue of proxy dicts.

    Each address becomes ``{'http': ip, 'https': ip}``, i.e. the same proxy
    for both schemes, as passed to ``requests.get(..., proxies=...)``.
    ``ip_queue`` is drained in the process.

    :param ip_queue: ``queue.Queue`` of address strings.
    :returns: new ``queue.Queue`` of proxy dicts (empty if ``ip_queue`` was).
    """
    proxies = queue.Queue()
    # get_nowait() + queue.Empty ends the loop once the source is exhausted.
    # The previous blocking get() waited forever on an empty queue.
    while True:
        try:
            ip = ip_queue.get_nowait()
        except queue.Empty:
            break
        proxies.put({'http': ip, 'https': ip})
    return proxies

In [13]:
def getCleanProxies(proxies, **thread_num):
    """Filter a queue of proxy dicts down to those that can reach 51job.

    Starts ``thread_num['thread_num']`` worker threads (default 1). They share
    ``proxies`` and try a GET on https://jobs.51job.com/ through each proxy
    with a 5 s timeout. A proxy answering 200 is kept. Any other status, or
    any ``requests`` exception, drops it. ``proxies`` is drained.

    :param proxies: ``queue.Queue`` of ``{'http': ..., 'https': ...}`` dicts.
    :param thread_num: keyword ``thread_num=<int>`` worker-thread count.
    :returns: ``queue.Queue`` of the proxies that passed.
    """
    url = 'https://jobs.51job.com/'

    # Working proxies; queue.Queue is thread-safe, so workers put() concurrently.
    ok_proxies = queue.Queue()

    def _drop(proxy, reason):
        # Report a proxy whose request raised and is therefore discarded.
        print(threading.current_thread().name + ' is processing')
        print(reason)
        print('remove proxy:', proxy)
        print('-------------------------------------')

    def check_proxy(thread_proxies):
        while True:
            print('%s : 当前剩余处理的ip: %d' % (threading.current_thread().name, thread_proxies.qsize()))

            # Take the next proxy atomically. Checking qsize() and then calling
            # a blocking get() let another worker take the last item in
            # between, leaving this thread blocked forever and join() hanging.
            try:
                proxy = thread_proxies.get_nowait()
            except queue.Empty:
                break

            try:
                r = requests.get(url, proxies=proxy, timeout=5)
            except requests.RequestException as e:
                # Base class of Timeout, ConnectionError, InvalidHeader and every
                # other requests error, so no unexpected type kills the worker.
                # (requests.InvalidHeader is only exported from requests.exceptions.)
                _drop(proxy, e)
                continue

            print(threading.current_thread().name + ' is processing')
            print('r.status_code:', r.status_code)
            if r.status_code != 200:
                print('remove proxy:', proxy)
            else:
                ok_proxies.put(proxy)
            print('-------------------------------------')

    threads = [threading.Thread(target=check_proxy, args=(proxies,))
               for _ in range(thread_num.get('thread_num', 1))]

    print('线程开始：')
    for thread in threads:
        print(thread.name + '开始运行')
        thread.start()

    print('线程开始停止运行：')
    for thread in threads:
        thread.join()
        print(thread.name + '已停止')

    return ok_proxies

In [14]:
@excute_time
def get_jobObjectList(url):
    """Download one 51job search-result page and parse its job rows.

    The page is fetched through the module-level ``proxy``. On a timeout or
    connection/proxy error a new proxy is taken from getProxy() and the
    request is retried, with no retry limit. ``proxy`` is a module global, so
    a rotation here replaces it for every other caller too.

    :param url: search-result page URL.
    :returns: list of ``jobObject`` with ``tag_href``, ``job``, ``company``,
        ``address`` and ``salary`` set (``tag`` is filled in later by pack_tag).
    :raises RuntimeError: if the page has no ``div.el.title`` header row, e.g.
        when the proxy returned some other page instead of the job list.
    """
    global proxy

    print('get_jobObjectList()当前处理链接:', url)
    while True:
        try:
            r = requests.get(url, proxies=proxy, timeout=5)
        # ProxyError and ConnectTimeout are ConnectionError subclasses, so this
        # tuple covers the former three identical handlers.
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError) as e:
            print(e)
            proxy = getProxy()
            print('尝试新的proxy')
            continue
        break

    r.encoding = 'gbk'  # decode the body as GBK
    soup = BeautifulSoup(r.text, features='html5lib')
    # Header row of the job list; each job is a following sibling div.el.
    title = soup.find('div', 'el title')
    if title is None:
        raise RuntimeError('job list not found (HTTP %s) on page: %s'
                           % (r.status_code, url))
    job_info_list = title.find_next_siblings('div', 'el')

    jobObjectList = []
    for row in job_info_list:
        record = jobObject()
        link = row.find('a')
        record.tag_href = link.attrs['href']
        record.job = link.attrs['title']

        # company / address / salary cells, in that order
        infos = row.find_all(class_=re.compile('t[2-4]'))
        record.company = infos[0].string
        record.address = infos[1].string
        record.salary = infos[2].string
        jobObjectList.append(record)

    print('get_jobObjectList处理完成!')

    return jobObjectList

In [15]:
@excute_time
def getTag(url):
    """Collect the tag strings shown on a 51job posting's detail page.

    Fetches ``url`` through the module-level ``proxy``. On a timeout or
    connection/proxy error it rotates to a new proxy via getProxy() and
    retries, with no limit. The page is decoded as GBK, and the function gathers:

    * the second child of every ``<span class="sp4">`` (first tag row), and
    * every non-blank string inside ``<p class="t2">`` (second tag row, which
      some pages do not have), appended unstripped.

    :param url: detail-page URL.
    :returns: list of tag strings in page order (possibly empty).
    """
    global proxy

    tag_list = []

    print('getTag()当前处理链接:', url)
    print('getTag()获得的ip:', proxy)
    while True:
        try:
            r_tag = requests.get(url, timeout=5, proxies=proxy)
        # ProxyError and ConnectTimeout are ConnectionError subclasses, so this
        # tuple covers the former three identical handlers.
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError) as e:
            print(e)
            proxy = getProxy()
            print('尝试新的proxy')
            continue
        break

    r_tag.encoding = 'gbk'
    tag_soup = BeautifulSoup(r_tag.text, features='html5lib')

    # First tag row. The length guard avoids an IndexError on spans with fewer
    # than two children, which used to kill the calling worker thread.
    for span in tag_soup.find_all('span', 'sp4'):
        if len(span.contents) > 1:
            tag_list.append(span.contents[1])

    # Second tag row, absent on some detail pages.
    tag_2 = tag_soup.find('p', 't2')
    if tag_2 is not None:
        # Keep only strings containing non-whitespace text.
        tag_list.extend(s for s in tag_2.strings if s and s.strip())

    print('getTag()中tag_list:', tag_list)
    print('----------------------')

    return tag_list

In [16]:
@excute_time
def pack_tag(jobObjectList, **thread_num):
    """Fill in ``.tag`` on every job record using a pool of threads.

    Each worker repeatedly takes a record from a shared queue and sets
    ``record.tag = getTag(record.tag_href)`` until the queue is empty.

    :param jobObjectList: records to update in place.
    :param thread_num: keyword ``thread_num=<int>`` worker count (default 1).
        The excute_time decorator also logs this value.
    """
    jobObject_queue = queue.Queue()
    for record in jobObjectList:
        jobObject_queue.put(record)

    def pack_object(jobObject_queue):
        # Non-blocking get. The old "get(), then stop if qsize()==0" loop
        # blocked forever whenever a worker reached an already-empty queue:
        # more threads than records, an empty list, or losing the race for
        # the last item. That made the join() below hang.
        while True:
            try:
                record = jobObject_queue.get_nowait()
            except queue.Empty:
                return
            record.tag = getTag(record.tag_href)

    threads = [threading.Thread(target=pack_object, args=(jobObject_queue,))
               for _ in range(thread_num.get('thread_num', 1))]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()
        print('%s 已停止' % thread.name)

    print('tag组装完成!')


In [17]:
# Serialises writes to the output CSV. Several page workers may call
# save_to_csv() at once, and the "file exists? -> header" check plus the row
# appends must not interleave.
_JOB_CSV_LOCK = threading.Lock()


def save_to_csv(jobObjectList, path='51job.csv', encoding=None):
    """Append job records to a CSV file, writing the header on first use.

    Columns are 职位 / 公司 / 地址 / 薪酬 / 标签, taken from each record's
    ``job``, ``company``, ``address``, ``salary`` and ``tag`` attributes.
    Non-string values go through csv's default conversion (``None`` -> empty,
    lists -> their ``str()``).

    :param jobObjectList: iterable of objects with the attributes above.
    :param path: output file (default ``51job.csv`` in the working directory).
    :param encoding: file encoding. ``None`` keeps the platform default used
        previously; pass it explicitly when the existing file's encoding is known.
    """
    headers = ['职位', '公司', '地址', '薪酬', '标签']
    rows = [
        {headers[0]: record.job,
         headers[1]: record.company,
         headers[2]: record.address,
         headers[3]: record.salary,
         headers[4]: record.tag}
        for record in jobObjectList
    ]

    with _JOB_CSV_LOCK:
        # Both cases append; the header is only added when creating the file.
        write_header = not os.path.isfile(path)
        with open(path, 'a', newline='', encoding=encoding) as f:
            writer = csv.DictWriter(f, headers)
            if write_header:
                writer.writeheader()
            writer.writerows(rows)

In [18]:
def handle_single(url, proxy):
    """Process one search-result page end to end: job list, tags, then CSV.

    ``proxy`` is accepted but not used here; the fetch helpers read the
    module-level ``proxy`` themselves.
    """
    page_jobs = get_jobObjectList(url)
    # Fetch every posting's detail-page tags with 20 threads.
    pack_tag(page_jobs, thread_num=20)
    save_to_csv(page_jobs)
    return None

In [19]:
# Hand out one proxy from the shared pool, refilling the pool only when empty.
def getProxy():
    """Take the next proxy dict from the module-level ``proxies`` pool.

    A new pool is fetched with getProxies() only when the current one is
    empty, not every time a thread starts. Callers ask for a new proxy only
    when the one they hold has stopped working.

    The module-level ``lock`` is held for the emptiness check, the refill and
    the removal, so concurrent callers neither refill twice nor race for the
    last entry. If a refresh yields no working proxies, it is repeated.

    :returns: a ``{'http': ..., 'https': ...}`` proxy dict.
    """
    global proxies

    # `with` releases the lock even if getProxies() raises. The previous
    # acquire()/release() pair left it held forever on an exception,
    # deadlocking every other thread that needed a proxy.
    with lock:
        while proxies.qsize() == 0:
            proxies = getProxies()
        # Remove the entry while still holding the lock. A blocking get()
        # outside it could wait forever if another thread took the last item.
        proxy = proxies.get_nowait()
        remaining = proxies.qsize()

    print('###当前proxies的ip数: ', remaining)
    return proxy
    

def getProxies(thread_num=45):
    """Build a fresh pool of working proxies.

    Scrapes candidate addresses (ipToQueue), wraps them as requests proxy
    dicts (getIPool), then keeps only those that pass getCleanProxies()
    using ``thread_num`` checker threads. Prints how many were kept and how
    long it took (whole seconds).

    The pool is kept in memory as a queue rather than saved to a local file,
    since it is used from multiple threads.

    :param thread_num: number of proxy-checking threads (default 45, the
        previously hard-coded value).
    :returns: ``queue.Queue`` of working proxy dicts (may be empty).
    """
    ip_start = time()
    candidates = getIPool(ipToQueue())
    proxies = getCleanProxies(candidates, thread_num=thread_num)
    ip_result = time() - ip_start
    print('可用的ip: ', proxies.qsize())
    print('%s清洗ip的时间: %d' % (threading.current_thread().name, ip_result))
    return proxies

In [20]:
# Global variables shared by the scraping functions: `proxies` is the pool of
# checked proxies and `proxy` the one currently in use (read via `global` in
# get_jobObjectList/getTag). Order matters: getProxy() reads `proxies`, so the
# pool must be built first.
proxies = getProxies()
proxy= getProxy()

线程开始：
Thread-4开始运行
Thread-4 : 当前剩余处理的ip: 300Thread-5开始运行

Thread-5 : 当前剩余处理的ip: 299Thread-6开始运行

Thread-6 : 当前剩余处理的ip: 298Thread-7开始运行

Thread-7 : 当前剩余处理的ip: 297Thread-8开始运行

Thread-8 : 当前剩余处理的ip: 296Thread-9开始运行

Thread-9 : 当前剩余处理的ip: 295Thread-10开始运行

Thread-10 : 当前剩余处理的ip: 294Thread-11开始运行

Thread-11 : 当前剩余处理的ip: 293Thread-12开始运行

Thread-12 : 当前剩余处理的ip: 292Thread-13开始运行

Thread-13 : 当前剩余处理的ip: 291Thread-14开始运行

Thread-14 : 当前剩余处理的ip: 290Thread-15开始运行

Thread-15 : 当前剩余处理的ip: 289Thread-16开始运行

Thread-16 : 当前剩余处理的ip: 288Thread-17开始运行

Thread-17 : 当前剩余处理的ip: 287Thread-18开始运行

Thread-18 : 当前剩余处理的ip: 286Thread-19开始运行

Thread-19 : 当前剩余处理的ip: 285Thread-20开始运行

Thread-20 : 当前剩余处理的ip: 284Thread-21开始运行

Thread-21 : 当前剩余处理的ip: 283Thread-22开始运行

Thread-22 : 当前剩余处理的ip: 282Thread-23开始运行

Thread-23 : 当前剩余处理的ip: 281Thread-24开始运行

Thread-24 : 当前剩余处理的ip: 280Thread-25开始运行

Thread-25 : 当前剩余处理的ip: 279Thread-26开始运行

Thread-26 : 当前剩余处理的ip: 278Thread-27开始运行

Thread-27 : 当前剩余处理的ip: 277Thread-28开始运行

Thread-2

In [21]:
def main(pages=10, thread_nums=5):
    """Crawl the first ``pages`` 51job "Java" search-result pages.

    Page URLs are queued, and ``thread_nums`` worker threads each take pages
    until none remain, running handle_single() on each (list, tags, CSV).
    Per-page and total wall-clock times are printed.

    :param pages: number of result pages to crawl (default 10).
    :param thread_nums: number of page-worker threads (default 5).
    """
    # {} is replaced by the 1-based page number.
    url_template = 'https://search.51job.com/list/000000,000000,0000,00,9,99,Java,2,{}.html?lang=c&stype=&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&providesalary=99&lonlat=0%2C0&radius=-1&ord_field=0&confirmdate=9&fromType=&dibiaoid=0&address=&line=&specialarea=00&from=&welfare='

    url_queue = queue.Queue()
    for page in range(1, pages + 1):
        url_queue.put(url_template.format(page))

    def go(url_queue, proxy):
        # Non-blocking get. With the old "get(), stop if qsize()==0" loop, a
        # worker that lost the race for the last page blocked forever in get()
        # and the join() below never returned.
        while True:
            try:
                url = url_queue.get_nowait()
            except queue.Empty:
                return
            start = time()
            handle_single(url, proxy)
            result = time() - start
            print('%s完成单页的时间: %d' % (threading.current_thread().name, result))

    threads = [threading.Thread(target=go, args=(url_queue, proxy))
               for _ in range(thread_nums)]

    start = time()
    for thread in threads:
        thread.start()
        print('主线程%s开始' % thread.name)

    for thread in threads:
        thread.join()
        print('主线程%s停止' % thread.name)

    result = time() - start
    print('爬取时间:', result)


In [None]:
# Start the crawl: scrape the search-result pages and append the jobs to 51job.csv.
main()

get_jobObjectList()当前处理链接:主线程Thread-49开始
 https://search.51job.com/list/000000,000000,0000,00,9,99,Java,2,1.html?lang=c&stype=&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&providesalary=99&lonlat=0%2C0&radius=-1&ord_field=0&confirmdate=9&fromType=&dibiaoid=0&address=&line=&specialarea=00&from=&welfare=
get_jobObjectList()当前处理链接:主线程Thread-50开始
 https://search.51job.com/list/000000,000000,0000,00,9,99,Java,2,2.html?lang=c&stype=&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&providesalary=99&lonlat=0%2C0&radius=-1&ord_field=0&confirmdate=9&fromType=&dibiaoid=0&address=&line=&specialarea=00&from=&welfare=
get_jobObjectList()当前处理链接:主线程Thread-51开始
 https://search.51job.com/list/000000,000000,0000,00,9,99,Java,2,3.html?lang=c&stype=&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&providesalary=99&lonlat=0%2C0&radius=-1&ord_field=0&confirmdate=9&fromType=&dibiaoid=0&address=&line=&specialarea=00&f