### Initial Setup

In [None]:
%pip install emoji transformers torch nltk contractions

Note: you may need to restart the kernel to use updated packages.


In [13]:
# --- Import Libraries ---
import pandas as pd
import os
import re
import nltk
import emoji
import glob
import contractions
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

In [3]:
# --- Initial Setup and NLTK Downloads ---
# Download necessary NLTK data. You only need to run this once.
print("Downloading NLTK data...")
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')
print("NLTK data downloaded.")

Downloading NLTK data...
NLTK data downloaded.


[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


### Import Data

In [4]:
# Define the path to your processed chunks directory
PROCESSED_DIR = 'datasets/processed_chunks/'

### Drop Unwanted Column

In [5]:
# Get a list of all the chunked Parquet files
parquet_files = sorted(glob.glob(os.path.join(PROCESSED_DIR, 'processed_chunk_*.parquet')))

# Define the list of columns you want to drop
columns_to_drop = [
    'kind_comment', 'commentId', 'channelId_comment', 'videoId', 'authorId',
    'parentCommentId', 'kind_video', 'channelId_video',
    'title', 'description', 'tags', 'defaultLanguage', 'defaultAudioLanguage', 
    'contentDuration', 'topicCategories', 'cleaned_text'
]

# --- PART 1: Drop Columns from Each Parquet File ---
print(f"Found {len(parquet_files)} Parquet files to update.")
print("Starting to drop specified columns from each file...")

for i, file_path in enumerate(parquet_files):
    print(f"  > Processing chunk {i+1}/{len(parquet_files)}: {os.path.basename(file_path)}")
    
    # Load one chunk
    df_chunk = pd.read_parquet(file_path)
    
    # Drop the columns, using errors='ignore' in case a column is already missing
    df_chunk = df_chunk.drop(columns=columns_to_drop, errors='ignore')
    
    # Overwrite the original file with the updated DataFrame
    df_chunk.to_parquet(file_path, index=False)

print(f"\n✅ SUCCESS! All {len(parquet_files)} chunk files have been updated.")

Found 48 Parquet files to update.
Starting to drop specified columns from each file...
  > Processing chunk 1/48: processed_chunk_1.parquet


  > Processing chunk 2/48: processed_chunk_10.parquet
  > Processing chunk 3/48: processed_chunk_11.parquet
  > Processing chunk 4/48: processed_chunk_12.parquet
  > Processing chunk 5/48: processed_chunk_13.parquet
  > Processing chunk 6/48: processed_chunk_14.parquet
  > Processing chunk 7/48: processed_chunk_15.parquet
  > Processing chunk 8/48: processed_chunk_16.parquet
  > Processing chunk 9/48: processed_chunk_17.parquet
  > Processing chunk 10/48: processed_chunk_18.parquet
  > Processing chunk 11/48: processed_chunk_19.parquet
  > Processing chunk 12/48: processed_chunk_2.parquet
  > Processing chunk 13/48: processed_chunk_20.parquet
  > Processing chunk 14/48: processed_chunk_21.parquet
  > Processing chunk 15/48: processed_chunk_22.parquet
  > Processing chunk 16/48: processed_chunk_23.parquet
  > Processing chunk 17/48: processed_chunk_24.parquet
  > Processing chunk 18/48: processed_chunk_25.parquet
  > Processing chunk 19/48: processed_chunk_26.parquet
  > Processing chun

### Missing Value

In [6]:
# --- PART 2: Verify Columns and Check for Missing Values ---
print("\nVerifying columns and checking for missing values across all chunks...")

# Check the schema of the first file to confirm columns were dropped
if parquet_files:
    first_chunk_df = pd.read_parquet(parquet_files[0])
    print("\nColumns in the updated Parquet files:")
    print(first_chunk_df.columns.to_list())

# Initialize a dictionary to hold the total count of missing values for each column
total_missing_values = {}

# Loop through the updated files again to count NaNs
for i, file_path in enumerate(parquet_files):
    # Load the chunk
    df_chunk = pd.read_parquet(file_path)
    
    # Calculate missing values for the current chunk
    missing_in_chunk = df_chunk.isnull().sum()
    
    # Add the chunk's missing counts to the total
    for col, count in missing_in_chunk.items():
        if col not in total_missing_values:
            total_missing_values[col] = 0
        total_missing_values[col] += count

# Convert the dictionary to a pandas Series for nice printing
missing_values_series = pd.Series(total_missing_values)

print("\nMissing value count for every column across all files:")
print(missing_values_series)


Verifying columns and checking for missing values across all chunks...



Columns in the updated Parquet files:
['textOriginal', 'likeCount_comment', 'publishedAt_comment', 'updatedAt', 'quarter', 'publishedAt_video', 'viewCount', 'likeCount_video', 'favouriteCount', 'commentCount', 'comment_length', 'is_reply', 'video_topics', 'duration_seconds', 'textAvailable']

Missing value count for every column across all files:
textOriginal                0
likeCount_comment           0
publishedAt_comment         0
updatedAt                   0
quarter                     0
publishedAt_video        1529
viewCount               47557
likeCount_video        253707
favouriteCount          47557
commentCount            47557
comment_length              0
is_reply                    0
video_topics                0
duration_seconds        47557
textAvailable               0
dtype: int64


In [7]:
print(f"Found {len(parquet_files)} Parquet files to clean.")
print("Starting to remove rows with missing values from each file...")

# --- PART 1: Remove Rows with Missing Values ---
total_rows_removed = 0

for i, file_path in enumerate(parquet_files):
    # Load one chunk
    df_chunk = pd.read_parquet(file_path)
    
    # Get the number of rows before cleaning
    rows_before = len(df_chunk)
    
    # Drop any row that has at least one missing value (NaN)
    df_chunk = df_chunk.dropna()
    
    # Get the number of rows after cleaning
    rows_after = len(df_chunk)
    
    # Calculate how many rows were removed in this chunk
    rows_removed = rows_before - rows_after
    total_rows_removed += rows_removed
    
    print(f"  > Processing chunk {i+1}/{len(parquet_files)}: Removed {rows_removed} rows.")
    
    # Overwrite the original file with the cleaned DataFrame
    df_chunk.to_parquet(file_path, index=False)

print(f"\n✅ SUCCESS! All chunk files have been cleaned.")
print(f"A total of {total_rows_removed} rows with missing values were removed across all files.")

Found 48 Parquet files to clean.
Starting to remove rows with missing values from each file...


  > Processing chunk 1/48: Removed 5261 rows.
  > Processing chunk 2/48: Removed 5419 rows.
  > Processing chunk 3/48: Removed 5354 rows.
  > Processing chunk 4/48: Removed 5567 rows.
  > Processing chunk 5/48: Removed 5360 rows.
  > Processing chunk 6/48: Removed 5389 rows.
  > Processing chunk 7/48: Removed 5279 rows.
  > Processing chunk 8/48: Removed 5284 rows.
  > Processing chunk 9/48: Removed 5345 rows.
  > Processing chunk 10/48: Removed 5431 rows.
  > Processing chunk 11/48: Removed 5394 rows.
  > Processing chunk 12/48: Removed 5383 rows.
  > Processing chunk 13/48: Removed 5368 rows.
  > Processing chunk 14/48: Removed 5380 rows.
  > Processing chunk 15/48: Removed 5461 rows.
  > Processing chunk 16/48: Removed 5396 rows.
  > Processing chunk 17/48: Removed 5506 rows.
  > Processing chunk 18/48: Removed 5309 rows.
  > Processing chunk 19/48: Removed 5339 rows.
  > Processing chunk 20/48: Removed 5316 rows.
  > Processing chunk 21/48: Removed 5318 rows.
  > Processing chunk 2

In [8]:
# --- PART 2: Final Verification ---
print("\nVerifying that no missing values remain...")

# Initialize a counter for any remaining missing values
total_missing_count = 0

# Loop through the cleaned files to double-check
for i, file_path in enumerate(parquet_files):
    # Load the cleaned chunk
    df_chunk = pd.read_parquet(file_path)
    
    # Sum up all missing values in this chunk. The result should be 0.
    missing_in_chunk = df_chunk.isnull().sum().sum()
    total_missing_count += missing_in_chunk

print(f"\nFinal verification complete.")
print(f"Total missing values across all files: {total_missing_count}")

if total_missing_count == 0:
    print("✅ The entire dataset is now clean and contains no missing values.")
else:
    print("⚠️ Warning: Missing values were still found. Please review the process.")


Verifying that no missing values remain...



Final verification complete.
Total missing values across all files: 0
✅ The entire dataset is now clean and contains no missing values.


### Spam Detection

In [9]:
# --- Rule-Based Quality Filtering ---
print("\n--- Applying Rule-Based Quality Filtering to All Chunks ---")

# Define the minimum number of words a comment must have to be kept
MIN_WORD_COUNT = 3
total_rows_removed = 0

for i, file_path in enumerate(parquet_files):
    # Load one chunk
    df_chunk = pd.read_parquet(file_path)
    
    # Get the number of rows before filtering
    rows_before = len(df_chunk)
    
    # Calculate word count on the 'textOriginal' column
    if 'textOriginal' in df_chunk.columns:
        word_counts = df_chunk['textOriginal'].str.split().str.len()
        
        # Apply the filter
        df_chunk = df_chunk[word_counts >= MIN_WORD_COUNT].copy()
        
        # Get the number of rows after filtering
        rows_after = len(df_chunk)
        
        # Calculate how many rows were removed in this chunk
        rows_removed = rows_before - rows_after
        total_rows_removed += rows_removed
        
        print(f"  > Processing chunk {i+1}/{len(parquet_files)}: Removed {rows_removed} rows with fewer than {MIN_WORD_COUNT} words.")
        
        # Overwrite the original file with the filtered DataFrame
        df_chunk.to_parquet(file_path, index=False)
    else:
        print(f"  > Skipping chunk {i+1}/{len(parquet_files)}: 'textOriginal' column not found.")

print(f"\n✅ SUCCESS! Filtering is complete.")
print(f"A total of {total_rows_removed} rows were removed across all files.")


--- Applying Rule-Based Quality Filtering to All Chunks ---
  > Processing chunk 1/48: Removed 25506 rows with fewer than 3 words.
  > Processing chunk 2/48: Removed 25230 rows with fewer than 3 words.
  > Processing chunk 3/48: Removed 25101 rows with fewer than 3 words.
  > Processing chunk 4/48: Removed 25166 rows with fewer than 3 words.
  > Processing chunk 5/48: Removed 25461 rows with fewer than 3 words.
  > Processing chunk 6/48: Removed 25282 rows with fewer than 3 words.
  > Processing chunk 7/48: Removed 25417 rows with fewer than 3 words.
  > Processing chunk 8/48: Removed 25285 rows with fewer than 3 words.
  > Processing chunk 9/48: Removed 25386 rows with fewer than 3 words.
  > Processing chunk 10/48: Removed 24945 rows with fewer than 3 words.
  > Processing chunk 11/48: Removed 25422 rows with fewer than 3 words.
  > Processing chunk 12/48: Removed 25098 rows with fewer than 3 words.
  > Processing chunk 13/48: Removed 25336 rows with fewer than 3 words.
  > Processi

### Text Preprocessing

In [11]:
# --- Define the Preprocessing Function and Initialize Tools ---

# Initialize tools
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

def clean_text(text):
    """
    Applies a series of preprocessing steps to clean a given text string.
    """
    if not isinstance(text, str) or text.strip() == "":
        return ""   # return empty string if not a valid string

    # Convert to lowercase
    text = text.lower()

    # Remove URLs
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)

    # Remove HTML tags
    text = re.sub(r"<.*?>", "", text)

    # Remove mentions and hashtags
    text = re.sub(r"[@#]\w+", "", text)

    # Remove timestamps like 1:23 or 00:45
    text = re.sub(r"\b\d{1,2}:\d{2}\b", "", text)

    # Remove emojis
    text = emoji.replace_emoji(text, replace="")
    
    # Expand contractions (e.g., "don't" -> "do not")
    text = contractions.fix(text)

    # Remove non-alphabetic characters
    text = re.sub(r'[^a-zA-Z\s]', '', text)

    # Normalize elongated words ("soooo" -> "soo")
    text = re.sub(r"(.)\1{2,}", r"\1\1", text)

    # Tokenize the text
    tokens = word_tokenize(text)

    # Remove stopwords and lemmatize
    cleaned_tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words]

    # Join tokens back into a string
    return " ".join(cleaned_tokens)

