# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Define a UDF to convert the array to a DenseVector
@udf(returnType=VectorUDT())
def to_vector(arr):
    return Vectors.dense(arr)

# Apply the UDF to the 'Features' column
df = df.withColumn('Features', to_vector(df['Features']))

# Create a VectorAssembler
assembler = VectorAssembler(inputCols=['Features'], outputCol='Features_vec')

# Transform the DataFrame
df = assembler.transform(df)

# Select the relevant columns and rename 'Features_vec' to 'Features'
df = df.select('ID', 'Features_vec', 'Label').withColumnRenamed('Features_vec', 'Features')

# Train the logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Example dataset
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Define a UDF to convert the array to a DenseVector
@udf(returnType=VectorUDT())
def to_vector(arr):
    return Vectors.dense(arr)

# Apply the UDF to the 'Features' column
df = df.withColumn('Features', to_vector(df['Features']))

# Create a VectorAssembler
assembler = VectorAssembler(inputCols=['Features'], outputCol='Features_vec')

# Transform the DataFrame
df = assembler.transform(df)

# Select the relevant columns and rename 'Features_vec' to 'Features'
df = df.select('ID', 'Features_vec').withColumnRenamed('Features_vec', 'Features')

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col, when, regexp_replace
from pyspark.sql.types import DoubleType, IntegerType, StructType, StructField, StringType

# Create Spark session
spark = SparkSession.builder \
    .appName('Anime Analysis') \
    .getOrCreate()

