In [1]:
# Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, round
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.window import Window
from pyspark.ml.feature import Imputer
import dash
import dash_html_components as html
import dash_core_components as dcc
import dash_bootstrap_components as dbc
import plotly.express as px
import plotly.graph_objects as go

The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html
The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc


# Starting the Spark session and reading the datasets from HDFS

In [2]:
# Create the SparkSession used by every later cell, or reuse the active one.
spark = (
    SparkSession.builder
    .appName("AmazonProductAnalysis")
    .getOrCreate()
)

In [3]:
# Base URI of the HDFS namenode. The read below does not use it: it passes a
# bare file name. Use hdfs_path + "Amazon.csv" as data_csv to load from HDFS.
hdfs_path = "hdfs://172.17.0.183:9000/"

# Load the CSV dataset: the first row supplies the column names, column types
# are inferred from the data, and fields are comma-separated.
data_csv = "Amazon.csv"
df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ",")
    .csv(data_csv)
)

In [4]:
# Load the JSON dataset with the "multiline" reader option switched on.
# The path is a bare file name; use hdfs_path + "Amazon.json" to load from HDFS.
data_json = "Amazon.json"
df_json = (
    spark.read
    .options(multiline="true")
    .json(data_json)
)

# Merging the CSV and JSON DataFrames

In [5]:
# Stack the CSV and JSON rows into a single DataFrame. Columns are matched by
# name rather than by position.
df = df_csv.unionByName(df_json)

# Checking Schema of Dataframe and Casting Columns

In [6]:
# Give the two rating columns numeric types: ratings becomes a double and
# no_of_ratings an integer.
# NOTE(review): presumably values that are not plain numerals become null here
# and are then filled by the imputation step - confirm against the raw data.
df = (
    df
    .withColumn("ratings", col("ratings").cast(DoubleType()))
    .withColumn("no_of_ratings", col("no_of_ratings").cast(IntegerType()))
)

# Filling NULL values using mean

In [7]:
# Replace missing values in the numeric columns with the mean of each column.
# The output column names equal the input names, so the columns are overwritten.
_impute_columns = ["ratings", "no_of_ratings", "discount_price", "actual_price"]
imputer = Imputer(
    strategy="mean",
    inputCols=_impute_columns,
    outputCols=_impute_columns,
)

# Fit on the full DataFrame to learn the means, then apply them to the same data.
df = imputer.fit(df).transform(df)

# Verify Actual Price is Greater than Discount Price

In [8]:
# Enforce the rule "actual price is strictly greater than discount price".
_price_rule = col("actual_price") > col("discount_price")

# A single aggregation pass returns both the total row count and the number of
# rows that satisfy the rule, instead of running several separate count() jobs.
# when() without otherwise() yields null for non-matching rows, and count()
# skips nulls, so "valid_rows" counts only rows where the rule is true.
_price_check = df.agg(
    count("*").alias("total_rows"),
    count(when(_price_rule, 1)).alias("valid_rows"),
).first()

# True when every row already satisfies the rule.
condition_result = _price_check["valid_rows"] == _price_check["total_rows"]

# Rows whose actual price is smaller than or equal to the discount price.
# This is a lazy DataFrame kept for inspection; nothing is computed here.
rows_with_actual_price_smaller = df.filter(col("actual_price") <= col("discount_price"))

# Number of rows removed by the filter below (0 when the rule holds everywhere).
# Rows where either price is null do not satisfy the rule, so they are removed
# and counted as well.
num_dropped_rows = _price_check["total_rows"] - _price_check["valid_rows"]

# Filtering changes nothing when no row violates the rule, so it is applied
# unconditionally rather than behind an extra check.
df = df.filter(_price_rule)

# Creating Discount Percent Column

