In [None]:
# setting up pyspark environment
# Install a headless Java 8 JDK via apt; -qq and the redirect to /dev/null silence the output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark and PyDrive into the runtime (-U upgrades, -q keeps pip quiet).
# NOTE(review): neither package is version-pinned, so a re-run may pull a different release.
!pip install pyspark
!pip install -U -q PyDrive

import os
# Point JAVA_HOME at the Java 8 JDK directory.
# NOTE(review): presumably this is where the apt package above installs — confirm on the target image.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

import pyspark
import pyspark.sql  as pyspark_sql
import pyspark.sql.types as pyspark_types
import pyspark.sql.functions  as F
from pyspark import SparkContext, SparkConf

# Serve the Spark web UI on port 4050. Spark property names are dot-separated,
# so the key has to be exactly 'spark.ui.port' for the setting to take effect.
conf = SparkConf().set('spark.ui.port', '4050')
# getOrCreate reuses a context that is already running, so re-executing this
# cell does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
from pyspark.sql import SparkSession


# Create a SparkSession on top of the context above.
spark = SparkSession.builder \
    .appName("Read CSV") \
    .getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=68ef4caeefd971c776833b5d1055e03d7c3e9459728e763df42221b189091bdb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Request a session named "DecimalFormatExample" with two decimal-related options.
# NOTE(review): a SparkSession was already created earlier in this notebook, so
# getOrCreate() presumably hands back that same session rather than a new one — confirm.
# NOTE(review): the two decimalOperations keys below do not look like documented
# Spark SQL properties (the documented one is
# spark.sql.decimalOperations.allowPrecisionLoss) — confirm they have any effect.
spark = (
    SparkSession.builder
    .appName("DecimalFormatExample")
    .config("spark.sql.decimalOperations.allowPrecision", "true")
    .config("spark.sql.decimalOperations.allowScale", "true")
    .getOrCreate()
)

In [None]:
# Load the sales records from the CSV file: the first row supplies the column
# names and Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/salerecord.csv")
)
df.show()

+--------------------+------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|     Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|       Palau|Office Supplies|       Online|             H|  3/6/2016|517073523| 3/26/2016|      2401|    651.21|   524.96|   1563555.21|1260428.96|   303126.25|
|              Europe|      Poland|      Beverages|       Online|             L| 4/18/2010|380507028| 5/26/2010|      9340|     47.45|    31.79|     443183.0|  296918.6|    146264.4|
|       North America|      Canada|         Cereal|       Online|             M|  1/8

In [None]:
# Print the column names and inferred types of the freshly loaded frame.
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [None]:
# Convert the two date columns from "M/d/yyyy" text into real date values.
# The result is stored in a new frame (`sales`); `df` keeps the raw strings.
from pyspark.sql.functions import to_date, col

sales = (
    df
    .withColumn("Ship Date", to_date(col("Ship Date"), "M/d/yyyy"))
    .withColumn("Order Date", to_date(col("Order Date"), "M/d/yyyy"))
)

In [None]:
# Preview the frame after the date conversion.
sales.show()

+--------------------+------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|     Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|       Palau|Office Supplies|       Online|             H|2016-03-06|517073523|2016-03-26|      2401|    651.21|   524.96|   1563555.21|1260428.96|   303126.25|
|              Europe|      Poland|      Beverages|       Online|             L|2010-04-18|380507028|2010-05-26|      9340|     47.45|    31.79|     443183.0|  296918.6|    146264.4|
|       North America|      Canada|         Cereal|       Online|             M|2015-

In [None]:
# Pivot table: one row per Region, one column per Item Type, each cell holding
# the summed Total Revenue. The pivot column is spelled exactly as in the schema
# ("Item Type") so the query does not depend on case-insensitive name resolution.
(
    sales
    .groupby("Region")
    .pivot("Item Type")
    .agg(F.sum("Total Revenue"))
    .show()
)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              Region|           Baby Food|           Beverages|              Cereal|             Clothes|           Cosmetics|              Fruits|           Household|                Meat|     Office Supplies|       Personal Care|              Snacks|          Vegetables|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Middle East and N...|6.589853417871995...|1.229548072805000...|5.314294756979993E10|2.838726869503998...|1.132455730452004...|     2.41171967715E9|1.731096050655099...|1.0930

