# Klasifikasi Artikel menggunakan Apache Spark

In [1]:
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

import plotly.express as px
import plotly.graph_objs as go

In [2]:
# buat instance Apache Spark
# saya pakai di local (i5-10400@2.90 GHz, 24 GB RAM, 4 TB HDD)
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('tugaspda') \
    .config('spark.executor.memory', '12gb') \
    .getOrCreate()

# SQL context untuk eksekusi perintah SQL
sc = SQLContext(spark)

In [3]:
# baca dataset parquet
df = spark.read.load("dataset/dataset_trial2.parquet").cache()

In [4]:
# sampel 5 data teratas
df.show(5)

+--------------------+--------------+--------------------+-------------+--------------------+
|                 url|        author|               title|     category|             content|
+--------------------+--------------+--------------------+-------------+--------------------+
|https://uzone.id/...|Birgitta Ajeng|10 Hashtag Terpop...|     Digilife|Uzone.id - Twitte...|
|https://uzone.id/...|  Niken Nurani|7 Idola K-Pop yan...|        Music|Bisa menduduki pe...|
|https://uzone.id/...|      arah.com|Ada Penampakan Ga...|   Technology|Baru-baru ini Bad...|
|https://uzone.id/...|      arah.com|Aktris Hollywood ...|Entertainment|Di Hollywood, pen...|
|https://uzone.id/...| Tomy Tresnady|Andhara Early Kec...|     Digilife|Ilustrasi (Foto: ...|
+--------------------+--------------+--------------------+-------------+--------------------+
only showing top 5 rows



In [5]:
# hitung berapa banyak 
categories_count_df = df.groupBy("category").count().toPandas().sort_values("count")
all_categories_mean = categories_count_df["count"].mean()
categories_count = len(categories_count_df)

fig = px.bar(categories_count_df, x="category", y="count")
fig.add_shape(go.layout.Shape(type="line", x0=-.5, y0=all_categories_mean, x1=categories_count, y1=all_categories_mean, line=dict(color="Orange", dash="dash")))
fig.add_annotation(x=2, y=all_categories_mean, text=f"Rata-rata: {all_categories_mean:.2f}", showarrow=False, yshift=20)

fig.show()

# Apache Spark MLib

In [6]:
from pyspark.ml import Pipeline

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [7]:
# stopwords khusus untuk bahasa Indonesia
stopwords_indonesian = []
with open("dataset/stopwords-id.txt") as f:
    stopwords_indonesian = [word for word in f]

In [8]:
# tambahkan kolom banyaknya kategori di setiap baris
# lebih mudah menggunakan SQL dibandingkan menggunakan Window di PySpark
df.registerTempTable('original_pages')
df_input = sc.sql('SELECT content, category, COUNT(category) OVER (PARTITION BY category) AS count FROM original_pages ORDER BY category')

In [9]:
# pilih kategori dalam list
#df_selected = df_input.filter(df["category"].isin(["Technology", "Auto", "Lifestyle", "Sport", "News"]))

# pilih data jika kategori tersebut terdapat lebih atau sama dengan 100 rows
df_selected = df_input.filter(df_input["count"] >= 100)

# hitung berapa banyak sampel untuk tiap kategori
print("Total data untuk diproses: ", df_selected.count())
df_selected.groupBy("category").count().show()

Total data untuk diproses:  1778
+-------------+-----+
|     category|count|
+-------------+-----+
|Entertainment|  431|
|         Film|  109|
|         Auto|  202|
|   Technology|  194|
|    Lifestyle|  172|
|        Sport|  270|
|         News|  400|
+-------------+-----+



In [10]:
# melakukan label encoding
indexer = StringIndexer(inputCol="category", outputCol="label")
df_input = indexer.fit(df_selected).transform(df_selected)

In [11]:
# tokenisasi
tokenizer = Tokenizer(inputCol="content", outputCol="rawWords")
df_tokenized = tokenizer.transform(df_input)

# menghapus stopword
stopwords_remover = StopWordsRemover(inputCol="rawWords", outputCol="words", stopWords=stopwords_indonesian)
df_stopwords = stopwords_remover.transform(df_tokenized)

# menghitung TF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=40)
df_tf = hashingTF.transform(df_stopwords)

# menghitung IDF dari TF
idf = IDF(inputCol="rawFeatures", outputCol="features")
df_tfidf = idf.fit(df_tf).transform(df_tf)

# menampilkan 5 kolom teratas
df_tfidf.show(5)

