## Aggregations

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Get the existing SparkSession or create a new one for the app "My app".
spark = (
    SparkSession.builder
    .appName("My app")
    .getOrCreate()
)

In [3]:
# Load the retail CSV using its header row for column names and letting Spark
# infer the column types, then coalesce the result to at most 5 partitions.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("retail-data/all/online-retail-dataset.csv")
    .coalesce(5)
)

In [4]:
# Mark df for caching, since every aggregation below re-reads it.
# cache() returns the DataFrame, so the cell output is its schema repr.
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [5]:
# Register df as the temporary view "dfTable", replacing any existing view of that name.
df.createOrReplaceTempView("dfTable")

In [6]:
# Total number of rows in df (an action: it executes immediately).
df.count()

541909

### count

In [7]:
from pyspark.sql.functions import count

# Here count() is an aggregate column expression used inside select(),
# i.e. a transformation; collect() is what triggers execution.
df.select(
    count("StockCode"),
).collect()

[Row(count(StockCode)=541909)]

### Count Distinct

In [8]:
from pyspark.sql.functions import count_distinct

# count_distinct (Spark 3.2+) is the function that countDistinct merely aliases;
# calling it directly is the recommended spelling. The result column is still
# named count(DISTINCT StockCode).
df.select(count_distinct("StockCode")).collect()

[Row(count(DISTINCT StockCode)=4070)]

### Approximate Count Distinct

In [9]:
from pyspark.sql.functions import approx_count_distinct

# rsd is the maximum relative standard deviation allowed for the estimate.
df.select(
    approx_count_distinct("StockCode", rsd=0.1),
).collect()

[Row(approx_count_distinct(StockCode)=3364)]

### First and Last

In [10]:
from pyspark.sql.functions import first, last

# First and last StockCode values as encountered in the DataFrame.
df.select(
    first("StockCode"),
    last("StockCode"),
).collect()

[Row(first(StockCode)='85123A', last(StockCode)='22138')]

### Min and Max

In [11]:
# NOTE: this import shadows Python's built-in min/max from this cell onward.
from pyspark.sql.functions import max, min

# Smallest and largest Quantity across all rows.
df.select(
    min("Quantity"),
    max("Quantity"),
).collect()

[Row(min(Quantity)=-80995, max(Quantity)=80995)]

### Sum

In [12]:
# NOTE: this import shadows Python's built-in sum from this cell onward.
from pyspark.sql.functions import sum

# Total of Quantity across all rows.
df.select(
    sum("Quantity"),
).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



### sumDistinct

In [13]:
from pyspark.sql.functions import sum_distinct

# sumDistinct is deprecated since Spark 3.2 and emits a warning; sum_distinct
# is its replacement. Each distinct Quantity value is added once, and the
# result column is still named sum(DISTINCT Quantity).
df.select(sum_distinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



### Average

In [14]:
from pyspark.sql.functions import avg, count, expr, sum

# Compute the average three ways so they can be compared side by side:
# sum/count by hand, the avg() function, and the SQL mean() expression.
purchase_stats = df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"),
)

purchase_stats.selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases",
).collect()

[Row((total_purchases / total_transactions)=9.55224954743324, avg_purchases=9.55224954743324, mean_purchases=9.55224954743324)]

### Variance and Standard Deviation

In [15]:
from pyspark.sql.functions import stddev_pop, stddev_samp, var_pop, var_samp

# Population (_pop) and sample (_samp) variants of variance and standard
# deviation for Quantity, in the same column order as before.
df.select(
    var_pop("Quantity"),
    var_samp("Quantity"),
    stddev_pop("Quantity"),
    stddev_samp("Quantity"),
).collect()

[Row(var_pop(Quantity)=47559.303646609056, var_samp(Quantity)=47559.391409298754, stddev_pop(Quantity)=218.08095663447796, stddev_samp(Quantity)=218.08115785023418)]

### Skewness and Kurtosis

In [16]:
from pyspark.sql.functions import kurtosis, skewness

# Shape statistics of the Quantity distribution.
df.select(
    skewness("Quantity"),
    kurtosis("Quantity"),
).collect()

[Row(skewness(Quantity)=-0.2640755761052562, kurtosis(Quantity)=119768.05495536952)]

### Covariance and Correlation 

In [17]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

# InvoiceNo is a string column in the inferred schema, so these numeric
# aggregates rely on Spark converting it implicitly.
# NOTE(review): invoice numbers such as 'C540850' are not numeric and presumably
# do not convert -- confirm how such rows affect the result.
df.select(
    corr("InvoiceNo", "Quantity"),
    covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity"),
).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085635685E-4|             1052.7280543902734|            1052.7260778741693|
+-------------------------+-------------------------------+------------------------------+



### Aggregating to Complex Types

In [18]:
from pyspark.sql.functions import collect_list, collect_set

# Gather Country values into array columns: collect_set keeps unique values,
# collect_list keeps every value.
df.agg(
    collect_set("Country"),
    collect_list("Country"),
).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



### Grouping

