In [2]:
import findspark
findspark.init()
   
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
 
# Build the Spark configuration step by step: local master with 3 worker
# threads and a descriptive application name.
conf = SparkConf()
conf.setMaster("local[3]")
conf.setAppName("SparkDataFrameGroupBy")

# Reuse an existing session if one is already running, otherwise start one.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

# Only show warnings and errors from Spark in the cell output.
spark.sparkContext.setLogLevel("WARN")
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fa918e781c0>


In [60]:
import pandas as pd
import numpy as np

# Seed NumPy's global random generator so that re-running the notebook
# regenerates exactly the same synthetic sales records (and therefore the
# same aggregates in every cell that follows).
SEED = 42
np.random.seed(SEED)

# Categorical values the synthetic records are drawn from.
countries = ["USA", "Mexico", "Brazil", "Canada"]
cars = ["BMW X5", "BMW X7", "Ford Explorer", "Ford Expedition", "Jeep Wrangler", "Jeep Cherokee"]
weeks = [f"Week_{i}" for i in range(1, 5)]  # Week_1 .. Week_4

# One row per synthetic sales record; every column is sampled independently.
num_records = 96
df = pd.DataFrame({"country": np.random.choice(countries, num_records),
                   "car": np.random.choice(cars, num_records),
                   "week": np.random.choice(weeks, num_records),
                   # units sold: integers in [0, 20)
                   "units_sales": np.random.randint(20, size=num_records),
                   "used_new": np.random.choice(["Used", "New"], num_records),
                   # unit price: integers in [20000, 55000)
                   "price_per_unit": np.random.randint(low=20000, high=55000, size=num_records)
                   })
print(df.head(10))
print(df.shape)

  country              car    week  units_sales used_new  price_per_unit
0  Canada    Jeep Cherokee  Week_2            5      New           37177
1     USA           BMW X7  Week_3            1      New           27322
2  Canada    Ford Explorer  Week_1            1     Used           35826
3     USA    Jeep Cherokee  Week_4           12      New           24610
4  Canada    Ford Explorer  Week_1            0      New           35688
5  Mexico  Ford Expedition  Week_1           17      New           51116
6     USA    Jeep Cherokee  Week_4            6      New           44375
7  Canada           BMW X7  Week_2            5     Used           23811
8     USA    Ford Explorer  Week_1            5     Used           27027
9  Canada           BMW X7  Week_4           15     Used           51215
(96, 6)


In [61]:
# Hand the pandas frame over to Spark, then inspect the resulting schema
# and the first ten rows.
sparkDF = spark.createDataFrame(df)
sparkDF.printSchema()
sparkDF.show(n=10)

root
 |-- country: string (nullable = true)
 |-- car: string (nullable = true)
 |-- week: string (nullable = true)
 |-- units_sales: long (nullable = true)
 |-- used_new: string (nullable = true)
 |-- price_per_unit: long (nullable = true)

+-------+---------------+------+-----------+--------+--------------+
|country|            car|  week|units_sales|used_new|price_per_unit|
+-------+---------------+------+-----------+--------+--------------+
| Canada|  Jeep Cherokee|Week_2|          5|     New|         37177|
|    USA|         BMW X7|Week_3|          1|     New|         27322|
| Canada|  Ford Explorer|Week_1|          1|    Used|         35826|
|    USA|  Jeep Cherokee|Week_4|         12|     New|         24610|
| Canada|  Ford Explorer|Week_1|          0|     New|         35688|
| Mexico|Ford Expedition|Week_1|         17|     New|         51116|
|    USA|  Jeep Cherokee|Week_4|          6|     New|         44375|
| Canada|         BMW X7|Week_2|          5|    Used|         23811|


In [62]:
# Derive a `revenue` column: units sold multiplied by the price per unit.
df1 = sparkDF.withColumn(
    "revenue",
    sparkDF["units_sales"] * sparkDF["price_per_unit"],
)
df1.show(n=10)

+-------+---------------+------+-----------+--------+--------------+-------+
|country|            car|  week|units_sales|used_new|price_per_unit|revenue|
+-------+---------------+------+-----------+--------+--------------+-------+
| Canada|  Jeep Cherokee|Week_2|          5|     New|         37177| 185885|
|    USA|         BMW X7|Week_3|          1|     New|         27322|  27322|
| Canada|  Ford Explorer|Week_1|          1|    Used|         35826|  35826|
|    USA|  Jeep Cherokee|Week_4|         12|     New|         24610| 295320|
| Canada|  Ford Explorer|Week_1|          0|     New|         35688|      0|
| Mexico|Ford Expedition|Week_1|         17|     New|         51116| 868972|
|    USA|  Jeep Cherokee|Week_4|          6|     New|         44375| 266250|
| Canada|         BMW X7|Week_2|          5|    Used|         23811| 119055|
|    USA|  Ford Explorer|Week_1|          5|    Used|         27027| 135135|
| Canada|         BMW X7|Week_4|         15|    Used|         51215| 768225|

In [71]:
# Total revenue per country; the aggregated column is named "sum(revenue)".
revenueByCountry = (
    df1
    .groupBy("country")
    .sum("revenue")
)
revenueByCountry.show()

+-------+------------+
|country|sum(revenue)|
+-------+------------+
|    USA|     6249630|
| Mexico|     9239064|
| Canada|    11064790|
| Brazil|     7088142|
+-------+------------+



In [70]:
# Countries ordered from lowest to highest total revenue.
revenueByCountry.orderBy("sum(revenue)").show()

+-------+------------+
|country|sum(revenue)|
+-------+------------+
|    USA|     6249630|
| Brazil|     7088142|
| Mexico|     9239064|
| Canada|    11064790|
+-------+------------+



In [72]:
# Countries ordered from highest to lowest total revenue.
revenueByCountry.orderBy("sum(revenue)", ascending=False).show()

+-------+------------+
|country|sum(revenue)|
+-------+------------+
| Canada|    11064790|
| Mexico|     9239064|
| Brazil|     7088142|
|    USA|     6249630|
+-------+------------+



In [64]:
# Total revenue for every (country, car) pair, listed alphabetically by
# country and then by car.
revenueByCountryCar = (
    df1
    .groupBy("country", "car")
    .sum("revenue")
    .sort("country", "car")
)
revenueByCountryCar.show()

+-------+---------------+------------+
|country|            car|sum(revenue)|
+-------+---------------+------------+
| Brazil|         BMW X5|      231160|
| Brazil|         BMW X7|      540211|
| Brazil|Ford Expedition|      714889|
| Brazil|  Ford Explorer|     2654641|
| Brazil|  Jeep Cherokee|      157801|
| Brazil|  Jeep Wrangler|     2789440|
| Canada|         BMW X5|     1241302|
| Canada|         BMW X7|     3322440|
| Canada|Ford Expedition|     2025161|
| Canada|  Ford Explorer|     1321340|
| Canada|  Jeep Cherokee|     2411229|
| Canada|  Jeep Wrangler|      743318|
| Mexico|         BMW X5|     1135589|
| Mexico|         BMW X7|     1816172|
| Mexico|Ford Expedition|     2205278|
| Mexico|  Ford Explorer|      686194|
| Mexico|  Jeep Cherokee|     1632978|
| Mexico|  Jeep Wrangler|     1762853|
|    USA|         BMW X5|      643776|
|    USA|         BMW X7|      838552|
+-------+---------------+------------+
only showing top 20 rows



In [65]:
# Number of sales records per country.
countByCountry = (
    df1
    .groupBy("country")
    .count()
)
countByCountry.show()

+-------+-----+
|country|count|
+-------+-----+
|    USA|   21|
| Mexico|   26|
| Canada|   28|
| Brazil|   21|
+-------+-----+



In [66]:

from pyspark.sql.functions import *
# Per-country sales summary: total / min / max / mean units sold, mean unit
# price and total revenue.
#
# The aggregate functions are referenced through the `F` namespace
# (pyspark.sql.functions) instead of bare `sum` / `min` / `max`, so the cell
# never ends up calling Python's built-ins of the same name and does not rely
# on a wildcard import having replaced them.
salesAggregate = df1.groupBy("country").agg(
    F.sum("units_sales").alias("tot_unit"),
    F.min("units_sales").alias("min_unit_sl"),
    F.max("units_sales").alias("max_unit_sl"),
    F.avg("units_sales").alias("avg_unit_sl"),
    F.avg("price_per_unit").alias("avg_price"),
    F.sum("revenue").alias("total_rev"),
)
salesAggregate.show()

