In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


2025-11-26 10:18:20,732 WARN util.Utils: Your hostname, david resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2025-11-26 10:18:20,739 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2025-11-26 10:18:23,755 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-11-26 10:18:24,034 WARN spark.SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
2025-11-26 10:18:56,243 WARN util.Instrumentation: [27dcc3cb] regParam is zero, which might cause numerical instability and overfitting.
2025-11-26 10:19:02,575 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.bl

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [3]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset dengan Vector
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0)
]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# Optional: Show predictions
predictions = model.transform(df)
predictions.show()

Coefficients: [-12.262057936866103,4.0873522690497905]
Intercept: 11.568912734332656
+---+---------+-----+--------------------+--------------------+----------+
| ID| Features|Label|       rawPrediction|         probability|prediction|
+---+---------+-----+--------------------+--------------------+----------+
|  1|[2.0,3.0]|    0|[0.69314633225017...|[0.66666647815335...|       0.0|
|  2|[1.0,5.0]|    1|[-19.743616142715...|[2.66352302390614...|       1.0|
|  3|[2.5,4.5]|    1|[0.69314689710854...|[0.66666660367746...|       0.0|
|  4|[3.0,6.0]|    0|[0.69314746196691...|[0.66666672920154...|       0.0|
+---+---------+-----+--------------------+--------------------+----------+



In [5]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Example dataset dengan Vector
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0]))
]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters
print(f'Cluster Centers: {centers}')

# Optional: Show predictions
predictions = model.transform(df)
predictions.show()

                                                                                

Cluster Centers: <bound method KMeansModel.clusterCenters of KMeansModel: uid=KMeans_9b76f68cee62, k=2, distanceMeasure=euclidean, numFeatures=2>
+---+-----------+----------+
| ID|   Features|prediction|
+---+-----------+----------+
|  1|  [1.0,1.0]|         1|
|  2|  [5.0,5.0]|         1|
|  3|[10.0,10.0]|         0|
|  4|[15.0,15.0]|         0|
+---+-----------+----------+



In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.linalg import Vectors


spark = SparkSession.builder.appName('MLlib Homework').getOrCreate()

data = [
   
    (Vectors.dense([25.0, 35000.0]), 0.0),
    (Vectors.dense([28.0, 42000.0]), 0.0),
    (Vectors.dense([22.0, 30000.0]), 0.0),
    (Vectors.dense([26.0, 38000.0]), 0.0),
    (Vectors.dense([24.0, 32000.0]), 0.0),
    (Vectors.dense([45.0, 85000.0]), 1.0),
    (Vectors.dense([50.0, 90000.0]), 1.0),
    (Vectors.dense([42.0, 80000.0]), 1.0),
    (Vectors.dense([48.0, 95000.0]), 1.0),
    (Vectors.dense([44.0, 87000.0]), 1.0),
    (Vectors.dense([35.0, 55000.0]), 0.0), 
    (Vectors.dense([35.0, 55000.0]), 1.0), 
    (Vectors.dense([40.0, 60000.0]), 0.0),
    (Vectors.dense([40.0, 60000.0]), 1.0),  
    (Vectors.dense([30.0, 75000.0]), 0.0),  
    (Vectors.dense([30.0, 75000.0]), 1.0),  
    (Vectors.dense([45.0, 45000.0]), 0.0),
    (Vectors.dense([45.0, 45000.0]), 1.0),  
    (Vectors.dense([33.0, 48000.0]), 1.0),  
    (Vectors.dense([38.0, 52000.0]), 0.0),  
    (Vectors.dense([29.0, 68000.0]), 1.0),  
    (Vectors.dense([41.0, 58000.0]), 0.0),  
]

df = spark.createDataFrame(data, ['Features', 'Label'])
print("Dataset size:", df.count())
df.show()


train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(train_data)

predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol='Label')
auc = evaluator.evaluate(predictions)

print(f"Initial AUC: {auc:.4f}")


paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 50]) \
    .build()

crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel

best_predictions = best_model.transform(test_data)
best_auc = evaluator.evaluate(best_predictions)


print("\nPredictions (focus on challenging samples):")
best_predictions.select('Features', 'Label', 'prediction').filter('Label != prediction').show()

print("\nCorrect predictions:")
best_predictions.select('Features', 'Label', 'prediction').filter('Label = prediction').show(5)


print("\nModel coefficients:")
print(f"Coefficients: {best_model.coefficients}")
print(f"Intercept: {best_model.intercept}")

spark.stop()

2025-11-26 22:24:13,406 WARN spark.SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


Dataset size: 22
+--------------+-----+
|      Features|Label|
+--------------+-----+
|[25.0,35000.0]|  0.0|
|[28.0,42000.0]|  0.0|
|[22.0,30000.0]|  0.0|
|[26.0,38000.0]|  0.0|
|[24.0,32000.0]|  0.0|
|[45.0,85000.0]|  1.0|
|[50.0,90000.0]|  1.0|
|[42.0,80000.0]|  1.0|
|[48.0,95000.0]|  1.0|
|[44.0,87000.0]|  1.0|
|[35.0,55000.0]|  0.0|
|[35.0,55000.0]|  1.0|
|[40.0,60000.0]|  0.0|
|[40.0,60000.0]|  1.0|
|[30.0,75000.0]|  0.0|
|[30.0,75000.0]|  1.0|
|[45.0,45000.0]|  0.0|
|[45.0,45000.0]|  1.0|
|[33.0,48000.0]|  1.0|
|[38.0,52000.0]|  0.0|
+--------------+-----+
only showing top 20 rows

Initial AUC: 0.9000

Predictions (focus on challenging samples):
+--------------+-----+----------+
|      Features|Label|prediction|
+--------------+-----+----------+
|[29.0,68000.0]|  1.0|       0.0|
|[40.0,60000.0]|  0.0|       1.0|
+--------------+-----+----------+


Correct predictions:
+--------------+-----+----------+
|      Features|Label|prediction|
+--------------+-----+----------+
|[25.0,3500