# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
#
# Fits a single-feature linear regression on a four-row in-memory dataset
# and prints the fitted coefficients and intercept.
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start a Spark session, or reuse the one that is already running
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Toy dataset: every row is (ID, Feature, Target), with Target = Feature + 15
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# LinearRegression reads its inputs from a single features column, so pack
# the scalar 'Feature' column into a vector column named 'Features'
assembler = VectorAssembler(outputCol='Features', inputCols=['Feature'])
df_transformed = assembler.transform(df)

# Fit the regression of 'Target' on 'Features'
lr = LinearRegression(labelCol='Target', featuresCol='Features')
model = lr.fit(df_transformed)

# Report the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Obtain the Spark session (SparkSession and VectorAssembler are not imported
# in this cell; they come from an earlier cell)
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Sample dataset: every row is (ID, Feature1, Feature2, Label)
columns = ['ID', 'Feature1', 'Feature2', 'Label']
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0),
]
df = spark.createDataFrame(data, schema=columns)

# Combine the Feature1 and Feature2 columns into one vector column
assembler = VectorAssembler(
    outputCol='Features',
    inputCols=['Feature1', 'Feature2'],
)
df = assembler.transform(df)

# Train the model
lr = LogisticRegression(labelCol='Label', featuresCol='Features')
model = lr.fit(df)

# Show the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset: every row is (ID, Features, Label).
# LogisticRegression requires the features column to hold ML vectors. A plain
# Python list is stored as an array<double> column, which fit() rejects with
# an IllegalArgumentException, so each feature list is wrapped in
# Vectors.dense to give the column the vector type.
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0),
]
columns = ['ID', 'Features', 'Label']
# `spark` is the session created in an earlier cell
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display the fitted coefficients and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Sample dataset: every row is (ID, Feature1, Feature2, Label).
# `spark` and VectorAssembler come from an earlier cell.
columns = ['ID', 'Feature1', 'Feature2', 'Label']
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0),
]
df = spark.createDataFrame(data, schema=columns)

# Convert the Feature1 and Feature2 columns into a single vector column
assembler = VectorAssembler(
    outputCol='Features',
    inputCols=['Feature1', 'Feature2'],
)
df = assembler.transform(df)

# Train the model on the assembled features
lr = LogisticRegression(labelCol='Label', featuresCol='Features')
model = lr.fit(df)

# Show the results
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans

# Example dataset: every row is (ID, Features), Features being a list of two
# floats. `spark` is the session created in an earlier cell.
# NOTE(review): the list values produce an array column rather than an ML
# vector column; KMeans is believed to accept array input only on newer Spark
# releases - confirm against the Spark version used for this course.
columns = ['ID', 'Features']
data = [
    (1, [1.0, 1.0]),
    (2, [5.0, 5.0]),
    (3, [10.0, 10.0]),
    (4, [15.0, 15.0]),
]
df = spark.createDataFrame(data, schema=columns)

# Fit a KMeans model with two clusters
kmeans = KMeans(k=2, featuresCol='Features')
model = kmeans.fit(df)

# Print the centers of the fitted clusters
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Homework: classify rows of a price file by whether Close exceeded Open, then
# tune the classifier with cross-validation. SparkSession, VectorAssembler and
# LogisticRegression are not imported here; they come from earlier cells.
spark = SparkSession.builder.appName("Homework").getOrCreate()

# Read the file as CSV with a header row and inferred column types
dataPath = "aom.us.txt"
df = spark.read.csv(dataPath, header=True, inferSchema=True)
df.show(5)

# Label: 1 when Close > Open, else 0. Keep only the model columns and drop
# rows containing nulls.
# NOTE(review): Target is computed from Close and Open, and both are also
# used as features below, so the label is recoverable from the inputs -
# confirm this is the intended task.
featureCols = ["Open", "High", "Low", "Close", "Volume"]
df = (
    df.withColumn("Target", (df["Close"] > df["Open"]).cast("int"))
    .select(*featureCols, "Target")
    .na.drop()
)

# Assemble the feature columns into one vector column, then split 80/20
assembler = VectorAssembler(outputCol="Features", inputCols=featureCols)
df = assembler.transform(df)
trainDf, testDf = df.randomSplit([0.8, 0.2], seed=42)

# Baseline: logistic regression with default hyperparameters
lr = LogisticRegression(labelCol="Target", featuresCol="Features")
model = lr.fit(trainDf)
predictions = model.transform(testDf)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="Target",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

# Hyperparameter search: 3 x 3 grid over regParam and elasticNetParam,
# scored by the same accuracy evaluator with 5-fold cross-validation
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)
cvModel = crossval.fit(trainDf)

# Score the best model from the search on the held-out test split
bestModel = cvModel.bestModel
predictions = bestModel.transform(testDf)
accuracy = evaluator.evaluate(predictions)
print(f"Best Model Accuracy after Tuning: {accuracy}")

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize the Spark session
spark = SparkSession.builder.appName("COVID Analysis").getOrCreate()

# Read the dataset as CSV with a header row and inferred column types
dataPath = "covid.csv"  # Replace with the path to your file
df = spark.read.csv(dataPath, header=True, inferSchema=True)
df.show(5)

# Add the Target column: 1 when 'new_confirmed' is above zero, else 0
df = df.withColumn("Target", (df["new_confirmed"] > 0).cast("int"))

# Keep the feature columns plus the label, then drop rows containing nulls
featureCols = [
    "last_available_confirmed",
    "last_available_deaths",
    "estimated_population",
]
df = df.select(*featureCols, "Target")
df = df.na.drop()

# Assemble the feature columns into one vector column
assembler = VectorAssembler(outputCol="Features", inputCols=featureCols)
df = assembler.transform(df)

# Split the data 80/20 into train and test sets with a fixed seed
trainDf, testDf = df.randomSplit([0.8, 0.2], seed=42)

# Baseline logistic regression with default hyperparameters
lr = LogisticRegression(labelCol="Target", featuresCol="Features")
model = lr.fit(trainDf)

# Evaluate the baseline on the test set using accuracy
predictions = model.transform(testDf)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="Target",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

# Hyperparameter tuning: 3 x 3 grid over regParam and elasticNetParam
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 5-fold cross-validation over the grid, scored by the same evaluator
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)
cvModel = crossval.fit(trainDf)

# Evaluate the best model from the search on the test set
bestModel = cvModel.bestModel
predictions = bestModel.transform(testDf)
accuracy = evaluator.evaluate(predictions)
print(f"Best Model Accuracy after Tuning: {accuracy}")