In [0]:
df = spark.read.csv('dbfs:/FileStore/whole_retail_data/online_retail_dataset.csv', header=True, inferSchema=True).coalesce(5)

In [0]:
df.cache()

Out[2]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [0]:
df.createOrReplaceTempView("dfTable")

In [0]:
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
df.count()

Out[5]: 541909

In [0]:
from pyspark.sql.functions import count
df.select(count("CustomerID")).show()

+-----------------+
|count(CustomerID)|
+-----------------+
|           406829|
+-----------------+



In [0]:
def count_non_nulls(col):
    return (col,df.select(count(col)).show())
    
counts = [count_non_nulls(col) for col in df.columns]

+----------------+
|count(InvoiceNo)|
+----------------+
|          541909|
+----------------+

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+

+------------------+
|count(Description)|
+------------------+
|            540455|
+------------------+

+---------------+
|count(Quantity)|
+---------------+
|         541909|
+---------------+

+------------------+
|count(InvoiceDate)|
+------------------+
|            541909|
+------------------+

+----------------+
|count(UnitPrice)|
+----------------+
|          541909|
+----------------+

+-----------------+
|count(CustomerID)|
+-----------------+
|           406829|
+-----------------+

+--------------+
|count(Country)|
+--------------+
|        541909|
+--------------+



In [0]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [0]:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [0]:
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [0]:
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [0]:
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [0]:
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [0]:
from pyspark.sql.functions import sum, count, avg, expr
df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))\
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



The variance is the average
of the squared differences from the mean, and the standard deviation is the square root of the
variance.

In [0]:
from pyspark.sql.functions import stddev_pop, var_pop, stddev_samp, var_samp
df.select(var_pop('Quantity'), var_samp('Quantity'), stddev_pop('Quantity'), stddev_samp('Quantity')).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660923| 47559.39140929892|  218.08095663447835|   218.08115785023455|
+-----------------+------------------+--------------------+---------------------+



In [0]:
%sql
SELECT var_pop(Quantity), var_samp(Quantity),
stddev_pop(Quantity), stddev_samp(Quantity)
FROM dfTable

var_pop(Quantity),var_samp(Quantity),stddev_pop(Quantity),stddev_samp(Quantity)
47559.30364660923,47559.39140929892,218.08095663447835,218.08115785023452


Skewness and kurtosis are both measurements of extreme points in your data. Skewness
measures the asymmetry of the values in your data around the mean, whereas kurtosis is a
measure of the tail of data

These are both relevant specifically when modeling your data as a
probability distribution of a random variable.

In [0]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610528376|119768.05495530753|
+--------------------+------------------+



Correlation measures the Pearson correlation
coefficient, which is scaled between –1 and +1. The covariance is scaled according to the inputs
in the data. 

Covariance and correlation are two terms that are opposed and are both used in statistics and regression analysis. Covariance shows you how the two variables differ, whereas correlation shows you how the two variables are related

In [0]:
from pyspark.sql.functions import corr, covar_pop, covar_samp, round
df.select(round(corr("UnitPrice", "Quantity"),4), covar_samp("UnitPrice", "Quantity"),
covar_pop("UnitPrice", "Quantity")).show()

+-----------------------------------+-------------------------------+------------------------------+
|round(corr(UnitPrice, Quantity), 4)|covar_samp(UnitPrice, Quantity)|covar_pop(UnitPrice, Quantity)|
+-----------------------------------+-------------------------------+------------------------------+
|                            -0.0012|             -26.05876125793705|           -26.058713170968097|
+-----------------------------------+-------------------------------+------------------------------+



In [0]:
from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("Country")).show(1,False)
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_set(Country)                                                                                                                                                                                                                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
df.groupBy("InvoiceNo", "CustomerId").count().show(5)

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
+---------+----------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import count, mean
df.groupBy("InvoiceNo", "CustomerId").agg(count('Quantity').alias('grouped_quan'), expr("mean(Quantity)")).show(5)

