In [36]:
# ============================================================
# 1. IMPORT MODULE
# ============================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count
from pyspark.sql.types import DoubleType, IntegerType

# MLlib
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler
)
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier
)
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import (
    MulticlassClassificationEvaluator,
    BinaryClassificationEvaluator
)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


# ============================================================
# 2. DEFINE SPARK SESSION
# ============================================================
spark = SparkSession.builder \
    .appName("Titanic_MLlib") \
    .master("local[*]") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .getOrCreate()

print("Spark session created.")


# ============================================================
# 3. LOAD TITANIC DATASET
# ============================================================
df = spark.read.csv(
    "file:///home/andra/Downloads/titanic_cleaned.csv",
    header=True,
    inferSchema=True
)

print("Dataset Loaded.")
df.show(10)
df.printSchema()


# ============================================================
# 4. CEK MISSING VALUE
# ============================================================
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


# ============================================================
# 5. DETECT CATEGORICAL & NUMERIC COLUMNS
# ============================================================
numeric_cols = [c for c, dtype in df.dtypes if dtype in ("double", "int")]
categorical_cols = [c for c, dtype in df.dtypes if dtype == "string"]

print("Numeric Columns:", numeric_cols)
print("Categorical Columns:", categorical_cols)


25/12/04 03:31:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Spark session created.
Dataset Loaded.
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+--------+--------------------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|   Fare|Embarked|           Fare_norm|Fare_outlier|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+--------+--------------------+------------+
|          1|       0|     3|Braund, Mr. Owen ...|  Male|             22.0|    1|    0|       A/5 21171|   7.25|       S|0.014151057562208049|       false|
|          2|       1|     1|Cumings, Mrs. Joh...|Female|             38.0|    1|    0|        PC 17599|71.2833|       C| 0.13913573538264068|        true|
|          3|       1|     3|Heikkinen, Miss. ...|Female|             26.0|    0|    0|STON/O2. 3101282|  7.925|       S|0.015468569817999833|       false|
|          4|       1|   

In [36]:
# ============================================================
# 1. IMPORT MODULE
# ============================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count
from pyspark.sql.types import DoubleType, IntegerType

# MLlib
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler
)
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier
)
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import (
    MulticlassClassificationEvaluator,
    BinaryClassificationEvaluator
)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


# ============================================================
# 2. DEFINE SPARK SESSION
# ============================================================
spark = SparkSession.builder \
    .appName("Titanic_MLlib") \
    .master("local[*]") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .getOrCreate()

print("Spark session created.")


# ============================================================
# 3. LOAD TITANIC DATASET
# ============================================================
df = spark.read.csv(
    "file:///home/andra/Downloads/titanic_cleaned.csv",
    header=True,
    inferSchema=True
)

print("Dataset Loaded.")
df.show(10)
df.printSchema()


# ============================================================
# 4. CEK MISSING VALUE
# ============================================================
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


# ============================================================
# 5. DETECT CATEGORICAL & NUMERIC COLUMNS
# ============================================================
numeric_cols = [c for c, dtype in df.dtypes if dtype in ("double", "int")]
categorical_cols = [c for c, dtype in df.dtypes if dtype == "string"]

print("Numeric Columns:", numeric_cols)
print("Categorical Columns:", categorical_cols)


25/12/04 03:31:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Spark session created.
Dataset Loaded.
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+--------+--------------------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|   Fare|Embarked|           Fare_norm|Fare_outlier|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+--------+--------------------+------------+
|          1|       0|     3|Braund, Mr. Owen ...|  Male|             22.0|    1|    0|       A/5 21171|   7.25|       S|0.014151057562208049|       false|
|          2|       1|     1|Cumings, Mrs. Joh...|Female|             38.0|    1|    0|        PC 17599|71.2833|       C| 0.13913573538264068|        true|
|          3|       1|     3|Heikkinen, Miss. ...|Female|             26.0|    0|    0|STON/O2. 3101282|  7.925|       S|0.015468569817999833|       false|
|          4|       1|   

In [37]:
# ============================================================
# 1. LABEL COLUMN (Survived)
# ============================================================
label_col = "Survived"

