In [0]:
from collections import defaultdict
from multiprocessing import Pool, Process
import numpy as np
import psutil
import scipy.signal
import sys
import tensorflow as tf
import time

# Number of timed repetitions of each benchmark.
num_trials = 2

# Count the number of physical CPUs. psutil may return None when the
# physical core count cannot be determined; the code below multiplies by and
# iterates over num_cpus, so fall back to the logical count and finally to 1.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))

################################################
###### Benchmark 1: numerical computation ######
################################################


def f(args):
    """Convolve an image with a filter and keep every fifth row and column.

    ``args`` is an ``(image, random_filter)`` pair; it arrives as a single
    object because ``pool.map`` hands each task exactly one argument.
    """
    image, random_filter = args
    # Do some image processing, then subsample the convolved result.
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::5, ::5]


# Worker pool sized to num_cpus processes.
pool = Pool(processes=num_cpus)

# One random 4x4 filter per task submitted by run_benchmark.
filters = []
for _ in range(num_cpus):
    filters.append(np.random.normal(size=(4, 4)))


def run_benchmark():
    """Convolve a zero-filled 3000x3000 image with each filter via the pool."""
    image = np.zeros((3000, 3000))
    # Pair the same image with each filter; f unpacks the pair.
    tasks = zip([image] * num_cpus, filters)
    pool.map(f, tasks)


# Time num_trials runs of the numerical benchmark.
durations1 = []
for _ in range(num_trials):
    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # interval cannot be distorted by system clock adjustments the way a
    # time.time() difference can.
    start_time = time.perf_counter()

    run_benchmark()

    duration1 = time.perf_counter() - start_time
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))

###############################################
###### Benchmark 2: stateful computation ######
###############################################


def accumulate_prefixes(args):
    """Fold one document's word prefixes into running state and return it.

    ``args`` is ``(running_prefix_count, running_popular_prefixes, document)``.
    For every word, each proper prefix (lengths 1 to ``len(word) - 1``) has
    its count incremented; a prefix whose count exceeds 3 is added to the
    popular set. Both containers are updated in place and also returned.
    """
    running_prefix_count, running_popular_prefixes, document = args
    for word in document:
        for end in range(1, len(word)):
            head = word[:end]
            updated = running_prefix_count[head] + 1
            running_prefix_count[head] = updated
            if updated > 3:
                running_popular_prefixes.add(head)
    return running_prefix_count, running_popular_prefixes


# Shut down the pool used by the previous benchmark before replacing it;
# rebinding the name alone would leave its worker processes running.
pool.close()
pool.join()
# A fresh pool is created here (presumably so that its workers start after
# accumulate_prefixes has been defined).
pool = Pool(num_cpus)

durations2 = []
for _ in range(num_trials):
    # One running state per worker task. This must match the number of
    # documents generated below (num_cpus): zip() stops at the shortest
    # input, so a hard-coded count would silently drop documents on machines
    # with more cores than that count.
    running_prefix_counts = [defaultdict(int) for _ in range(num_cpus)]
    running_popular_prefixes = [set() for _ in range(num_cpus)]

    # perf_counter() is monotonic, unlike time.time().
    start_time = time.perf_counter()

    for i in range(10):
        documents = [[np.random.bytes(20) for _ in range(10000)]
                     for _ in range(num_cpus)]
        results = pool.map(
            accumulate_prefixes,
            zip(running_prefix_counts, running_popular_prefixes, documents))
        # The workers return updated copies of the state; carry them into
        # the next round.
        running_prefix_counts = [result[0] for result in results]
        running_popular_prefixes = [result[1] for result in results]

    # Merge the per-task popular sets into one.
    popular_prefixes = set()
    for prefixes in running_popular_prefixes:
        popular_prefixes |= prefixes

    duration2 = time.perf_counter() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))

###################################################
###### Benchmark 3: expensive initialization ######
###################################################


