# Libraries

In [1]:
import os
import subprocess
import time
import threading
import re
import praw
import nltk
from nltk.corpus import words
import sys, findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp

# NLTK (Natural Language Toolkit)  Setup
1. The `words` corpus is downloaded to provide a list of valid English words for keyword validation, filtering, or cleaning operations.
2. The corpus is converted into a lowercase set (`english_words`) for fast membership checks.

In [2]:

# Fetch the NLTK `words` corpus (NLTK reports "already up-to-date" when it is present).
nltk.download('words')

# Lower-cased vocabulary stored as a set.
english_words = {entry.lower() for entry in words.words()}

[nltk_data] Downloading package words to C:\Users\Adarsh
[nltk_data]     Ranjan\AppData\Roaming\nltk_data...
[nltk_data]   Package words is already up-to-date!


# PATHS
1. Local + HDFS

In [3]:
# Local directories: one folder per pipeline (raw / filtered batch files).
# Both are created up front if they do not exist yet.
raw_batches = r"E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\raw_batches"
filtered_batches = r"E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\filtered_batches"
os.makedirs(raw_batches, exist_ok=True)
os.makedirs(filtered_batches, exist_ok=True)

# HDFS directories: the HDFS counterparts of the two local folders,
# expressed as full URIs built on the NameNode address in `hdfs_uri`.
hdfs_uri = "hdfs://localhost:9000"
hdfs_raw_dir = f"{hdfs_uri}/user/adarsh/realtime_pipeline/raw_batches"
hdfs_filtered_dir = f"{hdfs_uri}/user/adarsh/realtime_pipeline/filtered_batches"

# Path to the `hdfs.cmd` launcher; later cells pass it to subprocess.run
# to execute `hdfs dfs ...` commands.
HDFS_CMD = r"E:\hadoop\bin\hdfs.cmd"

# Spark checkpoint
checkpoint_dir = r"E:\Coding\BDA-PySpark\realtime-pipeline\checkpoints"

# Sets HADOOP_USER_NAME for this process (and the subprocesses it spawns).
# NOTE(review): the HDFS paths above live under /user/adarsh while this user
# name is 'AdarshRanjan' — confirm the two are meant to differ.
os.environ['HADOOP_USER_NAME'] = 'AdarshRanjan'

# Reddit API Connection
1. Uses PRAW (Python Reddit API Wrapper) library.

In [4]:
# Build the PRAW client used by the streaming thread.
# Credentials are taken from the environment (REDDIT_CLIENT_ID,
# REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT) so real secrets never have to be
# saved in the notebook; the previous literal values remain as fallbacks so
# the cell behaves exactly as before when the variables are not set.
reddit = praw.Reddit(
    client_id=os.environ.get("REDDIT_CLIENT_ID", "ID"),
    client_secret=os.environ.get("REDDIT_CLIENT_SECRET", "secret"),
    user_agent=os.environ.get("REDDIT_USER_AGENT", "user"),
)
# NOTE(review): this message is printed as soon as the client object has been
# constructed; nothing in this cell issues a request to verify the
# credentials — confirm whether praw.Reddit validates them eagerly.
print(" Reddit API connected.")

 Reddit API connected.


# Data and Log Cleanup Script

In [5]:
import shutil

# Directory holding the upload log CSV written by the streaming thread.
log = r"E:\Coding\BDA-PySpark\realtime-pipeline\logs"

# Ask once, interpret the answer once. Surrounding whitespace and letter case
# are ignored so that " Y " counts as a confirmation.
confirm = input("This will delete local batch files, logs, checkpoints, and HDFS data. Continue? (y/n): ")
confirmed = confirm.strip().lower() == 'y'