In [14]:
# --- Apply Cleaning Function to All Parquet Chunks ---

print(f"\nFound {len(parquet_files)} Parquet files to preprocess.")
print("Starting advanced text preprocessing on the 'textAvailable' column...")

for i, file_path in enumerate(parquet_files):
    print(f"  > Processing chunk {i+1}/{len(parquet_files)}: {os.path.basename(file_path)}")
    
    # Load one chunk
    df_chunk = pd.read_parquet(file_path)

    # Apply the cleaning function and store the result in a new 'cleaned_text' column
    # We use 'textAvailable' as the source, per your instructions.
    if 'textAvailable' in df_chunk.columns:
        df_chunk['cleaned_text'] = df_chunk['textAvailable'].apply(clean_text)
        
        # Overwrite the original file with the updated DataFrame
        df_chunk.to_parquet(file_path, index=False)
    else:
        print(f"  > SKIPPING: 'textAvailable' column not found in this chunk.")

print(f"\n✅ SUCCESS! Text preprocessing is complete for all {len(parquet_files)} files.")


Found 48 Parquet files to preprocess.
Starting advanced text preprocessing on the 'textAvailable' column...
  > Processing chunk 1/48: processed_chunk_1.parquet
  > Processing chunk 2/48: processed_chunk_10.parquet
  > Processing chunk 3/48: processed_chunk_11.parquet
  > Processing chunk 4/48: processed_chunk_12.parquet
  > Processing chunk 5/48: processed_chunk_13.parquet
  > Processing chunk 6/48: processed_chunk_14.parquet
  > Processing chunk 7/48: processed_chunk_15.parquet
  > Processing chunk 8/48: processed_chunk_16.parquet
  > Processing chunk 9/48: processed_chunk_17.parquet
  > Processing chunk 10/48: processed_chunk_18.parquet
  > Processing chunk 11/48: processed_chunk_19.parquet
  > Processing chunk 12/48: processed_chunk_2.parquet
  > Processing chunk 13/48: processed_chunk_20.parquet
  > Processing chunk 14/48: processed_chunk_21.parquet
  > Processing chunk 15/48: processed_chunk_22.parquet
  > Processing chunk 16/48: processed_chunk_23.parquet
  > Processing chunk 1

