In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract, col, lit, count, when
from pyspark.sql.types import IntegerType

# Root HDFS directory; data files live at <BASE_PATH>/<label>/<graph_id>/{nodes.csv,edges.txt}
BASE_PATH = "hdfs://namenode:9000/user/misinfo"
# Captures (label, graph_id, file name) from a full input-file URI. The dots in
# the file names are escaped so they match a literal "." rather than any character.
PATH_REGEX = r".*/user/misinfo/([^/]+)/([^/]+)/(nodes\.csv|edges\.txt)"

# Initialize a local Spark session that talks to the HDFS namenode directly.
# NOTE(review): the saved output of this cell is JAVA_GATEWAY_EXITED, meaning the
# JVM never started; that usually points at the Java install / JAVA_HOME of the
# environment rather than at these settings — confirm before changing them.
_SPARK_SETTINGS = (
    ("spark.driver.memory", "1g"),
    ("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000"),
    ("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"),
)

_builder = SparkSession.builder.appName("MisinfoDataCleaning").master("local[*]")
for _key, _value in _SPARK_SETTINGS:
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

print("Spark Session Initialized and HDFS configs applied.")

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
def extract_and_enrich(file_name, columns, schema):
    """
    Read every `file_name` found two directory levels below BASE_PATH and tag
    each row with the label and graph ID parsed from its source path.

    Files are expected at BASE_PATH/<label>/<graph_id>/<file_name>
    (e.g. /Conspiracy/2501/nodes.csv).

    Args:
        file_name: Leaf file name to glob for ("nodes.csv", "edges.txt", ...).
            Its extension selects the delimiter: ".csv" -> ",", ".txt" -> " ".
        columns: Names assigned positionally to the columns Spark reads; the
            number of names must equal the number of columns in the files.
        schema: Names (a subset of `columns`) to cast to IntegerType.

    Returns:
        DataFrame with `columns` plus string columns `label` and `graph_id`.

    Raises:
        ValueError: if `file_name` does not end in ".csv" or ".txt".
    """
    if file_name.endswith(".csv"):
        # CSV files use comma delimiter
        separator = ","
    elif file_name.endswith(".txt"):
        # Spark's CSV reader does not accept a regex separator; the edges.txt
        # files are delimited by a single space.
        separator = " "
    else:
        # Fail loudly instead of leaving `separator` unbound (UnboundLocalError).
        raise ValueError(
            f"Unsupported file type {file_name!r}: expected a .csv or .txt file"
        )

    # The `/*/*` glob matches the <label>/<graph_id> directory levels.
    full_path = f"{BASE_PATH}/*/*/{file_name}"

    # 1. EXTRACT: read all matching files with the delimiter for this file type.
    # NOTE(review): header=True discards the first line of every file, including
    # edges.txt — confirm those files really start with a header line, otherwise
    # one edge per graph directory is silently lost.
    df = spark.read.csv(full_path, header=True, inferSchema=False, sep=separator)

    # Rename columns positionally to the caller-supplied names.
    df = df.toDF(*columns)

    # 2. TRANSFORM (Enrichment): derive label and graph_id from each row's source path.
    df = (
        df.withColumn("file_path", input_file_name())
        .withColumn("label", regexp_extract(col("file_path"), PATH_REGEX, 1))
        .withColumn("graph_id", regexp_extract(col("file_path"), PATH_REGEX, 2))
        .drop("file_path")
    )

    # Cast the requested columns to IntegerType (needed for graph analysis).
    # Unparseable values become null when spark.sql.ansi.enabled is false and
    # raise an error when it is true.
    for column in schema:
        df = df.withColumn(column, col(column).cast(IntegerType()))

    return df

# Column names assigned positionally per file type, and the subset cast to
# IntegerType. Every column is numeric, so each cast list is a copy of its
# name list (a separate list, so the two can diverge later).
NODE_COLS = ["id", "time", "friends", "followers"]
NODE_SCHEMA = list(NODE_COLS)

EDGE_COLS = ["src_node_id", "dst_node_id"]
EDGE_SCHEMA = list(EDGE_COLS)

# Read every per-graph file of each type into one enriched DataFrame.
print("Reading and enriching Nodes...")
all_nodes_df = extract_and_enrich(file_name="nodes.csv", columns=NODE_COLS, schema=NODE_SCHEMA)

print("Reading and enriching Edges...")
all_edges_df = extract_and_enrich(file_name="edges.txt", columns=EDGE_COLS, schema=EDGE_SCHEMA)

print("Initial data extraction complete.")

Reading and enriching Nodes...
Reading and enriching Edges...
Initial data extraction complete.


In [None]:
# --- INITIAL QUALITY CHECK (ON RAW MERGED DATA) ---
# Reports (without removing anything) duplicate keys and missing values in the
# raw merged DataFrames; the cleaning cell performs the actual removal.
print("\n--- STARTING DATA QUALITY REPORT (RAW DATA) ---")


def _count_duplicate_keys(df, keys):
    """Number of distinct `keys` combinations that occur in more than one row.

    This counts duplicated keys, not surplus rows: a key seen three times adds 1.
    """
    return df.groupBy(*keys).count().filter("count > 1").count()


def _count_nulls(df):
    """Total number of null cells across every column of `df`."""
    per_column = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    ).collect()[0]
    return sum(per_column.asDict().values())


# Keys include `label` because graph_id is parsed from a directory nested under
# the label directory; nothing here guarantees graph_id is unique across labels.

# Check for Node duplicates (same node ID within the same label/graph)
raw_node_dups_count = _count_duplicate_keys(all_nodes_df, ["label", "graph_id", "id"])
print(f"❌ FAIL: Found {raw_node_dups_count} duplicate nodes." if raw_node_dups_count > 0 else "✅ PASS: No duplicate nodes found.")

# Check for Edge duplicates (same source-destination pair within the same label/graph)
raw_edge_dups_count = _count_duplicate_keys(all_edges_df, ["label", "graph_id", "src_node_id", "dst_node_id"])
print(f"❌ FAIL: Found {raw_edge_dups_count} duplicate edges." if raw_edge_dups_count > 0 else "✅ PASS: No duplicate edges found.")

# Check for missing values (Nulls) in both DataFrames; the verdict follows the count.
node_nulls = _count_nulls(all_nodes_df)
edge_nulls = _count_nulls(all_edges_df)
total_nulls = node_nulls + edge_nulls
if total_nulls > 0:
    print(f"❌ FAIL: Found {total_nulls} missing values (Nodes: {node_nulls}, Edges: {edge_nulls}).")
else:
    print(f"✅ PASS: No missing values found (Total Nulls: {total_nulls}).")

print("--- END DATA QUALITY REPORT (RAW DATA) ---")


--- STARTING DATA QUALITY REPORT (RAW DATA) ---
❌ FAIL: Found 64 duplicate nodes.
❌ FAIL: Found 16 duplicate edges.
✅ PASS: No missing values found (Total Nulls: 0).
--- END DATA QUALITY REPORT (RAW DATA) ---


In [None]:
# --- TRANSFORMATION: CLEANING AND DEDUPLICATION ---
print("\n--- FULL CLEANING PROCESS IN PROGRESS ---")

# 1. NULL VALUE HANDLING (Robustness Check)
# .na.drop() removes every row that has a null/NaN in any column. That includes
# nulls produced by a failed integer cast in extract_and_enrich (non-ANSI mode),
# regardless of the inspection result in the quality-check cell.
raw_node_count = all_nodes_df.count()
clean_nodes_df = all_nodes_df.na.drop()
null_dropped_node_count = clean_nodes_df.count()

print(f"Nodes before Null Drop: {raw_node_count}")
print(f"Nodes after Null Drop:  {null_dropped_node_count}")
print(f"Null rows removed:      {raw_node_count - null_dropped_node_count}")

# Repeat for edges
raw_edge_count = all_edges_df.count()
clean_edges_df = all_edges_df.na.drop()
null_dropped_edge_count = clean_edges_df.count()

print(f"\nEdges before Null Drop: {raw_edge_count}")
print(f"Edges after Null Drop:  {null_dropped_edge_count}")
print(f"Null rows removed:      {raw_edge_count - null_dropped_edge_count}")


# 2. DEDUPLICATION
# Keys include `label`. graph_id is parsed from a directory nested under the
# label directory, and nothing here guarantees graph_id values are unique
# across labels, so (graph_id, id) alone could merge distinct graphs.
# dropDuplicates keeps one arbitrary row per key. cache() is applied before
# the count so that count materializes the retained rows once. The write and
# summary cells then reuse those rows instead of recomputing the dedup, whose
# kept row is not guaranteed to be the same on every recomputation.

# Nodes Deduplication (input is the null-dropped frame counted above; no rescan)
initial_dedup_node_count = null_dropped_node_count
clean_nodes_df = clean_nodes_df.dropDuplicates(["label", "graph_id", "id"]).cache()
final_node_count = clean_nodes_df.count()

print(f"\nNodes before Deduplication: {initial_dedup_node_count}")
print(f"Nodes after Deduplication:  {final_node_count}")
print(f"Duplicates removed:         {initial_dedup_node_count - final_node_count}")


# Edges Deduplication
initial_dedup_edge_count = null_dropped_edge_count
clean_edges_df = clean_edges_df.dropDuplicates(["label", "graph_id", "src_node_id", "dst_node_id"]).cache()
final_edge_count = clean_edges_df.count()

print(f"\nEdges before Deduplication: {initial_dedup_edge_count}")
print(f"Edges after Deduplication:  {final_edge_count}")
print(f"Duplicates removed:         {initial_dedup_edge_count - final_edge_count}")


print("\nFull cleaning process (Null Handling & Deduplication) complete. Clean DataFrames cached.")



--- FULL CLEANING PROCESS IN PROGRESS ---
Nodes before Null Drop: 214915
Nodes after Null Drop:  214915
Null rows removed:      0

