## Import necessary libraries and connect to HDFS on server

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark

from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F

LOCAL_IP = "stockvn.ddns.net"
HDFS_ADDR = "hdfs://namenode:9000/user/root"

spark = SparkSession.builder.remote(f"sc://{LOCAL_IP}:15002").appName("local_testing").getOrCreate()

print(spark.version)



4.0.1


## Load f319 data

In [2]:
file_paths = [f"{HDFS_ADDR}/f319_data/posts_{i:04d}.csv" for i in range(1, 10)]
df_demo = spark.read.csv(
    file_paths,
    header=True, 
    multiLine=True, # handle text with newlines 
    escape='"',  # handle escaped quotes
    quote='"', # hand quoted text
    encoding="utf-8" 
)

df_demo.show()

+--------------------+--------------------+----+-----------------+--------------------+
|         topic_title|           topic_url|page|        post_time|             content|
+--------------------+--------------------+----+-----------------+--------------------+
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 09:56|Nhìn VIC tăng mà ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 09:58|FPT hết nhiệm vụ ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 10:28|SK chắc cũng cay....|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 13:52|VNM sắp ra tin tạ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 20:21|freelancerv đã vi...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 21:34|Em đang chờ cơ hộ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025, 22:38|Có j mà thượng, a...|
|Thương cho cổ đôn...|https://f319.com/...|   1|30/09/2025, 09:48|ATDPM đã viết:\n↑...|
|Thương cho cổ đôn...|https://f3

## Dropping rows with null values

In [3]:
print(f"Number of rows before: {df_demo.count()}")
df_demo = df_demo.dropna(subset=['topic_title', 'post_time', 'content'])
print(f"Number of rows after dropna: {df_demo.count()}")

Number of rows before: 110769
Number of rows after dropna: 110008


## Handle post_time column

In [4]:
# We keep the date only
df_demo = df_demo.withColumn("post_time", F.split(F.col("post_time"), ",")[0])
df_demo.show()

+--------------------+--------------------+----+----------+--------------------+
|         topic_title|           topic_url|page| post_time|             content|
+--------------------+--------------------+----+----------+--------------------+
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Nhìn VIC tăng mà ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|FPT hết nhiệm vụ ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|SK chắc cũng cay....|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|VNM sắp ra tin tạ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|freelancerv đã vi...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Em đang chờ cơ hộ...|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Có j mà thượng, a...|
|Thương cho cổ đôn...|https://f319.com/...|   1|30/09/2025|ATDPM đã viết:\n↑...|
|Thương cho cổ đôn...|https://f319.com/...|   1|30/09/2025|stockboyHN đã viế...|
|Thương cho cổ đôn...|https:

## Handle content column

In [5]:
pattern = r"^(.*?) đã viết:\n↑\n"

df_demo = df_demo.withColumn(
    "post_type",
    F.when(F.col("content").rlike(pattern), F.lit("reply")).otherwise(F.lit('original'))
)
df_demo.show()


+--------------------+--------------------+----+----------+--------------------+---------+
|         topic_title|           topic_url|page| post_time|             content|post_type|
+--------------------+--------------------+----+----------+--------------------+---------+
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Nhìn VIC tăng mà ...| original|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|FPT hết nhiệm vụ ...| original|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|SK chắc cũng cay....| original|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|VNM sắp ra tin tạ...| original|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|freelancerv đã vi...|    reply|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Em đang chờ cơ hộ...| original|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Có j mà thượng, a...| original|
|Thương cho cổ đôn...|https://f319.com/...|   1|30/09/2025|ATDPM đã viết:\n↑...|    reply|

In [6]:
pattern_reply = r"^.*? đã viết:\n↑\n(.*?)(?:\nXem tất cả)?\n"
df_demo = df_demo.withColumn(
    'reply_to_post',
    F.when(
        F.col("post_type") == "reply",
        F.regexp_extract("content", pattern_reply, 1)
    )
).withColumn(
    "content",
    F.when(
        F.col('post_type') == "reply",
        # Remove the quoted part from the content 
        F.regexp_replace("content", pattern_reply, "")
    ).otherwise(F.col("content"))
)

df_demo.show()