for local_dir, hdfs_dir in [(raw_batches, hdfs_raw_dir), (filtered_batches, hdfs_filtered_dir)]:
    # Local batch cleanup: only files that look like r_batch*.txt / f_batch*.txt
    # are removed; anything else in the folder is left alone.
    if confirmed and os.path.exists(local_dir):
        deleted_files = 0
        for f in os.listdir(local_dir):
            if (f.startswith("r_batch") or f.startswith("f_batch")) and f.endswith(".txt"):
                os.remove(os.path.join(local_dir, f))
                deleted_files += 1
        print(f"[LOCAL] Deleted {deleted_files} old batch files in {local_dir}")
    else:
        print(f"[LOCAL] Skipped deletion of local files in {local_dir}")

    # HDFS cleanup: best effort — a failing `-rm` (non-zero exit status) is
    # reported but does not abort the cell.
    if confirmed:
        try:
            subprocess.run([HDFS_CMD, "dfs", "-rm", "-r", "-skipTrash", hdfs_dir],
                           check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            print(f"[HDFS] Deleted existing HDFS folder {hdfs_dir}")
        except subprocess.CalledProcessError:
            print(f"[HDFS] HDFS folder {hdfs_dir} did not exist or could not be deleted")
    else:
        print(f"[HDFS] Skipped deletion of HDFS folder {hdfs_dir}")

    # Ensure local folder exists
    os.makedirs(local_dir, exist_ok=True)
    print(f"[LOCAL] Ensured local folder exists: {local_dir}")

    # Create placeholder file (always, regardless of the confirmation answer)
    placeholder_file = os.path.join(local_dir, "Placeholder.txt")
    with open(placeholder_file, "w", encoding="utf-8") as f:
        f.write("placeholder\n")
    print(f"[LOCAL] Created placeholder file: {placeholder_file}")

    # Ensure HDFS folder exists; unlike the delete above, a failure here raises.
    subprocess.run([HDFS_CMD, "dfs", "-mkdir", "-p", hdfs_dir], check=True)
    print(f"[HDFS] Ensured HDFS folder exists: {hdfs_dir}")

    # Upload placeholder to HDFS
    subprocess.run([HDFS_CMD, "dfs", "-put", "-f", placeholder_file, hdfs_dir], check=True)
    print(f"[HDFS] Uploaded placeholder file to HDFS: {hdfs_dir}")

# Clean up Spark checkpoints. The confirmation prompt promises this, but the
# checkpoint directory was previously never removed.
if confirmed:
    if os.path.exists(checkpoint_dir):
        try:
            shutil.rmtree(checkpoint_dir)
            print(f"[LOCAL] Deleted checkpoint directory: {checkpoint_dir}")
        except OSError as e:
            print(f"[LOCAL] Failed to delete checkpoint directory: {e}")
    else:
        print(f"[LOCAL] Checkpoint directory doesn't exist: {checkpoint_dir}")
else:
    print(f"[LOCAL] Skipped deletion of checkpoint directory {checkpoint_dir}")

# Clean up logs
if confirmed:
    if os.path.exists(log):
        try:
            shutil.rmtree(log)
            print(f"[LOCAL] Deleted entire logs directory: {log}")

            # Recreate empty directory
            os.makedirs(log, exist_ok=True)
            print(f"[LOCAL] Recreated empty logs directory: {log}")

        except Exception as e:
            print(f"[LOCAL] Failed to delete logs directory: {e}")
    else:
        print(f"[LOCAL] Logs directory doesn't exist: {log}")
else:
    print(f"[LOCAL] Skipped deletion (no recreation)")

print("\n Cleanup and placeholder setup complete.")

[LOCAL] Skipped deletion of local files in E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\raw_batches
[HDFS] Skipped deletion of HDFS folder hdfs://localhost:9000/user/adarsh/realtime_pipeline/raw_batches
[LOCAL] Ensured local folder exists: E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\raw_batches
[LOCAL] Created placeholder file: E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\raw_batches\Placeholder.txt
[HDFS] Ensured HDFS folder exists: hdfs://localhost:9000/user/adarsh/realtime_pipeline/raw_batches
[HDFS] Uploaded placeholder file to HDFS: hdfs://localhost:9000/user/adarsh/realtime_pipeline/raw_batches
[LOCAL] Skipped deletion of local files in E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\filtered_batches
[HDFS] Skipped deletion of HDFS folder hdfs://localhost:9000/user/adarsh/realtime_pipeline/filtered_batches
[LOCAL] Ensured local folder exists: E:\Coding\BDA-PySpark\realtime-pipeline\reddit_streaming\filtered_batches
[LOCAL] Created pla

# Emoji Replacement
1. Converts emojis into words that describe the emotion they express.

In [6]:
# Lookup table: emoji -> plain-English word that stands in for it.
emoji_dict = {
    # laughter, happiness, approval
    "😂": "joy", "🤣": "laugh", "😊": "happy",
    "😄": "smile", "😁": "grin", "😆": "laugh",
    "😃": "smile", "😎": "cool", "👍": "like",
    "❤️": "love", "💖": "love", "💯": "great",
    # sadness, tiredness
    "😢": "sad", "😭": "cry", "🙁": "sad",
    "😔": "disappointed", "😟": "worried", "😞": "sad",
    "😩": "tired", "😫": "tired",
    # anger, intensity
    "😡": "angry", "😠": "angry", "👿": "angry",
    "💀": "death", "🔥": "fire", "⚡": "shock",
    # affection
    "😍": "love", "😘": "kiss", "🥰": "love",
    "💘": "love", "💓": "love", "💝": "love",
    # surprise
    "😲": "surprise", "😮": "surprise", "😳": "shocked",
    "😱": "shocked", "🤯": "mindblown",
    # sleep, calm
    "😴": "sleepy", "💤": "sleep", "😌": "relieved",
    "😪": "sleepy", "😇": "innocent",
    # celebration
    "👏": "clap", "🤝": "handshake", "✨": "sparkle",
    "🌟": "star", "🎉": "celebration", "🎊": "celebration",
    # disapproval, disgust
    "👎": "dislike", "💔": "heartbroken", "☹️": "sad",
    "🤢": "disgust", "🤮": "disgust",
    # neutral, uncertain
    "🤔": "thinking", "🙄": "eyeroll", "🤷": "shrug",
    "😐": "neutral", "😶": "silent", "😬": "nervous",
}


def replace_emojis(text):
    """Return `text` with every emoji listed in `emoji_dict` swapped for its word.

    Each replacement word is padded with one space on both sides, so an emoji
    glued to neighbouring characters still ends up as a separate token.
    Characters that are not keys of `emoji_dict` are left untouched.
    """
    converted = text
    for symbol in emoji_dict:
        converted = converted.replace(symbol, " " + emoji_dict[symbol] + " ")
    return converted


# Data Pre-Processing

`clean_raw_text`: For Topic Modeling\
pre-processing for raw batch files

In [7]:
from nltk.corpus import stopwords
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
import re

# WordNet lemmatizer instance.
# NOTE(review): not referenced by the cleaning functions visible in this
# notebook — confirm it is used elsewhere before removing.
lemmatizer = WordNetLemmatizer()

# Small hand-picked stop word set (same caveat as above: no visible consumer).
stop_words = {
    # articles and conjunctions
    "a", "an", "the", "and", "or",
    # verbs
    "is", "are", "was", "be", "has", "have",
    # prepositions
    "to", "of", "in", "on", "for", "with", "as", "at", "by", "from",
    # pronouns / determiners
    "that", "this", "it",
    # chat filler
    "u", "im", "yeah", "oh",
}

# Apostrophe-less spellings that are turned back into proper contractions.
_RAW_CONTRACTIONS = {
    "dont": "don't", "cant": "can't", "wont": "won't", "didnt": "didn't",
    "doesnt": "doesn't", "im": "i'm", "ive": "i've", "youre": "you're",
    "theyre": "they're", "thats": "that's",
}
# \b on both sides restricts the match to whole words. A plain str.replace
# would also rewrite the letters inside longer words ("time" -> "ti'me",
# "give" -> "gi've", "important" -> "i'mportant").
_RAW_CONTRACTION_RE = re.compile(r"\b(?:" + "|".join(_RAW_CONTRACTIONS) + r")\b")


def clean_raw_text(text, min_words=50, chunk_size=150):
    """Clean one comment for the raw (topic-modelling) batch files.

    Steps, in order: emojis -> words, URLs and @/# tags removed, lower-cased,
    everything except a-z, whitespace and ``. , ! ? ' -`` dropped, whitespace
    collapsed, apostrophes inside words removed, a fixed list of common
    contractions restored (whole words only), one-character tokens dropped.

    Args:
        text: The comment text.
        min_words: Minimum number of tokens that must remain after cleaning.
        chunk_size: Unused; kept so existing callers keep working.

    Returns:
        A one-element list holding the cleaned text, or ``None`` when fewer
        than ``min_words`` tokens remain.
    """
    text = replace_emojis(text)

    # Strip links and @mentions / #hashtags before any character filtering.
    text = re.sub(r"http\S+|www\S+|https\S+", "", text)
    text = re.sub(r"[@#]\w+", "", text)

    text = text.lower()

    text = re.sub(r"[^a-z\s.,!?'-]", "", text)
    text = re.sub(r"\s+", " ", text).strip()

    # Normalise every spelling to the apostrophe-less form first ...
    text = re.sub(r"(\w)'m(\w)", r"\1m\2", text)
    text = re.sub(r"(\w)'(\w)", r"\1\2", text)

    # ... then put the apostrophe back for the known contractions only.
    text = _RAW_CONTRACTION_RE.sub(lambda m: _RAW_CONTRACTIONS[m.group(0)], text)

    # Single-character tokens (stray letters, lone punctuation) are discarded.
    words_list = [w for w in text.split() if len(w) > 1]

    if len(words_list) < min_words:
        return None

    return [" ".join(words_list)]


`clean_filtered_text`: For Sentiment Analysis\
pre-processing for filtered batch files


In [8]:
# Apostrophe-less spellings that are turned back into proper contractions.
_FILTERED_CONTRACTIONS = {
    "dont": "don't", "cant": "can't", "wont": "won't", "didnt": "didn't",
    "doesnt": "doesn't", "isnt": "isn't", "arent": "aren't", "wasnt": "wasn't",
    "werent": "weren't", "hasnt": "hasn't", "havent": "haven't", "hadnt": "hadn't",
    "wouldnt": "wouldn't", "shouldnt": "shouldn't", "couldnt": "couldn't",
    "im": "i'm", "ive": "i've", "youre": "you're", "youve": "you've",
    "theyre": "they're", "theyve": "they've", "its": "it's", "thats": "that's",
    "whats": "what's", "heres": "here's", "theres": "there's",
}
# Whole words only (\b): a plain str.replace would also rewrite letters inside
# longer words ("time" -> "ti'me", "visits" -> "visit's").
# Case-insensitive because this pipeline does not lower-case the text.
_FILTERED_CONTRACTION_RE = re.compile(
    r"\b(?:" + "|".join(_FILTERED_CONTRACTIONS) + r")\b", re.IGNORECASE
)


def _restore_filtered_contraction(match):
    """Re-insert the apostrophe into a matched word, keeping its letter case."""
    word = match.group(0)
    target = _FILTERED_CONTRACTIONS.get(word.lower())
    if target is None:
        # Case-insensitive matching can accept a few non-ASCII look-alikes
        # whose lower() is not a key; leave such words unchanged.
        return word
    cut = target.index("'")
    return word[:cut] + "'" + word[cut:]


def clean_filtered_text(text, min_words=30):
    """Clean one comment for the filtered (sentiment-analysis) batch files.

    Steps, in order: emojis -> words; URLs, /r/ and /u/ references and @/#
    tags removed; runs of four or more identical characters shortened to two;
    apostrophes inside words removed; a fixed list of common contractions
    restored (whole words, case preserved); characters other than word
    characters, whitespace and ``. , ! ? ' -`` dropped; whitespace collapsed.

    The apostrophe removal runs BEFORE the contraction restoration. In the
    opposite order the removal strips the apostrophes that were just
    inserted, so no contraction would survive.

    Args:
        text: The comment text.
        min_words: Minimum number of whitespace-separated tokens required.

    Returns:
        A one-element list holding the cleaned text, or ``None`` when fewer
        than ``min_words`` tokens remain.
    """
    text = text.strip()

    text = replace_emojis(text)

    text = re.sub(r"http\S+|www\S+|https\S+", "", text)
    text = re.sub(r"/[ru]/\w+", "", text)
    text = re.sub(r"[@#]\w+", "", text)

    # "soooooo" -> "soo", "!!!!!!" -> "!!"
    text = re.sub(r"(.)\1{3,}", r"\1\1", text)

    # Normalise to the apostrophe-less spelling, then restore known contractions.
    text = re.sub(r"(\w)'(\w)", r"\1\2", text)
    text = _FILTERED_CONTRACTION_RE.sub(_restore_filtered_contraction, text)

    text = re.sub(r"[^\w\s.,!?'-]", "", text)

    text = re.sub(r"\s+", " ", text).strip()

    word_count = len(text.split())
    if word_count < min_words:
        return None
    return [text]


`get_next_batch_number`
1. Dynamically identifies the next available batch number based on existing files.
2. Ensures sequential, consistent naming such as r_batch1.txt, r_batch2.txt, etc.

`append_log_to_csv`
1. Logs each uploaded batch's metadata to a CSV file (timestamp, file size, duration, etc.).

In [9]:
from datetime import datetime
import time
import threading
import subprocess
import os
import pandas as pd

def get_next_batch_number(folder, prefix="batch", suffix=".txt"):
    """Return the next free batch number for files named ``<prefix><n><suffix>``.

    Only files in ``folder`` whose name starts with ``prefix`` and ends with
    ``suffix`` are considered. The number is read from the part of the name
    between prefix and suffix (first run of digits), so digits that belong to
    the prefix or suffix are never mistaken for the batch number.

    Args:
        folder: Directory to scan.
        prefix: Required start of the file name.
        suffix: Required end of the file name.

    Returns:
        The highest batch number found plus one, or 1 when no matching file
        carries a number (including when the folder has no matching files).

    Raises:
        OSError: If ``folder`` cannot be listed (e.g. it does not exist).
    """
    highest = 0
    for name in os.listdir(folder):
        if not (name.startswith(prefix) and name.endswith(suffix)):
            continue
        # Slice with an explicit end index so an empty suffix works too.
        stem = name[len(prefix):len(name) - len(suffix)]
        found = re.search(r"\d+", stem)
        if found:
            highest = max(highest, int(found.group()))
    return highest + 1


def append_log_to_csv(log_entry, log_csv_path=r"E:\Coding\BDA-PySpark\realtime-pipeline\logs\upload_logs.csv"):
    """Append one log record as a row of the upload-log CSV.

    Args:
        log_entry: Mapping of column name -> value for a single row.
        log_csv_path: Destination CSV file. Defaults to the pipeline's
            upload log, so existing one-argument calls are unaffected.

    The parent directory is created when missing. The header row is written
    only when the file does not exist yet or is still empty; later calls
    append data rows only.
    """
    log_dir = os.path.dirname(log_csv_path)
    if log_dir:  # a bare file name has no directory part to create
        os.makedirs(log_dir, exist_ok=True)

    needs_header = (not os.path.exists(log_csv_path)) or os.path.getsize(log_csv_path) == 0

    pd.DataFrame([log_entry]).to_csv(log_csv_path, mode='a', header=needs_header, index=False)

# Reddit Streaming Thread (Heart of the pipeline)
1. Continuously fetches new comments from Reddit’s public “all” subreddit.
2. Cleans each comment using both pipelines (clean_raw_text and clean_filtered_text).
3. Buffers cleaned text until either:\
    (a). 50 comments are collected, or\
    (b). a time threshold is reached (90 seconds for raw batches, 70 seconds for filtered batches).
4. Writes data to batch files (one raw and one filtered).
5. Uploads the files to the corresponding HDFS directories.
6. Records details of each upload in a CSV log.
7. Runs continuously in a background thread (daemon=True).

In [10]:
# Module-level state shared with stream_reddit_comments().

# Cleaned lines waiting to be written, one list per pipeline.
buffer_raw = []
buffer_filtered = []

# Time of the last flush per pipeline (initialised to "now").
last_write_time_raw = time.time()
last_write_time_filtered = time.time()

# Continue numbering after whatever batch files are already on disk.
batch_count_raw = get_next_batch_number(raw_batches, prefix="r_batch")
batch_count_filtered = get_next_batch_number(filtered_batches, prefix="f_batch")

# Flush thresholds: a buffer is written out once it holds this many lines ...
_BATCH_MAX_LINES = 50
# ... or once this many seconds have passed since its last flush.
_RAW_FLUSH_INTERVAL_SEC = 90
_FILTERED_FLUSH_INTERVAL_SEC = 70


def _flush_batch(buffer, batch_number, file_prefix, separator, local_dir, hdfs_dir, folder_type):
    """Write one buffered batch to a local file, upload it to HDFS and log it.

    Args:
        buffer: Lines to write (not modified here).
        batch_number: Number used in the file name and the log record.
        file_prefix: File name prefix, e.g. "r_batch" or "f_batch".
        separator: String placed between consecutive lines in the file.
        local_dir: Local folder the batch file is written to.
        hdfs_dir: HDFS folder the file is uploaded to.
        folder_type: Label used in the console message and the log record.

    Raises:
        subprocess.CalledProcessError: If the HDFS upload exits non-zero.
    """
    start_time = time.time()

    filename = f"{file_prefix}{batch_number}.txt"
    local_path = os.path.join(local_dir, filename)

    with open(local_path, "w", encoding="utf-8") as f:
        f.write(separator.join(buffer))

    file_size = os.path.getsize(local_path)

    # Same launcher constant as the cleanup cell, instead of a second
    # hard-coded copy of the path.
    subprocess.run([HDFS_CMD, "dfs", "-put", "-f", local_path, hdfs_dir], check=True)

    # Covers writing the local file and the HDFS upload.
    duration = time.time() - start_time

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] Uploaded {filename} to HDFS folder: {folder_type} | Lines: {len(buffer)}")

    append_log_to_csv({
        "timestamp": timestamp,
        "filename": filename,
        "folder_type": folder_type,
        "batch_number": batch_number,
        "lines_count": len(buffer),
        "file_size_bytes": file_size,
        "file_size_kb": round(file_size / 1024, 2),
        "processing_duration_sec": round(duration, 3),
        "status": "success"
    })