In [16]:
# --- Verification Step ---
print("\nVerifying the result by inspecting the first chunk...")

# Choose any chunk file to inspect, for example, the first one
file_to_check = os.path.join(PROCESSED_DIR, 'processed_chunk_1.parquet')

print(f"Inspecting file: {os.path.basename(file_to_check)}")

# Load just this single, small chunk
df_one_chunk = pd.read_parquet(file_to_check)

# 1. Check the columns
print("\n✅ Columns in the updated chunk:")
print(df_one_chunk.columns)

# 2. Check the top 5 rows
print("\n✅ Top 5 rows of the updated chunk:")
df_one_chunk.head()


Verifying the result by inspecting the first chunk...
Inspecting file: processed_chunk_1.parquet



✅ Columns in the updated chunk:
Index(['textOriginal', 'likeCount_comment', 'publishedAt_comment', 'updatedAt',
       'quarter', 'publishedAt_video', 'viewCount', 'likeCount_video',
       'favouriteCount', 'commentCount', 'comment_length', 'is_reply',
       'video_topics', 'duration_seconds', 'textAvailable', 'cleaned_text'],
      dtype='object')

✅ Top 5 rows of the updated chunk:


Unnamed: 0,textOriginal,likeCount_comment,publishedAt_comment,updatedAt,quarter,publishedAt_video,viewCount,likeCount_video,favouriteCount,commentCount,comment_length,is_reply,video_topics,duration_seconds,textAvailable,cleaned_text
0,PLEASE LESBIAN FLAG I BEG YOU \n\nYou would ro...,0,2023-08-15 21:48:52+00:00,2023-08-15 21:48:52+00:00,2023Q3,2023-08-15 21:22:52+00:00,9856583.0,307922.0,0.0,5901.0,49,False,"Lifestyle (sociology), Physical attractiveness",29.0,PLEASE LESBIAN FLAG I BEG YOU \n\nYou would ro...,please lesbian flag beg would rock tried hair ...
1,Apply mashed potato juice and mixed it with curd,0,2023-10-02 13:08:22+00:00,2023-10-02 13:08:22+00:00,2023Q4,2023-10-01 06:30:15+00:00,1148157.0,55043.0,0.0,164.0,48,True,"Lifestyle (sociology), Physical attractiveness",60.0,Apply mashed potato juice and mixed it with cu...,apply mashed potato juice mixed curd foundatio...
2,69 missed calls from mars👽,0,2024-05-31 12:03:12+00:00,2024-05-31 12:03:12+00:00,2024Q2,2023-03-05 17:36:18+00:00,14590307.0,313755.0,0.0,4226.0,26,False,"Lifestyle (sociology), Physical attractiveness",20.0,69 missed calls from mars👽 How To Make Small E...,missed call mar make small eye look bigger mak...
3,you look like raven from phenomena raven no cap,0,2020-02-15 22:28:44+00:00,2020-02-15 22:28:44+00:00,2020Q1,2020-01-23 21:00:00+00:00,12347504.0,504342.0,0.0,19920.0,47,False,"Lifestyle (sociology), Physical attractiveness",1025.0,you look like raven from phenomena raven no ca...,look like raven phenomenon raven cap black gir...
4,Sahi disha me ja ja raha india ka Future..,0,2021-09-03 06:51:48+00:00,2021-09-03 06:51:48+00:00,2021Q3,2021-08-03 13:00:38+00:00,11552362.0,205104.0,0.0,616.0,42,False,"Lifestyle (sociology), Physical attractiveness",16.0,Sahi disha me ja ja raha india ka Future.. Kau...,sahi disha ja ja raha india ka future kaun kau...


