In [14]:
import findspark
findspark.init()
import pyspark
from pyspark.sql.functions import *

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("Cpastone Project").getOrCreate()

In [6]:
spark

In [75]:
df = spark.read.load("5000 Sales Records (1).csv", format="csv", sep = ",", inferSchema= "True", header = "True")

In [51]:
df.show()

+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Central America a...|Antigua and Barbuda |      Baby Food|       Online|             M|12/20/2013|957081544| 1/11/2014|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|Central America a...|              Panama|         Snacks|      Offline|             C|  7/5/2010|301644504| 7/26/2010|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|              Europe|      Czech Republic|  

In [52]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [79]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df1 = df.withColumn('Order Date', to_date(df["Order Date"], 'MM/dd/yyyy')).withColumn('Ship Date', to_date(df["Ship Date"], 'MM/dd/yyyy'))

In [82]:
df1.printSchema()
df1.show(5)

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)

+--------------------+--------------------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------

##### 1)yearly sales report

In [137]:
yearly_sales = df1.groupBy(year("Order Date").alias("Year")).agg(sum("Total Revenue").alias("Sales")).orderBy("Year")

In [138]:
yearly_sales.show()

+----+--------------------+
|Year|               Sales|
+----+--------------------+
|2010| 8.177653681700003E8|
|2011|      8.3762222024E8|
|2012| 9.036474228000004E8|
|2013| 9.028807039600003E8|
|2014| 8.563977455599985E8|
|2015| 9.845037861999993E8|
|2016| 8.553197613500007E8|
|2017|4.7055220025999993E8|
+----+--------------------+



##### 2)Yearly sum for all country

In [139]:
country_yearly_sales = df1.groupBy(["Country",year("Order Date").alias("Years")]).agg(sum("Total Revenue").alias("Sales")).orderBy("Country","Years")

In [140]:
country_yearly_sales.show()

+-----------+-----+------------------+
|    Country|Years|             Sales|
+-----------+-----+------------------+
|Afghanistan| 2011|         527705.38|
|Afghanistan| 2012|2673576.5799999996|
|Afghanistan| 2013|1608517.0999999999|
|Afghanistan| 2014|7612873.7299999995|
|Afghanistan| 2015|        5163510.99|
|Afghanistan| 2016|        1205425.05|
|Afghanistan| 2017|         508089.88|
|    Albania| 2010|        8531301.81|
|    Albania| 2011|        3265577.84|
|    Albania| 2012|        5984703.94|
|    Albania| 2013|         644070.88|
|    Albania| 2014|         235189.76|
|    Albania| 2015|        6377576.71|
|    Albania| 2016|         6017628.7|
|    Albania| 2017|        1168804.23|
|    Algeria| 2010|        1131473.33|
|    Algeria| 2011|         964772.08|
|    Algeria| 2012|        2906605.58|
|    Algeria| 2013|        2034334.18|
|    Algeria| 2014|        1834602.26|
+-----------+-----+------------------+
only showing top 20 rows



##### 3)Select yr, SUM(amt) from finaldata

In [141]:
yearly_sales = df1.groupBy(year("Order Date").alias("Year")).agg(sum("Total Revenue").alias("Sales")).orderBy("Year")
yearly_sales.show()

+----+--------------------+
|Year|               Sales|
+----+--------------------+
|2010| 8.177653681700003E8|
|2011|      8.3762222024E8|
|2012| 9.036474228000004E8|
|2013| 9.028807039600003E8|
|2014| 8.563977455599985E8|
|2015| 9.845037861999993E8|
|2016| 8.553197613500007E8|
|2017|4.7055220025999993E8|
+----+--------------------+



##### 6) monthly sales report of a perticular year

In [115]:
monthly_sales = df1.groupBy([year("Order Date").alias("Years"), month("Order Date").alias("Month")]).agg(sum("Total Cost").alias("Sales")).orderBy("Years","Month")
monthly_sales.show()

+-----+-----+--------------------+
|Years|Month|               Sales|
+-----+-----+--------------------+
| 2010|    1|5.6117500219999984E7|
| 2010|    2| 6.149703725999998E7|
| 2010|    3|3.2563351479999997E7|
| 2010|    4| 5.008999801000001E7|
| 2010|    5| 6.251231557999999E7|
| 2010|    6| 5.518854506000001E7|
| 2010|    7|2.8828290519999996E7|
| 2010|    8| 5.163789438999999E7|
| 2010|    9|        4.10263253E7|
| 2010|   10|       4.209123058E7|
| 2010|   11| 3.881841093000001E7|
| 2010|   12|       5.583074315E7|
| 2011|    1| 3.531044195000001E7|
| 2011|    2|4.8128704180000015E7|
| 2011|    3| 4.932200064999999E7|
| 2011|    4|3.0250918640000004E7|
| 2011|    5| 5.099740488000001E7|
| 2011|    6|       4.028547157E7|
| 2011|    7|6.0774376599999994E7|
| 2011|    8|5.4067997589999996E7|
+-----+-----+--------------------+
only showing top 20 rows



