In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: one (ID, Feature, Target) tuple per row.
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# Pack the single 'Feature' column into the vector column 'Features',
# which is the column the regression below is told to read.
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled 'Features' vector.
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the fitted parameters, one per line.
for label, value in (
    ('Coefficients', model.coefficients),
    ('Intercept', model.intercept),
):
    print(f'{label}: {value}')

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/24 12:51:55 WARN Utils: Your hostname, rahma, resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/24 12:51:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/24 12:52:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/24 12:52:50 WARN Instrumentation: [42ccae65] regParam is zero, which might cause numerical instability and overfitting.
25/11/24 12:52:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/24 12:53:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JN

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [5]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT 
from pyspark.sql.functions import udf 

# Built-in array -> vector conversion (available since Spark 3.1); imported
# here so this cell brings its own dependency into scope.
from pyspark.ml.functions import array_to_vector

# Example dataset: (ID, feature values as a plain Python list, binary label)
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features_Array', 'Label']
df = spark.createDataFrame(data, columns)

# Create the 'Features' vector column from the array column.
# array_to_vector replaces the hand-written Python UDF that wrapped
# Vectors.dense: it produces the same dense vectors without having to
# define and register a UDF for the conversion.
df_transformed = df.withColumn("Features", array_to_vector(df["Features_Array"]))

# Train logistic regression model on the vector column against 'Label'
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df_transformed)

# Display the fitted coefficients and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

                                                                                

Coefficients: [-12.262057936866103,4.0873522690497905]
Intercept: 11.568912734332656


In [11]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT 
from pyspark.sql.functions import udf 

# Built-in array -> vector conversion (available since Spark 3.1); imported
# here so this cell brings its own dependency into scope.
from pyspark.ml.functions import array_to_vector

# Example dataset: (ID, 2-D point as a plain Python list)
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features_Array']
df = spark.createDataFrame(data, columns)

# Create the 'Features' vector column from the array column.
# array_to_vector replaces the hand-written Python UDF that wrapped
# Vectors.dense and yields the same dense vectors.
df_transformed = df.withColumn("Features", array_to_vector(df["Features_Array"]))

# Train KMeans clustering model with two clusters.
# The seed is fixed explicitly: center initialisation is randomised, so
# without it the order (and possibly the membership) of the clusters printed
# below may differ from one run to the next.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df_transformed)

# Show cluster centers (one array per cluster)
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')



Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


                                                                                

In [57]:
# Homework: Titanic survival classification with Spark MLlib

In [58]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType

In [59]:
# getOrCreate() returns the already-running session when one exists.
spark = SparkSession.builder.appName("TitanicMLlibHomework").getOrCreate()

# Column roles used throughout the homework cells.
LABEL_COL = 'Survived'
FEATURE_COLS = [
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'Embarked',
]

# Read the CSV: first row is the header, column types are inferred by Spark.
data = spark.read.options(header=True, inferSchema=True).csv("titanic_cleaned.csv")

# Store the label as a double column.
data = data.withColumn("Survived", data["Survived"].cast(DoubleType()))

# Stage 1: pack the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="rawFeatures")

# Stage 2: scale each feature by its standard deviation; no mean-centering.
scaler = StandardScaler(
    withMean=False,
    withStd=True,
    inputCol="rawFeatures",
    outputCol="features",
)

# 70/30 train/test split with a fixed seed.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

print("Data Preparation Complete.")
data.show(5)

Data Preparation Complete.
+--------------------+--------+------+----+---+-------------------+-----+-----+------+--------------------+-----+--------+
|         PassengerId|Survived|Pclass|Name|Sex|                Age|SibSp|Parch|Ticket|                Fare|Cabin|Embarked|
+--------------------+--------+------+----+---+-------------------+-----+-----+------+--------------------+-----+--------+
|                 0.0|     0.0|   1.0| 108|  1| 0.2711736617240512|0.125|  0.0|   523|0.014151057562208049|  147|       2|
|0.001123595505617...|     1.0|   0.0| 190|  0| 0.4722292033174164|0.125|  0.0|   596| 0.13913573538264068|   81|       0|
|0.002247191011235955|     1.0|   1.0| 353|  0|0.32143754712239253|  0.0|  0.0|   669|0.015468569817999833|  147|       2|
|0.003370786516853...|     1.0|   0.0| 272|  0| 0.4345312892686604|0.125|  0.0|    49| 0.10364429745562033|   55|       2|
|0.004494382022471...|     0.0|   1.0|  15|  1| 0.4345312892686604|  0.0|  0.0|   472|0.015712553569072387|  147

