# Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start (or reuse) the Spark session used by this example
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: one tuple per row, in (ID, Feature, Target) order
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, schema=columns)

# Pack the single input column into the vector column the estimator reads
assembler = VectorAssembler().setInputCols(['Feature']).setOutputCol('Features')
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on 'Features' with default settings
lr = LinearRegression().setFeaturesCol('Features').setLabelCol('Target')
model = lr.fit(df_transformed)

# Report the fitted coefficients and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


25/11/21 09:49:17 WARN Utils: Your hostname, rino-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/21 09:49:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/21 09:49:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/21 09:49:29 WARN Instrumentation: [880e6534] regParam is zero, which might cause numerical instability and overfitting.
25/11/21 09:49:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/21 09:49:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/11/21 09:49:31 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(the

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Toy dataset: one tuple per row, in (ID, F1, F2, Label) order
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0),
]
columns = ['ID', 'F1', 'F2', 'Label']
df = spark.createDataFrame(data, schema=columns)

# Combine the two input columns into one vector column
assembler = (
    VectorAssembler()
    .setInputCols(['F1', 'F2'])
    .setOutputCol('Features')
)
df_vector = assembler.transform(df)

# Fit a logistic regression of 'Label' on 'Features' with default settings
lr = LogisticRegression().setFeaturesCol('Features').setLabelCol('Label')
model = lr.fit(df_vector)

# Report the fitted coefficients and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')



                                                                                

Coefficients: [-12.262057928656196,4.087352266314726]
Intercept: 11.568912726114789


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Toy dataset: one tuple per row, in (ID, F1, F2) order
data = [
    (1, 1.0, 1.0),
    (2, 5.0, 5.0),
    (3, 10.0, 10.0),
    (4, 15.0, 15.0),
]
columns = ['ID', 'F1', 'F2']
df = spark.createDataFrame(data, schema=columns)

# Combine the two input columns into one vector column
assembler = (
    VectorAssembler()
    .setInputCols(['F1', 'F2'])
    .setOutputCol('Features')
)
df_vector = assembler.transform(df)

# Fit a two-cluster KMeans model on the assembled vectors.
# NOTE(review): no seed is passed, so the centers (and their order) may
# differ between runs - confirm whether a fixed seed is wanted here.
kmeans = KMeans().setFeaturesCol('Features').setK(2)
model = kmeans.fit(df_vector)

# Print the learned cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


[Stage 48:>                                                         (0 + 4) / 4]

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


                                                                                

## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml import Pipeline

# 1. Start (or reuse) the Spark session
spark = SparkSession.builder.appName("Titanic-MLlib").getOrCreate()

# 2. Load the dataset (header row present, column types inferred)
df = spark.read.csv("train.csv", header=True, inferSchema=True)
df.printSchema()

# 3. Select and clean the data: keep the label plus the columns used as features
df = df.select(
    "Survived", "Pclass", "Sex", "Age", "Fare", "Embarked"
)

# Fill missing values: mean age for "Age", the literal "S" for "Embarked"
mean_age = df.agg({"Age": "mean"}).first()[0]
df = df.fillna({
    "Age": mean_age,
    "Embarked": "S",
})

# 4. Feature engineering
# Encode the categorical columns: string -> index -> one-hot vector
sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
emb_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")

encoder = OneHotEncoder(
    inputCols=["SexIndex", "EmbarkedIndex"],
    outputCols=["SexVec", "EmbarkedVec"]
)

# 5. Vector assembler (combine all features into one vector column)
assembler = VectorAssembler(
    inputCols=["Pclass", "Age", "Fare", "SexVec", "EmbarkedVec"],
    outputCol="features"
)

# 6. Model
lr = LogisticRegression(labelCol="Survived", featuresCol="features")

# 7. Pipeline: the stages run in the order listed
pipeline = Pipeline(stages=[sex_indexer, emb_indexer, encoder, assembler, lr])

# 8. Train-test split (seeded so the split is repeatable)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# 9. Hyperparameter tuning: 3 x 3 grid over regParam and maxIter
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.maxIter, [10, 20, 50])
             .build())

evaluator = BinaryClassificationEvaluator(
    labelCol="Survived",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Seed the fold assignment as well, so that tuning results are repeatable
# in the same way as the seeded train/test split above.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)

# 10. Train the final model (cross-validated over the parameter grid)
cv_model = cv.fit(train_data)

# 11. Evaluation on the held-out test split
predictions = cv_model.transform(test_data)

auc = evaluator.evaluate(predictions)
print("AUC Score:", auc)

predictions.select("Survived", "prediction", "probability").show(10, truncate=False)


25/11/21 10:38:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

AUC Score: 0.8650468384074942
+--------+----------+----------------------------------------+
|Survived|prediction|probability                             |
+--------+----------+----------------------------------------+
|0       |1.0       |[0.11673391185638055,0.8832660881436194]|
|0       |1.0       |[0.48190279316756396,0.5180972068324361]|
|0       |1.0       |[0.3737633511389316,0.6262366488610684] |
|0       |0.0       |[0.5433539344723657,0.4566460655276343] |
|0       |0.0       |[0.5486642317163983,0.45133576828360167]|
|0 