## Pandas UDF
This notebook demonstrates how to use the `pandas_udf` function to work with a PySpark DataFrame in the style of a pandas DataFrame.

### Import libraries and data

In [1]:
import findspark
findspark.init('/Users/ying/spark-2.3.2-bin-hadoop2.7/')  #provide spark path 

In [2]:
import warnings
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

We are going to use `pandas_udf` and `PandasUDFType` from `pyspark.sql.functions`. Because `pandas_udf` is built on top of Apache Arrow, make sure `pyarrow` is installed. To install `pyarrow`:
**Conda**:
```bash
conda install -c conda-forge pyarrow
```
Or **Pip**:
```bash
pip install pyarrow
```
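Before going further, it can help to confirm that `pyarrow` is importable from the Python environment you are running in (the Python workers that execute pandas UDFs need it too). A minimal sketch using only the standard library; `has_pyarrow` is a hypothetical helper name, not part of PySpark:

```python
import importlib.util


def has_pyarrow() -> bool:
    """Return True if the pyarrow package can be found by this Python interpreter."""
    return importlib.util.find_spec("pyarrow") is not None


if not has_pyarrow():
    print("pyarrow is missing: install it with conda or pip before using pandas_udf")
```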

In [3]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [4]:
# Do not show warnings
warnings.filterwarnings('ignore')

Create a SparkSession to work with DataFrames.

In [5]:
spark = SparkSession.builder.getOrCreate()

Load the data. Here I am using a subset of the movie ratings downloaded from MovieLens.

In [6]:
df = spark.read.csv('rating_10000.csv', sep=',', inferSchema=True,
                    header=True)

In [7]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [8]:
# remove timestamp column since I am not going to work with it.
df = df.drop('timestamp')

Also make a pandas copy of the Spark DataFrame.

In [9]:
pdf = df.toPandas()

### Task 1. Demeaning rating using Scalar Pandas UDFs
Subtract the overall mean rating from each rating. Because we only need the `rating` column to calculate the `demeaned_rating`, we use a Scalar Pandas UDF, which takes a column (a pandas Series) and returns a column of the same length.
The function body is exactly what we would use with a pandas DataFrame; we just add a `pandas_udf` decorator that specifies the `returnType` and `functionType`.

First, let's see how to do it with a pandas DataFrame: define a `demean` function and apply it with `transform`.

In [10]:
def demean(rating):
    return rating - rating.mean()

In [11]:
pdf['demean'] = pdf['rating'].transform(demean)

In [12]:
pdf.head(7)

   userId  movieId  rating    demean
0    9761       11     3.0  -0.49275
1    9761       14     2.0  -1.49275
2    9761       17     3.0  -0.49275
3    9761       19     1.0  -2.49275
4    9761       21     3.0  -0.49275
5    9761       31     4.0   0.50725
6    9761       32     4.0   0.50725


In [13]:
pdf.shape

(10000, 4)

Now we use `pandas_udf` to do the same with the Spark DataFrame:

In [14]:
@pandas_udf('double', PandasUDFType.SCALAR)
def demean(rating):
    return rating - rating.mean()

In [15]:
df_demean = df.withColumn('demeaned_rating', demean('rating'))

In [16]:
df_demean.show(7)

+------+-------+------+---------------+
|userId|movieId|rating|demeaned_rating|
+------+-------+------+---------------+
|  9761|     11|   3.0|       -0.49275|
|  9761|     14|   2.0|       -1.49275|
|  9761|     17|   3.0|       -0.49275|
|  9761|     19|   1.0|       -2.49275|
|  9761|     21|   3.0|       -0.49275|
|  9761|     31|   4.0|        0.50725|
|  9761|     32|   4.0|        0.50725|
+------+-------+------+---------------+
only showing top 7 rows



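We could get the same demeaned rating without a UDF by computing the overall mean and passing the expression directly to `withColumn`, as shown below; the Scalar Pandas UDF above is only meant to show how this UDF type works. Note that Spark evaluates a Scalar Pandas UDF on batches of rows, so `rating.mean()` is the mean of each batch rather than of the whole column. The two results agree here because this small dataset evidently fits in a single batch. Later we will also see how Grouped Map Pandas UDFs work with `groupBy`.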

In [17]:
from pyspark.sql.functions import avg

In [18]:
mean_rating = df.select(avg('rating')).collect()[0][0]  # overall mean of the rating column
df.withColumn('demeaned', df['rating'] - mean_rating).show(7)

+------+-------+------+--------+
|userId|movieId|rating|demeaned|
+------+-------+------+--------+
|  9761|     11|   3.0|-0.49275|
|  9761|     14|   2.0|-1.49275|
|  9761|     17|   3.0|-0.49275|
|  9761|     19|   1.0|-2.49275|
|  9761|     21|   3.0|-0.49275|
|  9761|     31|   4.0| 0.50725|
|  9761|     32|   4.0| 0.50725|
+------+-------+------+--------+
only showing top 7 rows
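To see why the batch size matters for the Scalar UDF, the pure-pandas sketch below imitates how Spark calls it: the column is cut into consecutive batches (in Spark, at most `spark.sql.execution.arrow.maxRecordsPerBatch` rows each, 10,000 by default, and never spanning partitions), the function runs once per batch, and the results are concatenated. `apply_in_batches` is a hypothetical helper for illustration, not a PySpark API.

```python
import numpy as np
import pandas as pd


def demean(rating: pd.Series) -> pd.Series:
    # Same body as the notebook's UDF: subtract the mean of whatever Series it receives
    return rating - rating.mean()


def apply_in_batches(series: pd.Series, func, batch_size: int) -> pd.Series:
    """Imitate Scalar Pandas UDF evaluation: call func on consecutive chunks
    of at most batch_size rows, then concatenate the per-chunk results."""
    chunks = [func(series.iloc[start:start + batch_size])
              for start in range(0, len(series), batch_size)]
    return pd.concat(chunks)


ratings = pd.Series([3.0, 2.0, 3.0, 1.0, 5.0, 4.0])
one_batch = apply_in_batches(ratings, demean, batch_size=6)    # one mean over all rows
two_batches = apply_in_batches(ratings, demean, batch_size=3)  # a separate mean per chunk
print(one_batch.tolist())
print(two_batches.round(3).tolist())
```

With one batch every rating is compared with the overall mean, while with two batches each half is compared with its own mean, so the results differ. Computing the mean explicitly, as in the `withColumn` cell above, avoids this dependence on batching.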



### Task 2. Demeaning rating within each user group using Grouped Map Pandas UDF
When defining the function, we take a pandas DataFrame (all rows of one group) as the argument and return a pandas DataFrame. We also pass the `returnType`, here the schema of the returned DataFrame, and the `functionType`, which is `PandasUDFType.GROUPED_MAP` in this case, to the decorator. (In [Spark 2.4.0](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#grouped-aggregate), `PandasUDFType.GROUPED_AGG` is also available for aggregation operations.) Then we can use this function with `groupBy` and `apply`.


In [19]:
demean_schema = 'userId int, movieId int, rating double, user_demeaned_rating double'

In [20]:
@pandas_udf(demean_schema, PandasUDFType.GROUPED_MAP)
def demean_user(df):  # df here is a pandas DataFrame holding one user's rows, not the Spark df
    df['user_demeaned_rating'] = df['rating'] - df['rating'].mean()
    return df

In [21]:
df_user_demean = df.groupBy('userId').apply(demean_user)

In [22]:
df_user_demean.show(7)

+------+-------+------+--------------------+
|userId|movieId|rating|user_demeaned_rating|
+------+-------+------+--------------------+
| 14832|      1|   3.0| -0.3076923076923075|
| 14832|      2|   4.0|  0.6923076923076925|
| 14832|      5|   3.0| -0.3076923076923075|
| 14832|     34|   3.0| -0.3076923076923075|
| 14832|     48|   4.0|  0.6923076923076925|
| 14832|     60|   3.0| -0.3076923076923075|
| 14832|    107|   3.0| -0.3076923076923075|
+------+-------+------+--------------------+
only showing top 7 rows
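To cross-check this result against the pandas copy `pdf`, the same per-user quantity can be computed in pandas with `groupby(...).transform('mean')`, which broadcasts each user's mean back onto that user's rows and keeps the original row order. On the notebook's data this would be `pdf['rating'] - pdf.groupby('userId')['rating'].transform('mean')`; the sketch below uses a small made-up frame so it runs on its own.

```python
import pandas as pd

ratings = pd.DataFrame({
    'userId': [1, 2, 1, 2, 1],
    'movieId': [10, 10, 20, 40, 30],
    'rating': [3.0, 2.0, 4.0, 4.0, 5.0],
})

# transform('mean') returns a Series aligned with the original rows,
# holding each row's user mean
user_mean = ratings.groupby('userId')['rating'].transform('mean')
ratings['user_demeaned_rating'] = ratings['rating'] - user_mean
print(ratings)
```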



### Task 3. Perform a train/test split within each user group using Grouped Map Pandas UDF
In this task, I pick about half of the users at random and, for each picked user with more than 25 ratings, take 10 of their movie ratings as test data. A new column `istest` marks whether a rating was picked (1) or not (0), and the dataset is then split into training and test sets on that column. A Grouped Map Pandas UDF is handy here because the selection has to be made separately within each user's ratings.

In [23]:
istest_schema = 'userId int, movieId int, rating double, istest int'

In [24]:
TEST_MOVIES = 10

# Define a grouped-map pandas_udf that flags test ratings within each user
@pandas_udf(istest_schema, PandasUDFType.GROUPED_MAP)
def istest(df):  # df here is a pandas DataFrame holding one user's rows
    '''
    Select about half of the users as test users (each user has a 50% chance).
    For a test user with more than 25 ratings, randomly flag TEST_MOVIES ratings as test (istest=1).
    Return the user's ratings with the added `istest` column.'''
    is_test_user = np.random.random()  # draw a random number; only pick ratings if it is > 0.5
    new_test = np.zeros(len(df), dtype=int)  # integer flags to match the `istest int` schema
    if is_test_user > 0.5 and len(df) > 25:
        test_ind = np.random.choice(np.arange(len(df)), TEST_MOVIES, replace=False)
        new_test[test_ind] = 1
    df['istest'] = new_test
    return df

In [25]:
df_with_ind = df.groupBy('userId').apply(istest).cache()  

Because Spark evaluates lazily, the random split would be recomputed, and would come out differently, every time we call an action such as `show()` or `count()`. We use `cache()` to persist the computed result so that the split stays consistent throughout this notebook, as long as the cached data is not evicted.
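If the split has to survive recomputation (for example after cached partitions are evicted), one alternative, not the approach used in this notebook, is to make the random draws depend only on the user. The sketch below seeds a NumPy `Generator` (NumPy 1.17 or later) with each user's `userId` and sorts the user's rows by `movieId` first, so the same movies are flagged no matter how often, or in which row order, the group is processed. `istest_seeded`, `MIN_RATINGS`, and `split_by_user` are hypothetical names; the code is plain pandas so it runs without Spark, and in Spark the same function would be decorated with `@pandas_udf(istest_schema, PandasUDFType.GROUPED_MAP)`.

```python
import numpy as np
import pandas as pd

TEST_MOVIES = 10
MIN_RATINGS = 25  # hypothetical name for the "more than 25 ratings" threshold


def istest_seeded(user_df: pd.DataFrame) -> pd.DataFrame:
    """Flag TEST_MOVIES ratings of a test user (istest=1), deterministically per user."""
    # Sort so that positions chosen below always refer to the same movies
    user_df = user_df.sort_values("movieId").reset_index(drop=True)
    user_id = int(user_df["userId"].iloc[0])
    rng = np.random.default_rng(user_id)  # same userId -> same random draws
    new_test = np.zeros(len(user_df), dtype=int)
    if rng.random() > 0.5 and len(user_df) > MIN_RATINGS:
        test_ind = rng.choice(len(user_df), TEST_MOVIES, replace=False)
        new_test[test_ind] = 1
    user_df["istest"] = new_test
    return user_df


def split_by_user(ratings: pd.DataFrame) -> pd.DataFrame:
    # Pandas stand-in for df.groupBy('userId').apply(...)
    return pd.concat([istest_seeded(group) for _, group in ratings.groupby("userId")])


# Example: three users with 30 ratings each and one user with only 5
ratings = pd.DataFrame({
    "userId": np.repeat([1, 2, 3, 4], [30, 30, 30, 5]),
    "movieId": np.arange(95),
    "rating": 3.0,
})
flagged = split_by_user(ratings)
print(flagged.groupby("userId")["istest"].sum())  # 0 or 10 per user
```

The trade-off is that the selection of test users is fixed by their ids rather than drawn anew on each run; changing the seed expression (for example adding a constant) gives a different but still reproducible split.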

In [26]:
df_with_ind.groupBy('istest').count().show()

+------+-----+
|istest|count|
+------+-----+
|     1|  270|
|     0| 9730|
+------+-----+



In [27]:
# split dataset according to the 'istest' column
train = df_with_ind.filter('istest=0')
test = df_with_ind.filter('istest=1')

In [28]:
train.count(), test.count()

(9730, 270)