In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [2]:
# Create SparkSession
spark = SparkSession.builder.appName("fb_live_thailand.csv").getOrCreate()


In [4]:
# Load data file into DataFrame (replace 'FBLiveTH.csv' with your actual file)
df = spark.read.csv("fb_live_thailand.csv", header=True, inferSchema=True)


In [12]:
# StringIndexer for status_type and status_published
indexer_type = StringIndexer(inputCol="status_type", outputCol="status_type_ind")
indexer_pub = StringIndexer(inputCol="status_published", outputCol="status_published_ind", handleInvalid="keep")

In [6]:
# OneHotEncoder for indexed columns
encoder = OneHotEncoder(
    inputCols=["status_type_ind", "status_published_ind"],
    outputCols=["status_type_enc", "status_published_enc"]
)

In [7]:
# VectorAssembler for features
assembler = VectorAssembler(
    inputCols=["status_type_enc", "status_published_enc"],
    outputCol="features"
)


In [19]:
# Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol="status_type_ind", featuresCol="features")

# Pipeline
pipeline = Pipeline(stages=[indexer_type, indexer_pub, encoder, assembler, dt])

# Split data into train and test sets
train, test = df.randomSplit([0.7, 0.3], seed=42)

# Fit pipeline model on the training data
model = pipeline.fit(train)

# Transform both train and test sets using the fitted model
transformed_train = model.transform(train)
predictions = model.transform(test)

# Evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="status_type_ind",
    predictionCol="prediction"
)

In [20]:
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
test_error = 1.0 - accuracy

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"Test Error: {test_error}")

spark.stop()

Accuracy: 1.0
Precision: 1.0
Recall: 1.0
F1 Score: 1.0
Test Error: 0.0
