# MLlib

Spark biedt ook een framework aan voor MachineLearning modellen te trainen op gedistribueerde datasets.
Dit framework is MLlib of ook wel sparkML genoemd.
De code om te werken met deze package is sterk gelijkaardig aan sklearn.
De API en een uitgebreide documentatie met voorbeeldcode kan je [hier](https://spark.apache.org/docs/latest/ml-guide.html) vinden.

Deze package bied de volgende tools aan
* ML-technieken: classificatie, regressie, clustering, ...
* Features: Extracting en transforming van features, PCA, ...
* Pipelines: Maak, train, optimaliseer en evalueer pipelines
* Persistentie: Bewaar en laden van algoritmes/modellen
* Databeheer: Algebra tools, statistieken, null-waarden, ...

Let op dat er twee API's aangeboden worden, 1 gebaseerd op RDD's en 1 op DataFrames.
De API gebaseerd op RDD's is ouder en minder flexibel dan de API gebruik makend van DataFrames.
Momenteel werken ze allebei maar in de toekomst zou de RDD gebaseerde kunnen verdwijnen.

## Utilities

### Varianten voor numpy-arrays

Voor feature sets en volledige matrices van datasets aan te maken kan je gebruik maken van de Vector en Matrix klassen.
Deze beschikken over een Dense variant waar je elk element moet ingeven of een Sparse Variant waar cellen, elementen leeg kan laten.
Dit ziet er als volgt uit:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName("MLLib intro").config("spark.driver.memory", "4g").config("spark.executor.memory", "4g").config("spark.executor.memoryOverhead", "1g") \
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/06 11:56:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Het is belangrijk om te weten dat dit locale datastructuren (wrapper rond numpy array) zijn en geen gedistribueerde objecten.

In [4]:
from pyspark.ml.linalg import Vectors, Matrices

data = Vectors.sparse(4, (0, 1), (2.0, 1.0))
print(data)
data = Vectors.dense([2.0, 1.0, 0.0, 0.0])
print(data)
data = Matrices.dense(4, 4, range(16))
print(data)

(4,[0,1],[2.0,1.0])
[2.0,1.0,0.0,0.0]
DenseMatrix([[ 0.,  4.,  8., 12.],
             [ 1.,  5.,  9., 13.],
             [ 2.,  6., 10., 14.],
             [ 3.,  7., 11., 15.]])


### Statistieken

Voor er kan gewerkt worden met statistieken moeten we (net zoals bij pandas) eerst een dataset hebben.
Hieronder maken we een random dataframe aan van 50 rijen en 4 kolommen.

In [8]:
# random data aanmaken (50 rijen en 4 kolommen)
from pyspark.mllib.random import RandomRDDs

sc = spark.sparkContext
data = RandomRDDs.uniformVectorRDD(sc, 50, 4).map(lambda a: a.tolist()).toDF()
data.head(5)
data.describe().show()

+-------+--------------------+--------------------+--------------------+-------------------+
|summary|                  _1|                  _2|                  _3|                 _4|
+-------+--------------------+--------------------+--------------------+-------------------+
|  count|                  50|                  50|                  50|                 50|
|   mean|  0.5277823634936698| 0.45039698313032434| 0.43083175420665226| 0.5033761310058955|
| stddev| 0.28803317364991815| 0.30465666667986957| 0.23551772800012008|0.27961638439500036|
|    min|0.027289600150978033|0.006537536790830023|0.010302092502849414|0.02558399730345995|
|    max|  0.9742260064078323|  0.9830205082146971|  0.9842047921655179| 0.9914768379236777|
+-------+--------------------+--------------------+--------------------+-------------------+



                                                                                

**Correlation matrix**

Buiten de statistieken die berekend kunnen worden door de summary() functie kan ook de correlatiematrix belangrijk zijn.
Deze matrix maakt het mogelijk om het verband tussen de verscheidene features te bestuderen.
Deze matrix kan als volgt berekend worden voor een gedistribueerd dataframe.

In [14]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=data.columns, outputCol='vector')
df_vector = assembler.transform(data)
df_vector.show(truncate=False)