+---------+----------+------------+------------------+
|InvoiceNo|CustomerId|grouped_quan|    mean(Quantity)|
+---------+----------+------------+------------------+
|   536846|     14573|          76| 1.763157894736842|
|   537026|     12395|          12|              44.0|
|   537883|     14437|           5|              12.0|
|   538068|     17978|          12|41.583333333333336|
|   538279|     14952|           7| 67.42857142857143|
+---------+----------+------------+------------------+
only showing top 5 rows



A group-by takes data, and every row can go only into one grouping. A window function
calculates a return value for every input row of a table based on a group of rows, called a frame.
Each row can fall into one or more frames.

Spark supports three kinds of window functions: ranking functions, analytic functions,
and aggregate functions.

In [0]:
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date", to_date(col('InvoiceDate'), 'MM/d/yyyy H:mm'))
dfWithDate.createOrReplaceTempView('dfWithDate')
dfWithDate.show(5,False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |date      |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+----------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|2010-12-01|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|2010-12-01|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|2010-12-01|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|2010-12-01|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850 

**partitionby** is unrelated to the partitioning scheme concept that we have covered thus far. It’s just a
similar concept that describes how we will be breaking up our group. 
The **ordering** determines
the ordering within a given partition, and, finally, the frame specification (the **rowsBetween**
statement) states which rows will be included in the frame based on its reference to the current
input row. 

In the following example, we look at all previous rows up to the current row:

In [0]:
from pyspark.sql.functions import desc
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("CustomerId", "date")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding,Window.currentRow)

Now we want to use an aggregation function to learn more about each specific customer. An
example might be establishing the maximum purchase quantity over all time.

In [0]:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [0]:
maxPurchaseQuantity

Out[25]: Column<'max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

*We can now use this in a DataFrame
select statement. Before doing so, though, we will create the purchase quantity rank. To do that
we use the dense_rank function to determine which date had the maximum purchase quantity
for every customer. We use dense_rank as opposed to rank to avoid gaps in the ranking
sequence when there are tied values (or in our case, duplicate rows):*

In [0]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [0]:
from pyspark.sql.functions import col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show(100)

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

Grouping sets are a low-level tool for combining sets of aggregations together. They give you the
ability to create arbitrary aggregation in their group-by statements.

**Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you
will get incorrect results. This applies to cubes, rollups, and grouping sets.**

In [0]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [0]:
%sql
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),(customerId))
ORDER BY CustomerId DESC, stockCode DESC

customerId,stockCode,sum(Quantity)
18287,85173,48
18287,85040A,48
18287,85039B,120
18287,85039A,96
18287,84920,4
18287,84584,6
18287,84507C,6
18287,72351B,24
18287,72351A,24
18287,72349B,60


a rollup that looks across time (with our new Date column) and space (with the
Country column) and creates a new DataFrame that includes the grand total over all dates, the
grand total for each date in the DataFrame, and the subtotal for each country on each date in the
DataFrame:

In [0]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
.orderBy("Date")
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|     Australia|           107|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|        France|           449|
|2010-12-01|          null|         26814|
|2010-12-01|        Norway|          1852|
|2010-12-01|       Germany|           117|
|2010-12-01|          EIRE|           243|
|2010-12-01|   Netherlands|            97|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-03|      Portugal|            65|
|2010-12-03|          null|         14830|
|2010-12-03|   Switzerland|           110|
|2010-12-03|       Germany|           170|
|2010-12-03|         Spain|           400|
|2010-12-03|United Kingdom|         10439|
|2010-12-03|          EIRE|          2575|
+----------

A cube takes the rollup to a level deeper. Rather than treating elements hierarchically, a cube
does the same thing across all dimensions. This means that it won’t just go by date over the
entire time period, but also the country. To pose this as a question again, can you make a table
that includes the following?
- The total across all dates and countries
- The total for each date across all countries
- The total for each country on each date
- The total for each country across all dates

In [0]:
from pyspark.sql.functions import sum
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show(10)

