In [2]:
#--------------------------------------------------
# only needed when run directly with "spark-submit"
# OR when executing from the python-shell
#--------------------------------------------------

# IMPORTANT: SPARK_HOME and PYTHONPATH need to be set first

from pyspark import SparkConf
from pyspark import SparkContext

# Build the Spark configuration for this notebook.
conf = SparkConf()

# Running it on the cluster
#conf.setMaster('spark://192.168.17.1:7077')

# Running it locally on master-node with 4 threads
conf.setMaster('local[4]')

conf.setAppName('mnist_logistic_regression')

# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is executed a
# second time (or when the shell already created a context).
# Note: if a context already exists, `conf` is not applied to it.
sc = SparkContext.getOrCreate(conf=conf)

print("CONNECTED.")

ImportError: No module named pyspark

In [2]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
import datetime

# Print the start time zero-padded (e.g. 09:05:03 rather than 9:5:3).
now = datetime.datetime.now()
print("--- Started at: %s ---" % now.strftime("%H:%M:%S"))

# Load the data set in LIBSVM format
# loadLibSVMFile => RDD[LabeledPoint]

# Location of the full MNIST training file; adjust to your environment.
MNIST_LIBSVM_PATH = "/home/farmer/scripts/mnist_train.libsvm"

print("Load training and test data in LIBSVM format...")
# Alternative: smaller, pre-split files
#training = MLUtils.loadLibSVMFile(sc, "/home/farmer/scripts/mnist_train_600.libsvm", False, 784)
#test = MLUtils.loadLibSVMFile(sc, "/home/farmer/scripts/mnist_test_100.libsvm", False, 784)

# A single file is loaded here; a later cell splits it into training and test sets.
data = MLUtils.loadLibSVMFile(sc, MNIST_LIBSVM_PATH)

print("Data loaded.")

--- Started at: 15:41:24 ---
Load training and test data in LIBSVM format...
Data loaded.


In [3]:
# Split data into training (90%) and test (10%)
# randomSplit => list of RDDs, one per weight
#
# The seed is a plain int literal: the Python 2 long suffix (11L) is a
# syntax error on Python 3, while 11 behaves the same on both.

training, test = data.randomSplit([0.9, 0.1], seed=11)
#test, unused = test.randomSplit([1.0, 0.0], seed=11)

print("Data prepared.")

Data prepared.


In [None]:
# Persist the training split before fitting.
training.cache()

# Fit a 10-class logistic regression model using L-BFGS.
print("Run training algorithm to build the model...")
model = LogisticRegressionWithLBFGS.train(training, numClasses=10)

# Pair each test point's predicted class with its true label.
print("Compute raw scores on the test set...")
predictionAndLabels = test.map(
    lambda point: (float(model.predict(point.features)), point.label)
)

# Metrics object built from the (prediction, label) pairs.
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics, printed as one summary.
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("\n".join([
    "Summary Stats",
    "Precision = %s" % precision,
    "Recall = %s" % recall,
    "F1 Score = %s" % f1Score,
]))

Run training algorithm to build the model...


In [None]:
# Statistics by class
# The distinct labels are collected from the full data set and reported in sorted order.

labels = data.map(lambda lp: lp.label).distinct().collect()
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

# Weighted stats
# weightedRecall / weightedPrecision / weightedFalsePositiveRate are read as
# attributes, while weightedFMeasure is called (optionally with beta).

print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

# Print the finish time zero-padded (e.g. 09:05:03 rather than 9:5:3).
now = datetime.datetime.now()
print("--- Finished at: %s ---" % now.strftime("%H:%M:%S"))

In [None]:
import random as ran
import time

# Wall-clock start of the pi estimation; the elapsed seconds are printed at the end of this cell.
start_time = time.time()

def sample(p):
    """Draw one random point in the unit square and test it against the unit circle.

    Args:
        p: Ignored. Present only because ``RDD.map`` passes each element
            to the mapped function.

    Returns:
        1 if the point (x, y) satisfies x*x + y*y < 1, otherwise 0.
    """
    x = ran.random()
    y = ran.random()
    inside_circle = x * x + y * y < 1
    return int(inside_circle)

# Python 3 has no xrange (its range is already lazy); alias it so this cell
# runs on both Python 2 and Python 3 without building a 10M-element list on Python 2.
try:
    xrange
except NameError:
    xrange = range

# Number of random points drawn for the Monte-Carlo estimate.
NUM_SAMPLES = 10*1000*1000

# Each element is mapped to 1 (point inside the unit circle) or 0 by sample().
print("Mapping...")
mappedOutput = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample)

# Sum the 0/1 indicators to count the points inside the circle.
print("Reducing...")
count = mappedOutput.reduce(lambda a, b: a + b)

# The fraction of points inside the quarter circle approximates pi/4.
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
print("--- %s seconds ---" % (time.time() - start_time))