# Data Analysis and PreProcessing

## Setting up Environment

In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
import warnings
warnings.filterwarnings('ignore')
import os
os.environ['PYSPARK_PYTHON'] = '/home/subha/miniconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/subha/miniconda3/bin/python'

## Starting Pyspark with Master 10G Memory

In [2]:
import findspark
findspark.init('/home/subha/aiwork/spark')
# Initializing the spark context
#import pyspark.pandas as ps
#pdf_incidents = df_incidents.to_pandas_on_spark()
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.sql.types import StringType, ArrayType
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import re
from pyspark.sql.functions import *

# Configure Spark to use multiple threads
spark = SparkSession.builder.appName("Amazon Reviews Analysis")\
    .master("local[*]")\
    .config("spark.executorEnv.PYSPARK_PYTHON", "/home/subha/miniconda3/bin/python")\
    .config("spark.driver.maxResultSize","10g")\
    .config("spark.executor.instances", "4")\
    .config("spark.executor.cores", "2")\
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

25/02/23 14:20:38 WARN Utils: Your hostname, neoshiva resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/02/23 14:20:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/23 14:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Gathering Train and Test Data

### Reading Test and Train Datasets

In [3]:
df_test = spark.read.text("amazon_train_dataset/test.ft.txt")
df_train = spark.read.text("amazon_train_dataset/train.ft.txt")

### Shaping the Datasets (Feature Split)

In [4]:
# Show the first few rows to understand the data format
def shaping_datasets(df):
    leng = len("__label__1")
    # Split the text into two parts: label and review text
    df_split = df.withColumn("label", split(col("value"), " ").getItem(0))\
    .withColumn("review_text", substring(df.value,12,10000))  # get review text from the second word onward)
    
    # Map label to sentiment: "negative" for __label__1, "positive" for __label__2
    # df_sentiment_reviews = df_split.withColumn(
    #     "sentiment",
    #     when(col("label") == "__label__1", "negative")
    #     .when(col("label") == "__label__2", "positive")
    #     .otherwise("unknown")
    # )
    df_sentiment_reviews = df_split.withColumn(
        "sentiment",
        when(col("label") == "__label__1", 0)
        .when(col("label") == "__label__2", 1)
        .otherwise("unknown")
    )
    
    # Select only the relevant columns: sentiment and review text
    df_sentiment_reviews = df_sentiment_reviews.select("sentiment", "review_text")
    
    return df_sentiment_reviews

## Calling the Function
df_test_p = shaping_datasets(df_test)
df_train_p = shaping_datasets(df_train)

### Display and Counts

In [5]:
display(df_train_p.show(5))

+---------+--------------------+
|sentiment|         review_text|
+---------+--------------------+
|        1|Stuning even for ...|
|        1|The best soundtra...|
|        1|Amazing!: This so...|
|        1|Excellent Soundtr...|
|        1|Remember, Pull Yo...|
+---------+--------------------+
only showing top 5 rows



None

In [6]:
df_train_p.count()

                                                                                

3600000

In [7]:
df_test_p.show(10)

+---------+--------------------+
|sentiment|         review_text|
+---------+--------------------+
|        1|Great CD: My love...|
|        1|One of the best g...|
|        0|Batteries died wi...|
|        1|works fine, but M...|
|        1|Great for the non...|
|        0|DVD Player crappe...|
|        0|Incorrect Disc: I...|
|        0|DVD menu select p...|
|        1|Unique Weird Orie...|
|        0|Not an "ultimate ...|
+---------+--------------------+
only showing top 10 rows



In [8]:
df_test_p.count()

400000

## Natural Language Preprocessing

### NLTK resources

In [8]:
# Download required NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /home/subha/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/subha/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/subha/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/subha/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

### Lemmetizer and Stop Words

In [9]:
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

### Clean, Tokenize and UDF creation

In [36]:
def clean_text(text):
    """
    Clean text by removing special characters, numbers, and converting to lowercase
    """
    if not text:
        return text
    
    # Convert to lowercase
    text = text.lower()
    
    # Remove special characters and numbers
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    return text

def tokenize_and_preprocess(text):
    """
    Tokenize, remove stopwords, and lemmatize text
    """
    if not text:
        return []
    
    # Tokenize
    tokens = word_tokenize(text)
    
    # Remove stopwords and lemmatize
    # tokens = [lemmatizer.lemmatize(token) for token in tokens 
    #          if token.lower() not in stop_words and len(token) > 2]
    tokens = [token for token in tokens 
             if token.lower() not in stop_words and len(token) > 2]
    
    return tokens
    
# Register UDFs
clean_text_udf = udf(clean_text, StringType())
tokenize_and_preprocess_udf = udf(tokenize_and_preprocess, ArrayType(StringType()))

### NLTK Review Process Method