In [9]:
# Discount as a percentage of the actual price.
# Rows with a non-positive actual price are removed before dividing so the
# denominator is never zero; with Spark's ANSI mode enabled a division by zero
# raises an error instead of producing null.
# `round` is pyspark.sql.functions.round (it shadows the builtin in this
# notebook). Scale 0 is what the function uses when no scale is passed, so the
# percentage is rounded to a whole number.
# NOTE(review): the later comparison against 100.00 suggests two decimals may
# have been intended - confirm before changing the scale.
df = (
    df
    .filter(col("actual_price") > 0)
    .withColumn(
        "discount_percentage",
        round((col("actual_price") - col("discount_price")) / col("actual_price") * 100, 0),
    )
)

In [10]:
# Drop products whose discount percentage equals 100.
df_filtered = df.where(col("discount_percentage") != 100.00)

In [11]:
# Keep only products with a strictly positive actual price.
df = df_filtered.where(col("actual_price") > 0)

# PYSPARK SQL 

In [12]:
# Register the cleaned DataFrame as the temporary view "amazon_products" so the
# Spark SQL queries can select from it; an existing view of that name is replaced.
df.createOrReplaceTempView(name="amazon_products")

In [13]:
# All ranking queries below share one shape: aggregate a single column per
# group, order the groups by that aggregate and keep the first few. The common
# text lives in one template so each query only states what differs.
_RANKED_AGGREGATE_TEMPLATE = """
SELECT
    {group_col},
    {aggregate}({value_col}) AS {alias}
FROM amazon_products
GROUP BY {group_col}
ORDER BY {alias} {direction}
LIMIT {limit}
"""


def _ranked_aggregate_query(group_col, aggregate, value_col, alias, direction, limit):
    """Build a "top/bottom N groups by aggregate" query on amazon_products.

    Args:
        group_col: Column to group by (e.g. "sub_category").
        aggregate: SQL aggregate function name (e.g. "AVG", "MIN", "MAX", "COUNT").
        value_col: Column the aggregate is applied to.
        alias: Name of the aggregated output column; also used for ordering.
        direction: Sort direction, "ASC" or "DESC".
        limit: Number of groups to keep.

    Returns:
        The SQL text as a string. Arguments are inserted verbatim, so they must
        be trusted identifiers and never user input.
    """
    return _RANKED_AGGREGATE_TEMPLATE.format(
        group_col=group_col,
        aggregate=aggregate,
        value_col=value_col,
        alias=alias,
        direction=direction,
        limit=limit,
    )


###########################DISCOUNT########################

# Top 10 subcategories with the highest average discount percentage
top_subcategories_avg_discount_query = _ranked_aggregate_query(
    "sub_category", "AVG", "discount_percentage", "avg_discount_percentage", "DESC", 10)

# Top 5 main categories with the highest proportion of discounted products.
# This query has a different shape from the others, so it stays a literal.
top_main_categories_discount_proportion_query = """
SELECT
    main_category,
    COUNT(*) AS total_products,
    SUM(CASE WHEN discount_percentage > 0 THEN 1 ELSE 0 END) AS discounted_products,
    (COUNT(*) - SUM(CASE WHEN discount_percentage > 0 THEN 1 ELSE 0 END)) AS non_discounted_products
FROM amazon_products
GROUP BY main_category
ORDER BY (discounted_products / total_products) DESC
LIMIT 5
"""

# Lowest discount percentage and the main category it belongs to
lowest_discount_query = _ranked_aggregate_query(
    "main_category", "MIN", "discount_percentage", "lowest_discount", "ASC", 1)

# Highest discount percentage and the main category it belongs to
highest_discount_query = _ranked_aggregate_query(
    "main_category", "MAX", "discount_percentage", "highest_discount", "DESC", 1)

###############################################PRICE #######################################################
########################### ACTUAL PRICE ########################

# Top 10 subcategories with the highest average actual price
top_subcategories_avg_actual_price_query = _ranked_aggregate_query(
    "sub_category", "AVG", "actual_price", "avg_actual_price", "DESC", 10)

# Top 5 main categories with the highest average actual price
top_main_categories_avg_actual_price_query = _ranked_aggregate_query(
    "main_category", "AVG", "actual_price", "avg_actual_price", "DESC", 5)