+--------------------+--------------------+----+----------+--------------------+---------+--------------------+
|         topic_title|           topic_url|page| post_time|             content|post_type|       reply_to_post|
+--------------------+--------------------+----+----------+--------------------+---------+--------------------+
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Nhìn VIC tăng mà ...| original|                NULL|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|FPT hết nhiệm vụ ...| original|                NULL|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|SK chắc cũng cay....| original|                NULL|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|VNM sắp ra tin tạ...| original|                NULL|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Thoái mới bay đượ...|    reply|SK chắc cũng cay....|
|Thương cho cổ đôn...|https://f319.com/...|   1|29/09/2025|Em đang chờ cơ hộ...| original|              

## Aggregate Posts by Date and Topic

**Output Schema:**  
`date | topic_title | posts`

**Description:**  
Combines all posts from the same topic within a specific date

**Posts Format:**
```text
- <original post>
- Reply to: <parent post> | Replied content: <reply>

In [7]:
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, functions as F


def aggregate_posts(df: DataFrame) -> DataFrame:
    # Step 1: convert post_time to DateType
    df = df.withColumn("post_time", F.to_date("post_time", "dd/MM/yyyy"))

    # Step 2: create formatted post string
    df = df.withColumn(
        "post_string",
        F.when(
            F.col("post_type") == "reply",
            F.concat(F.lit("- Reply to content: "), F.col("reply_to_post"),
                     F.lit(" | Replied content: "), F.col("content"))
        ).otherwise(F.concat(F.lit("- "), F.col("content")))
    )
    
    df = df.select('topic_title', 'post_time', 'post_string')
    
    
    df = df.withColumnRenamed('post_time', 'date')
    
    # Step 3: aggregate data grouped by post_time and topic_title
    grouped_df = df.groupBy('date', 'topic_title').agg(
        F.concat_ws("\n", F.collect_list("post_string")).alias("posts")
    )
    # Step 4: for the current assignment only, filter out posts before 2020-01-01
    grouped_df = grouped_df.filter(grouped_df.date > "2020-01-01")
    
    return grouped_df

In [8]:
agg_df = aggregate_posts(df_demo)
agg_df.show()

+----------+--------------------+--------------------+
|      date|         topic_title|               posts|
+----------+--------------------+--------------------+
|2020-02-29|Phân tích kỹ thuậ...|- Reply to conten...|
|2020-03-09|Phân tích kỹ thuậ...|- 2 trai, 2 gái, ...|
|2020-07-30|Phân tích kỹ thuậ...|- -\n@noithatma\n...|
|2020-08-27|PVS tìm lại vinh ...|- PVS dự nay thế ...|
|2020-08-30|Phân tích kỹ thuậ...|- ad ơi ko xem đu...|
|2020-09-02|PVS tìm lại vinh ...|- Trong họ P, từ ...|
|2020-09-17|PVS tìm lại vinh ...|- Crude Oil WTI (...|
|2020-09-24|PVS tìm lại vinh ...|- Dự PVS nay sao ...|
|2020-10-10|Phân tích kỹ thuậ...|- các bạn cho tớ ...|
|2020-10-15|PVS tìm lại vinh ...|- OIL dựng đứng l...|
|2020-10-17|Phân tích kỹ thuậ...|- Mục tiêu đằng s...|
|2020-10-26|PVS tìm lại vinh ...|- Reply to conten...|
|2020-10-30|PVS tìm lại vinh ...|- https://www.pts...|
|2020-11-03|PVS tìm lại vinh ...|- OIL tăng nay P ...|
|2020-11-18|PVS tìm lại vinh ...|- Reply to conten...|
|2020-11-2

## Full Pipeline for F319 Data Preprocessing
Processed data will be saved in .csv files

In [24]:
import os
from pyspark.sql import functions as F

def f319_data_preprocessing(spark, HDFS_ADDR, num_paths=514, batch_size=100):
    """
    Preprocess f319_data posts in batches and aggregate posts per date and topic.
    
    Args:
        spark: SparkSession
        HDFS_ADDR: base HDFS path for CSV files
        num_paths: total number of CSV files
        batch_size: number of files to read per batch
    """
    
    for batch_start in range(1, num_paths + 1, batch_size):
        batch_end = min(batch_start + batch_size, num_paths)
        
        
        file_paths = [f"{HDFS_ADDR}/f319_data/posts_{i:04d}.csv" for i in range(batch_start, batch_end)]
        print(f"Processing batch {batch_start}-{batch_end-1} ({len(file_paths)} files)...")
        
        # Read CSV batch
        df = spark.read.csv(
            file_paths,
            header=True,
            multiLine=True,
            escape='"',
            quote='"',
            encoding="utf-8"
        )
        
        
        # Drop rows with nulls in essential columns
        df = df.dropna(subset=['topic_title', 'post_time', 'content'])
        
        # Clean and convert post_time
        df = (df
              .withColumn("post_time", F.split(F.col("post_time"), ",")[0])
              .withColumn("post_time", F.to_date("post_time", "dd/MM/yyyy"))
        )
        
        # Detect replies and original posts
        reply_pattern = r"^(.*?) đã viết:\n↑\n"
        reply_extract = r"^.*? đã viết:\n↑\n(.*?)(?:\nXem tất cả)?\n"
        
        df = (df
              .withColumn("post_type", F.when(F.col("content").rlike(reply_pattern), "reply").otherwise("original"))
              .withColumn("reply_to_post", F.when(F.col("post_type") == "reply", F.regexp_extract("content", reply_extract, 1)))
              .withColumn("content", F.when(F.col("post_type") == "reply",
                                             F.regexp_replace("content", reply_extract, "")
                                            ).otherwise(F.col("content")))
        )
        
        # Aggregate posts using your aggregate_posts function
        agg_df = aggregate_posts(df)
        
        # Write aggregated batch as CSV
        output_path = f"{HDFS_ADDR}/aggregated_f319_data/aggregated_f319_posts_{batch_start}-{batch_end-1}"
        agg_df.write.csv(
            output_path,
            mode="overwrite",
            header=True
        )
        print(f"Saved aggregated batch to {output_path}")


In [25]:
f319_data_preprocessing(spark, HDFS_ADDR)

Processing batch 1-100 (100 files)...
Saved aggregated batch to hdfs://namenode:9000/user/root/aggregated_f319_data/aggregated_f319_posts_1-100
Processing batch 101-200 (100 files)...
Saved aggregated batch to hdfs://namenode:9000/user/root/aggregated_f319_data/aggregated_f319_posts_101-200
Processing batch 201-300 (100 files)...
Saved aggregated batch to hdfs://namenode:9000/user/root/aggregated_f319_data/aggregated_f319_posts_201-300
Processing batch 301-400 (100 files)...
Saved aggregated batch to hdfs://namenode:9000/user/root/aggregated_f319_data/aggregated_f319_posts_301-400
Processing batch 401-500 (100 files)...
Saved aggregated batch to hdfs://namenode:9000/user/root/aggregated_f319_data/aggregated_f319_posts_401-500
Processing batch 501-513 (13 files)...
Saved aggregated batch to hdfs://namenode:9000/user/root/aggregated_f319_data/aggregated_f319_posts_501-513


In [38]:
batch_path = f"{HDFS_ADDR}/aggregated_f319_data/aggregated_f319_posts_1-100"
df = spark.read.csv(
    batch_path,
    header=True,
    multiLine=True,
    encoding="utf-8"
)


In [41]:
df.show(5, truncate=100)

+----------+----------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|      date|                                                     topic_title|                                                                                               posts|
+----------+----------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|2020-01-02|Hagl+Thaco= Hành trình vĩ đại của Vua trái cây,nông sản Châu Á !|- HAGL ( HNG/ HAG ) Thập kỷ mất mát đã qua !\nĐiểm tin HAGL :\n1/ HNG giảm được hơn 4000 tỷ trái ...|
|2020-01-02|                               Tôi uống cả em ... và uống cả ...|- Uống có trách nhiệm ... nhẽ nên nhìn nhận ... là một nét văn minh ...\nĐã uống rượu bia ... thì...|
|2020-01-03|      Phân tích kỹ thuật (TA) - (chỉ chia sẻ kiến thức, ko spam)|- Reply to content: Bác nào 

In [40]:
df.count()

125461

In [30]:
from pyspark.sql.functions import col
null_count_date = df.filter(col('date').isNull()).count()
null_count_topic = df.filter(col('topic_title').isNull()).count()
null_count_posts = df.filter(col('posts').isNull()).count()

print(f"Null count in 'date': {null_count_date}")
print(f"Null count in 'topic_title': {null_count_topic}")
print(f"Null count in 'posts': {null_count_posts}")

Null count in 'date': 0
Null count in 'topic_title': 0
Null count in 'posts': 0
