# Pemrosesan Data

## Pembersihan Data dan Transformasi Data

In [None]:
from pyspark.sql.functions import col, regexp_replace, to_date, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql import SparkSession 

# Create (or reuse) the SparkSession used by the preprocessing step
spark = SparkSession.builder.appName("PreprocessingNewsData").getOrCreate()

# Read the raw articles from CSV (first row is the header, column types are inferred)
df = spark.read.csv("hdfs://hadoop-namenode:8020/user/news_data/articles_data.csv", header=True, inferSchema=True)

# Strip every character that is not an ASCII letter, digit or whitespace
# from the title, reporter, editor and content columns
df_clean = df
for text_column in ("title", "reporter", "editor", "content"):
    df_clean = df_clean.withColumn(text_column, regexp_replace(col(text_column), "[^a-zA-Z0-9\\s]", ""))

# regexp_replace keeps nulls as nulls, and Tokenizer fails on null input,
# so rows with a missing title/content are given an empty string instead.
df_clean = df_clean.na.fill("", subset=["title", "content"])

# Convert the date_time column to a date using the yyyy-MM-dd pattern
df_clean = df_clean.withColumn("date_time", to_date(col("date_time"), "yyyy-MM-dd"))

# Tokenize the title and content columns
tokenizer_title = Tokenizer(inputCol="title", outputCol="title_words")
tokenizer_content = Tokenizer(inputCol="content", outputCol="content_words")

df_tokenized = tokenizer_title.transform(df_clean)
df_tokenized = tokenizer_content.transform(df_tokenized)

# Remove stop words from the tokenized title and content
# NOTE(review): StopWordsRemover is used with its default word list; if the
# articles are Indonesian, confirm whether a custom stopWords list is needed.
remover_title = StopWordsRemover(inputCol="title_words", outputCol="title_filtered")
remover_content = StopWordsRemover(inputCol="content_words", outputCol="content_filtered")

df_preprocessed = remover_title.transform(df_tokenized)
df_preprocessed = remover_content.transform(df_preprocessed)

# Turn the filtered token arrays back into space-separated strings
# (CSV output cannot store array columns)
df_result = df_preprocessed.withColumn("title_filtered", concat_ws(" ", col("title_filtered"))) \
                           .withColumn("content_filtered", concat_ws(" ", col("content_filtered")))

# Columns shown and persisted as the preprocessing result
output_columns = ["title_filtered", "reporter", "editor", "date_time", "content_filtered"]

# Show a sample of the final preprocessing result
df_result.select(*output_columns).show(5, truncate=False)

# Save the preprocessing result to HDFS as CSV
output_path = "hdfs://hadoop-namenode:8020/user/news_data/hasilpreprocessing.csv"

df_result.select(*output_columns) \
         .write.csv(output_path, header=True, mode="overwrite")

#df = spark.read.csv("hdfs://hadoop-namenode:8020/user/news_data/articles_data.csv", header=True, inferSchema=True)


# Pemodelan
## Analisis Tren Kata Kunci Menggunakan TF-IDF

In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import col, concat_ws, udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for the TF-IDF step
spark = (
    SparkSession.builder
    .appName("TFIDFProcessing")
    .getOrCreate()
)

# Load the preprocessing output back from CSV
input_path = "hdfs://hadoop-namenode:8020/user/news_data/hasilpreprocessing.csv"
df_result = spark.read.csv(input_path, header=True, inferSchema=True)

# Merge the filtered title and content text into one space-separated column
combined_text = concat_ws(" ", col("title_filtered"), col("content_filtered"))
df_filtered = df_result.withColumn("filtered_words", combined_text)

# Split the merged text into tokens
tokenizer_filtered = Tokenizer(
    inputCol="filtered_words",
    outputCol="words",
)
df_tokenized_filtered = tokenizer_filtered.transform(df_filtered)

# Term frequency: hash the tokens into a 1000-dimensional count vector
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=1000,
)
featurizedData = hashingTF.transform(df_tokenized_filtered)

# Inverse document frequency: fit on the term-frequency vectors, then rescale them
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# Convert the features column (a vector) into an array of floats
def vector_to_array(v):
    """Return the components of vector *v* as a plain Python list.

    *v* must expose ``toArray()`` whose result supports ``tolist()``.
    """
    dense_values = v.toArray()
    return dense_values.tolist()

# Wrap the converter as a Spark UDF that returns an array of floats
float_array_type = ArrayType(FloatType())
vector_to_array_udf = udf(vector_to_array, float_array_type)

# Add the array form of the TF-IDF vector, then its comma-separated string form
rescaledData = (
    rescaledData
    .withColumn("features_array", vector_to_array_udf(col("features")))
    .withColumn("features_string", concat_ws(",", col("features_array")))
)

# Columns that make up the TF-IDF result
tfidf_columns = ["filtered_words", "features_string"]

# Show a sample of the TF-IDF result
rescaledData.select(*tfidf_columns).show(5)

# Save the TF-IDF result to HDFS as CSV
output_path = "hdfs://hadoop-namenode:8020/user/news_data/hasil_tfidf.csv"
tfidf_output = rescaledData.select(*tfidf_columns)
tfidf_output.write.csv(output_path, header=True, mode="overwrite")


## Analisis Sentimen Menggunakan Naive Bayes

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import NaiveBayes

