# Groupby and aggregate functions

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import round

In [2]:
spark = SparkSession.builder.appName('groupby_aggregate_function').getOrCreate()
spark

We are going to consider the dataset in https://www.kaggle.com/datasets/varungitboi/employee-salary-dataset/data

In [3]:
df_pyspark = spark.read.csv('data/employee_data.csv', header = True , inferSchema=True)
df_pyspark = df_pyspark.drop('_c0')
df_pyspark.show()

+---+------+---+--------------+----------------+------+
| id|groups|age|healthy_eating|active_lifestyle|salary|
+---+------+---+--------------+----------------+------+
|  0|     A| 36|             5|               5|  2297|
|  1|     A| 55|             3|               5|  1134|
|  2|     A| 61|             8|               1|  4969|
|  3|     O| 29|             3|               6|   902|
|  4|     O| 34|             6|               2|  3574|
|  5|     O| 42|             5|               3|  2761|
|  6|    AB| 53|             4|               6|  1484|
|  7|     B| 41|             8|               6|  3809|
|  8|     A| 47|             5|               6|  2065|
|  9|     A| 31|             4|               8|  1020|
| 10|     A| 47|             6|               9|  1950|
| 11|     O| 40|             7|               1|  4387|
| 12|     O| 41|             3|               2|  1830|
| 13|     O| 46|             6|               8|  2182|
| 14|    AB| 51|             7|               5|

In [4]:
df_pyspark.printSchema()

root
 |-- id: integer (nullable = true)
 |-- groups: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- healthy_eating: integer (nullable = true)
 |-- active_lifestyle: integer (nullable = true)
 |-- salary: integer (nullable = true)



**Groupby and agregate functions works together**. In that way, the `groupBy()` function is used to group the DataFrame rows based on one or more columns, and aggregate functions are used to perform calculations on those grouped data. 
Aggregate functions allow us to compute summary statistics or derive new columns based on the grouped data.

We want to know in which group there is the maximum concentration of the salary.

In [5]:
# Groupby + aggregate function
df_pyspark.groupBy('groups').sum('salary').show()

+------+-----------+
|groups|sum(salary)|
+------+-----------+
|     B|     279097|
|     O|     849570|
|     A|     816083|
|    AB|     282711|
+------+-----------+



Now how many people is in each group:

In [6]:
df_pyspark.groupBy('groups').count().show()

+------+-----+
|groups|count|
+------+-----+
|     B|  125|
|     O|  375|
|     A|  375|
|    AB|  125|
+------+-----+



According to the age, in which age we obtain the maximun salary mean.

In [7]:
df_avg_salary = df_pyspark.groupBy('age').mean('salary')
df_avg_salary.show()

+---+------------------+
|age|       avg(salary)|
+---+------------------+
| 31|1892.6521739130435|
| 53| 2082.777777777778|
| 34|2025.3333333333333|
| 28| 2399.153846153846|
| 26|2479.2631578947367|
| 27|2041.2666666666667|
| 44|           2125.24|
| 22|            2453.1|
| 47|            1727.3|
| 52| 2288.823529411765|
| 40|            2477.5|
| 20|2158.9411764705883|
| 57|2068.5714285714284|
| 54|2368.3076923076924|
| 48|            2244.5|
| 19| 2497.590909090909|
| 64|          2093.625|
| 41|           2412.28|
| 43|2006.2272727272727|
| 37|2494.0434782608695|
+---+------------------+
only showing top 20 rows



In [8]:
df_avg_salary.printSchema()

root
 |-- age: integer (nullable = true)
 |-- avg(salary): double (nullable = true)



Using the aggregate function, we can compute the total salary:

In [9]:
df_pyspark.agg({'salary' : 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|    2227461|
+-----------+



Considering the columns `active_lifestyle` we want to know which is the maximum `salary`.

In [10]:
df_pyspark.groupBy('active_lifestyle').max('salary').show()

+----------------+-----------+
|active_lifestyle|max(salary)|
+----------------+-----------+
|               1|       5550|
|               6|       4972|
|               3|       5086|
|               5|       5204|
|               9|       4276|
|               4|       5435|
|               8|       4508|
|               7|       4158|
|              10|       2881|
|               2|       5318|
|               0|       4038|
+----------------+-----------+



Now, let us consider the minimum `salary`.

In [11]:
df_pyspark.groupBy('active_lifestyle').min('salary').show()

+----------------+-----------+
|active_lifestyle|min(salary)|
+----------------+-----------+
|               1|       1481|
|               6|        779|
|               3|        665|
|               5|        553|
|               9|        788|
|               4|        785|
|               8|        662|
|               7|        670|
|              10|        556|
|               2|        667|
|               0|       2294|
+----------------+-----------+



According to the `active_lifestyle` we want to know which is the average of salary, so

In [12]:
df_avg_sal_by_lifestyle = df_pyspark.groupBy('active_lifestyle').avg('salary')
df_avg_sal_by_lifestyle.show()

+----------------+------------------+
|active_lifestyle|       avg(salary)|
+----------------+------------------+
|               1|3291.6923076923076|
|               6|2219.8732394366198|
|               3|            2753.0|
|               5|2286.2916666666665|
|               9|        1684.53125|
|               4|2373.1923076923076|
|               8|1894.8333333333333|
|               7|1951.4110429447853|
|              10|1599.1333333333334|
|               2|2770.3823529411766|
|               0| 3539.714285714286|
+----------------+------------------+



In [13]:
df_avg_sal_by_lifestyle.dtypes

[('active_lifestyle', 'int'), ('avg(salary)', 'double')]

If we want to see the `avg(salary)` rounded, we use

In [14]:
df_rounded_avg_sal_by_lifestyle = df_avg_sal_by_lifestyle.withColumn("rounded_avg(salary)", round(df_avg_sal_by_lifestyle["avg(salary)"], 2))
df_rounded_avg_sal_by_lifestyle.show()

+----------------+------------------+-------------------+
|active_lifestyle|       avg(salary)|rounded_avg(salary)|
+----------------+------------------+-------------------+
|               1|3291.6923076923076|            3291.69|
|               6|2219.8732394366198|            2219.87|
|               3|            2753.0|             2753.0|
|               5|2286.2916666666665|            2286.29|
|               9|        1684.53125|            1684.53|
|               4|2373.1923076923076|            2373.19|
|               8|1894.8333333333333|            1894.83|
|               7|1951.4110429447853|            1951.41|
|              10|1599.1333333333334|            1599.13|
|               2|2770.3823529411766|            2770.38|
|               0| 3539.714285714286|            3539.71|
+----------------+------------------+-------------------+



Now, we are going to sort by the column `active_lifestyle`

In [15]:
sorted_df_rounded_avg_sal_by_lifestyle = df_rounded_avg_sal_by_lifestyle.orderBy("active_lifestyle")
sorted_df_rounded_avg_sal_by_lifestyle.show()

+----------------+------------------+-------------------+
|active_lifestyle|       avg(salary)|rounded_avg(salary)|
+----------------+------------------+-------------------+
|               0| 3539.714285714286|            3539.71|
|               1|3291.6923076923076|            3291.69|
|               2|2770.3823529411766|            2770.38|
|               3|            2753.0|             2753.0|
|               4|2373.1923076923076|            2373.19|
|               5|2286.2916666666665|            2286.29|
|               6|2219.8732394366198|            2219.87|
|               7|1951.4110429447853|            1951.41|
|               8|1894.8333333333333|            1894.83|
|               9|        1684.53125|            1684.53|
|              10|1599.1333333333334|            1599.13|
+----------------+------------------+-------------------+