def save_model():
    """Train a small MNIST classifier for one epoch and save it to /tmp/model."""
    x_train, y_train = tf.keras.datasets.mnist.load_data()[0]
    # Divide by 255.0 (presumably scaling 8-bit pixel values into [0, 1]).
    x_train = x_train / 255.0

    layers = [
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax),
    ]
    model = tf.keras.models.Sequential(layers)
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    # Train the model, then save it to disk.
    model.fit(x_train, y_train, epochs=1)
    filename = '/tmp/model'
    model.save(filename)


# Train and save the model. This has to be done in a separate process because
# otherwise Python multiprocessing will hang when you try to run the code
# below.
p = Process(target=save_model)
p.start()
p.join()
# Fail fast if training did not finish cleanly; otherwise the benchmark below
# would go on to load a missing or stale model from disk.
if p.exitcode != 0:
    raise RuntimeError(
        'Model training subprocess exited with code {}.'.format(p.exitcode))

# Path that save_model wrote the trained model to.
filename = '/tmp/model'


def evaluate_next_batch(i):
    """Load the saved model and return its predictions on the MNIST test images.

    On Linux the calling process is first pinned to CPU index ``i``. The
    model and the data are loaded again on every call.
    """
    on_linux = sys.platform == 'linux'
    if on_linux:
        # Pin the process to a specific core to prevent contention between
        # the different processes, since TensorFlow uses multiple threads.
        current_process = psutil.Process()
        current_process.cpu_affinity([i])
    model = tf.keras.models.load_model(filename)
    test_images = tf.keras.datasets.mnist.load_data()[1][0]
    return model.predict(test_images / 255.0)


# Shut down the pool used by the previous benchmark before replacing it;
# rebinding the name alone would leave its worker processes running.
pool.close()
pool.join()
# A fresh pool is created here (presumably so that its workers start after
# evaluate_next_batch has been defined).
pool = Pool(num_cpus)

durations3 = []
for _ in range(num_trials):
    # perf_counter() is monotonic, unlike time.time().
    start_time = time.perf_counter()

    # Ten rounds, each submitting one task per CPU index.
    for _ in range(10):
        pool.map(evaluate_next_batch, range(num_cpus))

    duration3 = time.perf_counter() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))

print('Used {} cores.'.format(num_cpus))

# Mean and standard deviation of the trial durations for each benchmark.
print("""
Results:
- Numerical computation: {} +/- {}
- Stateful computation: {} +/- {}
- Expensive initialization: {} +/- {}
""".format(np.mean(durations1), np.std(durations1),
           np.mean(durations2), np.std(durations2),
           np.mean(durations3), np.std(durations3)))

Using 1 cores.
Numerical computation workload took 1.125704050064087 seconds.
Numerical computation workload took 0.8151633739471436 seconds.
Stateful computation workload took 13.845899820327759 seconds.
Stateful computation workload took 13.56156849861145 seconds.


In [0]:
from collections import defaultdict
import numpy as np
import psutil
import scipy.signal
import sys
import tensorflow as tf
import time

# Number of timed repetitions of each benchmark.
num_trials = 5

# Count the number of physical CPUs. psutil may return None when the
# physical core count cannot be determined; the code below iterates over and
# takes a modulus by num_cpus, so fall back to the logical count and then 1.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print('Using {} cores.'.format(num_cpus))

################################################
###### Benchmark 1: numerical computation ######
################################################


def f(image, random_filter):
    """Convolve ``image`` with ``random_filter``; keep every fifth row/column."""
    # Do some image processing, then subsample the convolved result.
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::5, ::5]


# One random 4x4 filter per simulated worker.
filters = []
for _ in range(num_cpus):
    filters.append(np.random.normal(size=(4, 4)))


def run_benchmark():
    """Convolve a zero-filled 3000x3000 image with each filter, one by one."""
    image = np.zeros((3000, 3000))
    # The results are discarded; only the elapsed time matters to the caller.
    for index in range(num_cpus):
        f(image, filters[index])


