In [10]:
!pip install findspark
!pip install pyspark numpy



In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName("AdvancedML").getOrCreate()

# Sample data for regression
# Format: (ID, Feature, Label)
data = [(1, 5.0, 20.0), (2, 10.0, 40.0), (3, 15.0, 60.0)]
columns = ["ID", "Feature", "Label"]
df = spark.createDataFrame(data, columns)

# Prepare features using VectorAssembler
# Spark ML mengharuskan fitur input digabung menjadi satu kolom vektor
assembler = VectorAssembler(inputCols=["Feature"], outputCol="Features")
df = assembler.transform(df)

# Train linear regression model
lr = LinearRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(df)

# Display coefficients and intercept
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

25/12/02 19:38:01 WARN Utils: Your hostname, asyrof-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/12/02 19:38:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 19:38:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/02 19:38:16 WARN Instrumentation: [42b2acc7] regParam is zero, which might cause numerical instability and overfitting.
25/12/02 19:38:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/12/02 19:38:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Coefficients: [4.000000000000004]
Intercept: -5.076362500572609e-14


In [6]:
from pyspark.ml.classification import LogisticRegression

data = [(1, 2.0, 0), (2, 2.5, 1), (3, 1.5, 0), (4, 3.0, 1)]
columns = ["ID", "Feature", "Label"]
df = spark.createDataFrame(data, columns)

assembler = VectorAssembler(inputCols=["Feature"], outputCol="Features")
df = assembler.transform(df)

lr = LogisticRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(df)

print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")


Coefficients: [67.77891676420779]
Intercept: -152.51530779434026


