In [1]:
# -*- coding: utf-8 -*-

'''
@Author   :   Corley Tang
@contact  :   cutercorleytd@gmail.com
@Github   :   https://github.com/corleytd
@Time     :   2023-11-22 19:00
@Project  :   Hands-on Crawler with Python-thread
线程
'''

# 导入所需的库
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED

# 1.同步与异步

In [2]:
def sleep_for_seconds(seconds):  # 模拟执行耗时任务
    time.sleep(seconds)


# 同步
for i in range(5):
    print('start')
    sleep_for_seconds(2)
    print('end')

start
end
start
end
start
end
start
end
start
end


In [3]:
# 异步
executor = ThreadPoolExecutor(max_workers=2)
for i in range(5):
    print('start')
    executor.submit(sleep_for_seconds, 2)
    print('end')

start
end
start
end
start
end
start
end
start
end


# 2.threading库

In [4]:
# 创建线程
# 方式1：创建threading.Thread实例，将需要被线程执行的函数传入该实例
t = threading.Thread(target=sleep_for_seconds, args=(2,))
t.start()  # 启动线程，此时线程就开始执行了，如果在同1个线程对象中多次调用会引发RuntimeError: threads can only be started once异常
t.join()  # 将主线程挂起，直到子线程运行结束，若timeout不为None，，则表示主线程最长挂起的时间，主线程结束挂起后就会继续执行
print('Done')

Done


In [5]:
# 方式2：创建一个类，该类继承于threading.Thread，重写其run()方法（更推荐）
class CustomThread(threading.Thread):
    def __init__(self, func, args):
        super().__init__()
        self.func = func
        self.args = args

    def run(self):  # 可以自定义线程具体的执行逻辑，更加灵活直观
        self.func(*self.args)


# 实例化线程
t = CustomThread(sleep_for_seconds, (2,))
t.start()
t.join()
print('Done')

Done


# 3.线程锁

In [6]:
# 不使用锁：多个线程使用同一资源时，会发生冲突
class CustomThread(threading.Thread):
    def __init__(self, in_file, out_file):
        super().__init__()
        self.in_file = in_file  # 输入文件
        self.out_file = out_file  # 输出文件

    def run(self):
        for line in self.in_file.readlines():
            time.sleep(0.1)  # 模拟耗时操作
            self.out_file.write(line)
        print('Thread Done')


f1 = open('../01_python_basic/copy_deepcopy.py', encoding='utf-8')
f2 = open('../01_python_basic/random_str.py', encoding='utf-8')
f3 = open('../output/thread_output.txt', 'w', encoding='utf-8')
t1 = CustomThread(f1, f3)
t2 = CustomThread(f2, f3)
t1.start()
t2.start()
t1.join()
t2.join()
f1.close()
f2.close()
f3.close()
print('Done')  # 输出文件中顺序混乱

Thread Done
Thread Done
Done


In [7]:
# 优化：利用锁机制——线程获取锁后有locked（被锁）与unlocked（未被锁）两种状态，没有获取锁的线程无法执行，通过这种方式达到同一时刻下有且只有一个线程在运行的目的
class CustomThread(threading.Thread):
    def __init__(self, in_file, out_file, lock):
        super().__init__()
        self.in_file = in_file  # 输入文件
        self.out_file = out_file  # 输出文件
        self.lock = lock  # 锁对象，用于保护共享资源的访问

    def run(self):
        self.lock.acquire()  # 获得锁对象，lock状态变为locked，并且阻塞其他线程获取锁对象
        for line in self.in_file.readlines():
            time.sleep(0.1)  # 模拟耗时操作
            self.out_file.write(line)
        self.lock.release()  # 释放锁对象，lock状态变为unlocked，其他线程可以获取锁对象
        print('Thread Done')


f1 = open('../01_python_basic/copy_deepcopy.py', encoding='utf-8')
f2 = open('../01_python_basic/random_str.py', encoding='utf-8')
f3 = open('../output/thread_output.txt', 'w', encoding='utf-8')
lock = threading.Lock()  # 创建锁对象
t1 = CustomThread(f1, f3, lock)
t2 = CustomThread(f2, f3, lock)
t1.start()
t2.start()
t1.join()
t2.join()
f1.close()
f2.close()
f3.close()
print('Done')  # 输出文件中顺序正常

