# Spark MLlib Exercises


http://spark.apache.org/docs/latest/ml-statistics.html

In [31]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [32]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [33]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
spark

## 1. Statistics (1p.)

Download the following dataset: https://www.kaggle.com/c/titanic/data?select=train.csv

In [None]:
file = "./titanic_train.csv"
spark.read.option("header",True).csv("./titanic_train.csv")
titanic_df = spark.read.format("csv").options(inferSchema="true", header="true").load(file)
titanic_df = titanic_df.dropna(how='any')
titanic_df.show(10)
print(titanic_df.dtypes)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|        E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1| PP 9549|   16.7|         G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55|       C103|       S|
|         22|       1|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0|  248698|   13.0|     

### Exercise 1.A.
**TODO:** Calculate descriptive statistics for 'Age' and 'Fare' (see https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html#describe(scala.collection.Seq))

In [None]:
titanic_df.describe("Age", "Fare").show()

+-------+------------------+-----------------+
|summary|               Age|             Fare|
+-------+------------------+-----------------+
|  count|               183|              183|
|   mean|  35.6744262295082|78.68246885245901|
| stddev|15.643865966849717|76.34784270040569|
|    min|              0.92|              0.0|
|    max|              80.0|         512.3292|
+-------+------------------+-----------------+



### Exercise 1.B.

**TODO:** Check if 'Age' and 'Fare' have normal distribution (see http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/stat/KolmogorovSmirnovTest.html)

In [None]:
from pyspark.ml.stat import KolmogorovSmirnovTest

ksResult = KolmogorovSmirnovTest.test(titanic_df, 'Age', 'norm').first()
print(round(ksResult.pValue, 3))
print(round(ksResult.statistic, 3))

0.0
0.971


In [None]:
ksResult = KolmogorovSmirnovTest.test(titanic_df, 'Fare', 'norm').first()
print(round(ksResult.pValue, 3))
print(round(ksResult.statistic, 3))

0.0
0.989


### Exercise 1.C.

**TODO:** Calculate Pearson correlation between the following pairs of features:  
* 'Age' and 'Survived'
* 'Sex' and 'Survived' *(remember about encoding 'Sex' attributes as 0s and 1s)*

Which correlation is stronger?

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def sex_to_integer (sex) :
   return int(sex == "male")

sex_to_integer_udf = udf(sex_to_integer ,IntegerType())

In [None]:
df = titanic_df.select('Age', 'Sex', 'Survived').na.drop()
df = df.withColumn('Sex', sex_to_integer_udf('Sex'))

df.show(10)

+----+---+--------+
| Age|Sex|Survived|
+----+---+--------+
|38.0|  0|       1|
|35.0|  0|       1|
|54.0|  1|       0|
| 4.0|  0|       1|
|58.0|  0|       1|
|34.0|  1|       1|
|28.0|  1|       1|
|19.0|  1|       0|
|49.0|  0|       1|
|65.0|  1|       0|
+----+---+--------+
only showing top 10 rows



In [None]:
df.corr("Age", "Survived", method="pearson")

-0.2540847542030532

In [None]:
df.corr("Sex", "Survived", method="pearson")

-0.5324179744538412

## 2. Loading data

Doc: http://spark.apache.org/docs/latest/ml-datasource.html 

Download data from https://github.com/apache/spark/blob/master/data/mllib/sample_libsvm_data.txt and load as DataFrame. 

In [None]:
file = "sample_libsvm_data.txt"

df = spark.read.format("libsvm").option("numFeatures", "780").load(file)
df.show(10)
df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(780,[127,128,129...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[124,125,126...|
|  1.0|(780,[152,153,154...|
|  1.0|(780,[151,152,153...|
|  0.0|(780,[129,130,131...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[99,100,101,...|
|  0.0|(780,[154,155,156...|
|  0.0|(780,[127,128,129...|
+-----+--------------------+
only showing top 10 rows



[Row(label=0.0, features=SparseVector(780, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

### Exercise 2.A
**TODO:** Load wine data from https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/wine.scale
Dataset description: http://archive.ics.uci.edu/ml/datasets/Wine

In [None]:
file = "wine.scale"

df = spark.read.format("libsvm").option("numFeatures", "13").load(file)
df.show(10)
df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 10 rows



[Row(label=1.0, features=SparseVector(13, {0: 0.6842, 1: -0.6166, 2: 0.1444, 3: -0.4845, 4: 0.2391, 5: 0.2552, 6: 0.1477, 7: -0.434, 8: 0.1861, 9: -0.256, 10: -0.0894, 11: 0.9414, 12: 0.1227}))]

## 3. Classification (2p.)

In [14]:
file = "wine.csv" # https://gist.githubusercontent.com/tijptjik/9408623/raw/b237fa5848349a14a14e5d4107dc7897c21951f5/wine.csv

# Remember about deleting dots from the headers of this csv file!
winedf2 = spark.read.format("csv").options(inferSchema="true", header="true").load(file)
winedf2.show(10)
print(winedf2.dtypes)

+----+-------+----------+----+----+---+-------+----------+--------------------+-------+---------+----+----+-------+
|Wine|Alcohol|Malic-acid| Ash| Acl| Mg|Phenols|Flavanoids|Nonflavanoid-phenols|Proanth|Color-int| Hue|  OD|Proline|
+----+-------+----------+----+----+---+-------+----------+--------------------+-------+---------+----+----+-------+
|   1|  14.23|      1.71|2.43|15.6|127|    2.8|      3.06|                0.28|   2.29|     5.64|1.04|3.92|   1065|
|   1|   13.2|      1.78|2.14|11.2|100|   2.65|      2.76|                0.26|   1.28|     4.38|1.05| 3.4|   1050|
|   1|  13.16|      2.36|2.67|18.6|101|    2.8|      3.24|                 0.3|   2.81|     5.68|1.03|3.17|   1185|
|   1|  14.37|      1.95| 2.5|16.8|113|   3.85|      3.49|                0.24|   2.18|      7.8|0.86|3.45|   1480|
|   1|  13.24|      2.59|2.87|21.0|118|    2.8|      2.69|                0.39|   1.82|     4.32|1.04|2.93|    735|
|   1|   14.2|      1.76|2.45|15.2|112|   3.27|      3.39|              

### Exercise 3.A
**TODO:** 

Remember about deleting dots from the headers of this csv file and splitting data into train and test set


1) Create pipeline with VectorAssembler and DecisionTreeClassifier.

2) Use the pipeline to make predictions.

3) Evaluate predictions using MulticlassClassificationEvaluator.

4) Calculate accuracy and test error

5) Print the structure of the trained decision tree (hint: use toDebugString attribute)

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, QuantileDiscretizer, StringIndexer, Tokenizer, VectorAssembler
from pyspark.ml.stat import Correlation, KolmogorovSmirnovTest

In [None]:
train_df, test_df = winedf2.randomSplit([0.8, 0.2], seed=0)

In [25]:
num_classes = train_df.select("Wine").distinct().count()

feature_cols = train_df.columns[1:]

assembler = VectorAssembler(
    inputCols=feature_cols, 
    outputCol="features"
)
decision_tree = DecisionTreeClassifier(
    labelCol="Wine", 
    featuresCol="features"
)
pipeline = Pipeline(stages=[assembler, decision_tree]) 

model = pipeline.fit(train_df)
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    labelCol="Wine", 
    predictionCol="prediction", 
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions) * 100

In [21]:
print(accuracy)

86.11111111111111


In [27]:
tree_model = model.stages[1]
print(tree_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_e60eb72ea1ed, depth=5, numNodes=19, numClasses=4, numFeatures=13
  If (feature 12 <= 755.0)
   If (feature 6 <= 1.385)
    If (feature 9 <= 3.77)
     Predict: 2.0
    Else (feature 9 > 3.77)
     Predict: 3.0
   Else (feature 6 > 1.385)
    If (feature 0 <= 13.135)
     Predict: 2.0
    Else (feature 0 > 13.135)
     If (feature 1 <= 1.6749999999999998)
      Predict: 2.0
     Else (feature 1 > 1.6749999999999998)
      If (feature 0 <= 13.285)
       Predict: 1.0
      Else (feature 0 > 13.285)
       Predict: 3.0
  Else (feature 12 > 755.0)
   If (feature 5 <= 1.6150000000000002)
    If (feature 1 <= 1.62)
     Predict: 2.0
    Else (feature 1 > 1.62)
     Predict: 3.0
   Else (feature 5 > 1.6150000000000002)
    If (feature 0 <= 11.98)
     Predict: 2.0
    Else (feature 0 > 11.98)
     Predict: 1.0



### Exercise 3.B
**TODO:** 

1) Extend the pipeline from the previos task with QuantileDiscretizer 

2) Try using a couple of different numbers of buckets, which cinfiguration gives the best results?

3) Can you see any difference in the structure of the decistion tree?

In [28]:
def train_binned_wine_decision_tree(train_df, test_df, num_buckets: int):
    num_classes = train_df.select("Wine").distinct().count()
    feature_cols = train_df.columns[1:]
    discretized_cols = [f"{col}_disc" for col in train_df.columns[1:]]
    
    discretizer = QuantileDiscretizer(
        inputCols=feature_cols,
        outputCols=discretized_cols,
        numBuckets=num_buckets
    )
    assembler = VectorAssembler(
        inputCols=discretized_cols, 
        outputCol="features"
    )
    decision_tree = DecisionTreeClassifier(
        labelCol="Wine", 
        featuresCol="features",
        
    )
    pipeline = Pipeline(stages=[discretizer, assembler, decision_tree]) 

    model = pipeline.fit(train_df)
    predictions = model.transform(test_df)

    evaluator = MulticlassClassificationEvaluator(
        labelCol="Wine", 
        predictionCol="prediction", 
        metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions) * 100
    
    return model, accuracy

In [29]:
for num_bins in range(2, 6):
    print("Bins:", num_bins)
    model, accuracy = train_binned_wine_decision_tree(train_df, test_df, num_bins)
    print(f"Accuracy: {accuracy:.2f}")

    tree_model = model.stages[2]
    print(tree_model.toDebugString)
    print()

Bins: 2
Accuracy: 88.89
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_3f1ae284dd9b, depth=5, numNodes=21, numClasses=4, numFeatures=13
  If (feature 6 in {0.0})
   If (feature 9 in {0.0})
    If (feature 10 in {0.0})
     If (feature 8 in {0.0})
      If (feature 1 in {0.0})
       Predict: 2.0
      Else (feature 1 not in {0.0})
       Predict: 3.0
     Else (feature 8 not in {0.0})
      Predict: 2.0
    Else (feature 10 not in {0.0})
     Predict: 2.0
   Else (feature 9 not in {0.0})
    If (feature 10 in {0.0})
     Predict: 3.0
    Else (feature 10 not in {0.0})
     Predict: 2.0
  Else (feature 6 not in {0.0})
   If (feature 12 in {0.0})
    Predict: 2.0
   Else (feature 12 not in {0.0})
    If (feature 0 in {0.0})
     If (feature 2 in {0.0})
      Predict: 2.0
     Else (feature 2 not in {0.0})
      If (feature 3 in {0.0})
       Predict: 1.0
      Else (feature 3 not in {0.0})
       Predict: 2.0
    Else (feature 0 not in {0.0})
     Predict: 1.0


Bins: 3
Accu

## 4. Text classification (2p.)

### Exercise 4
**TODO:** 
Build a pipeline consisting of Tokenizer, HashingTF, IDF and StringIndexer and LogisticRegression, fit it to training data: 
http://help.sentiment140.com/for-students/

What is the accuracy of this classifier?

In [34]:
from pyspark.ml.feature import HashingTF, IDF, QuantileDiscretizer, StringIndexer, Tokenizer, VectorAssembler
from pyspark.ml.stat import Correlation, KolmogorovSmirnovTest

In [39]:
columns = ["label", "id", "date", "query", "user", "text"]

train_df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="false")
    .load("sentiment_train.csv")
)
for old, new in zip(train_df.columns, columns):
    train_df = train_df.withColumnRenamed(old, new)


test_df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="false")
    .load("sentiment_test.csv")
)
for old, new in zip(test_df.columns, columns):
    test_df = test_df.withColumnRenamed(old, new)

    
train_df = train_df.select("label", "text")
test_df = test_df.select("label", "text")

train_df.show(5)
print(train_df.dtypes)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|@switchfoot http:...|
|    0|is upset that he ...|
|    0|@Kenichan I dived...|
|    0|my whole body fee...|
|    0|@nationwideclass ...|
+-----+--------------------+
only showing top 5 rows

[('label', 'int'), ('text', 'string')]


In [36]:
tokenizer = Tokenizer(
    inputCol="text", 
    outputCol="tokens"
)
hashing_tf = HashingTF(
    inputCol="tokens", 
    outputCol="features", 
    numFeatures=50
)
idf = IDF(
    inputCol="features", 
    outputCol="final_features"
)
string_indexer = StringIndexer(
    inputCol="label", 
    outputCol="final_label",
    handleInvalid="skip"
)
classifier = LogisticRegression(
    featuresCol="final_features", 
    labelCol="final_label", 
    predictionCol="prediction"
)

pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf, string_indexer, classifier])

In [40]:
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    labelCol="final_label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions) * 100

print(f"Accuracy: {accuracy:.2f}")

Accuracy: 53.20