# Time num_trials runs of the numerical benchmark.
durations1 = []
for _ in range(num_trials):
    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # interval cannot be distorted by system clock adjustments the way a
    # time.time() difference can.
    start_time = time.perf_counter()

    run_benchmark()

    duration1 = time.perf_counter() - start_time
    durations1.append(duration1)
    print('Numerical computation workload took {} seconds.'.format(duration1))

###############################################
###### Benchmark 2: stateful computation ######
###############################################


class StreamingPrefixCount(object):
    """Counts word prefixes across documents and tracks the popular ones.

    A prefix is "popular" once its running count exceeds 3.
    """

    def __init__(self):
        # Running count per prefix; unseen prefixes start at 0.
        self.prefix_count = defaultdict(int)
        # Prefixes whose count has exceeded 3.
        self.popular_prefixes = set()

    def add_document(self, document):
        """Count every proper prefix (lengths 1..len-1) of each word."""
        counts = self.prefix_count
        for word in document:
            for end in range(1, len(word)):
                head = word[:end]
                counts[head] += 1
                if counts[head] > 3:
                    self.popular_prefixes.add(head)

    def get_popular(self):
        """Return the set of popular prefixes (the internal set, not a copy)."""
        return self.popular_prefixes


durations2 = []
for _ in range(num_trials):
    # One counter per simulated worker, recreated for every trial.
    streaming_actors = [StreamingPrefixCount() for _ in range(num_cpus)]

    # perf_counter() is monotonic, so the interval cannot be distorted by
    # system clock adjustments the way a time.time() difference can.
    start_time = time.perf_counter()

    # Feed num_cpus * 10 documents to the counters in round-robin order.
    for i in range(num_cpus * 10):
        document = [np.random.bytes(20) for _ in range(10000)]
        streaming_actors[i % num_cpus].add_document(document)

    # Aggregate all of the results.
    results = [actor.get_popular() for actor in streaming_actors]
    popular_prefixes = set()
    for prefixes in results:
        popular_prefixes |= prefixes

    duration2 = time.perf_counter() - start_time
    durations2.append(duration2)
    print('Stateful computation workload took {} seconds.'.format(duration2))

###################################################
###### Benchmark 3: expensive initialization ######
###################################################

# Load MNIST and take the training split.
mnist = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist[0]
# Divide by 255.0 (presumably scaling 8-bit pixel values into [0, 1]).
x_train = x_train / 255.0

model = tf.keras.models.Sequential()
for layer in (
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax)):
    model.add(layer)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model for a single epoch, then save it to disk.
model.fit(x_train, y_train, epochs=1)
filename = '/tmp/model'
model.save(filename)


class Model(object):
    """Holds the saved Keras model and the MNIST test images in memory."""

    def __init__(self):
        # Load the model and the test data once, at construction time.
        self.model = tf.keras.models.load_model(filename)
        test_split = tf.keras.datasets.mnist.load_data()[1]
        self.x_test = test_split[0] / 255.0

    def evaluate_next_batch(self):
        """Return the model's predictions on the stored test images."""
        # Note that we reuse the same data over and over, but in a
        # real application, the data would be different each time.
        predictions = self.model.predict(self.x_test)
        return predictions


# Build the model holder once, outside the timed region.
actor = Model()

durations3 = []
for _ in range(num_trials):
    # perf_counter() is monotonic, so the interval cannot be distorted by
    # system clock adjustments the way a time.time() difference can.
    start_time = time.perf_counter()

    # Ten rounds of num_cpus evaluations each, all run one after another.
    for j in range(10):
        results = [actor.evaluate_next_batch() for _ in range(num_cpus)]

    duration3 = time.perf_counter() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))

print('Used {} cores.'.format(num_cpus))

