##SALES DATA ANALYSIS

###Setup for the pySpark

In [32]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [48]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [49]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
#for MaxRows
#spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', True)
#To Enable the Spark Rows Show
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
#spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

###Importing Libraries

In [50]:
from pyspark.sql import functions as F
from functools import reduce
from operator import add
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
import plotly.express as px
import pandas as pd
import numpy as np

In [51]:
spark = SparkSession.builder \
    .appName("SalesDataAnalysis") \
    .getOrCreate()

###Importing Sales Dataest

In [52]:
sales_data = spark.read.csv("sales_data.csv", header=True, inferSchema=True)

The given dataset contains total 2824 rows and 25 columns.

###Analysing Dataset

In [72]:
sales_data.printSchema()

#Before Data Cleaning
sales_data.show()

root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = tr

###Dropping NULL Values

In [54]:
cleaned_sales_data=sales_data.na.drop()

#After Data cleaning
cleaned_sales_data.show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+------------+----+-----------+--------------------+---------------+--------------------+------------+------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID| PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|          PHONE|        ADDRESSLINE1|ADDRESSLINE2|        CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+------------+----+-----------+--------------------+---------------+--------------------+------------+------------+--------+----------+---------+---------+---------------+----------------+--------+
|      10223|             37|    100.0|              1|3965.66| 2/20/2004 0:

###Dropping Duplicate Values

In [55]:
dropped_duplicates_data = cleaned_sales_data.dropna().dropDuplicates()
dropped_duplicates_data.show()

+-----------+---------------+---------+---------------+-------+---------------+----------+------+--------+-------+----------------+----+-----------+--------------------+---------------+--------------------+------------+------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE|    STATUS|QTR_ID|MONTH_ID|YEAR_ID|     PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|          PHONE|        ADDRESSLINE1|ADDRESSLINE2|        CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+----------+------+--------+-------+----------------+----+-----------+--------------------+---------------+--------------------+------------+------------+--------+----------+---------+---------+---------------+----------------+--------+
|      10163|             31|    100.0|              2|

###Calculating Totals Sales By Each Product

In [71]:
calculate_total_sales_data = dropped_duplicates_data.groupBy("PRODUCTLINE").sum("SALES").withColumnRenamed("sum(SALES)", "TotalSales")
calculate_total_sales_data.show()

+----------------+-----------------+
|     PRODUCTLINE|       TotalSales|
+----------------+-----------------+
|     Motorcycles|        116738.31|
|    Vintage Cars|        199436.24|
|           Ships|         15146.83|
|Trucks and Buses|         120698.5|
|    Classic Cars|379958.2299999999|
|          Trains|          7530.65|
|          Planes|63772.09000000001|
+----------------+-----------------+



###Creating New CSV File

In [74]:
calculate_total_sales_data.coalesce(1).write.csv("product_sales_summary.csv", header=True)

In [75]:
spark.stop()