## Setting up Spark

In [None]:
import os, sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2.1"
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], 'python'))
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], 'python/lib/py4j-0.10.4-src.zip'))

In [None]:
import pyspark

In [None]:
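sparkConf = (
    pyspark.SparkConf()
    # JVM heap per executor
    .set("spark.executor.memory", "2560m")
    # driver JVM heap; in client mode this may not take effect if the JVM is already running
    .set("spark.driver.memory", "2560m")
    # extra off-heap memory (MiB) per executor container; Python worker processes must fit in it
    .set("spark.yarn.executor.memoryOverhead", 3584)
    # driver overhead (MiB); per the YARN docs it applies in cluster mode only
    .set("spark.yarn.driver.memoryOverhead", 3584)
    # memory per Python worker during aggregation before it spills to disk
    .set("spark.python.worker.memory", "1536m")
    .set("spark.executor.instances", 11)
    # default partition count for RDD shuffles (DataFrames use spark.sql.shuffle.partitions)
    .set("spark.default.parallelism", 300)
)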
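As a rough sanity check of what this configuration asks YARN for: each executor container needs its heap (spark.executor.memory) plus spark.yarn.executor.memoryOverhead. The sketch below only does that arithmetic. It assumes YARN grants exactly the requested amount; in practice YARN may round each request up to its minimum allocation increment.

```python
# Rough arithmetic for the executor containers requested by the SparkConf above.
# Assumption: container size = executor heap + memoryOverhead, both in MiB,
# with no rounding by the YARN scheduler.
executor_heap_mb = 2560        # spark.executor.memory = "2560m"
executor_overhead_mb = 3584    # spark.yarn.executor.memoryOverhead
num_executors = 11             # spark.executor.instances

per_executor_mb = executor_heap_mb + executor_overhead_mb
total_executor_mb = per_executor_mb * num_executors

print(per_executor_mb)    # 6144
print(total_executor_mb)  # 67584
```

So the executors alone need about 66 GiB of cluster memory, before counting the driver and the YARN application master.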

In [None]:
sc = pyspark.SparkContext(
    master='yarn',  # 'yarn-client' is deprecated since Spark 2.0; the default deploy mode is client
    appName='seminar4',
    conf=sparkConf
)
sc

In [None]:
port = sc.uiWebUrl.split(':')[-1]
print('http://cluster1:{}'.format(port))

## Getting the data

In [None]:
! wget "http://files.grouplens.org/datasets/movielens/ml-latest.zip" -O "/data/movielens.zip"

In [None]:
! unzip /data/movielens.zip -d /data/movielens/

In [None]:
! hdfs dfs -mkdir sem4

In [None]:
import os
import subprocess
data_dir = '/data/movielens/ml-latest/'
for name in os.listdir(data_dir):
    # copy each extracted file into the sem4 directory in HDFS
    subprocess.check_call(['hdfs', 'dfs', '-put', os.path.join(data_dir, name), 'sem4/'])
    print('{} done'.format(name))

## More FUN with DataFrames

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
ss = SparkSession(sc)

Assume we need to build a recommender system for films. First, we will need to generate some features for each movie.

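Let's read the user ratings. With header=True, the CSV reader takes column names from the first line of the file; with inferSchema=True, it infers column types from the data, at the cost of an extra pass over it.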

In [None]:
def read(name):
    return ss.read.csv('sem4/{}'.format(name), header=True, inferSchema=True)

In [None]:
ratings = read('ratings.csv')

In [None]:
ratings.printSchema()

We already know how to calculate the mean rating of each movie, so let's do that. The resulting column is named 'avg(rating)', because sf.mean is an alias for avg.

In [None]:
%%time
movie_mean_rating = ratings.groupby('movieId').agg(sf.mean('rating'))
movie_mean_rating.show(5)
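What groupby('movieId').agg(sf.mean('rating')) computes can be modelled in plain Python. The sketch below mimics the two-step way Spark evaluates an average: partial (sum, count) pairs per partition before the shuffle, then a merge and a division. The sample rows and the helper names are hypothetical.

```python
from collections import defaultdict

# Hypothetical (userId, movieId, rating) rows split into two "partitions".
sample_partitions = [
    [(1, 10, 4.0), (2, 10, 3.0)],
    [(1, 20, 5.0), (3, 10, 5.0)],
]

def partial_sums(partition):
    """Map-side step: (sum, count) per movieId within one partition."""
    out = defaultdict(lambda: [0.0, 0])
    for _user, movie, rating in partition:
        out[movie][0] += rating
        out[movie][1] += 1
    return out

def mean_by_movie(partitions):
    """Merge the partial (sum, count) pairs, then divide to get the mean per movieId."""
    merged = defaultdict(lambda: [0.0, 0])
    for part in partitions:
        for movie, (s, c) in partial_sums(part).items():
            merged[movie][0] += s
            merged[movie][1] += c
    return {movie: s / c for movie, (s, c) in merged.items()}

print(mean_by_movie(sample_partitions))  # {10: 4.0, 20: 5.0}
```

Shipping (sum, count) pairs rather than raw ratings is what keeps the shuffle small.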

Now let's read movie descriptions.

In [None]:
movie_data = read('movies.csv')

In [None]:
movie_data.show(5)

There are only 3 columns: movieId, plus title and genres, which are not very useful for a model in their current form. Let's build our feature space.

First, we need to add the rating data to our dataframe. We can achieve that with a join.

In [None]:
movie_data = movie_data.join(movie_mean_rating, on='movieId', how='inner')
movie_data.show(5)

Besides the other dataframe, join takes two parameters. 'on' is a column name, a Column expression, or a list of either. 'how' is the join type, e.g. 'inner' (the default), 'left_outer' or 'full_outer'; more on join types can be found [here](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins.html). Note that the inner join above drops movies that have no ratings.
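The difference between the join types is easiest to see on a few rows. The plain-Python sketch below models joining two lists of (key, value) pairs on the key for 'inner' and 'left_outer' only; the helper join_pairs and the sample data are hypothetical and are not part of the Spark API.

```python
def join_pairs(left, right, how='inner'):
    """Plain-Python model of joining two lists of (key, value) pairs on the key.

    Only 'inner' and 'left_outer' are modelled here.
    """
    if how not in ('inner', 'left_outer'):
        raise ValueError('only inner and left_outer are modelled')
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    joined = []
    for key, left_value in left:
        if key in index:
            # every matching right row produces one output row
            for right_value in index[key]:
                joined.append((key, left_value, right_value))
        elif how == 'left_outer':
            # unmatched left rows survive, with None standing in for null
            joined.append((key, left_value, None))
    return joined

sample_movies = [(1, 'Movie A (1995)'), (2, 'Movie B'), (3, 'Movie C (2001)')]
sample_means = [(1, 3.5), (3, 4.0), (99, 2.0)]

print(join_pairs(sample_movies, sample_means))
# [(1, 'Movie A (1995)', 3.5), (3, 'Movie C (2001)', 4.0)]
print(join_pairs(sample_movies, sample_means, how='left_outer'))
# [(1, 'Movie A (1995)', 3.5), (2, 'Movie B', None), (3, 'Movie C (2001)', 4.0)]
```

Key 99 exists only on the right side, so neither join type keeps it; 'right_outer' or 'full_outer' would.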

### Execution plan and persistence

As you remember, Spark uses lazy evaluation: apart from the .show() calls (and the passes made while reading the CSV files to infer their schema), no real computation has happened yet. You can, however, view the physical plan Spark will execute, which is sometimes useful for debugging.

In [None]:
movie_data.explain()

The plan includes some heavy steps: aggregating the ratings (with a shuffle) and joining the result with the movies. Remember that each time you call an action on a dataframe, Spark executes its plan from the very beginning. You should not rely on Spark reusing intermediate results between actions; instead, state explicitly that you want the dataframe persisted. Then the first action on it, or on any dataframe derived from it, stores its data, and later actions reuse the stored data instead of recomputing it (as long as the cached blocks are not evicted).

In [None]:
movie_data = movie_data.cache()

For dataframes, .cache() is a shortcut for .persist() with the default MEMORY_AND_DISK storage level (for RDDs, .cache() uses MEMORY_ONLY). More on that topic [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose)
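The recompute-versus-cache behaviour described above can be illustrated with a toy model in plain Python. LazyFrame below is hypothetical and much simpler than Spark (no partitions, no eviction): transformations only record a recipe, every action replays the recipe from the source, and a frame marked with cache() stores its rows the first time an action reaches it.

```python
class LazyFrame(object):
    """Toy model of lazy evaluation and caching (not Spark's implementation)."""

    def __init__(self, compute):
        self._compute = compute      # zero-argument function producing a list of rows
        self._cache_enabled = False
        self._cached_rows = None

    def map(self, fn):
        # Transformation: nothing runs now, we only build a new recipe.
        return LazyFrame(lambda: [fn(row) for row in self._rows()])

    def cache(self):
        # Like cache(): only marks the frame; the next action stores the data.
        self._cache_enabled = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        rows = self._compute()
        if self._cache_enabled:
            self._cached_rows = rows
        return rows

    def collect(self):
        # Action: triggers the actual computation.
        return list(self._rows())


source_reads = [0]

def read_source():
    source_reads[0] += 1   # count how often the "file" is scanned
    return [1, 2, 3]

base = LazyFrame(read_source).map(lambda x: x * 10)
base.collect()
base.collect()
print(source_reads[0])  # 2: each action replayed the plan from the source

source_reads[0] = 0
cached = LazyFrame(read_source).map(lambda x: x * 10).cache()
cached.map(lambda x: x + 1).collect()   # first action on a descendant fills the cache
cached.map(lambda x: x + 2).collect()   # served from the cache
print(source_reads[0])  # 1
```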

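Another approach is .checkpoint() (available for dataframes since Spark 2.1). By default it eagerly computes the dataframe, writes its full data to files in the checkpoint directory (you need to set that directory first), and returns a new dataframe whose physical plan simply reads those files, so the lineage is truncated. Note that it returns a new dataframe rather than changing the existing one.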

In [None]:
sc.setCheckpointDir('chkp')
movie_data.checkpoint().explain()

### Generating simple features

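Ok, now let's turn those two string columns into something useful. As we can see, the title column also contains the release year, e.g. 'Toy Story (1995)'.

If we were using the RDD API, we would use .map to do it. In the DataFrame API, it can be done via user-defined functions (UDFs).

Calling pyspark.sql.functions.udf on a regular Python function turns it into a UDF: it accepts columns as arguments and returns a new column, so you can use it in your expressions. By default, a UDF returns strings (StringType); pass a data type as the second argument to change that. Keep in mind that Python UDFs are slower than built-in functions, because every row has to be sent to a Python worker process.

Let's extract the title without the year from the title column.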

In [None]:
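def title(t):
    # Drop the last '(...)' part, which is normally the year:
    # 'Toy Story (1995)' -> 'Toy Story'. Titles without '(' are kept as they are.
    if '(' not in t:
        return t.strip()
    return '('.join(t.split('(')[:-1]).strip()

title_udf = sf.udf(title)  # return type defaults to StringType
movie_data = movie_data.withColumn('short_title', title_udf('title'))
movie_data.limit(5).toPandas()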
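Because a UDF wraps an ordinary Python function, it is worth testing that function on a few strings before handing it to Spark. The sketch below is a variant of the title-splitting logic under a hypothetical name, strip_year, which also keeps titles that contain no '(' at all. The sample titles are made up for illustration.

```python
def strip_year(full_title):
    """Hypothetical helper: drop the last '(...)' chunk, normally the year.

    Titles without '(' are returned unchanged (apart from surrounding spaces).
    """
    if '(' not in full_title:
        return full_title.strip()
    return '('.join(full_title.split('(')[:-1]).strip()

examples = [
    'Toy Story (1995)',
    'Some Film, The (Un film) (2001)',   # only the last parenthesised part is removed
    'Untitled Project',                  # no year at all
]
for t in examples:
    print(repr(strip_year(t)))
# 'Toy Story'
# 'Some Film, The (Un film)'
# 'Untitled Project'
```

Splitting on every '(' and rejoining all but the last piece keeps alternative titles in parentheses intact.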

udf can also be used as a decorator. Now write a UDF that extracts the release year. Remember that not every title ends with a year; you can return -1 in that case.

In [None]:
@sf.udf
def get_year(title):
    pass

movie_data = movie_data.withColumn('year', get_year('title').cast('integer'))
movie_data.limit(5).toPandas()

Now let's do something with the genres column. We will create a boolean column for each genre, which is true if the movie belongs to that genre. First, we need a list of all genres.

As you can see, if a film has multiple genres, they are separated by '|', so we can use sf.split to get an array of genres. sf.split treats its second argument as a (Java) regular expression, so the pipe must be escaped with a backslash.

By the way, columns can have complex types, such as arrays, maps, structs, ML vectors, etc.

In [None]:
movie_data.select(sf.split('genres', r'\|')).printSchema()

After that, we need to 'explode' those arrays (the equivalent of .flatMap in the RDD API) and collect the unique genres.

In [None]:
col = sf.explode(sf.split('genres', r'\|'))
genres = movie_data.select(col.alias('genre'))
# sort so that the order of the genre columns is reproducible
genres = sorted(genres.drop_duplicates().toPandas()['genre'])
genres
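The split, explode and drop_duplicates steps have direct plain-Python counterparts: a regex split per row, a flattening comprehension (the flatMap), and a set. The sketch below runs them on a few hypothetical genres values in the MovieLens pipe-separated format.

```python
import re

# Hypothetical 'genres' values in the MovieLens pipe-separated format.
sample_genres = ['Adventure|Animation|Children', 'Comedy|Romance', 'Comedy', '(no genres listed)']

# split: the pattern is a regular expression, hence the escaped pipe.
split_rows = [re.split(r'\|', value) for value in sample_genres]

# explode / flatMap: one output row per array element.
exploded = [genre for row in split_rows for genre in row]

# drop_duplicates on a single column, sorted for a stable display order.
unique_genres = sorted(set(exploded))
print(unique_genres)
# ['(no genres listed)', 'Adventure', 'Animation', 'Children', 'Comedy', 'Romance']
```

Without the backslash, '|' would be the regex alternation operator with two empty alternatives, which matches the empty string everywhere instead of the separator.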

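Now let's create a boolean column for each genre. sf.array_contains checks whether the split genres array contains the genre exactly. (The .like method would not work here: it uses SQL LIKE patterns rather than regular expressions, and without % wildcards it matches only the whole string, so only single-genre movies would be flagged. .rlike is the regular-expression variant.)

In [None]:
genre_array = sf.split('genres', r'\|')
for g in genres:
    # True if this movie's genre list contains g
    movie_data = movie_data.withColumn(g, sf.array_contains(genre_array, g))
movie_data = movie_data.drop('genres')
movie_data.limit(5).toPandas()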
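Row by row, each genre column should encode an exact membership test on the split genre list. The plain-Python sketch below shows that logic on hypothetical rows; genre_flags is an illustrative helper, not a Spark function. Note that str.split takes a literal separator, so no escaping is needed here.

```python
# Hypothetical (title, genres) rows in the MovieLens format.
sample_movies_g = [('Movie A', 'Adventure|Animation'), ('Movie B', 'Adventure'), ('Movie C', 'Drama')]
genre_list = ['Adventure', 'Animation', 'Drama']

def genre_flags(genres_value, all_genres):
    """One boolean per genre: exact membership in the '|'-separated list."""
    present = set(genres_value.split('|'))   # str.split uses a literal separator
    return {g: g in present for g in all_genres}

for name, value in sample_movies_g:
    print(name, genre_flags(value, genre_list))
```

Matching whole list elements rather than substrings avoids false positives if one genre name ever contained another.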

## Window aggregations

Sometimes you need aggregations that depend on other rows of your data: for example, for each row, the difference between a column's value and the maximum of that column. Normally you would first compute that maximum and then use it in an expression, but window functions let you avoid that extra step.

At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it. This makes window functions more powerful than other functions: they let you express concisely many data processing tasks that are hard, if not impossible, to express without them.
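The "difference from the group maximum" example above can be written both ways in plain Python. The first version aggregates first and looks the result up; the second evaluates max over each row's frame, which here is every row in the same partition, as with max(...).over(Window.partitionBy(...)) and no ordering. The rows are hypothetical.

```python
# Hypothetical rows: (movie, genre_group, rating).
win_rows = [('A', 'Drama', 3.0), ('B', 'Drama', 4.5), ('C', 'Comedy', 2.0), ('D', 'Comedy', 3.5)]

# Two-pass approach: aggregate first, then use the result in an expression.
group_max = {}
for _movie, group, rating in win_rows:
    group_max[group] = max(rating, group_max.get(group, rating))
two_pass = [(movie, rating - group_max[group]) for movie, group, rating in win_rows]

# Window-style approach: every row gets its own frame (here, its whole partition).
def frame_of(row):
    """All rows in the same partition (same group) as row."""
    return [r for r in win_rows if r[1] == row[1]]

window_style = [(row[0], row[2] - max(r[2] for r in frame_of(row))) for row in win_rows]

print(window_style)  # [('A', -1.5), ('B', 0.0), ('C', -1.5), ('D', 0.0)]
```

Both give the same result; the window form keeps it inside a single expression instead of a separate aggregation and lookup (or join).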

In [None]:
from pyspark.sql.window import Window

Let's calculate, for each movie, the number of movies of the same genre released in the same year or in the two years before. This feature can show how popular the genre was at the time of the film's release.
Assume 'same genre' means 'same set of genres', so we combine all genre columns into a single boolean array.

In [None]:
all_genres_col = sf.array([sf.col(g) for g in genres]).alias('gs')

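Now we'll define a window specification. It tells Spark how to build the frame for each movie. We need the frame to include all films that have the same value in the 'gs' column and a year between (current_film_year - 2) and current_film_year. partitionBy defines the groups, and orderBy('year') together with rangeBetween(-2, 0) defines the frame by year values rather than by row positions (rowsBetween would count rows instead), so films from the same year are included too.

Then we select the needed columns and call sf.count on movieId with .over(w) to evaluate it over our window. We subtract 1 so that a movie does not count itself.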

In [None]:
w = Window.partitionBy('gs')\
          .orderBy('year')\
          .rangeBetween(-2, 0)
movie_data.select('movieId', 'title', 'year', all_genres_col)\
    .withColumn('release_before_count', sf.count('movieId').over(w) - 1)\
    .filter(sf.col('year') != -1).orderBy('year').show()
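To check what this window computes, here is a plain-Python model of count(...).over(w) - 1 with a range frame of [year - 2, year] inside each genre-set partition. The films and the helper release_before_count are hypothetical. Note that films 3 and 6, released in the same year, count each other, because a range frame includes all rows with equal ordering values.

```python
# Hypothetical (movieId, genre_set, year) rows.
films = [
    (1, ('Drama',), 1990),
    (2, ('Drama',), 1991),
    (3, ('Drama',), 1992),
    (4, ('Drama',), 1993),
    (5, ('Comedy',), 1992),
    (6, ('Drama',), 1992),
]

def release_before_count(items, lower=-2, upper=0):
    """For each film, count the other films with the same genre set whose year
    lies in [year + lower, year + upper], i.e. a range frame on 'year'."""
    counts = {}
    for movie_id, gs, year in items:
        frame = [m for m, g, y in items if g == gs and year + lower <= y <= year + upper]
        counts[movie_id] = len(frame) - 1   # the frame includes the film itself
    return counts

print(release_before_count(films))
# {1: 0, 2: 1, 3: 3, 4: 3, 5: 0, 6: 3}
```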

As an exercise, for each film calculate the difference between its rating and the maximum rating of films with the same set of genres released between 5 years before and 5 years after its release year.

In [None]:
ws = ...
rd_col = ...
movie_data.select('movieId', 'title', 'year', 'avg(rating)', all_genres_col)\
    .withColumn('rating_diff', rd_col)\
    .show()

### Broadcasts and accumulators

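Here is a little example of how to use broadcasts and accumulators.

Broadcast variables are useful when your computations need some heavy read-only object, or when you don't want to recreate something for each partition (like a compiled regexp). Spark sends a broadcast value to each executor once instead of shipping it with every task. You just need to call sc.broadcast on your object and then use .value inside worker code to access it.

Accumulators are useful when you need to gather some statistics while computing something else. Worker code can only add to an accumulator; its value can be read only on the driver. Remember that accumulators are not filled until you call some action on your dataframe. Also, exactly-once updates are guaranteed only for accumulators updated inside actions; updates made inside transformations, such as the UDF below, may be applied more than once if a task is re-executed or the plan is recomputed.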

In [None]:
%%time
import re
devil_re = sc.broadcast(re.compile('devil', re.IGNORECASE))

acc = sc.accumulator(0)

@sf.udf 
def short_title_with_acc_bc(x):
    if devil_re.value.search(x):
        acc.add(1)
    return title(x)


md = movie_data.withColumn('title2', short_title_with_acc_bc('title'))
print(acc.value)  # 0: no action has run yet, so the UDF has not been executed
md.collect()
print(acc.value)  # number of titles matching 'devil'
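The caveat about accumulators updated inside transformations can be reproduced without Spark. In the toy model below, ToyAccumulator, tag_title and lazy_plan are hypothetical stand-ins: the regex is compiled once and reused (the role the broadcast plays above), the counter is only updated when the lazy plan is consumed, and consuming the uncached plan twice counts the same rows twice. In the notebook, calling md.collect() again would similarly re-run the UDF, since only movie_data, not md, is cached.

```python
import re

class ToyAccumulator(object):
    """Stand-in for a Spark accumulator: tasks only add, the driver reads .value."""
    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount

devil_pattern = re.compile('devil', re.IGNORECASE)  # compiled once, reused for every row
matches = ToyAccumulator()

def tag_title(t):
    # Side effect inside a "transformation": increments the accumulator.
    if devil_pattern.search(t):
        matches.add(1)
    return t.lower()

titles_sample = ['The Devil Film (2001)', 'A Quiet Film (1999)', 'Devilish Plans (2010)']

def lazy_plan():
    # A generator, like a dataframe plan: nothing runs until it is consumed.
    return (tag_title(t) for t in titles_sample)

print(matches.value)  # 0: nothing has been evaluated yet
list(lazy_plan())     # first "action"
print(matches.value)  # 2
list(lazy_plan())     # second action recomputes the plan (no cache)
print(matches.value)  # 4: the same rows were counted twice
```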