# ADZD Lab 6 - MLlib

## Autor: Paweł Mendroch

# Spark MLlib Exercises


http://spark.apache.org/docs/latest/ml-statistics.html

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
spark = SparkSession.builder.getOrCreate()

In [2]:
spark

## 1. Statistics

In [4]:
df = spark.createDataFrame([
(-1.80e+0,  5.60e+0, 0  ),
( 1.20e+0, -1.30e+0, 1  ),
( 2.40e-1,  9.20e-1, -1 ),
( 1.50e-1,  5.50e+0, 2  ),
( 1.80e+0,  3.00e-1, 1  ),
(-1.80e+0,  3.10e+0, 0  ),
( 2.20e+0,  8.70e+0, 0  ),
(-8.70e-1, -1.60e+0, 1  ),
(-1.50e+0,  2.10e+0, -2 ),
(-9.90e-1, -5.10e+0, 2  ),
( 2.00e+0,  3.10e-2, 0  ),
(-9.50e-1,  2.60e+0, 0  ),
(-1.80e+0, -7.80e-2, 0  ),
(-6.30e-1,  1.70e+0, 1  ),
( 2.40e-1,  7.10e-1, 0  ),
(-2.50e-1, -2.20e+0, 0  ),
( 1.70e+0,  6.60e-1, 2  ),
(-2.50e+0,  6.80e-1, -1 ),
(-1.10e+0,  3.50e+0, -2 ),
(-1.30e+0,  3.40e+0, 0  ),
( 3.60e-1,  6.50e+0, 0  ),
(-1.80e+0,  6.70e+0, -1 ),
( 8.40e-1, -6.60e-1, 2  ),
(-1.60e-1,  1.80e+0, 1  ),
(-2.80e+0,  1.60e+0, 0  ),
(-8.70e-2,  4.70e-1, 1  ),
( 7.50e-1,  6.10e+0, 0  ),
( 8.90e-1,  5.40e+0, -2 ),
(-4.30e-1,  5.20e+0, -1 ),
(-7.00e-1 ,  5.80e+0 , 0)
], ["x1", "x2", "x3"])