+-------+--------+-----------+-----------+-----------------+-----------------+---------+
|country|tot_unit|min_unit_sl|max_unit_sl|      avg_unit_sl|        avg_price|total_rev|
+-------+--------+-----------+-----------+-----------------+-----------------+---------+
|    USA|     171|          0|         17|8.142857142857142|36700.71428571428|  6249630|
| Mexico|     255|          0|         19|9.807692307692308|          33909.0|  9239064|
| Canada|     272|          0|         19|9.714285714285714|39565.96428571428| 11064790|
| Brazil|     185|          0|         19| 8.80952380952381|39308.80952380953|  7088142|
+-------+--------+-----------+-----------+-----------------+-----------------+---------+



In [95]:
# Dictionary form of agg(): maps each column name to the name of the
# aggregate function applied to it, per (country, car) group.
salesdf1Agg = (
    df1
    .groupBy("country", "car")
    .agg({"revenue": "sum", "units_sales": "sum"})
)
salesdf1Agg.show()

+-------+---------------+------------+----------------+
|country|            car|sum(revenue)|sum(units_sales)|
+-------+---------------+------------+----------------+
|    USA|  Jeep Wrangler|      881345|              29|
| Mexico|  Ford Explorer|      686194|              16|
| Mexico|  Jeep Cherokee|     1632978|              47|
| Canada|  Ford Explorer|     1321340|              26|
|    USA|  Jeep Cherokee|      981436|              35|
|    USA|  Ford Explorer|     1870958|              51|
| Canada|Ford Expedition|     2025161|              43|
|    USA|         BMW X7|      838552|              16|
| Canada|         BMW X7|     3322440|              72|
| Brazil|         BMW X5|      231160|               6|
| Mexico|         BMW X5|     1135589|              38|
| Mexico|  Jeep Wrangler|     1762853|              53|
| Brazil|         BMW X7|      540211|              14|
| Canada|  Jeep Cherokee|     2411229|              62|
| Mexico|Ford Expedition|     2205278|          

In [92]:
# Lowest, highest and mean unit price for every (country, car) pair, ordered
# by country and then car.
#
# `F.min` / `F.max` / `F.avg` are the Spark column aggregates from
# pyspark.sql.functions; qualifying them keeps this cell independent of
# whichever other cell may have shadowed the built-in `min` / `max`.
salesdf2Agg = df1.groupBy("country", "car").agg(
    F.min("price_per_unit"),
    F.max("price_per_unit"),
    F.avg("price_per_unit"),
).orderBy("country", "car")
salesdf2Agg.show()

+-------+---------------+-------------------+-------------------+-------------------+
|country|            car|min(price_per_unit)|max(price_per_unit)|avg(price_per_unit)|
+-------+---------------+-------------------+-------------------+-------------------+
| Brazil|         BMW X5|              25724|              44928|            35326.0|
| Brazil|         BMW X7|              37562|              39169| 38450.333333333336|
| Brazil|Ford Expedition|              34072|              53829|            45813.5|
| Brazil|  Ford Explorer|              27342|              54949| 40275.166666666664|
| Brazil|  Jeep Cherokee|              22543|              22543|            22543.0|
| Brazil|  Jeep Wrangler|              21461|              51734|            38406.8|
| Canada|         BMW X5|              21940|              44953|           32668.75|
| Canada|         BMW X7|              23811|              53185|          43570.375|
| Canada|Ford Expedition|              43727|         

In [87]:
# Units sold and revenue summed per (country, week), listed by country and
# then by week.
saleByCountryWeek = (
    df1
    .groupBy("country", "week")
    .sum("units_sales", "revenue")
    .sort("country", "week")
)
saleByCountryWeek.show()

+-------+------+----------------+------------+
|country|  week|sum(units_sales)|sum(revenue)|
+-------+------+----------------+------------+
| Brazil|Week_1|              59|     2628165|
| Brazil|Week_2|              57|     2292588|
| Brazil|Week_3|              20|      588736|
| Brazil|Week_4|              49|     1578653|
| Canada|Week_1|              60|     2616092|
| Canada|Week_2|              95|     4494235|
| Canada|Week_3|              26|      766523|
| Canada|Week_4|              91|     3187940|
| Mexico|Week_1|              53|     2017848|
| Mexico|Week_2|              34|      901941|
| Mexico|Week_3|              83|     2993657|
| Mexico|Week_4|              85|     3325618|
|    USA|Week_1|               5|      135135|
|    USA|Week_2|              22|      794121|
|    USA|Week_3|              82|     3466109|
|    USA|Week_4|              62|     1854265|
+-------+------+----------------+------------+