+----+--------------+-------------+
|Date|       Country|sum(Quantity)|
+----+--------------+-------------+
|null|        France|       110480|
|null|        Cyprus|         6317|
|null|         Italy|         7999|
|null|     Lithuania|          652|
|null|         Japan|        25218|
|null|        Poland|         3653|
|null|United Kingdom|      4263829|
|null|     Singapore|         5234|
|null|Czech Republic|          592|
|null|       Germany|       117448|
+----+--------------+-------------+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import desc, grouping_id
dfNoNull.cube("Date", "Country").agg(grouping_id(),sum(col("Quantity"))).orderBy(desc("grouping_id()")).show()
#.select("Date", "Country", "sum(Quantity)")

+----+--------------------+-------------+-------------+
|Date|             Country|grouping_id()|sum(Quantity)|
+----+--------------------+-------------+-------------+
|null|                null|            3|      5176450|
|null|                EIRE|            2|       142637|
|null|United Arab Emirates|            2|          982|
|null|             Iceland|            2|         2458|
|null|           Singapore|            2|         5234|
|null|              Sweden|            2|        35637|
|null|             Lebanon|            2|          386|
|null|           Lithuania|            2|          652|
|null|              Canada|            2|         2763|
|null|              Greece|            2|         1556|
|null|             Denmark|            2|         8188|
|null|              France|            2|       110480|
|null|             Finland|            2|        10666|
|null|               Japan|            2|        25218|
|null|              Cyprus|            2|       

In [0]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()

+----------+-----------------+
|      date|USA_sum(Quantity)|
+----------+-----------------+
|2011-12-06|             null|
|2011-12-09|             null|
|2011-12-08|             -196|
|2011-12-07|             null|
+----------+-----------------+



To create a UDAF, you must inherit from the UserDefinedAggregateFunction base class and
implement the following methods:
- inputSchema represents input arguments as a StructType
- bufferSchema represents intermediate UDAF results as a StructType
- dataType represents the return DataType
- deterministic is a Boolean value that specifies whether this UDAF will return the same result for a given input
- initialize allows you to initialize values of an aggregation buffer
- update describes how you should update the internal buffer based on a given row
- merge describes how two aggregation buffers should be merged
- evaluate will generate the final result of the aggregation

UDAFs are currently available only in Scala or Java

In [0]:
#UDAF in Python

a = sc.parallelize([[1, 'a'],
                    [1, 'b'],
                    [1, 'b'],
                    [2, 'c']]).toDF(['id', 'value'])
a.show()

+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  1|    b|
|  1|    b|
|  2|    c|
+---+-----+



In [0]:
from pyspark.sql.functions import collect_list
x = a.groupBy('id').agg(collect_list('value').alias('value_list'))
x.show()

+---+----------+
| id|value_list|
+---+----------+
|  1| [a, b, b]|
|  2|       [c]|
+---+----------+



In [0]:
# UDF which will count all the occurences of the letter ‘a’
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def find_a(x):
  """Count 'a's in list."""
  output_count = 0
  for i in x:
    if i == 'a':
      output_count += 1
  return output_count

find_a_udf = udf(find_a, IntegerType())
a.groupBy('id').agg(find_a_udf(collect_list('value')).alias('a_count')).show()

+---+-------+
| id|a_count|
+---+-------+
|  1|      1|
|  2|      0|
+---+-------+



Another way is to use pandas udf

In [0]:
df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

In [0]:
'''from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])'''

[0;36m  File [0;32m"<command-3083458468313970>"[0;36m, line [0;32m1[0m
[0;31m    from pyspark.sql.types import *,DataType[0m
[0m                                   ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
#schema, 
'''@pandas_udf("key string, avg_min double", functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result
    
df.groupby("key").apply(g).show()    
    '''

Out[50]: '@pandas_udf("key string, avg_min double", functionType=PandasUDFType.GROUPED_MAP)\ndef g(df):\n    result = pd.DataFrame(df.groupby(df.key).apply(\n        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()\n    ))\n    result.reset_index(inplace=True, drop=False)\n    return result'