In [None]:
# Which region sold the most units overall?
salesbyregion = (
    sales
    .groupBy("Region")
    .agg(F.sum("Units Sold").alias("Total Units Sold"))
)
# The un-grouped max is a one-row, one-column result; pull out its scalar value.
max_val = salesbyregion.select(F.max('Total Units Sold')).first()[0]
# Keep the region(s) whose total equals that maximum.
salesbyregion.where(F.col('Total Units Sold') == max_val).show()

+------------------+----------------+
|            Region|Total Units Sold|
+------------------+----------------+
|Sub-Saharan Africa|      6486855992|
+------------------+----------------+



In [None]:
# Country with the highest summed Total Cost.
# The aggregated column is referenced with the exact casing of its alias
# ("Total Cost") so the lookups do not rely on case-insensitive name resolution.
costbycountry = (
    sales
    .groupby("Country")
    .agg(F.sum("Total Cost").alias("Total Cost"))
)
# Scalar maximum of the per-country totals.
max_val = costbycountry.select(F.max("Total Cost")).collect()[0][0]
costbycountry.filter(F.col("Total Cost") == max_val).show()


+-------+--------------------+
|Country|          Total Cost|
+-------+--------------------+
| Rwanda|2.563928572492000...|
+-------+--------------------+



In [None]:
# Single-expression form of the "region with the most units" query: aggregate
# per region, then keep the row(s) matching the maximum taken from `salesbyregion`.
(
    sales
    .groupBy("Region")
    .agg(F.sum("Units Sold").alias("Total Units Sold"))
    .where(
        F.col('Total Units Sold')
        == salesbyregion.select(F.max('Total Units Sold')).first()[0]
    )
    .show()
)



+------------------+----------------+
|            Region|Total Units Sold|
+------------------+----------------+
|Sub-Saharan Africa|      6486855992|
+------------------+----------------+



In [None]:
# Largest per-region unit total, returned as a plain Python value.
salesbyregion.select(F.max('Total Units Sold')).first()[0]

6486855992

In [None]:
import pyspark.sql.functions as F
# Orders placed in 2018. F.year() yields an integer, so it is compared with the
# integer 2018 instead of relying on an implicit cast of a string literal.
sales2018 = sales.filter(F.year("Order Date") == 2018)

In [None]:
# Orders placed in 2020. F.year() yields an integer, so it is compared with the
# integer 2020 instead of relying on an implicit cast of a string literal.
sales2020 = sales.filter(F.year("Order Date") == 2020)

In [None]:
import pyspark.sql.functions as F

# 2018 orders whose Item Type was also sold in 2020.
# A left-semi join keeps each 2018 row at most once and returns only the
# left-hand columns. An inner join on the non-unique "Item Type" would instead
# repeat every 2018 row once per matching 2020 row.
INBOTH = (
    sales2018.alias('df1')
    .join(
        sales2020.alias('df2'),
        F.col("df1.Item Type") == F.col("df2.Item Type"),
        "left_semi",
    )
    .select('df1.*')
)
INBOTH.show()


+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region|Country|Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------+-------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Europe| Monaco|Baby Food|      Offline|             H|2018-10-29|839051043|2018-10-30|      2704|    255.28|   159.42|    690277.12| 431071.68|   259205.44|
|Europe| Monaco|Baby Food|      Offline|             H|2018-10-29|839051043|2018-10-30|      2704|    255.28|   159.42|    690277.12| 431071.68|   259205.44|
|Europe| Monaco|Baby Food|      Offline|             H|2018-10-29|839051043|2018-10-30|      2704|    255.28|   159.42|    690277.12| 431071.68|   259205.44|
|Europe| Monaco|Baby Food|      Offline|            

In [None]:

# Total profit per (Region, Item Type): regions in ascending order, and within
# each region the most profitable item types first.
(
    sales
    .groupBy("Region", "Item Type")
    .agg(F.sum(F.col("Total Profit")).alias("Total Profit"))
    .orderBy(F.col("Region").asc(), F.col("Total Profit").desc())
    .show()
)