### Final Setup

In [17]:
# --- Define Paths ---

# Define the path for the final output
OUTPUT_DIR = 'datasets/'
OUTPUT_CSV_FILE = os.path.join(OUTPUT_DIR, 'cleaned_merged_data.csv')

# --- Create Output Directory ---
# Ensure the destination folder exists before we try to save to it
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"Output directory '{OUTPUT_DIR}' is ready.")

# --- Load and Merge All Parquet Files ---

# Get a list of all the chunked Parquet files
parquet_files = sorted(glob.glob(os.path.join(PROCESSED_DIR, 'processed_chunk_*.parquet')))

if not parquet_files:
    print("\nError: No Parquet files found in the processed directory.")
    print("Please ensure the previous preprocessing steps ran successfully.")
else:
    print(f"\nFound {len(parquet_files)} Parquet files to merge.")
    print("Starting the merge process...")

    # Create a list to hold each loaded DataFrame chunk
    all_chunks_list = []

    # Loop through each file path, load the file, and add it to our list
    for i, file_path in enumerate(parquet_files):
        print(f"  > Loading chunk {i+1}/{len(parquet_files)}: {os.path.basename(file_path)}")
        df_chunk = pd.read_parquet(file_path)
        all_chunks_list.append(df_chunk)

    # Concatenate all the DataFrames in the list into one single DataFrame
    print("\nConcatenating all chunks into a single DataFrame...")
    final_df = pd.concat(all_chunks_list, ignore_index=True)
    
    print("Concatenation complete.")
    print(f"The final dataset has {len(final_df)} rows.")

    # --- Save the Merged DataFrame to a CSV File ---
    print(f"\nSaving the final DataFrame to: {OUTPUT_CSV_FILE}")
    
    # Save to CSV, index=False is important to avoid writing row numbers
    final_df.to_csv(OUTPUT_CSV_FILE, index=False)

    print(f"\n✅ SUCCESS! The merged CSV file has been created.")

Output directory 'datasets/' is ready.

Found 48 Parquet files to merge.
Starting the merge process...
  > Loading chunk 1/48: processed_chunk_1.parquet
  > Loading chunk 2/48: processed_chunk_10.parquet
  > Loading chunk 3/48: processed_chunk_11.parquet
  > Loading chunk 4/48: processed_chunk_12.parquet
  > Loading chunk 5/48: processed_chunk_13.parquet
  > Loading chunk 6/48: processed_chunk_14.parquet
  > Loading chunk 7/48: processed_chunk_15.parquet
  > Loading chunk 8/48: processed_chunk_16.parquet
  > Loading chunk 9/48: processed_chunk_17.parquet
  > Loading chunk 10/48: processed_chunk_18.parquet
  > Loading chunk 11/48: processed_chunk_19.parquet
  > Loading chunk 12/48: processed_chunk_2.parquet
  > Loading chunk 13/48: processed_chunk_20.parquet
  > Loading chunk 14/48: processed_chunk_21.parquet
  > Loading chunk 15/48: processed_chunk_22.parquet
  > Loading chunk 16/48: processed_chunk_23.parquet
  > Loading chunk 17/48: processed_chunk_24.parquet
  > Loading chunk 18/48: