<a href="https://colab.research.google.com/github/fahminmahili/business-retail-sales-analytics-pyspark/blob/main/BigDataAnalyticsOnBusinessRetailSalesDataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [26]:
# Attach Google Drive to this Colab runtime so the dataset stored under
# MyDrive/ can be read by Spark in the following cells.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Task 1: Importing Required Libraries, Loading the Dataset, and Performing RDD Operations (the transformations map, filter, reduceByKey, and groupBy, and the action count)

In [28]:
!pip install -q findspark


In [30]:
!pip install pyspark



In [8]:
from pyspark.sql import SparkSession

# Create the SparkSession shared by every later cell (getOrCreate reuses the
# existing one when this cell is re-run).
spark = (
    SparkSession.builder
    .appName("Big Data Analytics On Business Retail Sales Dataset")
    .getOrCreate()
)

# Load the Superstore CSV: the first line supplies column names and Spark
# infers column types from the data (the later Profit sums rely on Profit
# being numeric).
file_path = "/content/drive/MyDrive/Assignment2/superstore_dataset2011-2015.csv"
rdd_data = spark.read.csv(
    file_path,
    header=True,
    inferSchema=True,
)

# NOTE: despite its name, `rdd_data` is a DataFrame; later cells reach its
# underlying RDD explicitly through `rdd_data.rdd`.
rdd_data.show()

+------+---------------+----------+---------+--------------+-----------+-----------------+-----------+-------------+----------------+-------------+-----------+------+------------+----------------+---------------+------------+--------------------+-------+--------+--------+-------+-------------+--------------+
|Row ID|       Order ID|Order Date|Ship Date|     Ship Mode|Customer ID|    Customer Name|    Segment|         City|           State|      Country|Postal Code|Market|      Region|      Product ID|       Category|Sub-Category|        Product Name|  Sales|Quantity|Discount| Profit|Shipping Cost|Order Priority|
+------+---------------+----------+---------+--------------+-----------+-----------------+-----------+-------------+----------------+-------------+-----------+------+------------+----------------+---------------+------------+--------------------+-------+--------+--------+-------+-------------+--------------+
| 42433|   AG-2011-2040|  1/1/2011| 6/1/2011|Standard Class|   TB-1128

In [9]:
# Declare the whole Task 1 pipeline up front; the cells that follow redefine
# each step and print its result.
from pyspark.sql.functions import col

# map / filter / reduceByKey on the DataFrame's underlying RDD. These are lazy
# transformations: no Spark job runs until an action is called on them.
mapped_rdd = rdd_data.rdd.map(lambda record: (record['Category'], record['Profit']))  # (Category, Profit) pairs
filtered_rdd = mapped_rdd.filter(lambda pair: pair[1] > 0)  # strictly positive profits only
reduced_rdd = filtered_rdd.reduceByKey(lambda left, right: left + right)  # positive profit summed per category

# count() is an action, not a transformation: it runs a Spark job right away
# and returns the number of rows in the DataFrame.
count_result = rdd_data.count()

# This groupBy uses the DataFrame API, so `grouped_rdd` is a DataFrame with a
# `sum(Profit)` column despite its name.
grouped_rdd = rdd_data.groupBy("Category").agg({"Profit": "sum"})


In [10]:
# Map: project every row of the DataFrame's underlying RDD onto a
# (Category, Profit) pair.
mapped_rdd = rdd_data.rdd.map(lambda row: (row['Category'], row['Profit']))

# take(5) brings only the five preview pairs to the driver; collect() would
# ship every mapped row (the whole dataset) just to print five of them.
mapped_result = mapped_rdd.take(5)
print("Mapped Result:")
for result in mapped_result:  # first 5 results for brevity
    print(result)

Mapped Result:
('Office Supplies', 106.14)
('Office Supplies', 36.036)
('Office Supplies', 29.64)
('Office Supplies', -26.055)
('Furniture', 37.77)


In [11]:
# Filter: keep only pairs whose profit is strictly positive. A null Profit is
# dropped rather than compared, matching the SQL `WHERE Profit > 0` in Task 2;
# a bare `None > 0` would raise TypeError inside the Spark worker.
filtered_rdd = mapped_rdd.filter(lambda x: x[1] is not None and x[1] > 0)

# Fetch only the preview rows instead of collecting every matching pair.
filtered_result = filtered_rdd.take(5)
print("\nFiltered Result:")
for result in filtered_result:  # first 5 results for brevity
    print(result)