Thread Done
Thread Done
Done


In [8]:
# 使用with关键字简化锁的获取和释放过程
# 优化：利用锁机制
class CustomThread(threading.Thread):
    def __init__(self, in_file, out_file, lock):
        super().__init__()
        self.in_file = in_file  # 输入文件
        self.out_file = out_file  # 输出文件
        self.lock = lock  # 锁对象，用于保护共享资源的访问

    def run(self):
        with self.lock:
            for line in self.in_file.readlines():
                time.sleep(0.1)  # 模拟耗时操作
                self.out_file.write(line)
        print('Thread Done')


f1 = open('../01_python_basic/copy_deepcopy.py', encoding='utf-8')
f2 = open('../01_python_basic/random_str.py', encoding='utf-8')
f3 = open('../output/thread_output.txt', 'w', encoding='utf-8')
lock = threading.Lock()  # 创建锁对象
t1 = CustomThread(f1, f3, lock)
t2 = CustomThread(f2, f3, lock)
t1.start()
t2.start()
t1.join()
t2.join()
f1.close()
f2.close()
f3.close()
print('Done')  # 与前面结果相同

Thread Done
Thread Done
Done


In [9]:
# 可重入锁
class CustomThread(threading.Thread):
    def __init__(self, in_file, out_file, lock):
        super().__init__()
        self.in_file = in_file  # 输入文件
        self.out_file = out_file  # 输出文件
        self.lock = lock  # 锁对象，用于保护共享资源的访问

    def run(self):
        self.lock.acquire()  # 获得锁对象
        self.lock.acquire()  # 再次获得锁对象，可重入锁可以多次使用acquire()，而普通锁多次调用acquire()方法会产生死锁
        for line in self.in_file.readlines():
            time.sleep(0.1)  # 模拟耗时操作
            self.out_file.write(line)
        self.lock.release()  # 释放锁对象
        self.lock.release()  # 再次释放锁对象
        print('Thread Done')


f1 = open('../01_python_basic/copy_deepcopy.py', encoding='utf-8')
f2 = open('../01_python_basic/random_str.py', encoding='utf-8')
f3 = open('../output/thread_output.txt', 'w', encoding='utf-8')
lock = threading.RLock()  # 创建可重入锁对象：同一个线程每一次调用acquire()方法获取锁，对应的计数器会加1，而调用release()方法释放锁时，计数器会减1
t1 = CustomThread(f1, f3, lock)
t2 = CustomThread(f2, f3, lock)
t1.start()
t2.start()
t1.join()
t2.join()
f1.close()
f2.close()
f3.close()
print('Done')  # 输出文件中顺序正常

Thread Done
Thread Done
Done


In [10]:
# ThreadLocal：线程拥有独立私有数据，对其他线程不可见
local_v = threading.local()  # 全局ThreadLocal对象：每个线程都可以操作local_v变量的属性且互不影响，其中的数据只对当前线程可见


def get_name():
    name = local_v.name  # 从ThreadLocal中取出当前线程相应的私有数据
    print(f'Hello, {name}')


def set_name(name):
    local_v.name = name  # 存入私有数据到ThreadLocal中
    get_name()


t1 = threading.Thread(target=set_name, args=('Alice',), name='A')
t2 = threading.Thread(target=set_name, args=('Corley',), name='C')
t1.start()
t2.start()

Hello, Alice
Hello, Corley