# Imports needed by this cell beyond CountVectorizer / NaiveBayes
from pyspark.ml.feature import IndexToString, StringIndexer, Tokenizer
from pyspark.sql.functions import col, when

# Add a sentiment label (simple keyword rule, for demonstration only):
# 1 = title contains "baik", -1 = title contains "buruk", 0 = anything else.
# The reloaded CSV only carries "title_filtered" (there is no "title" column),
# so the rule is applied to that column.
title_text = col("title_filtered")
df_labeled = df_filtered.withColumn(
    "label",
    when(title_text.contains("baik"), 1)
    .when(title_text.contains("buruk"), -1)
    .otherwise(0),
)

# MLlib classifiers expect label indices in [0, numClasses), so the -1/0/1
# labels are indexed for training; "label" itself is kept for evaluation.
label_indexer = StringIndexer(inputCol="label", outputCol="label_index").fit(df_labeled)
df_labeled = label_indexer.transform(df_labeled)

# CountVectorizer needs an array of tokens, not a raw string column
df_labeled = Tokenizer(inputCol="filtered_words", outputCol="words").transform(df_labeled)

# Turn the tokens into term-count features
cv = CountVectorizer(inputCol="words", outputCol="features")
cv_model = cv.fit(df_labeled)
df_vectorized = cv_model.transform(df_labeled)

# Split into train and test sets (fixed seed for reproducibility)
train_data, test_data = df_vectorized.randomSplit([0.8, 0.2], seed=1234)

# Train the Naive Bayes model on the indexed labels
nb = NaiveBayes(featuresCol="features", labelCol="label_index", predictionCol="prediction_index")
model = nb.fit(train_data)

# Predict on the test set, then decode the predicted index back to the
# original -1/0/1 coding so "prediction" is directly comparable with "label"
label_decoder = IndexToString(
    inputCol="prediction_index",
    outputCol="prediction_label",
    labels=label_indexer.labels,
)
predictions = label_decoder.transform(model.transform(test_data)) \
    .withColumn("prediction", col("prediction_label").cast("double"))
predictions.select("filtered_words", "label", "prediction").show(5)

# Evaluasi Model
## Akurasi, Precision, Recall, dan F1-Score

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def _build_evaluator(metric_name):
    """Return a multiclass evaluator comparing "label" with "prediction" for *metric_name*."""
    return MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName=metric_name,
    )


# Accuracy
accuracy_evaluator = _build_evaluator("accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print(f"Akurasi Model: {accuracy}")

# Weighted precision
precision_evaluator = _build_evaluator("weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print(f"Precision Model: {precision}")

# Weighted recall
recall_evaluator = _build_evaluator("weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print(f"Recall Model: {recall}")

# F1 score
f1_evaluator = _build_evaluator("f1")
f1_score = f1_evaluator.evaluate(predictions)
print(f"F1-Score Model: {f1_score}")

# Confusion Matrix

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

def _to_prediction_label_pair(row):
    """Return the (prediction, label) pair of *row*, both converted to float."""
    return (float(row['prediction']), float(row['label']))


# MulticlassMetrics works on an RDD of (prediction, label) pairs, so convert the DataFrame
prediction_label_rows = predictions.select("prediction", "label").rdd
prediction_and_labels = prediction_label_rows.map(_to_prediction_label_pair)

# Compute and print the confusion matrix
metrics = MulticlassMetrics(prediction_and_labels)
confusion_matrix = metrics.confusionMatrix()
print("Confusion Matrix:")
print(confusion_matrix)

# Visualisasi Hasil
## Visualisasi Distribusi Sentimen

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Download the prediction output from HDFS first, e.g.:
# hdfs dfs -get /user/username/news_data/sentiment_predictions ~/sentiment_predictions
# NOTE(review): no cell visible in this notebook writes sentiment_predictions —
# confirm where that export happens and that the path above is correct.

# Read the prediction data (the part file is read without a header row)
df = pd.read_csv('~/sentiment_predictions/part-00000', names=['title', 'filtered_words', 'label', 'prediction'])

# Count predictions per sentiment class, ordered by class value.
# value_counts() alone orders by frequency, so the display labels are derived
# from the counted values instead of a fixed list; this keeps every slice
# paired with its own label and also works when a class is absent.
sentiment_names = {-1: 'Negatif (-1)', 0: 'Netral (0)', 1: 'Positif (1)'}
sentiment_counts = df['prediction'].value_counts().sort_index()
sentiment_labels = [sentiment_names.get(value, str(value)) for value in sentiment_counts.index]

# Pie chart of the sentiment distribution
plt.figure(figsize=(8, 6))
plt.pie(sentiment_counts, labels=sentiment_labels, autopct='%1.1f%%', startangle=140)
plt.title('Distribusi Sentimen Berita')
plt.show()

# Word Cloud untuk Kata Kunci Populer

In [None]:
from wordcloud import WordCloud

# Join every non-missing entry of the 'filtered_words' column into one text
non_missing_words = df['filtered_words'].dropna()
all_words = ' '.join(non_missing_words)

# Build the word cloud from the combined text
cloud_builder = WordCloud(
    width=800,
    height=600,
    background_color='white',
)
wordcloud = cloud_builder.generate(all_words)

# Display the word cloud without axes
plt.figure(figsize=(10, 8))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title('Word Cloud Kata Kunci Populer')
plt.show()