In [11]:
def process_reviews_with_nltk(reviews_df):
    """
    Process reviews using NLTK for text cleaning and preprocessing
    
    Args:
        reviews_df: DataFrame with 'sentiment' and 'reviews' columns
    Returns:
        DataFrame with processed text
    """
    
    # Apply text cleaning
    processed_df = reviews_df.withColumn(
        "cleaned_text",
        clean_text_udf(col("review_text"))
    )
    
    # Apply tokenization, stopword removal, and lemmatization
    processed_df = processed_df.withColumn(
        "processed_tokens",
        tokenize_and_preprocess_udf(col("cleaned_text"))
    )
    
    # Convert tokens back to text
    processed_df = processed_df.withColumn(
        "processed_text",
        udf(lambda x: ' '.join(x) if x else '', StringType())(col("processed_tokens"))
    )
    
    return processed_df

## Executing the Preprocessing Steps

### Test and Train Spark DF created

In [37]:
test_processed_reviews_df = process_reviews_with_nltk(df_test_p)
train_processed_reviews_df = process_reviews_with_nltk(df_train_p)
test_processed_reviews_df.printSchema()

root
 |-- sentiment: string (nullable = false)
 |-- review_text: string (nullable = true)
 |-- cleaned_text: string (nullable = true)
 |-- processed_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- processed_text: string (nullable = true)



### Writing to Output Location

In [38]:
test_processed_reviews_df.select("cleaned_text","sentiment").coalesce(1).write.mode("overwrite").parquet("output/cleandata/test_data")

                                                                                

In [39]:
train_processed_reviews_df.select("cleaned_text","sentiment").coalesce(1).write.mode("overwrite").parquet("output/cleandata/train_data")

                                                                                

### Creating Samples for hyper Parameter Tuning

#### Train Data

In [40]:
train_processed_reviews_df.createOrReplaceTempView("train_processed_reviews_df")
test_processed_reviews_df.createOrReplaceTempView("test_processed_reviews_df")

In [41]:
sql = """
select * 
from train_processed_reviews_df 
where sentiment = 0  limit 180000
"""
tr_df_1 = spark.sql(sql)

sql ="""
select * 
from train_processed_reviews_df 
where sentiment = 1  limit 180000
"""
tr_df_2 = spark.sql(sql)

u_tr_df = tr_df_1.union(tr_df_2)

In [42]:
u_tr_df.select("cleaned_text","sentiment").coalesce(1).write.mode("overwrite").parquet("output/cleandata/train_data_sample")

                                                                                

#### Test and Validation

In [43]:
sql = """
select * 
from test_processed_reviews_df 
where sentiment = 0  limit 20000
"""
te_df_1 = spark.sql(sql)
val_df_1, test_df_1 = te_df_1.randomSplit([0.5, 0.5]) 

sql1 ="""
select * 
from test_processed_reviews_df 
where sentiment = 1  limit 20000
"""
te_df_2 = spark.sql(sql1)
val_df_2, test_df_2 = te_df_2.randomSplit([0.5, 0.5]) 

u_val_te_df = val_df_1.union(val_df_2)
u_test_te_df = test_df_1.union(test_df_2)

In [44]:
print(val_df_1.count(),val_df_2.count(),test_df_1.count(),test_df_2.count())



10032 9913 9968 10087


                                                                                

In [45]:
u_val_te_df.select("cleaned_text","sentiment").coalesce(1).write.mode("overwrite").parquet("output/cleandata/val_data_sample")
u_test_te_df.select("cleaned_text","sentiment").coalesce(1).write.mode("overwrite").parquet("output/cleandata/test_data_sample")

                                                                                

## Appendix

In [46]:
test_processed_reviews_df.show()