# Lowest actual price and the main category it belongs to
lowest_actual_price_query = _ranked_aggregate_query(
    "main_category", "MIN", "actual_price", "lowest_actual_price", "ASC", 1)

# Highest actual price and the main category it belongs to
highest_actual_price_query = _ranked_aggregate_query(
    "main_category", "MAX", "actual_price", "highest_actual_price", "DESC", 1)

########################### RATINGS ########################

# Top 10 subcategories with the highest average ratings
top_subcategories_avg_ratings_query = _ranked_aggregate_query(
    "sub_category", "AVG", "ratings", "avg_ratings", "DESC", 10)

# Top 5 main categories with the highest average ratings
top_main_categories_avg_ratings_query = _ranked_aggregate_query(
    "main_category", "AVG", "ratings", "avg_ratings", "DESC", 5)

# Lowest rating and the main category it belongs to
lowest_ratings_query = _ranked_aggregate_query(
    "main_category", "MIN", "ratings", "lowest_ratings", "ASC", 1)

# Highest rating and the main category it belongs to
highest_ratings_query = _ranked_aggregate_query(
    "main_category", "MAX", "ratings", "highest_ratings", "DESC", 1)

############################## POPULARITY ##############################

# NOTE(review): COUNT(ratings) counts the rows in each group that have a
# non-null rating; it does not add up no_of_ratings. Confirm that this is the
# intended popularity measure.

# Top 10 subcategories with the highest count of ratings (popularity)
top_subcategories_popularity_query = _ranked_aggregate_query(
    "sub_category", "COUNT", "ratings", "ratings_count", "DESC", 10)

# Top 5 main categories with the highest count of ratings (popularity)
top_main_categories_popularity_query = _ranked_aggregate_query(
    "main_category", "COUNT", "ratings", "ratings_count", "DESC", 5)


In [14]:
# Execute Spark SQL queries
def _query_to_pandas(query):
    """Run a Spark SQL query string and return its result as a pandas DataFrame."""
    return spark.sql(query).toPandas()


# Discount
top_subcategories_avg_discount = _query_to_pandas(top_subcategories_avg_discount_query)
top_main_categories_discount_proportion = _query_to_pandas(top_main_categories_discount_proportion_query)
lowest_discount = _query_to_pandas(lowest_discount_query)
highest_discount = _query_to_pandas(highest_discount_query)

# Actual price
top_subcategories_avg_actual_price = _query_to_pandas(top_subcategories_avg_actual_price_query)
top_main_categories_avg_actual_price = _query_to_pandas(top_main_categories_avg_actual_price_query)
lowest_actual_price = _query_to_pandas(lowest_actual_price_query)
highest_actual_price = _query_to_pandas(highest_actual_price_query)

# Ratings
top_subcategories_avg_ratings = _query_to_pandas(top_subcategories_avg_ratings_query)
top_main_categories_avg_ratings = _query_to_pandas(top_main_categories_avg_ratings_query)
lowest_ratings = _query_to_pandas(lowest_ratings_query)
highest_ratings = _query_to_pandas(highest_ratings_query)

# Popularity
top_subcategories_popularity = _query_to_pandas(top_subcategories_popularity_query)
top_main_categories_popularity = _query_to_pandas(top_main_categories_popularity_query)

# DASHBOARD USING DASH

In [15]:
# Create the Dash app, styled with the Bootstrap theme from dash-bootstrap-components.
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Dash style dictionaries take camelCased CSS property names
# (marginRight, not margin-right).
_NAV_LINK_STYLE = {'marginRight': '10px', 'fontWeight': 'bold', 'color': 'white'}
_SECTION_TITLE_STYLE = {'color': 'orange'}


def _summary_table(frame):
    """Render a pandas DataFrame as a striped, bordered, hoverable Bootstrap table."""
    return dbc.Table.from_dataframe(frame, striped=True, bordered=True, hover=True)


# Page layout: a sticky navbar followed by one section per analysis topic.
# Each navbar link targets the id of the matching section.
app.layout = html.Div(children=[
    # Navbar
    html.Nav(
        children=[
            # Brand link; the wide right margin pushes the other links to the right.
            html.A('Amazon Product Analysis', href='#discount', className='nav-link',
                   style={'marginLeft': '2px', 'marginRight': '600px',
                          'fontWeight': 'bold', 'color': 'orange'}),
            html.A('Discount', href='#discount', className='nav-link', style=_NAV_LINK_STYLE),
            html.A('Price', href='#price', className='nav-link', style=_NAV_LINK_STYLE),
            html.A('Ratings', href='#ratings', className='nav-link', style=_NAV_LINK_STYLE),
            html.A('Popularity', href='#popularity', className='nav-link',
                   style={'color': 'white', 'fontWeight': 'bold'}),
        ],
        className='navbar navbar-expand-lg navbar-dark bg-dark',
        # Keep the navbar pinned to the top of the page and above the charts.
        style={'position': 'sticky', 'top': '0', 'zIndex': '3'}
    ),

    # Content
    html.H1("Dashboard"),
    html.Hr(),

    # Discount Section
    html.Section(
        id='discount',
        children=[
            html.H2("Discount Analysis", style=_SECTION_TITLE_STYLE),
            dcc.Graph(figure=px.bar(top_subcategories_avg_discount, x='sub_category', y='avg_discount_percentage',
                                    title='Top 10 Subcategories with the Highest Average Discount Percentage')),
            dcc.Graph(figure=px.bar(top_main_categories_discount_proportion, x='main_category',
                                    y='discounted_products', color='discounted_products',
                                    title='Top 5 Main Categories - Discount Proportion')),
            html.H3("Lowest Discount Category"),
            _summary_table(lowest_discount),
            html.H3("Highest Discount Category"),
            _summary_table(highest_discount),
        ]
    ),
    html.Hr(),

    # Price Section
    html.Section(
        id='price',
        children=[
            html.H2("Price Analysis", style=_SECTION_TITLE_STYLE),
            dcc.Graph(figure=px.bar(top_subcategories_avg_actual_price, x='sub_category', y='avg_actual_price',
                                    title='Top 10 Subcategories with the Highest Average Actual Price')),
            dcc.Graph(figure=px.bar(top_main_categories_avg_actual_price, x='main_category', y='avg_actual_price',
                                    title='Top 5 Main Categories with the Highest Average Actual Price')),
            html.H3("Lowest Price Category"),
            _summary_table(lowest_actual_price),
            html.H3("Highest Price Category"),
            _summary_table(highest_actual_price),
        ]
    ),
    html.Hr(),

    # Ratings Section
    html.Section(
        id='ratings',
        children=[
            html.H2("Ratings Analysis", style=_SECTION_TITLE_STYLE),
            dcc.Graph(figure=px.bar(top_subcategories_avg_ratings, x='sub_category', y='avg_ratings',
                                    title='Top 10 Subcategories with the Highest Average Ratings')),
            dcc.Graph(figure=px.bar(top_main_categories_avg_ratings, x='main_category', y='avg_ratings',
                                    title='Top 5 Main Categories with the Highest Average Ratings')),
            html.H3("Lowest Rating Category"),
            _summary_table(lowest_ratings),
            html.H3("Highest Rating Category"),
            _summary_table(highest_ratings),
        ]
    ),
    html.Hr(),

    # Popularity Section
    html.Section(
        id='popularity',
        children=[
            html.H2("Popularity Analysis", style=_SECTION_TITLE_STYLE),
            dcc.Graph(figure=px.bar(top_subcategories_popularity, x='sub_category', y='ratings_count',
                                    title='Top 10 Subcategories with the Highest Count of Ratings (Popularity)')),
            dcc.Graph(figure=px.bar(top_main_categories_popularity, x='main_category', y='ratings_count',
                                    title='Top 5 Main Categories with the Highest Count of Ratings (Popularity)')),
        ]
    ),
])

# Run the app
# Start the Dash server with debug mode enabled, but only when this code is
# executed as the main module.
if __name__ == '__main__':
    app.run_server(debug=True)

# The app is running on "http://localhost:8050/"