# Mean and standard deviation of the trial durations for each benchmark.
print("""
Results:
- Numerical computation: {} +/- {}
- Stateful computation: {} +/- {}
- Expensive initialization: {} +/- {}
""".format(np.mean(durations1), np.std(durations1),
           np.mean(durations2), np.std(durations2),
           np.mean(durations3), np.std(durations3)))

Using 1 cores.
Numerical computation workload took 0.6866893768310547 seconds.
Numerical computation workload took 0.6582543849945068 seconds.
Numerical computation workload took 0.6543757915496826 seconds.
Numerical computation workload took 0.6595430374145508 seconds.
Numerical computation workload took 0.6546082496643066 seconds.
Stateful computation workload took 2.800020217895508 seconds.
Stateful computation workload took 2.8248794078826904 seconds.
Stateful computation workload took 2.8060741424560547 seconds.
Stateful computation workload took 2.7280027866363525 seconds.
Stateful computation workload took 2.8515570163726807 seconds.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: /tmp/model/assets
Expensive initialization workload took 4.910231828689575 seconds.
Expensive initialization workload took 4.859525442123413 seconds.
Expensive initialization workload took 4.853824853897095 seconds.
Expensive initializ

Just the model part: the cells below rerun only Benchmark 3 (expensive initialization) on its own.


In [0]:
from collections import defaultdict
from multiprocessing import Pool, Process
import numpy as np
import psutil
import scipy.signal
import sys
import tensorflow as tf
import time

# Number of timed repetitions of the benchmark.
num_trials = 5

# Count the number of physical CPUs. psutil may return None when the
# physical core count cannot be determined; range(num_cpus) below needs an
# integer, so fall back to the logical count and finally to 1.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1


def save_model():
    """Train a small MNIST classifier for one epoch and save it to /tmp/model."""
    x_train, y_train = tf.keras.datasets.mnist.load_data()[0]
    # Divide by 255.0 (presumably scaling 8-bit pixel values into [0, 1]).
    x_train = x_train / 255.0

    layers = [
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax),
    ]
    model = tf.keras.models.Sequential(layers)
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    # Train the model, then save it to disk.
    model.fit(x_train, y_train, epochs=1)
    filename = '/tmp/model'
    model.save(filename)


# Train and save the model. This has to be done in a separate process because
# otherwise Python multiprocessing will hang when you try to run the code
# below.
p = Process(target=save_model)
p.start()
p.join()
# Fail fast if training did not finish cleanly; otherwise the benchmark below
# would go on to load a missing or stale model from disk.
if p.exitcode != 0:
    raise RuntimeError(
        'Model training subprocess exited with code {}.'.format(p.exitcode))

# Path that save_model wrote the trained model to.
filename = '/tmp/model'


def evaluate_next_batch(i):
    """Load the saved model and return its predictions on the MNIST test images.

    On Linux the calling process is first pinned to CPU index ``i``. The
    model and the data are loaded again on every call.
    """
    on_linux = sys.platform == 'linux'
    if on_linux:
        # Pin the process to a specific core to prevent contention between
        # the different processes, since TensorFlow uses multiple threads.
        current_process = psutil.Process()
        current_process.cpu_affinity([i])
    model = tf.keras.models.load_model(filename)
    test_images = tf.keras.datasets.mnist.load_data()[1][0]
    return model.predict(test_images / 255.0)


# Worker pool sized to num_cpus processes.
pool = Pool(num_cpus)

durations3 = []
for _ in range(num_trials):
    # perf_counter() is monotonic, so the interval cannot be distorted by
    # system clock adjustments the way a time.time() difference can.
    start_time = time.perf_counter()

    # Ten rounds, each submitting one task per CPU index.
    for _ in range(10):
        pool.map(evaluate_next_batch, range(num_cpus))

    duration3 = time.perf_counter() - start_time
    durations3.append(duration3)
    print('Expensive initialization workload took {} seconds.'.format(duration3))

