In [1]:
!pip3 install tqdm requests dill

[33mYou are using pip version 8.1.1, however version 18.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
import requests
from tqdm import tqdm
import os

def download_from_url(url, dst):
    file_size = int(requests.head(url).headers["Content-Length"])
    if os.path.exists(dst):
        first_byte = os.path.getsize(dst)
    else:
        first_byte = 0
    if first_byte >= file_size:
        return file_size
    header = {"Range": "bytes=%s-%s" % (first_byte, file_size)}
    pbar = tqdm(
        total=file_size, initial=first_byte,
        unit='B', unit_scale=True, desc=url.split('/')[-1])
    req = requests.get(url, headers=header, stream=True)
    with(open(dst, 'ab')) as f:
        for chunk in req.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
                pbar.update(1024)
    pbar.close()
    return file_size

def load_mnist(path, kind='train'):
    import os
    import gzip
    import numpy as np

    """Load MNIST data from `path`"""
    labels_path = os.path.join(path,
                               '%s-labels-idx3-ubyte.gz'
                               % kind)
    images_path = os.path.join(path,
                               '%s-images-idx3-ubyte.gz'
                               % kind)

    with gzip.open(labels_path, 'rb') as lbpath:
        labels = np.frombuffer(lbpath.read(), dtype=np.uint8,
                               offset=8)

    with gzip.open(images_path, 'rb') as imgpath:
        images = np.frombuffer(imgpath.read(), dtype=np.uint8,
                               offset=16).reshape(len(labels), 784)

    return images, labels

In [3]:
download_from_url('http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz', 
                  'fashion/train-images-idx3-ubyte.gz')
download_from_url('http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz', 
                  'fashion/train-labels-idx3-ubyte.gz')

download_from_url('http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz', 
                  'fashion/t10k-images-idx3-ubyte.gz')
download_from_url('http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz', 
                  'fashion/t10k-labels-idx3-ubyte.gz')

5148

In [4]:
X_train, y_train = load_mnist('fashion', kind='train')
X_test, y_test = load_mnist('fashion', kind='t10k')

In [5]:
from pyspark.ml.linalg import Vectors
df_train = map(lambda i: (int(y_train[i]), Vectors.dense(X_train[i])), range(len(X_train)))
df_test = map(lambda i: (int(y_test[i]), Vectors.dense(X_test[i])), range(len(X_test)))

In [6]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder \
        .appName("examples") \
        .master('local[4]').config('spark.driver.memory', '8g') \
        .getOrCreate()
df_train = sparkSession.createDataFrame(df_train,schema=["labels", "features"])
df_test = sparkSession.createDataFrame(df_test,schema=["labels", "features"])

In [16]:
import tensorflow.contrib.slim as slim
import tensorflow as tf
import inception_v1

def inception():
    x = tf.placeholder(tf.float32, shape=[None, 784], name='x')
    y = tf.placeholder(tf.int32, shape=[None, 1], name='y')
    y = tf.reshape(y, [-1])
    x = tf.reshape(x, shape=[-1, 28, 28, 1])
    x = tf.image.grayscale_to_rgb(x)
    x = tf.image.resize_images(x, (224, 224))
    with slim.arg_scope(inception_v1.inception_v1_arg_scope()):
        logits, endpoints = inception_v1.inception_v1(
            x, num_classes = 10, is_training = True)
    z = tf.argmax(logits, 1, name='out')
    loss = tf.losses.sparse_softmax_cross_entropy(y,logits)
    return loss

In [17]:
from sparkflow.graph_utils import build_graph
from sparkflow.tensorflow_async import SparkAsyncDL
from pyspark.ml.pipeline import Pipeline
from sparkflow.graph_utils import build_adam_config
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

mg = build_graph(inception)
adam_config = build_adam_config(learning_rate=0.001, beta1=0.9, beta2=0.999)

In [18]:
spark_model = SparkAsyncDL(
    inputCol='features',
    tensorflowGraph=mg,
    tfInput='x:0',
    tfLabel='y:0',
    tfOutput='out:0',
    tfOptimizer='adam',
    miniBatchSize=16,
    miniStochasticIters=1,
    shufflePerIter=True,
    iters=10,
    predictionCol='predicted',
    labelCol='labels',
    partitions=3,
    verbose=1,
    optimizerOptions=adam_config
)

In [None]:
fitted_model = spark_model.fit(df_train)

 * Serving Flask app "sparkflow.HogwildSparkModel" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


### This is probably took a very long time

![alt text](my-cpu.png)