In [36]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkAdvancedOps")\
        .master("spark://spark-master:7077").getOrCreate() 
df = spark.read.format("csv")\
          .option("header", "true")\
          .option("inferSchema", "true")\
          .load("/home/jovyan/data/online-retail-dataset.csv")\
          .coalesce(5)
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

We can use the function **count** to count the number of items (rows) in a group defined by one or more columns. The function **countDistinct** is for counting the number of distinct items in a group.

In [10]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [11]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



We can also use an approximation of the distinct count. This is efficient compared with counting all. 

In [12]:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



We can also get first, last, minimum, and maximum values from a group (one or more columns) in a DataFrame.

In [13]:
from pyspark.sql.functions import first, last, min, max

df.select(first("StockCode"), last("StockCode")).show()
df.select(min("Quantity"), max("Quantity")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



We can also compute the sum for each numeric columns for each group.

In [14]:
from pyspark.sql.functions import sum, sumDistinct

df.select(sum("Quantity")).show() 
df.select(sumDistinct("Quantity")).show() 

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



We can also compute average values for each numeric columns for each group. mean() is an alias for avg(). Note that each select clause returns a new data frame. 

In [15]:
from pyspark.sql.functions import sum, count, avg, expr

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



Spark has both the formula for the sample standard deviation and the formula for the population standard deviation.

In [17]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609354| 47559.39140929905|  218.08095663447864|   218.08115785023486|
+------------------+------------------+--------------------+---------------------+



Spark has the formula for computing the skewness and kurtosis of the values in a group. Skewness measures the asymmetry of the values in your data around the mean. kurtosis is a measure of the tail of data.

In [18]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610527843|119768.05495536518|
+--------------------+------------------+



We can calculate the correlation of two columns of a DataFrame as a double value. Correlation measures the Pearson correlation coefficient, which is scaled between –1 and +1.
We can also compute the sample and population covariance of two columns.

In [19]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
  covar_pop("InvoiceNo", "Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085637639E-4|             1052.7280543913773|            1052.7260778752732|
+-------------------------+-------------------------------+------------------------------+



We do aggregations on complex types (collection).
collect_list- returns a list of objects with duplicates.
collect_set - returns a set of objects with duplicate elements eliminated.

In [20]:
from pyspark.sql.functions import collect_set, collect_list

df.agg(collect_set("Country"), collect_list("Country")).show(2)

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



We can use the groupby clause (like SQL groupby) to group the data on one or more columns, and perform some calculations on the other columns that end up in that group. 

In [21]:
df.groupBy("InvoiceNo", "CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   545165|     16339|   20|
|   545289|     14732|   30|
+---------+----------+-----+
only showing top 20 rows



We normally do aggregations over groups. We can pass-in arbitrary expressions with agg function. 

In [22]:
from pyspark.sql.functions import count

df.groupBy("InvoiceNo").agg(
  count("Quantity").alias("quan"), # give the column name quan for count values
  expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



In [23]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
.show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

**Window Functions**

We can use window functions to carry out some unique aggregations by either computing some aggregation on a specific “window” of data. A window function calculates a return value for every input row of a table based on a group of rows, called a frame. Each row can fall into one or more frames. A common use case is to take a look at a rolling average of some value for which each row represents one day. Each row would end up in seven different frames.

The first step to define the window. We can use Window class, which has three methods.
partitionBy(*cols) - divide into groups
orderBy(*cols) - ordering within the group
rowsBetween(start, end) - the frame specification (the rowsBetween statement) states which rows will be included in the frame. In the following examole, all previous rows up to the current row.

In [24]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "M/d/y H:mm")) # our dataframe with date column

windowSpec = Window\
  .partitionBy("CustomerId", "date")\
  .orderBy(desc("Quantity"))\
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

Now, we can apply window functions over the defined window. Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions. Let’s consider the following example.
An aggregate function - find the maximum purchase quantity over all time
A ranking function - determine which date had the maximum purchase quantity for every customer.

In [25]:
from pyspark.sql.functions import max, dense_rank, rank

maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)


The above functions return columns (or expressions), which can be used in a DataFrame select statement

In [26]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

We can use grouping sets for aggregations across multiple groups. With Dataframe, we have to use the rollup and cube operators for implementing grouping sets.

Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you will get incorrect results. This applies to cubes, rollups, and grouping sets.

With the rollup function, we can create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them.

The following rollup looks across time (with our new Date column) and space (with the Country column) and creates a new DataFrame that includes the grand total over all dates, the grand total for each date in the DataFrame, and the subtotal for each country on each date in the DataFrame.

In [28]:
dfNoNull = dfWithDate.drop()
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
.orderBy("Date")
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|       Germany|           117|
|2010-12-01|        France|           449|
|2010-12-01|     Australia|           107|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|        Norway|          1852|
|2010-12-01|          null|         26814|
|2010-12-01|          EIRE|           243|
|2010-12-01|   Netherlands|            97|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|         Spain|           400|
|2010-12-03|        France|           239|
|2010-12-03|       Germany|           170|
|2010-12-03|   Switzerland|           110|
|2010-12-03|United Kingdom|         10439|
|2010-12-03|          EIRE|          2575|
|2010-12-03|      Portugal|            65|
+----------

A cube does the rollups across all dimensions. It takes a list of grouping columns and applies aggregate expressions to all possible combinations of them. 

In [29]:
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|             Denmark|         8188|
|null|             Lebanon|          386|
|null|              Canada|         2763|
|null|              Sweden|        35637|
|null|               Japan|        25218|
|null|      United Kingdom|      4263829|
|null|              Greece|         1556|
|null|             Germany|       117448|
|null|      Czech Republic|          592|
|null|              France|       110480|
|null|               Italy|         7999|
|null|              Poland|         3653|
|null|                 USA|         1034|
|null|United Arab Emirates|          982|
|null|             Iceland|         2458|
|null|           Singapore|         5234|
|null|           Lithuania|          652|
|null|               Malta|          944|
|null|  European Community|          497|
|null|             Finland|        10666|
+----+--------------------+-------

Pivots a column of the current DataFrame and perform the specified aggregation. We can convert the distinct values in a row into columns with Pivot.

In the following example, DataFrame will now have a column for every combination of unique country name, numeric variable, and a column specifying the date. For example, for USA, we have the following columns: USA_sum(Quantity), USA_sum(UnitPrice), USA_sum(CustomerID). This represents one for each numeric column in our dataset (because we just performed an aggregation (sum) over all of them).

In [33]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
pivoted.printSchema()
pivoted.show(5)

root
 |-- date: date (nullable = true)
 |-- Australia_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Australia_sum(UnitPrice): double (nullable = true)
 |-- Australia_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Austria_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Austria_sum(UnitPrice): double (nullable = true)
 |-- Austria_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Bahrain_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Bahrain_sum(UnitPrice): double (nullable = true)
 |-- Bahrain_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Belgium_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Belgium_sum(UnitPrice): double (nullable = true)
 |-- Belgium_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Brazil_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Brazil_sum(UnitPrice): double (nullable = true)
 |-- Brazil_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Can

**Join Operations**

We can perform join operations over DataFrames. In SQL, Join table operations combine columns from one or more tables in a relational database. Here, Dataframes instead of tables.
The following is the dataset for learning join operations. 

https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html

In [37]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
    .toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
    .toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
    .toDF("id", "status")

In [38]:
joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression,"inner").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [39]:
person.join(graduateProgram, joinExpression,"inner").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [41]:
person.join(graduateProgram, joinExpression, "outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [42]:
graduateProgram.join(person, joinExpression, "left_outer").show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



In [43]:
person.join(graduateProgram, joinExpression, "right_outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [44]:
graduateProgram.join(person, joinExpression, "left_anti").show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



In [45]:
graduateProgram.join(person, joinExpression, "left_semi").show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [46]:
graduateProgram.join(person, joinExpression, "cross").show()

+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
| id| degree|          department|     school| id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+



We can also perform joins based on complex types by using join expressions. Any expression is a valid join expression as long as it returns a Boolean.
In the following example, the join expression check If “id” is in “spark_status” column data (which is an array).

In [47]:
from pyspark.sql.functions import expr

person.withColumnRenamed("id", "personId")\
    .join(sparkStatus, expr("array_contains(spark_status, id)")).show()


+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



**User Defined Functions**

We also define and use our own custom functions as part of transformations (within the expressions). 


In [15]:
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
# define the function
def power3(double_value):
    return double_value ** 3
# register the function
power3udf = udf(power3)
# use the function with SQL quries
udfExampleDF = spark.range(5).toDF("num")
udfExampleDF.select(power3udf(col("num"))).show(2)

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
+-----------+
only showing top 2 rows



In [34]:
# Stop the spark context
spark.stop()