In [2]:
# Parallel benchmark.
#
# Example:
# time python parallel.py 3
#

import tensorflow as tf
import numpy as np
from tensorflow.python.ops import control_flow_ops
import sys
import time
from game import Chess

import random
def make_heavy_op(myqueue, name):
    """Build an op that spins in a counting loop.

    The loop bound is taken from ``myqueue`` via ``dequeue()`` (expected to be
    an int32 scalar). A counter starting at 0 is incremented by 1 for as long
    as it is less than that bound.

    Returns a ``tf.Print`` op wrapping the final counter value, with ``name``
    passed as the data to print.
    """
    bound = myqueue.dequeue()

    def below_bound(counter):
        # Loop predicate: keep going while the counter is under the bound.
        return tf.less(counter, bound)

    def bump(counter):
        # Loop body: advance the counter by one.
        return tf.add(counter, 1)

    final_count = control_flow_ops.while_loop(below_bound, bump, [tf.constant(0)])
    return tf.Print(final_count, [name])

def parallel_loops(parallelism, num_entries=10, duration=10**4):
    """Run "num_entries*parallelism" heavy ops using "parallelism" threads.

    A FIFO queue is filled with ``num_entries * parallelism`` copies of
    ``duration`` and then closed. ``parallelism`` heavy ops (one per thread,
    see ``make_heavy_op``) drain the queue; each dequeued value is the number
    of loop iterations the op performs. The elapsed wall time and the
    resulting ops/sec are printed.

    Args:
        parallelism: Number of heavy ops, and therefore runner threads.
        num_entries: Queue entries enqueued per unit of parallelism.
        duration: Loop bound fed into the queue for every entry.

    Returns:
        None. The timing summary is printed to stdout.
    """
    total_entries = num_entries * parallelism

    # Build everything in a private graph so that repeated calls (e.g. from
    # several notebook cells) do not keep adding ops to the process-wide
    # default graph.
    graph = tf.Graph()
    with graph.as_default():
        q = tf.FIFOQueue(total_entries, np.int32, shapes=())
        stuff = tf.placeholder(np.int32)
        addstuff = q.enqueue([stuff])
        close_queue = q.close()
        ops = [make_heavy_op(q, "heavy op #"+str(i)) for i in range(parallelism)]
        # QueueRunner creates its own close/cancel ops, so it must be
        # constructed while the private graph is the default one.
        runner = tf.train.QueueRunner(q, ops)

    # The context manager closes the session on exit (also on error), which
    # releases the resources a never-closed InteractiveSession would keep.
    with tf.Session(graph=graph) as sess:
        for _ in range(total_entries):
            sess.run([addstuff], feed_dict={stuff: duration})

        # Closing the queue makes the dequeue ops fail once it is empty,
        # which is what ends the runner threads.
        sess.run([close_queue])

        start_time = time.time()
        threads = runner.create_threads(sess, start=True)
        for t in threads:
            t.join()

        elapsed_time = time.time() - start_time

    print('done in %.2f, %.2f ops/sec'%(elapsed_time, total_entries/elapsed_time))

In [3]:
# Baseline: a single heavy op, i.e. one runner thread.
parallel_loops(1)

done in 0.78, 12.79 ops/sec


In [92]:
# Two heavy ops / runner threads (the queue holds twice as many entries).
parallel_loops(2)

done in 0.77, 26.11 ops/sec


In [93]:
# Three heavy ops / runner threads.
parallel_loops(3)

done in 0.82, 36.37 ops/sec


In [94]:
# Four heavy ops / runner threads.
parallel_loops(4)

done in 0.89, 45.15 ops/sec


In [7]:
# Eight heavy ops / runner threads.
parallel_loops(8)

done in 1.45, 55.32 ops/sec


In [83]:
import multiprocessing

