In [4]:
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % 
        (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % 
        (os.getpid(), pid))

Process (24182) start...
I (24182) just created a child process (24222).
I am child process (24222) and my parent is 24182.


In [31]:
os.getgid()
os.path
os.name
os.ctermid()
os.geteuid()
os.getgroups()

[4, 24, 27, 30, 46, 113, 129, 1000]

In [32]:
from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

Parent process 24182.
Child process will start.
Run child process test (24306)...
Child process end.


In [39]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Parent process 24182.
Run task 3 (24495)...
Run task 0 (24492)...
Run task 1 (24493)...
Run task 2 (24494)...
Run task 4 (24496)...
Task 3 runs 0.02 seconds.
Waiting for all subprocesses done...
Task 4 runs 0.37 seconds.
Task 1 runs 0.99 seconds.
Task 0 runs 2.72 seconds.
Task 2 runs 2.77 seconds.
All subprocesses done.


In [40]:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

$ nslookup www.python.org
Exit code: 0


In [41]:
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue，并传给各个子进程：
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw，写入:
    pw.start()
    # 启动子进程pr，读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环，无法等待其结束，只能强行终止:
    pr.terminate()

Process to write: 24561
Process to read: 24562
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.


In [66]:
import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
    # 获取当前线程关联的student:
    std = local_school.student
    print('Hello, %s (in %s)\n' % (std, threading.current_thread().name))

def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',),
                      name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), 
                      name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

Hello, Alice (in Thread-A)

Hello, Bob (in Thread-B)



In [76]:
# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

Process QueueManager-88:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/managers.py", line 505, in _run_server
    server = cls._Server(registry, address, authkey, serializer)
  File "/usr/lib/python3.5/multiprocessing/managers.py", line 140, in __init__
    self.listener = Listener(address=address, backlog=16)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 438, in __init__
    self._listener = SocketListener(address, family, backlog)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 576, in __init__
    self._socket.bind(address)
OSError: [Errno 98] Address already in use


EOFError: 

In [73]:
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue，所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器，也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

Connect to server 127.0.0.1...


AttributeError: 'function' object has no attribute 'Empty'

In [87]:
def reverse(data):
    for index in range(len(data)-1, -1, -1):
        yield data[index]
        print('sss')
r=reverse
for char in r('golf'):
    print(char)

f
sss
l
sss
o
sss
g
sss


In [93]:
def consumer():
    r = ''
    print('rrrr')
    while True:
        print('r---->%s' % r) #启动后执行到这里，遇见yield暂停
        nn = yield r  #接受到n后执行
        print('nnnnn')
        print('r>>>%s'%r)
        if not n:
            print('a')
            return
        print('[CONSUMER] Consuming %s...' % nn)
        r = '200 OK'

def produce(c):
    print('ccccc')
    m = c.send(None) #负责启动生成器
    print('m == %s' % m)  
    n = 0
    while n < 1:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)  #将n发送给c
        print('[PRODUCER] Consumer return: %s' % r)
        # next(c)
    c.close()

c = consumer()
produce(c)

ccccc
rrrr
r---->
m == 
[PRODUCER] Producing 1...
nnnnn
r>>>
[CONSUMER] Consuming 1...
r---->200 OK
[PRODUCER] Consumer return: 200 OK


In [4]:
#coding:utf-8
import random
from time import sleep
import sys
import multiprocessing
import os
#
#需求分析：有大批量数据需要执行，而且是重复一个函数操作（例如爆破密码），如果全部开始线程数N多，这里控制住线程数m个并行执行，其他等待
#
lock=multiprocessing.Lock()#一个锁
def a(x):#模拟需要重复执行的函数
    lock.acquire()#输出时候上锁，否则进程同时输出时候会混乱，不可读
    print ('开始进程：',os.getpid(),'模拟进程时间:',x)
    lock.release()
    
    sleep(x)#模拟执行操作
    
    lock.acquire()
    print ('结束进程：',os.getpid(),'预测下一个进程启动会使用该进程号')
    lock.release()
list=[]
for i in range(10):#产生一个随机数数组，模拟每次调用函数需要的输入，这里模拟总共有10组需要处理
    list.append(random.randint(1,10))
print(list)    
pool=multiprocessing.Pool(processes=3)#限制并行进程数为3
pool.map(a,list)#创建进程池，调用函数a，传入参数为list,此参数必须是一个可迭代对象,因为map是在迭代创建每个进程

[5, 3, 10, 2, 5, 1, 3, 5, 6, 4]
开始进程： 17020 模拟进程时间: 5
开始进程： 17021 模拟进程时间: 3
开始进程： 17022 模拟进程时间: 10
结束进程： 17021 预测下一个进程启动会使用该进程号
开始进程： 17021 模拟进程时间: 2
结束进程： 17020 预测下一个进程启动会使用该进程号
开始进程： 17020 模拟进程时间: 5
结束进程： 17021 预测下一个进程启动会使用该进程号
开始进程： 17021 模拟进程时间: 1
结束进程： 17021 预测下一个进程启动会使用该进程号
开始进程： 17021 模拟进程时间: 3
结束进程： 17021 预测下一个进程启动会使用该进程号
开始进程： 17021 模拟进程时间: 5
结束进程： 17020 预测下一个进程启动会使用该进程号
开始进程： 17020 模拟进程时间: 6
结束进程： 17022 预测下一个进程启动会使用该进程号
开始进程： 17022 模拟进程时间: 4
结束进程： 17022 预测下一个进程启动会使用该进程号
结束进程： 17021 预测下一个进程启动会使用该进程号
结束进程： 17020 预测下一个进程启动会使用该进程号


[None, None, None, None, None, None, None, None, None, None]