print('Used {} cores.'.format(num_cpus))


In [0]:
# Display the platform string that evaluate_next_batch compares against 'linux'.
sys.platform

'linux'

In [0]:
from multiprocessing import Pool
import psutil
import sys
import tensorflow as tf


In [0]:

# Count the number of physical CPUs. psutil may return None when the
# physical core count cannot be determined; range(num_cpus) in later cells
# needs an integer, so fall back to the logical count and finally to 1.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1

# Path the saved model is loaded from by evaluate_next_batch.
filename = '/tmp/model'


In [0]:

def evaluate_next_batch(i):
    """Load the saved model and return its predictions on the MNIST test images.

    On Linux the calling process is first pinned to CPU index ``i``. The
    model and the data are loaded again on every call.
    """
    on_linux = sys.platform == 'linux'
    if on_linux:
        # Pin the process to a specific core to prevent contention between
        # the different processes, since TensorFlow uses multiple threads.
        current_process = psutil.Process()
        current_process.cpu_affinity([i])
    model = tf.keras.models.load_model(filename)
    test_images = tf.keras.datasets.mnist.load_data()[1][0]
    return model.predict(test_images / 255.0)


In [0]:

# Worker pool sized to num_cpus processes.
pool = Pool(processes=num_cpus)


In [0]:

# Debugging scaffold: the real pool.map(evaluate_next_batch, range(num_cpus))
# call is disabled; each iteration only prints what would have been used.
for _ in range(10):
    print(_, evaluate_next_batch, num_cpus, sep='\n')



In [0]:
def evaluate_next_batch(i):
    """Load the saved model and return its predictions on the MNIST test images.

    On Linux the calling process is first pinned to CPU index ``i``. The
    model and the data are loaded again on every call.
    """
    if sys.platform == 'linux':
        psutil.Process().cpu_affinity([i])
    model = tf.keras.models.load_model(filename)
    mnist = tf.keras.datasets.mnist.load_data()
    x_test = mnist[1][0] / 255.0
    # Return the predictions, as the other definitions of this function in
    # the notebook do; without the return this redefinition yields None.
    return model.predict(x_test)

In [0]:
# Print the iterable of CPU indices that pool.map would receive, ten times.
for i in range(10):
    cpu_indices = range(num_cpus)
    print(cpu_indices)

range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)
range(0, 1)


array([[4.38016087e-07, 1.12755416e-07, 4.86328972e-05, ...,
        9.99576747e-01, 2.68568033e-06, 4.34771209e-05],
       [2.29172492e-05, 5.72527852e-03, 9.75866377e-01, ...,
        2.09616982e-08, 1.92292660e-04, 5.00768760e-09],
       [1.68155566e-05, 9.97583270e-01, 4.19404620e-04, ...,
        1.36637106e-03, 1.76737245e-04, 1.80361058e-05],
       ...,
       [3.46943239e-08, 4.45942391e-07, 4.06168766e-07, ...,
        5.70913762e-05, 1.68910576e-03, 6.25970133e-04],
       [1.70961607e-06, 8.89005094e-07, 1.33031941e-08, ...,
        3.23465287e-07, 5.03362331e-04, 4.81461804e-08],
       [3.05578737e-06, 1.81200621e-09, 6.46509898e-06, ...,
        3.89563604e-10, 1.62250387e-07, 1.32534881e-08]], dtype=float32)

In [0]:
import tensorflow as tf

# Load MNIST and take the training split.
mnist = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist[0]
# Divide by 255.0 (presumably scaling 8-bit pixel values into [0, 1]).
x_train = x_train / 255.0

model = tf.keras.models.Sequential()
for layer in (
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax)):
    model.add(layer)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model for a single epoch, then save it to disk.
model.fit(x_train, y_train, epochs=1)
filename = '/tmp/model'
model.save(filename)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: /tmp/model/assets