from pyspark.ml.stat import Correlation
df_corr = Correlation.corr(df_vector, 'vector')
df_corr.collect()[0][0].values

+--------------------+--------------------+-------------------+-------------------+--------------------------------------------------------------------------------+
|_1                  |_2                  |_3                 |_4                 |vector                                                                          |
+--------------------+--------------------+-------------------+-------------------+--------------------------------------------------------------------------------+
|0.4220033153845991  |0.35004689001896083 |0.36866243345675354|0.6729241647864656 |[0.4220033153845991,0.35004689001896083,0.36866243345675354,0.6729241647864656] |
|0.3844623109357628  |0.6350806128449099  |0.6913926352466443 |0.7664217645523693 |[0.3844623109357628,0.6350806128449099,0.6913926352466443,0.7664217645523693]   |
|0.1258495827205075  |0.2726724908455812  |0.7217576619785252 |0.7844931480053702 |[0.1258495827205075,0.2726724908455812,0.7217576619785252,0.7844931480053702]   |
|0.4411105

                                                                                

array([ 1.        , -0.15261255, -0.19830636, -0.12312731, -0.15261255,
        1.        ,  0.02824971,  0.17790507, -0.19830636,  0.02824971,
        1.        ,  0.15273451, -0.12312731,  0.17790507,  0.15273451,
        1.        ])

**Onafhankelijksheidtest**

Naast de correlatiematrix kan het ook belangrijk zijn om de onafhankelijkheid te testen tussen elke feature en een label.
Dit kan uitgevoerd worden door een zogenaamde ChiSquareTest.
Deze krijgt als input een dataframe, de naam van de kolom met de features (als vectors) en de naam van een kolom met de labels.
We kunnen deze test uitvoeren als volgt:

In [15]:
from pyspark.sql.functions import rand, when

df = df_vector.withColumn('label', when(rand() > 0.5, 1).otherwise(0))
df.show()

