## Aggregate Functions

Let us see how to perform aggregations within each group while projecting the raw data that is used to perform the aggregation.

 * We have functions such as `sum`, `avg`, `min`, `max` etc which can be used to aggregate the data.
 * We need to create `WindowSpec` object using `partitionBy` to get aggregations within each group.
 * Typically we don’t need to sort the data to perform aggregations, however if we want to perform cumulative aggregations using rowsBetween, then we have to sort the data using cumulative criteria.
 * Let us try to get total departure delay, minimum departure delay, maximum departure delay and average departure delay for each day for each airport. We will ignore all those flights which are departured early or ontime.

Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [None]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [None]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
airlines = spark. \
  read. \
  parquet(airlines_path)

In [None]:
from pyspark.sql.functions import col, lit, lpad, concat

In [None]:
from pyspark.sql.functions import min, max, sum, avg

In [None]:
from pyspark.sql.window import Window

In [None]:
airlines.printSchema()

In [None]:
spec = Window. \
    partitionBy("FlightDate", "Origin")

In [None]:
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelayMin", min("DepDelay").over(spec)). \
    withColumn("DepDelayMax", max("DepDelay").over(spec)). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    withColumn("DepDelayAvg", avg("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "DepDelay"). \
    show()