# subprocessを使って子プロセスを管理

In [2]:
import subprocess


proc = subprocess.Popen(
       ['echo', 'Hello from the child'],
       stdout=subprocess.PIPE)
out, err = proc.communicate()
print(out.decode('utf-8'))

Hello from the child



In [12]:
import os


def run_openssl(data):
    env = os.environ.copy()
    env['password'] = b'\xe24'
    proc = subprocess.Popen(
           ['openssl', 'enc', '-des3', '-pass', 'env:password'],
           env=env,
           stdin=subprocess.PIPE,
           stdout=subprocess.PIPE
           )
    proc.stdin.write(data)
    proc.stdin.flush()
    return proc

proces = []

for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    proces.append(proc)
    
for proc in proces:
    out, err = proc.communicate()
    print(out[-10:])
    
def run_md5(input_stdin):
    proc = subprocess.Popen(
        ['md5'],
        stdin=input_stdin,
        stdout=subprocess.PIPE
           )
    return proc

input_proces = []
hash_proces = []

for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    input_proces.append(proc)
    #hash_proc = run_md5(proc.stdout)
    #hash_proces.append(hash_proc)
        
for proc in input_proces:
    try:
        proc.communicate(timeout=0.1)
    except subprocess.TimeoutExpired:
        proc.terminate()
        proc.wait()
#for proc in hash_proces:
#    out, err = proc.communicate()
#    print(out.stip())

b'\x00\xd35\x17[~\xf6\xbe\x98\xd0'
b'v\x901Cp\xb0d<1\x8f'
b'\x86\xabB\xe0o\xf3{x\x03\x81'


# スレッドはブロッキングIOに使用

- ファイル入出力、WEBアクセス、データベースの処理

In [36]:
import select, socket
import datetime
from threading import *

def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)
    
start = datetime.datetime.now()
for _ in range(50):
    slow_systemcall()
end = datetime.datetime.now()
print(end - start)

start2 = datetime.datetime.now()

threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()
end2 = datetime.datetime.now()
print(end2 - start2)
    

0:00:00.002660
0:00:00.001499


# スレッドのデータ競合に対してLockを使用する

In [40]:
class Counter(object):
    def __init__(self):
        self.count = 0
    
    def increment(self, offset):
        self.count += offset

def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        counter.increment(1)

def run_threads(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

class LockingCounter(object):
    def __init__(self):
        self.lock = Lock()
        self.count = 0
        
    def increment(self, offset):
        with self.lock:
            self.count += offset
            
how_many = 10**5
counter = LockingCounter()
run_threads(worker, how_many, counter)
print(5 * how_many, counter.count)

500000 500000


# スレッド間の協調作業にはQueueを使用する

In [42]:
from queue import Queue
queue = Queue()

class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()
    
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
    
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)
            
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()
for _ in range(100):
    download_queue.put(object())
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()

NameError: name 'download' is not defined

# 多くの関数実行にはコルーチンを考える

In [47]:
def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)

it = minimize()
# whileの中に入るため
next(it)

print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))

10
4
4
-1


# 本当の並列性のために`concurrent.futures`を考慮する

- 並列（cpuを複数使用）、並行（見た目上並列だが使用するcpuは一つ）
- ProcessPoolExecutorを使用する場合でもオーバーヘッドとのトレードオフを考慮する

In [53]:
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [(1963309, 2265973),(1963309, 2265973),
          (1963309, 2265973), (1963309, 2265973)]

start = datetime.datetime.now()

results = list(map(gcd, numbers))

end = datetime.datetime.now()

print(end - start)

# Thread

start = datetime.datetime.now()

pool = ThreadPoolExecutor(max_worker=2)

results = list(pool.map(gcd, numbers))

end = datetime.datetime.now()

print(end - start)

# Process

start = datetime.datetime.now()

pool = ProcessPoolExecutor(max_worker=2)

results = list(pool.map(gcd, numbers))

end = datetime.datetime.now()

print(end - start)

ImportError: cannot import name 'ThreadPoolExecutor'