Filtered Result:
('Office Supplies', 106.14)
('Office Supplies', 36.036)
('Office Supplies', 29.64)
('Furniture', 37.77)
('Office Supplies', 15.342)


In [12]:
# Reduce: total the positive profits of each category.
reduced_rdd = filtered_rdd.reduceByKey(lambda total, profit: total + profit)

# reduceByKey leaves one (Category, total) pair per key, so collecting the
# whole result to the driver is cheap.
reduced_result = reduced_rdd.collect()
print("\nReduced Result:")
for category, total in reduced_result[:5]:  # at most five pairs, for brevity
    print((category, total))


Reduced Result:
('Furniture', 656161.5209000008)
('Office Supplies', 779424.8878000025)
('Technology', 950207.4800999991)


In [13]:
# Count: count() is an action rather than a transformation. It triggers a
# Spark job immediately and returns the total number of rows as an int.
count_result = rdd_data.count()

print(f"\nCount Result: {count_result}")


Count Result: 51290


In [14]:
# GroupBy: total Profit per Category using the DataFrame API. The result is a
# DataFrame with a `sum(Profit)` column, despite the `_rdd` suffix.
grouped_rdd = rdd_data.groupBy("Category").agg({"Profit": "sum"})

print("\nGrouped Result:")
# show() is the single action needed: it runs the aggregation once and prints
# the result as a table (a separate collect() would run the same job twice).
grouped_rdd.show()


Grouped Result:
+---------------+------------------+
|       Category|       sum(Profit)|
+---------------+------------------+
|Office Supplies|516615.91190000053|
|      Furniture| 286439.8782000004|
|     Technology| 663712.0816800005|
+---------------+------------------+



Task 2: Utilizing Spark SQL Queries (aggregation, filtering and sorting)


In [31]:
from pyspark.sql import SparkSession

# Expose the loaded DataFrame to Spark SQL under the name `superstore_data`.
rdd_data.createOrReplaceTempView("superstore_data")

# Plan the three Task 2 queries in order: aggregation, filtering, sorting.
# spark.sql() returns DataFrames; rows are only computed when an action such
# as show() runs on them.
aggregation_query, filtering_query, sorting_query = (
    spark.sql(statement)
    for statement in (
        "SELECT Category, SUM(Profit) as TotalProfit FROM superstore_data GROUP BY Category",
        "SELECT * FROM superstore_data WHERE Profit > 0",
        "SELECT * FROM superstore_data ORDER BY Profit DESC",
    )
)


In [32]:
# Re-register the SQL view so this cell does not depend on the one above.
rdd_data.createOrReplaceTempView("superstore_data")

# Aggregation: total profit per category, expressed in Spark SQL.
aggregation_sql = "SELECT Category, SUM(Profit) as TotalProfit FROM superstore_data GROUP BY Category"
aggregation_query = spark.sql(aggregation_sql)

print("Aggregation Query Result:")
aggregation_query.show()


Aggregation Query Result:
+---------------+------------------+
|       Category|       TotalProfit|
+---------------+------------------+
|Office Supplies|516615.91190000053|
|      Furniture| 286439.8782000004|
|     Technology| 663712.0816800005|
+---------------+------------------+



In [33]:
# Filtering: rows with a strictly positive Profit (a NULL Profit never
# satisfies `> 0` in SQL, so such rows are excluded as well).
filtering_sql = "SELECT * FROM superstore_data WHERE Profit > 0"
filtering_query = spark.sql(filtering_sql)

print("\nFiltering Query Result:")
filtering_query.show()



Filtering Query Result:
+------+---------------+----------+---------+--------------+-----------+-----------------+---------+-------------+--------------------+-------------+-----------+------+------------+----------------+---------------+------------+--------------------+-------+--------+--------+------+-------------+--------------+
|Row ID|       Order ID|Order Date|Ship Date|     Ship Mode|Customer ID|    Customer Name|  Segment|         City|               State|      Country|Postal Code|Market|      Region|      Product ID|       Category|Sub-Category|        Product Name|  Sales|Quantity|Discount|Profit|Shipping Cost|Order Priority|
+------+---------------+----------+---------+--------------+-----------+-----------------+---------+-------------+--------------------+-------------+-----------+------+------------+----------------+---------------+------------+--------------------+-------+--------+--------+------+-------------+--------------+
| 42433|   AG-2011-2040|  1/1/2011| 6/1/20

