<a href="https://colab.research.google.com/github/ASN-Lab/Big-Data/blob/main/Week_14.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Week 14 (Spark Mlib for Machine Learning)

##Practice

###Spark Mlib

In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


###Logistic Regression

In [4]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Example dataset
# Original: data = [(1, [2.0, 3.0], 0), ...] where 'RawFeatures' was array<double>
# Problem: VectorAssembler does not accept array<double> as an inputCol directly.
# Solution: Treat the elements of the array as separate features by restructuring data.
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label'] # Define separate columns for features
df = spark.createDataFrame(data, columns)

# Prepare data for modeling: assemble 'Feature1' and 'Feature2' into a vector column named 'Features'
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df_transformed) # Use the transformed DataFrame

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


###kMeans Clustering

In [6]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors # Import Vectors for DenseVector

# Example dataset
# Use Vectors.dense to create a DenseVector for the features column
data = [(1, Vectors.dense([1.0, 1.0])),
        (2, Vectors.dense([5.0, 5.0])),
        (3, Vectors.dense([10.0, 10.0])),
        (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


##Homework

###Inisialisasi & load dataset

In [14]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("ML_Comparison") \
    .config("spark.driver.memory", "6g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

spark

# Load dataset
csv_path = "/content/full.csv"

df = spark.read.option("header", True).option("inferSchema", True).csv(csv_path)
df.printSchema()
df.show(5)

root
 |-- commit: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- message: string (nullable = true)
 |-- repo: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+-----------------+
|              commit|              author|                date|             message|             repo|
+--------------------+--------------------+--------------------+--------------------+-----------------+
|692bba578efb5e305...|Mortada Mehyar <m...|Wed Apr 21 12:27:...|DOC: add example ...|pandas-dev/pandas|
|855696cde0ef5d80a...|Patrick Hoefler <...|Wed Apr 21 01:23:...|Add keyword sort ...|pandas-dev/pandas|
|eaaefd140289a5103...|attack68 <2425655...|Wed Apr 21 01:21:...|ENH: `Styler.high...|pandas-dev/pandas|
|aab87997058f3c74b...|attack68 <2425655...|Wed Apr 21 01:01:...|ENH: add `decimal...|pandas-dev/pandas|
|9c43cd7675d961740...|Simon Hawkins <si...|Tue Apr 20 23:58:...|[ArrowStringArr

###Data cleaning

In [27]:
# Deduplikasi
from pyspark.sql.functions import col, when, to_timestamp, hour, dayofweek, length
from pyspark.sql.types import IntegerType

df = df.dropDuplicates()
df = df.na.drop(subset=["message"])

# Konversi timestamp
df = df.withColumn("date", to_timestamp("date"))
df = df.filter(col("date").isNotNull())

# Feature engineering
df = (df
      .withColumn("hour", hour("date"))
      .withColumn("day_of_week", dayofweek("date"))
      .withColumn("msg_len", length("message"))
     )

# Labeling for classification: Create a binary label based on msg_len
# Calculate the median message length
median_msg_len = df.approxQuantile("msg_len", [0.5], 0.01)[0]
df = df.withColumn("label_class", when(col("msg_len") > median_msg_len, 1).otherwise(0).cast(IntegerType()))

print(f"Created 'label_class' column based on median 'msg_len' ({median_msg_len:.2f}).")

Created 'label_class' column based on median 'msg_len' (6.00).


###Feature & pipelin

In [28]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

feature_cols = ["hour", "day_of_week", "msg_len"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

###Training

In [29]:
train, test = df.randomSplit([0.8, 0.2], seed=42)

###Clasification: Logistic regression model

In [30]:
from pyspark.ml.classification import LogisticRegression

lr_clf = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label_class",
    maxIter=20
)

pipeline_clf = Pipeline(stages=[assembler, scaler, lr_clf])
model_clf = pipeline_clf.fit(train)

pred_clf = model_clf.transform(test)
pred_clf.select("scaled_features", "label_class", "prediction").show(10)

+--------------------+-----------+----------+
|     scaled_features|label_class|prediction|
+--------------------+-----------+----------+
|[0.0,0.6666666666...|          0|       0.0|
|[0.0,0.3333333333...|          1|       1.0|
|[0.0,0.6666666666...|          1|       1.0|
|[0.0,0.8333333333...|          1|       1.0|
|[0.0,0.8333333333...|          1|       1.0|
|[0.0,0.1666666666...|          0|       0.0|
|[0.0,0.1666666666...|          1|       1.0|
|[0.0,0.1666666666...|          0|       0.0|
|[0.0,0.6666666666...|          1|       1.0|
|           (3,[],[])|          0|       0.0|
+--------------------+-----------+----------+
only showing top 10 rows



In [31]:
# Evaluate model
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator_clf = BinaryClassificationEvaluator(
    labelCol="label_class",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator_clf.evaluate(pred_clf)
print("AUC Logistic Regression =", auc)

AUC Logistic Regression = 1.0


###Regresi: Linear regression model

In [33]:
from pyspark.ml.regression import LinearRegression

lr_reg = LinearRegression(
    featuresCol="scaled_features",
    labelCol="msg_len",
    maxIter=20
)

pipeline_reg = Pipeline(stages=[assembler, scaler, lr_reg])
model_reg = pipeline_reg.fit(train)

pred_reg = model_reg.transform(test)
pred_reg.select("msg_len", "prediction").show(10)

+-------+------------------+
|msg_len|        prediction|
+-------+------------------+
|      5| 4.999999999999998|
|     11| 11.00000000000002|
|     18|18.000000000000007|
|     18|17.999999999999996|
|     18|17.999999999999996|
|      5| 5.000000000000025|
|     29|29.000000000000036|
|      5| 5.000000000000025|
|     11|11.000000000000002|
|      5| 5.000000000000034|
+-------+------------------+
only showing top 10 rows



In [35]:
# Evaluate model
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_reg = RegressionEvaluator(
    labelCol="msg_len",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = evaluator_reg.evaluate(pred_reg)
print("RMSE Linear Regression =", rmse)

RMSE Linear Regression = 1.9928651389931068e-14


###Hyperparameter

In [36]:
# Logistic regression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid_clf = (ParamGridBuilder()
                  .addGrid(lr_clf.regParam, [0.01, 0.1, 1.0])
                  .addGrid(lr_clf.elasticNetParam, [0.0, 0.5, 1.0])
                  .build())

cv_clf = CrossValidator(
    estimator=pipeline_clf,
    estimatorParamMaps=param_grid_clf,
    evaluator=evaluator_clf,
    numFolds=3
)

cv_clf_model = cv_clf.fit(train)
cv_pred = cv_clf_model.transform(test)

auc_cv = evaluator_clf.evaluate(cv_pred)
print("AUC Logistic Regression (Cross-Validated) =", auc_cv)

AUC Logistic Regression (Cross-Validated) = 1.0


In [37]:
# Linear regression
param_grid_reg = (ParamGridBuilder()
                  .addGrid(lr_reg.regParam, [0.01, 0.1, 1.0])
                  .addGrid(lr_reg.elasticNetParam, [0.0, 0.5, 1.0])
                  .build())

cv_reg = CrossValidator(
    estimator=pipeline_reg,
    estimatorParamMaps=param_grid_reg,
    evaluator=evaluator_reg,
    numFolds=3
)

cv_reg_model = cv_reg.fit(train)
cv_reg_pred = cv_reg_model.transform(test)

rmse_cv = evaluator_reg.evaluate(cv_reg_pred)
print("RMSE Linear Regression (Cross-Validated) =", rmse_cv)

RMSE Linear Regression (Cross-Validated) = 0.008741489800610154