##### 7)quarterly sales report of a particular year

In [142]:
q = df1.groupBy([year("Order Date").alias("Year"), quarter("Order Date").alias("Quarter")]).agg(sum("Total Revenue")).orderBy("Year","Quarter")
q.show()

+----+-------+--------------------+
|Year|Quarter|  sum(Total Revenue)|
+----+-------+--------------------+
|2010|      1|2.1568648214000002E8|
|2010|      2| 2.328106550899999E8|
|2010|      3|1.7718539354999992E8|
|2010|      4| 1.920828373899999E8|
|2011|      1| 1.958348384799999E8|
|2011|      2|      1.7647134479E8|
|2011|      3|      2.3487859321E8|
|2011|      4|2.3043744375999996E8|
|2012|      1|2.5948719668999994E8|
|2012|      2|2.2292645934999993E8|
|2012|      3|2.1255167141999993E8|
|2012|      4|2.0868209533999994E8|
|2013|      1|2.3419277444999996E8|
|2013|      2|1.8661791348999992E8|
|2013|      3|2.4164184929999998E8|
|2013|      4|2.4042816672000012E8|
|2014|      1|2.3005481114000008E8|
|2014|      2|2.1748007145999995E8|
|2014|      3|2.1132196698999998E8|
|2014|      4|1.9754089597000015E8|
+----+-------+--------------------+
only showing top 20 rows



##### 9) half yearly sales

In [132]:
#made copy of existing data
half = df1.alias("half")

In [143]:
half = half.withColumn("Year",year("Order Date")).withColumn("Month",month("Order Date"))
half = half.withColumn("half year", expr("(Month - 1) div 6 + 1"))
half_yearly_sales = half.groupBy('Year', 'half year').agg(sum('Total Revenue').alias('Sales')).orderBy("Year","half year")
half_yearly_sales.show()

+----+---------+--------------------+
|Year|half year|               Sales|
+----+---------+--------------------+
|2010|        1| 4.484971372299999E8|
|2010|        2|3.6926823094000006E8|
|2011|        1|3.7230618326999974E8|
|2011|        2| 4.653160369699997E8|
|2012|        1|4.8241365604000014E8|
|2012|        2| 4.212337667599995E8|
|2013|        1|4.2081068793999994E8|
|2013|        2|4.8207001602000016E8|
|2014|        1| 4.475348826000001E8|
|2014|        2| 4.088628629600001E8|
|2015|        1| 5.000842648100001E8|
|2015|        2|      4.8441952139E8|
|2016|        1|3.9953615623000014E8|
|2016|        2| 4.557836051199997E8|
|2017|        1|      4.0358302055E8|
|2017|        2|6.6969179709999986E7|
+----+---------+--------------------+



##### 16.) in a specific quart which product made more bussiness

In [133]:
df1.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [144]:
p = df1.groupBy(["Item Type", quarter("Order Date").alias("Quarter")]).agg(sum("Total Revenue").alias("Sales")).orderBy("Item Type","Quarter")
p.show()

+---------+-------+--------------------+
|Item Type|Quarter|               Sales|
+---------+-------+--------------------+
|Baby Food|      1|1.4241075607999998E8|
|Baby Food|      2|      1.5099863056E8|
|Baby Food|      3| 1.619238487199999E8|
|Baby Food|      4|1.2540859752000003E8|
|Beverages|      1|        2.83470096E7|
|Beverages|      2|2.6607207900000002E7|
|Beverages|      3|2.7641570450000007E7|
|Beverages|      4|2.2181831099999998E7|
|   Cereal|      1|1.3220524129999992E8|
|   Cereal|      2| 8.704483479999998E7|
|   Cereal|      3| 8.622985139999999E7|
|   Cereal|      4| 9.555299819999999E7|
|  Clothes|      1|4.9302218400000006E7|
|  Clothes|      2| 6.015197392000001E7|
|  Clothes|      3|        6.03859424E7|
|  Clothes|      4| 4.691073487999997E7|
|Cosmetics|      1| 2.622369320000001E8|
|Cosmetics|      2| 2.484646947999999E8|
|Cosmetics|      3| 2.182480540000001E8|
|Cosmetics|      4|2.0749293399999997E8|
+---------+-------+--------------------+
only showing top