In [35]:
# Sorting: every row, most profitable first.
sorting_sql = "SELECT * FROM superstore_data ORDER BY Profit DESC"
sorting_query = spark.sql(sorting_sql)

print("\nSorting Query Result:")
sorting_query.show()



Sorting Query Result:
+------+---------------+----------+----------+--------------+-----------+--------------------+-----------+-------------+----------------+--------------+-----------+------+------------+----------------+---------------+------------+--------------------+---------+--------+--------+---------+-------------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|       Customer Name|    Segment|         City|           State|       Country|Postal Code|Market|      Region|      Product ID|       Category|Sub-Category|        Product Name|    Sales|Quantity|Discount|   Profit|Shipping Cost|Order Priority|
+------+---------------+----------+----------+--------------+-----------+--------------------+-----------+-------------+----------------+--------------+-----------+------+------------+----------------+---------------+------------+--------------------+---------+--------+--------+---------+-------------+--------------+
| 38123| CA-2013-118

Task 3: Using Spark DataFrames (select, groupBy, orderBy)

In [36]:
from pyspark.sql import SparkSession

# Task 3 works on a DataFrame read from the same CSV with the same options
# as the Task 1 load.
df_data = spark.read.csv(file_path, header=True, inferSchema=True)

# Declare the three DataFrame operations (select, groupBy, orderBy); they are
# lazy and only execute when an action such as show() is called.
selected_df, grouped_df, ordered_df = (
    df_data.select("Category", "Profit"),
    df_data.groupBy("Category").agg({"Profit": "sum"}),
    df_data.orderBy("Profit", ascending=False),
)


In [37]:
# `df_data` was already read from `file_path` (same header/inferSchema options)
# in the previous cell. Reading the CSV again here would repeat the extra pass
# over the file that inferSchema needs, only to produce an identical
# DataFrame, so the existing one is reused.

# Perform Select transformation: keep only the two columns under analysis.
print("Selected DataFrame:")
selected_df = df_data.select("Category", "Profit")
selected_df.show()


Selected DataFrame:
+---------------+-------+
|       Category| Profit|
+---------------+-------+
|Office Supplies| 106.14|
|Office Supplies| 36.036|
|Office Supplies|  29.64|
|Office Supplies|-26.055|
|      Furniture|  37.77|
|Office Supplies| 15.342|
|     Technology|   71.4|
|      Furniture| 3.4196|
|Office Supplies|  92.88|
|Office Supplies|  68.31|
|Office Supplies|  137.4|
|Office Supplies| 20.024|
|      Furniture|  148.5|
|Office Supplies|   15.3|
|Office Supplies|  11.79|
|     Technology|-19.136|
|Office Supplies|  20.34|
|Office Supplies|    9.6|
|Office Supplies|    9.9|
|Office Supplies|    0.0|
+---------------+-------+
only showing top 20 rows



In [38]:
# GroupBy: total Profit per Category. GroupedData.sum("Profit") yields the same
# single `sum(Profit)` column as agg({"Profit": "sum"}).
print("\nGrouped DataFrame:")
grouped_df = df_data.groupBy("Category").sum("Profit")
grouped_df.show()



Grouped DataFrame:
+---------------+------------------+
|       Category|       sum(Profit)|
+---------------+------------------+
|Office Supplies|516615.91190000053|
|      Furniture| 286439.8782000004|
|     Technology| 663712.0816800005|
+---------------+------------------+



In [39]:
# OrderBy: all rows sorted by Profit, highest first (Column.desc() is what
# orderBy(..., ascending=False) applies under the hood).
print("\nOrdered DataFrame:")
ordered_df = df_data.orderBy(df_data["Profit"].desc())
ordered_df.show()



Ordered DataFrame:
+------+---------------+----------+----------+--------------+-----------+--------------------+-----------+-------------+----------------+--------------+-----------+------+------------+----------------+---------------+------------+--------------------+---------+--------+--------+---------+-------------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|       Customer Name|    Segment|         City|           State|       Country|Postal Code|Market|      Region|      Product ID|       Category|Sub-Category|        Product Name|    Sales|Quantity|Discount|   Profit|Shipping Cost|Order Priority|
+------+---------------+----------+----------+--------------+-----------+--------------------+-----------+-------------+----------------+--------------+-----------+------+------------+----------------+---------------+------------+--------------------+---------+--------+--------+---------+-------------+--------------+
| 38123| CA-2013-118689