+--------------------+--------------------+-------------------+-------------------+--------------------+-----+
|                  _1|                  _2|                 _3|                 _4|              vector|label|
+--------------------+--------------------+-------------------+-------------------+--------------------+-----+
|  0.4220033153845991| 0.35004689001896083|0.36866243345675354| 0.6729241647864656|[0.42200331538459...|    0|
|  0.3844623109357628|  0.6350806128449099| 0.6913926352466443| 0.7664217645523693|[0.38446231093576...|    1|
|  0.1258495827205075|  0.2726724908455812| 0.7217576619785252| 0.7844931480053702|[0.12584958272050...|    1|
| 0.44111056411460436| 0.06495557377755123| 0.3783184879152791| 0.4288061046585595|[0.44111056411460...|    1|
| 0.04201175951500413|  0.8382688787875853| 0.9842047921655179| 0.4274475062527101|[0.04201175951500...|    1|
|  0.8766253216566182| 0.36272491389074757| 0.2084328033390458|  0.384389490728792|[0.87662532165661...|    0|
|

In [16]:
from pyspark.ml.stat import ChiSquareTest
ChiSquareTest.test(df, 'vector', 'label', flatten=True).show()

[Stage 46:>                                                       (0 + 16) / 16]

+------------+-------------------+----------------+-----------------+
|featureIndex|             pValue|degreesOfFreedom|        statistic|
+------------+-------------------+----------------+-----------------+
|           0|0.43343669725576095|              49|49.99999999999992|
|           1|0.43343669725576095|              49|49.99999999999992|
|           2|0.43343669725576095|              49|49.99999999999992|
|           3|0.43343669725576095|              49|49.99999999999992|
+------------+-------------------+----------------+-----------------+



                                                                                

De Chi-square test wordt op de volgende manieren gebruikt in Machine Learning:
* Feature selectie: Identificeer welke features een significante relatie hebben met de targetvariabele.
* Correlatie tussen categorische variabelen: Evalueer de afhankelijkheid tussen twee categorische variabelen.

De Chi-square test vergelijkt de verwachte en werkelijke frequenties van waarden in de categorieën van een feature ten opzichte van de targetvariabele. Het doel is om te bepalen of de verschillen tussen deze frequenties statistisch significant zijn of niet. Een hoge Chi-square score betekent dat er een grote afhankelijkheid is tussen de feature en de target, wat suggereert dat de feature nuttig is voor voorspellingen.
Als de p-waarde kleiner is dan een vooraf bepaald significantieniveau (bijvoorbeeld 0,05), verwerp je de nullhypothese (dat er geen relatie is), wat betekent dat de feature relevant is.

**Summarizer**

Andere statistieken per kolom kunnen berekend worden door gebruik te maken van de Summarizer klasse:

In [19]:
from pyspark.ml.stat import Summarizer

summarizer = Summarizer.metrics("mean", "count")
df.select(summarizer.summary(df.vector)).show(truncate=False)

[Stage 52:>                                                       (0 + 16) / 16]

+-----------------------------------------------------------------------------------+
|aggregate_metrics(vector, 1.0)                                                     |
+-----------------------------------------------------------------------------------+
|{[0.5277823634936697,0.4503969831303244,0.4308317542066523,0.5033761310058955], 50}|
+-----------------------------------------------------------------------------------+



                                                                                

Het gebruik maken van de Summarizer maakt het dus mogelijk om rechtstreeks op de feature vectors te werken zonder ze eerst terug te moeten splitsen.

### Pipelines

Pipelines binnen Spark zijn een groep van high-level API's steunend op Dataframes om ML-pipelines aan te maken, optimaliseren en trainen.
De belangrijkste concepten binnen de Pipelines van Spark zijn:
* Dataframe: concept van de dataset
* Transformer: Zet een dataframe om in een ander dataframe
* Estimator: Zet een dataframe om in een model/transformer
* Pipeline: een ketting van transformers en estimators om een flow vast te leggen
* Parameter: API voor parameters van transformers en estimators aan te passen

Gebruik nu onderstaande mini-dataset waar we op basis van een tekstkolom met logistische regressie een bepaald label proberen te voorspellen.
Maak hiervoor een Pipeline uit die bestaat uit de volgende stappen:
* Tokenizer om de tekstkolom te splitsen in de overeenkomstige woorden
* HashingTf om de term frequency van de woorden te bepalen en het om te zetten naar een feature vector
* LogisticRegression Estimator om de voorspelling te doen.

Train daarna deze pipeline en maak de voorspellingen voor de traningsdata.
Hoe accuraat is dit model?

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

training = spark.createDataFrame([
    (0, 'wat is big data tof', 1.0),
    (1, 'ik kan goed big data', 1.0),
    (2, 'ik wil van het zonnetje genieten', 0.0),
    (3, 'ik heb dorst',  0.0)
], ['id', 'text', 'label'])

tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
hasher = HashingTF(inputCol='tokens', outputCol='features')
logistic = LogisticRegression(maxIter=10, regParam=0.001)

df_prepped = hasher.transform(tokenizer.transform(training))
model = logistic.fit(df_prepped) # model is een transformer, logistic is een estimator

In [24]:
model.transform(df_prepped).show()

+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| id|                text|label|              tokens|            features|       rawPrediction|         probability|prediction|
+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|  0| wat is big data tof|  1.0|[wat, is, big, da...|(262144,[67406,67...|[-6.8203767382313...|[0.00109012004814...|       1.0|
|  1|ik kan goed big data|  1.0|[ik, kan, goed, b...|(262144,[97142,15...|[-6.2496016696093...|[0.00192750081356...|       1.0|
|  2|ik wil van het zo...|  0.0|[ik, wil, van, he...|(262144,[16725,15...|[6.94109921712630...|[0.99903372873981...|       0.0|
|  3|        ik heb dorst|  0.0|    [ik, heb, dorst]|(262144,[88144,12...|[6.17763043040952...|[0.99792895842010...|       0.0|
+---+--------------------+-----+--------------------+--------------------+--------------------+---------

In [27]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer, hasher, logistic])
model = pipeline.fit(training)
predictions = model.transform(training)
predictions.show()

+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| id|                text|label|              tokens|            features|       rawPrediction|         probability|prediction|
+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|  0| wat is big data tof|  1.0|[wat, is, big, da...|(262144,[67406,67...|[-6.8203767382313...|[0.00109012004814...|       1.0|
|  1|ik kan goed big data|  1.0|[ik, kan, goed, b...|(262144,[97142,15...|[-6.2496016696093...|[0.00192750081356...|       1.0|
|  2|ik wil van het zo...|  0.0|[ik, wil, van, he...|(262144,[16725,15...|[6.94109921712630...|[0.99903372873981...|       0.0|
|  3|        ik heb dorst|  0.0|    [ik, heb, dorst]|(262144,[88144,12...|[6.17763043040952...|[0.99792895842010...|       0.0|
+---+--------------------+-----+--------------------+--------------------+--------------------+---------

### Evalueren van een model

In de pyspark.ml package zitten er ook functionaliteiten voor deze modellen te evalueren.
Meer informatie hierover vind je [hier](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html).

In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

1.0

### Data sources

Door gebruik te maken van de sparkContext kunnen een reeks standaard databronnen ingelezen worden om datasets uit op te bouwen (Csv, Json, ...).
Daarnaast is het ook mogelijk om een folder met een reeks beelden te gebruiken als dataset om zo een model voor image classification te trainen.
Download nu [deze](https://www.kaggle.com/returnofsputnik/chihuahua-or-muffin) dataset en upload ze naar een folder op het hadoop filesysteem.

In [None]:
import opendatasets as od

od.download("https://www.kaggle.com/returnofsputnik/chihuahua-or-muffin")

In [29]:
from hdfs import InsecureClient

client = InsecureClient('http://localhost:9870', user='bigdata')
map = "/user/bigdata/MLLib"

if client.status(map, strict=False) is None:
    client.makedirs(map)
else:
    # do some cleaning in case anything else than *.txt is present
    for f in client.list(map):
        client.delete(map + '/' + f, recursive=True)

client.upload(map, 'chihuahua-or-muffin')

'/user/bigdata/MLLib/chihuahua-or-muffin'

De geuploade images kunnen nu ingelezen worden als volgt:

In [12]:
df = spark.read.format('image').load('/user/bigdata/MLLib/chihuahua-or-muffin')
df.printSchema()
df.select('image.width').show()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)

+-----+
|width|
+-----+
|  172|
|  171|
|  171|
|  172|
|  172|
|  168|
|  168|
|  171|
|  168|
|  171|
|  171|
|  171|
|  168|
|  171|
|  171|
|  172|
+-----+



Merk op dat het werken met images niet zo eenvoudig is.
Hiervoor wordt binnen pyspark typisch gebruik gemaakt van de [sparkdl](https://smurching.github.io/spark-deep-learning/site/api/python/sparkdl.html) package.
Hierbij staat de dl voor deep learning.
Aangezien dit ons momenteel te ver leidt ga ik dit niet verder toelichten.

Een andere aparte databron die eenvoudig ingelezen kan worden is het formaat "libsvm".
Een bestand van dit formaat wordt ingelezen als een dataframe met twee kolommen: een label en een kolom met de feature-vectors.
De code om dergelijk bestand in te laden is:

In [None]:
df = spark.read.format('libsvm').load('{path to file}')

## Voorbeeld

Train en evalueer een machine learning model dat beeldgegevens kan classificeren, gebaseerd op eigenschappen zoals hoogte, breedte en aantal kanalen. Omdat MLlib geen directe ondersteuning biedt voor beelddata, zullen we enkele numerieke eigenschappen van de afbeeldingen gebruiken en pipelines opzetten voor preprocessing en training.
Hierbij ga je verder werken op het dataframe van de chichuahua of muffin dataset van hierboven.

Voer de volgende stappen uit:
* Data decoderen en omzetten naar een dataframe dat door ML-kan gebruikt worden.
    * Schrijf een udf om de image.data kolom om te zetten naar een vector. Gebruik hiervoor een udf dat dit eerst omzet naar een numpy array waarna het omgezet wordt naar een PIL-Image (een python package). Dit wordt dan geresized naar een 32x32 figuur. Ten slotte wordt dit omgezet naar een 1D-array 
    * Schrijf een udf om de filename in de image.origin kolom om te zetten naar 1 van de labels: 'muffin' of 'chihuahua'
* Na het uitvoeren van de udf's zou je een dataframe moeten hebben met minstens 2 kolommen: het label als string kolom, de data als een vector kolom
* Splits de data in een trainings en testset
* Voer de volgende preprocessing stappen uit
    * Zet de tekstuele labels om naar integers
    * Schaal de vector-kolom door elke waarde te delen door 255 zodat de pixel-waarden in de range 0-255 vallen
    * Zorg ervoor dat dit in een pipeline zit en fit de preprocessor al eens. Print het schema en een aantal rijen uit om de output te verifieren
* Voer data modelling uit
    * Zorg voor dimensionality reduction aan de hand van PCA
    * Gebruik logistische regressie om het label te voorspellen
* Evalueer het model door een confusion matrix te berekenen

In [16]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType, StringType
from pyspark.ml.feature import StringIndexer, MinMaxScaler, PCA
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.functions import array_to_vector
from PIL import Image
import io
import numpy as np

target_size=(32,32)
num_channels=3
num_flattened=target_size[0]*target_size[1]*num_channels

def get_label(path):
    filename = path.split('/')[-1]
    if 'muffin' in filename:
        return 'muffin'
    else:
        return 'chihuahau'

def resize_image(pixel_data, width, height, target_size=(32, 32)):
    # Convert the pixel data to a numpy array and reshape it to (height, width)
    image_array = np.array(pixel_data, dtype=np.uint8).reshape((height, width, 3))
    # Convert the numpy array to a PIL Image
    image = Image.fromarray(image_array, mode="RGB")  # "L" for grayscale images
    # Resize the image to the target size
    image = image.resize(target_size)
    # image to list
    image = np.array(image, dtype=float).flatten().tolist()

# 2. UDF registreren voor resizing
resize_image_udf = F.udf(resize_image, ArrayType(DoubleType()))
get_label_udf = F.udf(get_label, StringType())

# 3. Pas de resize UDF toe om een nieuwe kolom 'resized_vector' te maken met de resized pixelwaarden
df_data = df.select("image.origin", array_to_vector(resize_image_udf(F.col('image.data'),F.col('image.width'),F.col('image.height'))).alias("data"))
df_data.show()
df_data = df_data.withColumn('label', get_label_udf(F.col('origin')))

# splitsen in train en test
train_data, test_data = df_data.randomSplit([0.8, 0.2], seed=42)

# preprocessing
label_indexer = StringIndexer(inputCol='label', outputCol='label_index')
scaler = MinMaxScaler(inputCol='data', outputCol='scaled_features')
preprocessor = Pipeline(stages=[label_indexer, scaler])

preprocessor_model = preprocessor.fit(train_data)
train_data_pre = preproccesor_model.transform(train_data)
test_data_pre = preproccesor_model.transform(test_data)

# modelling
pca = PCA(k=5, inputCol='scaled_features', outputCol='pca_features')
lr = LogisticRegression(featuresCol='pca_features', labedlCol='label', maxIter=5)
pipeline = Pipeline(stages = [preprocessor, pca, lr])

model = pipeline.fit(train_data)
predictions = model.transform(test_data)

# evaluate the model
preds_and_labels = predictions.select("prediction", "label_index").rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = MultiClassMetrics()
confusion_matrix = metrics.confusionMatrix().toArray()
print(confusion_matrix)

print(metrics.accuracy)

25/05/06 13:31:48 ERROR Executor: Exception in task 0.0 in stage 116.0 (TID 750)
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`functions$$$Lambda$2967/352528797`: (array<double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(S

Py4JJavaError: An error occurred while calling o1197.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 116.0 failed 1 times, most recent failure: Lost task 0.0 in stage 116.0 (TID 750) (f46b4845c1e0 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`functions$$$Lambda$2967/352528797`: (array<double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.functions$.$anonfun$arrayToVectorUdf$1(functions.scala:76)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.GeneratedMethodAccessor231.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`functions$$$Lambda$2967/352528797`: (array<double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.functions$.$anonfun$arrayToVectorUdf$1(functions.scala:76)
	... 20 more


## Oefening - classificatie

Voer de volgende stappen uit, volg de best-practices van het data science vak:
* Genereer een dataset met de make_classification functie van Scikit-Learn (of gebruik randomSplit op een bestaande dataset in PySpark).
* Converteer de dataset naar een PySpark DataFrame.
* Voer basisverkenning uit: toon het schema van de data, bekijk de eerste rijen, en controleer de kolomtypes.
* Gebruik VectorAssembler om alle features in één vector-kolom (features) te combineren.
* Controleer de structuur van de output.
* Train een Logistic Regression model op de gegenereerde dummy data.
* Gebruik een training/test split van 80/20.
* Evalueer het model door de nauwkeurigheid (accuracy) te berekenen.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from sklearn.datasets import make_classification
import pandas as pd

# Maak dummy data
X, y = make_classification(n_samples=100, n_features=5, random_state=42)
df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(5)])
df['label'] = y

# Converteer naar Spark DataFrame
spark = SparkSession.builder.getOrCreate()
df_spark = spark.createDataFrame(df)

# Basisexploratie
df_spark.printSchema()
df_spark.show(5)

# Maak een assembler om features samen te voegen
assembler = VectorAssembler(inputCols=[f"feature_{i}" for i in range(5)], outputCol="features")
df_prepared = assembler.transform(df_spark)

# Bekijk het resultaat
df_prepared.select("features", "label").show(5, truncate=False)

# Train/test split
train_data, test_data = df_prepared.randomSplit([0.8, 0.2], seed=42)

# Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

# Voorspellingen en evaluatie
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

## Oefening - hyperparameter tuning

Voer de volgende stappen uit, volg de best-practices van het data science vak:
* Genereer dummy data (of gebruik bestaande data) om een regressieprobleem op te lossen. (make_regression)
* Bouw een pipeline die de data voorbereidt door schaling uit te voeren, een RandomForest-model traint, en de beste hyperparameters selecteert. (Tip: ParamGridBuilder)
* Voer hyperparameter tuning uit en evalueer de prestaties van het beste model.

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.datasets import make_regression
import pandas as pd
from pyspark.sql import SparkSession

# Start een Spark sessie
spark = SparkSession.builder.getOrCreate()

# Genereer dummy regressie data
X, y = make_regression(n_samples=200, n_features=5, noise=0.1, random_state=42)
df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(5)])
df['label'] = y
df_spark = spark.createDataFrame(df)

# Stap 1: Data voorbereiden met VectorAssembler en MinMaxScaler
assembler = VectorAssembler(inputCols=[f"feature_{i}" for i in range(5)], outputCol="features_vector")
scaler = MinMaxScaler(inputCol="features_vector", outputCol="scaled_features")

# Stap 2: RandomForestRegressor model
rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="label")

# Stap 3: Pipeline opbouwen
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Stap 4: Hyperparameter grid instellen voor RandomForestRegressor
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Stap 5: CrossValidator instellen
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse"),
                          numFolds=3)  # 3-fold cross-validation

# Train/test split
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)

# Stap 6: Model trainen en tunen
cv_model = crossval.fit(train_data)

# Stap 7: Het beste model toepassen op de test set
best_model = cv_model.bestModel
predictions = best_model.transform(test_data)

# Evaluatie van het beste model met RMSE (Root Mean Squared Error)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Beste model RMSE op test data: {rmse}")

# Beste hyperparameters weergeven
print("Beste hyperparameters:")
print("Aantal bomen (numTrees):", best_model.stages[-1]._java_obj.getNumTrees())
print("Maximale diepte (maxDepth):", best_model.stages[-1]._java_obj.getMaxDepth())