In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark HDFS Jupyter") \
    .master("local[*]") \
    .getOrCreate()

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/26 22:59:38 WARN Utils: Your hostname, DuLieuLonUbuntu, resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/12/26 22:59:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/26 23:00:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Xử lí dữ liệu
## Tải dữ liệu lên HDFS

In [None]:
path_fake = "hdfs://localhost:9000/user/hdoop/newsdata/Fake.csv"
path_true = "hdfs://localhost:9000/user/hdoop/newsdata/True.csv"

df_fake = spark.read.csv(path_fake, header=True, inferSchema=True)
df_true = spark.read.csv(path_true, header=True, inferSchema=True)

df_fake.show(5)
df_true.show(5)

                                                                                

+--------------------+--------------------+-------+-----------------+
|               title|                text|subject|             date|
+--------------------+--------------------+-------+-----------------+
| Donald Trump Sen...|Donald Trump just...|   News|December 31, 2017|
| Drunk Bragging T...|House Intelligenc...|   News|December 31, 2017|
| Sheriff David Cl...|On Friday, it was...|   News|December 30, 2017|
| Trump Is So Obse...|On Christmas day,...|   News|December 29, 2017|
| Pope Francis Jus...|Pope Francis used...|   News|December 25, 2017|
+--------------------+--------------------+-------+-----------------+
only showing top 5 rows
+--------------------+--------------------+------------+------------------+
|               title|                text|     subject|              date|
+--------------------+--------------------+------------+------------------+
|As U.S. budget fi...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |
|U.S. military to ...|WASHINGTON (Reute...

## Gán label true = 1, fake = 0

In [None]:
from pyspark.sql.functions import lit

df_fake = df_fake.withColumn("label", lit(0))
df_true = df_true.withColumn("label", lit(1))

data = df_fake.union(df_true)

df_fake.count()

                                                                                

23489

In [None]:
data.show(5)

                                                                                

+--------------------+--------------------+-------+-----------------+-----+
|               title|                text|subject|             date|label|
+--------------------+--------------------+-------+-----------------+-----+
| Donald Trump Sen...|Donald Trump just...|   News|December 31, 2017|    0|
| Drunk Bragging T...|House Intelligenc...|   News|December 31, 2017|    0|
| Sheriff David Cl...|On Friday, it was...|   News|December 30, 2017|    0|
| Trump Is So Obse...|On Christmas day,...|   News|December 29, 2017|    0|
| Pope Francis Jus...|Pope Francis used...|   News|December 25, 2017|    0|
+--------------------+--------------------+-------+-----------------+-----+
only showing top 5 rows


### Xóa dòng có dữ liệu NULL

In [None]:
from pyspark.sql.functions import col, col, count, when

data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()


                                                                                

+-----+----+-------+----+-----+
|title|text|subject|date|label|
+-----+----+-------+----+-----+
|    0|   8|      8|   8|    0|
+-----+----+-------+----+-----+



In [None]:
data.dropna(how='any')

DataFrame[title: string, text: string, subject: string, date: string, label: int]

In [None]:
data.count()

                                                                                

44906

In [None]:
from pyspark.sql.functions import col

data.printSchema()

data_safe = data.filter(col("text").isNotNull())

data_safe.count()

data = data_safe.dropDuplicates()
data.count()

root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- date: string (nullable = true)
 |-- label: integer (nullable = false)



                                                                                

44466

### Làm sạch dữ liệu(loại bỏ tên trang báo và khu vực ở đầu)

In [None]:
from pyspark.sql.functions import regexp_replace, col, trim

# Giải thích Regex:
# ^          : Bắt đầu dòng
# .*?        : Bất kỳ ký tự nào (Tên thành phố, bang...)
# \(.*?\)    : Nội dung trong ngoặc đơn (Tên hãng tin bất kỳ)
# \s*-\s* : Dấu gạch ngang ngăn cách
# Ví dụ sẽ khớp: "WASHINGTON (Reuters) - " hoặc "NEW YORK/LONDON (AP) - "
robust_pattern = r"^.*?\s*\(.*?\)\s*-\s*"

# Thực hiện xóa
data_clean_text = data.withColumn("text", regexp_replace(col("text"), robust_pattern, ""))
data_clean_text = data_clean_text.withColumn("text", trim(col("text")))

# Kiểm tra kết quả
print("--- Mẫu dữ liệu sau khi làm sạch Dateline ---")
data_clean_text.filter("label = 1").select("text").show(5, truncate=100)
data = data_clean_text

--- Mẫu dữ liệu sau khi làm sạch Dateline ---


[Stage 27:>                                                         (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------+
|                                                                                                text|
+----------------------------------------------------------------------------------------------------+
|Republicans could hold onto control of Virginia’s legislature after a race that had appeared to c...|
|Michigan Governor Rick Snyder said on Friday the state would hold a special election on Nov. 6, 2...|
|U.S. Special Counsel Robert Mueller’s office has spent about $3.2 million in the first 4-1/2 mont...|
|President Donald Trump’s 90 percent cut to Obamacare advertising has U.S. health insurers in many...|
|President Donald Trump will announce next week his choice for who will lead the Federal Reserve, ...|
+----------------------------------------------------------------------------------------------------+
only showing top 5 rows


                                                                                

In [None]:
data.cache()

DataFrame[title: string, text: string, subject: string, date: string, label: int]

# Test mô hình

In [None]:
import numpy as np
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

### Tiền sử lí

In [None]:
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W+")

data_tokenized = regex_tokenizer.transform(data)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

data_cleaned = remover.transform(data_tokenized)

data_cleaned.select("text", "words", "filtered_words").show(5, truncate=50)

[Stage 32:>                                                         (0 + 1) / 1]

+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|                                              text|                                             words|                                    filtered_words|
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|This maniac wants  The Handmaid s Tale  to beco...|[this, maniac, wants, the, handmaid, s, tale, t...|[maniac, wants, handmaid, tale, become, reality...|
|West Virginia is solid Trump country. The argum...|[west, virginia, is, solid, trump, country, the...|[west, virginia, solid, trump, country, argumen...|
|President Donald Trump has had some strong word...|[president, donald, trump, has, had, some, stro...|[president, donald, trump, strong, words, fierc...|
|Earlier today, Donald Trump once again disgrace...|[earlier, today, d

                                                                                

In [None]:
data_cleaned.show(5, truncate=50)

+--------------------------------------------------+--------------------------------------------------+-------+------------------+-----+--------------------------------------------------+--------------------------------------------------+
|                                             title|                                              text|subject|              date|label|                                             words|                                    filtered_words|
+--------------------------------------------------+--------------------------------------------------+-------+------------------+-----+--------------------------------------------------+--------------------------------------------------+
| WATCH: Republican Lawmaker Claims Forcing Wome...|This maniac wants  The Handmaid s Tale  to beco...|   News|  November 6, 2017|    0|[this, maniac, wants, the, handmaid, s, tale, t...|[maniac, wants, handmaid, tale, become, reality...|
| W. Virginia Halloween Store Boasts Shockin

Dùng kỹ thuật TF-IDF (Term Frequency - Inverse Document Frequency).

TF (HashingTF): Đếm tần suất xuất hiện của từ trong bài viết. (Từ nào xuất hiện nhiều trong 1 bài thì quan trọng với bài đó).

IDF: Đánh giá độ "hiếm" của từ trong toàn bộ tập dữ liệu. (Từ nào bài nào cũng có thì ít quan trọng, từ nào hiếm mới là đặc trưng).

In [None]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=10000)
featurized_data = hashingTF.transform(data_cleaned)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)

final_data = idf_model.transform(featurized_data)

final_data.select("filtered_words", "features").show(5, truncate=50)

                                                                                

+--------------------------------------------------+--------------------------------------------------+
|                                    filtered_words|                                          features|
+--------------------------------------------------+--------------------------------------------------+
|[maniac, wants, handmaid, tale, become, reality...|(10000,[148,217,291,320,353,379,391,398,453,503...|
|[west, virginia, solid, trump, country, argumen...|(10000,[80,157,366,452,461,488,608,649,659,665,...|
|[president, donald, trump, strong, words, fierc...|(10000,[35,132,141,157,223,366,387,452,524,548,...|
|[earlier, today, donald, trump, disgraced, unit...|(10000,[94,132,366,429,452,479,505,533,673,688,...|
|[donald, trump, drags, us, war, north, korea, w...|(10000,[12,132,224,366,383,419,452,613,666,743,...|
+--------------------------------------------------+--------------------------------------------------+
only showing top 5 rows


# su dung Logistic Regression kiểm tra mô hình


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_data, val_data, test_data = final_data.randomSplit([0.8, 0.1, 0.1], seed=42)

train_data.groupby("label").count().show()
val_data.groupby("label").count().show()
test_data.groupby("label").count().show()

lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(train_data)

                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    1|16966|
|    0|18552|
+-----+-----+



                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    1| 2102|
|    0| 2343|
+-----+-----+



                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    1| 2143|
|    0| 2360|
+-----+-----+



                                                                                

In [None]:
predictions = lr_model.transform(test_data)

predictions.select("label", "features", "prediction").show(5)

                                                                                

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|    0|(10000,[70,132,42...|       0.0|
|    0|(10000,[7,43,47,5...|       0.0|
|    0|(10000,[7,132,201...|       0.0|
|    1|(10000,[15,87,108...|       1.0|
|    1|(10000,[1,14,132,...|       1.0|
+-----+--------------------+----------+
only showing top 5 rows


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")

                                                                                

Test Accuracy = 0.9704641350210971


In [None]:
print(f"Test Accuracy = {accuracy * 100:.2f}%")

Test Accuracy = 97.05%


# Tạo Pipiline đưa dữ liệu và train

In [None]:
data_clean_text.show(5)

+--------------------+--------------------+-------+------------------+-----+
|               title|                text|subject|              date|label|
+--------------------+--------------------+-------+------------------+-----+
| WATCH: Republica...|This maniac wants...|   News|  November 6, 2017|    0|
| W. Virginia Hall...|West Virginia is ...|   News|  October 19, 2017|    0|
| Trump Is So Bad ...|President Donald ...|   News|September 27, 2017|    0|
| Defense Secretar...|Earlier today, Do...|   News| September 3, 2017|    0|
| Trump STUPIDLY A...|If Donald Trump d...|   News| September 3, 2017|    0|
+--------------------+--------------------+-------+------------------+-----+
only showing top 5 rows


In [None]:
data.show(5)

+--------------------+--------------------+-------+------------------+-----+
|               title|                text|subject|              date|label|
+--------------------+--------------------+-------+------------------+-----+
| WATCH: Republica...|This maniac wants...|   News|  November 6, 2017|    0|
| W. Virginia Hall...|West Virginia is ...|   News|  October 19, 2017|    0|
| Trump Is So Bad ...|President Donald ...|   News|September 27, 2017|    0|
| Defense Secretar...|Earlier today, Do...|   News| September 3, 2017|    0|
| Trump STUPIDLY A...|If Donald Trump d...|   News| September 3, 2017|    0|
+--------------------+--------------------+-------+------------------+-----+
only showing top 5 rows


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import NGram

train_data, val_data, test_data = data.randomSplit([0.8, 0.1, 0.1], seed=42)

# 1. Các bước cũ
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W+")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")



# 2. Tạo N-Grams (Ghép 2 từ liền nhau)
ngram = NGram(n=2, inputCol="filtered_words", outputCol="bigrams")

# 3. HasingTF
hashingTF = HashingTF(inputCol="bigrams", outputCol="rawFeatures", numFeatures=20000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 4. Pipeline mới
pipeline = Pipeline(stages=[regexTokenizer, remover, ngram, hashingTF, idf, lr])

# 5. Train lại và Lưu lại model
pipeline_model = pipeline.fit(train_data)
pipeline_model.write().overwrite().save("hdfs://localhost:9000/user/hdoop/fake_news_model_final")

                                                                                

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate_model(data, name):
    predictions = pipeline_model.transform(data)
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
   
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    f1_score = evaluator_f1.evaluate(predictions)

    print(f"{name} - Accuracy: {accuracy * 100:.2f}%, F1 Score: {f1_score * 100:.2f}%")

    predictions.groupby("label", "prediction").count().show()
evaluate_model(train_data, "Train Data")
evaluate_model(val_data, "Validation Data")
evaluate_model(test_data, "Test Data")

                                                                                

Train Data - Accuracy: 100.00%, F1 Score: 100.00%


                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|18552|
|    1|       1.0|16965|
|    1|       0.0|    1|
+-----+----------+-----+



                                                                                

Validation Data - Accuracy: 94.89%, F1 Score: 94.89%


                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0| 2242|
|    1|       1.0| 1976|
|    1|       0.0|  126|
|    0|       1.0|  101|
+-----+----------+-----+



                                                                                

Test Data - Accuracy: 94.91%, F1 Score: 94.91%


                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0| 2260|
|    1|       1.0| 2014|
|    0|       1.0|  100|
|    1|       0.0|  129|
+-----+----------+-----+