# ============================================================
# 2. CATEGORICAL + NUMERIC COLUMNS
# ============================================================
numeric_cols = ['PassengerId', 'Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'Fare_norm']
categorical_cols = ['Name', 'Sex', 'Ticket', 'Embarked']

# ============================================================
# 3. STRING INDEXING FOR CATEGORICAL
# ============================================================
indexers = [
    StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep")
    for c in categorical_cols
]

# ============================================================
# 4. ONE-HOT ENCODING
# ============================================================
encoders = [
    OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec")
    for c in categorical_cols
]

# ============================================================
# 5. ASSEMBLE FINAL FEATURES
# ============================================================
feature_cols = numeric_cols + [c + "_vec" for c in categorical_cols]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# ============================================================
# 6. RANDOM FOREST MODEL
# ============================================================
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100
)

# Label indexing
label_indexer = StringIndexer(inputCol=label_col, outputCol="label")

# ============================================================
# 7. PIPELINE
# ============================================================
pipeline = Pipeline(
    stages=indexers + encoders + [label_indexer, assembler, rf]
)

# ============================================================
# 8. TRAIN TEST SPLIT
# ============================================================
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print("Train:", train_df.count(), "Test:", test_df.count())

# ============================================================
# 9. PARAM GRID (HYPERPARAMETER TUNING)
# ============================================================
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [100, 200])
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    metricName="accuracy"
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

# ============================================================
# 10. TRAIN MODEL (CROSS VALIDATION)
# ============================================================
cv_model = cv.fit(train_df)
best_model = cv_model.bestModel

# ============================================================
# 11. PREDICT
# ============================================================
pred = best_model.transform(test_df)
pred.select("Survived", "label", "prediction").show(20)

# ============================================================
# 12. EVALUATION
# ============================================================
accuracy = evaluator.evaluate(pred)
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")
f1 = evaluator_f1.evaluate(pred)

print("Accuracy:", accuracy)
print("F1 Score:", f1)

# CONFUSION MATRIX
pred.groupBy("label", "prediction").count().show()


Train: 746 Test: 145


25/12/04 03:33:02 WARN DAGScheduler: Broadcasting large task binary with size 1121.4 KiB
25/12/04 03:33:02 WARN DAGScheduler: Broadcasting large task binary with size 1281.1 KiB
25/12/04 03:33:03 WARN DAGScheduler: Broadcasting large task binary with size 1445.0 KiB
25/12/04 03:33:04 WARN DAGScheduler: Broadcasting large task binary with size 1368.5 KiB
25/12/04 03:33:11 WARN DAGScheduler: Broadcasting large task binary with size 1128.9 KiB
25/12/04 03:33:11 WARN DAGScheduler: Broadcasting large task binary with size 1286.5 KiB
25/12/04 03:33:12 WARN DAGScheduler: Broadcasting large task binary with size 1451.3 KiB
25/12/04 03:33:13 WARN DAGScheduler: Broadcasting large task binary with size 1380.3 KiB
25/12/04 03:33:20 WARN DAGScheduler: Broadcasting large task binary with size 1059.8 KiB
25/12/04 03:33:20 WARN DAGScheduler: Broadcasting large task binary with size 1209.3 KiB
25/12/04 03:33:20 WARN DAGScheduler: Broadcasting large task binary with size 1366.7 KiB
25/12/04 03:33:21 WAR

+--------+-----+----------+
|Survived|label|prediction|
+--------+-----+----------+
|       1|  1.0|       0.0|
|       0|  0.0|       0.0|
|       1|  1.0|       1.0|
|       0|  0.0|       0.0|
|       1|  1.0|       0.0|
|       1|  1.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       1|  1.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       1|  1.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
|       0|  0.0|       0.0|
+--------+-----+----------+
only showing top 20 rows



25/12/04 03:33:28 WARN DAGScheduler: Broadcasting large task binary with size 1423.3 KiB


Accuracy: 0.7724137931034483
F1 Score: 0.7588230595127146


25/12/04 03:33:28 WARN DAGScheduler: Broadcasting large task binary with size 1421.6 KiB


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   36|
|  0.0|       1.0|    2|
|  1.0|       0.0|   31|
|  0.0|       0.0|   76|
+-----+----------+-----+



25/12/04 03:33:29 WARN DAGScheduler: Broadcasting large task binary with size 1391.2 KiB
