### Overview

This notebook showcases the ability of the SnapML Spark APIs to accept Spark DataFrames as input.

In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.

Because SnapML accepts Spark DataFrames, its training API can consume data from a variety of data sources and in different data formats.

This notebook focuses specifically on reading `parquet`-formatted files stored in the `HDFS` filesystem.

Prerequisites to run this notebook:
1. Ensure the Spark Python libraries are on `PYTHONPATH` before this `jupyter notebook` is started:
```
export PYTHONPATH=/mnt/pai/home/josamuel/spark-2.3.0-bin-hadoop2.7/python/:/mnt/pai/home/josamuel/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip:/mnt/pai/home/josamuel/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip
```
2. Install the `snapml-spark 1.3.0` conda package, which provides the SnapML libraries and ships with the `PowerAI 1.6.1` release:
```
conda install snapml-spark
```
3. Start a Spark cluster and update the Spark master URL (`spark://sparkMaster:7077` in the cells below) accordingly.
4. Make sure a Hadoop cluster is configured, and update the HDFS NameNode (`hdfs_nameNode` in the cells below) accordingly.
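Before launching Jupyter, it can help to confirm that prerequisite 1 took effect. The sketch below uses a hypothetical helper, `missing_spark_entries` (not part of SnapML or PySpark), that inspects a `PYTHONPATH` string for Spark's `python/` directory, `pyspark.zip`, and a versioned `py4j-*-src.zip`; the file names it looks for are assumptions based on the export line above.

```python
import os


def missing_spark_entries(pythonpath):
    """Hypothetical check: return which Spark entries PYTHONPATH lacks.

    Looks for Spark's python/ directory, lib/pyspark.zip and a versioned
    lib/py4j-*-src.zip, mirroring the export line in prerequisite 1.
    """
    # Split on the platform path separator (':' on Linux) and drop empties.
    entries = [e.rstrip("/") for e in pythonpath.split(os.pathsep) if e]
    checks = {
        "spark python dir": lambda e: e.endswith("/python"),
        "pyspark.zip": lambda e: e.endswith("/python/lib/pyspark.zip"),
        "py4j zip": lambda e: os.path.basename(e).startswith("py4j-") and e.endswith("-src.zip"),
    }
    # Report every check that no PYTHONPATH entry satisfies.
    return [name for name, ok in checks.items() if not any(ok(e) for e in entries)]


if __name__ == "__main__":
    # An empty list means all three entries were found.
    print(missing_spark_entries(os.environ.get("PYTHONPATH", "")))
```

An empty result only means the three entries are listed; the helper does not check that the files actually exist on disk.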


In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
import time

### Create a Spark Session

In [2]:
spark = SparkSession.builder\
    .appName("Spark-SnapML")\
    .master("spark://sparkMaster:7077")\
    .config("spark.jars", "/mnt/pai/home/josamuel/anaconda3/envs/py36/snap-ml-spark/lib/snap-ml-spark-v1.3.0-ppc64le.jar")\
    .config("spark.default.parallelism", 4)\
    .config("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=8192m")\
    .getOrCreate()

In [3]:
spark

In [4]:
from snap_ml_spark import LogisticRegression as snapml_LogisticRegression
from snap_ml_spark.Metrics import accuracy, logisticLoss

In [5]:
filename = '/mnt/pai/home/josamuel/data/criteo.kaggle2014'
train_filename = filename + '-train.libsvm'
test_filename = filename + '-test.libsvm'

n_features_ = 1000000

In [6]:
train_data_df = spark.read.format("libsvm")\
    .option("numFeatures", str(n_features_))\
    .load(train_filename)\
    .repartition(4)\
    .cache()
    
test_data_df = spark.read.format("libsvm")\
    .option("numFeatures", str(n_features_))\
    .load(test_filename)\
    .repartition(4)\
    .cache()
    
print("Count is " + str(train_data_df.count()))
print("Count is " + str(test_data_df.count()))

Count is 7500
Count is 2500
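The training and test files are in LIBSVM format: each line holds a label followed by sparse `index:value` pairs. According to the Spark documentation for the `libsvm` data source, indices are one-based and are converted to zero-based positions after loading. Passing `numFeatures` explicitly avoids an extra pass over the data to infer the dimension and keeps the dimension consistent when, as here, train and test are loaded separately. The plain-Python sketch below (a hypothetical `parse_libsvm_line`, not Spark's reader) illustrates that index conversion for a single record.

```python
def parse_libsvm_line(line, num_features):
    """Hypothetical parser for one LIBSVM record '<label> <idx>:<val> ...'.

    Returns (label, zero_based_indices, values), mirroring how Spark's
    libsvm reader shifts one-based indices to zero-based positions in a
    sparse vector of size num_features.
    """
    parts = line.split()
    label = float(parts[0])
    indices, values = [], []
    for token in parts[1:]:
        idx_str, val_str = token.split(":", 1)
        idx = int(idx_str) - 1  # one-based (file) -> zero-based (vector)
        if not 0 <= idx < num_features:
            raise ValueError("feature index %s outside 1..%d" % (idx_str, num_features))
        indices.append(idx)
        values.append(float(val_str))
    return label, indices, values


print(parse_libsvm_line("1 3:0.5 10:2", num_features=1000000))
# (1.0, [2, 9], [0.5, 2.0])
```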


#### Dump the train and test datasets into HDFS in Parquet format

Note: uncomment the cell below only on the first run, to write the train and test data into HDFS.

In [7]:
# train_data_df.write.mode('overwrite').parquet("hdfs://hdfs_nameNode/train_data.parquet") 
# test_data_df.write.mode('overwrite').parquet("hdfs://hdfs_nameNode/test_data.parquet")

### SnapML: Parquet Format from HDFS

In [8]:
train_data_p_df = spark.read.parquet("hdfs://hdfs_nameNode/train_data.parquet")
test_data_p_df = spark.read.parquet("hdfs://hdfs_nameNode/test_data.parquet")

In [10]:
use_gpu = True

In [11]:
# Create SnapML Logistic Regression (100 iterations on GPU, 50 on CPU)
snapml_regularizer = 10.0
max_iter = 100 if use_gpu else 50
snapml_lr = snapml_LogisticRegression(max_iter=max_iter, regularizer=snapml_regularizer,
                                      verbose=False, dual=True, use_gpu=use_gpu, n_threads=-1)

In [13]:
# Fit the model and time it
snapml_t0 = time.time()
snapml_lr.fit(train_data_p_df)
snapml_time = time.time() - snapml_t0
print("Train DF time: %.2f" % snapml_time, 's')

Train DF time: 16.16 s
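The fit cells time themselves with paired `time.time()` calls. One optional way to factor that pattern out is a small context manager; `timed` below is a hypothetical helper, and it uses `time.perf_counter()`, which is intended for measuring elapsed intervals. Note that `train_data_p_df` is read lazily and is not cached, so the measured fit time most likely also includes reading the Parquet files from HDFS.

```python
import time
from contextlib import contextmanager


class Elapsed:
    """Mutable holder so the elapsed time is readable after the block exits."""

    def __init__(self):
        self.seconds = None


@contextmanager
def timed():
    """Hypothetical helper: measure wall-clock time of the enclosed block."""
    result = Elapsed()
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Recorded even if the block raises.
        result.seconds = time.perf_counter() - start


# Usage (in the notebook the body would be `snapml_lr.fit(train_data_p_df)`):
with timed() as t:
    sum(range(100000))
print("Train DF time: %.2f s" % t.seconds)
```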


In [14]:
# Perform inference on test data
pred = snapml_lr.predict(test_data_p_df)

# Compute accuracy
snapml_accuracy  = accuracy(pred)

# Print the SnapML result
print('snapML  accuracy: %.4f' %snapml_accuracy, ", time: %.2f" % snapml_time, 's')

snapML  accuracy: 0.7840 , time: 16.16 s
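The notebook imports `accuracy` and `logisticLoss` from `snap_ml_spark.Metrics` but does not show how they are computed (and `logisticLoss` is never called). Assuming binary 0/1 labels, the standard definitions these metrics presumably correspond to are sketched below in plain Python; the function names are hypothetical and chosen so they do not shadow the imported `accuracy`.

```python
import math


def plain_accuracy(labels, predictions):
    """Fraction of examples whose predicted class equals the true label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def plain_logistic_loss(labels, probabilities, eps=1e-15):
    """Mean binary cross-entropy for 0/1 labels and predicted P(y = 1)."""
    total = 0.0
    for y, p in zip(labels, probabilities):
        p = min(max(p, eps), 1.0 - eps)  # clip to avoid log(0)
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(labels)


print(plain_accuracy([0, 1, 1, 0], [0, 1, 0, 0]))  # 0.75
```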


### SparkML LogisticRegression

In [19]:
from pyspark.ml.classification import LogisticRegression as sparkml_LogisticRegression

n_examples = train_data_p_df.count()

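# Create the Spark ML Logistic Regression model; regParam is the SnapML
# regularizer scaled by 1/n_examples to make the two settings comparable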
sparkml_lr = sparkml_LogisticRegression(fitIntercept=False, 
                                        regParam=snapml_regularizer/n_examples, standardization=False)
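Dividing the regularizer by `n_examples` is what lines the two models up. Assume (this notebook does not state it) that SnapML minimises the *sum* of per-example logistic losses plus $\frac{\lambda}{2}\lVert w\rVert_2^2$, while Spark ML with the default `elasticNetParam = 0` minimises the *mean* loss plus $\frac{\text{regParam}}{2}\lVert w\rVert_2^2$. The two objectives then differ only by a factor of $n$:

$$
\underbrace{\sum_{i=1}^{n} \ell\!\left(y_i, w^\top x_i\right) + \frac{\lambda}{2}\lVert w\rVert_2^2}_{\text{SnapML (assumed)}}
\;=\;
n\left[\underbrace{\frac{1}{n}\sum_{i=1}^{n} \ell\!\left(y_i, w^\top x_i\right) + \frac{\text{regParam}}{2}\lVert w\rVert_2^2}_{\text{Spark ML}}\right]
\quad\text{when}\quad \text{regParam} = \frac{\lambda}{n}.
$$

Scaling an objective by a positive constant does not change its minimiser, so under this assumption both solvers target the same $w$. With $\lambda = 10.0$ and the 7,500 training examples reported earlier, $\text{regParam} = 10/7500 \approx 1.33\times10^{-3}$. `fitIntercept=False` and `standardization=False` keep Spark ML from adding an unpenalised bias term or applying the penalty to standardized coefficients, either of which would change its objective.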

In [20]:
# Fit the model and time it
sparkml_t0 = time.time()
sparkml_lr_model = sparkml_lr.fit(train_data_p_df)
sparkml_time = time.time() - sparkml_t0
print("Train SparkML DF time: %.2f" % sparkml_time, 's')

Train SparkML DF time: 40.42 s


In [21]:
# Perform inference on test data
predictions = sparkml_lr_model.transform(test_data_p_df)

# Compute accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
sparkml_accuracy = evaluator.evaluate(predictions)

# Print the Spark ML result
print('Spark ML', evaluator.getMetricName(),'=', sparkml_accuracy)

Spark ML accuracy = 0.784
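Both runs report the same test accuracy (0.784), so the comparison comes down to training time. The ratio of the recorded times can be computed as below; in the notebook itself, `sparkml_time / snapml_time` gives the same figure from the live variables. This is a single run, with SnapML on GPU (`use_gpu = True`) and Spark ML on CPU, so the number is indicative rather than a benchmark.

```python
# Training wall-clock times copied from the outputs above (seconds).
snapml_train_s = 16.16
sparkml_train_s = 40.42

speedup = sparkml_train_s / snapml_train_s
print("SnapML speedup over Spark ML: %.2fx" % speedup)  # SnapML speedup over Spark ML: 2.50x
```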


&copy; Copyright IBM Corporation 2018