In [65]:
# NOTE(review): model_baseline is assigned by the baseline-training cell that
# appears further down in this notebook; that cell must be run before this one.
# Take the last stage of the fitted pipeline (the position of `lr` in the
# pipeline's stage list) and print its fitted parameters.
lr_model_baseline = model_baseline.stages[-1]

print("Koefisien (Vector):", lr_model_baseline.coefficients, sep="\n")
print("\nIntercept (Scalar):", lr_model_baseline.intercept, sep="\n")

Koefisien (Vector):
[-0.6997331007788276,-1.259628672572061,-0.4443307818378029,-0.47817359850907015,-0.0731722378047105,0.17298027075355055,-0.22634632112264633]

Intercept (Scalar):
3.708890815818252


In [43]:
# Define the classifier explicitly for the Titanic pipeline. Without this the
# cell silently picks up whatever `lr` is left in the kernel; on a top-to-bottom
# run that is the practice estimator configured with labelCol='Label', a column
# the Titanic data does not have. It reads the scaler's output column
# ("features") and the label column named by LABEL_COL; all other settings are
# left at their defaults for the untuned baseline.
lr = LogisticRegression(featuresCol="features", labelCol=LABEL_COL)

# Baseline pipeline: assemble features -> scale -> logistic regression.
pipeline_baseline = Pipeline(stages=[assembler, scaler, lr])

# Fit on the training split only, then score the held-out test split.
model_baseline = pipeline_baseline.fit(train_data)

predictions_baseline = model_baseline.transform(test_data)

# Accuracy compares the 'prediction' column against the label.
mc_evaluator = MulticlassClassificationEvaluator(
    labelCol=LABEL_COL,
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy_baseline = mc_evaluator.evaluate(predictions_baseline)

print(f"Akurasi Model Baseline (tanpa Tuning): {accuracy_baseline:.4f}")

# Area under the ROC curve is computed from the 'rawPrediction' column.
auc_evaluator = BinaryClassificationEvaluator(
    labelCol=LABEL_COL,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc_baseline = auc_evaluator.evaluate(predictions_baseline)
print(f"AUC Model Baseline (tanpa Tuning): {auc_baseline:.4f}")

Akurasi Model Baseline (tanpa Tuning): 0.7931
AUC Model Baseline (tanpa Tuning): 0.8851


In [None]:
# Search space: every combination of three regParam and three maxIter values.
# `lr` is expected to be the estimator used as the last stage of
# pipeline_baseline, since the grid is applied to that pipeline below.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, [0.01, 0.1, 0.5])
grid_builder.addGrid(lr.maxIter, [10, 50, 100])
paramGrid = grid_builder.build()

# 5-fold cross-validation over the whole pipeline, scored with the AUC evaluator.
cv = CrossValidator(
    estimator=pipeline_baseline,
    estimatorParamMaps=paramGrid,
    evaluator=auc_evaluator,
    numFolds=5,
    seed=42,
)

# Fit on the training split and keep the best pipeline and its last stage.
cvModel = cv.fit(train_data)
best_model = cvModel.bestModel
best_lr_model = best_model.stages[-1]

# Score the held-out test split with the cross-validated model.
predictions_tuned = cvModel.transform(test_data)
accuracy_tuned = mc_evaluator.evaluate(predictions_tuned)
auc_tuned = auc_evaluator.evaluate(predictions_tuned)

# Report test-set metrics of the tuned model.
print("Performa Model Terbaik (Setelah Tuning) ")
print(f"Akurasi Model Terbaik (Tuned): {accuracy_tuned:.4f}")
print(f"Area Under ROC (AUC) Model Terbaik (Tuned): {auc_tuned:.4f}")

# Report the hyperparameters of the winning logistic-regression stage.
print("\nHyperparameters Terbaik")
print(f"Best regParam: {best_lr_model.getRegParam()}")
print(f"Best maxIter: {best_lr_model.getMaxIter()}")