+--------------------+---------------+--------------------+
|              Region|      Item Type|        Total Profit|
+--------------------+---------------+--------------------+
|                Asia|      Cosmetics|5.290387261525008...|
|                Asia|      Household|5.040832476651991E10|
|                Asia|Office Supplies|   3.845193257875E10|
|                Asia|      Baby Food|2.916394227226006E10|
|                Asia|         Cereal|2.693676558461999...|
|                Asia|        Clothes|2.234550654720004...|
|                Asia|     Vegetables|1.920983982096000...|
|                Asia|           Meat|    1.73770278968E10|
|                Asia|         Snacks|1.678817727203999...|
|                Asia|  Personal Care| 7.608278249040002E9|
|                Asia|      Beverages| 4.758665133059997E9|
|                Asia|         Fruits| 7.344768080099986E8|
|Australia and Oce...|      Cosmetics|2.941896830715001E10|
|Australia and Oce...|      Household|2.

In [None]:
import pyspark.sql.functions as F

# Total revenue for every (Country, Region, Item Type) combination, ordered by
# country and region with the highest-revenue item types first.
(
    sales
    .groupby("Country", "Region", "Item Type")
    .agg(F.sum("Total Revenue").alias("Total Revenue"))
    .orderBy("Country", "Region", F.col("Total Revenue").desc())
    .show()
)

+-----------+--------------------+---------------+--------------------+
|    Country|              Region|      Item Type|       Total Revenue|
+-----------+--------------------+---------------+--------------------+
|Afghanistan|Middle East and N...|      Household|     7.48595318903E9|
|Afghanistan|Middle East and N...|Office Supplies| 7.329326221350002E9|
|Afghanistan|Middle East and N...|      Cosmetics|4.8975655524000025E9|
|Afghanistan|Middle East and N...|           Meat| 4.819488259740001E9|
|Afghanistan|Middle East and N...|      Baby Food|2.8656716785599995E9|
|Afghanistan|Middle East and N...|         Cereal|      2.3137662592E9|
|Afghanistan|Middle East and N...|     Vegetables|1.7521981747399995E9|
|Afghanistan|Middle East and N...|         Snacks|     1.73436450102E9|
|Afghanistan|Middle East and N...|        Clothes|1.2510146004799998E9|
|Afghanistan|Middle East and N...|  Personal Care| 9.350629589399999E8|
|Afghanistan|Middle East and N...|      Beverages|       5.41049

In [None]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Step 1: total revenue per (Country, Region, Item Type).
# A plain groupBy is used on purpose: a sum() over a window that has an orderBy
# is a running total (the default frame ends at the current row), not the
# partition total.
revenue_by_item = sales.groupBy("Country", "Region", "Item Type").agg(
    F.sum("Total Revenue").alias("Total Revenue")
)

