Latihan

In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

25/11/29 13:40:40 WARN Utils: Your hostname, septianadaw-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/29 13:40:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/29 13:40:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/29 13:40:59 WARN Instrumentation: [76d1b842] regParam is zero, which might cause numerical instability and overfitting.
25/11/29 13:41:02 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/29 13:41:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [5]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0)
]

columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057917018774,4.0873522624320255]
Intercept: 11.568912714495273


In [7]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Example dataset (perbaikan)
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0]))
]

columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

                                                                                

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


HOMEWORK

1. Load a real-world dataset into Spark and prepare it for machine learning tasks.

In [13]:
# A. Load Dataset & Data Preparation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Membuat Spark session
spark = SparkSession.builder.appName("Septiana Pertemuan 14").getOrCreate()

# Load dataset (CSV)
df = spark.read.csv("Titanic-Dataset.csv", header=True, inferSchema=True)

# Menampilkan Data
print("5 Baris Pertama: ")
df.show(5)
print("Schema Data: ")
df.printSchema()

5 Baris Pertama: 
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282| 7925.0| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+

In [14]:
# B. Drop Duplikasi
# Jumlah baris sebelum dropDuplicates
rows_before = df.count()
print("Jumlah baris sebelum dropDuplicates:", rows_before)

# Menghitung jumlah baris unik
rows_unique = df.dropDuplicates().count()

# Menghitung jumlah duplikasi
duplicate_count = rows_before - rows_unique
print("Jumlah baris duplikat:", duplicate_count)

# Menghapus baris duplikat
df = df.dropDuplicates()

# Jumlah baris setelah dropDuplicates
rows_after = df.count()
print("Jumlah baris setelah dropDuplicates:", rows_after)

Jumlah baris sebelum dropDuplicates: 895
Jumlah baris duplikat: 4
Jumlah baris setelah dropDuplicates: 891


In [15]:
# C. Menghapus kolom yang tidak relevan untuk model machine learning
df_clean = df.drop("Name", "Ticket", "Cabin")
print("Kolom yang tersisa: ")
df_clean.printSchema()

Kolom yang tersisa: 
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



In [16]:
# D. Missing Values

from pyspark.sql.functions import col, when, sum as spark_sum, mean

# Mengecek jumlah missing value pada setiap kolom
print("Jumlah Missing Values Setiap Kolom:")
df_clean.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_clean.columns
]).show()

# Imputasi 
# Menghitung mean kolom Age untuk mengisi nilai kosong
mean_age = df_clean.select(mean("Age")).first()[0]
df_clean = df_clean.fillna({"Age": mean_age, "Embarked": "S"})

# Mengecek kembali jumlah missing value setelah proses imputasi
print("Missing Values setelah imputasi:")
df_clean.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_clean.columns
]).show()

Jumlah Missing Values Setiap Kolom:
+-----------+--------+------+---+---+-----+-----+----+--------+
|PassengerId|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|
+-----------+--------+------+---+---+-----+-----+----+--------+
|          0|       0|     0|  0|177|    0|    0|   0|       2|
+-----------+--------+------+---+---+-----+-----+----+--------+

Missing Values setelah imputasi:
+-----------+--------+------+---+---+-----+-----+----+--------+
|PassengerId|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|
+-----------+--------+------+---+---+-----+-----+----+--------+
|          0|       0|     0|  0|  0|    0|    0|   0|       0|
+-----------+--------+------+---+---+-----+-----+----+--------+



In [17]:
# E. Encoding Kolom Kategorikal (String ke Numeric)
# Import StringIndexer untuk mengubah string menjadi angka
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Mengubah kolom Sex menjadi angka
indexer_sex = StringIndexer(inputCol="Sex", outputCol="SexIndex")

# Mengubah kolom Embarked menjadi angka
indexer_embarked = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")

# OneHotEncoder untuk membuat vektor biner
encoder = OneHotEncoder(
    inputCols=["SexIndex", "EmbarkedIndex"],
    outputCols=["SexVec", "EmbarkedVec"]
)

In [18]:
# F. Gabungkan Semua Fitur Menjadi Vector

# Import VectorAssembler untuk menggabungkan fitur
from pyspark.ml.feature import VectorAssembler

# Daftar kolom fitur yang digunakan
feature_cols = ["Pclass", "Age", "SibSp", "Parch", "Fare", "SexVec", "EmbarkedVec"]

# Menggabungkan semua fitur menjadi satu kolom "features"
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

2. Build a Classification Model Using Spark MLlib

In [25]:
# Import RandomForestClassifier untuk klasifikasi
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Membuat model Random Forest
rf = RandomForestClassifier(
    labelCol="Survived",
    featuresCol="features"
)

# Membuat Pipeline untuk menggabungkan seluruh tahap preprocessing dan model
pipeline = Pipeline(stages=[indexer_sex, indexer_embarked, encoder, assembler, rf])

# Split Train dan Test: Membagi data 80% training dan 20% testing
train_df, test_df = df_clean.randomSplit([0.8, 0.2], seed=42)

# Train Model: Melatih model menggunakan data training
model = pipeline.fit(train_df)

# Membuat prediksi pada data test
predictions = model.transform(test_df)

# Menampilkan beberapa hasil prediksi
predictions.select("Survived", "prediction", "probability").show(5)

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       1|       0.0|[0.56188488508122...|
|       0|       0.0|[0.75774398187318...|
|       1|       0.0|[0.55301519130739...|
|       0|       0.0|[0.78046049430939...|
|       1|       1.0|[0.47207410845731...|
+--------+----------+--------------------+
only showing top 5 rows



In [26]:
# Evaluasi Model (Accuracy, F1, Precision, Recall)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Membuat Evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived",
    predictionCol="prediction"
)

# Accuracy, F1 Score, Precision, Recall
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Menampilkan hasil evaluasi
print("Hasil Evaluasi Model: ")
print("Accuracy :", accuracy)
print("F1 Score :", f1)
print("Precision:", precision)
print("Recall   :", recall)

Hasil Evaluasi Model: 
Accuracy : 0.8275862068965517
F1 Score : 0.8227029342471621
Precision: 0.8491157127621756
Recall   : 0.8275862068965518


3. Hyperparameter Tuning (Cross-Validation)

In [None]:
# Import builder untuk membuat grid parameter
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Daftar kombinasi parameter yang akan diuji
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [3, 5, 7])
             .addGrid(rf.numTrees, [50, 100])
             .build())

# Membuat CrossValidator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)

# Melatih Model Dengan Cross Validation
cv_model = cv.fit(train_df)

# Prediksi Menggunakan Model
cv_predictions = cv_model.transform(test_df)

# Evaluasi Model
# Menghitung accuracy, F1, Precision, Recall terbaik
best_accuracy = evaluator.evaluate(cv_predictions, {evaluator.metricName: "accuracy"})
best_f1 = evaluator.evaluate(cv_predictions, {evaluator.metricName: "f1"})
precision = evaluator.evaluate(cv_predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(cv_predictions, {evaluator.metricName: "weightedRecall"})

# Menampilkan hasil evaluasi terbaik
print("Best CV Accuracy :", best_accuracy)
print("Best CV F1       :", best_f1)
print("Best CV Precision:", precision)
print("Best CV Recall   :", recall)