In [19]:
# Number of rows per invoice. The column is referenced as "invoiceNo" although
# the schema spells it "InvoiceNo"; the output header uses the spelling given here.
df.groupBy("invoiceNo").count().show()

+---------+-----+
|invoiceNo|count|
+---------+-----+
|   536596|    6|
|   536938|   14|
|   537252|    1|
|   537691|   20|
|   538041|    1|
|   538184|   26|
|   538517|   53|
|   538879|   19|
|   539275|    6|
|   539630|   12|
|   540499|   24|
|   540540|   22|
|  C540850|    1|
|   540976|   48|
|   541432|    4|
|   541518|  101|
|   541783|   35|
|   542026|    9|
|   542375|    6|
|   536597|   28|
+---------+-----+
only showing top 20 rows



In [20]:
# Number of rows per (InvoiceNo, CustomerId) pair; rows whose CustomerId is
# NULL are grouped too, as the output shows.
df.groupBy("InvoiceNo","CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   536596|      NULL|    6|
|   537252|      NULL|    1|
|   538041|      NULL|    1|
|   537159|     14527|   28|
|   537213|     12748|    6|
|   538191|     15061|   16|
|  C539301|     13496|    1|
+---------+----------+-----+
only showing top 20 rows



### Grouping with expressions

In [21]:
# The same per-invoice count expressed twice: once through the count()
# function (aliased "quan") and once as a SQL expression string.
# count and expr come from the imports in earlier cells.
(
    df.groupBy("InvoiceNo")
    .agg(
        count("Quantity").alias("quan"),
        expr("count(Quantity)"),
    )
    .show()
)

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|   536597|  28|             28|
+---------+----+---------------+
only showing top 20 rows



### Grouping with Maps

In [22]:
# Two aggregations of the same column, written as SQL expression strings.
# A single Python dict passed to agg() could not express this, because a dict
# cannot hold two entries under the key "Quantity".
(
    df.groupBy("InvoiceNo")
    .agg(
        expr("avg(Quantity)"),
        expr("stddev_pop(Quantity)"),
    )
    .show()
)

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

### Window Functions

In [27]:
from pyspark.sql.functions import col, to_date

# Switch the whole session to the legacy datetime parser before parsing.
# NOTE(review): presumably required because "MM/d/yyyy H:mm" does not match
# every InvoiceDate value under the default parser -- confirm against the data.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Add a "date" column parsed from the InvoiceDate string and expose the result
# to SQL as the temporary view "dfWithDate".
invoice_date = to_date(col("InvoiceDate"), "MM/d/yyyy H:mm")
dfWithDate = df.withColumn("date", invoice_date)
dfWithDate.createOrReplaceTempView("dfWithDate")

In [28]:
from pyspark.sql.functions import desc
from pyspark.sql.window import Window

# One window per (customer, date), rows ordered by Quantity descending, with a
# frame running from the start of the partition up to the current row.
windowSpec = (
    Window.partitionBy("CustomerId", "date")
    .orderBy(desc("Quantity"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [29]:
# `max` is the pyspark aggregate (it shadows Python's built-in max).
from pyspark.sql.functions import max
# Column expression: the maximum Quantity evaluated over windowSpec.
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [30]:

from pyspark.sql.functions import col, dense_rank, rank

# Ranking expressions evaluated over windowSpec (per customer and date,
# ordered by Quantity descending).
purchaseRank = rank().over(windowSpec)
purchaseDenseRank = dense_rank().over(windowSpec)

# Show both rankings next to each purchase for rows that have a customer id.
(
    dfWithDate.where("CustomerId IS NOT NULL")
    .orderBy("CustomerId")
    .select(
        col("CustomerId"),
        col("date"),
        col("Quantity"),
        purchaseRank.alias("quantityRank"),
        purchaseDenseRank.alias("quantityDenseRank"),
    )
    .show()
)

+----------+----------+--------+------------+-----------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|
+----------+----------+--------+------------+-----------------+
|     12346|2011-01-18|   74215|           1|                1|
|     12346|2011-01-18|  -74215|           2|                2|
|     12347|2010-12-07|      36|           1|                1|
|     12347|2010-12-07|      30|           2|                2|
|     12347|2010-12-07|      24|           3|                3|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|           4|                4|
|     12347|2010-12-07|      12|        

In [31]:
from pyspark.sql.functions import col

# Same report as the previous cell, extended with the running maximum
# quantity computed over the same window.
ranked_columns = [
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity"),
]

(
    dfWithDate.where("CustomerId IS NOT NULL")
    .orderBy("CustomerId")
    .select(*ranked_columns)
    .show()
)

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

### Rollups
A rollup is a multi-dimensional aggregation that performs a variety of group-by style calculations for us.

In [32]:
# Roll up Quantity by Date, then Country. `sum` must be the pyspark function
# imported in an earlier cell, not Python's built-in sum.
# The aggregate column is renamed to total_quantity and rows are sorted by Date.
rolledUpDF = (
    dfWithDate.rollup("Date", "Country")
    .agg(sum("Quantity"))
    .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
    .orderBy("Date")
)

rolledUpDF.show(20)

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      NULL|          NULL|       5176450|
|2010-12-01|     Australia|           107|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|        France|           449|
|2010-12-01|        Norway|          1852|
|2010-12-01|          NULL|         26814|
|2010-12-01|       Germany|           117|
|2010-12-01|          EIRE|           243|
|2010-12-01|   Netherlands|            97|
|2010-12-02|          NULL|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-03|          NULL|         14830|
|2010-12-03|      Portugal|            65|
|2010-12-03|         Spain|           400|
|2010-12-03|United Kingdom|         10439|
|2010-12-03|   Switzerland|           110|
|2010-12-03|       Germany|           170|
|2010-12-03|          EIRE|          2575|
+----------

In [33]:
# Rows with a NULL Country: in the output these are the grand total
# (Date also NULL) and the per-date totals.
rolledUpDF.where("Country IS NULL").show()

+----------+-------+--------------+
|      Date|Country|total_quantity|
+----------+-------+--------------+
|      NULL|   NULL|       5176450|
|2010-12-01|   NULL|         26814|
|2010-12-02|   NULL|         21023|
|2010-12-03|   NULL|         14830|
|2010-12-05|   NULL|         16395|
|2010-12-06|   NULL|         21419|
|2010-12-07|   NULL|         24995|
|2010-12-08|   NULL|         22741|
|2010-12-09|   NULL|         18431|
|2010-12-10|   NULL|         20297|
|2010-12-12|   NULL|         10565|
|2010-12-13|   NULL|         17623|
|2010-12-14|   NULL|         20098|
|2010-12-15|   NULL|         18229|
|2010-12-16|   NULL|         29632|
|2010-12-17|   NULL|         16069|
|2010-12-19|   NULL|          3795|
|2010-12-20|   NULL|         14965|
|2010-12-21|   NULL|         15467|
|2010-12-22|   NULL|          3192|
+----------+-------+--------------+
only showing top 20 rows



In [34]:
# Rows with a NULL Date: the output contains only the grand-total row.
rolledUpDF.where("Date IS NULL").show()

+----+-------+--------------+
|Date|Country|total_quantity|
+----+-------+--------------+
|NULL|   NULL|       5176450|
+----+-------+--------------+



### Cube
A cube takes the rollup a level deeper. Rather than treating things
hierarchically, a cube does the same thing across all dimensions.

In [35]:
# cube() aggregates over every combination of the grouping columns, giving:
#   - the grand total across all dates and countries
#   - the total for each date across all countries
#   - the total for each country on each date
#   - the total for each country across all dates
(
    dfWithDate.cube("Date", "Country")
    .agg(sum(col("Quantity")))
    .select("Date", "Country", "sum(Quantity)")
    .orderBy("Date")
    .show(20)
)

+----+---------------+-------------+
|Date|        Country|sum(Quantity)|
+----+---------------+-------------+
|NULL|         Cyprus|         6317|
|NULL|        Belgium|        23152|
|NULL|        Austria|         4827|
|NULL|        Germany|       117448|
|NULL|      Lithuania|          652|
|NULL|         Poland|         3653|
|NULL|        Iceland|         2458|
|NULL|      Australia|        83653|
|NULL|        Finland|        10666|
|NULL|         Norway|        19247|
|NULL|          Italy|         7999|
|NULL|           EIRE|       142637|
|NULL| United Kingdom|      4263829|
|NULL|          Spain|        26824|
|NULL|        Lebanon|          386|
|NULL|        Bahrain|          260|
|NULL|           NULL|      5176450|
|NULL|         Israel|         4353|
|NULL|Channel Islands|         9479|
|NULL|    Switzerland|        30325|
+----+---------------+-------------+
only showing top 20 rows



### Pivot

In [87]:
# One row per date and one column per Country value, each cell holding the
# summed quantity for that date and country.
pivoted = (
    dfWithDate.groupBy("date")
    .pivot("Country")
    .agg({"quantity": "sum"})
)

In [88]:
# Column names after the pivot: "date" followed by one column per country.
pivoted.columns

['date',
 'Australia',
 'Austria',
 'Bahrain',
 'Belgium',
 'Brazil',
 'Canada',
 'Channel Islands',
 'Cyprus',
 'Czech Republic',
 'Denmark',
 'EIRE',
 'European Community',
 'Finland',
 'France',
 'Germany',
 'Greece',
 'Hong Kong',
 'Iceland',
 'Israel',
 'Italy',
 'Japan',
 'Lebanon',
 'Lithuania',
 'Malta',
 'Netherlands',
 'Norway',
 'Poland',
 'Portugal',
 'RSA',
 'Saudi Arabia',
 'Singapore',
 'Spain',
 'Sweden',
 'Switzerland',
 'USA',
 'United Arab Emirates',
 'United Kingdom',
 'Unspecified']

In [89]:
# USA totals for dates after 2011-12-05. Only the "USA" column is selected, so
# the output rows carry no date; NULL means no USA rows on that date.
# NOTE(review): consider selecting "date" as well so each value can be attributed.
pivoted.where("date>'2011-12-05'").select("USA").show()

+----+
| USA|
+----+
|NULL|
|NULL|
|-196|
|NULL|
+----+