# 4.全局解释器锁GIL
在Python中，线程无法利用所有CPU资源的原因可能包括以下几点：
- **全局解释器锁（GIL）**：Python的全局解释器锁（Global Interpreter Lock）限制了同一时间只有一个线程可以执行Python字节码。这意味着即使在多核处理器上，Python线程也不能并行执行。在处理密集型任务时，这可能会限制多线程的效率。
- **线程调度**：操作系统对线程的调度也是影响Python线程使用CPU的因素之一。在某些情况下，操作系统可能会将线程置于睡眠状态，以让其他线程使用CPU。这取决于操作系统的调度策略以及线程的运行环境。
- **数据依赖和同步**：如果多线程之间的数据依赖性强，或者需要同步执行，那么可能无法充分利用所有CPU资源。因为线程同步可能会导致某些线程等待其他线程完成操作，从而降低了CPU的使用率。
- **I/O操作**：如果线程经常进行I/O操作（如读写文件、网络请求等），那么它可能无法在短时间内处理大量的计算任务，从而降低了CPU的使用率。

从Python3.2开始，对GIL进行重要的改进，改进后的GIL相比于Python2.x中旧版，GIL会让线程对GIL的竞争更加平稳、效率会更高。

In [11]:
# 比较Python2.7上旧的GIL与Python3.9上新的GIL运行效率的差别
def count(n):
    while n > 0:
        n -= 1


# 顺序执行
start = time.time()
for i in range(5):
    count(100000000)
print(f'顺序执行耗时：{time.time() - start:.4f}')

# 并发执行
start = time.time()
for i in range(5):
    t = threading.Thread(target=count, args=(100000000,))
    t.start()
    t.join()
print(f'并发执行耗时：{time.time() - start:.4f}')

顺序执行耗时：45.1431
并发执行耗时：45.9311


Python 2.7运行结果：
顺序执行耗时：21.6790
并发执行耗时：26.5600

Python 3.9运行结果：
顺序执行耗时：37.6176
并发执行耗时：37.0380

Python2.7下多线程执行耗时几乎是顺序执行的1.23倍，而Python3.8下，两者差异微小。
# 5.线程池ThreadPoolExecutor
Python线程最适合用于**IO密集型**任务，如网络I/O请求或磁盘I/O操作等。这些任务可以有效利用线程的优势，因为线程可以在等待IO完成时切换到其他任务。然而，如果你的项目需要CPU密集型任务，建议使用多进程或多进程+线程的方法。

In [12]:
def crawl(url):
    '''模拟爬虫'''
    time.sleep(random.random() * 10)
    print(f'crawl task {url} done')
    return len(url)


urls = ['http://www.baidu.com', 'http://www.taobao.com', 'http://www.sohu.com', 'http://www.qq.com',
        'http://www.163.com']

with ThreadPoolExecutor(max_workers=3) as executor:  # 创建最大容量为3的线程池
    tasks = []
    for url in urls:
        task = executor.submit(crawl, url)  # 向线程池中提交任务执行
        tasks.append(task)  # 添加任务到任务列表中

    for task in as_completed(
            tasks):  # as_completed是一个生成器：在没有任务完成时会一直阻塞，除非设置了timeout；当有某个任务完成时，会yield这个任务，并执行for 循环，然后继续阻塞，循环直到所有的任务结束。同时，先完成的任务会先返回给主线程。
        res = task.result()
        print(f'result: {res}')
    print('Main Done')

crawl task http://www.sohu.com done
result: 19
crawl task http://www.baidu.com done
result: 20
crawl task http://www.qq.com done
result: 17
crawl task http://www.taobao.com done
result: 21
crawl task http://www.163.com done
result: 18
Main Done


In [13]:
# 使用wait函数实现等待线程池中所有任务完成再回到主线程
with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = []
    for url in urls:
        task = executor.submit(crawl, url)
        tasks.append(task)

    wait(tasks, return_when=ALL_COMPLETED)  # 等待所有任务完成后再回到主线程

    for task in as_completed(tasks):
        res = task.result()
        print(f'result: {res}')
    print('Main Done')

crawl task http://www.sohu.com done
crawl task http://www.taobao.com done
crawl task http://www.baidu.com done
crawl task http://www.qq.com done
crawl task http://www.163.com done
result: 20
result: 17
result: 21
result: 19
result: 18
Main Done


