**_pySpark Basics: Moving Average Imputation_**

_by Jeff Levy (jlevy@urban.org)_

_Last Updated: 31 Jul 2017, Spark v2.1_

_**Abstract:** In this guide we will demonstrate pySpark's windowing function by imputing missing values using a moving average._

_**Main operations used:** `window`, `partitionBy`, `orderBy`, `over`, `when`, `otherwise`_

In [None]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

***

Think of windowing as **defining a range around any given row that counts as the window**, then **performing an operation just within that window.**  So if we define our window range as -1 to +1, then it will go over the data and examine every row along with its preceding and following rows.  If we define it as 0 to +3, it will look at every row and the three following rows.  For this to make sense **the rows must be in a meaningful order**, and we have to **delineate any groups within the data.**  That is to say, if we have panel data with 12 monthly observations for all 50 states, and we don't group before we define a window, then a -1 to +1 window might include November and December for one state, then January for the next state.
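To make the grouping point concrete, here is a minimal plain-Python sketch (no Spark involved) of a -1 to +1 moving mean, computed once across the whole table and once within each group.  The tiny panel, the column names and the helper `windowed_mean` are all hypothetical and exist only for illustration.

```python
from itertools import groupby
from operator import itemgetter


def windowed_mean(rows, lo, hi):
    """Mean of 'value' over rows i+lo .. i+hi, clamped to the ends of `rows`."""
    assert lo <= 0 <= hi, "this sketch only handles windows that contain the current row"
    values = [r["value"] for r in rows]
    out = []
    for i in range(len(values)):
        window = values[max(0, i + lo): i + hi + 1]
        out.append(sum(window) / len(window))
    return out


# Hypothetical panel: two states with three monthly observations each.
panel = [
    {"state": "AL", "month": 1, "value": 10.0},
    {"state": "AL", "month": 2, "value": 20.0},
    {"state": "AL", "month": 3, "value": 30.0},
    {"state": "AK", "month": 1, "value": 100.0},
    {"state": "AK", "month": 2, "value": 200.0},
    {"state": "AK", "month": 3, "value": 300.0},
]

# No grouping: the window around AL's last month reaches into AK's first month.
ungrouped = windowed_mean(panel, -1, 1)

# Grouped: the window is computed separately inside each state.
grouped = []
for state, rows in groupby(panel, key=itemgetter("state")):
    ordered = sorted(rows, key=itemgetter("month"))
    grouped.extend(windowed_mean(ordered, -1, 1))

print(ungrouped)
print(grouped)
```

In the ungrouped result, the third value mixes AL's last two months with AK's first month, which is exactly the contamination that grouping prevents.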

There are many ways to handle imputing missing data, and many uses for pySpark's windowing function.  We will demonstrate the intersection of these two concepts using a modified version of the diamonds dataset, where several values in the `'price'` column have been deleted:

In [1]:
df = spark.read.csv('diamonds_nulls.csv', 
                    inferSchema=True, header=True, sep=',', nullValue='')

Show the dataframe

+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
|  0.3|     Good|    J|    SI1| 64.0| 55.0|  339|4.25|4.28|2.73|
| 0.23|    Ideal|    J|    VS1| 62.8| 56.0|  340|3.93| 3.9|2.46|
| 0.22|  Premium|    F|  

We can also show the subset where the value for `'price'` has been replaced with `null`:

# Defining a Window

The first step to windowing is to **define the window parameters.**  We do this by combining three elements: the grouping (`partitionBy`), the ordering (`orderBy`) and the range (`rowsBetween`).  The window we define is then assigned to a variable, which we will use to perform computations.

In [4]:
from pyspark.sql import Window

In [5]:
window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)

In [6]:
window

<pyspark.sql.window.WindowSpec at 0x7faf3ca68690>

**We now have an object of type `WindowSpec` that knows what the window should look like.**

The first portion, `partitionBy('cut', 'clarity')`, is somewhat misleadingly named, as it is not related to *partitions* in Spark, which are segments of the distributed data, roughly analogous to individual computers within the cluster.  It is much more closely related to `groupBy`, as discussed for example in the *basics 2.ipynb* tutorial.  It tells pySpark that the windows should only be computed within each grouping of the columns `'cut'` and `'clarity'`.  Like `groupBy`, `partitionBy` can take one or more criteria.

The second portion, `orderBy('price')`, simply sorts the data by price *within each `partitionBy` group*.

And finally, `rowsBetween(-3, 3)` specifies the size of the window.  In this case each window includes up to seven rows: the current row plus the three before and the three after.
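As a rough plain-Python model of which rows a `rowsBetween(-3, 3)` window covers, the sketch below lists the row positions inside each window for a single ordered group of eight rows.  The helper `rows_in_window` is hypothetical, and the clamping at the group's edges is an assumption of the model.

```python
def rows_in_window(partition_size, start, end):
    """For each row position in one ordered group, list the positions its window covers."""
    windows = []
    for i in range(partition_size):
        first = max(0, i + start)                # cannot reach before the first row
        last = min(partition_size - 1, i + end)  # cannot reach past the last row
        windows.append(list(range(first, last + 1)))
    return windows


# One group of eight rows, window from three rows before to three rows after.
for position, members in enumerate(rows_in_window(8, -3, 3)):
    print(position, members)
```

Only rows with at least three neighbours on each side get the full seven-row window.  Rows near the start or end of a group get a shorter one.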

# Operations Over a Window

The next step is to apply this window to an operation, which we can do using the `over` method.  Here we will use `mean` as our aggregator, but you can do this with any valid aggregator function.

In [7]:
from pyspark.sql.functions import mean

Create a column object that computes the `mean` of the `'price'` column over the window object defined above, using the original dataframe.

Show the results.

What this creates is a *column object* that contains the set of SQL instructions needed to create the data.  It hasn't been discussed in these tutorials, but pySpark is capable of taking SQL formatted instructions for most operations, and in the case of windowing, SQL is what underlies the Python code.

**Remember that pySpark dataframes are immutable, so we cannot just fill in missing values.**  Instead we have to create a new column, then recast the dataframe to include it:

And it returns a dataframe sorted by the specifications from our window function with the new column fully calculated.  Note that the first entry computes a window of 0, +3, the second entry a window of -1, +3, the third -2, +3 and the fourth finally -3, +3.  It would be reasonable to expect it to compute `null` values where the full window range can't be operated over; neither way is necessarily wrong, but make sure you note how pySpark handles it.

# Imputation

Due to immutability, we will recast the dataframe with yet another column that takes the value from the `'price'` column if it **is not `null`**, and fills in the value from the `'moving_avg'` column if it **is `null`**.  We will do this using pySpark's built-in *`when... otherwise`* conditionals.  It is an intuitive, if not very Pythonic, formulation.

In [10]:
from pyspark.sql.functions import when, col

def replace_null(orig, ma):
    # Use the moving average `ma` only where the original column `orig` is null.
    return when(orig.isNull(), ma).otherwise(orig)

Cast the condition into a new column named `'imputed'`, using the `withColumn` function.

In [None]:
df_new.show()

We can see in the above, on the first row the price is `null`, and the imputed column shows the moving average value.  On all the other rows the price has an actual value, and the imputed column uses those values.  Below we can look again at the top 50 rows where price is `null`:

In [13]:
df_new.where(df['price'].isNull()).show(50)

+-----+---------+-----+-------+-----+-----+-----+----+----+----+-----------------+-----------------+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|       moving_avg|          imputed|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+-----------------+-----------------+
| 0.73|  Premium|    F|    VS2| 62.5| 57.0| null|5.75| 5.7|3.58|            356.0|            356.0|
| 0.92|  Premium|    D|     I1| 63.0| 58.0| null|6.18|6.13|3.88|394.3333333333333|394.3333333333333|
| 0.71|Very Good|    J|   VVS2| 61.1| 58.0| null| 5.7|5.75| 3.5|            356.0|            356.0|
| 0.35|    Ideal|    I|    VS1| 60.9| 57.0| null|4.54|4.59|2.78|            340.0|            340.0|
| 0.34|    Ideal|    E|    VS1| 61.2| 55.0| null|4.52|4.56|2.77|            349.0|            349.0|
| 0.58|    Ideal|    F|    VS1| 60.3| 57.0| null|5.47|5.44|3.29|            357.0|            357.0|
|  0.7|     Good|    F|    VS1| 59.4| 62.0| null|5.71|5.76| 3.4|353.6666666666667|353.66666

And we see that in all cases, the `'imputed'` column has the moving average value listed.