In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

25/12/03 10:43:33 WARN Utils: Your hostname, Rama resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/12/03 10:43:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/12/03 10:43:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/03 10:43:49 WARN Instrumentation: [a5c0ee22] regParam is zero, which might cause numerical instability and overfitting.




25/12/03 10:43:53 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/12/03 10:43:53 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

25/12/03 10:43:53 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 37110)
Traceback (most recent call last):
  File "/home/wleowleo/anaconda3/envs/spark-env/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/wleowleo/anaconda3/envs/spark-env/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/home/wleowleo/anaconda3/envs/spark-env/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/wleowleo/anaconda3/envs/spark-env/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/wleowleo/anaconda3/envs/spark-env/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/wleowleo/anaconda3/envs/spark-env/lib/python3.10/site-packages/pyspark/a

In [4]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Dataset dengan DenseVector
data = [
    (1, Vectors.dense(2.0, 3.0), 0),
    (2, Vectors.dense(1.0, 5.0), 1),
    (3, Vectors.dense(2.5, 4.5), 1),
    (4, Vectors.dense(3.0, 6.0), 0)
]

df = spark.createDataFrame(data, ['ID', 'Features', 'Label'])

# Logistic Regression
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)


                                                                                

Coefficients: [-12.26205794067705,4.087352270317018]
Intercept: 11.568912738159003


In [5]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0]))
]

df = spark.createDataFrame(data, ['ID', 'Features'])

kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

centers = model.clusterCenters()
print("Cluster Centers:", centers)


[Stage 45:>                                                         (0 + 5) / 5]

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


                                                                                

In [2]:
#1.Setup SparkSession
# Membuat session Spark agar bisa menjalankan PySpark.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Homework") \
    .getOrCreate()

#2. Load Dataset
df = spark.read.csv("Housing.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)
df.count()

root
 |-- price: integer (nullable = true)
 |-- area: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- stories: integer (nullable = true)
 |-- mainroad: string (nullable = true)
 |-- guestroom: string (nullable = true)
 |-- basement: string (nullable = true)
 |-- hotwaterheating: string (nullable = true)
 |-- airconditioning: string (nullable = true)
 |-- parking: integer (nullable = true)
 |-- prefarea: string (nullable = true)
 |-- furnishingstatus: string (nullable = true)

+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
|   price|area|bedrooms|bathrooms|stories|mainroad|guestroom|basement|hotwaterheating|airconditioning|parking|prefarea|furnishingstatus|
+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
|13300000|7420|       4|        2|      3|

545

In [3]:
from pyspark.sql.functions import when, col

median_price = df.approxQuantile("price", [0.5], 0.0)[0]

df = df.withColumn("label", when(col("price") > median_price, 1).otherwise(0))
df.select("price", "label").show(10)


+--------+-----+
|   price|label|
+--------+-----+
|13300000|    1|
|12250000|    1|
|12250000|    1|
|12215000|    1|
|11410000|    1|
|10850000|    1|
|10150000|    1|
|10150000|    1|
| 9870000|    1|
| 9800000|    1|
+--------+-----+
only showing top 10 rows



In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Categorical → Indexed
indexer = StringIndexer(inputCol="furnishingstatus", outputCol="furnishingstatus_index")

# OneHot → Vector
encoder = OneHotEncoder(inputCols=["furnishingstatus_index"],
                        outputCols=["furnishingstatus_vec"])


In [5]:
from pyspark.ml.feature import VectorAssembler

numeric_cols = ["area", "bedrooms", "bathrooms", "stories"]

assembler = VectorAssembler(
    inputCols=numeric_cols + ["furnishingstatus_vec"],
    outputCol="features"
)


In [6]:
train, test = df.randomSplit([0.8, 0.2], seed=123)


In [7]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features"
)


In [8]:
pipeline = Pipeline(stages=[indexer, encoder, assembler, rf])


In [9]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [20, 50, 100])
             .addGrid(rf.maxDepth, [3, 5, 10])
             .build())

evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)


In [10]:
cv_model = cv.fit(train)
best_model = cv_model.bestModel


                                                                                

25/12/04 08:47:26 WARN DAGScheduler: Broadcasting large task binary with size 1009.7 KiB
25/12/04 08:47:26 WARN DAGScheduler: Broadcasting large task binary with size 1167.6 KiB
25/12/04 08:47:26 WARN DAGScheduler: Broadcasting large task binary with size 1243.9 KiB
25/12/04 08:47:46 WARN DAGScheduler: Broadcasting large task binary with size 1024.6 KiB
25/12/04 08:47:46 WARN DAGScheduler: Broadcasting large task binary with size 1169.6 KiB
25/12/04 08:47:47 WARN DAGScheduler: Broadcasting large task binary with size 1253.1 KiB
25/12/04 08:48:02 WARN DAGScheduler: Broadcasting large task binary with size 1091.2 KiB
25/12/04 08:48:02 WARN DAGScheduler: Broadcasting large task binary with size 1165.9 KiB


In [11]:
pred = best_model.transform(test)
auc = evaluator.evaluate(pred)

print("AUC =", auc)


AUC = 0.8468169761273209


In [12]:
rf_best = best_model.stages[-1]   # RandomForest ada di stage terakhir

print("Best numTrees:", rf_best.getNumTrees)
print("Best maxDepth:", rf_best.getOrDefault("maxDepth"))


Best numTrees: 20
Best maxDepth: 3


In [13]:
importances = rf_best.featureImportances

feature_names = numeric_cols + ["furnishingstatus_vec"]

print("=== Feature Importances ===")
for name, importance in zip(feature_names, importances):
    print(f"{name}: {importance}")


=== Feature Importances ===
area: 0.6586149502561843
bedrooms: 0.18320256482455802
bathrooms: 0.08375230345994504
stories: 0.02520539002624911
furnishingstatus_vec: 0.00307514731315197
