In [0]:
#Aggregating is the act of collecting something together and is a cornerstone of big data analytics.In an aggregation, you will specify a key or grouping and an aggregation function that specifies how you should transform one or more columns. This function must produce one result for each group, given multiple input values. 

#The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.

#A “group by” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.

#A “window” gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.

#A “grouping set,” which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.

# “rollup” makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.

#A “cube” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.

In [0]:
df = spark.read.format("csv").option("header","true").option("inferschema","true").load("dbfs:/FileStore/shared_uploads/annemchandrareddy123@gmail.com/*.csv").coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
df.show(10)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|     null|WHITE HANGING HEA...|       6|01-12-2010 8.26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 8.26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01-12-2010 8.26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|01-12-2010 8.

In [0]:
#As mentioned, basic aggregations apply to an entire DataFrame. The simplest example is the count method:
print(df.count())

#If you’ve been reading this book chapter by chapter, you know that count is actually an action as opposed to a transformation, and so it returns immediately. You can use count to get an idea of the total size of your dataset but another common pattern is to use it to cache an entire DataFrame in memory, just like we did in this example.

#Now, this method is a bit of an outlier because it exists as a method (in this case) as opposed to a function and is eagerly evaluated instead of a lazy transformation. In the next section, we will see count used as a lazy function, as well.

#The first function worth going over is count, except in this example it will perform as a transformation instead of an action.
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

1108363
+----------------+
|count(StockCode)|
+----------------+
|         1108362|
+----------------+



In [0]:
#There are a number of gotchas when it comes to null values and counting. For instance, when performing a count(*), Spark will count null values (including rows containing all nulls). However, when counting an individual column, Spark will not count the null values.

In [0]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4385|
+-------------------------+



In [0]:
#approx_count_distinct
#Often, we find ourselves working with large datasets and the exact distinct count is irrelevant. There are times when an approximation to a certain degree of accuracy will work just fine, and for that, you can use the approx_count_distinct function:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()#0.01

#You will notice that approx_count_distinct took another parameter with which you can specify the maximum estimation error allowed. In this case, we specified a rather large error and thus receive an answer that is quite far off but does complete more quickly than countDistinct. You will see much greater performance gains with larger datasets.

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3633|
+--------------------------------+



In [0]:
from pyspark.sql.functions import first, last, min, max, sum, sumDistinct, avg, mean, expr
df.select(first("StockCode"),last("StockCode")).show()

df.select(min("Quantity"), max("Quantity")).show()

df.select(sum("Quantity")).show()

