In [1]:
import dask.dataframe as dd
import re
from dask.diagnostics import ProgressBar
import nltk
from nltk.corpus import stopwords

# Fetch the NLTK stop-word corpus and keep the English list as a set,
# since clean_text tests every word for membership in it.
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

# Open the merged Parquet dataset as a Dask DataFrame.
df = dd.read_parquet('D:/jn/Amazon_cs_proj/Issue_categor_proj_Phase1/merged_parquet')

# Rows missing any of these key fields are discarded outright.
required_fields = ['review_body', 'review_headline', 'star_rating', 'product_category']
df = df.dropna(subset=required_fields)

# Optional fields are kept; their gaps are filled with a placeholder value.
optional_defaults = {
    'product_title': 'unknown_title',
    'verified_purchase': 'N',
    'vine': 'N',
    'review_date': '1900-01-01',
}
for column, placeholder in optional_defaults.items():
    df[column] = df[column].fillna(placeholder)

# Define text cleaning function
def clean_text(text):
    """Normalise one raw text value into lowercase, stop-word-free words.

    Args:
        text: The raw value to clean. Anything that is not a ``str``
            (for example a missing value) yields an empty string.

    Returns:
        The surviving words joined by single spaces; ``''`` when nothing
        survives or the input was not a string.
    """
    if not isinstance(text, str):
        return ''
    text = text.lower()
    text = re.sub(r'<.*?>', ' ', text)  # remove HTML tags
    text = re.sub(r'http\S+', '', text)  # remove URLs
    # Replace (rather than delete) every character that is not a-z or
    # whitespace with a space. Deleting fused words that were separated
    # only by punctuation ("great.but" -> "greatbut") and collapsed
    # contractions into tokens ("don't" -> "dont") that the stop-word
    # filter could never match; with a space they split into pieces
    # ("don", "t") that are dropped when stop_words lists them.
    text = re.sub(r'[^a-z\s]', ' ', text)
    return ' '.join(word for word in text.split() if word not in stop_words)

# Derive a cleaned counterpart for each free-text column
# (new column name -> source column name).
cleaning_plan = {
    'cleaned_review_body': 'review_body',
    'cleaned_review_headline': 'review_headline',
    'cleaned_product_title': 'product_title',
}
for target_column, source_column in cleaning_plan.items():
    df[target_column] = df[source_column].map(
        clean_text, meta=(source_column, 'str')
    )

# Write the result to its own Parquet directory, with a progress bar
# displayed while the write runs.
output_dir = 'D:/jn/Amazon_cs_proj/Issue_categor_proj_Phase1/cleaned_full_parquet'

with ProgressBar():
    df.to_parquet(output_dir, write_index=False)

print("✅ Full dataset cleaned and saved to new Parquet folder.")


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\DELL\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\stopwords.zip.


[########################################] | 100% Completed | 67m 22s
✅ Full dataset cleaned and saved to new Parquet folder.