Edges before Null Drop: 504074
Edges after Null Drop:  504074
Null rows removed:      0

Nodes before Deduplication: 214915
Nodes after Deduplication:  214851
Duplicates removed:         64

Edges before Deduplication: 504074
Edges after Deduplication:  504058
Duplicates removed:         16

Full cleaning process (Null Handling & Deduplication) complete. Clean DataFrames cached.


In [None]:
# --- LOAD: SAVING OPTIMIZED PARQUET FILES ---
# Clean outputs are written next to the raw <label>/<graph_id>/ directories.
# CLEAN_PATH reuses BASE_PATH instead of repeating the HDFS URL literal, so the
# two cannot drift apart.
# NOTE(review): Spark's parquet output files are not named nodes.csv/edges.txt,
# so the `*/*/<file_name>` read glob should not pick them up on a rerun — confirm
# if the layout changes.
CLEAN_PATH = BASE_PATH

print("\nSaving clean data to 'all_nodes_clean.parquet'...")
# Overwrite mode is used so you can safely rerun the cell
clean_nodes_df.write.mode("overwrite").parquet(f"{CLEAN_PATH}/all_nodes_clean.parquet")

print("Saving clean data to 'all_edges_clean.parquet'...")
clean_edges_df.write.mode("overwrite").parquet(f"{CLEAN_PATH}/all_edges_clean.parquet")

print("\nSUCCESS: Clean datasets saved to HDFS!")



Saving clean data to 'all_nodes_clean.parquet'...
Saving clean data to 'all_edges_clean.parquet'...

SUCCESS: Clean datasets saved to HDFS!


In [None]:
# --- FINAL CLEANING SUMMARY REPORT ---
# Compares per-label row counts of the raw DataFrames against the cleaned ones
# and prints them as a fixed-width text table.
print("Generating Final Cleaning Summary Report...")


def get_counts(df, name_suffix):
    """Helper to get counts grouped by label, in a column named count_<name_suffix>."""
    return df.groupBy("label").count().withColumnRenamed("count", "count_" + name_suffix)


def _compare_counts(raw_df, clean_df):
    """Return collected (label, count_raw, count_clean) rows, sorted by label.

    The full outer join keeps labels present on only one side; their missing
    count is filled with 0 so the arithmetic in print_row never sees None.
    Sorting makes the table order deterministic across runs.
    """
    return (
        get_counts(raw_df, "raw")
        .join(get_counts(clean_df, "clean"), "label", "outer")
        .fillna(0, subset=["count_raw", "count_clean"])
        .orderBy("label")
        .collect()
    )


# Calculate and join raw vs. clean counts for Nodes and Edges
node_comparison = _compare_counts(all_nodes_df, clean_nodes_df)
edge_comparison = _compare_counts(all_edges_df, clean_edges_df)


def print_row(dataset, raw, clean):
    """Formats and prints a single row of the summary table."""
    removed = raw - clean
    # Prevent ZeroDivisionError if raw count is 0
    pct = (removed / raw) * 100 if raw and raw > 0 else 0.0
    print(f"|{dataset:<22}|{raw:<10}|{clean:<11}|{removed:<8}|{pct:<10.2f}|")


def _pretty_name(label, kind):
    """Map a directory label to a display name such as "Conspiracy Nodes".

    "Non" is tested first because "Non-Conspiracy" labels also contain "Conspiracy".
    """
    label = label or ""
    if "Non" in label:
        return f"Non-Conspiracy {kind}"
    if "Conspiracy" in label:
        return f"Conspiracy {kind}"
    return f"Other {kind}"


_BORDER = "+" + "-"*22 + "+" + "-"*10 + "+" + "-"*11 + "+" + "-"*8 + "+" + "-"*10 + "+"

print("\nCLEANING SUMMARY")
print(_BORDER)
print(f"|{'Dataset':<22}|{'Raw_Count':<10}|{'Cleaned_Cnt':<11}|{'Removed':<8}|{'% Removed':<10}|")
print(_BORDER)

# One section per dataset (nodes, then edges), each closed by a border line
for kind, comparison in (("Nodes", node_comparison), ("Edges", edge_comparison)):
    for row in comparison:
        print_row(_pretty_name(row["label"], kind), row["count_raw"], row["count_clean"])
    print(_BORDER)


Generating Final Cleaning Summary Report...

CLEANING SUMMARY
+----------------------+----------+-----------+--------+----------+
|Dataset               |Raw_Count |Cleaned_Cnt|Removed |% Removed |
+----------------------+----------+-----------+--------+----------+
|Conspiracy Nodes      |19072     |19047      |25      |0.13      |
|Non-Conspiracy Nodes  |157146    |157140     |6       |0.00      |
|Other Nodes           |38697     |38664      |33      |0.09      |
+----------------------+----------+-----------+--------+----------+
|Conspiracy Edges      |58227     |58223      |4       |0.01      |
|Non-Conspiracy Edges  |328501    |328501     |0       |0.00      |
|Other Edges           |117346    |117334     |12      |0.01      |
+----------------------+----------+-----------+--------+----------+
