In [1]:
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset

In [2]:
def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        counter.increment(1)

In [4]:
from threading import Thread
how_many = 10**5
counter = Counter()
threads = []
for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

expected = how_many* 5
found = counter.count
print(f'Counter should be {expected}, got {found}')

Counter should be 500000, got 295713


In [5]:
#データ競合やデータ構造の破壊を防ぐために相互排他ロックのLock
from threading import Lock
class LockingCounter:
    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self, offset):
        with self.lock:
            self.count += offset

In [6]:
counter = LockingCounter()
threads = []
for i in range(5):
    thread = Thread(target=worker,
                    args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()


In [8]:
expected = how_many*5 
print(counter.count)

500000


55 スレッド間の協調作業にはQueneを使う

In [9]:
#関数パイプライン
#download, resize, uploadをパイプライン式に実行

In [13]:
from collections import deque
from threading import Lock
class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            return self.items.append(item)

    def get(self):
        with self.lock:
            self.items.popleft()


    

In [11]:
#パイプラインの各段階、キューから作業を取り出し、関数を実行し、結果をもう一つ別のキューに置くpythonスレッド

In [19]:
from threading import Thread
import time
class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done +=1

In [20]:
def download(item):
    pass
def resize(item):
    pass
def upload(item):
    pass
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue)
]

In [21]:
for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

In [22]:
while len(done_queue.items) < 1000:
    #待っている間何か有益なことをする
    pass

In [23]:
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'Processed{processed} items after'
        f'polling {polled} times')

Processed1000 items afterpolling 42452 times


In [24]:
#問題1 全ての入力作業が完了したと決めるためdone_queueをビージーウェイトする必要がある
#問題2 Workerではrunメソッドが永久にループする
#問題3 パイプラインの渋滞によってプログラムがどこかでクラッシュすることがある

In [27]:
#queueのQueueはこれらの問題を解決する
from queue import Queue
my_queue = Queue()

def consumer():
    print('Consumer waiting')
    my_queue.get()
    print('Consumer done')

thread = Thread(target = consumer)
thread.start()

Consumer waiting


In [28]:
print('Producer putting')
my_queue.put(object())
thread.join()
print('Producer done')

Producer putting
Consumer done
Producer done


In [29]:
my_queue.put(object())
print('Producer put 1')
my_queue.put(object())
print('Producer put 2')
print('Producer done')
thread.join()

Producer put 1
Producer put 2
Producer done


In [None]:
#Queueクラスではtask_doneメソッドを使って作業進捗の追跡もできる。これによってその段階での入力キーが減ってくるまで待つことができ, パイプラインの終端ええdone_queueをポーリングする必要がなくなる

in_queue = Queue()

def consumer()