def stream_reddit_comments():
    """Stream new comments from r/all, clean them, and ship them in batches.

    Every comment is passed through both cleaning pipelines; results that
    survive are buffered separately. A buffer is flushed (local file, HDFS
    upload, CSV log row) when it reaches the line limit or its time limit has
    elapsed — checked each time a comment arrives, and only if the buffer is
    non-empty. Raw batches separate lines with a blank line, filtered batches
    with a single newline.

    Runs forever: any exception is printed and the stream is reopened after a
    five-second pause. A buffer whose flush failed is kept and retried on the
    next comment.
    """
    global buffer_raw, buffer_filtered
    global last_write_time_raw, last_write_time_filtered
    global batch_count_raw, batch_count_filtered

    while True:
        try:
            for comment in reddit.subreddit("all").stream.comments(skip_existing=True):
                text = comment.body.replace("\n", " ").strip()

                lines_raw = clean_raw_text(text)
                lines_filtered = clean_filtered_text(text)

                if lines_raw:
                    buffer_raw.extend(lines_raw)
                if lines_filtered:
                    buffer_filtered.extend(lines_filtered)

                current_time = time.time()

                raw_due = (len(buffer_raw) >= _BATCH_MAX_LINES
                           or current_time - last_write_time_raw >= _RAW_FLUSH_INTERVAL_SEC)
                if buffer_raw and raw_due:
                    _flush_batch(buffer_raw, batch_count_raw, "r_batch", "\n\n",
                                 raw_batches, hdfs_raw_dir, "raw_batches")
                    # State is advanced only after a fully successful flush.
                    buffer_raw.clear()
                    last_write_time_raw = current_time
                    batch_count_raw += 1

                filtered_due = (len(buffer_filtered) >= _BATCH_MAX_LINES
                                or current_time - last_write_time_filtered >= _FILTERED_FLUSH_INTERVAL_SEC)
                if buffer_filtered and filtered_due:
                    _flush_batch(buffer_filtered, batch_count_filtered, "f_batch", "\n",
                                 filtered_batches, hdfs_filtered_dir, "filtered_batches")
                    buffer_filtered.clear()
                    last_write_time_filtered = current_time
                    batch_count_filtered += 1

                # One-second pause after every processed comment.
                time.sleep(1)

        except Exception as e:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] Stream Error: {e}")
            time.sleep(5)