df.select(sumDistinct("Quantity")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|            null|  United States|
+----------------+---------------+

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|    and Saba"|     Zimbabwe|
+-------------+-------------+

+-------------+
|sum(Quantity)|
+-------------+
|  1.0519548E7|
+-------------+





+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|               29310.0|
+----------------------+



In [0]:
df.select(
    count("Quantity").alias("Total_transactions"),
    sum("Quantity").alias("Total_purchases"),
    avg("Quantity").alias("avg_of_purchases"),
    expr("mean(Quantity)").alias("mean_of_purchases")
).selectExpr("Total_transactions","Total_purchases","avg_of_purchases","mean_of_purchases").show()


+------------------+---------------+----------------+-----------------+
|Total_transactions|Total_purchases|avg_of_purchases|mean_of_purchases|
+------------------+---------------+----------------+-----------------+
|           1107604|    1.0519548E7|9.50841377116097| 9.50841377116097|
+------------------+---------------+----------------+-----------------+



In [0]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|46702.64952029336| 46702.69173394334|  216.10795802166416|   216.10805568960944|
+-----------------+------------------+--------------------+---------------------+



In [0]:

from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.33458977206280915|121676.86256142345|
+--------------------+------------------+



In [0]:
from pyspark.sql.functions import covar_pop,covar_samp,corr

df.select(corr("InvoiceNo","Quantity"),covar_samp("InvoiceNo","Quantity"),covar_pop("InvoiceNo","Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     9.317651161942416E-4|             2013.5817936353692|            2013.5799419467146|
+-------------------------+-------------------------------+------------------------------+



In [0]:
#In Spark, you can perform aggregations not just of numerical values using formulas, you can also perform them on complex types. For example, we can collect a list of values present in a given column or only the unique values by collecting to a set.


from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("Country"), collect_list("Country")).show()

#SELECT collect_set(Country), collect_set(Country) FROM dfTable

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [0]:
#As we saw earlier, counting is a bit of a special case because it exists as a method. For this, usually we prefer to use the count function. Rather than passing that function as an expression into a select statement, we specify it as within agg. This makes it possible for you to pass-in arbitrary expressions that just need to have some aggregation specified. You can even do things like alias a column after transforming it for later use in your data flow:

from pyspark.sql.functions import count

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|  18|             18|
|   536938|  42|             42|
|   537252|   3|              3|
|   537691|  60|             60|
|   538041|   3|              3|
|   538184|  52|             52|
|   538517| 106|            106|
|   538879|  38|             38|
|   539275|  12|             12|
|   539630|  24|             24|
|   540499|  48|             48|
|   540540|  44|             44|
|  C540850|   2|              2|
|   540976|  96|             96|
|   541432|   8|              8|
|   541518| 202|            202|
|   541783|  70|             70|
|   542026|  18|             18|
|   542375|  12|             12|
|  C542604|  16|             16|
+---------+----+---------------+
only showing top 20 rows



In [0]:
#Sometimes, it can be easier to specify your transformations as a series of Maps for which the key is the column, and the value is the aggregation function (as a string) that you would like to perform. You can reuse multiple column names if you specify them inline, as well:

df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
  .show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|   1.118033988749895|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

In [0]:

#You can also use window functions to carry out some unique aggregations by either computing some aggregation on a specific “window” of data, which you define by using a reference to the current data. This window specification determines which rows will be passed in to this function. Now this is a bit abstract and probably similar to a standard group-by, so let’s differentiate them a bit more.

#A group-by takes data, and every row can go only into one grouping. A window function calculates a return value for every input row of a table based on a group of rows, called a frame. Each row can fall into one or more frames. A common use case is to take a look at a rolling average of some value for which each row represents one day. If you were to do this, each row would end up in seven different frames. We cover defining frames a little later, but for your reference, Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.

from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
  .partitionBy("CustomerId", "date")\
  .orderBy(desc("Quantity"))\
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

#e want to use an aggregation function to learn more about each specific customer. An example might be establishing the maximum purchase quantity over all time. To answer this, we use the same aggregation functions that we saw earlier by passing a column name or expression. In addition, we indicate the window specification that defines to which frames of data this function will apply:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

#You will notice that this returns a column (or expressions). We can now use this in a DataFrame select statement. Before doing so, though, we will create the purchase quantity rank. To do that we use the dense_rank function to determine which date had the maximum purchase quantity for every customer. We use dense_rank as opposed to rank to avoid gaps in the ranking sequence when there are tied values (or in our case, duplicate rows):
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

from pyspark.sql.functions import col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()


+----------+----+--------+------------+-----------------+-------------------+
|CustomerId|date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----+--------+------------+-----------------+-------------------+
|     12346|null|   74215|           1|                1|              74215|
|     12346|null|  -74215|           2|                2|              74215|
|   12346.0|null|   74215|           1|                1|              74215|
|   12346.0|null|  -74215|           2|                2|              74215|
|     12347|null|       8|           1|                1|                  8|
|     12347|null|       8|           1|                1|                  8|
|     12347|null|       8|           1|                1|                  8|
|     12347|null|       8|           1|                1|                  8|
|     12347|null|       8|           1|                1|                  8|
|     12347|null|       8|           1|                1|       

In [0]:
#Thus far in this chapter, we’ve seen simple group-by expressions that we can use to aggregate on a set of columns with the values in those columns. However, sometimes we want something a bit more complete—an aggregation across multiple groups. We achieve this by using grouping sets. Grouping sets are a low-level tool for combining sets of aggregations together. They give you the ability to create arbitrary aggregation in their group-by statements.

dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode ORDER BY CustomerId DESC, stockCode DESC").show()

spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode)) ORDER BY CustomerId DESC, stockCode DESC").show()

+----------+-----------+-------------+
|CustomerId|  stockCode|sum(Quantity)|
+----------+-----------+-------------+
|       Yes|99999999999|         null|
|       Yes|          8|         null|
|       Yes|         72|         null|
|       Yes|         57|         null|
|       Yes|         56|         null|
|       Yes|         55|         null|
|       Yes|         54|         null|
|       Yes|         50|         null|
|       Yes|         49|         null|
|       Yes|         48|         null|
|       Yes|         47|         null|
|       Yes|         46|         null|
|       Yes|         45|         null|
|       Yes|         44|         null|
|       Yes|         43|         null|
|       Yes|         42|         null|
|       Yes|         41|         null|
|       Yes|         40|         null|
|       Yes|         39|         null|
|       Yes|         38|         null|
+----------+-----------+-------------+
only showing top 20 rows

+----------+-----------+-------------+