+--------------------+--------+-----+-----+--------------------+--------------------+--------------------+--------------------+
|             content|category|count|label|            rawWords|               words|         rawFeatures|            features|
+--------------------+--------+-----+-----+--------------------+--------------------+--------------------+--------------------+
|Honda City Uzone....|    Auto|  202|  3.0|[honda, city, uzo...|[honda, city, uzo...|(40,[0,1,2,3,4,5,...|(40,[0,1,2,3,4,5,...|
|Uzone.id— PT Astr...|    Auto|  202|  3.0|[uzone.id—, pt, a...|[uzone.id—, pt, a...|(40,[0,1,2,3,4,5,...|(40,[0,1,2,3,4,5,...|
|Perusahaan layana...|    Auto|  202|  3.0|[perusahaan, laya...|[perusahaan, laya...|(40,[0,1,2,3,4,5,...|(40,[0,1,2,3,4,5,...|
|PT Blue Bird Tbk ...|    Auto|  202|  3.0|[pt, blue, bird, ...|[pt, blue, bird, ...|(40,[0,1,2,3,4,5,...|(40,[0,1,2,3,4,5,...|
|Kementerian Perhu...|    Auto|  202|  3.0|[kementerian, per...|[kementerian, per...|(40,[0,1,2,3,4,5,..

In [12]:
# ini untuk versi train-test split
train_featurized_df, test_featurized_df = df_tfidf.randomSplit([0.6, 0.3], 42)

# ini untuk versi pipeline dan fine tuning dengan grid search dan cross validation
train_df, test_df = df_input.randomSplit([0.6, 0.3], 42)

In [13]:
# untuk melakukan prediksi dan menghitung akurasi
def run_prediction_eval(model, test_dataset):
    predictions = model.transform(test_dataset)
    predictions.select("label", "prediction").show(5)

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print(f"Test set accuracy = {accuracy:.4f}")

In [14]:
# untuk menampilkan plot akurasi pada cross validation
def plot_crossval_metrics(metrics):
    max_fold = np.argmax(metrics) + 1
    max_accuracy = np.max(metrics)

    fig = px.line(x=np.arange(1, len(metrics) + 1), y=metrics, labels=dict(x="Tries", y="Accuracy"))
    fig.add_shape(go.layout.Shape(type="line", x0=max_fold, y0=0, x1=max_fold, y1=max_accuracy, line=dict(color="Orange", dash="dash")))
    fig.update_layout(xaxis=dict(tickmode='linear', dtick=1))
    fig.show()

In [15]:
# menampilkan parameter terbaik dari cross validation
def show_best_parameters(crossval_model):
    print(f"Best metrics = {np.max(crossval_model.avgMetrics):.4f}")
    for key, value in crossval_model.getEstimatorParamMaps()[np.argmax(crossval_model.avgMetrics)].items():
        print(f"{key.name} = {value:.4f}")

## Training menggunakan Train-Test Split

In [16]:
# modelling menggunakan Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb_model = nb.fit(train_featurized_df)

# evaluasi akurasi model
run_prediction_eval(nb_model, test_featurized_df)

+-----+----------+
|label|prediction|
+-----+----------+
|  3.0|       0.0|
|  3.0|       0.0|
|  3.0|       0.0|
|  3.0|       0.0|
|  3.0|       0.0|
+-----+----------+
only showing top 5 rows

Test set accuracy = 0.2739


In [17]:
# modelling menggunakan Logistic Regression
lr = LogisticRegression()
lr_model = lr.fit(train_featurized_df)

# evaluasi akurasi model
run_prediction_eval(lr_model, test_featurized_df)

+-----+----------+
|label|prediction|
+-----+----------+
|  3.0|       0.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
+-----+----------+
only showing top 5 rows

Test set accuracy = 0.4814


## Training menggunakan Cross Validation dan Fine Tuning

### Model Naive Bayes

In [18]:
# membuat objek pemrosesan data
tokenizer = Tokenizer(inputCol="content", outputCol="rawWords")
stopwords_remover = StopWordsRemover(inputCol="rawWords", outputCol="words", stopWords=stopwords_indonesian)
tf = HashingTF(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# menjadikan objek tersebut ke dalam sebuah pipeline agar bisa dieksekusi dalam satu proses berurutan
nb_pipeline = Pipeline(stages=[tokenizer, stopwords_remover, tf, idf, nb])

# membuat evaluator untuk mengukur performa model
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# membuat parameter grid search, untuk mencari parameter model yang terbaik
paramGrid = ParamGridBuilder() \
    .addGrid(tf.numFeatures, [10, 100, 1000]) \
    .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]) \
    .build()

In [19]:
# melakukan proses cross validation
crossval = CrossValidator(estimator=nb_pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
nb_cvModel = crossval.fit(train_df)

In [20]:
# plot metrics
plot_crossval_metrics(nb_cvModel.avgMetrics)

In [21]:
# menampilkan parameter yang terbaik
show_best_parameters(nb_cvModel)

Best metrics = 0.7127
numFeatures = 1000.0000
smoothing = 1.0000


In [22]:
# evaluasi model
run_prediction_eval(nb_cvModel, test_df)

+-----+----------+
|label|prediction|
+-----+----------+
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
+-----+----------+
only showing top 5 rows

Test set accuracy = 0.6840


### Model Logistic Regression

In [23]:
# membuat objek pemrosesan data
tokenizer = Tokenizer(inputCol="content", outputCol="rawWords")
stopwords_remover = StopWordsRemover(inputCol="rawWords", outputCol="words", stopWords=stopwords_indonesian)
tf = HashingTF(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression()

# menjadikan objek tersebut ke dalam sebuah pipeline agar bisa dieksekusi dalam satu proses berurutan
lr_pipeline = Pipeline(stages=[tokenizer, stopwords_remover, tf, idf, lr])

# membuat evaluator untuk mengukur performa model
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# membuat parameter grid search, untuk mencari parameter model yang terbaik
paramGrid = ParamGridBuilder() \
    .addGrid(tf.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.maxIter, [10, 50, 100]) \
    .build()

In [24]:
# melakukan proses cross validation
crossval = CrossValidator(estimator=lr_pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
lr_cvModel = crossval.fit(train_df)

In [25]:
# plot metrics
plot_crossval_metrics(lr_cvModel.avgMetrics)

In [26]:
# menampilkan parameter yang terbaik
show_best_parameters(lr_cvModel)

Best metrics = 0.6687
numFeatures = 1000.0000
regParam = 0.0100
maxIter = 10.0000


In [27]:
# evaluasi model
run_prediction_eval(lr_cvModel, test_df)

+-----+----------+
|label|prediction|
+-----+----------+
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
+-----+----------+
only showing top 5 rows

Test set accuracy = 0.6694