# Step 2: within each (Country, Region), rank item types by total revenue and
# keep only the best one.
window_spec = Window.partitionBy("Country", "Region").orderBy(F.desc("Total Revenue"))
result = (
    revenue_by_item
    .withColumn("row_number", F.row_number().over(window_spec))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

# Display only the selected columns: "Country", "Region", "Item Type", and "Total Revenue"
result.select("Country", "Region", "Item Type", "Total Revenue") \
      .orderBy("Country", "Region", F.desc("Total Revenue")) \
      .show()


+--------------------+--------------------+---------------+-------------------+
|             Country|              Region|      Item Type|      Total Revenue|
+--------------------+--------------------+---------------+-------------------+
|         Afghanistan|Middle East and N...|      Household|    7.48595318903E9|
|             Albania|              Europe|Office Supplies|7.458215658179998E9|
|             Algeria|Middle East and N...|      Household|7.509104734910005E9|
|             Andorra|              Europe|      Household|7.485526164499994E9|
|              Angola|  Sub-Saharan Africa|      Household|     7.5525469628E9|
|Antigua and Barbuda |Central America a...|      Household|7.551797832130007E9|
|             Armenia|              Europe|      Household|7.575654402859997E9|
|           Australia|Australia and Oce...|      Household|7.504088700290007E9|
|             Austria|              Europe|      Household|7.462722119020013E9|
|          Azerbaijan|Middle East and N.

In [None]:
# Remove the `result` name from the notebook namespace; it is not used after this point in the visible cells.
del result

In [None]:
# Print the schema of `sales` to check the column types after the date conversion.
sales.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [None]:
# Keep only the columns needed for the profit-by-region analysis.
columns = ["Region", "Item Type", "Total Profit"]
filterdata = sales.select(columns)


In [None]:
# Preview the reduced frame, then print its schema.
filterdata.show()
filterdata.printSchema()

+--------------------+---------------+------------+
|              Region|      Item Type|Total Profit|
+--------------------+---------------+------------+
|Australia and Oce...|Office Supplies|   303126.25|
|              Europe|      Beverages|    146264.4|
|       North America|         Cereal|     9124.77|
|              Europe|         Snacks|    77967.96|
|Middle East and N...|         Cereal|   622521.93|
|  Sub-Saharan Africa|Office Supplies|   344536.25|
|              Europe|  Personal Care|    33505.22|
|Middle East and N...|      Cosmetics|  1338625.13|
|  Sub-Saharan Africa|        Clothes|   271434.24|
|       North America|  Personal Care|    81169.34|
|              Europe|      Cosmetics|   1264034.9|
|  Sub-Saharan Africa|        Clothes|   643554.72|
|              Europe|  Personal Care|   193513.32|
|              Europe|  Personal Care|   154971.04|
|                Asia|         Cereal|   284905.44|
|  Sub-Saharan Africa|         Snacks|   546823.38|
|           

In [None]:
# Result for the first requirement: total profit per (Region, Item Type),
# listed region by region with the most profitable item types first.
import pyspark.sql.functions as F

(
    filterdata
    .groupBy("Region", "Item Type")
    .agg(F.sum("Total Profit").alias("Total Profit"))
    .orderBy(F.col("Region"), F.col("Total Profit").desc())
    .show()
)

+--------------------+---------------+--------------------+
|              Region|      Item Type|        Total Profit|
+--------------------+---------------+--------------------+
|                Asia|      Cosmetics|5.290387261525008...|
|                Asia|      Household|5.040832476651991E10|
|                Asia|Office Supplies|   3.845193257875E10|
|                Asia|      Baby Food|2.916394227226006E10|
|                Asia|         Cereal|2.693676558461999...|
|                Asia|        Clothes|2.234550654720004...|
|                Asia|     Vegetables|1.920983982096000...|
|                Asia|           Meat|    1.73770278968E10|
|                Asia|         Snacks|1.678817727203999...|
|                Asia|  Personal Care| 7.608278249040002E9|
|                Asia|      Beverages| 4.758665133059997E9|
|                Asia|         Fruits| 7.344768080099986E8|
|Australia and Oce...|      Cosmetics|2.941896830715001E10|
|Australia and Oce...|      Household|2.

In [None]:

# Total profit per (Region, Item Type), highest profit first within each region.
# The Spark functions are referenced through the `F` alias: a bare `sum` is
# Python's builtin and a bare `desc` is not defined unless it was imported in
# some other cell, so the cell would fail on a fresh kernel.
result1 = (
    filterdata
    .groupBy("Region", "Item Type")
    .agg(F.sum("Total Profit").alias("Total Profit"))
    .orderBy("Region", F.desc("Total Profit"))
)
result1.show()

+--------------------+---------------+--------------------+
|              Region|      Item Type|        Total Profit|
+--------------------+---------------+--------------------+
|                Asia|      Cosmetics|5.290387261525008...|
|                Asia|      Household|5.040832476651991E10|
|                Asia|Office Supplies|   3.845193257875E10|
|                Asia|      Baby Food|2.916394227226006E10|
|                Asia|         Cereal|2.693676558461999...|
|                Asia|        Clothes|2.234550654720004...|
|                Asia|     Vegetables|1.920983982096000...|
|                Asia|           Meat|    1.73770278968E10|
|                Asia|         Snacks|1.678817727203999...|
|                Asia|  Personal Care| 7.608278249040002E9|
|                Asia|      Beverages| 4.758665133059997E9|
|                Asia|         Fruits| 7.344768080099986E8|
|Australia and Oce...|      Cosmetics|2.941896830715001E10|
|Australia and Oce...|      Household|2.

In [None]:
from pyspark.sql.types import DecimalType

# Fixed-point type with 20 total digits, 2 of them after the decimal point.
decimal_type = DecimalType(precision=20, scale=2)

# Show the profit totals as decimals. The cast is applied only to the displayed
# frame; `result1` itself is not reassigned.
(
    result1
    .withColumn("Total Profit", col("Total Profit").cast(decimal_type))
    .show()
)

+--------------------+---------------+--------------+
|              Region|      Item Type|  Total Profit|
+--------------------+---------------+--------------+
|                Asia|      Cosmetics|52903872615.25|
|                Asia|      Household|50408324766.52|
|                Asia|Office Supplies|38451932578.75|
|                Asia|      Baby Food|29163942272.26|
|                Asia|         Cereal|26936765584.62|
|                Asia|        Clothes|22345506547.20|
|                Asia|     Vegetables|19209839820.96|
|                Asia|           Meat|17377027896.80|
|                Asia|         Snacks|16788177272.04|
|                Asia|  Personal Care| 7608278249.04|
|                Asia|      Beverages| 4758665133.06|
|                Asia|         Fruits|  734476808.01|
|Australia and Oce...|      Cosmetics|29418968307.15|
|Australia and Oce...|      Household|28022646645.12|
|Australia and Oce...|Office Supplies|21366695187.50|
|Australia and Oce...|      

In [None]:
# Total profit per (Region, Item Type), ordered alphabetically by region and
# then by item type.
saleregion = (
    filterdata
    .groupBy("Region", "Item Type")
    .agg(F.sum("Total Profit").alias("Total Profit"))
    .orderBy("Region", "Item Type")
)
# Print up to 100 rows.
saleregion.show(100)


+--------------------+---------------+--------------------+
|              Region|      Item Type|        Total Profit|
+--------------------+---------------+--------------------+
|                Asia|      Baby Food|2.916394227226006E10|
|                Asia|      Beverages| 4.758665133059997E9|
|                Asia|         Cereal|2.693676558461999...|
|                Asia|        Clothes|2.234550654720004...|
|                Asia|      Cosmetics|5.290387261525008...|
|                Asia|         Fruits| 7.344768080099986E8|
|                Asia|      Household|5.040832476651991E10|
|                Asia|           Meat|    1.73770278968E10|
|                Asia|Office Supplies|   3.845193257875E10|
|                Asia|  Personal Care| 7.608278249040002E9|
|                Asia|         Snacks|1.678817727203999...|
|                Asia|     Vegetables|1.920983982096000...|
|Australia and Oce...|      Baby Food|1.617843253903998...|
|Australia and Oce...|      Beverages| 2

In [None]:
from pyspark.sql.types import DecimalType

# Fixed-point type with 20 total digits, 2 of them after the decimal point.
decimal_type = DecimalType(precision=20, scale=2)

# Show the profit totals as decimals. The cast is applied only to the displayed
# frame; `saleregion` itself is not reassigned.
(
    saleregion
    .withColumn("Total Profit", col("Total Profit").cast(decimal_type))
    .show()
)


+--------------------+---------------+--------------+
|              Region|      Item Type|  Total Profit|
+--------------------+---------------+--------------+
|                Asia|      Baby Food|29163942272.26|
|                Asia|      Beverages| 4758665133.06|
|                Asia|         Cereal|26936765584.62|
|                Asia|        Clothes|22345506547.20|
|                Asia|      Cosmetics|52903872615.25|
|                Asia|         Fruits|  734476808.01|
|                Asia|      Household|50408324766.52|
|                Asia|           Meat|17377027896.80|
|                Asia|Office Supplies|38451932578.75|
|                Asia|  Personal Care| 7608278249.04|
|                Asia|         Snacks|16788177272.04|
|                Asia|     Vegetables|19209839820.96|
|Australia and Oce...|      Baby Food|16178432539.04|
|Australia and Oce...|      Beverages| 2642391478.08|
|Australia and Oce...|         Cereal|14964340375.08|
|Australia and Oce...|      