In [14]:
# ThreadPoolExecutor线程池默认不会抛出程序异常
def crawl(url):
    '''模拟爬虫'''
    time.sleep(url)  # 报错：TypeError: an integer is required (got type str)
    print(f'crawl task {url} done')


with ThreadPoolExecutor(max_workers=3) as executor:
    for url in urls:
        executor.submit(crawl, url)  # 执行任务时并没有报错

In [15]:
# 方式1：通过返回值捕获异常并处理
with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = []
    for url in urls:
        task = executor.submit(crawl, url)
        tasks.append(task)

    for task in as_completed(tasks):
        try:
            res = task.result()
            print(f'result: {res}')
        except Exception as e:
            print(f'thread error: {e}')
    print('Main Done')

thread error: an integer is required (got type str)
thread error: an integer is required (got type str)
thread error: an integer is required (got type str)
thread error: an integer is required (got type str)
thread error: an integer is required (got type str)
Main Done


In [16]:
# 方式2：通过add_done_callback函数给线程任务增加回调函数
def exception_callback(worker):
    exception = worker.exception()  # 获取工作线程的异常
    if exception:
        print(f'thread error：{exception}')


with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = []
    for url in urls:
        executor.submit(crawl, url).add_done_callback(exception_callback)  # 添加回调方法
    print('Main Done')

thread error：an integer is required (got type str)
thread error：an integer is required (got type str)
thread error：an integer is required (got type str)
Main Done
thread error：an integer is required (got type str)
thread error：an integer is required (got type str)


在Python中，可以在单个子线程中再通过ThreadPoolExecutor创建线程池，具体是在Python中创建线程需要使用threading模块，而创建线程池则需要使用concurrent.futures模块中的ThreadPoolExecutor类，在一个单独的线程中创建线程池并不会影响全局的线程管理，因此是可以做到的。

但是这样做并不常见，也可能会引入一些问题。例如，如果主线程和子线程都尝试访问共享资源，可能会出现竞态条件（race condition）或者死锁等问题。此外，如果主线程和子线程都打印输出，可能会造成输出的内容混乱。同时，还需要注意线程间的同步问题，否则可能导致线程冲突。在线程池中创建线程可能会导致线程资源浪费，并且增加了系统的复杂性，应该尽量避免。通常情况下，建议在主线程中创建线程池，而不是在线程池内部再创建线程，然后将任务提交给线程池执行。这样可以更好地控制线程的管理和资源的访问，避免出现上述问题。

以下是一个在一个单独的线程中创建线程池并提交任务的示例代码：
```python
def worker(num):
    print(f"Worker {num} is running")
    return num * num

def main():
    # 创建一个线程池，大小为4
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任务给线程池
        for i in range(10):
            executor.submit(worker, i)

# 创建一个新线程并运行主函数
new_thread = threading.Thread(target=main)
new_thread.start()
```

在这个示例中，我们创建了一个新的线程并运行了main函数。在main函数中，我们使用ThreadPoolExecutor创建了一个大小为4的线程池，然后提交了10个任务给线程池执行。每个任务都是调用worker函数，打印一些信息并返回结果的平方。最后，我们打印出了每个任务的执行结果。

同理，也可以在ThreadPoolExecutor线程池中创建单个子线程。具体来说，可以先使用threading.Thread创建子线程实现需要执行的任务逻辑，再使用ThreadPoolExecutor的submit()函数来提交这些任务（函数名和参数）到线程池中，并返回一个代表此任务的句柄。然而，要注意的是这样做可能会导致线程数量过多，从而影响程序的性能和稳定性。所以一般建议在需要使用线程池的地方直接使用ThreadPoolExecutor创建线程池，而不是在已经存在的线程中再次创建新的线程。
举例如下：
```python
def worker(num):
    print(f"Worker {num} is running")
    return num * num

def single_task(i): # 单个子线程
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    t.join()

def main():
    # 创建一个线程池，大小为4
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任务给线程池
        for i in range(10):
            executor.submit(single_task, i)

if __name__ == '__main__':
    # 创建一个新线程并运行主函数
    new_thread = threading.Thread(target=main)
    new_thread.start()
```