# Define schema untuk memastikan pembacaan data yang benar
schema = StructType([
    StructField("Rank", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Japanese_name", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("Episodes", StringType(), True),
    StructField("Studio", StringType(), True),
    StructField("Release_season", StringType(), True),
    StructField("Tags", StringType(), True),
    StructField("Rating", DoubleType(), True),
    StructField("Release_year", IntegerType(), True),
    StructField("End_year", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Content_Warning", StringType(), True),
    StructField("Related_Mange", StringType(), True),
    StructField("Related_anime", StringType(), True),
    StructField("Voice_actors", StringType(), True),
    StructField("staff", StringType(), True)
])

# Baca CSV dengan parameter yang disesuaikan
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("multiline", "true") \
    .option("escape", "\"") \
    .option("quote", "\"") \
    .option("sep", ",") \
    .schema(schema) \
    .load('/content/drive/MyDrive/Bigdata_Dataset/Anime.csv')

# Bersihkan dan persiapkan data
df_cleaned = df.select(
    col("Rank"),
    col("Type"),
    col("Episodes").cast(IntegerType()),
    col("Release_season"),
    col("Rating"),
    col("Release_year"),
    col("End_year").cast(IntegerType())
)

# Handle null values
df_cleaned = df_cleaned.na.fill({
    "Episodes": 0,
    "Rating": 0.0,
    "End_year": 0,
    "Type": "Unknown",
    "Release_season": "Unknown"
})

# Convert categorical variables
categorical_cols = ["Type", "Release_season"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep")
           for col in categorical_cols]

# Apply indexers
for indexer in indexers:
    df_cleaned = indexer.fit(df_cleaned).transform(df_cleaned)

# Prepare feature vector
numeric_cols = ["Rank", "Episodes", "Rating", "Release_year", "End_year"]
feature_cols = numeric_cols + [col+"_index" for col in categorical_cols]

# Create vector assembler
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="keep"
)

# Transform the data
final_data = assembler.transform(df_cleaned)

# Show the prepared dataset
print("Schema of prepared dataset:")
final_data.printSchema()
print("\nSample of prepared dataset:")
final_data.show(5, truncate=False)

# Save the prepared dataset if needed
# final_data.write.mode("overwrite").parquet("/path/to/save/prepared_anime_data")

Schema of prepared dataset:
root
 |-- Rank: integer (nullable = true)
 |-- Type: string (nullable = false)
 |-- Episodes: integer (nullable = false)
 |-- Release_season: string (nullable = false)
 |-- Rating: double (nullable = false)
 |-- Release_year: integer (nullable = true)
 |-- End_year: integer (nullable = false)
 |-- Type_index: double (nullable = false)
 |-- Release_season_index: double (nullable = false)
 |-- features: vector (nullable = true)


Sample of prepared dataset:
+----+-----+--------+--------------+------+------------+--------+----------+--------------------+----------------------------------+
|Rank|Type |Episodes|Release_season|Rating|Release_year|End_year|Type_index|Release_season_index|features                          |
+----+-----+--------+--------------+------+------------+--------+----------+--------------------+----------------------------------+
|1   |TV   |0       |Fall          |4.6   |NULL        |0       |0.0       |2.0                 |[1.0,0.0,4.6,NaN

In [9]:
final_data.write.mode("overwrite").parquet("/content/drive/MyDrive/Bigdata_Dataset/prepared_anime_data")

In [45]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when, count
from pyspark.sql.types import DoubleType, StructType, StructField

def train_rating_classifier(df_cleaned):
    """
    Train classifier to predict rating category based on features
    """
    try:
        # 1. Check initial data
        print("\nInitial data count:", df_cleaned.count())

        # 2. Check for null values
        print("\nNull values in each column:")
        df_cleaned.select([count(when(col(c).isNull(), c)).alias(c)
                         for c in df_cleaned.columns]).show()

        # 3. Prepare features and label
        feature_cols = ["Episodes", "Release_year", "Release_season_index"]

        # Create rating categories and handle nulls
        df_ml = df_cleaned.withColumn(
            "rating_category",
            when(col("Rating").isNull(), None)
            .when(col("Rating") < 2.5, 0.0)
            .when((col("Rating") >= 2.5) & (col("Rating") < 3.5), 1.0)
            .when((col("Rating") >= 3.5) & (col("Rating") < 4.0), 2.0)
            .otherwise(3.0)
        )

        # Select and cast columns, handle nulls
        df_ml = df_ml.select(
            col("rating_category").cast("double"),
            *[when(col(c).isNull(), 0.0).otherwise(col(c)).cast("double").alias(c)
              for c in feature_cols]
        ).na.drop()  # Remove any remaining null values

        # 4. Check processed data
        print("\nProcessed data count:", df_ml.count())
        print("\nRating category distribution:")
        df_ml.groupBy("rating_category").count().orderBy("rating_category").show()

        # 5. Create feature vector
        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol="features",
            handleInvalid="skip"
        )
        vector_data = assembler.transform(df_ml)

        # Check vector data
        print("\nVector data count:", vector_data.count())

        # 6. Split data
        train_data, test_data = vector_data.randomSplit([0.7, 0.3], seed=42)

        print("\nTraining data count:", train_data.count())
        print("Test data count:", test_data.count())

        # 7. Train model
        rf = RandomForestClassifier(
            featuresCol="features",
            labelCol="rating_category",
            numTrees=20,
            maxDepth=7
        )

        model = rf.fit(train_data)

        # 8. Evaluate model
        predictions = model.transform(test_data)
        evaluator = MulticlassClassificationEvaluator(
            labelCol="rating_category",
            predictionCol="prediction",
            metricName="accuracy"
        )
        accuracy = evaluator.evaluate(predictions)
        print(f"\nModel Accuracy: {accuracy:.4f}")

        return model, assembler

    except Exception as e:
        print(f"Error in training: {str(e)}")
        import traceback
        traceback.print_exc()
        return None, None

# Test the model
try:
    # First, let's check the data
    print("Original data overview:")
    df_cleaned.select("Rating", "Episodes", "Release_year", "Release_season_index").show(5)

    # Train model
    print("\nTraining model...")
    model, assembler = train_rating_classifier(df_cleaned)

    if model and assembler:
        # Create sample new anime data
        sample_data = [
            (12.0, 2024.0, 1.0),
            (24.0, 2024.0, 2.0),
            (13.0, 2024.0, 3.0),
            (50.0, 2024.0, 4.0)
        ]

        schema = StructType([
            StructField("Episodes", DoubleType(), True),
            StructField("Release_year", DoubleType(), True),
            StructField("Release_season_index", DoubleType(), True)
        ])

        new_anime = spark.createDataFrame(sample_data, schema=schema)

        print("\nNew Anime Data:")
        new_anime.show()

        # Make predictions
        vector_data = assembler.transform(new_anime)
        predictions = model.transform(vector_data)

        # Format predictions
        final_predictions = predictions.select(
            "Episodes",
            "Release_year",
            "Release_season_index",
            when(col("prediction") == 0.0, "Poor (< 2.5)")
            .when(col("prediction") == 1.0, "Average (2.5-3.5)")
            .when(col("prediction") == 2.0, "Good (3.5-4.0)")
            .otherwise("Excellent (> 4.0)")
            .alias("Predicted_Rating_Category")
        )

        print("\nPredictions for New Anime:")
        final_predictions.show()

except Exception as e:
    print(f"Error: {str(e)}")
    import traceback
    traceback.print_exc()

Original data overview:
+------+--------+------------+--------------------+
|Rating|Episodes|Release_year|Release_season_index|
+------+--------+------------+--------------------+
|   4.6|       0|        NULL|                 2.0|
|   4.6|      13|        NULL|                 1.0|
|  4.58|      12|        NULL|                 0.0|
|  4.58|      64|        NULL|                 1.0|
|  4.57|      10|        NULL|                 1.0|
+------+--------+------------+--------------------+
only showing top 5 rows


Training model...

Initial data count: 18495

Null values in each column:
+----+----+--------+--------------+------+------------+--------+----------+--------------------+
|Rank|Type|Episodes|Release_season|Rating|Release_year|End_year|Type_index|Release_season_index|
+----+----+--------+--------------+------+------------+--------+----------+--------------------+
|   0|   0|       0|             0|     0|       18495|       0|         0|                   0|
+----+----+--------+

In [47]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

def tune_hyperparameters(df_cleaned, assembler):
    """
    Perform hyperparameter tuning using cross-validation
    """
    try:
        # 1. Prepare data
        print("\nPreparing data for tuning...")

        # Select necessary columns and handle nulls
        feature_cols = ["Episodes", "Release_year", "Release_season_index"]
        df_ml = df_cleaned.select(
            "Rating",
            *feature_cols
        ).na.fill(0)  # Fill nulls with 0

        # Create rating categories
        df_ml = df_ml.withColumn(
            "rating_category",
            when(col("Rating") < 2.5, 0.0)
            .when((col("Rating") >= 2.5) & (col("Rating") < 3.5), 1.0)
            .when((col("Rating") >= 3.5) & (col("Rating") < 4.0), 2.0)
            .otherwise(3.0)
        )

        # Check data after transformation
        print(f"\nData count after category transformation: {df_ml.count()}")
        print("\nRating category distribution:")
        df_ml.groupBy("rating_category").count().orderBy("rating_category").show()

        # Create feature vector
        vector_data = assembler.transform(df_ml)
        print(f"\nVector data count: {vector_data.count()}")

        # Drop any rows with null features or labels
        vector_data = vector_data.na.drop(subset=["features", "rating_category"])
        print(f"Final vector data count: {vector_data.count()}")

        if vector_data.count() == 0:
            raise Exception("Dataset is empty after preprocessing")

        # 2. Create new RandomForestClassifier
        rf = RandomForestClassifier(
            featuresCol="features",
            labelCol="rating_category",
            seed=42
        )

        # 3. Create parameter grid
        paramGrid = ParamGridBuilder() \
            .addGrid(rf.numTrees, [10, 20]) \
            .addGrid(rf.maxDepth, [5, 7]) \
            .addGrid(rf.minInstancesPerNode, [1, 2]) \
            .build()

        # 4. Create evaluator
        evaluator = MulticlassClassificationEvaluator(
            labelCol="rating_category",
            predictionCol="prediction",
            metricName="accuracy"
        )

        # 5. Split data
        train_data, test_data = vector_data.randomSplit([0.7, 0.3], seed=42)
        print(f"\nTraining data count: {train_data.count()}")
        print(f"Test data count: {test_data.count()}")

        if train_data.count() == 0 or test_data.count() == 0:
            raise Exception("Training or test data is empty after split")

        # 6. Create and fit CrossValidator
        crossval = CrossValidator(
            estimator=rf,
            estimatorParamMaps=paramGrid,
            evaluator=evaluator,
            numFolds=3,
            seed=42
        )

        print("\nStarting cross-validation...")
        print("This may take a while as it tests multiple parameter combinations.")
        cvModel = crossval.fit(train_data)

        # 7. Get and evaluate best model
        best_model = cvModel.bestModel
        predictions = best_model.transform(test_data)

        # 8. Print results
        print("\nBest Model Parameters:")
        print(f"Number of Trees: {best_model.numTrees}")
        print(f"Max Depth: {best_model.maxDepth}")
        print(f"Min Instances Per Node: {best_model.minInstancesPerNode}")

        print("\nBest Model Performance:")
        metrics = {
            "accuracy": "accuracy",
            "f1": "f1",
            "precision": "weightedPrecision",
            "recall": "weightedRecall"
        }

        for metric_name, metric_value in metrics.items():
            evaluator.setMetricName(metric_value)
            score = evaluator.evaluate(predictions)
            print(f"{metric_name.title()}: {score:.4f}")

        print("\nConfusion Matrix:")
        predictions.groupBy("rating_category", "prediction") \
            .count() \
            .orderBy("rating_category", "prediction") \
            .show()

        return best_model

    except Exception as e:
        print(f"Error in hyperparameter tuning: {str(e)}")
        import traceback
        traceback.print_exc()
        return None

# Test the tuning
try:
    print("Starting hyperparameter tuning...")

    # Check if df_cleaned has data
    print(f"\nInitial data count: {df_cleaned.count()}")
    print("\nSample of initial data:")
    df_cleaned.select("Rating", "Episodes", "Release_year", "Release_season_index").show(5)

    best_model = tune_hyperparameters(df_cleaned, assembler)

    if best_model:
        print("\nHyperparameter tuning completed successfully!")

        # Test the best model with sample data
        sample_data = [
            (12.0, 2024.0, 1.0),
            (24.0, 2024.0, 2.0),
            (13.0, 2024.0, 3.0),
            (50.0, 2024.0, 4.0)
        ]

        schema = StructType([
            StructField("Episodes", DoubleType(), True),
            StructField("Release_year", DoubleType(), True),
            StructField("Release_season_index", DoubleType(), True)
        ])

        new_anime = spark.createDataFrame(sample_data, schema=schema)
        vector_data = assembler.transform(new_anime)

        predictions = best_model.transform(vector_data)
        final_predictions = predictions.select(
            "Episodes",
            "Release_year",
            "Release_season_index",
            when(col("prediction") == 0.0, "Poor (< 2.5)")
            .when(col("prediction") == 1.0, "Average (2.5-3.5)")
            .when(col("prediction") == 2.0, "Good (3.5-4.0)")
            .otherwise("Excellent (> 4.0)")
            .alias("Predicted_Rating_Category")
        )

        print("\nPredictions with Best Model:")
        final_predictions.show()

except Exception as e:
    print(f"Error: {str(e)}")
    import traceback
    traceback.print_exc()

Starting hyperparameter tuning...

Initial data count: 18495

Sample of initial data:
+------+--------+------------+--------------------+
|Rating|Episodes|Release_year|Release_season_index|
+------+--------+------------+--------------------+
|   4.6|       0|        NULL|                 2.0|
|   4.6|      13|        NULL|                 1.0|
|  4.58|      12|        NULL|                 0.0|
|  4.58|      64|        NULL|                 1.0|
|  4.57|      10|        NULL|                 1.0|
+------+--------+------------+--------------------+
only showing top 5 rows


Preparing data for tuning...

Data count after category transformation: 18495

Rating category distribution:
+---------------+-----+
|rating_category|count|
+---------------+-----+
|            0.0| 3484|
|            1.0| 9813|
|            2.0| 4298|
|            3.0|  900|
+---------------+-----+


Vector data count: 18495
Final vector data count: 18495

Training data count: 13020
Test data count: 5475

Starting c