In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


25/11/29 08:16:43 WARN Utils: Your hostname, dafa-Aspire-A514-55G resolves to a loopback address: 127.0.1.1; using 192.168.100.27 instead (on interface wlp43s0)
25/11/29 08:16:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/29 08:16:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/29 08:16:49 WARN Instrumentation: [f578dd9c] regParam is zero, which might cause numerical instability and overfitting.
25/11/29 08:16:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/29 08:16:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/11/29 08:16:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [2]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0)
]

columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)



Coefficients: [-12.262057937838394,4.087352269372807]
Intercept: 11.568912735310269


25/11/29 08:16:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

data = [
    (1, 1.0, 1.0),
    (2, 5.0, 5.0),
    (3, 10.0, 10.0),
    (4, 15.0, 15.0)
]
columns = ['ID', 'x', 'y']
df = spark.createDataFrame(data, columns)

assembler = VectorAssembler(inputCols=['x', 'y'], outputCol='Features')
df_vec = assembler.transform(df)

kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df_vec)

centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [14]:
# ======================================================
# 1. Load Data
# ======================================================
df = spark.read.csv(
    "data.csv",
    header=True,
    inferSchema=True
)

# Drop kolom tidak dipakai
df = df.drop("_c32")

# Label Encoding (diagnosis -> label)
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="diagnosis", outputCol="label")
df = indexer.fit(df).transform(df)

# Vector Assembler (fitur -> features)
feature_cols = [c for c in df.columns if c not in ("id", "diagnosis", "label")]

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

df = assembler.transform(df)

# Trainâ€“test split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# ======================================================
# 2 Build & Evaluate Model
# ======================================================
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_df)

predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show(10, truncate=False)

# Evaluasi akurasi
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print("Accuracy (Before Tuning):", accuracy)

# Confusion Matrix
predictions.groupBy("label", "prediction").count().show()

# ======================================================
# 3 Hyperparameter Tuning + Cross-Validation
# ======================================================
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Model dasar
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Variasi parameter
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])        # Regularization
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # L2 - mix - L1
    .build()
)

# CrossValidator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=2
)

# Train model dengan cross-validation
cv_model = cv.fit(train_df)

# Evaluasi setelah tuning
pred_cv = cv_model.transform(test_df)
accuracy_cv = evaluator.evaluate(pred_cv)

print("Accuracy After Cross-Validation:", accuracy_cv)


+-----+----------+-----------+
|label|prediction|probability|
+-----+----------+-----------+
|0.0  |0.0       |[1.0,0.0]  |
|0.0  |0.0       |[1.0,0.0]  |
|0.0  |0.0       |[1.0,0.0]  |
|0.0  |0.0       |[1.0,0.0]  |
|0.0  |0.0       |[1.0,0.0]  |
|1.0  |1.0       |[0.0,1.0]  |
|1.0  |1.0       |[0.0,1.0]  |
|0.0  |0.0       |[1.0,0.0]  |
|1.0  |1.0       |[0.0,1.0]  |
|1.0  |1.0       |[0.0,1.0]  |
+-----+----------+-----------+
only showing top 10 rows

Accuracy (Before Tuning): 0.9651162790697675
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   36|
|  0.0|       1.0|    2|
|  1.0|       0.0|    1|
|  0.0|       0.0|   47|
+-----+----------+-----+



25/11/29 10:14:51 WARN BlockManager: Block rdd_599_0 already exists on this machine; not re-adding it


Accuracy After Cross-Validation: 1.0
