# Student Details

***

**Name:** Hoai Nhan Nguyen <br>
**Student Number:** sba24098 <br>
**Course:** Higher Diploma in Science in Artificial Intelligence Applications

***

# Data Cleaning and Transformation


**Importing Apache Spark Libraries.**

In [1]:
# Importing libraries 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Note: sum and round imported below shadow Python's built-in functions of the same name
from pyspark.sql.functions import col, sum, when, regexp_replace, round, udf

**Creating a new Spark Session.**

In [3]:
# Creating new SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()

**Reading the Amazon-Products.csv in Hadoop.**

In [None]:
# Reading Amazon-Products.csv in Hadoop while applying options to read it correctly
df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .option("multiLine", "true") \
               .option("escape", "\"") \
               .option("quote", "\"") \
               .csv("hdfs://localhost:9000/user1/big_data_ca1/data/Amazon-Products.csv")

**Understanding the structure of the Spark Dataframe.**

In [None]:
# Reviewing the schema of the Spark Dataframe
df.printSchema()

In [None]:
# Dropping the columns that are not required for this task 
df = df.drop("_c0","image","link")
df.printSchema()

In [None]:
# Reviewing the rows of the Spark dataframe 
df.show()

In [None]:
# Get the number of rows
num_rows = df.count()
# Get the number of columns
num_columns = len(df.columns)

# Printing the shape (rows, columns)
print(f"Shape of the DataFrame: ({num_rows}, {num_columns})")


**Checking the null values in the Spark Dataframe.**

In [None]:
# Counting the null values per column
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

**Handling Null values in the Spark Dataframe.**

In [None]:
# Filtering out rows where both 'discount_price' and 'actual_price' are null.
df_clean = df.filter(~(col("discount_price").isNull() & col("actual_price").isNull()))

# Filling the null values in ratings, no_of_ratings, discount_price and actual_price with 0
df_clean = df_clean.fillna({"ratings": 0,"no_of_ratings": 0, "discount_price":0, "actual_price":0})

In [None]:
# Counting the null or empty values per column after cleaning
df_clean.select([
    sum(when(col(column_name).isNull() | (col(column_name) == ""), 1).otherwise(0)).alias(column_name + "_nulls")
    for column_name in df_clean.columns
]).show()

**Handling duplicate rows in the Spark Dataframe.**

In [None]:
# Removing the duplicate rows based on name from the Spark DataFrame 
df_clean = df_clean.dropDuplicates(["name"])


**Cleaning the ratings and no_of_ratings columns.**

In [None]:
# Keeping only the rows where the 'ratings' column contains a valid number (integer or decimal).
df_clean = df_clean.filter(F.col('ratings').rlike(r'^[0-9]*\.?[0-9]+$'))

# Ensuring ratings are between 0 and 5.0
df_clean = df_clean.filter((F.col('ratings') >= 0) & (F.col('ratings') <= 5.0))


In [None]:
# Removing commas from 'no_of_ratings'
df_clean = df_clean.withColumn("no_of_ratings", regexp_replace(col("no_of_ratings"), ",", ""))

# Keeping only the rows where the 'no_of_ratings' column contains a valid whole number.
df_clean = df_clean.filter(col("no_of_ratings").rlike("^[0-9]+$"))
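
The two `rlike` filters above can be sanity-checked locally. The sketch below applies the same patterns with Python's `re` module to a few hypothetical raw values (they are illustrative and not taken from the dataset). For patterns this simple, Python's engine and the Java regex engine behind `rlike` should behave the same way, but that is an assumption worth verifying on real data. The helper names are made up for this example.

```python
import re

# Same patterns as the rlike filters above
RATING_PATTERN = re.compile(r'^[0-9]*\.?[0-9]+$')
COUNT_PATTERN = re.compile(r'^[0-9]+$')

def is_valid_rating(value):
    """True when the value looks like a number and lies between 0 and 5."""
    if value is None or not RATING_PATTERN.match(value):
        return False
    return 0 <= float(value) <= 5.0

def is_valid_count(value):
    """True when the value is a whole number once commas are removed."""
    if value is None:
        return False
    return bool(COUNT_PATTERN.match(value.replace(",", "")))

# Hypothetical sample values, for illustration only
for raw in ["4.5", "0", "5.", "Get", "6.1"]:
    print(repr(raw), is_valid_rating(raw))
for raw in ["1,234", "12", "12.5", "Only 2 left"]:
    print(repr(raw), is_valid_count(raw))
```

Note that a value such as `"5."` is rejected because the pattern requires at least one digit after an optional decimal point.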

**Converting currency from Indian Rupee to Euro for the actual_price and discount_price columns.**

In [None]:
# Removing ₹ and commas, then converting to double
df_converted = df_clean.withColumn(
    "actual_price",
    regexp_replace(col("actual_price"), "[₹,]", "").cast("double")  
)

# Converting INR to EUR (using conversion rate: 1 INR = 0.011 EUR)
conversion_rate = 0.011
df_converted = df_converted.withColumn(
    "actual_price",
    round(col("actual_price") * conversion_rate, 2)
)

In [None]:
# Removing ₹ and commas, then converting to double
df_converted = df_converted.withColumn(
    "discount_price",
    regexp_replace(col("discount_price"), "[₹,]", "").cast("double") 
)

# Converting INR to EUR (using conversion rate: 1 INR = 0.011 EUR)
conversion_rate = 0.011
df_converted = df_converted.withColumn(
    "discount_price",
    round(col("discount_price") * conversion_rate, 2) 
)
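
To make the two-step conversion concrete, here is a minimal plain-Python sketch of what happens to a single price string. The function name and the sample prices are hypothetical. `builtins.round` is used explicitly because `round` was imported from `pyspark.sql.functions` earlier in this notebook.

```python
import builtins

CONVERSION_RATE = 0.011  # same fixed rate as in the cells above (1 INR = 0.011 EUR)

def inr_text_to_eur(raw_price, rate=CONVERSION_RATE):
    """Strip the rupee sign and commas, then convert to euro rounded to 2 decimals."""
    if raw_price is None:
        return None
    cleaned = raw_price.replace("₹", "").replace(",", "")
    return builtins.round(float(cleaned) * rate, 2)

# Hypothetical raw prices, for illustration only
print(inr_text_to_eur("₹1,299"))  # 14.29
print(inr_text_to_eur("₹899"))    # 9.89
```

Since the same two steps are applied to both price columns, the Spark cells above could also be written as a loop over `["actual_price", "discount_price"]` to avoid repeating the code.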

**Saving converted Spark Dataframe as a CSV file in Hadoop**

In [None]:
df_converted.write \
  .option("header", "true") \
  .option("quoteAll", "true") \
  .option("escape", "\"") \
  .mode("overwrite")\
  .csv("hdfs://localhost:9000/user1/big_data_ca1/data/Amazon-Products-Cleaned.csv")

# Inserting CSV Data into HBase 

**Importing HappyBase Library to connect to HBase.**

In [None]:
import happybase

**Reading the Amazon-Products-Cleaned.csv in Hadoop**

In [None]:
# Reading Amazon-Products-Cleaned.csv in Hadoop while applying options to read it correctly
df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .option("multiLine", "true") \
               .option("escape", "\"") \
               .option("quote", "\"") \
               .csv("hdfs://localhost:9000/user1/big_data_ca1/data/Amazon-Products-Cleaned.csv")

**Reviewing the Schema of Amazon-Products-Cleaned.csv**

In [None]:
# Reviewing the schema of the Spark Dataframe
df.printSchema()

In [None]:
df.show()

**Connecting to HBase**

In [None]:
# Connecting to HBase with the happybase library 
connection = happybase.Connection('localhost')
connection.open()

**Creating and adding data to the table.**

In [None]:
# Defining the column families 
column_families = {
    'Item_Info': dict(),
    'Ratings_Info': dict(),
    'Pricing_Info': dict(),
}

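# Creating the table amazon_products if it doesn't exist
# (happybase may return table names as bytes, so they are decoded before comparing)
table_name = 'amazon_products'
existing_tables = [t.decode() if isinstance(t, bytes) else t for t in connection.tables()]
if table_name not in existing_tables:
    connection.create_table(table_name, column_families)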

# Getting the table amazon_products
table = connection.table(table_name)

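# Looping over the rows of Amazon-Products-Cleaned.csv and adding them to the table amazon_products
# (toLocalIterator() fetches rows one partition at a time instead of collecting them all at once)
for idx, row in enumerate(df.toLocalIterator(), start=1):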
    # Generating a 6-character row key for each item
    row_key = str(idx).zfill(6)
    
    # Adding the data to the table
    table.put(row_key, {
        'Item_Info:name': row['name'],
        'Item_Info:main_category': row['main_category'],
        'Item_Info:sub_category': row['sub_category'],
        'Ratings_Info:ratings': str(row['ratings']),
        'Ratings_Info:no_of_ratings': str(row['no_of_ratings']),
        'Pricing_Info:discount_price': str(row['discount_price']),
        'Pricing_Info:actual_price': str(row['actual_price'])
    })
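
HBase keeps rows sorted by the bytes of their row key, which is why the index is zero-padded above. The sketch below shows the effect of the padding and how one row maps to `family:qualifier` cells. It uses a plain dict in place of a Spark `Row`, and the helper names and sample values are hypothetical.

```python
def make_row_key(index, width=6):
    """Zero-pad the running index so that text ordering matches numeric ordering."""
    return str(index).zfill(width)

def to_hbase_cells(row):
    """Map one row (a plain dict here) to 'family:qualifier' -> string value."""
    return {
        'Item_Info:name': row['name'],
        'Item_Info:main_category': row['main_category'],
        'Item_Info:sub_category': row['sub_category'],
        'Ratings_Info:ratings': str(row['ratings']),
        'Ratings_Info:no_of_ratings': str(row['no_of_ratings']),
        'Pricing_Info:discount_price': str(row['discount_price']),
        'Pricing_Info:actual_price': str(row['actual_price']),
    }

# Without padding, text ordering differs from numeric ordering
print(sorted(str(i) for i in (10, 2, 1)))           # ['1', '10', '2']
print(sorted(make_row_key(i) for i in (10, 2, 1)))  # ['000001', '000002', '000010']

# Hypothetical row, for illustration only
sample = {'name': 'Example Product', 'main_category': 'toys', 'sub_category': 'games',
          'ratings': 4.2, 'no_of_ratings': 120, 'discount_price': 3.3, 'actual_price': 5.5}
print(to_hbase_cells(sample)['Pricing_Info:actual_price'])  # 5.5
```

A width of 6 preserves the ordering only up to 999,999 rows; a larger dataset would need a wider key.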


**Closing the connection**

In [None]:
connection.close()

# Apache Spark - Basic Analysis and Insights 

**Importing Libraries for visualisations**

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import subprocess

**Reading the Amazon-Products-Cleaned.csv in Hadoop**

In [None]:
# Reading Amazon-Products-Cleaned.csv in Hadoop while applying options to read it correctly
df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .option("multiLine", "true") \
               .option("escape", "\"") \
               .option("quote", "\"") \
               .csv("hdfs://localhost:9000/user1/big_data_ca1/data/Amazon-Products-Cleaned.csv")

**Filtering for the top 5 products with ratings >= 4.5 and more than 1000 ratings**

In [None]:
# Keeping products with ratings >= 4.5 and more than 1000 ratings, then taking the top 5
df_top_5_product_review = df.filter((F.col("ratings") >= 4.5) & (F.col("no_of_ratings") > 1000)) \
  .orderBy(F.desc("ratings"), F.desc("no_of_ratings")) \
  .limit(5)

df_top_5_product_review.show()

In [None]:
# Changing the Apache Spark DataFrame to a pandas DataFrame
df_top_5_product_review_pd = df_top_5_product_review.toPandas()

# Function to take only the first 3 words from the product name
def first_three_words(name):
    words = name.split()
    return ' '.join(words[:3])

# Adjusting the product name for readability 
df_top_5_product_review_pd['name'] = df_top_5_product_review_pd['name'].apply(first_three_words)

# Setting the figure size
plt.figure(figsize=(10, 6))

# Creating the bar chart using seaborn
sns.barplot(
    x="no_of_ratings",
    y="name",
    data=df_top_5_product_review_pd,
    hue="name",
    palette="tab10"
)

# Styling the plot
plt.grid(True, which='major', linestyle='--', linewidth=0.5, zorder=0)
plt.title("Top 5 Products with Highest Ratings and Over 1000 Reviews", loc="center", pad=15, fontsize=12)
plt.xlabel("Number of Reviews", loc="center", labelpad=15, fontsize=8.5)
plt.ylabel("")  
plt.tick_params(axis='both', labelsize=8.5)

# Saving the plot as an image and uploading it to Hadoop
plt.savefig("top_5_products_based_on_highest_rating.png", dpi=300, bbox_inches='tight')
subprocess.run(["hdfs", "dfs", "-put", "-f", "top_5_products_based_on_highest_rating.png", "/user1/big_data_ca1/images/"])

# Showing the plot
plt.show()


**Filtering for the bottom 5 products with ratings >= 1 and more than 1000 ratings**

In [None]:
# Keeping products with ratings >= 1 and more than 1000 ratings, then taking the bottom 5
df_bottom_5_products_review = df.filter((F.col("ratings") >= 1) & (F.col("no_of_ratings") > 1000)) \
  .orderBy(F.asc("ratings"), F.asc("no_of_ratings")) \
  .limit(5)

df_bottom_5_products_review.show()

In [None]:
# Changing the Apache Spark DataFrame to a pandas DataFrame
df_bottom_5_products_review_pd = df_bottom_5_products_review.toPandas()
# Adjusting the product name for readability 
df_bottom_5_products_review_pd['name'] = df_bottom_5_products_review_pd['name'].apply(first_three_words)

# Setting the figure size
plt.figure(figsize=(10, 6))

# Creating the bar chart using seaborn
sns.barplot(
    x="no_of_ratings",
    y="name",
    data=df_bottom_5_products_review_pd,
    hue="name",
    palette="tab10"
)

# Styling the plot
plt.grid(True, which='major', linestyle='--', linewidth=0.5, zorder=0)
plt.title("Bottom 5 Products with Lowest Ratings and Over 1000 Reviews", loc="center", pad=15, fontsize=12)
plt.xlabel("Number of Reviews", loc="center", labelpad=15, fontsize=8.5)
plt.ylabel("")  
plt.tick_params(axis='both', labelsize=8.5)

# Saving the plot as an image and uploading it to Hadoop
plt.savefig("bottom_5_products_based_on_lowest_rating.png", dpi=300, bbox_inches='tight')
subprocess.run(["hdfs", "dfs", "-put", "-f", "bottom_5_products_based_on_lowest_rating.png", "/user1/big_data_ca1/images/"])

# Showing the plot
plt.show()

**Grouping by 'main_category' and calculating summary statistics for prices**

In [None]:
# Grouping by 'main_category' and calculating summary statistics for prices
df_price_stats = df.groupBy("main_category").agg(
    F.min("actual_price").alias("min_actual_price"),
    F.max("actual_price").alias("max_actual_price"),
    F.percentile_approx("actual_price", 0.5).alias("median_actual_price"),
    F.min("discount_price").alias("min_discount_price"),
    F.max("discount_price").alias("max_discount_price"),
    F.percentile_approx("discount_price", 0.5).alias("median_discount_price")
).orderBy(col("median_actual_price").desc())

# Show dataframe
df_price_stats.show(5)
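
The aggregation above can be mirrored on a handful of rows to see what each statistic means. This is only a sketch with hypothetical data: `percentile_approx` is an approximate algorithm that returns a value taken from the column, so `statistics.median_low` is used here as a rough stand-in under that assumption.

```python
from collections import defaultdict
from statistics import median_low

# Hypothetical (main_category, actual_price) pairs, for illustration only
rows = [
    ("appliances", 120.0), ("appliances", 80.0), ("appliances", 200.0),
    ("toys", 5.0), ("toys", 15.0),
]

def price_stats_by_category(rows):
    """Group prices by category, then compute min, max and median per group."""
    prices = defaultdict(list)
    for category, price in rows:
        prices[category].append(price)
    stats = {
        category: {"min": min(values), "max": max(values), "median": median_low(values)}
        for category, values in prices.items()
    }
    # Highest median first, like the orderBy(...desc()) above
    return sorted(stats.items(), key=lambda item: item[1]["median"], reverse=True)

for category, stat in price_stats_by_category(rows):
    print(category, stat["median"])
# appliances 120.0
# toys 5.0
```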

In [None]:
# Changing the Apache Spark DataFrame to a pandas DataFrame
df_price_stats_pd = df_price_stats.toPandas()

# Retrieving the top 5 main categories (copied to avoid pandas' SettingWithCopyWarning)
df_top_5_main_category = df_price_stats_pd.head(5).copy()
# Title-casing the main category for readability 
df_top_5_main_category["main_category"] = df_top_5_main_category["main_category"].str.title()

# Setting the figure size
plt.figure(figsize=(10, 6))

# Creating the bar chart using seaborn
sns.barplot(
    x="median_actual_price",
    y="main_category",
    data=df_top_5_main_category,
    hue="main_category",
    palette="tab10"
)

# Styling the plot
plt.grid(True, which='major', linestyle='--', linewidth=0.5, zorder=0)
plt.title("Top 5 Main Categories Based on Median Price", loc="center", pad=15, fontsize=12)
plt.xlabel("Price in Euro", loc="center", labelpad=15, fontsize=8.5)
plt.ylabel("")  
plt.tick_params(axis='both', labelsize=8.5)

# Saving the plot as an image and uploading it to Hadoop
plt.savefig("top_5_main_categories_based_on_median_price.png", dpi=300, bbox_inches='tight')
subprocess.run(["hdfs", "dfs", "-put", "-f", "top_5_main_categories_based_on_median_price.png", "/user1/big_data_ca1/images/"])

# Showing the plot
plt.show()


**Calculating the total discount loss based on actual price - discount price**

In [None]:
# Calculating the total discount loss based on actual price - discount price
total_discount_loss = df.withColumn("discount_loss", F.col("actual_price") - F.col("discount_price")) \
                      .agg(
                          F.format_number(F.sum("actual_price"), 2).alias("total_actual_price"),
                          F.format_number(F.sum("discount_loss"), 2).alias("total_discount_loss")
                      ) 

total_discount_loss.show()
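
One caveat about this total: null prices were replaced with 0 during cleaning, so any row whose `discount_price` was missing contributes its full `actual_price` to `discount_loss`. The sketch below uses hypothetical prices to show how much such rows can move the figure. `builtins.sum` is used because `sum` was imported from `pyspark.sql.functions` earlier in this notebook.

```python
import builtins

# Hypothetical (actual_price, discount_price) pairs in euro;
# 0.0 stands for a discount price that was null before cleaning
prices = [(10.0, 8.0), (20.0, 15.0), (30.0, 0.0)]

def total_discount_loss(pairs, skip_missing_discount=False):
    """Sum actual - discount, optionally skipping rows with a zero-filled discount."""
    return builtins.sum(
        actual - discount
        for actual, discount in pairs
        if not (skip_missing_discount and discount == 0.0)
    )

print(total_discount_loss(prices))                              # 37.0
print(total_discount_loss(prices, skip_missing_discount=True))  # 7.0
```

If the cleaned data does contain such rows, filtering on `F.col("discount_price") > 0` before the aggregation would be one way to report the loss for discounted products only.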

In [None]:
# Calculating the total discount loss based on actual price - discount price (without format_number)
total_discount_loss = df.withColumn("discount_loss", F.col("actual_price") - F.col("discount_price")) \
                      .agg(
                          F.sum("actual_price").alias("total_actual_price"),
                          F.sum("discount_loss").alias("total_discount_loss")
                      ) 

# Changing the Apache Spark DataFrame to a pandas DataFrame
total_discount_loss_pd = total_discount_loss.toPandas()  

# Reshaping the data into long format with 'calculation' and 'amount' columns
total_discount_loss_reshape_sorted = pd.melt(total_discount_loss_pd, var_name="calculation", value_name="amount").sort_values(by="amount", ascending=False)

# Converting the amount values to integers for readability 
total_discount_loss_reshape_sorted['amount'] = total_discount_loss_reshape_sorted['amount'].astype(int)
# Title-casing the calculation labels and replacing "_" with spaces for readability 
total_discount_loss_reshape_sorted['calculation'] = total_discount_loss_reshape_sorted['calculation'].str.replace('_', ' ').str.title()

# Setting the figure size
plt.figure(figsize=(10, 6))

# Creating the bar chart using seaborn
sns.barplot(
    x="amount",
    y="calculation",
    data=total_discount_loss_reshape_sorted,
    hue="amount",
    palette="tab10"
)

# Styling the plot
plt.grid(True, which='major', linestyle='--', linewidth=0.5, zorder=0)
plt.title("Total Price vs. Total Discount Loss Comparison", loc="center", pad=15, fontsize=12)
plt.xlabel("Price in Euro", loc="center", labelpad=15, fontsize=8.5)
plt.ylabel("")  
plt.tick_params(axis='both', labelsize=8.5)

# Adding a legend with a title and moving it outside the plot
plt.legend(title='Amount', bbox_to_anchor=(1,1), loc='upper left')

# Saving the plot as an image and uploading it to Hadoop
plt.savefig("total_price_vs_total_discount_loss.png", dpi=300, bbox_inches='tight')
subprocess.run(["hdfs", "dfs", "-put", "-f", "total_price_vs_total_discount_loss.png", "/user1/big_data_ca1/images/"])

# Showing the plot
plt.show()

**Grouping by 'main_category' and calculating the average rating and total number of ratings**

In [None]:
# Grouping by 'main_category' and calculating the average rating and total number of ratings
df_rating = df.groupBy("main_category") \
  .agg(
      F.round(F.avg("ratings"), 1).alias("average_rating"),
      F.sum("no_of_ratings").alias("total_number_ratings")
  ) \
  .orderBy(F.col("average_rating").desc())

df_rating.show()

In [None]:
# Changing the Apache Spark DataFrame to a pandas DataFrame
df_rating_pd = df_rating.toPandas()

# Retrieving the top 5 main categories (copied to avoid pandas' SettingWithCopyWarning)
df_rating_top_5_main_category = df_rating_pd.head(5).copy()

# Title-casing the main category for readability 
df_rating_top_5_main_category["main_category"] = df_rating_top_5_main_category["main_category"].str.title()

# Setting the figure size
plt.figure(figsize=(10, 6))

# Creating the bar chart using seaborn
sns.barplot(
    x="average_rating",
    y="main_category",
    data=df_rating_top_5_main_category,
    hue="main_category",
    palette="tab10"
)

# Styling the plot
plt.grid(True, which='major', linestyle='--', linewidth=0.5, zorder=0)
plt.title("Top 5 Main Categories Based on Average Rating", loc="center", pad=15, fontsize=12)
plt.xlabel("Average Rating", loc="center", labelpad=15, fontsize=8.5)
plt.ylabel("")  
plt.tick_params(axis='both', labelsize=8.5)

# Saving the plot as an image and uploading it to Hadoop
plt.savefig("top_5_main_categories_based_on_average_rating.png", dpi=300, bbox_inches='tight')
subprocess.run(["hdfs", "dfs", "-put", "-f", "top_5_main_categories_based_on_average_rating.png", "/user1/big_data_ca1/images/"])

# Showing the plot
plt.show()

# Apache Spark - Content-Based Recommendation

**Importing libraries for content-based recommendation**

In [9]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import DoubleType

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

**Reading the Amazon-Products-Cleaned.csv in Hadoop**

In [5]:
# Reading Amazon-Products-Cleaned.csv in Hadoop while applying options to read it correctly
df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .option("multiLine", "true") \
               .option("escape", "\"") \
               .option("quote", "\"") \
               .csv("hdfs://localhost:9000/user1/big_data_ca1/data/Amazon-Products-Cleaned.csv")

**Adding product_id column**

In [6]:
# Adding a product_id column with monotonically_increasing_id(); the IDs are unique but not consecutive
df = df.withColumn("product_id", monotonically_increasing_id())

# Selecting the order of the columns of the dataframe
df = df.select(["product_id", "name", "main_category", "sub_category", "ratings", "no_of_ratings", "discount_price", "actual_price"])

# Printing the dataframe schema
df.printSchema()

root
 |-- product_id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- ratings: double (nullable = true)
 |-- no_of_ratings: integer (nullable = true)
 |-- discount_price: double (nullable = true)
 |-- actual_price: double (nullable = true)
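
The IDs produced by `monotonically_increasing_id()` can look surprisingly large. According to the Spark documentation, the current implementation stores the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. Assuming that layout, the sketch below splits an ID back into its two parts; the function name is made up for this example.

```python
RECORD_BITS = 33  # lower bits hold the record number (per the Spark docs, may change)

def split_product_id(product_id):
    """Return (partition_id, record_number) for a monotonically increasing ID."""
    partition_id = product_id >> RECORD_BITS
    record_number = product_id & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number

print(split_product_id(123456))      # (0, 123456)
print(split_product_id(8590047494))  # (1, 112902)
```

Because the IDs depend on how the data is partitioned, they are not guaranteed to stay the same if the file is read with a different partitioning.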



**Transforming DataFrame into Numeric Features**

In [10]:
# Indexing the categorical features ["main_category", "sub_category"]
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_idx", handleInvalid="keep") 
    for name in ["main_category", "sub_category"]
]

# Encoding the categorical features ["main_category", "sub_category"]
encoders = [
    OneHotEncoder(inputCol=f"{name}_idx", outputCol=f"{name}_vec") 
    for name in ["main_category", "sub_category"]
]

# Assembling all numeric features into a single vector named "features"
assembler = VectorAssembler(
    inputCols=["main_category_vec", "sub_category_vec", 
               "ratings", "no_of_ratings", "discount_price", "actual_price"],
    outputCol="features"
)

# Creating a Spark Pipeline that combines all stages (indexers, encoders, and the assembler)
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fitting the pipeline to the DataFrame 
model = pipeline.fit(df)

# Applying the fitted pipeline model to the DataFrame to transform it
df_features = model.transform(df)

**Function to calculate similar recommended products**

In [18]:
def recommend_similar_products(product_id, top_n):
    
    # Display dataframe row for the specified product ID
    print(f"Product ID {product_id} information:")
    df.filter(df['product_id'] == product_id).show()
    
    # Get the row for the specified product ID based on df_features
    product_row = df_features.filter(col("product_id") == product_id).select("product_id", "features").head()
    # Stop early if the product ID does not exist
    if product_row is None:
        print(f"Product ID {product_id} was not found.")
        return
    # Get the feature vector for the input product
    product_vector = product_row['features']

    # Select product IDs and features (excluding the reference product itself)
    product_features = df_features.filter(col("product_id") != product_id).select("product_id", "features")

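    # Function to compute cosine similarity between two feature vectors
    def compute_cosine_similarity(vec1, vec2):
        dot_product = float(vec1.dot(vec2))
        norm1 = float(vec1.norm(2))
        norm2 = float(vec2.norm(2))
        # Guarding against division by zero for all-zero vectors
        if norm1 == 0.0 or norm2 == 0.0:
            return 0.0
        return dot_product / (norm1 * norm2)

    # Wrapping the function in a UDF that compares each row with the reference product
    cosine_similarity = F.udf(lambda x: compute_cosine_similarity(x, product_vector), DoubleType())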

    # Apply the cosine similarity calculation for each product in the DataFrame
    product_features = product_features.withColumn(
        "cosine_similarity_score", cosine_similarity(col("features"))
    )

    # Get the top_n most similar products by ordering by cosine similarity score
    similar_df = product_features.orderBy(col("cosine_similarity_score").desc()).limit(top_n)
    
    # Collect recommended product IDs
    recommended_product_ids = [row['product_id'] for row in similar_df.select('product_id').collect()]
    
    # Filtering dataframe based on recommended product IDs
    recommended_df = df.filter(df['product_id'].isin(recommended_product_ids))

    # Show the filtered rows of recommended products
    print(f"\nTop {top_n} recommended products based on Product ID {product_id}:")
    recommended_df.show()

    # Show the filtered rows of the top N recommended products with their cosine similarity scores
    print("Recommended Product IDs with Similarity Scores")
    similar_df.select("product_id", "cosine_similarity_score").show()
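
To show what the similarity score measures, here is a minimal plain-Python sketch of cosine similarity on small lists. The vectors are hypothetical and much shorter than the assembled `features` vectors. They illustrate one property to keep in mind: because the features are not scaled, a large-valued column such as `no_of_ratings` can dominate the score.

```python
import math

def cosine_similarity(vec1, vec2):
    """Cosine of the angle between two equal-length vectors (0.0 for a zero vector)."""
    dot_product = math.fsum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(math.fsum(a * a for a in vec1))
    norm2 = math.sqrt(math.fsum(b * b for b in vec2))
    if norm1 == 0.0 or norm2 == 0.0:
        return 0.0
    return dot_product / (norm1 * norm2)

# Hypothetical vectors: [category flag, rating, number of ratings]
reference = [1.0, 4.0, 1000.0]
same_category = [1.0, 4.1, 10.0]
other_category = [0.0, 1.0, 1000.0]

# The product from another category scores higher, because the number of
# ratings is far larger in magnitude than the other two features
print(cosine_similarity(reference, same_category))
print(cosine_similarity(reference, other_category))
```

If this effect is not wanted, scaling the numeric columns before assembling them (for example with `StandardScaler` or `MinMaxScaler` from `pyspark.ml.feature`) is one option to consider.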

**Executing recommend_similar_products function**

In [19]:
# Executing function for product ID 123456 and top 5 recommended products
recommend_similar_products(123456, 5)

Product ID 123456 information:
+----------+--------------------+-------------------+------------------+-------+-------------+--------------+------------+
|product_id|                name|      main_category|      sub_category|ratings|no_of_ratings|discount_price|actual_price|
+----------+--------------------+-------------------+------------------+-------+-------------+--------------+------------+
|    123456|Prolite Curtain R...|tv, audio & cameras|Camera Accessories|    4.0|           11|          8.79|        9.89|
+----------+--------------------+-------------------+------------------+-------+-------------+--------------+------------+

Top 5 recommended products based on Product ID 123456:
+----------+--------------------+-------------------+------------------+-------+-------------+--------------+------------+
|product_id|                name|      main_category|      sub_category|ratings|no_of_ratings|discount_price|actual_price|
+----------+--------------------+-------------------+------------------+-------+-------------+--------------+------------+
|    122266|Power Smart EN-EL...|tv, audio & cameras|Camera Accessories|    4.3|           13|          8.79|       10.99|
|    172423|WELBORN Camera Ba...|tv, audio & cameras|Camera Accessories|    3.9|            9|          6.59|         7.7|
|8590047494|Oligitdi Movie Di...|tv, audio & cameras|Camera Accessories|    4.0|           11|          8.79|        11.0|
|8590106468|WELBORN NP-BD1 Ba...|tv, audio & cameras|Camera Accessories|    3.8|           12|          9.89|       11.66|
|8590109147|WildRoar Camera B...|tv, audio & cameras|Camera Accessories|    4.5|   



+----------+-----------------------+
|product_id|cosine_similarity_score|
+----------+-----------------------+
|8590109147|     0.9988977909425116|
|8590106468|     0.9988128018831507|
|8590047494|     0.9987414553535601|
|    122266|     0.9979306341686875|
|    172423|     0.9978844938350759|
+----------+-----------------------+