In [4]:
import pandas as pd
import os

from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, isnan, when, count
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [11]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("ProductReviews") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("product_parent", IntegerType(), True),
    StructField("product_title", StringType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body", StringType(), True),
    StructField("review_date", StringType(), True),
    StructField("marketplace_id", IntegerType(), True),
    StructField("product_category_id", IntegerType(), True),
    StructField("label", StringType(), True)
])

# List of file paths for training data
data_folder = 'data'
data_files = 8

train_files = []

for file in range(1, data_files+1):
    train_files.append(f'{data_folder}/train-{file}.csv')

# Read each CSV file into a PySpark DataFrame
dfs = [spark.read.csv(file, header=True, schema=schema) for file in train_files]

# Merge all DataFrames into one
merged_df = reduce(DataFrame.unionByName, dfs)

# Display the first few rows of the merged DataFrame
merged_df.show()

24/03/07 11:30:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
 Schema: id, product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
Expected: id but found: 
CSV file: file:///home/jovyan/work/data/train-1.csv


+---+----------+--------------+--------------------+----+-----------------+--------------------+--------------------+--------------------+--------------+-------------------+----------+
| id|product_id|product_parent|       product_title|vine|verified_purchase|     review_headline|         review_body|         review_date|marketplace_id|product_category_id|     label|
+---+----------+--------------+--------------------+----+-----------------+--------------------+--------------------+--------------------+--------------+-------------------+----------+
|  9|B001N2MZT8|     903886718|    Green Zone [DVD]|   N|                Y|          green zone|I found at first ...|          2010-11-15|             1|                  3|     False|
| 11|B00GCBVE0Q|     282740618|Le secret de Gree...|   N|                Y|                null|J'ai aimé cette h...|          2014-11-23|             2|                  3|     False|
| 19|1423165691|     883799517|A Disney Sketchbook.|   N|                N|

                                                                                

In [12]:
# Delete duplicate rows
duplicate_rows = merged_df.count() - merged_df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_rows}")

merged_df.dropDuplicates()

24/03/07 11:30:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
 Schema: id, product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
Expected: id but found: 
CSV file: file:///home/jovyan/work/data/train-5.csv
24/03/07 11:30:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
 Schema: id, product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
Expected: id but found: 
CSV file: file:///home/jovyan/work/data/train-3.csv
24/03/07 11:30

Number of duplicate rows: 0


DataFrame[id: int, product_id: string, product_parent: int, product_title: string, vine: string, verified_purchase: string, review_headline: string, review_body: string, review_date: string, marketplace_id: int, product_category_id: int, label: string]

In [17]:
# Ensuring categorical values can only have correct values
merged_df = merged_df.withColumn("vine", when(merged_df["vine"].isin('Y', 'N'), merged_df["vine"]).otherwise(None))
merged_df = merged_df.withColumn("verified_purchase", when(merged_df["verified_purchase"].isin('Y', 'N'), merged_df["verified_purchase"]).otherwise(None))
merged_df = merged_df.withColumn("label", when(merged_df["label"].isin('True', 'False'), merged_df["label"]).otherwise(None))

In [21]:
# Select columns with categorical values
columns_to_check = ["vine", "verified_purchase", "label"]

# Remove rows where specified columns contain null or NaN values
merged_df = merged_df.filter(
    (col(columns_to_check[0]).isNotNull()) &
    (col(columns_to_check[1]).isNotNull()) &
    (col(columns_to_check[2]).isNotNull())
)

In [22]:
# Count missing values in each column
missing_value_counts = merged_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in merged_df.columns])

# Display the count of missing values for each column
missing_value_counts.show()

24/03/07 11:47:17 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
 Schema: id, product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
Expected: id but found: 
CSV file: file:///home/jovyan/work/data/train-7.csv
24/03/07 11:47:17 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
 Schema: id, product_id, product_parent, product_title, vine, verified_purchase, review_headline, review_body, review_date, marketplace_id, product_category_id, label
Expected: id but found: 
CSV file: file:///home/jovyan/work/data/train-3.csv
24/03/07 11:47

+---+----------+--------------+-------------+----+-----------------+---------------+-----------+-----------+--------------+-------------------+-----+
| id|product_id|product_parent|product_title|vine|verified_purchase|review_headline|review_body|review_date|marketplace_id|product_category_id|label|
+---+----------+--------------+-------------+----+-----------------+---------------+-----------+-----------+--------------+-------------------+-----+
|  0|         0|             0|           64|   0|                0|            636|          0|          1|             0|                  0|    0|
+---+----------+--------------+-------------+----+-----------------+---------------+-----------+-----------+--------------+-------------------+-----+



                                                                                

In [23]:
merged_df.groupBy("vine").count().show()

[Stage 50:>                                                         (0 + 1) / 1]

+----+-----+
|vine|count|
+----+-----+
|   N| 8549|
|   Y|   14|
+----+-----+



                                                                                

In [24]:
merged_df.groupBy("verified_purchase").count().show()

                                                                                

+-----------------+-----+
|verified_purchase|count|
+-----------------+-----+
|                Y| 5953|
|                N| 2610|
+-----------------+-----+



In [25]:
merged_df.groupBy("label").count().show()



+-----+-----+
|label|count|
+-----+-----+
|False| 4991|
| True| 3572|
+-----+-----+



                                                                                