+---------+--------------------+--------------------+--------------------+--------------------+
|sentiment|         review_text|        cleaned_text|    processed_tokens|      processed_text|
+---------+--------------------+--------------------+--------------------+--------------------+
|        1|Great CD: My love...|great cd my lovel...|[great, lovely, p...|great lovely pat ...|
|        1|One of the best g...|one of the best g...|[one, best, game,...|one best game mus...|
|        0|Batteries died wi...|batteries died wi...|[batteries, died,...|batteries died wi...|
|        1|works fine, but M...|works fine but ma...|[works, fine, mah...|works fine maha e...|
|        1|Great for the non...|great for the non...|[great, nonaudiop...|great nonaudiophi...|
|        0|DVD Player crappe...|dvd player crappe...|[dvd, player, cra...|dvd player crappe...|
|        0|Incorrect Disc: I...|incorrect disc i ...|[incorrect, disc,...|incorrect disc lo...|
|        0|DVD menu select p...|dvd menu

In [47]:
test_processed_reviews_df.createOrReplaceTempView("test_processed_reviews_df")

In [17]:

spark.sql("select count(1),sentiment from test_processed_reviews_df group by sentiment ").show()

+--------+---------+
|count(1)|sentiment|
+--------+---------+
|  200000|        0|
|  200000|        1|
+--------+---------+



In [18]:
train_processed_reviews_df.createOrReplaceTempView("train_processed_reviews_df")

In [19]:
spark.sql("select count(1),sentiment from train_processed_reviews_df group by sentiment ").show()

[Stage 13:=>                                                      (1 + 31) / 32]

+--------+---------+
|count(1)|sentiment|
+--------+---------+
| 1800000|        0|
| 1800000|        1|
+--------+---------+



                                                                                

In [23]:
sql = """
select * 
from train_processed_reviews_df 
where sentiment = 0  limit 180000
"""
tr_df_1 = spark.sql(sql)

"""
select * 
from train_processed_reviews_df 
where sentiment = 1  limit 180000
"""
tr_df_2 = spark.sql(sql)

u_tr_df = tr_df_1.union(tr_df_2)

In [24]:
u_tr_df.count()

                                                                                

360000

In [48]:
u_tr_df.show()



+---------+--------------------+--------------------+--------------------+--------------------+
|sentiment|         review_text|        cleaned_text|    processed_tokens|      processed_text|
+---------+--------------------+--------------------+--------------------+--------------------+
|        0|Buyer beware: Thi...|buyer beware this...|[buyer, beware, s...|buyer beware self...|
|        0|The Worst!: A com...|the worst a compl...|[worst, complete,...|worst complete wa...|
|        0|Oh please: I gues...|oh please i guess...|[please, guess, r...|please guess roma...|
|        0|Awful beyond beli...|awful beyond beli...|[awful, beyond, b...|awful beyond beli...|
|        0|Don't try to fool...|dont try to fool ...|[dont, try, fool,...|dont try fool fak...|
|        0|sizes recomended ...|sizes recomended ...|[sizes, recomende...|sizes recomended ...|
|        0|mens ultrasheer: ...|mens ultrasheer t...|[mens, ultrasheer...|mens ultrasheer m...|
|        0|Another Abysmal D...|another 

                                                                                

In [31]:
sql = """
select * 
from test_processed_reviews_df 
where sentiment = 0  limit 20000
"""
te_df_1 = spark.sql(sql)
val_df_1, test_df_1 = te_df_1.randomSplit([0.5, 0.5]) 

"""
select * 
from test_processed_reviews_df 
where sentiment = 1  limit 20000
"""
te_df_2 = spark.sql(sql)
val_df_2, test_df_2 = te_df_2.randomSplit([0.5, 0.5]) 

u_val_te_df = val_df_1.union(val_df_1)
u_test_te_df = test_df_1.union(test_df_1)

In [39]:
u_val_te_df.show()



+---------+--------------------+--------------------+--------------------+--------------------+
|sentiment|         review_text|        cleaned_text|    processed_tokens|      processed_text|
+---------+--------------------+--------------------+--------------------+--------------------+
|        0|!!!Not enough to ...|not enough to get...|[enough, get, goo...|enough get good r...|
|        0|!!!PLEASE NOTE!!!...|please note pleas...|[please, note, pl...|please note pleas...|
|        0|" Smile"............|smileit never rea...|[smileit, never, ...|smileit never rea...|
|        0|" this is not it....|this is not it th...|[barto, reviewer,...|barto reviewer ac...|
|        0|"And the Band Pla...|and the band play...|[band, played, pu...|band played pulle...|
|        0|"Baby Dry" but Co...|baby dry but cove...|[baby, dry, cover...|baby dry covered ...|
|        0|"Be Afraid, Be Ve...|be afraid be very...|[afraid, afraid, ...|afraid afraid def...|
|        0|"Beware! Take car...|beware t

                                                                                

In [49]:
u_test_te_df.show()



+---------+--------------------+--------------------+--------------------+--------------------+
|sentiment|         review_text|        cleaned_text|    processed_tokens|      processed_text|
+---------+--------------------+--------------------+--------------------+--------------------+
|        0|!!!! DO NOT PURCH...|do not purchase t...|[purchase, thermo...|purchase thermome...|
|        0|" Smile"............|smileit never rea...|[smileit, never, ...|smileit never rea...|
|        0|" this is not it....|this is not it th...|[barto, reviewer,...|barto reviewer ac...|
|        0|"1" star, because...|star because of t...|[star, book, good...|star book good fi...|
|        0|"And the Band Pla...|and the band play...|[band, played, pu...|band played pulle...|
|        0|"Baby Dry" but Co...|baby dry but cove...|[baby, dry, cover...|baby dry covered ...|
|        0|"Be Afraid, Be Ve...|be afraid be very...|[afraid, afraid, ...|afraid afraid def...|
|        0|"Call me disapoin...|call me 

                                                                                

In [30]:
split1, split2 = te_df_1.randomSplit([0.5, 0.5]) 

In [None]:
sql = """
select * 
from test_processed_reviews_df 
where sentiment = 0  limit 10000
"""
te_df_1 = spark.sql(sql)

"""
select * 
from test_processed_reviews_df 
where sentiment = 1  limit 10000
"""
te_df_2 = spark.sql(sql)

u_te_df = te_df_1.union(te_df_2)

In [32]:
u_val_te_df.createOrReplaceTempView("test")

In [33]:
spark.sql("select count(1),sentiment from test group by sentiment").show()



+--------+---------+
|count(1)|sentiment|
+--------+---------+
|    9976|        0|
|   10110|        1|
+--------+---------+



                                                                                