### Exercise 1.A.
**TODO:** Calculate descriptive statistics (see https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html#describe(scala.collection.Seq))

In [5]:
df.describe("x1", "x2", "x3").show()

+-------+-------------------+------------------+-------------------+
|summary|                 x1|                x2|                 x3|
+-------+-------------------+------------------+-------------------+
|  count|                 30|                30|                 30|
|   mean|-0.3032333333333333|2.2710999999999997|0.13333333333333333|
| stddev| 1.3388872807576868|3.1520939329387003| 1.1366415543118709|
|    min|               -2.8|              -5.1|                 -2|
|    max|                2.2|               8.7|                  2|
+-------+-------------------+------------------+-------------------+



### Exercise 1.B.

**TODO:** Check which features have normal distribution (see http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/stat/KolmogorovSmirnovTest.html)

In [24]:
from pyspark.ml.stat import KolmogorovSmirnovTest

ks_test = KolmogorovSmirnovTest()


for x in ks_test.test(df, "x1", "norm"):
    print(x)

Column<b'pValue'>
Column<b'statistic'>


## 2. Loading data

Doc: http://spark.apache.org/docs/latest/ml-datasource.html 

Loading data from file data/mllib/sample_libsvm_data.txt

In [25]:
file = "file:///usr/local/spark/data/mllib/sample_libsvm_data.txt"

df = spark.read.format("libsvm").option("numFeatures", "780").load(file)
df.show(10)
df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(780,[127,128,129...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[124,125,126...|
|  1.0|(780,[152,153,154...|
|  1.0|(780,[151,152,153...|
|  0.0|(780,[129,130,131...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[99,100,101,...|
|  0.0|(780,[154,155,156...|
|  0.0|(780,[127,128,129...|
+-----+--------------------+
only showing top 10 rows



[Row(label=0.0, features=SparseVector(780, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

### Exercise 2.A
**TODO:** Load wine data from https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/wine.scale
Dataset description: http://archive.ics.uci.edu/ml/datasets/Wine

In [27]:
file = "wine.scale"

winedf = spark.read.format("libsvm").load(file)
winedf.show(10)
winedf.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 10 rows



[Row(label=1.0, features=SparseVector(13, {0: 0.6842, 1: -0.6166, 2: 0.1444, 3: -0.4845, 4: 0.2391, 5: 0.2552, 6: 0.1477, 7: -0.434, 8: 0.1861, 9: -0.256, 10: -0.0894, 11: 0.9414, 12: 0.1227}))]

## 3. Classification

Learn a decision tree to classify wines (see http://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier). Split data into training and testing sets: winedf.randomSplit([0.7, 0.3]).  

In [196]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = winedf.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

model = dt.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
+----------+-----+--------------------+
only showing top 10 rows

Test Error = 0.112903 


### Exercise 3.A
**TODO:** Add VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4) to the pipeline and compare the accuracy and the trees. 

In [42]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = winedf.randomSplit([0.7, 0.3])

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(winedf)

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="indexedFeatures")

pipeline = Pipeline(stages=[featureIndexer, dt])

# model = dt.fit(trainingData)
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       2.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
|       1.0|  1.0|(13,[0,1,2,3,4,5,...|
+----------+-----+--------------------+
only showing top 10 rows

Test Error = 0.0638298 


Load csv wine dataset from https://gist.github.com/tijptjik/9408623#file-wine-csv

**Note!** Dots (.) should be removed in the header.

In [204]:
file = "wine.csv"
winedf2 = spark.read.format("csv")\
    .options(inferSchema="true", header="true")\
    .load(file)\
    .withColumnRenamed("Wine", "label")\
    .withColumnRenamed("Malic.acid", "Malic acid")\
    .withColumnRenamed("Nonflavanoid.phenols", "Nonflavanoid phenols")\
    .withColumnRenamed("Color.int", "Color int")
winedf2.show(10)
print(winedf2.dtypes)

+-----+-------+----------+----+----+---+-------+----------+--------------------+-------+---------+----+----+-------+
|label|Alcohol|Malic acid| Ash| Acl| Mg|Phenols|Flavanoids|Nonflavanoid phenols|Proanth|Color int| Hue|  OD|Proline|
+-----+-------+----------+----+----+---+-------+----------+--------------------+-------+---------+----+----+-------+
|    1|  14.23|      1.71|2.43|15.6|127|    2.8|      3.06|                0.28|   2.29|     5.64|1.04|3.92|   1065|
|    1|   13.2|      1.78|2.14|11.2|100|   2.65|      2.76|                0.26|   1.28|     4.38|1.05| 3.4|   1050|
|    1|  13.16|      2.36|2.67|18.6|101|    2.8|      3.24|                 0.3|   2.81|     5.68|1.03|3.17|   1185|
|    1|  14.37|      1.95| 2.5|16.8|113|   3.85|      3.49|                0.24|   2.18|      7.8|0.86|3.45|   1480|
|    1|  13.24|      2.59|2.87|21.0|118|    2.8|      2.69|                0.39|   1.82|     4.32|1.04|2.93|    735|
|    1|   14.2|      1.76|2.45|15.2|112|   3.27|      3.39|     

### Exercise 3.B
**TODO:** 
Discretize all features and apply VectorAssembler. Next apply VectorIndexer and DecisionTreeClassifier like above.

In [239]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

cols = [i for i in winedf2.columns if i != "label"]

assembler = VectorAssembler(inputCols=cols, outputCol="features")
winedf3 = assembler.transform(winedf2).drop(*cols)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = winedf3.randomSplit([0.7, 0.3])

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(winedf3)

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="indexedFeatures")

pipeline = Pipeline(stages=[featureIndexer, dt])

# model = dt.fit(trainingData)
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|    1|[13.05,1.73,2.04,...|
|       1.0|    1|[13.05,1.77,2.1,1...|
|       1.0|    1|[13.16,2.36,2.67,...|
|       2.0|    1|[13.24,3.98,2.29,...|
|       1.0|    1|[13.29,1.97,2.68,...|
|       1.0|    1|[13.41,3.84,2.12,...|
|       1.0|    1|[13.5,1.81,2.61,2...|
|       1.0|    1|[13.68,1.83,2.36,...|
|       1.0|    1|[13.74,1.67,2.25,...|
|       1.0|    1|[13.87,1.9,2.8,19...|
+----------+-----+--------------------+
only showing top 10 rows

Test Error = 0.0862069 


## 4. Text classification

### Exercise 4.A
**TODO:** 
Build a pipeline consisting of Tokenizer, HashingTF and LogisticRegression, fit it to training data: 

    (0, "This example follows simple text document", 1.0),
    (1, "Ten przykład jest zgodny z prostym dokumentem tekstowym", 0.0),
    (2, "This example covers concepts of Estimator", 1.0),
    (3, "Ten przykład obejmuje koncepcje Estymatora", 0.0)

Then test the model on test data:
    
    (4, "The image data source is used to load image files from a directory"),
    (5, "To źródło danych obrazu służy do ładowania plików obrazów z katalogu"),
    (6, "The LIBSVM data source is used to load libsvm type files from a directory"),
    (7, "To źródło danych LIBSVM służy do ładowania plików typu libsvm z katalogu")

In [163]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.

trainingData = spark.createDataFrame([
    (0, "This example follows simple text document", 1.0),
    (1, "Ten przykład jest zgodny z prostym dokumentem tekstowym", 0.0),
    (2, "This example covers concepts of Estimator", 1.0),
    (3, "Ten przykład obejmuje koncepcje Estymatora", 0.0)], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.

model = pipeline.fit(trainingData)

# Prepare test documents, which are unlabeled (id, text) tuples.

testData = spark.createDataFrame([
    (4, "The image data source is used to load image files from a directory"),
    (5, "To źródło danych obrazu służy do ładowania plików obrazów z katalogu"),
    (6, "The LIBSVM data source is used to load libsvm type files from a directory"),
    (7, "To źródło danych LIBSVM służy do ładowania plików typu libsvm z katalogu")], ["id", "text"])

# Make predictions on test documents and print columns of interest.

prediction = model.transform(testData)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, The image data source is used to load image files from a directory) --> prob=[0.4989747792078863,0.5010252207921136], prediction=1.000000
(5, To źródło danych obrazu służy do ładowania plików obrazów z katalogu) --> prob=[0.6713571892386851,0.3286428107613149], prediction=0.000000
(6, The LIBSVM data source is used to load libsvm type files from a directory) --> prob=[0.4989747792078863,0.5010252207921136], prediction=1.000000
(7, To źródło danych LIBSVM służy do ładowania plików typu libsvm z katalogu) --> prob=[0.6713571892386851,0.3286428107613149], prediction=0.000000
