In [1]:
import os

import findspark

# findspark.init() has to run BEFORE anything from pyspark is imported: its job
# is to put the Spark installation's Python packages on sys.path. Importing
# pyspark first either fails (pyspark not on the path yet) or silently binds to
# whatever pyspark happens to be importable, ignoring SPARK_HOME.
# os.environ.get returns None when SPARK_HOME is unset, in which case findspark
# tries to locate a Spark installation on its own.
findspark.init(os.environ.get('SPARK_HOME'))

from pyspark.sql import SparkSession  # noqa: E402  (must follow findspark.init)
from pyspark.sql import functions as F  # noqa: E402

# Directory holding the CSV files read below. This is None when DATA_DIR is
# unset, and the os.path.join calls in later cells will then raise TypeError.
data_dir = os.environ.get('DATA_DIR')

In [2]:
# Create the session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('aggs')
    .getOrCreate()
)

In [3]:
# Load the sales data: the first line supplies the column names and the
# column types are inferred from the values.
df = spark.read.csv(
    os.path.join(data_dir, 'sales_info.csv'),
    header=True,
    inferSchema=True,
)

In [4]:
# Preview the loaded sales rows as a text table.
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [5]:
# Display each column's name, inferred type and nullability.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



## Groupby
`groupBy` groups the DataFrame by the given column(s) so that aggregations can be run on each group. It returns a `GroupedData` object.

In [6]:
# groupBy on its own produces no table; as the last expression of the cell,
# the returned GroupedData object is what gets displayed.
df.groupBy('Company')

<pyspark.sql.group.GroupedData at 0x7f119318ff60>

The call returns a `GroupedData` object rather than a DataFrame.

Here is an example: group by `Company`, then compute the mean for each group.

In [7]:
# mean() with no column arguments averages the numeric columns per group;
# Sales is the only numeric column in this DataFrame.
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



As you can see, Spark creates a new column and names it after the aggregate function and the original column.

In [8]:
# Smallest Sales value within each company.
df.groupBy('Company').min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



There are many methods available on a `GroupedData` object. See the list [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData):

In [9]:
# Keep the GroupedData object around; the next cell reuses it.
grouped = df.groupBy('Company')

# Total Sales per company, written as a column expression. The result column
# is named sum(Sales).
grouped.agg(F.sum('Sales')).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



Applying more than one aggregation and renaming (aliasing) at the same time:

In [10]:
# Two aggregations over the same groups in a single pass, each with its own
# alias. NOTE(review): despite the word 'sum' in the aliases, these columns
# hold the per-company average and sample standard deviation of Sales.
grouped.agg(
    F.mean('Sales').alias('avg sum'),
    F.stddev_samp('Sales').alias('std sum'),
).show()

+-------+-----------------+------------------+
|Company|          avg sum|           std sum|
+-------+-----------------+------------------+
|   APPL|            370.0|268.82460204874604|
|   GOOG|            220.0|111.35528725660043|
|     FB|            610.0| 367.6955262170047|
|   MSFT|322.3333333333333| 247.7182539364698|
+-------+-----------------+------------------+



## Aggregation

Sometimes you want to apply aggregate functions to a column of the whole DataFrame rather than to a `GroupedData` object:

In [11]:
# Calling agg on the DataFrame itself (no groupBy) aggregates over all rows.
# The dict maps a column name to the name of the aggregate function.
df.agg({'Sales': 'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [12]:
# Number of distinct Sales values across the whole DataFrame.
df.select(F.countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [13]:
# Average of Sales over all rows; the result column gets the default name
# avg(Sales).
df.select(F.avg('Sales')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [14]:
# Same average as above, with alias() giving the result column a readable name.
df.select(F.avg('Sales').alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [15]:
# Standard deviation of Sales over all rows, with the result column named 'std'.
df.select(F.stddev('Sales').alias('std')).show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



### format_number

In [16]:
# One-row DataFrame holding the standard deviation of Sales in column 'std'.
std = df.select(F.stddev('Sales').alias('std'))
# format_number renders the value with 2 decimal places; the alias keeps the
# column name 'std' instead of the generated format_number(...) name.
std.select(F.format_number('std', 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



## OrderBy
Simple `orderBy` (ascending by default):

In [17]:
# Sort rows by Sales; passing just a column name sorts in ascending order.
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



Descending order:

In [18]:
# Column.desc() turns the column into a descending sort expression.
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



hmmm!

# Missing data

In [19]:
# Load a small dataset that contains missing values.
# NOTE: this rebinds `df`, so the sales DataFrame used in the cells above is
# no longer reachable under that name from here on.
df = spark.read.csv(
    os.path.join(data_dir, 'ContainsNull.csv'),
    header=True, inferSchema=True)

In [20]:
# Show the raw rows, nulls included.
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



`drop` takes `how`, `subset`, and `thresh` as parameters:

In [21]:
# With no arguments, drop() removes every row that has at least one null.
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



With no arguments, `drop` removes every row that contains a null in any column.

In [22]:
# thresh=2 keeps only rows with at least two non-null values; emp2 has just
# one (its Id), so it is the only row dropped.
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



You can also call `dropna` directly on the DataFrame, as in pandas:

In [23]:
# DataFrame.dropna is the same operation as df.na.drop.
df.dropna(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [24]:
# how='all' drops a row only when every column is null; no row here
# qualifies, so nothing is removed.
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [25]:
# subset restricts the null check to the listed columns: only rows with a
# null Sales are dropped, and nulls in Name are ignored.
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [26]:
# A numeric fill value is applied to numeric columns only; the string column
# Name keeps its nulls.
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [27]:
# A string fill value is applied to string columns only; the numeric column
# Sales keeps its nulls.
df.na.fill('EMPTY').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2|EMPTY| null|
|emp3|EMPTY|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [28]:
# Compute the mean of Sales and bring the result back to the driver.
# collect() returns a list of Row objects (a single Row here).
mean_val = df.select(F.mean('Sales')).collect()

`collect()` returns a list of `Row` objects:

In [29]:
# Inspect the collected result.
mean_val

[Row(avg(Sales)=400.5)]

In [30]:
# mean_val[0] is the single Row and [0] takes its only field, the mean.
# subset limits the fill to the Sales column.
df.na.fill(mean_val[0][0], subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



It's possible to do it in one line:

In [31]:
# Same fill as above without the intermediate variable: first() returns the
# single result Row and [0] extracts the mean from it.
df.na.fill(
    df.select(F.mean('Sales')).first()[0],
    subset=['Sales'],
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



You can also use the `fillna` method:

In [32]:
# DataFrame.fillna is the same operation as df.na.fill. The mean is computed
# with a DataFrame-level agg and pulled out of the single collected Row.
df.fillna(
    value=df.agg(F.mean('Sales')).collect()[0][0],
    subset=['Sales'],
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