# Run the streaming loop in the background as a daemon thread.
reddit_thread = threading.Thread(target=stream_reddit_comments)
reddit_thread.daemon = True
reddit_thread.start()


[2025-10-29 06:40:26] Uploaded f_batch1.txt to HDFS folder: filtered_batches | Lines: 14
[2025-10-29 06:40:46] Uploaded r_batch1.txt to HDFS folder: raw_batches | Lines: 6
[2025-10-29 06:41:37] Uploaded f_batch2.txt to HDFS folder: filtered_batches | Lines: 26
[2025-10-29 06:42:17] Uploaded r_batch2.txt to HDFS folder: raw_batches | Lines: 11
[2025-10-29 06:42:47] Uploaded f_batch3.txt to HDFS folder: filtered_batches | Lines: 13
[2025-10-29 06:43:48] Uploaded r_batch3.txt to HDFS folder: raw_batches | Lines: 10
[2025-10-29 06:44:02] Uploaded f_batch4.txt to HDFS folder: filtered_batches | Lines: 17
[2025-10-29 06:45:16] Uploaded f_batch5.txt to HDFS folder: filtered_batches | Lines: 17
[2025-10-29 06:45:23] Uploaded r_batch4.txt to HDFS folder: raw_batches | Lines: 10
[2025-10-29 06:46:26] Uploaded f_batch6.txt to HDFS folder: filtered_batches | Lines: 13
[2025-10-29 06:46:53] Uploaded r_batch5.txt to HDFS folder: raw_batches | Lines: 10
[2025-10-29 06:47:33] Uploaded f_batch7.txt to 

# SPARK STREAMING
1. Starts a SparkSession named “RedditRealTimeProcessor”.


In [11]:
# Tell findspark where Spark lives and make both the driver and the workers
# use the interpreter this kernel is running on.
os.environ.update({
    "SPARK_HOME": r"E:\Coding\BDA-PySpark\spark-3.4.1-bin-hadoop3",
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
})
findspark.init()

# Local session (or the already running one, via getOrCreate).
spark = (
    SparkSession.builder
    .appName("RedditRealTimeProcessor")
    .master("local[*]")
    .getOrCreate()
)

# Only warnings and errors from Spark's own logging.
spark.sparkContext.setLogLevel("WARN")



`spark.streams.awaitAnyTermination()` keeps the streaming session active and waits indefinitely for incoming data.

In [None]:
# Wait on the session's streaming queries; per the note above this keeps the
# cell (and the streaming session) alive indefinitely.
# NOTE(review): no streaming query is started in the cells visible here —
# confirm one is registered before this call is reached.
spark.streams.awaitAnyTermination()