In [1]:
!pip install pyspark



In [2]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Buat SparkSession
spark = SparkSession.builder.appName("Logistic Regression Example").getOrCreate()

# Contoh dataset
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0)
]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# Menggabungkan fitur menjadi satu kolom vektor
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# Inisialisasi dan latih model Logistic Regression
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Tampilkan koefisien dan intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# Menutup SparkSession
spark.stop()

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Buat SparkSession
spark = SparkSession.builder.appName("KMeans Clustering Example").getOrCreate()

# Contoh dataset
data = [
    (1, 1.0, 1.0),
    (2, 5.0, 5.0),
    (3, 10.0, 10.0),
    (4, 15.0, 15.0)
]
columns = ['ID', 'Feature1', 'Feature2']
df = spark.createDataFrame(data, columns)

# Menggabungkan fitur menjadi satu kolom vektor
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# Latih model KMeans clustering
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Tampilkan pusat cluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

# Menutup SparkSession
spark.stop()

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


# HOMEWORK

1. Load a Real-World Dataset into Spark and Prepare it for Machine Learning Tasks

In [10]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Inisialisasi Spark session
spark = SparkSession.builder.appName("Netflix Classification").getOrCreate()

# Memuat dataset Netflix (misalnya dari file CSV)
data = spark.read.csv("Netflix_Movies_and_TV_Shows.csv", header=True, inferSchema=True)

# Menampilkan beberapa baris pertama dataset untuk memverifikasi data
data.show()

+--------+-------+-----------+------------+------+---------+--------------+
|   Title|   Type|      Genre|Release Year|Rating| Duration|       Country|
+--------+-------+-----------+------------+------+---------+--------------+
| Title 1|TV Show|     Comedy|        1955|    PG|3 Seasons|         Japan|
| Title 2|TV Show|     Horror|        2020|     G|3 Seasons|         India|
| Title 3|TV Show|     Action|        1966| TV-PG|  140 min| United States|
| Title 4|  Movie|   Thriller|        2011| PG-13|3 Seasons|        Canada|
| Title 5|TV Show|    Romance|        1959| TV-14|  172 min|         India|
| Title 6|  Movie|     Action|        2007| PG-13|3 Seasons|         Japan|
| Title 7|  Movie|    Romance|        1977| TV-14|   68 min| United States|
| Title 8|  Movie|     Comedy|        1971| TV-PG|  104 min|         Japan|
| Title 9|  Movie|      Drama|        2000| PG-13|2 Seasons|        Canada|
|Title 10|  Movie|   Thriller|        1975| TV-MA|1 Seasons|         India|
|Title 11|TV

In [11]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

# Membuat sesi Spark
spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

# Menghapus baris dengan nilai yang hilang
data = data.na.drop()

# Mengubah kolom kategori menjadi numerik
indexer_genre = StringIndexer(inputCol="Genre", outputCol="Genre_index")
indexer_country = StringIndexer(inputCol="Country", outputCol="Country_index")

# Menerapkan StringIndexer untuk setiap kolom
data = indexer_genre.fit(data).transform(data)
data = indexer_country.fit(data).transform(data)

# Menggabungkan fitur menjadi satu vektor
assembler = VectorAssembler(inputCols=["Genre_index", "Country_index"], outputCol="features_vector")

# Menghasilkan vektor fitur
final_data = assembler.transform(data)

# Menampilkan 10 baris pertama
final_data.show(10)

+--------+-------+--------+------------+------+---------+-------------+-----------+-------------+---------------+
|   Title|   Type|   Genre|Release Year|Rating| Duration|      Country|Genre_index|Country_index|features_vector|
+--------+-------+--------+------------+------+---------+-------------+-----------+-------------+---------------+
| Title 1|TV Show|  Comedy|        1955|    PG|3 Seasons|        Japan|        4.0|          7.0|      [4.0,7.0]|
| Title 2|TV Show|  Horror|        2020|     G|3 Seasons|        India|        0.0|          6.0|      [0.0,6.0]|
| Title 3|TV Show|  Action|        1966| TV-PG|  140 min|United States|        6.0|          5.0|      [6.0,5.0]|
| Title 4|  Movie|Thriller|        2011| PG-13|3 Seasons|       Canada|        1.0|          0.0|      [1.0,0.0]|
| Title 5|TV Show| Romance|        1959| TV-14|  172 min|        India|        3.0|          6.0|      [3.0,6.0]|
| Title 6|  Movie|  Action|        2007| PG-13|3 Seasons|        Japan|        6.0|     

2. Build a classification model using Spark MLlib and evaluate its performance.

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Membuat sesi Spark
spark = SparkSession.builder.appName("NetflixClassification").getOrCreate()

# Membaca dataset
data = spark.read.csv("Netflix_Movies_and_TV_Shows.csv", header=True, inferSchema=True).na.drop()

# Preprocessing data
data = StringIndexer(inputCol="Type", outputCol="label").fit(data).transform(data)
data = StringIndexer(inputCol="Genre", outputCol="Genre_index").fit(data).transform(data)
data = StringIndexer(inputCol="Country", outputCol="Country_index").fit(data).transform(data)
data = VectorAssembler(inputCols=["Genre_index", "Country_index", "Release Year"], outputCol="features").transform(data)

# Memisahkan data
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Membuat dan melatih model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model_lr = lr.fit(train_data)

# Membuat prediksi
predictions = model_lr.transform(test_data)

# Mengevaluasi model
accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy").evaluate(predictions)

# Menampilkan akurasi
print(f"Akurasi: {accuracy}")

Akurasi: 0.5373406193078324


3. Explore hyperparameter tuning using cross-validation.

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Membuat sesi Spark
spark = SparkSession.builder.appName("NetflixHyperparameterTuning").getOrCreate()

# Membaca dataset
data = spark.read.csv("Netflix_Movies_and_TV_Shows.csv", header=True, inferSchema=True).na.drop()

# Preprocessing data
data = StringIndexer(inputCol="Type", outputCol="label").fit(data).transform(data)
data = StringIndexer(inputCol="Genre", outputCol="Genre_index").fit(data).transform(data)
data = StringIndexer(inputCol="Country", outputCol="Country_index").fit(data).transform(data)
data = VectorAssembler(inputCols=["Genre_index", "Country_index", "Release Year"], outputCol="features").transform(data)

# Memisahkan data
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Membuat model Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Membuat parameter grid untuk hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Evaluator untuk cross-validation
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Membuat CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold cross-validation

# Melakukan cross-validation untuk mencari model terbaik
cv_model = crossval.fit(train_data)

# Memprediksi pada data pengujian
predictions = cv_model.transform(test_data)

# Mengevaluasi akurasi model terbaik
accuracy = evaluator.evaluate(predictions)

# Menampilkan akurasi dan parameter terbaik
print(f"Akurasi Model Terbaik: {accuracy}")
print(f"Hyperparameter Terbaik: {cv_model.bestModel.explainParams()}")

Akurasi Model Terbaik: 0.5209471766848816
Hyperparameter Terbaik: aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.5)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The