In [7]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), 
        (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ["ID", "Features"]
df = spark.createDataFrame(data, columns)

kmeans = KMeans(featuresCol="Features", k=2)
model = kmeans.fit(df)

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[12.5 12.5]
[3. 3.]


In [5]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session (Wajib jika belum inisialisasi)
try:
    spark
except NameError:
    spark = SparkSession.builder.appName("MLlibHomework").getOrCreate()

file_path = "file:///home/asyrof/Downloads/games.csv"

# Load Dataset games.csv
df_games = spark.read.csv(file_path, header=True, inferSchema=True)

# Data Cleaning: Hapus baris yang memiliki nilai null di kolom penting
df_games = df_games.na.drop(subset=["price_final", "user_reviews", "positive_ratio", "rating"])

# Konversi tipe data numerik jika diperlukan (untuk menjamin perhitungan ML)
df_games = df_games.withColumn("user_reviews", col("user_reviews").cast("double"))
df_games = df_games.withColumn("price_final", col("price_final").cast("double"))

df_games.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- date_release: date (nullable = true)
 |-- win: boolean (nullable = true)
 |-- mac: boolean (nullable = true)
 |-- linux: boolean (nullable = true)
 |-- rating: string (nullable = true)
 |-- positive_ratio: integer (nullable = true)
 |-- user_reviews: double (nullable = true)
 |-- price_final: double (nullable = true)
 |-- price_original: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- steam_deck: boolean (nullable = true)



In [6]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# 1. Feature Engineering
# Menggabungkan 'user_reviews' dan 'positive_ratio' menjadi satu kolom vektor 'Features'
assembler_lr = VectorAssembler(
    inputCols=["user_reviews", "positive_ratio"], 
    outputCol="Features"
)
df_lr = assembler_lr.transform(df_games)

# 2. Split Data (70% Training, 30% Testing)
train_data, test_data = df_lr.randomSplit([0.7, 0.3], seed=42)

# 3. Modeling
# LabelCol adalah target yang ingin diprediksi (Harga)
lr = LinearRegression(featuresCol="Features", labelCol="price_final")
lr_model = lr.fit(train_data)

# 4. Evaluasi
print("--- Hasil Linear Regression ---")
print(f"Coefficients (Pengaruh Review & Ratio): {lr_model.coefficients}")
print(f"Intercept (Harga Dasar): {lr_model.intercept}")

# Melihat hasil prediksi pada data test
predictions = lr_model.transform(test_data)
predictions.select("price_final", "prediction").show(5)

25/12/02 19:40:43 WARN Instrumentation: [1532d316] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

--- Hasil Linear Regression ---
Coefficients (Pengaruh Review & Ratio): [7.621025326749801e-06,-0.005823325571842184]
Intercept (Harga Dasar): 9.012476896394295
+-----------+------------------+
|price_final|        prediction|
+-----------+------------------+
|       4.99| 8.565824317717862|
|       4.99| 8.562742697844717|
|       9.99| 8.686038510196534|
|       9.99| 8.583495612031664|
|       10.0|12.825664735383562|
+-----------+------------------+
only showing top 5 rows



                                                                                

In [7]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 1. Label Engineering
# Kita buat kolom baru 'is_high_rated': 1 jika positive_ratio > 80, 0 jika tidak.
df_log = df_games.withColumn("label_class", when(df_games["positive_ratio"] > 80, 1).otherwise(0))

# 2. Feature Engineering
# Menggunakan 'price_final' dan 'user_reviews' sebagai fitur prediksi
assembler_log = VectorAssembler(
    inputCols=["price_final", "user_reviews"], 
    outputCol="Features"
)
df_log_final = assembler_log.transform(df_log)

# 3. Split Data
train_log, test_log = df_log_final.randomSplit([0.8, 0.2], seed=42)

# 4. Modeling
log_reg = LogisticRegression(featuresCol="Features", labelCol="label_class")
log_model = log_reg.fit(train_log)

# 5. Evaluasi
print("\n--- Hasil Logistic Regression ---")
predictions_log = log_model.transform(test_log)
predictions_log.select("price_final", "user_reviews", "label_class", "prediction").show(5)

evaluator_log = BinaryClassificationEvaluator(labelCol="label_class")
auc = evaluator_log.evaluate(predictions_log)
print(f"Area Under ROC (AUC): {auc:.4f}")

                                                                                


--- Hasil Logistic Regression ---
+-----------+------------+-----------+----------+
|price_final|user_reviews|label_class|prediction|
+-----------+------------+-----------+----------+
|       4.99|      1757.0|          0|       1.0|
|       4.99|     10522.0|          1|       1.0|
|       9.99|     10654.0|          0|       1.0|
|       10.0|    574470.0|          1|       1.0|
|       4.99|      1282.0|          1|       1.0|
+-----------+------------+-----------+----------+
only showing top 5 rows



                                                                                

Area Under ROC (AUC): 0.5309


In [9]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator

# 1. Feature Engineering
# Kita hanya menggunakan fitur numerik untuk clustering
assembler_km = VectorAssembler(
    inputCols=["price_final", "user_reviews"], 
    outputCol="Features"
)
df_km = assembler_km.transform(df_games)

# 2. Modeling
# k=3: Kita ingin mencari 3 tipe kelompok game (misal: Murah-Sepi, Mahal-Populer, dsb)
# PERBAIKAN ERROR: Menggunakan featuresCol="Features" (dengan F kapital)
kmeans = KMeans(featuresCol="Features", k=3)
model_km = kmeans.fit(df_km)

# 3. Hasil Clustering
predictions_km = model_km.transform(df_km)
centers = model_km.clusterCenters()

# 4. Evaluasi (Silhouette Score)
# PERBAIKAN ERROR: Menggunakan featuresCol="Features" (dengan F kapital)
evaluator_km = ClusteringEvaluator(featuresCol="Features")
silhouette = evaluator_km.evaluate(predictions_km)

print("\n--- Hasil K-Means Clustering ---")
print("Pusat Cluster (Centroids - [Price, Reviews]):")
for i, center in enumerate(centers):
    print(f"Cluster {i}: {center}")
print(f"\nSilhouette Score: {silhouette:.4f}")

# Menampilkan game masuk ke cluster mana
predictions_km.select("title", "price_final", "user_reviews", "prediction").show(5)


--- Hasil K-Means Clustering ---
Pusat Cluster (Centroids - [Price, Reviews]):
Cluster 0: [   8.61745314 1270.21603761]
Cluster 1: [1.50000e+01 7.49446e+06]
Cluster 2: [1.36071429e+01 7.40574786e+05]

Silhouette Score: 0.9992
+--------------------+-----------+------------+----------+
|               title|price_final|user_reviews|prediction|
+--------------------+-----------+------------+----------+
|Prince of Persia:...|       9.99|      2199.0|         0|
|BRINK: Agents of ...|       2.99|        21.0|         0|
|Monaco: What's Yo...|      14.99|      3722.0|         0|
|  Escape Dead Island|      14.99|       873.0|         0|
|Dungeon of the EN...|      11.99|      8784.0|         0|
+--------------------+-----------+------------+----------+
only showing top 5 rows

