## Large Scale Machine Learning Made Easy using Azure Databricks with Snowflake
Hello! This is a demonstration of how you can use Azure Databricks to build a recommendation algorithm from your Snowflake database tables. We use the Netflix Prize data, a record of 100 million movie ratings by Netflix users, which is publicly available on the [Kaggle website](https://www.kaggle.com/netflix-inc/netflix-prize-data).

First we explore the dataset, showing how the open source Python packages ggplot and seaborn can be used within the notebook. Then we use the PySpark package pyspark.ml to train the recommendation algorithm and apply it to the whole dataset. This provides 3 movie recommendations for each user based on their ratings so far. Finally, we export these recommendations back to the Snowflake database.

*NB: cluster specs: Azure Standard_DS3_v2 with 2 worker nodes and autoscaling enabled. Total run time for different sample sizes: 5 min for 1m ratings, 9 min for 10m, 20 min for 100m (the full dataset).*

#### Extract the data from the Snowflake database using the spark-snowflake connector package. 
*NB: you will need to attach the spark-snowflake connector package to your cluster before you can execute read/write commands against the Snowflake database.*

In [3]:
# Placeholder values: replace with your own Snowflake connection details
sfOptions = {
  "sfURL" : "your.sf.portal.url.com",
  "sfAccount" : "yourAccount",
  "sfUser" : "USER",
  "sfPassword" : "PWD",
  "sfDatabase" : "YOUR_DB",
  "sfSchema" : "PUBLIC",
  "sfWarehouse" : "YOUR_WH",
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

#full netflix ratings dataset
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select * from netflix_ratings_wide") \
  .load()

#movie title reference table
movie_titles = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select MOVIE_ID, MOVIE_TITLE from netflix_movie_titles") \
  .load()
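
Hardcoding a password in a notebook cell is convenient for a demo but easy to leak. As a minimal sketch, the options dictionary could instead be assembled from environment variables. The variable names below (`SNOWFLAKE_URL` and so on) and the helper `build_sf_options` are hypothetical and not part of the connector; on Databricks, `dbutils.secrets.get(scope=..., key=...)` is another common way to fetch such values.

```python
import os

# Hypothetical environment variable names; adjust them to your own setup.
OPTION_TO_ENV_VAR = {
    "sfURL": "SNOWFLAKE_URL",
    "sfAccount": "SNOWFLAKE_ACCOUNT",
    "sfUser": "SNOWFLAKE_USER",
    "sfPassword": "SNOWFLAKE_PASSWORD",
    "sfDatabase": "SNOWFLAKE_DATABASE",
    "sfSchema": "SNOWFLAKE_SCHEMA",
    "sfWarehouse": "SNOWFLAKE_WAREHOUSE",
}


def build_sf_options(env=os.environ):
    """Build the connector options dict, failing early if a value is missing."""
    missing = [var for var in OPTION_TO_ENV_VAR.values() if not env.get(var)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(sorted(missing)))
    return {option: env[var] for option, var in OPTION_TO_ENV_VAR.items()}
```

The result has the same keys as `sfOptions`, so it can be passed to `.options(**build_sf_options())` in the same way.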

In [4]:
display(df)

In [5]:
df_num_rows = df.count()
print('Number of datapoints: ', df_num_rows)

## Data exploration with ggplot and seaborn
In any machine learning experiment, an essential part is gaining an understanding of the data, usually through data analysis and visualisations. This can help reveal patterns or artefacts in the data.
Using display(df) and clicking the bar chart button beneath the cell is an easy way to view plots of a sample of your data. For anything beyond that, it is best to use Python packages like ggplot and seaborn to create nicer, static visualisations of the data.

In [7]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
movie_rating_stats = df.groupby('MOVIE_TITLE').agg(mean("RATING"), stddev("RATING")).withColumnRenamed('avg(RATING)', "Mean Rating").withColumnRenamed('stddev_samp(RATING)', 'Std Rating')
movie_rating_stats = movie_rating_stats.withColumn("Mean Rating",movie_rating_stats["Mean Rating"].cast(FloatType()))
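
To make the aggregation concrete, here is a plain-Python sketch of what it computes per movie: the mean rating and the sample standard deviation (Spark's `stddev` is an alias for `stddev_samp`). The helper `rating_stats` and the sample rows are made up for illustration; this is not how Spark executes the query.

```python
from collections import defaultdict
from statistics import mean, stdev


def rating_stats(rows):
    """rows: iterable of (movie_title, rating). Returns {title: (mean, sample_std)}."""
    by_title = defaultdict(list)
    for title, rating in rows:
        by_title[title].append(float(rating))
    stats = {}
    for title, ratings in by_title.items():
        # The sample standard deviation is undefined for a single rating.
        std = stdev(ratings) if len(ratings) > 1 else None
        stats[title] = (mean(ratings), std)
    return stats


# Made-up sample rows, standing in for (MOVIE_TITLE, RATING) pairs
example = rating_stats([("Movie A", 1), ("Movie A", 3), ("Movie A", 5), ("Movie B", 4)])
```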

#### Distribution of average movie ratings

In [9]:
from ggplot import *
import seaborn as sns
m_ratings_plot = ggplot(movie_rating_stats.toPandas(), aes('Mean Rating')) + geom_bar() + ylab('Frequency') +ggtitle('Distribution of Average Movie Ratings') + xlim(1,5)
display(m_ratings_plot)

#### A few movies' ratings distributions: boxplots showing median and inter-quartile range

In [11]:
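# Sampling fraction aiming for about 6 rows; 6.0 forces float division on Python 2 as well
movie_sample_size = 6.0 / df_num_rows
# Arguments: withReplacement=False, fraction, seed=4. Select the title column before collecting.
sample_movies = list(df.sample(False, movie_sample_size, 4).select('MOVIE_TITLE').toPandas()['MOVIE_TITLE'].unique())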
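
Note that the fraction passed to `sample` is a per-row probability, so the number of rows returned is only about 6 on average and is not guaranteed. The plain-Python sketch below imitates sampling without replacement by keeping each row independently with probability `fraction`. It is an illustration under that assumption, not Spark's implementation, and `bernoulli_sample` is a hypothetical helper.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


num_rows = 100000            # stand-in for df_num_rows
fraction = 6.0 / num_rows    # aims for about 6 rows on average

# The sample size varies from seed to seed, and can occasionally be zero.
sizes = [len(bernoulli_sample(range(num_rows), fraction, seed)) for seed in range(5)]
```

If a cell downstream needs a minimum number of movies, it is worth checking the length of the sample before plotting.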

In [12]:
import seaborn as sns
sns.set(font_scale = 0.7)
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax = sns.boxplot(x="MOVIE_TITLE", y="RATING", data = df.filter(df.MOVIE_TITLE.isin(sample_movies)).withColumn("RATING",df["RATING"].cast(FloatType())).toPandas())
ax.set(xlabel = '')
plt.xticks(rotation=10)
display(fig)

##### Movie ratings by year of creation

In [14]:
yearly_ratings = df.groupby('YEAR').mean().select(['YEAR', 'avg(RATING)']).withColumnRenamed('avg(RATING)', 'Mean Rating')

In [15]:
m_ratings_dist = ggplot(yearly_ratings.withColumn("Mean Rating",yearly_ratings["Mean Rating"].cast(FloatType())).toPandas(), aes(x = 'YEAR', y = 'Mean Rating')) + geom_smooth(level = 0.9) + xlim(1890, 2010) + ylim(2,4) + xlab('Year') + ylab('Average Movie Rating')
display(m_ratings_dist)

##### User ratings distribution sample

In [17]:
import pandas as pd
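# Sampling fraction aiming for about 4 rows; 4.0 forces float division on Python 2 as well
user_sample_size = 4.0 / df_num_rows
# tolist() converts the numpy values to plain Python values before they are passed to isin()
sample_users = df.sample(False, user_sample_size, 4).select('USER_ID').toPandas()['USER_ID'].unique().tolist()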

In [18]:
# Convert the filtered sample to pandas once and reuse it for both plots
sample_user_ratings = df.filter(df.USER_ID.isin(sample_users)).withColumn("RATING", df["RATING"].cast(FloatType())).toPandas()
fig, ax = plt.subplots()
ax = sns.violinplot(x="USER_ID", y="RATING", data=sample_user_ratings, inner=None)
ax = sns.stripplot(x="USER_ID", y="RATING", data=sample_user_ratings, jitter=True, color=".3")
plt.xticks(rotation=10)
display(fig)

You could take this exploration much further - examining correlations, grouping users, etc. For brevity we will move on to building the model now.

### The model
We build a model which recommends three movies to users based on the users' historical ratings.

Training the model takes only two lines of code. The first sets the model's parameters, and the second fits it. The model is a Spark optimised version of the Alternating Least Squares (ALS) algorithm. ALS factorises the user-by-movie ratings matrix into low-dimensional user and movie factors, alternately solving for one set of factors while holding the other fixed. We specify the 'userCol', 'itemCol' and 'ratingCol' so that the underlying functions use the right columns. The 'coldStartStrategy' argument specifies how we handle users or movies that were not seen during training when it comes to predictions. For simplicity we set it to "drop", so these cases are dropped from the predictions rather than returned as NaN values. An additional method is then called on the fitted model to create the 'recommendations' DataFrame, which gives us 3 movie recommendations for each user.

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
als = ALS(maxIter=10, regParam=0.1, userCol = 'USER_ID', itemCol='MOVIE_ID', ratingCol='RATING', coldStartStrategy = "drop")
model = als.fit(df)
#Create recommendations from the model
recommendations = model.recommendForAllUsers(3)
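
To give a feel for the alternating updates behind ALS, here is a toy sketch in plain Python that uses a single latent factor per user and per movie. It is an illustration only, not Spark's implementation: Spark uses several factors per user and movie and distributes the computation. The function names and the sample ratings are made up.

```python
def objective(ratings, u, v, reg):
    """Regularised squared error; each alternating step cannot increase it."""
    fit = sum((r - u[user] * v[item]) ** 2 for (user, item), r in ratings.items())
    penalty = reg * (sum(x * x for x in u.values()) + sum(x * x for x in v.values()))
    return fit + penalty


def als_rank1(ratings, n_iters=10, reg=0.1):
    """Fit rating ~ u[user] * v[item] by alternating regularised least squares."""
    users = sorted({user for user, _ in ratings})
    items = sorted({item for _, item in ratings})
    u = {user: 1.0 for user in users}
    v = {item: 1.0 for item in items}
    for _ in range(n_iters):
        # Hold the movie factors fixed and solve each user's 1-D ridge problem.
        for user in users:
            seen = [(item, r) for (usr, item), r in ratings.items() if usr == user]
            u[user] = (sum(r * v[item] for item, r in seen)
                       / (reg + sum(v[item] ** 2 for item, _ in seen)))
        # Hold the user factors fixed and solve for each movie in the same way.
        for item in items:
            seen = [(usr, r) for (usr, itm), r in ratings.items() if itm == item]
            v[item] = (sum(r * u[usr] for usr, r in seen)
                       / (reg + sum(u[usr] ** 2 for usr, _ in seen)))
    return u, v


# Made-up ratings keyed by (user, movie); "ann" has not rated "m3".
toy_ratings = {
    ("ann", "m1"): 5.0, ("ann", "m2"): 3.0,
    ("bob", "m1"): 4.0, ("bob", "m3"): 1.0,
    ("cat", "m2"): 2.0, ("cat", "m3"): 1.0,
}
u, v = als_rank1(toy_ratings)
predicted_ann_m3 = u["ann"] * v["m3"]  # predicted rating for an unseen pair
```

Each inner update has a closed-form solution because, with the other side held fixed, the problem for one user (or one movie) is an ordinary regularised least squares fit.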

##### Reshape the data into something to export back to Snowflake

In [22]:
# Pull the top three recommended movie IDs out of the array column
recs = recommendations.recommendations
flat_recs = recommendations.select(
    "USER_ID",
    recs[0]['MOVIE_ID'].alias('REC1_MOVIE_ID'),
    recs[1]['MOVIE_ID'].alias('REC2_MOVIE_ID'),
    recs[2]['MOVIE_ID'].alias('REC3_MOVIE_ID'))
# Join the title lookup once per recommendation slot (inner joins)
final_recs = (flat_recs
    .join(movie_titles, movie_titles.MOVIE_ID == flat_recs.REC1_MOVIE_ID, how='inner')
    .drop('MOVIE_ID').withColumnRenamed('MOVIE_TITLE', 'REC1_MOVIE_TITLE')
    .join(movie_titles, movie_titles.MOVIE_ID == flat_recs.REC2_MOVIE_ID)
    .drop('MOVIE_ID').withColumnRenamed('MOVIE_TITLE', 'REC2_MOVIE_TITLE')
    .join(movie_titles, movie_titles.MOVIE_ID == flat_recs.REC3_MOVIE_ID)
    .drop('MOVIE_ID').withColumnRenamed('MOVIE_TITLE', 'REC3_MOVIE_TITLE'))
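
The effect of the reshape on a single user's record can be sketched in plain Python. The helper `flatten_recs` and the sample values are hypothetical. Note one difference: the inner joins above silently drop a user whose recommended movie has no entry in the title table, whereas this sketch would raise a `KeyError`.

```python
def flatten_recs(user_id, recs, titles, n=3):
    """recs: list of (movie_id, predicted_rating), best first.
    titles: dict mapping movie_id to title."""
    row = {"USER_ID": user_id}
    for rank, (movie_id, _rating) in enumerate(recs[:n], start=1):
        row["REC%d_MOVIE_ID" % rank] = movie_id
        row["REC%d_MOVIE_TITLE" % rank] = titles[movie_id]
    return row


# Made-up values for illustration
titles = {10: "Movie A", 20: "Movie B", 30: "Movie C"}
flat_row = flatten_recs(42, [(20, 4.9), (10, 4.7), (30, 4.5)], titles)
```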

#### The recommendations

Finally - explore the recommendations you've built. You may recognise some movies - and see some similar movies being recommended to the same user. This suggests the model is doing what we expect it to do. Another sanity check would be to take a sample of a few users' highest rated movies - and see if they seem similar to the ones being recommended to them. We'll leave that for today and show you how to load the results table into Snowflake.
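
As a rough sketch of that sanity check, the helper below picks a user's highest rated titles from a small in-memory list of ratings, ready to compare by eye against their recommendations. The function `top_rated` and the sample data are hypothetical; on the full dataset you would filter and sort the Spark DataFrame instead.

```python
import heapq


def top_rated(ratings, user_id, n=3):
    """ratings: iterable of (user_id, movie_title, rating).
    Returns the n titles this user rated highest, best first."""
    own = [(rating, title) for uid, title, rating in ratings if uid == user_id]
    return [title for _rating, title in heapq.nlargest(n, own)]


# Made-up ratings for illustration
sample_ratings = [
    (1, "Movie A", 5), (1, "Movie B", 3), (1, "Movie C", 4),
    (1, "Movie D", 1), (2, "Movie A", 2),
]
favourites = top_rated(sample_ratings, user_id=1, n=2)
```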

In [24]:
display(final_recs)

#### Load to Snowflake as a new table in your database

In [26]:
# Write through the Snowflake connector; save() targets the table named in 'dbtable'
final_recs.write.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option('dbtable', 'NETFLIX_RECOMMENDATIONS') \
  .mode('overwrite') \
  .save()

#### Conclusion

With Snowflake as the data store and Azure Databricks as the development environment, powered by a Spark cluster, your machine learning experiments can scale from small samples to very large datasets. Load and explore large datasets in the same way you would small ones. Use the open source machine learning libraries available in PySpark to train optimised models for your use case in only a few lines of code. In this example we recommended Netflix movies - but these engines have huge potential in other use cases - marketing campaigns or product recommendations spring to mind. Grab your Snowflake data and get exploring!