In [3]:
# Example: Linear Regression with Spark MLlib 
from pyspark.sql import SparkSession 
from pyspark.ml.regression import LinearRegression 
from pyspark.ml.feature import VectorAssembler 

# Initialize Spark Session 
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data 
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns) 

# Prepare data for modeling 
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df) 

# Train a linear regression model 
lr = LinearRegression(featuresCol='Features', labelCol='Target') 
model = lr.fit(df_transformed) 

# Print model coefficients 
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

25/12/17 22:41:03 WARN Instrumentation: [ee13875d] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [13]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Dataset
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
df = df.withColumn("Features", to_vector("Features"))

# Train Logistic Regression
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Output
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)


                                                                                

Coefficients: [-12.262057930705247,4.087352266992818]
Intercept: 11.568912728188634


In [12]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans

# Example dataset
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
df = df.withColumn("Features", to_vector("Features"))

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')



Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


                                                                                

In [15]:
### Homework 
import kagglehub 
import os 


# Download latest version
path = kagglehub.dataset_download("uciml/iris")

print("Path to dataset files:", path)
os.listdir(path)

Path to dataset files: /home/hadoop/.cache/kagglehub/datasets/uciml/iris/versions/2


['Iris.csv', 'database.sqlite']

In [17]:
# Load dataset 
spark = SparkSession.builder.appName("IrisClassification").getOrCreate()

df = spark.read.csv(
    path + "/Iris.csv",
    header=True,
    inferSchema=True
)

df.show(5)
df.printSchema()

25/12/18 03:10:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows

root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)



In [20]:
# Preprocessing 
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[
        "SepalLengthCm",
        "SepalWidthCm",
        "PetalLengthCm",
        "PetalWidthCm"
    ],
    outputCol="Features"
)

df = assembler.transform(df)

In [21]:
# Encoding String menjadi Numerik
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(
    inputCol="Species",
    outputCol="Label"
)

df = indexer.fit(df).transform(df)

df.select("Features", "Label").show(5)

+-----------------+-----+
|         Features|Label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
+-----------------+-----+
only showing top 5 rows



In [22]:
# Split data
train, test = df.randomSplit([0.8, 0.2], seed=42)

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="Features",
    labelCol="Label"
)

model = lr.fit(train)

In [23]:
# Evaluasi Model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = model.transform(test)

evaluator = MulticlassClassificationEvaluator(
    labelCol="Label",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 1.0


In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross Validation
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Train model
cvModel = cv.fit(train)
bestModel = cvModel.bestModel

print("Best Coefficient Matrix:")
print(bestModel.coefficientMatrix)

print("Best Intercept Vector:")
print(bestModel.interceptVector)

Best Coefficient Matrix:
DenseMatrix([[-1.17604933,  2.36600666, -0.98323465, -2.11817066],
             [ 0.63471523, -0.69736921, -0.15942961, -0.95409922],
             [ 0.5413341 , -1.66863745,  1.14266425,  3.07226988]])
Best Intercept Vector:
[5.666092129745435,2.0110613568025393,-7.677153486547974]
