In [None]:
pip install pyspark

In [None]:
# IMPORTING NECESSARY DEPENDENCIES

In [None]:
from pyspark.sql import SparkSession

In [None]:
from pyspark.sql.functions import col, sum

In [None]:
# Initializing a SparkSession using the PySpark library

In [None]:
#SparkSession.builder: This is a builder pattern used to configure and create a SparkSession.

#.appName("SalesDataAnalysis"): sets the name of the Spark application to "SalesDataAnalysis".
#.getOrCreate(): This method retrieves an existing SparkSession if it exists or creates a new one if none exists.

In [None]:
# Reuse the active SparkSession if there is one; otherwise start a new one
# registered under the application name "SalesDataAnalysis".
spark = (
    SparkSession.builder
    .appName("SalesDataAnalysis")
    .getOrCreate()
)

In [None]:
# Load the raw sales CSV into a DataFrame: the first row supplies the column
# names, and Spark infers each column's type from the data.
sales_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/sales_data_sample.csv")
)

In [None]:
# Display the DataFrame repr, which lists the column names and inferred types.
sales_df

DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: string, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string]

In [None]:
# Performing data cleaning: handling missing values and removing duplicates

In [None]:
# Drop exact duplicate rows, then drop only rows that lack a value the
# per-product aggregation needs. A bare na.drop() (how="any" across every
# column) would also discard any row with a null in an unrelated nullable
# column (the schema marks address/contact fields such as ADDRESSLINE2 and
# STATE as nullable), silently shrinking the data that gets summed.
cleaned_sales_df = sales_df.dropDuplicates().na.drop(subset=["PRODUCTCODE", "SALES"])

In [None]:
 cleaned_sales_df

DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: string, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string]

In [None]:
#The "PRODUCTCODE" column is used to aggregate the cleaned_sales_df DataFrame, which holds the sales data after missing values have been handled and duplicates removed.

#groupBy("PRODUCTCODE") groups together all rows that share the same value in the "PRODUCTCODE" column, i.e. one group per product code.

#The grouped data is then aggregated with the agg() method: sum("SALES") inside agg() computes the total of the "SALES" column for each group, giving the total sales amount for each product code.

#Finally, alias("TotalSalesAmount") names the aggregated column, which holds the total sales amount for each product code, "TotalSalesAmount".

In [None]:
# Handle missing values (replace nulls with 0)
sales_df = sales_df.na.fill(0)

In [None]:
# Remove duplicates
sales_df = sales_df.dropDuplicates()

In [None]:
sales_df.printSchema()

root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = false)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = false)
 |-- ORDERDATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = 

In [None]:
# Calculate total sales amount for each product.
# `sum` here is pyspark.sql.functions.sum (imported at the top), not the builtin.
# groupBy output order is unspecified, so sort by product code to make
# .show() and the written CSV reproducible across runs.
product_sales_df = (
    cleaned_sales_df
    .groupBy("PRODUCTCODE")
    .agg(sum("SALES").alias("TotalSalesAmount"))
    .orderBy("PRODUCTCODE")
)

In [None]:
# Inspect the result schema: one row per PRODUCTCODE with its TotalSalesAmount.
product_sales_df

DataFrame[PRODUCTCODE: string, TotalSalesAmount: double]

In [None]:
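# The final result (product_sales_df) is written to /content/product_sales_results.csv, which Spark creates as a directory containing the CSV part file.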

In [None]:
# Persist the per-product totals as CSV with a header row. coalesce(1) collapses
# the data into one partition, so the output directory
# "/content/product_sales_results.csv" holds a single part file; any existing
# output at that path is replaced.
(
    product_sales_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("/content/product_sales_results.csv")
)

In [None]:
# Print the first 20 rows as a text table (long cell values are truncated).
product_sales_df.show(n=20, truncate=True)

+-----------+------------------+
|PRODUCTCODE|  TotalSalesAmount|
+-----------+------------------+
|   S18_4600|12936.099999999999|
|   S18_1749|           11174.1|
|   S12_3891|          12547.32|
|   S18_2248|           3931.64|
|   S32_1268| 4706.639999999999|
|   S12_1099|            5019.9|
|   S18_2795|          19255.22|
|   S24_1937|           9554.31|
|   S32_3522|          12700.04|
|   S18_1097|          12626.71|
|   S12_1666|          18616.09|
|   S24_3969|            3647.1|
|   S24_4048|          10691.41|
|   S24_1578|           8652.03|
|   S18_3320|10743.779999999999|
|   S18_3136|16425.280000000002|
|   S32_2509|           3873.24|
|   S24_2887|          18576.34|
|   S18_4409|12215.029999999999|
|   S10_4757|           1201.25|
+-----------+------------------+
only showing top 20 rows



In [None]:
# Stop SparkSession
# Stops the underlying SparkContext. Any cell that uses `spark` or a DataFrame
# built from it must run before this one; re-run the session cell to continue.
spark.stop()