Linear Regression with Spark MLlib

In [13]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName("MLlib Example").getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for Modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='features')
df_transformed = assembler.transform(df)

# Train a Linear Regression model
lr = LinearRegression(featuresCol='features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")



24/12/04 09:52:56 WARN Instrumentation: [f9c9e5be] regParam is zero, which might cause numerical instability and overfitting.


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


Logistic Regression

In [14]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# Use VectorAssembler to change features columns to a vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')

# Transform the data
df = assembler.transform(df)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057891021882,4.087352253767593]
Intercept: 11.568912688492174


KMeans Clustering


In [15]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
# Example dataset
#data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
# columns = ['ID', 'Features']
# df = spark.createDataFrame(data, columns)

# Example dataset
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


Home Work

Load Dataset into Spark


In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Inisialisasi Spark Session
spark = SparkSession.builder.appName("Stock_Classification").getOrCreate()

# Muat file TXT (ganti path sesuai lokasi file)
data = spark.read.option("delimiter", ",").csv("/workspaces/Praktikum-Big-Data/agu-us.txt", header=True, inferSchema=True)

# Periksa data yang dimuat
data.show(5)


24/12/04 09:53:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+------+------+------+------+-------+-------+
|      Date|  Open|  High|   Low| Close| Volume|OpenInt|
+----------+------+------+------+------+-------+-------+
|2005-02-25|14.651|14.769| 14.61|14.769| 568538|      0|
|2005-02-28|14.937|15.241|14.803|15.241|1238865|      0|
|2005-03-01|15.215|15.289|14.879| 15.17| 906195|      0|
|2005-03-02|15.088|15.088| 14.75|14.944| 744314|      0|
|2005-03-03|15.005|15.197|14.971| 15.14| 489677|      0|
+----------+------+------+------+------+-------+-------+
only showing top 5 rows



Transformasi Data dan Vektorisasi Fitur

In [18]:
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.functions import lit

# Add a dummy Label column for demonstration
data = data.withColumn("Label", lit(1))

# Definisikan fitur yang akan digunakan dalam model
feature_columns = ["Open", "High", "Low", "Volume"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transformasikan data ke dalam format fitur vektor
assembled_data = assembler.transform(data).select("features", "Label")

# Periksa hasil transformasi
assembled_data.show(5)


+--------------------+-----+
|            features|Label|
+--------------------+-----+
|[14.651,14.769,14...|    1|
|[14.937,15.241,14...|    1|
|[15.215,15.289,14...|    1|
|[15.088,15.088,14...|    1|
|[15.005,15.197,14...|    1|
+--------------------+-----+
only showing top 5 rows



Bagi Data ke Dalam Set Pelatihan dan Pengujian

In [19]:
# Bagi data menjadi 70% pelatihan dan 30% pengujian
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=42)

Bangun Model Klasifikasi Menggunakan Logistic Regression

In [20]:
from pyspark.ml.classification import LogisticRegression

# Inisialisasi model Logistic Regression
lr = LogisticRegression(labelCol="Label", featuresCol="features")

# Latih model dengan data pelatihan
lr_model = lr.fit(train_data)

# Lakukan prediksi pada data pengujian
predictions = lr_model.transform(test_data)

# Tampilkan beberapa hasil prediksi
predictions.select("Label", "prediction").show(5)


24/12/04 09:53:33 WARN Instrumentation: [b2f68789] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.


+-----+----------+
|Label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 5 rows



Evaluasi Kinerja Model

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Inisialisasi evaluator dengan metrik AUC (Area Under ROC)
evaluator = BinaryClassificationEvaluator(labelCol="Label", metricName="areaUnderROC")

# Hitung dan tampilkan nilai AUC
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


Test AUC: 1.0


Penyetelan Hiperparameter Menggunakan Validasi Silang

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Buat grid parameter untuk diuji
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 0.5])       # Parameter regularisasi
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) # Parameter elastic net
              .build())

# Buat CrossValidator dengan 5 fold
cross_val = CrossValidator(estimator=lr,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=5)

# Latih model dengan validasi silang
cv_model = cross_val.fit(train_data)

# Dapatkan model terbaik dari validasi silang
best_model = cv_model.bestModel

# Evaluasi model terbaik pada data pengujian
best_predictions = best_model.transform(test_data)
best_auc = evaluator.evaluate(best_predictions)
print(f"Best Model Test AUC: {best_auc}")


24/12/04 09:54:22 WARN Instrumentation: [d3d81d59] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/04 09:54:23 WARN Instrumentation: [4a4492a9] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/04 09:54:23 WARN Instrumentation: [b9a8368f] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/04 09:54:24 WARN Instrumentation: [6146e3e6] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/04 09:54:24 WARN Instrumentation: [458ace70] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/04 09:54:24 WARN Instrumentation: [1f658a13] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/04 09:54:24 WARN

Best Model Test AUC: 1.0