def worker(n, num_games=10):
    """Pool worker: play random-move chess games and measure throughput.

    Each game starts with ``env.reset()`` and continues, picking a uniformly
    random legal move per turn, until ``env.get_reward()`` stops returning
    ``None``.

    Args:
        n: Task index supplied by ``pool.map``; not used by the computation.
        num_games: Number of games to play. Defaults to 10, the value that
            was previously hard-coded.

    Returns:
        A tuple ``(turn_count, dt, turns_per_second)``: the total number of
        moves made, the elapsed wall time in seconds, and their ratio
        (0.0 if no measurable time elapsed).
    """
    env = Chess()
    turn_count = 0
    t0 = time.time()
    for _ in range(num_games):
        env.reset()
        # A non-None reward is treated as the end of the game.
        while env.get_reward() is None:
            move = random.choice(env.get_legal_moves())
            env.make_move(move)
            turn_count += 1
    dt = time.time() - t0
    # Avoid ZeroDivisionError when the interval is zero (e.g. num_games=0 on
    # a coarse clock).
    turns_per_second = turn_count / dt if dt > 0 else 0.0
    return turn_count, dt, turns_per_second


def run(processes, tasks):
    """Run ``worker`` for ``tasks`` task indices on a pool of ``processes`` processes.

    Args:
        processes: Size of the ``multiprocessing.Pool``.
        tasks: Number of tasks; ``worker`` is called once for each index in
            ``range(tasks)``.

    Returns:
        A NumPy array built from the list of ``worker`` results, one row per
        task (``worker`` returns a 3-tuple, so rows have three columns).
    """
    pool = multiprocessing.Pool(processes=processes)
    try:
        results = pool.map(worker, range(tasks))
    finally:
        # Always tear the pool down so worker processes are not left behind.
        # map() has already collected every result by the time we get here,
        # so terminate() cannot discard pending work on the success path.
        pool.terminate()
        pool.join()
    return np.array(results)

In [86]:
# Wall-clock benchmark: 16 worker tasks on a pool of 1 process.
t0 = time.time()
res = run(1, 16)
dt = time.time() - t0

# Column 0 of each result row is that task's turn count.
total_turns = res[:, 0].sum()
print('total turn count:', total_turns)
print('wall time:', dt)
print('turns/second:', total_turns / dt)

total turn count: 55843.0
wall time: 31.80108404159546
turns/second: 1756.00932116


In [87]:
# Wall-clock benchmark: 16 worker tasks on a pool of 2 processes.
t0 = time.time()
res = run(2, 16)
dt = time.time() - t0

# Column 0 of each result row is that task's turn count.
total_turns = res[:, 0].sum()
print('total turn count:', total_turns)
print('wall time:', dt)
print('turns/second:', total_turns / dt)

total turn count: 57668.0
wall time: 15.944632053375244
turns/second: 3616.76580601


In [88]:
# Wall-clock benchmark: 16 worker tasks on a pool of 4 processes.
t0 = time.time()
res = run(4, 16)
dt = time.time() - t0

# Column 0 of each result row is that task's turn count.
total_turns = res[:, 0].sum()
print('total turn count:', total_turns)
print('wall time:', dt)
print('turns/second:', total_turns / dt)

total turn count: 56989.0
wall time: 8.206257104873657
turns/second: 6944.5789075


In [89]:
# Wall-clock benchmark: 16 worker tasks on a pool of 8 processes.
t0 = time.time()
res = run(8, 16)
dt = time.time() - t0

# Column 0 of each result row is that task's turn count.
total_turns = res[:, 0].sum()
print('total turn count:', total_turns)
print('wall time:', dt)
print('turns/second:', total_turns / dt)

total turn count: 54174.0
wall time: 8.062294960021973
turns/second: 6719.42669781


In [4]:
# Describe a cluster with a single job named "local" made of two tasks,
# addressed as localhost:2222 and localhost:2223.
cluster_spec = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})

In [7]:
# Show the task addresses registered under the "local" job.
cluster_spec.job_tasks('local')

['localhost:2222', 'localhost:2223']